In [18]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/gesture-recognition/rgwt.csv
/kaggle/input/gesture-recognition/from sensor.csv
/kaggle/input/gesture-recognition/realistic_gesture_dataset_with_idle.csv
/kaggle/input/gesture-recognition/evansgesturekiosk-default-rtdb-export (1).csv
/kaggle/input/gesture-recognition/realistic_gesture_dataset_with_idle_v2 (2).csv


In [30]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report

In [31]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F

In [32]:
data = pd.read_csv('/kaggle/input/gesture-recognition/EVANSdataset_expanded (1).csv') 

In [33]:
WINDOW_SIZE = 50  # Samples per window
STRIDE = 10        # How much the window shifts for each sample
BATCH_SIZE = 32     # Batch size for training
LEARNING_RATE = 0.001
NUM_EPOCHS = 100
TEST_SIZE = 0.3 #test size
dropout =0.5

In [34]:
data['timestamp'] = pd.to_datetime(data['TIME STAMP'])

# Function to create windows
def create_windows(df, window_size, stride):
    windows = []
    labels = []
    for i in range(0, len(df) - window_size + 1, stride):
        window = df.iloc[i:i + window_size].copy()
        label = window['GESTURE'].value_counts().idxmax() # Majority vote label
        windows.append(window)
        labels.append(label)
    return windows, labels

windows, labels = create_windows(data, WINDOW_SIZE, STRIDE)

# Function to extract features from a window
def extract_features(window):
    # Sensor Columns
    sensor_cols = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    # Calculate differences between consecutive rows for each sensor
    diffs = window[sensor_cols].diff().fillna(0)

    # Calculate rolling mean for each sensor
    rolling_mean = window[sensor_cols].rolling(window=5).mean().fillna(0)

    # Calculate rolling standard deviation for each sensor
    rolling_std = window[sensor_cols].rolling(window=5).std().fillna(0)

    # Combine the above features.
    features = pd.concat([diffs, rolling_mean, rolling_std], axis=1)

    # Add timestamp-related features: time since the start of window.
    window['time_diff'] = (window['timestamp'] - window['timestamp'].iloc[0]).dt.total_seconds() #Time since start of the window
    time_based_features = window[['time_diff']] #Select the series

    features = pd.concat([features, time_based_features], axis=1)

    features = features.drop(columns=['timestamp'], errors='ignore') 

    return features

  data['timestamp'] = pd.to_datetime(data['TIME STAMP'])


In [35]:
X = []
for window in windows:
    X.append(extract_features(window))

# --- Convert to NumPy arrays before further processing ---
X_list = [extract_features(window).values for window in windows]
X = np.stack(X_list) # shape: (num_windows, WINDOW_SIZE, num_features)
y = np.array(labels)

In [36]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=42, stratify=y)

# Fit the LabelEncoder on the combined training and testing labels
label_encoder = LabelEncoder()
label_encoder.fit(np.concatenate([y_train, y_test])) #Fit on both train and test

y_train_encoded = label_encoder.transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

scaler = StandardScaler()
X_train_scaled = np.array([scaler.fit_transform(window) for window in X_train])
X_test_scaled = np.array([scaler.transform(window) for window in X_test])

In [37]:
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor = torch.tensor(y_test_encoded, dtype=torch.long)

In [43]:
class GestureDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = GestureDataset(X_train_tensor, y_train_tensor)
test_dataset = GestureDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# --- Define LSTM Model ---
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes,dropout=0.5):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,dropout=dropout, bidirectional= True)
        self.fc = nn.Linear(hidden_size*2, num_classes)
        self.dropout_layer=nn.Dropout(dropout)

    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(x.device)

        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        out = self.dropout_layer(out[:, -1, :])
        # Decode the hidden state of the last time step
        out = self.fc(out)
        return out

In [44]:
class_counts = torch.bincount(y_train_tensor)
class_weights = 1. / class_counts.float()
class_weights = class_weights / class_weights.sum()
class_weights = class_weights.to(device)

input_size = X_train_tensor.shape[2] # Number of features
hidden_size = 128
num_layers = 4
num_classes = len(np.unique(y)) # Make sure num_classes lines up with existing classes
print(f"Num sensors: {input_size}, num_classes: {num_classes}")

model = LSTMModel(input_size, hidden_size, num_layers, num_classes, dropout)

# --- Loss and Optimizer ---
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

Num sensors: 13, num_classes: 4


In [45]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

LSTMModel(
  (lstm): LSTM(13, 128, num_layers=4, batch_first=True, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=256, out_features=4, bias=True)
  (dropout_layer): Dropout(p=0.5, inplace=False)
)

In [47]:
# --- Validation Function ---
def validate(model, val_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    val_loss = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    return val_loss / len(val_loader)

from torch.optim.lr_scheduler import ReduceLROnPlateau
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=False)

best_val_loss = float('inf')
patience = 5
no_improve = 0

for epoch in range(NUM_EPOCHS):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (i + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{NUM_EPOCHS}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
    # Validation loop
    val_loss = validate(model, test_loader, criterion)
    scheduler.step(val_loss)
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        no_improve = 0
    else:
        no_improve += 1
        if no_improve == patience:
            print("Early stopping")
            break
    

Early stopping


In [48]:
model.eval()
all_predictions = []
all_labels = []

with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    print(f'Accuracy of the model on the test set: {100 * correct / total:.2f} %')

# --- Classification Report ---
print("\nClassification Report:")
print(classification_report(y_test_encoded, all_predictions, target_names=label_encoder.classes_))

Accuracy of the model on the test set: 27.12 %

Classification Report:
              precision    recall  f1-score   support

  SWIPE DOWN       0.00      0.00      0.00        17
  SWIPE LEFT       0.20      0.08      0.11        13
 SWIPE RIGHT       0.29      0.67      0.41        15
    SWIPE UP       0.25      0.36      0.29        14

    accuracy                           0.27        59
   macro avg       0.19      0.28      0.20        59
weighted avg       0.18      0.27      0.20        59



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [49]:
print("Unique values in y_test_encoded:", np.unique(y_test_encoded))
print("Unique values in all_predictions:", np.unique(all_predictions))


Unique values in y_test_encoded: [0 1 2 3]
Unique values in all_predictions: [1 2 3]
