In [None]:
# Required installs for training and running the ML model:
# !pip install -U pandas
# !pip install -U scikit-learn
# !pip install -U numpy
# !pip install -U matplotlib
# !pip install -U joblib

Training and Testing

In [None]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import numpy as np

# 1. Load and prepare dataset
df = pd.read_csv('sample_data2.csv')  # Load sensor dataset from CSV

# Encode activity labels (string â†’ integer codes)
label_encoder = LabelEncoder()
df['Activity_encoded'] = label_encoder.fit_transform(df['Activity'])

# 2. Sliding window parameters
window_size = 200  # Number of samples per analysis window
step_size = 50     # Step size between windows

# Lists to store extracted features and labels
features_list = []
labels = []

# Sensor columns used for feature extraction
sensor_cols = [
    'Roll1', 'Pitch1', 'Yaw1',
    'Roll2', 'Pitch2', 'Yaw2',
    'FRS', 'EMG1'
]

# 3. Feature extraction function
def extract_features_from_window(window):
    """
    Extracts statistical features from each sensor column in a given time window.
    Features per column:
      - Mean
      - Standard deviation
      - Minimum value
      - Maximum value
      - Root Mean Square (RMS)
    """
    feats = []
    for col in sensor_cols:
        data = window[col].values
        feats.extend([
            np.mean(data),                    # Mean
            np.std(data),                     # Standard deviation
            np.min(data),                     # Minimum
            np.max(data),                     # Maximum
            np.sqrt(np.mean(data**2))         # RMS
        ])
    return feats

# 4. Apply sliding window to extract features and labels
for start in range(0, len(df) - window_size, step_size):
    window = df.iloc[start:start + window_size]
    feats = extract_features_from_window(window)
    features_list.append(feats)
    labels.append(window['Activity_encoded'].mode()[0])  # Majority vote label


In [3]:
print(df['Activity'].value_counts())



Activity
standing      8914
walking       6998
downstairs    4921
upstairs      4919
sitting       3946
sit_stand     3170
Name: count, dtype: int64


In [4]:
print(dict(zip(label_encoder.transform(label_encoder.classes_), label_encoder.classes_)))


{np.int64(0): 'downstairs', np.int64(1): 'sit_stand', np.int64(2): 'sitting', np.int64(3): 'standing', np.int64(4): 'upstairs', np.int64(5): 'walking', np.int64(6): nan}


In [8]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Convert lists to NumPy arrays for model input
X = np.array(features_list)  # Feature matrix
y = np.array(labels)         # Encoded activity labels

# Split into training (80%) and testing (20%) sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)

# Initialize and train the Random Forest classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Predict labels for the test set
y_pred = clf.predict(X_test)

# Get the unique labels present in the test set
unique_labels = np.unique(y_test)

# Convert encoded labels back to their original activity names
target_names = label_encoder.inverse_transform(unique_labels)

# Print performance metrics
print(classification_report(
    y_test,
    y_pred,
    labels=unique_labels,
    target_names=target_names
))

# Print confusion matrix
print("Confusion Matrix:\n", confusion_matrix(
    y_test,
    y_pred,
    labels=unique_labels
))


              precision    recall  f1-score   support

  downstairs       1.00      1.00      1.00        22
   sit_stand       1.00      1.00      1.00        12
     sitting       1.00      0.93      0.97        15
    standing       0.90      1.00      0.95        27
    upstairs       1.00      1.00      1.00        27
     walking       1.00      0.93      0.96        28

    accuracy                           0.98       131
   macro avg       0.98      0.98      0.98       131
weighted avg       0.98      0.98      0.98       131

Confusion Matrix:
 [[22  0  0  0  0  0]
 [ 0 12  0  0  0  0]
 [ 0  0 14  1  0  0]
 [ 0  0  0 27  0  0]
 [ 0  0  0  0 27  0]
 [ 0  0  0  2  0 26]]


In [10]:
from sklearn.model_selection import cross_val_score

# Perform 5-fold cross-validation using the same Random Forest classifier
scores = cross_val_score(clf, X, y, cv=5)

# Print the individual fold scores
print("Cross-validation scores:", scores)

# Print the average accuracy across all folds
print("Average accuracy:", scores.mean())


Cross-validation scores: [0.94656489 1.         0.97709924 1.         0.96153846]
Average accuracy: 0.9770405167351732


In [9]:
from sklearn.model_selection import StratifiedKFold, cross_val_score

# StratifiedKFold ensures each fold has the same class distribution as the whole dataset
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Perform cross-validation using stratified folds
scores = cross_val_score(clf, X, y, cv=skf)

# Print results
print("Stratified CV scores:", scores)
print("Average accuracy:", scores.mean())


Stratified CV scores: [0.99236641 0.97709924 0.98473282 0.99236641 0.96923077]
Average accuracy: 0.9831591309453904


Real-Time Predictions 

In [None]:
import joblib

# Save the trained Random Forest model to a file
joblib.dump(clf, "rf_model.pkl")

# Save the label encoder so we can decode predictions later
joblib.dump(label_encoder, "label_encoder.pkl")


['label_encoder.pkl']

In [None]:
import serial
from collections import deque
import warnings
import numpy as np
import joblib

# Suppress non-critical warnings
warnings.filterwarnings("ignore", category=UserWarning)

# ---------------------------------------------------------
# Load trained Random Forest model and Label Encoder
# ---------------------------------------------------------
model = joblib.load("rf_model.pkl")
label_encoder = joblib.load("label_encoder.pkl") 

# ---------------------------------------------------------
# Connect to Teensy via serial port
#   - Replace "COM5" with the correct port for your system
# ---------------------------------------------------------
ser = serial.Serial("COM5", 115200, timeout=1)

# ---------------------------------------------------------
# Buffers for real-time processing
# ---------------------------------------------------------
buffer = deque(maxlen=200)           # Holds the last 200 sensor samples
prediction_history = deque(maxlen=5) # Holds the last 5 predictions (for smoothing)

# ---------------------------------------------------------
# Feature extraction for one time window
# ---------------------------------------------------------
def extract_features(buffer_np):
    """
    Extracts statistical features from each column in the sensor data buffer.
    Features per column:
        - Mean
        - Standard deviation
        - Minimum
        - Maximum
        - Root Mean Square (RMS)
    """
    features = []
    for i in range(buffer_np.shape[1]):  # Loop over sensor columns
        col = buffer_np[:, i]
        features.extend([
            np.mean(col),                   # Mean
            np.std(col),                    # Standard deviation
            np.min(col),                    # Minimum
            np.max(col),                    # Maximum
            np.sqrt(np.mean(col**2))        # RMS
        ])
    return np.array(features).reshape(1, -1)  # Return as 2D array for model

# ---------------------------------------------------------
# Main loop: read serial data, predict, and print result
# ---------------------------------------------------------
try:
    while True:
        # Read one line from serial and split into sensor values
        line = ser.readline().decode("utf-8").strip()
        parts = line.split("\t")
        
        # Expecting exactly 8 sensor readings per line
        if len(parts) != 8:
            continue

        try:
            # Convert sensor readings to floats and add to buffer
            features = list(map(float, parts))
            buffer.append(features)
        except ValueError:
            continue  # Skip lines with invalid data

        # Once buffer is full, make a prediction
        if len(buffer) == buffer.maxlen:
            buffer_np = np.array(buffer)
            feat_vector = extract_features(buffer_np)
            
            # Predict using the trained model
            pred = model.predict(feat_vector)[0]
            prediction_history.append(pred)
    
            # Majority vote smoothing over last N predictions
            smoothed_pred = max(set(prediction_history), key=prediction_history.count)
            
            # Decode numeric prediction back to label
            label = label_encoder.inverse_transform([smoothed_pred])[0]
            print("Prediction:", label)

# Graceful exit on Ctrl+C
except KeyboardInterrupt:
    print("Stopped by user")

# Handle other exceptions
except Exception as e:
    print("Error:", e)

# Always close the serial connection
finally:
    ser.close()


SerialException: could not open port 'COM5': FileNotFoundError(2, 'The system cannot find the file specified.', None, 2)