In [1]:

import sys, os, platform
from pathlib import Path

# Environment diagnostics: record which interpreter, OS and working directory
# produced the outputs shown below this cell.
print("Python executable:", sys.executable)
print("Python version:", sys.version)
print("OS:", platform.platform())
print("Working directory:", os.getcwd())

# Project root. The original local checkout stays the default so existing runs
# behave exactly as before; set the PARKINSONS_PROJECT_ROOT environment variable
# to run the notebook from another machine or checkout without editing code.
PROJECT_ROOT = Path(
    os.environ.get(
        "PARKINSONS_PROJECT_ROOT",
        r"C:\Users\aibel\OneDrive\Desktop\Heizel Ann Joseph\Parkinsons Disease",
    )
)
# Directory the fitted scaler.pkl / imputer.pkl are read from in the next cell.
PROCESSED = PROJECT_ROOT / "data" / "processed"

# Fail-visible sanity check: both should print True before continuing.
print("PROJECT_ROOT exists:", PROJECT_ROOT.exists())
print("PROCESSED exists:", PROCESSED.exists())


Python executable: c:\Users\aibel\OneDrive\Desktop\Heizel Ann Joseph\Parkinsons Disease\.venv\Scripts\python.exe
Python version: 3.10.0 (tags/v3.10.0:b494f59, Oct  4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]
OS: Windows-10-10.0.26100-SP0
Working directory: c:\Users\aibel\OneDrive\Desktop\Heizel Ann Joseph\Parkinsons Disease\backups\latest
PROJECT_ROOT exists: True
PROCESSED exists: True


In [2]:

import numpy as np
import pickle
from tensorflow.keras.models import load_model

# Trained network lives under <project root>/models.
MODEL_DIR = PROJECT_ROOT / "models"
model = load_model(MODEL_DIR / "blstm_pahaw_model.h5")

# Fitted preprocessing objects saved next to the processed data.
# NOTE(review): pickle.load can execute arbitrary code — only load files that
# were produced by this project.
scaler_path = PROCESSED / "scaler.pkl"
with scaler_path.open("rb") as scaler_file:
    scaler = pickle.load(scaler_file)

imputer_path = PROCESSED / "imputer.pkl"
with imputer_path.open("rb") as imputer_file:
    imputer = pickle.load(imputer_file)

print("✅ Model, scaler, and imputer loaded successfully")


✅ Model, scaler, and imputer loaded successfully


In [3]:

import math
import pandas as pd
from scipy.signal import savgol_filter, find_peaks
from scipy.optimize import curve_fit

def load_svc(path):
    """Read a whitespace-delimited .svc recording into a DataFrame.

    Every line is split on whitespace. Lines with fewer than seven tokens are
    skipped (which drops, for example, a single-token header line). When a line
    has more than seven tokens only the last seven are used. Lines whose tokens
    cannot be converted to numbers are skipped as well.

    Parameters
    ----------
    path : str or path-like
        File to read. Bytes that cannot be decoded are ignored.

    Returns
    -------
    pandas.DataFrame
        One row per parsed line with columns
        ``x, y, time, pen, azim, alt, press`` (in that order). ``pen`` is
        converted with ``int(float(...))``, all other fields with ``float``.
        The frame has zero rows when no line parses.
    """
    columns = ["x", "y", "time", "pen", "azim", "alt", "press"]
    rows = []
    with open(path, 'r', errors='ignore') as f:
        for ln in f:
            parts = ln.split()
            if len(parts) < 7:
                continue
            x, y, t, pen, az, alt, pr = parts[-7:]
            try:
                rows.append([float(x), float(y), float(t), int(float(pen)),
                             float(az), float(alt), float(pr)])
            except (ValueError, OverflowError):
                # ValueError: non-numeric token (or NaN pen value);
                # OverflowError: infinite pen value passed to int().
                # Only malformed records are skipped — KeyboardInterrupt and
                # genuine programming errors are no longer silently swallowed.
                continue
    return pd.DataFrame(rows, columns=columns)

def preprocess_df(df):
    """Add smoothed coordinates and kinematic columns to a raw recording.

    Works on a copy with a fresh 0..n-1 index; the input frame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``x``, ``y`` and ``time``.

    Returns
    -------
    pandas.DataFrame
        The copy, with ``y`` mean-centred and these columns added:
        ``time_s`` (time relative to the first row), ``dt``, ``x_s``/``y_s``
        (Savitzky-Golay smoothed, window 7, order 2, when at least 7 rows are
        available; otherwise the unsmoothed values), ``vx``/``vy``, ``speed``,
        ``ax``/``ay`` and ``curvature``. An empty input yields an empty frame
        that still carries all of these columns.
    """
    df = df.copy().reset_index(drop=True)

    if df.empty:
        # `.iloc[0]` below raises IndexError on a recording with no rows.
        # Return the expected (empty) columns instead so callers can simply
        # find no strokes and skip the file.
        for col in ('time_s', 'dt', 'x_s', 'y_s', 'vx', 'vy',
                    'speed', 'ax', 'ay', 'curvature'):
            df[col] = np.empty(0, dtype=float)
        return df

    df['y'] -= df['y'].mean()
    df['time_s'] = df['time'] - df['time'].iloc[0]
    # The first row has no predecessor (NaN diff) and repeated timestamps give
    # a zero step; both are replaced by 1/1000 so the divisions below are safe.
    df['dt'] = df['time_s'].diff().fillna(1/1000).replace(0, 1/1000)

    if len(df) >= 7:
        # The filter window is 7 samples, so at least 7 rows are needed.
        df['x_s'] = savgol_filter(df['x'], 7, 2)
        df['y_s'] = savgol_filter(df['y'], 7, 2)
    else:
        df['x_s'] = df['x']
        df['y_s'] = df['y']

    # First differences of the smoothed path divided by the time step.
    df['vx'] = df['x_s'].diff().fillna(0) / df['dt']
    df['vy'] = df['y_s'].diff().fillna(0) / df['dt']
    df['speed'] = np.sqrt(df['vx']**2 + df['vy']**2)

    df['ax'] = df['vx'].diff().fillna(0) / df['dt']
    df['ay'] = df['vy'].diff().fillna(0) / df['dt']

    with np.errstate(divide='ignore', invalid='ignore'):
        # |vx*ay - vy*ax| / (vx^2 + vy^2)^1.5, with a small epsilon in the
        # denominator so stationary samples do not divide by zero.
        num = df['vx']*df['ay'] - df['vy']*df['ax']
        den = (df['vx']**2 + df['vy']**2)**1.5
        df['curvature'] = np.abs(num) / (den + 1e-12)

    df['curvature'] = df['curvature'].fillna(0)
    return df

def segment_by_pen(df):
    """Locate runs of rows during which the pen is down.

    Walks ``df['pen']`` in order. A run opens at the first row whose value
    equals 1 while no run is open, and closes on the row before the next value
    equal to 0. A run still open at the end of the frame is closed on the last
    row. Values other than 0 and 1 neither open nor close a run.

    Returns
    -------
    list of (int, int)
        Inclusive ``(start, end)`` positional indices, in order of appearance.
    """
    runs = []
    run_begin = None
    for idx, state in enumerate(df['pen']):
        if run_begin is None:
            # Waiting for the pen to touch down.
            if state == 1:
                run_begin = idx
        elif state == 0:
            # Pen lifted: the run ended on the previous row.
            runs.append((run_begin, idx - 1))
            run_begin = None
    if run_begin is not None:
        runs.append((run_begin, len(df) - 1))
    return runs

def split_all_strokes(df, pen_strokes, prom=0.05, dist=8, min_points=6):
    """Cut each pen-down stroke into sub-strokes at local speed minima.

    Parameters
    ----------
    df : pandas.DataFrame
        Recording with a ``speed`` column.
    pen_strokes : iterable of (int, int)
        Inclusive positional ``(start, end)`` ranges into ``df``.
    prom, dist : float, int
        Passed to ``scipy.signal.find_peaks`` as ``prominence`` and
        ``distance`` when searching the negated speed for peaks
        (i.e. minima of the speed).
    min_points : int
        A piece ``(a, b)`` produced by a cut is kept only when
        ``b - a >= min_points``. Strokes with no detected minimum are kept
        whole regardless of length.

    Returns
    -------
    list of (int, int)
        Inclusive ``(start, end)`` ranges in ``df`` coordinates.
    """
    pieces = []
    for first, last in pen_strokes:
        velocity = df.iloc[first:last + 1]['speed'].to_numpy()
        minima, _ = find_peaks(-velocity, prominence=prom, distance=dist)

        if minima.size == 0:
            pieces.append((first, last))
            continue

        lo = first
        for offset in minima:
            # Minima are positions inside the stroke; shift back to df rows.
            hi = first + int(offset)
            if hi - lo >= min_points:
                pieces.append((lo, hi))
            lo = hi + 1
        # Remainder after the last cut.
        if last - lo >= min_points:
            pieces.append((lo, last))
    return pieces


In [4]:

def compute_substroke_feature_vector(seg, df, s, e):
    """Build the 14-element feature vector for one sub-stroke.

    Parameters
    ----------
    seg : pandas.DataFrame
        Rows of the sub-stroke; the columns ``time_s``, ``x_s``, ``y_s``,
        ``speed``, ``press`` and ``curvature`` are read.
    df : pandas.DataFrame
        Whole recording; the medians of its ``speed``, ``press`` and
        ``curvature`` columns are the thresholds for the three flags.
    s, e :
        Accepted for call compatibility; not read by this function.

    Returns
    -------
    numpy.ndarray
        Float vector, in this order: duration, amplitude, mean speed,
        mean pressure, mean curvature, beta A (equal to the mean speed),
        beta a and beta b (both the constant 2.0), ellipse a (equal to the
        amplitude), ellipse b (half the amplitude), ellipse eccentricity,
        and the high-speed / high-pressure / high-curvature flags (1.0 or 0.0).
    """
    # Temporal extent: last timestamp minus first.
    stamps = seg['time_s']
    elapsed = stamps.iloc[-1] - stamps.iloc[0]

    # Spatial extent: diagonal of the bounding box of the smoothed path.
    span_x = seg['x_s'].max() - seg['x_s'].min()
    span_y = seg['y_s'].max() - seg['y_s'].min()
    extent = np.sqrt(span_x**2 + span_y**2)

    # Per-sub-stroke averages.
    avg_speed = seg['speed'].mean()
    avg_press = seg['press'].mean()
    avg_curv = seg['curvature'].mean()

    # Ellipse terms are derived from the amplitude alone; a zero amplitude
    # gives zeros rather than dividing by zero.
    if extent != 0:
        semi_minor = extent / 2
        eccentricity = np.sqrt(1 - (semi_minor**2)/(extent**2))
    else:
        semi_minor = 0.0
        eccentricity = 0.0

    # Flags: 1.0 when the sub-stroke mean is strictly above the recording median.
    flags = [
        1.0 if value > np.median(df[column]) else 0.0
        for value, column in (
            (avg_speed, 'speed'),
            (avg_press, 'press'),
            (avg_curv, 'curvature'),
        )
    ]

    ordered = [
        elapsed,        # 1  duration
        extent,         # 2  amplitude
        avg_speed,      # 3  mean speed
        avg_press,      # 4  mean pressure
        avg_curv,       # 5  mean curvature
        avg_speed,      # 6  beta A
        2.0,            # 7  beta a
        2.0,            # 8  beta b
        extent,         # 9  ellipse a
        semi_minor,     # 10 ellipse b
        eccentricity,   # 11 ellipse eccentricity
    ]
    ordered.extend(flags)   # 12-14
    return np.array(ordered, dtype=float)


In [8]:

# SUBJECT-LEVEL PREDICTION
#
# Scores every .svc recording of one subject with the loaded model, then
# combines the per-file labels by majority vote (ties broken by the mean score).

from collections import Counter
import numpy as np

SUBJECT_ID = "00040"
SUBJECT_DIR = PROJECT_ROOT / "data" / "PaHaW_dataset" / "PaHaW_public" / SUBJECT_ID

# Number of sub-strokes per sequence passed to the model: longer recordings are
# truncated, shorter ones are zero-padded after scaling.
# NOTE(review): presumably matches the sequence length used in training — confirm.
CAP = 300

svc_files = sorted(SUBJECT_DIR.glob("*.svc"))
print(f"Found {len(svc_files)} files for subject {SUBJECT_ID}")

pd_scores = []    # model output per successfully processed file
file_preds = []   # "PD" / "Healthy" label per successfully processed file

for svc in svc_files:
    print("\nProcessing:", svc.name)

    df = load_svc(svc)
    if df.empty:
        # Nothing parsed from this file; preprocessing needs at least one row.
        print("⚠️ Skipped (no readable samples)")
        continue

    df = preprocess_df(df)
    pen_strokes = segment_by_pen(df)
    subs = split_all_strokes(df, pen_strokes, prom=0.05, dist=8, min_points=6)

    features = []
    for (s, e) in subs:
        seg = df.iloc[s:e+1].reset_index(drop=True)
        if len(seg) < 6:
            continue
        features.append(compute_substroke_feature_vector(seg, df, s, e))

    if len(features) == 0:
        print("⚠️ Skipped (no valid substrokes)")
        continue

    # Infinite values are turned into NaN so the imputer can fill them.
    X = np.vstack(features)
    X[np.isinf(X)] = np.nan
    X = imputer.transform(X)
    X = scaler.transform(X)

    # Force a fixed sequence length of CAP rows.
    feat_dim = X.shape[1]
    if X.shape[0] > CAP:
        X = X[:CAP]
    else:
        pad = CAP - X.shape[0]
        X = np.vstack([X, np.zeros((pad, feat_dim))])

    X = X.reshape(1, CAP, feat_dim)

    prob = model.predict(X, verbose=0)[0][0]
    label = "PD" if prob >= 0.5 else "Healthy"

    pd_scores.append(prob)
    file_preds.append(label)

# FINAL SUBJECT-LEVEL DECISION - MAJORITY VOTING + MEAN SCORE

vote_counts = Counter(file_preds)

if not file_preds:
    # No file produced a prediction (missing directory, no .svc files, or every
    # file skipped). Without this guard most_common(1)[0] raises IndexError and
    # the mean of an empty list is NaN, so report the situation explicitly.
    majority_label = None
    mean_pd = float("nan")
    final_label = None
    print(f"\n⚠️ No predictions could be made for subject {SUBJECT_ID} "
          f"(no usable .svc files in {SUBJECT_DIR})")
else:
    majority_label = vote_counts.most_common(1)[0][0]
    mean_pd = float(np.mean(pd_scores))

    if vote_counts["PD"] == vote_counts["Healthy"]:
        # Tie between the two labels: fall back to the mean model score.
        final_label = "Parkinson's" if mean_pd >= 0.5 else "Healthy"
    else:
        final_label = "Parkinson's" if majority_label == "PD" else "Healthy"

    print("\n==============================")
    print("FINAL SUBJECT-LEVEL RESULT")
    print("==============================")
    print("Subject:", SUBJECT_ID)
    print("File-wise predictions:", file_preds)
    print("Vote counts:", dict(vote_counts))
    print("Mean PD probability:", round(mean_pd, 3))
    print("Final Diagnosis:", final_label)
    print("==============================")


Found 8 files for subject 00040

Processing: 00040__1_1.svc

Processing: 00040__2_1.svc

Processing: 00040__3_1.svc

Processing: 00040__4_1.svc

Processing: 00040__5_1.svc

Processing: 00040__6_1.svc

Processing: 00040__7_1.svc

Processing: 00040__8_1.svc

FINAL SUBJECT-LEVEL RESULT
Subject: 00040
File-wise predictions: ['PD', 'PD', 'PD', 'PD', 'PD', 'PD', 'PD', 'PD']
Vote counts: {'PD': 8}
Mean PD probability: 0.828
Final Diagnosis: Parkinson's
