<a href="https://colab.research.google.com/github/denis-kasak/lstm-activities/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import os
import numpy as np
from scipy import stats
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
import torch.nn.functional as F


def read_data(file_path):
    """
    Read raw accelerometer records from a text file into a pandas DataFrame.

    Each non-blank line is expected to look like
    ``user,activity,timestamp,x,y,z;``. Blank lines are skipped, the
    terminating semicolon is removed, and surplus empty fields produced by a
    ``,;`` line ending are dropped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A DataFrame with the columns ``user``, ``activity``, ``timestamp``,
        ``x``, ``y`` and ``z``. All columns except ``activity`` are converted
        to numeric types; values that cannot be parsed become NaN.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a line still holds more than six fields after cleanup
            (raised by the DataFrame constructor).
    """
    columns = ["user", "activity", "timestamp", "x", "y", "z"]
    rows = []

    with open(file_path, "r") as file:
        for line in file:
            stripped = line.strip()
            # "".split(",") returns [""], which is truthy, so blank lines
            # have to be filtered before splitting.
            if not stripped:
                continue

            fields = stripped.split(",")
            # Remove the record terminator from the last element
            fields[-1] = fields[-1].replace(";", "")

            # A line ending in ",;" leaves an empty extra field behind; drop
            # such surplus fields so the row fits the six expected columns.
            while len(fields) > len(columns) and not fields[-1].strip():
                fields.pop()

            rows.append(fields)

    dataframe = pd.DataFrame(rows, columns=columns)

    # Convert every column except the textual activity label to numbers;
    # unparseable values become NaN instead of raising.
    for name in ("user", "timestamp", "x", "y", "z"):
        dataframe[name] = pd.to_numeric(dataframe[name], errors="coerce")

    return dataframe


def preprocess_data(
    dataframe, window_size, test_split, seed, batch_size, *, step_size=None
):
    """
    Turn the raw readings into windowed train/test DataLoaders.

    Rows containing NaN are dropped, the ``x``/``y``/``z`` columns are
    standardised, activity names are encoded as integers, and the series is
    cut into windows of ``window_size`` consecutive rows. Each window is
    labelled with its most frequent activity (the smallest encoded label wins
    a tie).

    Note that the scaler is fitted on all rows before the train/test split,
    and that the fitted scaler and label encoder are not returned.

    Args:
        dataframe: DataFrame with at least the columns ``activity``, ``x``,
            ``y`` and ``z``. It is not modified.
        window_size: Number of consecutive rows per window.
        test_split: Fraction of windows reserved for the test set.
        seed: Random state passed to ``train_test_split``.
        batch_size: Batch size of both DataLoaders.
        step_size: Distance in rows between the starts of two consecutive
            windows. Defaults to ``batch_size``, which is the stride this
            function has always used.

    Returns:
        ``(train_loader, test_loader)``. Inputs are float32 tensors of shape
        ``(batch, window_size, 3)`` and targets are int64 class indices.

    Raises:
        ValueError: If ``window_size`` or the stride is not positive, or if
            fewer than ``window_size`` usable rows remain.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    stride = batch_size if step_size is None else step_size
    if stride <= 0:
        raise ValueError("step_size (or batch_size) must be a positive integer")

    # Drop incomplete rows on a copy so the caller's DataFrame stays untouched
    frame = dataframe.dropna().copy()

    # Normalize the accelerometer columns
    scaler = StandardScaler()
    frame[["x", "y", "z"]] = scaler.fit_transform(frame[["x", "y", "z"]])

    # Encode activity names as integer class indices (0 .. n_classes - 1)
    label_encoder = LabelEncoder()
    frame["activity"] = label_encoder.fit_transform(frame["activity"])

    def create_segments(df, window_size, step):
        # Slide a "window_size" wide window over the rows, moving "step" rows
        # at a time.
        signal = df[["x", "y", "z"]].to_numpy(dtype=np.float32)
        activity = df["activity"].to_numpy()

        segments = []
        labels = []
        # "+ 1" so the final full window is kept as well
        for start in range(0, len(df) - window_size + 1, step):
            stop = start + window_size
            # The slice is already (window_size, 3): time steps are rows and
            # x/y/z are columns. Stacking the three axes as separate rows and
            # reshaping afterwards would scramble time steps and channels.
            segments.append(signal[start:stop])
            # Most frequent encoded label; argmax returns the smallest label
            # on ties.
            labels.append(np.bincount(activity[start:stop]).argmax())
        return segments, labels

    segments, labels = create_segments(frame, window_size, stride)

    if not segments:
        raise ValueError(
            f"Need at least {window_size} usable rows to build a window, "
            f"got {len(frame)}"
        )

    windows = np.asarray(segments, dtype=np.float32)  # (n_windows, window_size, 3)
    targets = np.asarray(labels, dtype=np.int64)

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(
        windows, targets, test_size=test_split, random_state=seed
    )

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)

    train_data = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

    test_data = TensorDataset(X_test_tensor, y_test_tensor)
    test_loader = DataLoader(test_data, batch_size=batch_size)

    return train_loader, test_loader


class LSTMModel(nn.Module):
    """
    Stack of single-layer LSTMs with a ReLU after each one, followed by a
    linear classification head applied to the last time step.

    ``forward`` returns unnormalised class scores (logits). That is the input
    ``nn.CrossEntropyLoss`` expects, because the loss applies log-softmax
    itself; applying softmax here as well flattens the gradients and keeps
    the loss from dropping. Use ``F.softmax(logits, dim=1)`` when class
    probabilities are needed; ``argmax`` predictions are the same either way.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Hidden size of every LSTM layer.
        output_dim: Number of classes.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # One nn.LSTM per layer (instead of nn.LSTM(num_layers=...)) so that
        # an activation can be applied between the layers.
        self.lstm_layers = nn.ModuleList(
            [
                nn.LSTM(
                    input_dim if i == 0 else hidden_dim, hidden_dim, batch_first=True
                )
                for i in range(num_layers)
            ]
        )

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """
        Compute class logits for a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_dim)``; the layers
                are created with ``batch_first=True``.

        Returns:
            Tensor of shape ``(batch, output_dim)`` holding raw logits.
        """
        # nn.LSTM starts from zero hidden and cell states when none are
        # passed, created on the input's device with the right dtype.
        for lstm in self.lstm_layers:
            x, _ = lstm(x)
            x = F.relu(x)

        # Classify from the output of the last time step; no softmax here,
        # see the class docstring.
        return self.fc(x[:, -1, :])


def compile_and_train(model: nn.Module, train_loader, num_epochs, learning_rate):
    """
    Train ``model`` with Adam and cross-entropy loss.

    ``nn.CrossEntropyLoss`` applies log-softmax internally, so the model
    should output unnormalised class scores.

    Args:
        model: Network to train; it is updated in place.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate for Adam.

    Returns:
        The same ``model`` instance after training.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), learning_rate)

    # Batches are moved to wherever the model's parameters live. Adam has
    # already rejected a model without parameters at this point.
    device = next(model.parameters()).device

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        samples_seen = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight each batch by its size so a short final batch does not
            # skew the epoch mean.
            batch_count = labels.size(0)
            running_loss += loss.item() * batch_count
            samples_seen += batch_count

        # Report the mean loss over the epoch rather than the last batch
        # only; an empty loader yields NaN instead of an unbound variable.
        epoch_loss = running_loss / samples_seen if samples_seen else float("nan")
        print(f"Epoch {epoch+1}, Loss: {epoch_loss}")

    return model


def evaluate(model, test_loader):
    """
    Count correct predictions of ``model`` over ``test_loader``.

    The model is switched to evaluation mode while scoring, so layers such as
    dropout behave deterministically, and its previous mode is restored
    afterwards.

    Args:
        model: Trained ``nn.Module`` returning one score per class.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(correct, total)``: number of correctly classified samples and
        number of samples seen. Both are 0 for an empty loader.
    """
    correct = 0
    total = 0

    # Move batches to the model's device when it has parameters to tell us
    # where that is.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in test_loader:
                if device is not None:
                    inputs = inputs.to(device)
                    labels = labels.to(device)
                # The predicted class is the index of the highest score
                predicted = model(inputs).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Leave the model in the mode the caller had it in
        model.train(was_training)

    return correct, total


def main(data_path="data.txt"):
    """
    Run the whole pipeline: load the data, train the LSTM, print test accuracy.

    Args:
        data_path: Path of the raw data file. Defaults to ``"data.txt"``.
    """
    batch_size = 1024
    window_size = 200
    test_split = 0.2
    seed = 42
    input_dim = 3
    hidden_dim = 64
    output_dim = 6
    num_layers = 2
    num_epochs = 60
    learning_rate = 0.0025

    # Seed torch as well, so weight initialisation and batch shuffling are
    # repeatable and not only the train/test split.
    torch.manual_seed(seed)

    data_frame = read_data(data_path)

    train_loader, test_loader = preprocess_data(
        data_frame, window_size, test_split, seed, batch_size
    )

    model = LSTMModel(input_dim, hidden_dim, output_dim, num_layers)

    model = compile_and_train(model, train_loader, num_epochs, learning_rate)

    correct, total = evaluate(model, test_loader)

    print(f"Test Accuracy: {100 * correct / total}%")


# Run the pipeline only when executed as a script or notebook cell, not when
# this file is imported as a module.
if __name__ == "__main__":
    main()


Epoch 1, Loss: 1.7992538213729858
Epoch 2, Loss: 1.7971982955932617
Epoch 3, Loss: 1.7951492071151733
Epoch 4, Loss: 1.7930686473846436
Epoch 5, Loss: 1.7908666133880615
Epoch 6, Loss: 1.7882570028305054
Epoch 7, Loss: 1.7848953008651733
Epoch 8, Loss: 1.780290961265564
Epoch 9, Loss: 1.7736531496047974
Epoch 10, Loss: 1.763754963874817
Epoch 11, Loss: 1.74937105178833
Epoch 12, Loss: 1.7314822673797607
Epoch 13, Loss: 1.7136703729629517
Epoch 14, Loss: 1.6978780031204224
Epoch 15, Loss: 1.6847068071365356
Epoch 16, Loss: 1.6741887331008911
Epoch 17, Loss: 1.6655701398849487
Epoch 18, Loss: 1.6595643758773804
Epoch 19, Loss: 1.6540772914886475
Epoch 20, Loss: 1.648882269859314
Epoch 21, Loss: 1.644514799118042
Epoch 22, Loss: 1.6408826112747192
Epoch 23, Loss: 1.6378990411758423
Epoch 24, Loss: 1.6355080604553223
Epoch 25, Loss: 1.6336473226547241
Epoch 26, Loss: 1.6322126388549805
Epoch 27, Loss: 1.631088137626648
Epoch 28, Loss: 1.630181074142456
Epoch 29, Loss: 1.6294251680374146
Ep