In [1]:
import os
import numpy as np
import pandas as pd
import glob
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
from hmmlearn import hmm
from sklearn.metrics import classification_report
# from sklearn.externals import joblib
# from sklearn.utils import joblib
import joblib
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier


In [2]:
# pip install hmmlearn

In [3]:
#load data
# 加载数据
import os
import pandas as pd

def load_data(data_dir):
    gestures = ["circle", "wave", "comeHere", "goAway"]
    data = []
    labels = []

    for gesture in gestures:
        for i in range(1, 6):
            file_path = os.path.join(data_dir, f"{gesture}_{i}.csv")
            df = pd.read_csv(file_path)
            gesture_data = df[["Linear Acceleration x (m/s^2)", "Linear Acceleration y (m/s^2)", "Linear Acceleration z (m/s^2)"]].values
            data.append(gesture_data)
            labels.append(gesture)

    return data, labels


In [4]:
#feature extraction
def feature_extraction(data, labels):
    features = []
    feature_labels = []

    window_size = 240
    step_size = 10

    for gesture_data, label in zip(data, labels):
        for i in range(0, len(gesture_data) - window_size, step_size):
            window = gesture_data[i:i + window_size]
            feature = calculate_features(window)
            features.append(feature)
            feature_labels.append(label)

    return features, feature_labels

def calculate_features(window):
    mean_x = np.mean(window[:, 0])
    mean_y = np.mean(window[:, 1])
    mean_z = np.mean(window[:, 2])

    max_x = np.max(window[:, 0])
    max_y = np.max(window[:, 1])
    max_z = np.max(window[:, 2])

    min_x = np.min(window[:, 0])
    min_y = np.min(window[:, 1])
    min_z = np.min(window[:, 2])

    return [mean_x, mean_y, mean_z, max_x, max_y, max_z, min_x, min_y, min_z]

In [5]:
#data preprocess
def preprocess_data(features, scaler=None):
    if scaler is None:
        scaler = StandardScaler()
        processed_data = scaler.fit_transform(features)
    else:
        processed_data = scaler.transform(features)
    return processed_data, scaler

In [6]:
#Dataset partitioning
def split_data(features, labels, test_size=0.2, random_state=42):
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test

In [7]:
#model train
from sklearn.svm import SVC

def train_svm(X_train, y_train):
    svm = SVC()
    svm.fit(X_train, y_train)
    return svm

In [8]:
from sklearn.tree import DecisionTreeClassifier

def train_decision_tree(X_train, y_train):
    decision_tree = DecisionTreeClassifier()
    decision_tree.fit(X_train, y_train)
    return decision_tree

In [9]:
from sklearn.ensemble import RandomForestClassifier

def train_random_forest(X_train, y_train):
    random_forest = RandomForestClassifier()
    random_forest.fit(X_train, y_train)
    return random_forest

In [10]:
#import classification_report
def evaluate_model(model, X_test, y_test, target_names):
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, target_names=target_names)
    print(report)

In [11]:
import joblib

def save_model(model, model_name):
    joblib.dump(model, f"{model_name}.joblib")

In [12]:
#testing section
def load_csv(file_path):
    data = []
    df = pd.read_csv(file_path)
    gesture_data = df[["Linear Acceleration x (m/s^2)", "Linear Acceleration y (m/s^2)", "Linear Acceleration z (m/s^2)"]].values
    data.append(gesture_data)
#     return gesture_data
    return data

def sliding_window(data, window_size, step_size):
    windows = []
    for i in range(0, len(data) - window_size, step_size):
        window = data[i:i + window_size]
        windows.append(window)

    return windows

def feature_extraction_test(data):
    features = []

    window_size = 240
    step_size = 10

    for gesture_data in data:
        for i in range(0, len(gesture_data) - window_size, step_size):
            window = gesture_data[i:i + window_size]
            feature = calculate_features(window)
            features.append(feature)
#     print("Extracted features:", features)

    return features

In [13]:
#testing whole files
def test_model_on_whole_files(model, test_data_dir, scaler):
    gestures = ["circle", "wave", "comeHere", "goAway"]
    correct_count = 0
    total_count = 0

    for gesture in gestures:
        for i in range(1, 6):
            file_path = os.path.join(test_data_dir, f"{gesture}_test_{i}.csv")
            gesture_data = load_csv(file_path)
            features = feature_extraction_test(gesture_data)
            
            if not features:
                print(f"Features for {gesture}_test_{i} are empty. Skipping this file.")
                continue
            
#             processed_features = scaler.transform([features])
            processed_features, _ = preprocess_data(features, scaler)
            prediction = model.predict(processed_features)
            
            print(f"Actual: {gesture} | Predicted: {prediction[0]}")
            
            if prediction[0] == gesture:
                correct_count += 1
            total_count += 1

    accuracy = correct_count / total_count
    print(f"Accuracy: {accuracy}")

In [14]:
#split data and test part of file
def split_data_into_parts(data, num_parts):
    data_array = data[0]
    data_length = len(data_array)
    part_size = data_length // num_parts
    
    data_parts = []
    
    for i in range(0, data_length, part_size):
        start_index = i
        end_index = i + part_size if i + part_size < data_length else data_length
        part_data = data_array[start_index:end_index]
        data_parts.append([part_data])  # 将 part_data 放入一个列表中，使数据结构与 load_csv() 相同
    
    return data_parts


def test_model_on_file_parts(model, test_data_dir, scaler, num_parts=3):
    gestures = ["circle", "wave", "comeHere", "goAway"]
    correct_count = 0
    total_count = 0

    for gesture in gestures:
        for i in range(1, 6):
            file_path = os.path.join(test_data_dir, f"{gesture}_test_{i}.csv")
            gesture_data = load_csv(file_path)
#             print("the data is: ",gesture_data)
            data_parts = split_data_into_parts(gesture_data, num_parts)
#             print("data_parts is: ", data_parts)
            
            for j, part_data in enumerate(data_parts):
#                 print("len(part_data) and part_data is ", len(part_data), part_data)
                features = feature_extraction_test(part_data)
                
                if not features:
                    print(f"Features for {gesture}_test_{i}_part_{j + 1} are empty. Skipping this part.")
                    continue
                
                processed_features, _ = preprocess_data(features, scaler)
                prediction = model.predict(processed_features)
                
                print(f"Actual: {gesture} | Predicted: {prediction[0]} | File: {gesture}_test_{i}_part_{j + 1}")
                
                if prediction[0] == gesture:
                    correct_count += 1
                total_count += 1

    accuracy = correct_count / total_count
    print(f"Accuracy: {accuracy}")


In [15]:
def main():
    data_dir = "data"
    data, labels = load_data(data_dir)

    target_names = ["circle", "wave", "comeHere", "goAway"]

    features, feature_labels = feature_extraction(data, labels)
    processed_data, train_scaler  = preprocess_data(features)

    X_train, X_test, y_train, y_test = split_data(processed_data, feature_labels)

    # Train and evaluate SVM model
    svm_model = train_svm(X_train, y_train)
    print("SVM evaluation:")
    evaluate_model(svm_model, X_test, y_test, target_names)

    # Train and evaluate Decision Tree model
    decision_tree_model = train_decision_tree(X_train, y_train)
    print("Decision Tree evaluation:")
    evaluate_model(decision_tree_model, X_test, y_test, target_names)

    # Train and evaluate Random Forest model
    random_forest_model = train_random_forest(X_train, y_train)
    print("Random Forest evaluation:")
    evaluate_model(random_forest_model, X_test, y_test, target_names)
    
    #test
    test_model_on_whole_files(svm_model, "data_test",train_scaler)
    test_model_on_file_parts(svm_model, "data_test", train_scaler)
    
    # Save models
    save_model(svm_model, "svm_model")
    save_model(decision_tree_model, "decision_tree_model")
    save_model(random_forest_model, "random_forest_model")

if __name__ == "__main__":
    main()


SVM evaluation:
              precision    recall  f1-score   support

      circle       1.00      1.00      1.00       147
        wave       1.00      1.00      1.00       103
    comeHere       1.00      1.00      1.00       118
      goAway       1.00      1.00      1.00        83

    accuracy                           1.00       451
   macro avg       1.00      1.00      1.00       451
weighted avg       1.00      1.00      1.00       451

Decision Tree evaluation:
              precision    recall  f1-score   support

      circle       1.00      1.00      1.00       147
        wave       1.00      1.00      1.00       103
    comeHere       1.00      1.00      1.00       118
      goAway       1.00      1.00      1.00        83

    accuracy                           1.00       451
   macro avg       1.00      1.00      1.00       451
weighted avg       1.00      1.00      1.00       451

Random Forest evaluation:
              precision    recall  f1-score   support

      c