In [1]:
import warnings
warnings.filterwarnings("ignore")

import glob
import os
import re
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pywt import wavedec
import pywt
from scipy.signal import butter, filtfilt
import scipy.stats
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn import metrics
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from mlxtend.plotting import plot_confusion_matrix
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, accuracy_score
from xgboost import XGBClassifier
from scipy import signal
from scipy.signal import periodogram
import joblib

In [2]:
# Training counterparts of the settings below; they stay disabled because this
# notebook only processes the test split.
# train_data = "C:/Users/LENOVO/Downloads/class-20250510T171956Z-1-001/class/Train"
# horizontal_file_train = "data_train_h.csv"
# vertical_file_train = "data_train_v.csv"

# Folder that is scanned for one sub-folder per class.
test_data = "class-20250510T171956Z-1-001/class/Test"

# CSV files the loader writes and then reads back (horizontal, vertical).
horizontal_file_test, vertical_file_test = "data_test_h.csv", "data_test_v.csv"
def _flatten_signal_text(raw_text):
    """Join the non-blank lines of one recording into a single CSV fragment."""
    stripped_lines = (line.strip() for line in raw_text.splitlines())
    return ','.join(value for value in stripped_lines if value).strip(',')


def read_and_process_signal_files(root_dir, horizontal_file, vertical_file):
    """Collect paired ``*h.txt`` / ``*v.txt`` recordings into two labelled CSVs.

    Every sub-folder of ``root_dir`` whose lower-cased name is one of
    ``up``, ``down``, ``right``, ``left`` or ``blink`` is scanned for files
    ending in ``h.txt``. Each one is paired with the file of the same name
    ending in ``v.txt``. For every complete pair, one row is appended to
    ``horizontal_file`` and one to ``vertical_file``: the recording's lines
    joined by commas, followed by the integer class label.

    Parameters
    ----------
    root_dir : str
        Folder containing one sub-folder per class.
    horizontal_file, vertical_file : str
        Output CSV paths. Both files are overwritten.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        The two CSVs read back with ``header=None``; the last column of each
        frame is the label.

    Notes
    -----
    * Folders and files are visited in sorted order so the row order is
      reproducible across platforms.
    * Pairs with a missing, unreadable or empty file are skipped and listed
      in the printed summary.
    * If no pair is written the CSVs stay empty and ``pd.read_csv`` is
      expected to fail on them.
    """
    # Map folder names to labels
    class_labels = {
        'up': 0,
        'down': 1,
        'right': 2,
        'left': 3,
        'blink': 4
    }
    # Reverse lookup used only for the printed summary.
    label_names = {label: name for name, label in class_labels.items()}
    h_suffix, v_suffix = 'h.txt', 'v.txt'

    # Counters
    sample_counts = {label: 0 for label in class_labels.values()}
    skipped_files = []

    with open(horizontal_file, 'w') as csv_h, open(vertical_file, 'w') as csv_v:
        # os.listdir / glob return entries in arbitrary order; sort them so
        # the CSV rows come out the same on every run.
        for subdir_name in sorted(os.listdir(root_dir)):
            subdir_path = os.path.join(root_dir, subdir_name)
            if not os.path.isdir(subdir_path):
                continue

            class_key = subdir_name.lower()
            if class_key not in class_labels:
                print(f"Skipping unknown class folder: {subdir_name}")
                continue

            label = class_labels[class_key]

            # Escape the folder part so characters such as '[' in a path are
            # matched literally rather than as glob syntax.
            pattern = os.path.join(glob.escape(subdir_path), "*" + h_suffix)

            for h_file in sorted(glob.glob(pattern)):
                # Swap only the trailing suffix. str.replace would also
                # rewrite an 'h.txt' occurring earlier in the path, and would
                # do nothing for a differently-cased suffix.
                v_file = h_file[:-len(h_suffix)] + v_suffix
                if not os.path.exists(v_file):
                    print(f"Missing v.txt for: {h_file}")
                    skipped_files.append((h_file, "missing v.txt"))
                    continue

                # Only the reads are best-effort. The CSV writes happen
                # outside the try so a write failure cannot leave the two
                # output files silently out of step.
                try:
                    with open(h_file, 'r') as f_h, open(v_file, 'r') as f_v:
                        h_lines = _flatten_signal_text(f_h.read())
                        v_lines = _flatten_signal_text(f_v.read())
                except Exception as e:
                    print(f"Error reading files: {h_file}, {v_file}\n{e}")
                    skipped_files.append((h_file, str(e)))
                    continue

                if not h_lines or not v_lines:
                    print(f"Skipping empty file(s): {h_file}, {v_file}")
                    skipped_files.append((h_file, "empty file"))
                    continue

                # Debug print
                print(f"Processing: {h_file}, Label: {label}")

                # Write to CSVs
                csv_h.write(h_lines + ',' + str(label) + '\n')
                csv_v.write(v_lines + ',' + str(label) + '\n')

                sample_counts[label] += 1
                print(f"Saved: {os.path.basename(h_file)} and {os.path.basename(v_file)} — Label {label}")

    # Summary
    print("\n=== Summary ===")
    for label, count in sample_counts.items():
        print(f"{label_names[label]} ({label}): {count} samples")

    if skipped_files:
        print("\nSkipped file details:")
        for file, reason in skipped_files:
            print(f"{file} — {reason}")

    # Load CSVs into DataFrames (the files are closed at this point)
    df_h = pd.read_csv(horizontal_file, header=None)
    df_v = pd.read_csv(vertical_file, header=None)

    # Display label distribution
    Y_h = df_h.iloc[:, -1]
    Y_v = df_v.iloc[:, -1]

    print(f"\nUnique labels in Y_h: {Y_h.unique()}")
    print(f"Unique labels in Y_v: {Y_v.unique()}")

    print("\nSample count per label (horizontal):")
    print(Y_h.value_counts())

    print("\nSample count per label (vertical):")
    print(Y_v.value_counts())

    return df_h, df_v
# Build the test CSVs and load them back. The matching training call is kept for reference:
# dfTrain_h, dfTrain_v = read_and_process_signal_files(train_data, horizontal_file_train, vertical_file_train)
dfTest_h, dfTest_v = read_and_process_signal_files(
    root_dir=test_data,
    horizontal_file=horizontal_file_test,
    vertical_file=vertical_file_test,
)

Processing: class-20250510T171956Z-1-001/class/Test\Blink\kirp17h.txt, Label: 4
Saved: kirp17h.txt and kirp17v.txt — Label 4
Processing: class-20250510T171956Z-1-001/class/Test\Blink\kirp18h.txt, Label: 4
Saved: kirp18h.txt and kirp18v.txt — Label 4
Processing: class-20250510T171956Z-1-001/class/Test\Blink\kirp19h.txt, Label: 4
Saved: kirp19h.txt and kirp19v.txt — Label 4
Processing: class-20250510T171956Z-1-001/class/Test\Blink\kirp20h.txt, Label: 4
Saved: kirp20h.txt and kirp20v.txt — Label 4
Processing: class-20250510T171956Z-1-001/class/Test\Down\asagi17h.txt, Label: 1
Saved: asagi17h.txt and asagi17v.txt — Label 1
Processing: class-20250510T171956Z-1-001/class/Test\Down\asagi18h.txt, Label: 1
Saved: asagi18h.txt and asagi18v.txt — Label 1
Processing: class-20250510T171956Z-1-001/class/Test\Down\asagi19h.txt, Label: 1
Saved: asagi19h.txt and asagi19v.txt — Label 1
Processing: class-20250510T171956Z-1-001/class/Test\Down\asagi20h.txt, Label: 1
Saved: asagi20h.txt and asagi20v.txt — 

In [3]:
# Separate each table into signal samples (every column except the last)
# and integer class labels (the last column).
X_h_test, Y_h_test = dfTest_h.iloc[:, :-1], dfTest_h.iloc[:, -1].astype(int)
X_v_test, Y_v_test = dfTest_v.iloc[:, :-1], dfTest_v.iloc[:, -1].astype(int)

# Dimensions of the two sample matrices
print("\nX_h shape:", X_h_test.shape)
print("X_v shape:", X_v_test.shape)

# Distinct labels found in each channel
print("\nUnique labels in Y_h:", Y_h_test.unique())
print("Unique labels in Y_v:", Y_v_test.unique())

# Per-label sample counts, ordered by label value
for channel_name, channel_labels in (("horizontal", Y_h_test), ("vertical", Y_v_test)):
    print(f"\nSample count per label ({channel_name}):")
    print(channel_labels.value_counts().sort_index())


X_h shape: (20, 251)
X_v shape: (20, 251)

Unique labels in Y_h: [4 1 3 2 0]
Unique labels in Y_v: [4 1 3 2 0]

Sample count per label (horizontal):
251
0    4
1    4
2    4
3    4
4    4
Name: count, dtype: int64

Sample count per label (vertical):
251
0    4
1    4
2    4
3    4
4    4
Name: count, dtype: int64


In [4]:
def butter_bandpass_filter(Input_Signal, LOW_Cutoff, High_cuttOff, Sampling_Rate, order):
    """Band-pass ``Input_Signal`` with a Butterworth filter applied via ``filtfilt``.

    Both cutoffs are divided by the Nyquist frequency (half of
    ``Sampling_Rate``) before the digital filter is designed. The resulting
    coefficients are handed to ``filtfilt`` together with the unmodified
    input, and its result is returned as-is.

    Parameters
    ----------
    Input_Signal : array-like
        Data to filter; passed straight to ``filtfilt``.
    LOW_Cutoff, High_cuttOff : float
        Lower and upper band edges, in the same units as ``Sampling_Rate``.
    Sampling_Rate : float
        Sampling frequency of the data.
    order : int
        Order argument forwarded to ``butter``.
    """
    nyquist = 0.5 * Sampling_Rate
    band_edges = [LOW_Cutoff / nyquist, High_cuttOff / nyquist]
    b_coeffs, a_coeffs = butter(
        order, band_edges, btype="band", output="ba", analog=False, fs=None
    )
    return filtfilt(b_coeffs, a_coeffs, Input_Signal)

# def resample_filter(Filtered_Data):
#     resampled_Signal = []
#     for i in Filtered_Data:
#         re_Sgnl = signal.resample(i,50)
#         resampled_Signal.append(re_Sgnl)
#     return resampled_Signal

# def remove_dc(signal_array):
#     return signal_array - np.mean(signal_array, axis=1, keepdims=True)

# # 4. Normalization (Z-score)
# def normalize(signal_array):
#     mean = np.mean(signal_array, axis=1, keepdims=True)
#     std = np.std(signal_array, axis=1, keepdims=True)
#     return (signal_array - mean) / std

# Band-pass both channels with identical settings (cutoffs 0.5 and 20,
# sampling rate 176, order 2). The resampling, DC-removal and normalisation
# steps stay disabled; their helpers are the commented-out functions above.
filtered_Signal_h_test = butter_bandpass_filter(
    X_h_test, LOW_Cutoff=0.5, High_cuttOff=20, Sampling_Rate=176, order=2
)
filtered_Signal_v_test = butter_bandpass_filter(
    X_v_test, LOW_Cutoff=0.5, High_cuttOff=20, Sampling_Rate=176, order=2
)

# Horizontal samples first, then vertical, side by side for every recording.
X_test_combined = np.concatenate(
    (filtered_Signal_h_test, filtered_Signal_v_test), axis=1
)

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.signal import find_peaks
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# ---------- Morphological Feature Extraction ----------
def extract_morphological_features(signal_data):
    """Compute six shape descriptors for every signal in ``signal_data``.

    Parameters
    ----------
    signal_data : iterable of 1-D sequences, 2-D array, or DataFrame
        One signal per row. Objects exposing ``to_numpy`` (e.g. a pandas
        DataFrame) are converted first so that rows, not column labels, are
        iterated. Each row is converted to a float array, so plain lists work.

    Returns
    -------
    numpy.ndarray, shape (n_signals, 6)
        Columns, in order:
        waveform length (sum of absolute first differences),
        amplitude of the highest local peak,
        amplitude of the lowest local valley,
        trapezoidal area under the absolute signal,
        sample index of that peak,
        sample index of that valley.
        When no local peak (or valley) exists, its amplitude and index are 0.
        Empty input gives an array of shape ``(0, 6)``.
    """
    # np.trapz was renamed to np.trapezoid in NumPy 2.0; use whichever exists.
    integrate = getattr(np, "trapezoid", None) or np.trapz

    if hasattr(signal_data, "to_numpy"):
        signal_data = signal_data.to_numpy()

    features = []
    for raw_row in signal_data:
        row = np.asarray(raw_row, dtype=float)

        # Waveform length
        wavelength = np.sum(np.abs(np.diff(row)))

        # Local maxima, and local minima (maxima of the negated signal)
        peaks, _ = find_peaks(row)
        valleys, _ = find_peaks(-row)

        # Highest peak: amplitude and position
        if peaks.size > 0:
            peak_pos = peaks[np.argmax(row[peaks])]
            peak_amp = row[peak_pos]
        else:
            peak_amp = 0
            peak_pos = 0

        # Lowest valley: amplitude and position
        if valleys.size > 0:
            valley_pos = valleys[np.argmin(row[valleys])]
            valley_amp = row[valley_pos]
        else:
            valley_amp = 0
            valley_pos = 0

        # Area under curve (absolute)
        area = integrate(np.abs(row))

        features.append([
            wavelength, peak_amp, valley_amp, area,
            peak_pos, valley_pos
        ])

    # reshape keeps the (n, 6) layout even when there are no rows at all.
    return np.array(features, dtype=float).reshape(-1, 6)


In [7]:

# Describe each filtered channel separately, then put the horizontal features
# ahead of the vertical ones so every recording becomes one 12-value row.
h_features_test, v_features_test = (
    extract_morphological_features(channel)
    for channel in (filtered_Signal_h_test, filtered_Signal_v_test)
)
morph_features_test = np.concatenate((h_features_test, v_features_test), axis=1)

# Column names: the six feature names suffixed with (H), then with (V)
columns = [
    f"{feature_name} ({channel_tag})"
    for channel_tag in ("H", "V")
    for feature_name in (
        'Wavelength', 'Peak Amplitude', 'Valley Amplitude',
        'Area Under Curve', 'Peak Position', 'Valley Position',
    )
]

morph_df_test = pd.DataFrame(morph_features_test, columns=columns)


In [8]:
# Keep only the peak-amplitude and peak/valley-position columns of both channels.
selected_columns = [
    'Peak Amplitude (H)', 'Peak Position (H)', 'Valley Position (H)',
    'Peak Amplitude (V)', 'Peak Position (V)', 'Valley Position (V)',
]
selected_features_df_test = morph_df_test.loc[:, selected_columns]

In [10]:
# Restore the previously saved classifier from disk.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code — only load model files that come from a trusted source.
svm_model_loaded = joblib.load("Morphological Feature model.joblib")

In [11]:
from sklearn.metrics import accuracy_score

# Classify every test recording with the restored model
predictions = svm_model_loaded.predict(selected_features_df_test)

# Score the predictions against the labels read from the horizontal CSV
accuracy = accuracy_score(y_true=Y_h_test, y_pred=predictions)
print(f"✅ Loaded Model Test Accuracy: {accuracy:.4f}")


✅ Loaded Model Test Accuracy: 0.9500


In [12]:
# Integer class id -> readable class name (ids 0..4 in this order)
label_map = dict(enumerate(('up', 'down', 'right', 'left', 'blink')))

# Translate the true and the predicted ids into names; an id missing from
# label_map raises KeyError.
mapped_true_labels = list(map(label_map.__getitem__, Y_h_test))
mapped_predicted_labels = list(map(label_map.__getitem__, predictions))


In [13]:
# Preview the first 15 true / predicted class names side by side
print("Mapped True Labels:", mapped_true_labels[:15])
print("Mapped Predicted Labels:", mapped_predicted_labels[:15])

Mapped True Labels: ['blink', 'blink', 'blink', 'blink', 'down', 'down', 'down', 'down', 'left', 'left', 'left', 'left', 'right', 'right', 'right']
Mapped Predicted Labels: ['blink', 'up', 'blink', 'blink', 'down', 'down', 'down', 'down', 'left', 'left', 'left', 'left', 'right', 'right', 'right']
