In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import time
import pandas as pd
import math
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Flatten
import torch.optim as optim

In [None]:
# Reference architecture, taken from the paper
# "IMU-Based Energy Expenditure Estimation for Various Walking Conditions
# Using a Hybrid CNN–LSTM Model":
#   1. conv1d(64, 3)
#   2. maxpooling1d(64, 3)
#   3. 2 LSTM layers with 64 hidden units
#   4. flatten layer
#   5. fully connected layer
#   6. output

In [None]:
# Mount Google Drive (Colab only) so files under /content/drive/MyDrive,
# where the dataset paths used below point, become readable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
class DataProcessor:
    """Load fixed-size IMU windows and their EMG labels for every recorded trial.

    Trials are addressed by three 1-based indices (person, weight, attempt).
    The constructor arguments are the inclusive upper bounds of those indices,
    so ``DataProcessor(10, 5, 6)`` reads folders ``P1..P10 / W1..W5 / A1..A6``.

    Args:
        person: Number of persons to load.
        weight: Number of weight conditions to load per person.
        attempt: Number of attempts to load per weight condition.
        chunk_size: Number of consecutive CSV rows per IMU window.
        min_remainder: If more than this many rows are left over after cutting
            full windows, one extra window made of the last ``chunk_size`` rows
            is added.
    """

    # Root folder of the dataset on the mounted Google Drive.
    DATASET_ROOT = '/content/drive/MyDrive/Colab_Notebooks/Master_Thesis/Neural_Network/Dataset'

    def __init__(self, person, weight, attempt, chunk_size=100, min_remainder=20):
        self.person = person
        self.weight = weight
        self.attempt = attempt
        self.chunk_size = chunk_size
        self.min_remainder = min_remainder

    def load_imu_data(self, person, weight, attempt):
        """Read one trial's IMU CSV and cut it into equally sized windows.

        Args:
            person: 1-based person index (folder ``P{person}``).
            weight: 1-based weight index (folder ``W{weight}``).
            attempt: 1-based attempt index (folder ``A{attempt}``).

        Returns:
            A list of tensors, each with exactly ``chunk_size`` rows and one
            column per CSV column. The list is empty when the file holds fewer
            than ``chunk_size`` rows.
        """
        imu_path = f'{self.DATASET_ROOT}/P{person}/W{weight}/A{attempt}/imu/data_neural.csv'
        imu_data = pd.read_csv(imu_path)  # Load CSV data using pandas

        size = self.chunk_size
        num_timesteps = imu_data.shape[0]
        num_chunks, remainder = divmod(num_timesteps, size)

        # Non-overlapping full windows.
        imu_chunks = [
            torch.tensor(imu_data.iloc[start:start + size].values)
            for start in range(0, num_chunks * size, size)
        ]

        # A sufficiently long tail is covered by one extra window made of the
        # LAST `size` rows (it overlaps the previous window). This is only
        # possible when the file holds at least one full window; otherwise the
        # slice would be shorter than `size` and produce a ragged sample that
        # cannot be batched with the others.
        if remainder > self.min_remainder and num_timesteps >= size:
            imu_chunks.append(torch.tensor(imu_data.iloc[-size:].values))

        return imu_chunks

    def load_emg_data(self, person, weight, attempt):
        """Read one trial's label file and return its second column as a tensor.

        Args:
            person: 1-based person index (folder ``P{person}``).
            weight: 1-based weight index (folder ``W{weight}``).
            attempt: 1-based attempt index (folder ``A{attempt}``).

        Returns:
            A 1-D tensor holding column index 1 of ``emg_label.csv``.
        """
        emg_path = f'{self.DATASET_ROOT}/P{person}/W{weight}/A{attempt}/emg/emg_label.csv'
        emg_data = pd.read_csv(emg_path)  # Load CSV data using pandas
        # NOTE(review): the whole column is used as the label of every window of
        # this trial - presumably it holds a single class value; confirm.
        emg_tensor = torch.tensor(emg_data.iloc[:, 1])  # Convert column to tensor
        return emg_tensor

    def process_data(self):
        """Load every trial and pair each IMU window with the trial's label.

        Prints one progress line per person.

        Returns:
            A list of ``(imu_chunk, emg_data)`` tuples covering all persons,
            weights and attempts.
        """
        all_data = []

        for p in range(1, self.person + 1):
            for w in range(1, self.weight + 1):
                for a in range(1, self.attempt + 1):
                    emg_data = self.load_emg_data(p, w, a)
                    # Every window of the trial shares the same label tensor.
                    all_data.extend(
                        (imu_chunk, emg_data)
                        for imu_chunk in self.load_imu_data(p, w, a)
                    )

            print(f'Person {p}/{self.person} done')

        return all_data

# Example usage: load every trial and inspect what came back.
person, weight, attempt = 10, 5, 6

processor = DataProcessor(person, weight, attempt)
all_data = processor.process_data()
print(len(all_data))

# Shape and label of each sample, one line per sample.
for i, (data, label) in enumerate(all_data):
    print(data.shape, label)

# `data` / `label` still hold the last sample at this point.
print(data)
print(label)
print(all_data[-1])

In [None]:
class CNN_LSTM(nn.Module):
    """Dense -> Conv1d -> MaxPool1d -> LSTM -> Linear classifier for IMU windows.

    The forward pass flattens each ``(time_steps, num_data)`` window, compresses
    it to ``num_data`` values with a linear layer, expands those to
    ``out_channels`` feature maps with a 1-D convolution, runs the result
    through the LSTM as a length-1 sequence and maps the LSTM output to
    ``num_classes`` scores.

    Args:
        num_data: Number of features per time step.
        out_channels: Number of convolution filters (also the LSTM input size).
        kernel_conv: Convolution kernel size. The convolution is applied to a
            sequence of length 1, so values above 1 will fail.
        kernel_pool: Max-pooling kernel size (same length-1 restriction).
        hidden_lstm: Hidden size of the LSTM.
        layers_lstm: Number of stacked LSTM layers.
        num_classes: Number of output classes.
        time_steps: Number of time steps per input window.
    """

    def __init__(self, num_data, out_channels=64, kernel_conv=1, kernel_pool=1, hidden_lstm=64, layers_lstm=1, num_classes=5, time_steps=100):
        super(CNN_LSTM, self).__init__()
        self.num_data = num_data

        # Linear layer: compresses the flattened window to num_data values (float32)
        self.linear1 = nn.Linear(time_steps * num_data, num_data)

        # 1D Convolutional Layer (float64, like every layer after it)
        self.conv1 = nn.Conv1d(in_channels=num_data, out_channels=out_channels, kernel_size=kernel_conv).double()

        # Max Pooling Layer (has no parameters, so no dtype conversion is needed)
        self.pool1 = nn.MaxPool1d(kernel_size=kernel_pool)

        # LSTM layers
        self.lstm = nn.LSTM(out_channels, hidden_lstm, layers_lstm, batch_first=True).double()

        # Fully connected classification head; its input is the LSTM output,
        # so it must be sized by hidden_lstm, not out_channels.
        self.fc1 = nn.Linear(hidden_lstm, num_classes).double()

        self.init_weights()

    def init_weights(self):
        """Initialise conv/linear weights from N(0, 0.1) and their biases to zero."""
        for m in self.modules():
            if isinstance(m, (nn.Conv1d, nn.Linear)):
                nn.init.normal_(m.weight, 0, 1e-1)
                nn.init.zeros_(m.bias)

    def forward(self, x):
        """Compute class scores for a batch of windows.

        Args:
            x: Tensor of shape ``(batch, time_steps, num_data)``. Any batch
                size is accepted; it is read from the tensor itself.

        Returns:
            float32 tensor of shape ``(batch, num_classes)`` holding raw,
            unnormalised scores (logits). ``nn.CrossEntropyLoss`` expects
            exactly this; apply ``F.softmax(out, dim=1)`` to obtain
            probabilities.
        """
        n_batch = x.size(0)
        x = x.reshape(n_batch, -1)             # (batch, time_steps * num_data)
        x = F.relu(self.linear1(x.float()))    # (batch, num_data)

        # Treat the num_data values as channels of a length-1 sequence.
        x = x.double().unsqueeze(2)            # (batch, num_data, 1)
        x = self.conv1(x)                      # (batch, out_channels, 1)
        x = F.relu(self.pool1(x))              # (batch, out_channels, 1)

        # Keep an explicit sequence axis: a 2-D input would be interpreted by
        # nn.LSTM as ONE unbatched sequence, letting samples of the same batch
        # influence each other through the recurrent state.
        x = x.permute(0, 2, 1)                 # (batch, 1, out_channels)
        x, _ = self.lstm(x)                    # (batch, 1, hidden_lstm)
        x = x[:, -1, :]                        # (batch, hidden_lstm), last step

        # No softmax here: the cross-entropy loss applies log-softmax itself.
        return self.fc1(x).float()             # (batch, num_classes)

In [None]:
import sys
def train(model, train_loader, time_steps, criterion, optimizer, epochs=5, batch_size=1, log_every=20):
    """Train ``model`` and plot the running mean of the training loss.

    Args:
        model: Module to optimise; called as ``model(inputs)``.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing
            ``.dataset``.
        time_steps: Unused; kept so existing calls keep working.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the model parameters.
        epochs: Number of passes over ``train_loader``.
        batch_size: Batch size the loader was built with; used to detect the
            trailing partial batch.
        log_every: Number of batches averaged into each logged/plotted point.

    Raises:
        ValueError: If ``log_every`` is smaller than 1.
    """
    if log_every < 1:
        raise ValueError('log_every must be >= 1')

    model.train()
    losses = []
    dataset_size = len(train_loader.dataset)
    total_batches = math.ceil(dataset_size / batch_size)
    has_partial_batch = dataset_size % batch_size != 0

    # Stay None when no batch is processed, so the final print can be skipped.
    outputs = labels = None

    for epoch in range(epochs):
        running_loss = 0.0
        print('-----------EPOCH %d-----------' % (epoch + 1))
        for i, data in enumerate(train_loader):
            # The trailing partial batch is skipped (behaviour kept from the
            # original loop, for models that assume a fixed batch size).
            if has_partial_batch and i == total_batches - 1:
                break
            inputs, labels = data
            optimizer.zero_grad()
            labels = labels.view(-1)
            inputs = inputs.float()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if (i + 1) % log_every == 0:
                # Mean over the batches accumulated since the last log line.
                mean_loss = running_loss / log_every
                print('[episode: %d] loss: %f' % (i + 1, mean_loss))
                losses.append(mean_loss)
                running_loss = 0.0

    # Show the last processed batch as a quick sanity check.
    if outputs is not None:
        print(outputs, labels)

    plt.plot(losses)
    plt.xlabel('Iterations')
    plt.ylabel('Loss')
    plt.title('Training Loss Curve')
    plt.show()

def test(model, test_loader, batch_size=1):
    model.eval()
    total_error = 0
    total_batch = 0
    # total_error2 = 0
    total_batches = math.ceil(len(test_loader.dataset) / batch_size)
    # print(total_batches)
    # print(len(test_loader.dataset) % batch_size)
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            # print('i', i)
            # print('tot', total_batches)
            # print('%', len(test_loader.dataset) % batch_size)
            if len(test_loader.dataset) % batch_size != 0 and i == total_batches - 1:
                print('here')
                break
            inputs, labels = data
            inputs = inputs.float()
            labels = labels.long()
            labels = labels.view(-1)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_error += loss.item()
            total_batch += 1
            # batch_error = torch.abs(outputs.flatten() - labels.flatten()) / labels.flatten()
            # batch_error2 = torch.abs(outputs.flatten() - labels.flatten())
            # total_error += batch_error.mean().item()
            # total_error2 += batch_error2.mean().item()
            if (i + 1) % 20 == 0:
                print(outputs, labels)
    # print('Average accuracy: %f %% ' % (100 - total_error / total_batches * 100))
    # print('Average error between prediction and label: %f' % (total_error2 / total_batches))
    average_loss = total_error / total_batch
    print('Average test loss: {:.4f}'.format(average_loss))

In [None]:
# Split the (window, label) pairs into two parallel lists.
data_list = [window for window, _ in all_data]
label_list = [target for _, target in all_data]

# Shuffle with a dedicated, seeded generator: the permutation (and therefore
# the train/test split built from it) is identical on every run, and the global
# torch RNG used elsewhere is left untouched.
SPLIT_SEED = 0
num_rows = len(data_list)
indices = torch.randperm(num_rows, generator=torch.Generator().manual_seed(SPLIT_SEED)).tolist()
shuffled_data = [data_list[i] for i in indices]
shuffled_labels = [label_list[i] for i in indices]

shuffled_dataset = (shuffled_data, shuffled_labels)

In [None]:
# Spot-check one shuffled sample: its IMU window, its label, and the window shape.
sample_window, sample_label = shuffled_data[47], shuffled_labels[47]
print(sample_window, sample_label)
print(np.shape(sample_window))

In [None]:
# @title Train/test split and data loaders
batch_size = 2

# The first half of the shuffled samples is used for training, the rest for testing.
n_train = len(all_data) // 2

train_set = list(zip(shuffled_data[:n_train], shuffled_labels[:n_train]))
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)

test_set = list(zip(shuffled_data[n_train:], shuffled_labels[n_train:]))
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=2)


In [None]:
# Every sample is a 2-D (time_steps, num_data) tensor, so the sizes can be read
# from a single sample instead of converting the whole list with np.shape.
time_steps, num_data = shuffled_data[0].shape

# Pass the measured window length on, so the model's first linear layer matches
# the data even if the window size is changed.
model = CNN_LSTM(num_data, layers_lstm=2, time_steps=time_steps)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

train(model, train_loader, time_steps, criterion, optimizer, epochs=3, batch_size=batch_size)

In [None]:
# Evaluate the trained model on the held-out samples; prints the average test loss.
test(model, test_loader, batch_size=batch_size)