# Timeseries project

# Imports

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.decomposition import PCA
import pandas as pd
from sklearn.model_selection import train_test_split

# Real data

In [2]:
# Set seed
random_SEED = 42

Split into train and test set

In [3]:
train_fraction = 0.7
GRF_F_V_PRO_right = pd.read_csv(r'GRF_F_V_PRO_right.csv')
patient_IDs = np.unique(GRF_F_V_PRO_right['SUBJECT_ID'])

# Number of samples in the training data set
train_size = int(len(patient_IDs)*train_fraction)

patient_IDs_train, patient_IDs_test = train_test_split(patient_IDs, train_size = train_size, random_state=random_SEED)

# Create test dataset
train_set = GRF_F_V_PRO_right.loc[GRF_F_V_PRO_right['SUBJECT_ID'].isin(patient_IDs_train)]
test_set = GRF_F_V_PRO_right.loc[GRF_F_V_PRO_right['SUBJECT_ID'].isin(patient_IDs_test)]

train_set.index = pd.RangeIndex(len(train_set.index))
test_set.index = pd.RangeIndex(len(test_set.index))

Transform data (optional)

In [4]:
# Ensure train_set is a standalone DataFrame
train_set = train_set.copy()

# Subtract mean for numeric columns from column 3 onwards
numeric_cols = train_set.iloc[:, 3:].select_dtypes(include='number')
numeric_cols = numeric_cols - numeric_cols.mean()
train_set.loc[:, train_set.columns[3:]] = numeric_cols

# Ensure train_set is a standalone DataFrame
test_set = test_set.copy()

# Subtract mean for numeric columns from column 3 onwards
numeric_cols = test_set.iloc[:, 3:].select_dtypes(include='number')
numeric_cols = numeric_cols - numeric_cols.mean()
test_set.loc[:, test_set.columns[3:]] = numeric_cols

In [5]:
sequence_length = test_set.iloc[:,3:].shape[1]

Split the train sets into several trainsets for each autoencoder

In [6]:
num_splits = 10

# Set a random seed for reproducibility
np.random.seed(random_SEED)

train_datasets = []

# Create a copy of the IDs to sample from without replacement
remaining_IDs = patient_IDs_train.copy()

for i in range(num_splits):
    if i == num_splits - 1:  # For the last split, take all remaining IDs
        split = remaining_IDs
    else:
        split = np.random.choice(remaining_IDs, size=len(patient_IDs_train) // num_splits, replace=False)
        # Remove sampled IDs from remaining IDs
        remaining_IDs = np.setdiff1d(remaining_IDs, split)
    
    i_train_set = train_set.loc[train_set['SUBJECT_ID'].isin(split)]
    i_train_set.index = pd.RangeIndex(len(i_train_set.index))
    train_datasets.append(i_train_set)

# Define autoencoder

Define the autoencoder

In [7]:
# Define the Autoencoder Model
class TimeSeriesAutoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim):
        super(TimeSeriesAutoencoder, self).__init__()
        # Encoder layers
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Linear(32, encoding_dim)
        )
        
        # Decoder layers
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, input_dim)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [8]:
# Extract Encoder
class Encoder(nn.Module):
    def __init__(self, original_autoencoder):
        super(Encoder, self).__init__()
        # Copy encoder layers from the trained autoencoder
        self.encoder = original_autoencoder.encoder

    def forward(self, x):
        return self.encoder(x)

In [9]:
def train_data_loader(split, train_datasets):
    
    train_data = train_datasets[split]
    train_data = np.array(train_data,dtype=np.float32)[:,3:]

    train_dataset = TensorDataset(torch.tensor(train_data))
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    
    return(train_dataloader)

In [10]:
# Train the Autoencoder
def training_loop(num_epochs, dataloader, model, optimizer, criterion):
    # Training loop
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:
            inputs = batch[0]

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, inputs)  # MSE between output and input

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Print average loss
        if epoch % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(dataloader):.4f}")
    
    return model

# Train the autoencoder

Define model parameters

In [11]:
# Model parameters
input_dim = sequence_length
encoding_dim = 10  # Dimension of the encoded representation

# Loss function and optimizer
criterion = nn.MSELoss()

# Number of training epochs
num_epochs = 20

Train the ensamble of autoencoders

In [12]:
models = []

for i in range(num_splits):
    model = TimeSeriesAutoencoder(input_dim=input_dim, encoding_dim=encoding_dim)
    
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    dataloader = train_data_loader(i, train_datasets)
    
    model = training_loop(num_epochs, dataloader, model, optimizer, criterion)
    
    models.append(model)

Epoch [1/20], Loss: 0.0063
Epoch [1/20], Loss: 0.0069
Epoch [1/20], Loss: 0.0066
Epoch [1/20], Loss: 0.0067
Epoch [1/20], Loss: 0.0070
Epoch [1/20], Loss: 0.0078
Epoch [1/20], Loss: 0.0073
Epoch [1/20], Loss: 0.0075
Epoch [1/20], Loss: 0.0083
Epoch [1/20], Loss: 0.0068


Train one model on all data

In [13]:
model = TimeSeriesAutoencoder(input_dim=input_dim, encoding_dim=encoding_dim)
optimizer = optim.Adam(model.parameters(), lr=0.001)

train_dataset = TensorDataset(torch.tensor(np.array(train_set,dtype=np.float32)[:,3:]))
dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

model = training_loop(num_epochs, dataloader, model, optimizer, criterion)

Epoch [1/20], Loss: 0.0011


# Evaluation

In [14]:
# Evaluate the Autoencoder on test data
# Testing the model on a few samples to see reconstruction
test_data = np.array(test_set, dtype=np.float32)[:,3:]
test_data_torch = torch.tensor(test_data)
model.eval()  # Set model to evaluation mode
with torch.no_grad():
    reconstructed = model(test_data_torch)
    mse = criterion(model(test_data_torch), test_data_torch)
    print(mse)

tensor(8.6096e-05)


In [15]:
reconstructed_per_model = []
with torch.no_grad():
    for i in range(1):
        i_model = models[i]
        i_model.eval()  # Set model to evaluation mode

        reconstructed_per_model.append(i_model(test_data_torch).numpy())

    reconstructed_mean = np.array(reconstructed_per_model).mean(axis=0)
    mse_mean = criterion(torch.tensor(reconstructed_mean), test_data_torch)
    print(mse_mean)

tensor(0.0001)


In [None]:
colors = ['blue', 'orange', 'green']

fig, axes = plt.subplots(2, 1, figsize=(8, 8))  # Create 2 subplots vertically stacked

# First plot: Test Data vs Reconstructed
for i, j in enumerate([100, 200, 300]):  # Plot three trajectories
    axes[0].plot(test_data_torch[j], label=f'Test Data {i+1}', linestyle='-', color=colors[i])
    axes[0].plot(reconstructed[j], label=f'Reconstructed {i+1}', linestyle='--', color=colors[i])

axes[0].set_title("Comparison of Test and Reconstructed Data Trajectories")
axes[0].set_xlabel("Time Step")
axes[0].set_ylabel("Value")
axes[0].legend(loc='lower center', ncol=2)
axes[0].grid(True, linestyle='--', alpha=0.5)

# Second plot: Test Data vs Reconstructed Mean
for i, j in enumerate([100, 200, 300]):  # Plot three trajectories
    axes[1].plot(test_data_torch[j], label=f'Test Data {i+1}', linestyle='-', color=colors[i])
    axes[1].plot(reconstructed_mean[j], label=f'Reconstructed Mean {i+1}', linestyle='--', color=colors[i])

axes[1].set_title("Comparison of Test and Reconstructed Mean Data Trajectories")
axes[1].set_xlabel("Time Step")
axes[1].set_ylabel("Value")
axes[1].legend(loc='lower center', ncol=2)
axes[1].grid(True, linestyle='--', alpha=0.5)

# Adjust layout and show plot
plt.tight_layout()  # Ensure the subplots do not overlap
plt.show()
