## 3. Modeling

## Wearable device signal preprocessing

In [11]:
import pandas as pd
import os
import numpy as np
import datetime
import matplotlib.pyplot as plt

In [12]:
def moving_average(acc_data):
  # Initialization of variables
  avg = 0
  prevX, prevY, prevZ = 0, 0, 0
  results = []
  # Each second (32 samples) the acceleration data is summarized using the following method:
  for i in range(0, len(acc_data), 32):
    sum_ = 0
    buffX = acc_data[i:i+32, 0]
    buffY = acc_data[i:i+32, 1]
    buffZ = acc_data[i:i+32, 2]
    
    for j in range(len(buffX)):
      sum_ += max(
        abs(buffX[j] - prevX),
        abs(buffY[j] - prevY),
        abs(buffZ[j] - prevZ)
      )
      prevX, prevY, prevZ = buffX[j], buffY[j], buffZ[j]
    #The output is then filtered:
    avg = avg * 0.9 + (sum_ / 32) * 0.1 #
    results.append(avg)

  return results

# create a vector from the data frame (signal imported by pandas)
def create_df_array(dataframe):
  matrix_df=dataframe.values
  # returns 2-d matrix
  matrix = np.array(matrix_df)
  array_df = matrix.flatten()# Convert matrix into an array
  return array_df

# convert UTC arrays to arrays in seconds relative to 0 (record beginning)
def time_abs_(UTC_array):
  new_array=[]
  for utc in UTC_array:
    time=(datetime.datetime.strptime(utc,'%Y-%m-%d %H:%M:%S')-datetime.datetime.strptime(UTC_array[0], '%Y-%m-%d %H:%M:%S')).total_seconds()
    new_array.append(int(time))
  return new_array

def read_signals(main_folder):
  signal_dict = {}
  time_dict = {}
  fs_dict = {}

  # Get a list of subfolders in the main folder
  subfolders = next(os.walk(main_folder))[1]

  utc_start_dict={}
  for folder_name in subfolders:
    csv_path = f'{main_folder}/{folder_name}/EDA.csv'
    df=pd.read_csv(csv_path)
    utc_start_dict[folder_name]= df.columns.tolist()

  # Iterate over the subfolders
  for folder_name in subfolders:
    folder_path = os.path.join(main_folder, folder_name)
    # Get a list of files in the subfolder
    files = os.listdir(folder_path)

    # Initialize a dictionary for the signals in the current subfolder
    signals = {}
    time_line = {}
    fs_signal= {}
    
    # Define the list of desired file names
    desired_files = ['EDA.csv', 'BVP.csv', 'HR.csv', 'TEMP.csv','tags.csv','ACC.csv']

    # Iterate over the files in the subfolder
    for file_name in files:
      file_path = os.path.join(folder_path, file_name)

      # Check if it's a CSV file and if it is in the desired files list
      if file_name.endswith('.csv') and file_name in desired_files:
        # Read the CSV file and store the signal data

        if file_name == 'tags.csv':
          try:
            df = pd.read_csv(file_path,header=None)
            tags_vector = create_df_array(df)
            tags_UTC_vector =np.insert(tags_vector,0,utc_start_dict[folder_name])
            signal_array=time_abs_(tags_UTC_vector)
          except pd.errors.EmptyDataError:
            signal_array=[]
        
        else:
          df = pd.read_csv(file_path)
          fs= df.loc[0]
          fs=int(fs[0])# Get sampling frequency
          df.drop([0],axis = 0,inplace=True) 
          signal_array = df.values
          time_array = np.linspace(0, len(signal_array)/fs,len(signal_array))

        signal_name = file_name.split('.')[0]
        signals[signal_name] = signal_array
        time_line[signal_name] = time_array
        fs_signal[signal_name] = fs

    # Store the signals of the current subfolder in the main dictionary
    signal_dict[folder_name] = signals
    time_dict[folder_name] = time_line
    fs_dict[folder_name] = fs_signal

  return signal_dict, time_dict, fs_dict

dataset_path = r'..\data\data_csv\Wearable_Dataset'
stress_level_v1_path = r'..\data\data_csv\Stress_Level_v1.csv'
stress_level_v2_path = r'..\data\data_csv\Stress_Level_v2.csv'


# Check that path exists before proceeding
if not os.path.exists(dataset_path):
  raise FileNotFoundError(f"Path does not exist: {dataset_path}")

states = os.listdir(dataset_path)

signal_data = {}
time_data = {}
fs_dict = {}
participants = {}

for state in states:
  folder_path = os.path.join(dataset_path, state)
  participants[state] = os.listdir(folder_path)
  signal_data[state], time_data[state], fs_dict[state] = read_signals(folder_path)


  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency
  fs=int(fs[0])# Get sampling frequency


## MIMIC-IV signal preprocessing

In [13]:
from scipy.signal import find_peaks, butter, filtfilt, detrend
import numpy as np
import scipy.signal as signal
import scipy.fftpack as fft
import matplotlib.pyplot as plt

# For each channel it uses a frequency of 500 Hz
# With a sample count of 5000 samples
# The total time is 10 seconds (duration of the signal)

# Load header & data file
hea_file = "../data/data_bin/40689238.hea"
dat_file = "../data/data_bin/40689238.dat"

# Read the header to extract sampling frequency and scaling factors
with open(hea_file, "r") as f:
    lines = f.readlines()

header_info = lines[0].strip().split()
channel_count = int(header_info[1]) # Number of channels
frequency     = int(header_info[2]) # Sampling frequency (Hz)
sample_count  = int(header_info[3]) # Total samples per channel

# Extract per-channel parameters (scale factors and channel names)
# Each subsequent line corresponds to one channel.
scale_factors = []  # list of scale factors for each lead
channel_names = []  # list of lead names

for i in range(1, channel_count + 1):
  parts = lines[i].strip().split()
  # parts[2] is a string like "200.0(0)/mV". Extract the numeric scale factor.
  scale_factor = float(parts[2].split("(")[0])
  scale_factors.append(scale_factor)

  # The last field is the lead name (e.g., I, II, III, aVR, etc.)
  channel_names.append(parts[-1])

# Read data interpreted as int16 and normalize it converting it into mV
# Normalization factor is 1/200mV

# Read the binary .dat file as 16-bit signed integers.
data = np.fromfile(dat_file, dtype=np.int16)

# The data is stored in an interleaved format: sample0 of all channels, sample1 of all channels, etc.
# Reshape the data so that each row corresponds to one channel (lead).
data = data.reshape((sample_count, channel_count))

# Data is stored as:
# [[I, II, III], [I, II, III], ...]
# it has stored sequentially each peak of each lead

# Convert raw data counts to millivolts (mV) using each lead's scale factor.
# For each lead, the conversion is: signal (mV) = raw_value / scale_factor.
# Apply this to each column of the data
scale_factors = np.array(scale_factors)
mv_signal = data / scale_factors.reshape(1, -1) 

# Read data interpreted as int16 and normalize it converting it into mV
# Normalization factor is 1/200mV

# Read the binary .dat file as 16-bit signed integers.
data = np.fromfile(dat_file, dtype=np.int16)

# The data is stored in an interleaved format: sample0 of all channels, sample1 of all channels, etc.
# Reshape the data so that each row corresponds to one channel (lead).
data = data.reshape((sample_count, channel_count))

# Data is stored as:
# [[I, II, III], [I, II, III], ...]
# it has stored sequentially each peak of each lead

# Convert raw data counts to millivolts (mV) using each lead's scale factor.
# For each lead, the conversion is: signal (mV) = raw_value / scale_factor.
# Apply this to each column of the data
scale_factors = np.array(scale_factors)
mv_signal = data / scale_factors.reshape(1, -1) 

# Apply bandpass filter to get finer resulds (TESTING NOT DONE YET)

def bandpass_filter_mimic(signal, lowcut=0.1, highcut=25.0, fs=frequency, order=3):
  nyquist = 0.5 * fs
  low = lowcut / nyquist
  high = highcut / nyquist
  b, a = butter(order, [low, high], btype='band')
  filtered_signal = filtfilt(b, a, signal, axis=0)
  return filtered_signal

# Apply filtering and baseline removal
filtered_signals = bandpass_filter_mimic(mv_signal)

In [14]:
# Extract BVP signal from the specified time range
def bvp_signal(t1, t2, signals, timeline, subject_signals, freq=64):
  start = int(t1 * freq)
  end = int(t2 * freq)

  # Only proceed if 'BVP' exists
  if 'BVP' not in signals[subject_signals]:
      return None
  
  time_range = timeline[subject_signals]['BVP'][start:end]
  filtered_segment = signals[subject_signals]['BVP'][start:end]
  return time_range, filtered_segment
  
# Extract features from the BVP signal
def extract_features(signal):
  signal = np.array(signal)
  features = [
    np.mean(signal),
    np.std(signal),
    np.max(signal),
    np.min(signal),
    np.sum(signal**2),
  ]
  return features


In [15]:
labelled_segments_version1 = [
  (0, 180, 0),     # Baseline         (3:00) → no stress
  (180, 300, 1),   # Stroop           (2:00) → stress
  (300, 600, 0),   # First Rest       (5:00) → no stress
  (600, 780, 1),   # TMCT             (3:00) → stress
  (780, 1080, 0),  # Second Rest      (5:00) → no stress
  (1080, 1110, 1), # Real Opinion     (0:30) → stress
  (1110, 1140, 1), # Opposite Opinion (0:30) → stress
  (1140, 1170, 1), # Subtract Test    (0:30) → stress
]

segment_labels_version1 = {}
for i in range(1, 19):
  subject_id = f'S{i:02d}'
  segment_labels_version1[subject_id] = list(labelled_segments_version1)

labelled_segments_version2 = [
  (0, 180, 0),     # Baseline         (3:00)  → no stress
  (180, 360, 1),   # TMCT             (3:00)  → stress
  (360, 960, 0),   # First Rest       (10:00) → no stress
  (960, 990, 1),   # Real Opinion     (0:30)  → stress
  (990, 1020, 1),  # Opposite Opinion (0:30)  → stress
  (1020, 1620, 0), # Second Rest      (10:00) → no stress
  (1620, 1650, 1), # Subtract Test    (0:30)  → stress
]

segment_labels_version2 = {}
for i in range(1, 19):
  subject_id = f'f{i:02d}'
  segment_labels_version2[subject_id] = list(labelled_segments_version2)

# Combining both versions
segment_labels = {**segment_labels_version1, **segment_labels_version2}

In [16]:
# Create feature and label arrays (X, y)

X, y = [], []

for person in participants['STRESS']:
  if person not in segment_labels:
    continue
  for (t1, t2, label) in segment_labels[person]:
    _, bvp = bvp_signal(t1, t2, signal_data['STRESS'], time_data['STRESS'], person)
    if bvp is not None and len(bvp) > 0:
      features = extract_features(bvp)
      X.append(features)
      y.append(label)


In [17]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

X = np.array(X)
y = np.array(y)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

clf = LogisticRegression()
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))


              precision    recall  f1-score   support

           0       0.92      0.74      0.82        31
           1       0.85      0.96      0.90        48

    accuracy                           0.87        79
   macro avg       0.89      0.85      0.86        79
weighted avg       0.88      0.87      0.87        79



# Test the trained model

To test the trained model from the wearable device, we are going to use mimic.
To be specific, we are going to test the first 2 seconds of the ECG mimic sample and check 
if the patient has stress or no stress

In [20]:
def mimic_signal(t1, t2, channel):
  # Convert to indices
  start_idx = int(t1 * frequency)
  end_idx = int(t2 * frequency)

  clean_signal = filtered_signals[start_idx:end_idx, :]
  
  # Return the specified signal from channel
  return clean_signal[:, channel]

In [21]:
# Get the signal segment for the first channel (0)
# measure the time in seconds
new_signal_segment = mimic_signal(t1=0, t2=2, channel=0)

# new samples, with the same feature columns as training data
X_new = np.array(extract_features(new_signal_segment)).reshape(1, -1)

# Predict stress or no stress on new segment
prediction = clf.predict(X_new)

# Predict stress or no stress on new segment
prediction = clf.predict(X_new)

print("Predicted class (0 = no stress, 1 = stress):", prediction[0])


Predicted class (0 = no stress, 1 = stress): 1
