In [None]:

# Destination of the trained weights. The save call and the confirmation
# message both read this one variable, so the message always names the file
# that was actually written.
# NOTE(review): the path literal was empty in this cell, which torch.save
# cannot open; the filename below is the one the original message referred
# to — confirm it is the intended location.
weights_path = 'step_lstm_more_stairsup.pth'

model.eval()
torch.save(model.state_dict(), weights_path)
print(f'Model weights saved to {weights_path}')

In [None]:
# Checkpoint to restore. The load call and the confirmation message both read
# this one variable, so the message always names the file that was loaded.
# NOTE(review): the path literal was empty in this cell; the filename below is
# the one the original message referred to — confirm it matches the file the
# save cell writes.
weights_path = 'model_weights.pth'

model = LSTM(6, 64, 2, 0.2)
# The model above lives on the CPU, so map the stored tensors there as well;
# without map_location a checkpoint written on a GPU machine fails to load on
# a CPU-only one.
model.load_state_dict(torch.load(weights_path, map_location='cpu'))
model.eval()
print(f'Model weights loaded from {weights_path}')

# SINGLE VERSION

In [6]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import glob

import pandas as pd
import os

class StepDataset(Dataset):
    """Sliding-window dataset over every CSV file found under a folder.

    Each item is ``(window, label)``: ``window`` is a float32 tensor of shape
    ``(window_size, 6)`` holding consecutive rows of the six sensor columns,
    and ``label`` is a float32 scalar that is 1 when ``step_timestamp`` of the
    window's last row is 1, 2 or 3, and 0 otherwise.

    Windows never span two files: rows of different CSV files are stored back
    to back in ``self.data``, but only windows lying entirely inside one file
    are exposed through ``__len__`` / ``__getitem__``.

    Args:
        folder_path: Folder searched recursively for ``*.csv`` files.
        window_size: Number of consecutive rows per window (at least 1).

    Raises:
        ValueError: If ``window_size`` is below 1 or no CSV file is found.
    """

    FEATURE_COLUMNS = ['accel_x', 'accel_y', 'accel_z', 'gyro_x', 'gyro_y', 'gyro_z']
    LABEL_COLUMN = 'step_timestamp'
    STEP_VALUES = [1, 2, 3]  # every one of these is collapsed to the positive class

    def __init__(self, folder_path, window_size):
        if window_size < 1:
            raise ValueError(f'window_size must be at least 1, got {window_size}')
        self.window_size = window_size
        self.data = self.load_data(folder_path)
        self.features = self.data[self.FEATURE_COLUMNS].values
        # Vectorised equivalent of "1 if value in (1, 2, 3) else 0".
        self.labels = self.data[self.LABEL_COLUMN].isin(self.STEP_VALUES).astype(int).values
        self._window_starts = self._compute_window_starts()

    def load_data(self, folder_path):
        """Read every CSV under ``folder_path`` (recursively) into one frame.

        Files are read in sorted path order so the row order does not depend
        on the order in which the file system lists them. The row count of
        each file is remembered so windows can be kept inside a single file.
        """
        all_files = sorted(glob.glob(os.path.join(folder_path, '**', '*.csv'), recursive=True))
        if not all_files:
            raise ValueError(f'No .csv files found under {folder_path!r}')
        df_list = [pd.read_csv(file) for file in all_files]
        self._file_lengths = [len(df) for df in df_list]
        return pd.concat(df_list, ignore_index=True)

    def _compute_window_starts(self):
        """Return the first-row index of every window that fits inside one file."""
        # Fall back to treating the data as one file if load_data was overridden.
        file_lengths = getattr(self, '_file_lengths', [len(self.data)])
        starts = []
        offset = 0
        for length in file_lengths:
            usable = length - self.window_size + 1
            if usable > 0:  # files shorter than one window contribute nothing
                starts.append(torch.arange(offset, offset + usable))
            offset += length
        if not starts:
            return torch.empty(0, dtype=torch.long)
        return torch.cat(starts)

    def __len__(self):
        return len(self._window_starts)

    def __getitem__(self, idx):
        if idx < 0:
            idx += len(self)
        if not 0 <= idx < len(self):
            raise IndexError(f'window index {idx} out of range for {len(self)} windows')
        start = int(self._window_starts[idx])
        stop = start + self.window_size
        x = self.features[start:stop]
        y = self.labels[stop - 1]  # label of the window's last row
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

In [15]:
# Parameters
folder_path = r''
window_size = 160
batch_size = 32
test_split_ratio = 0.15
split_seed = 42  # fixed so the train/test partition is identical on every run

# Create dataset
dataset = StepDataset(folder_path, window_size)

# Calculate the number of samples for training and testing
test_size = int(len(dataset) * test_split_ratio)
train_size = len(dataset) - test_size

# Split the dataset with a dedicated, seeded generator so results can be
# reproduced after a kernel restart.
# NOTE(review): the split is made over individual windows, and windows with
# neighbouring indices share all but one row, so test windows overlap heavily
# with training windows. Consider splitting by recording instead.
split_generator = torch.Generator().manual_seed(split_seed)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class LSTM(nn.Module):
    """Stacked LSTM whose last-timestep output feeds a single-unit linear layer.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout passed to ``nn.LSTM``.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout_prob):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout_prob,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a batch-first input ``x`` to one value per sequence, shape ``(batch, 1)``."""
        # Hidden and cell state both start from zeros on the input's device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (initial_hidden, initial_cell))
        final_step = sequence_out[:, -1, :]
        return self.fc(final_step)
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize the model
model = LSTM(6, 64, 2, 0.2).to(device)

# Class balance of the labels: the positive class is up-weighted by the
# negative/positive ratio. The count uses numpy's own reduction rather than
# Python's built-in sum(), which walks the array element by element.
num_steps = int(dataset.labels.sum())
num_no_steps = len(dataset.labels) - num_steps
if num_steps == 0:
    # A zero count would make the weight infinite and every loss value NaN.
    raise ValueError('No step samples found in the dataset; cannot compute pos_weight')
pos_weight = torch.tensor([num_no_steps / num_steps], dtype=torch.float32).to(device)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    # ---- optimisation pass over the training windows ----
    model.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        # Drop only the trailing size-1 axis: (batch, 1) -> (batch,). A bare
        # squeeze() would also drop the batch axis when the final batch holds
        # a single sample, and the loss then rejects the shape mismatch.
        outputs = model(inputs).squeeze(-1)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # max(..., 1) keeps the average defined when a loader yields no batches.
    train_loss /= max(len(train_loader), 1)

    # ---- loss on the held-out windows, no gradient tracking ----
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs).squeeze(-1)
            loss = criterion(outputs, labels.float())
            test_loss += loss.item()
    test_loss /= max(len(test_loader), 1)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

# Evaluate the model with leeway
model.eval()
correct = 0
total = 0

sampling_rate = 100
time_leeway_ms = 100
# Integer arithmetic: int((ms / 1000) * rate) can land one sample low when the
# float product falls just under a whole number.
sample_leeway = time_leeway_ms * sampling_rate // 1000
# NOTE(review): sample_leeway is not applied below. The test loader is built
# from a random split of windows, so the samples inside a batch are not
# neighbours in time and a +/- sample tolerance has no meaning here; each
# prediction is scored against its own label.

real_steps = 0
counted_steps = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # The model emits logits (it is trained with BCEWithLogitsLoss), so
        # convert to probabilities before applying the 0.5 threshold.
        outputs = model(inputs).squeeze(-1)
        predicted = (torch.sigmoid(outputs) > 0.5).float()

        # Score every prediction against its own label. Comparing one label
        # with the whole batch counted a hit whenever any prediction in the
        # batch happened to equal it.
        is_step = labels != 0
        matches = predicted == labels

        real_steps += int(is_step.sum())
        counted_steps += int((is_step & matches).sum())
        correct += int(matches.sum())
        total += labels.numel()

if total:
    accuracy = 100 * correct / total
    print(f'Accuracy with leeway: {accuracy:.2f}%')
    print(f'Real steps: {real_steps}, Counted steps: {counted_steps}')
else:
    # Reached only when the test loader produced no samples at all.
    print('No steps detected in the test set')


## TEST A FOLDER

In [None]:
import torch
import pandas as pd
import os
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time  # Import the time module

def test_single_file(model, csv_file, window_size, sample_leeway, output_file, device='cpu'):
    # Load the data
    data = pd.read_csv(csv_file)
    features = data[['accel_x', 'accel_y', 'accel_z', 'gyro_x', 'gyro_y', 'gyro_z']].values
    timestamps = data['step_timestamp'].values

    inputs = []
    for i in range(len(features) - window_size + 1):
        x = features[i:i + window_size]
        inputs.append(torch.tensor(x, dtype=torch.float32))

    model.to(device)
    model.eval()

    correct = 0
    total = 0
    predicted_steps = []

    with torch.no_grad():
        for i in range(len(inputs)):
            input_tensor = inputs[i].unsqueeze(0).to(device)  # Add batch dimension and move to device
            
            output = model(input_tensor).squeeze()
            
            predicted = (output > 0.5).float()  # Use threshold to determine class (0 or 1)

            label = timestamps[i + window_size - 1]
            print(f'Predicted: {predicted}, Label: {label}')
            # Check if the prediction is within the leeway
            if abs(predicted.item() - label) <= sample_leeway:
                correct += 1
            predicted_steps.append(predicted.item())
            total += 1

    if total != 0:
        accuracy = 100 * correct / total
        print(f'Accuracy with leeway: {accuracy}%')

    predicted_steps_df = pd.DataFrame(predicted_steps, columns=['predicted_step_timestamp'])
    predicted_steps_df.to_csv(output_file, index=False)
    print(f'Predicted steps saved to {output_file}')

def process_folder(model, input_folder, output_folder, window_size, sample_leeway, device='cpu'):
    """Run ``test_single_file`` on every ``.csv`` file under ``input_folder``.

    The folder is walked recursively and each prediction file is written to
    the same relative path under ``output_folder``, creating folders as needed.

    Args:
        model: Model forwarded to ``test_single_file``.
        input_folder: Root folder holding the input CSV files.
        output_folder: Root folder receiving the prediction CSV files.
        window_size: Forwarded to ``test_single_file``.
        sample_leeway: Forwarded to ``test_single_file``.
        device: Forwarded to ``test_single_file``.

    Raises:
        ValueError: If ``output_folder`` is the same folder as ``input_folder``.
    """
    input_root = os.path.normcase(os.path.abspath(input_folder))
    output_root = os.path.normcase(os.path.abspath(output_folder))
    if input_root == output_root:
        # Outputs mirror the input's relative paths, so sharing the root would
        # overwrite every source CSV with its predictions.
        raise ValueError('output_folder must differ from input_folder: '
                         'prediction files would overwrite the input CSV files')

    os.makedirs(output_folder, exist_ok=True)

    # Iterate over all files and subfolders in the input folder
    for root, dirs, files in os.walk(input_folder):
        # Never descend into the output folder when it sits inside the input
        # tree, otherwise prediction files would be picked up as inputs.
        # Sorting makes the processing order deterministic.
        dirs[:] = sorted(
            name for name in dirs
            if os.path.normcase(os.path.abspath(os.path.join(root, name))) != output_root
        )
        for filename in sorted(files):
            if not filename.endswith('.csv'):
                continue
            input_file = os.path.join(root, filename)
            relative_path = os.path.relpath(input_file, input_folder)
            output_file = os.path.join(output_folder, relative_path)
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            print(f'Processing {input_file}...')
            test_single_file(model, input_file, window_size, sample_leeway, output_file, device)

# Parameters
input_folder = r''
output_folder = r''
window_size = 160       # rows per window fed to the model
sampling_rate = 100     # samples per second
time_leeway_ms = 100    # tolerance around a step, in milliseconds
# Tolerance converted to a whole number of samples. Integer arithmetic is used
# because int((ms / 1000) * rate) can land one sample low when the float
# product falls just under a whole number (e.g. 290 ms at 100 Hz gives 28).
sample_leeway = time_leeway_ms * sampling_rate // 1000

# Initialize the model (make sure to load the trained model weights)
class LSTM(nn.Module):
    """Stacked LSTM whose last-timestep output feeds a single-unit linear layer.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout passed to ``nn.LSTM``.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout_prob):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout_prob,
        )
        # Single output unit: binary classification
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a batch-first input ``x`` to one logit per sequence, shape ``(batch, 1)``."""
        # Hidden and cell state both start from zeros on the input's device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (initial_hidden, initial_cell))
        # Only the output of the last timestep is classified.
        final_step = sequence_out[:, -1, :]
        return self.fc(final_step)

# Check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = LSTM(6, 64, 2, 0.2).to(device)
# map_location places the stored tensors on the device chosen above, so a
# checkpoint written on a GPU machine also loads on a CPU-only one.
# NOTE(review): the checkpoint path is empty — fill it in before running.
model.load_state_dict(torch.load('', map_location=device))

# Process the folder
process_folder(model, input_folder, output_folder, window_size, sample_leeway, device)

## TEST A SINGLE FILE

In [None]:
import pandas as pd

# Load the CSV file
input_file = r''
df = pd.read_csv(input_file)

# Drop the first column by position. Dropping by name would also remove any
# later column that happens to share the first column's name.
df = df.iloc[:, 1:]

# Save the modified DataFrame to a new file. The extension is '.csv' (it was
# truncated to '.cs'), so the file is picked up by code that filters on '.csv'.
output_file = 'combined_eti10krokow4634c53c-0399-41a5-baa0-73475c39fc68_accelerometer.csv'
df.to_csv(output_file, index=False)

print(f"First column removed. Saved to {output_file}")