In [106]:
# Import necessary libraries
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import plotly.graph_objects as go
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [107]:
# Load the CSV file
teraterm_path = "C:/Users/hylth/AppData/Local/teraterm5/"
file_name = "taps"
data = pd.read_csv(teraterm_path + file_name, header=None)

# Assert numbers are numeric.
data = data.apply(pd.to_numeric)

# Assert that the data has 7 columns.
assert data.shape[1] == 7, "Data should have 7 columns."

# Assign column names.
data.columns = ['t', 'ax', 'ay', 'az', 'gx', 'gy', 'gz']

# Scale data to sec g and deg/sec.
# data['t'] = data['t'] / 1_000_000
# data[['ax', 'ay', 'az']] = data[['ax', 'ay', 'az']] / 2048 # For a full-scale of 16gs.
# data[['gx', 'gy', 'gz']] = data[['gx', 'gy', 'gz']] / 65.5 # For a full-scale of ??.

In [108]:
# Define a threshold for contact detection.
threshold = 1000

# Create labels based on the threshold.
window = 10
data['contact'] = (
    (data[['ax', 'ay', 'az']].diff(periods=window).fillna(0).abs() > threshold).any(axis=1)
).astype(int)

# Keep only the first "contact==1" and unset the next 100, then repeat for the next.
curr_contact = 0
while True:
    contact_true_idx = data.index[data['contact'] == 1]
    if contact_true_idx.size <= curr_contact:
        break
    data.loc[contact_true_idx[curr_contact]+1 : contact_true_idx[curr_contact]+100, 'contact'] = 0
    curr_contact += 1

In [109]:
# Segment data into separate recordings.
# Find indices where 't' decreases (i.e., recording restarts)
segment_breaks = data.index[data['t'].diff() < 0].tolist()

# Add start and end indices
segment_indices = [0] + segment_breaks + [len(data)]

# Create a list of DataFrames, each representing a segment of increasing 't'
segments = [data.iloc[segment_indices[i]:segment_indices[i+1]].reset_index(drop=True) for i in range(len(segment_indices)-1)]

# Plots

In [110]:
# Manual label modifications.
def UnsetLabel(segment_idx, label_indices):
    segment_idx -= 1
    label_indices = [idx - 1 for idx in label_indices]
    seg = segments[segment_idx]
    # Iterate over the list of indices to unset
    for label_idx in sorted(label_indices, reverse=True):
        # Get the nth label index where 'contact' is 1
        nth_label_idx = seg.index[seg['contact'] == 1][label_idx]
        seg.at[nth_label_idx, 'contact'] = 0  # Unset the label

# UnsetLabel(1,[4])
# UnsetLabel(1,[2])
# UnsetLabel(2,[1])
# UnsetLabel(3,[0])
# UnsetLabel(4,[0,2,4])
# UnsetLabel(5,[0])

# Plot ax, ay, az over time for each segment using Plotly.
for seg in segments:
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=seg['t'], y=seg['ax'], mode='lines', name='ax', line=dict(color='red')))
    fig.add_trace(go.Scatter(x=seg['t'], y=seg['ay'], mode='lines', name='ay', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=seg['t'], y=seg['az'], mode='lines', name='az', line=dict(color='blue')))
    # Add vertical lines for contact points
    for t_contact in seg['t'][seg['contact'] == 1]:
        fig.add_vline(x=t_contact, line=dict(color='black', dash='dash', width=1), opacity=0.7)

    fig.update_layout(
        title='Acceleration and Contact over Time',
        xaxis_title='t',
        yaxis_title='Acceleration',
        legend_title='Sensor'
    )
    fig.show()

# Create Normalized Feature-Label Tensors

In [123]:
# Create sequences from a numpy array.
def create_sequences(data, seq_length):
    sequences = []
    for i in range(len(data) - seq_length):
        sequences.append(data[i:i+seq_length])  # Extract sequence
    return np.array(sequences) # returns a 3D array of size (len(data)-seq_length, seq_length, data_cols).

is_first_seg = True
for seg in segments:
    # Create sequences for training and testing.
    sequence_length = 30
    features = seg[['ax','ay','az']].values # numpi array.
    labels = seg['contact'].values # numpi array.
    feature_sequences = create_sequences(features, sequence_length)
    label_sequences = create_sequences(labels, sequence_length)
    if is_first_seg:
        is_first_seg = False
        all_feature_sequences = feature_sequences.copy()
        all_label_sequences = label_sequences.copy()
    else:
        all_feature_sequences = np.append(all_feature_sequences, feature_sequences, axis=0)
        all_label_sequences = np.append(all_label_sequences, label_sequences, axis=0)

# Convert features and labels into training and testing datasets.
feature_training, feature_testing, label_training, label_testing = train_test_split(
    all_feature_sequences, all_label_sequences, test_size=0.2, random_state=42, stratify=all_label_sequences[:, 0]
)

# Normalize data.
scaler = StandardScaler()
num_samples, seq_length, num_features = feature_training.shape
feature_training = feature_training.reshape(-1, num_features)  # Reshape to 2D array.
feature_testing = feature_testing.reshape(-1, num_features)  # Reshape to 2D array.
feature_training = scaler.fit_transform(feature_training) # Normalize.
feature_testing = scaler.transform(feature_testing) # Scale with same normalizing parameters to prevent leakage into training set.
feature_training = feature_training.reshape(num_samples, seq_length, num_features)  # Reshape back to 3D array.
feature_testing = feature_testing.reshape(-1, seq_length, num_features)  # Reshape back to 3D array.

# Convert sequences and targets to PyTorch tensors for training and testing
# Unsqueeze informs the NN that there is 1 label type, 'contact'.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
feature_training_tensor = torch.tensor(feature_training, dtype=torch.float32).to(device)
label_training_tensor = torch.tensor(label_training, dtype=torch.float32).unsqueeze(-1).to(device) # Last dimension is label_idx.
feature_testing_tensor = torch.tensor(feature_testing, dtype=torch.float32).to(device)
label_testing_tensor = torch.tensor(label_testing, dtype=torch.float32).unsqueeze(-1).to(device) # Last dimension is label_idx.

print("feature_training_tensor.shape = " + str(feature_training_tensor.shape))
print("label_training_tensor.shape = " + str(label_training_tensor.shape))
print("feature_testing_tensor.shape = " + str(feature_testing_tensor.shape))
print("label_testing_tensor.shape = " + str(label_testing_tensor.shape))
print("Count of 1s in label_training:", np.sum(label_training[:,0]))
print("Count of 1s in label_testing:", np.sum(label_testing[:,0]))

feature_training_tensor.shape = torch.Size([25887, 30, 3])
label_training_tensor.shape = torch.Size([25887, 30, 1])
feature_testing_tensor.shape = torch.Size([6472, 30, 3])
label_testing_tensor.shape = torch.Size([6472, 30, 1])
Count of 1s in label_training: 17
Count of 1s in label_testing: 4


# Model Class Definitions

In [None]:
class SimpleNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid() # Activation for binary classification

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))  # Sigmoid activation for output
        return x
    
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Define the RNN layer
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)

        # Fully connected layer for output
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state (h0) with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # Forward propagate through RNN
        out, _ = self.rnn(x, h0)

        # Pass the last time step's output through the fully connected layer
        out = self.fc(out[:, -1, :])  # Use the last time step's output
        return torch.sigmoid(out)  # Sigmoid activation for binary classification

# Initialize Neural Network (RESETS MODEL)

In [95]:
if False:
    # Model parameters
    input_size = X_train.shape[1]  # Number of features (3: ax, ay, az)
    hidden_size = 16  # Number of neurons in the hidden layers
    output_size = 1  # Binary classification (contact or no contact)
    num_layers = 1

    # Initialize the model
    model = SimpleRNN(input_size, hidden_size, output_size, num_layers)
    model = model.to(device)

    # Loss function and optimizer
    criterion = nn.BCELoss()  # Binary Cross-Entropy Loss
    optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer

# Training Loop

In [None]:
# Training loop
epoch = 0
last_loss = float('inf')
milestone = 1
while True:
    epoch += 1

    model.train()  # Set the model to training mode

    # Forward pass
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Early stopping: break if loss increases compared to previous epoch
    if loss.item() > last_loss:
        print(f"Early stopping at epoch {epoch+1} due to increasing loss. {loss.item()} > {last_loss}")
        break
    last_loss = loss.item()

    # Print loss every 100 epochs
    if loss.item() < milestone:
        print(f'Epoch [{epoch}], Loss: {loss.item():.8f}')
        while loss.item() < milestone:
            milestone = milestone/2

# Evaluate Model

In [None]:
import matplotlib.pyplot as plt

# Plot ax, ay, az over time
plt.figure(figsize=(12, 2))
plt.plot(data['t'], data['ax'], label='ax')
plt.plot(data['t'], data['ay'], label='ay')
plt.plot(data['t'], data['az'], label='az')
plt.xlabel('t')
plt.ylabel('Acceleration')
plt.title('Acceleration over Time')
plt.tight_layout()
plt.show()

# Plot contact over time
plt.figure(figsize=(12, 2))
plt.plot(data['t'], data['contact'], label='contact')
plt.xlabel('t')
plt.ylabel('Contact')
plt.title('Contact over Time')
plt.tight_layout()
plt.show()

# Get NN predictions for all data points
model.eval()
with torch.no_grad():
    # Create sequences from all data
    X_all = scaler.transform(data[['ax', 'ay', 'az']].values)
    X_all_seq, _ = create_sequences(X_all, data['contact'].values, sequence_length)
    X_all_seq_tensor = torch.tensor(X_all_seq, dtype=torch.float32).to(device)
    assert X_all_seq_tensor.ndimension() == 3, "X_all_seq_tensor must be 3D"
    y_all_pred = model(X_all_seq_tensor).cpu().numpy().flatten()

# Use time_indices for correct alignment with y_all_pred
time_indices = data['t'][sequence_length:]

# Plot predictions.
plt.figure(figsize=(12, 2))
plt.plot(time_indices, y_all_pred)
prob_thresh = 0.01
plt.text(time_indices.iloc[-1], 0.95, f'Count > {prob_thresh}: {(y_all_pred > prob_thresh).sum()}', color='black', fontsize=12, ha='right', va='top')
plt.show()

# Indices where actual contact is 1
actual_contact_indices = data.index[data['contact'] == 1]
print("Actual contact indices:", actual_contact_indices.tolist())

# Indices where predicted contact probability is above 0.9
predicted_contact_indices = time_indices.index[y_all_pred > 0.9]
print("Predicted contact indices (prob > 0.9):", predicted_contact_indices.tolist())

window = sequence_length*2
for idx in predicted_contact_indices:
    start = max(idx - window, 0)
    end = min(idx + window, len(data) - 1)
    plt.figure(figsize=(12, 3))
    plt.plot(data['t'][start:end+1], data['ax'][start:end+1], label='ax')
    plt.plot(data['t'][start:end+1], data['ay'][start:end+1], label='ay')
    plt.plot(data['t'][start:end+1], data['az'][start:end+1], label='az')
    plt.plot(data['t'][start:end+1], data['contact'][start:end+1], label='contact', linestyle='--', color='black')
    plt.plot(data['t'][start:end+1], y_all_pred[start:end+1], color='magenta', linestyle=':', label='Predicted Contact')
    plt.xlabel('t')
    plt.ylabel('Value')
    plt.title(f'Predicted Contact at index {idx} (t={data["t"][idx]:.3f})')
    plt.legend()
    plt.tight_layout()
    plt.show()