Importing libraries

In [1]:
import pandas as pd
import glob
import re
import os
import sys
import pickle
import datetime
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn import tree, metrics
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, ConfusionMatrixDisplay
from scipy.signal import butter, filtfilt, find_peaks
from sklearn.tree import DecisionTreeClassifier,export_graphviz
from sklearn.model_selection import train_test_split

Adding helpers:

In [None]:
def calc_magnitude(data):
    """Add a mean-removed acceleration magnitude column to ``data``.

    The per-row Euclidean norm of the ``x``, ``y`` and ``z`` columns is
    computed and the mean of that norm over the whole frame is subtracted
    (the original author's "remove gravity" detrend). The result is stored
    in the ``accel_mag`` column.

    The frame is modified in place and the same object is returned.
    """
    sum_of_squares = data['x']**2 + data['y']**2 + data['z']**2
    magnitude = np.sqrt(sum_of_squares)

    # Subtracting the overall mean centres the signal around zero.
    data['accel_mag'] = magnitude - magnitude.mean()
    return data

In [3]:
def remove_noise(data, sampling_rate, cutoff=5, order=2):
    """Low-pass filter the ``accel_mag`` column into ``filtered_accel_mag``.

    A Butterworth low-pass filter is designed with ``scipy.signal.butter``
    and applied forwards and backwards with ``scipy.signal.filtfilt``.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing an ``accel_mag`` column. It is modified in place.
    sampling_rate : float
        Sampling frequency of ``accel_mag`` in Hz.
    cutoff : float, optional
        Cut-off frequency in Hz. Defaults to 5, the value that used to be
        hard-coded.
    order : int, optional
        Filter order. Defaults to 2, the value that used to be hard-coded.

    Returns
    -------
    pandas.DataFrame
        The same ``data`` object with the ``filtered_accel_mag`` column added.

    Notes
    -----
    SciPy rejects a normalised cut-off outside (0, 1), i.e. ``cutoff`` must
    be below ``sampling_rate / 2``, and ``filtfilt`` needs more samples than
    its default padding length; both cases surface as SciPy errors.
    """
    # butter() expects the cut-off as a fraction of the Nyquist frequency.
    nyquist = sampling_rate / 2
    b, a = butter(order, cutoff / nyquist, btype='lowpass')
    data['filtered_accel_mag'] = filtfilt(b, a, data['accel_mag'])

    return data

In [4]:
def add_features(window):
    """Summarise one window of ``filtered_accel_mag`` as a one-row frame.

    Parameters
    ----------
    window : pandas.DataFrame
        Samples of a single window; must contain ``filtered_accel_mag``.

    Returns
    -------
    pandas.DataFrame
        A single row with the columns ``avg``, ``max``, ``med``, ``min``,
        ``q25``, ``q75`` and ``std`` (in that order).
    """
    signal = window['filtered_accel_mag']
    features = {
        'avg': signal.mean(),
        'max': signal.quantile(1),
        'med': signal.quantile(0.5),
        'min': signal.quantile(0),
        'q25': signal.quantile(0.25),
        'q75': signal.quantile(0.75),
        'std': signal.std(),
    }
    # Build the row with the public constructor instead of the private
    # DataFrame._append, which is not part of the supported pandas API.
    return pd.DataFrame([features])

In [None]:
def train_decision_tree(frames, random_state=42):
    """Train and evaluate a decision tree that predicts ``activity``.

    Parameters
    ----------
    frames : pandas.DataFrame
        One row per window with the feature columns ``avg``, ``max``,
        ``med``, ``min``, ``q25``, ``q75``, ``std`` and the target column
        ``activity``.
    random_state : int, optional
        Seed used for the train/test split and for fitting the tree. The
        default of 42 keeps the split this function has always produced.

    Returns
    -------
    tuple
        ``(dt_model, dt_cm, acc)``: the fitted classifier, the confusion
        matrix on the held-out 30 % (rows/columns ordered as
        ``dt_model.classes_``) and the accuracy on that held-out set.
        A classification report and the accuracy are also printed.
    """
    feature_columns = ['avg', 'max', 'med', 'min', 'q25', 'q75', 'std']

    # Extract feature columns and target column
    X = frames[feature_columns]
    y = frames['activity']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=random_state
    )

    # Create model. The tree is seeded too: scikit-learn breaks ties between
    # equally good splits at random, so an unseeded tree can differ from run
    # to run even on an identical split.
    dt_model = DecisionTreeClassifier(
        criterion='entropy',
        max_depth=5,
        random_state=random_state,
    ).fit(X_train, y_train)
    dt_pred = dt_model.predict(X_test)

    # Evaluate on test set
    acc = dt_model.score(X_test, y_test)
    dt_cm = confusion_matrix(y_test, dt_pred, labels=dt_model.classes_)
    print(classification_report(y_test, dt_pred))
    print("Accuracy on test set:", acc)

    return dt_model, dt_cm, acc

In [None]:
def calc_gyro_features(data):
    """Add a mean-removed gyroscope magnitude column to ``data``.

    The per-row Euclidean norm of ``gx``, ``gy`` and ``gz`` is computed, its
    mean over the whole frame is subtracted, and the result is stored in
    ``gyro_mag``. The frame is modified in place and returned.
    """
    sum_of_squares = data['gx']**2 + data['gy']**2 + data['gz']**2
    magnitude = np.sqrt(sum_of_squares)

    # Detrend by removing the overall mean.
    data['gyro_mag'] = magnitude - magnitude.mean()
    return data

Calculating the orientation for sleep tracking:

In [None]:
def calc_orientation(data):
    """Add ``pitch`` and ``roll`` columns (in radians) derived from x/y/z.

    ``pitch`` is ``arctan2(y, sqrt(x**2 + z**2))`` and ``roll`` is
    ``arctan2(-x, z)``. The frame is modified in place and returned.
    """
    x_axis = data['x']
    y_axis = data['y']
    z_axis = data['z']

    xz_norm = np.sqrt(x_axis**2 + z_axis**2)
    data['pitch'] = np.arctan2(y_axis, xz_norm)
    data['roll'] = np.arctan2(-x_axis, z_axis)
    return data

Adding extra features for sleep position:

In [None]:
def add_position_features(window):
    """Build the one-row feature frame used for sleep-position classification.

    The row contains the statistics produced by ``add_features`` followed by
    orientation and movement features computed from the same window.

    Parameters
    ----------
    window : pandas.DataFrame
        Samples of a single window; must contain ``filtered_accel_mag``,
        ``pitch`` and ``roll``.

    Returns
    -------
    pandas.DataFrame
        A single row with the ``add_features`` columns plus ``pitch_mean``,
        ``pitch_std``, ``roll_mean``, ``roll_std``, ``stability`` and
        ``movement_intensity``.
    """
    signal = window['filtered_accel_mag']

    # Start from the basic statistics, then extend the same record.
    features = add_features(window).iloc[0].to_dict()

    features['pitch_mean'] = window['pitch'].mean()
    features['pitch_std'] = window['pitch'].std()
    features['roll_mean'] = window['roll'].mean()
    features['roll_std'] = window['roll'].std()

    # NOTE(review): this ratio divides by the window mean. If the signal has
    # been mean-removed upstream the window mean can be close to zero, which
    # makes the ratio very large or non-finite. Confirm that is intended.
    features['stability'] = signal.std() / signal.mean()
    # Sum of absolute sample-to-sample changes within the window.
    features['movement_intensity'] = np.sum(np.abs(np.diff(signal)))

    # Build the row with the public constructor instead of the private
    # DataFrame._append, which is not part of the supported pandas API.
    return pd.DataFrame([features])

In [None]:
def extract_position_features(data, window_sec, sample_rate, position):
    """Cut a recording into fixed-length windows and featurise each one.

    Parameters
    ----------
    data : pandas.DataFrame
        Recording with ``x``, ``y``, ``z`` and ``filtered_accel_mag`` columns.
        The index determines the time axis:

        * ``DatetimeIndex``: used as is;
        * integer index (e.g. the default ``RangeIndex`` from ``read_csv``):
          interpreted as a sample counter and converted to seconds with
          ``sample_rate``;
        * anything else: interpreted as seconds.
    window_sec : int or float
        Window length in seconds.
    sample_rate : float
        Sampling frequency in Hz; used to turn an integer sample index into
        time.
    position : object
        Label written to the ``position`` column of every output row.

    Returns
    -------
    pandas.DataFrame
        One row per non-empty window (see ``add_position_features``) plus a
        ``position`` column. An empty frame when there are no windows.

    Raises
    ------
    ValueError
        If the index is an integer sample counter and ``sample_rate`` is not
        positive.
    """
    index = data.index
    if isinstance(index, pd.DatetimeIndex):
        timestamps = index
    elif pd.api.types.is_integer_dtype(index.dtype):
        # A plain row counter is in samples, not seconds. Without this
        # conversion a `window_sec` window would hold only `window_sec`
        # samples instead of `window_sec * sample_rate`.
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")
        timestamps = pd.to_datetime(index / sample_rate, unit='s')
    else:
        timestamps = pd.to_datetime(index, unit='s')

    # Work on a copy so the caller's frame keeps its index and columns.
    frame = data.copy()
    frame.index = pd.DatetimeIndex(timestamps, name='timestamp')
    frame = calc_orientation(frame)

    # Collect the per-window rows and concatenate once at the end.
    window_rows = []
    for _, window in frame.resample(f'{window_sec}s'):
        if window.empty:
            continue
        features = add_position_features(window)
        features['position'] = position
        window_rows.append(features)

    if not window_rows:
        return pd.DataFrame()
    return pd.concat(window_rows, ignore_index=True)

In [None]:
def train_position_classifier(frames, random_state=42):
    """Train and evaluate a decision tree that predicts ``position``.

    Parameters
    ----------
    frames : pandas.DataFrame
        One row per window with the statistical, orientation and movement
        feature columns listed below and the target column ``position``.
    random_state : int, optional
        Seed used for the stratified train/test split and for fitting the
        tree. The default of 42 keeps the split this function has always
        produced.

    Returns
    -------
    tuple
        ``(dt_model, dt_cm, acc)``: the fitted classifier, the confusion
        matrix on the held-out 30 % (rows/columns ordered as
        ``dt_model.classes_``) and the accuracy on that held-out set.
        A classification report and the accuracy are also printed.
    """
    features = ['avg', 'max', 'med', 'min', 'q25', 'q75', 'std',
                'pitch_mean', 'pitch_std', 'roll_mean', 'roll_std',
                'stability', 'movement_intensity']

    X = frames[features]
    y = frames['position']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=random_state, stratify=y
    )

    # The tree is seeded too: scikit-learn breaks ties between equally good
    # splits at random, so an unseeded tree can differ from run to run even
    # on an identical split.
    dt_model = DecisionTreeClassifier(
        criterion='entropy',
        max_depth=6,
        min_samples_leaf=5,
        random_state=random_state,
    ).fit(X_train, y_train)

    dt_pred = dt_model.predict(X_test)

    acc = dt_model.score(X_test, y_test)
    dt_cm = confusion_matrix(y_test, dt_pred, labels=dt_model.classes_)
    print(classification_report(y_test, dt_pred))
    print("Accuracy on test set:", acc)

    return dt_model, dt_cm, acc

In [None]:
def process_sleep_data(root, output_filename="sleep_data.csv",
                       sample_rate=100, window_sec=5):
    """Featurise every CSV recording under ``root`` and save the result.

    Each ``*.csv`` found recursively below ``root`` is read, run through
    ``calc_magnitude`` and ``remove_noise``, and windowed with
    ``extract_position_features``. The name of the file's parent directory is
    used as its ``position`` label.

    Parameters
    ----------
    root : str
        Directory that is searched recursively for CSV files.
    output_filename : str, optional
        Path of the CSV the combined feature table is written to.
    sample_rate : float, optional
        Sampling frequency of the recordings in Hz. Defaults to 100, the
        value that used to be hard-coded.
    window_sec : int or float, optional
        Window length in seconds. Defaults to 5, the value that used to be
        hard-coded.

    Returns
    -------
    pandas.DataFrame
        The combined feature table (also written to ``output_filename``).
        Empty when no recording produced any window.
    """
    output_path = os.path.normcase(os.path.abspath(output_filename))

    # Sort so the row order of the output does not depend on directory
    # listing order.
    csv_files = sorted(glob.glob(os.path.join(root, '**', '*.csv'), recursive=True))

    per_file_features = []
    for file in csv_files:
        # When the output lives under `root`, a second run would otherwise
        # pick up the previously written feature table as a recording.
        if os.path.normcase(os.path.abspath(file)) == output_path:
            continue

        position = os.path.basename(os.path.dirname(file))

        data = pd.read_csv(file)
        data = calc_magnitude(data)
        data = remove_noise(data, sample_rate)

        # Orientation columns are added inside extract_position_features.
        features_df = extract_position_features(data, window_sec, sample_rate, position)
        if not features_df.empty:
            per_file_features.append(features_df)

    if per_file_features:
        all_data = pd.concat(per_file_features, ignore_index=True)
    else:
        all_data = pd.DataFrame()

    all_data.to_csv(output_filename, index=False)
    return all_data