In [36]:
# ---------------------- Fake online accuracy using shifted windows

import numpy as np
import pandas as pd
import joblib
from scipy.signal import butter, filtfilt
import numpy as np
from sklearn.metrics import classification_report, accuracy_score

lowc = 30.0
highc = 100.0

def bandpass_filter(data, lowcut=lowc, highcut=highc, fs=250.0, order=4):
    nyq = 0.5 * fs  # 125 Hz
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

#--------------Feature functions
def rms(signal):
    return np.sqrt(np.mean(signal**2))

def zero_crossings(signal):
    signs = np.sign(signal)
    # Replace zeros with the previous sign or 1 (to avoid false zero crossings)
    for i in range(1, len(signs)):
        if signs[i] == 0:
            signs[i] = signs[i-1] if signs[i-1] != 0 else 1
    return np.sum(np.diff(signs) != 0)


def waveform_length(signal):
    return np.sum(np.abs(np.diff(signal)))

def mav(signal): # mean absolute value
    return np.mean(np.abs(signal))

def iav(signal):  # Integrated Absolute Value
    return np.sum(np.abs(signal))


# ---------- Load EMG data
path = "C:/Quick_Disk/tonge_project/data/Recordings_7_classes4/data.csv"
df = pd.read_csv(path, sep='\t', skiprows=5, engine='python')
df.columns = [f"ch_{i}" for i in range(df.shape[1])]

# ---------- Load annotations
annotations_path = r"C:\Quick_Disk\tonge_project\scripts\data\annotations\7_classes_annotations4.csv"
annotations = pd.read_csv(annotations_path, header=0, names=['class', 'timestamp'])
annotations['timestamp'] = pd.to_numeric(annotations['timestamp'], errors='coerce')

print("Data and annotations loaded")

# ---------- Parameters
sampling_rate = 250        
window_size_seconds = 1
channels = ['ch_1', 'ch_2', 'ch_3', 'ch_4']
emg_timestamps = df['ch_22']  

offset = 0.1
#offsets = [0, -2*offset,-offset, offset, 2*offset]
offsets = [0]

clf_RF = joblib.load("7_classes_rf3.pkl")
scaler = joblib.load('scaler_rf3.pkl')
#clf_RF = joblib.load("7_classes_svm.pkl")
#scaler = joblib.load('scaler_svm.pkl')
#clf_RF = joblib.load("7_classes_knn.pkl")
#scaler = joblib.load('scaler_knn.pkl')

feature_names = ['RMS', 'ZC', 'WL', 'MAV', 'STD', 'VAR', 'IAV']
final_feature_names = [f"{ch}_{fn}" for ch in channels for fn in feature_names]

# ---------- Run prediction using exact annotation timestamps
predictions = []
true_labels = []

for idx, row in annotations.iterrows():
    label = row['class']
    base_time = row['timestamp']

    for shift in offsets:
        start_time = base_time + shift
        end_time = start_time + window_size_seconds

        # Find samples within the window
        idxs = (df['ch_22'] >= start_time) & (df['ch_22'] < end_time)
    
        if not any(idxs):
            print(f"No data for window starting at {start_time:.3f} (label={label})")
            continue
    
        # Filter and normalize each channel
        window_data = {}
        for ch in channels:
            raw_signal = df[ch][idxs]
            if len(raw_signal) == 0:
                print("raw signal length 0")
                continue
                
            window_data[ch] = bandpass_filter(raw_signal)
    
        # Feature extraction
        feats = []
        for ch in channels:
            signal = window_data[ch]
            feats.append(rms(signal))
            feats.append(zero_crossings(signal))
            feats.append(waveform_length(signal))
            feats.append(mav(signal))
            feats.append(np.std(signal))
            feats.append(np.var(signal))
            feats.append(iav(signal))
    
        # Convert features to DataFrame with correct column names
        feat_df = pd.DataFrame([feats], columns=final_feature_names)
        
        # Scale the features
        feat_scaled = scaler.transform(feat_df)
        
        # Wrap back into a DataFrame to preserve feature names
        feat_scaled_df = pd.DataFrame(feat_scaled, columns=final_feature_names)
        
        # Predict
        prediction = clf_RF.predict(feat_scaled_df)[0]

        predictions.append(prediction)
        true_labels.append(label)
    
        print(f"At {start_time:.3f}s (label={label}): Predicted class = {prediction}")

print(f"\nTotal predictions made: {len(predictions)}")

print("\nAccuracy:", accuracy_score(true_labels, predictions))
print(classification_report(true_labels, predictions, zero_division=0))



Data and annotations loaded
At 1749637444.959s (label=n): Predicted class = c
At 1749637447.011s (label=n): Predicted class = c
At 1749637449.340s (label=n): Predicted class = c
At 1749637451.955s (label=n): Predicted class = c
At 1749637454.260s (label=n): Predicted class = c
At 1749637458.474s (label=l): Predicted class = c
At 1749637461.813s (label=l): Predicted class = c
At 1749637464.821s (label=l): Predicted class = c
At 1749637468.066s (label=l): Predicted class = c
At 1749637470.996s (label=l): Predicted class = c
At 1749637477.600s (label=k): Predicted class = c
At 1749637480.624s (label=k): Predicted class = c
At 1749637484.837s (label=k): Predicted class = c
At 1749637488.011s (label=k): Predicted class = c
At 1749637490.808s (label=k): Predicted class = c
At 1749637497.500s (label=r): Predicted class = c
At 1749637499.878s (label=r): Predicted class = c
At 1749637503.444s (label=r): Predicted class = c
At 1749637506.376s (label=r): Predicted class = c
At 1749637509.016s (la