In [None]:
!pip install iTransformer

In [None]:
import torch
from iTransformer import iTransformer


#Load dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import pandas as pd

# Define the file paths

file_path3 = '/content/drive/MyDrive/SuperUROP /Data Analysis/JPL_training_data.csv'
file_path4  = '/content/drive/MyDrive/SuperUROP /Data Analysis/JPL_testing_data.csv'
# Use pandas to read the CSV files and then convert them to NumPy arrays
JPL_train = pd.read_csv(file_path3).values
JPL_test=pd.read_csv(file_path4).values

In [None]:
#Remove row number (in 1st column)
JPL_train=JPL_train[:,1:]
JPL_test=JPL_test[:,1:]

In [None]:
import math

# Extracting the unique IDs from column 3
unique_ids = np.unique(JPL_train[:, 3])

# Finding the corresponding number of unique charging parameters for each ID
users_charg_sessions = {uid: JPL_train[JPL_train[:, 3] == uid, -1][0] for uid in unique_ids}

n_values=users_charg_sessions


In [None]:
users_from_training = set(JPL_train[:, 3])
mask = np.isin(JPL_test[:, 3], list(users_from_training))
filtered_JPL_test = JPL_test[mask]

users_from_testing = set(JPL_test[:, 3])
mask = np.isin(JPL_train[:, 3], list(users_from_testing))
filtered_JPL_train = JPL_train[mask]

In [None]:
# Converting to DataFrame
df_train = pd.DataFrame(filtered_JPL_train, columns=['arrival_time', 'departure_time', 'duration', 'user_id', 'energy', 'no_sessions'])
df_test = pd.DataFrame(filtered_JPL_test, columns=['arrival_time', 'departure_time', 'duration', 'user_id', 'energy', 'no_sessions'])

# Sorting by start_time within each user_id
sorted_JPL_train = df_train.sort_values(by=['user_id', 'arrival_time'])
sorted_JPL_test = df_test.sort_values(by=['user_id', 'arrival_time'])

sorted_JPL_train

In [None]:
user_counts_train = sorted_JPL_train['user_id'].value_counts()
min_sessions_train=min(user_counts_train)
min_sessions_train

In [None]:
# Grouping by 'user_id' and taking the last n rows for each user
filtered_sessions = sorted_JPL_train.groupby('user_id').apply(lambda x: x.tail(min_sessions_train))

last_n_rows_per_user = filtered_sessions.reset_index(drop=True)
last_n_rows_per_user=last_n_rows_per_user[['arrival_time', 'user_id']]

In [None]:
# Resetting the index
sorted_JPL_test = sorted_JPL_test.reset_index(drop=True)
sorted_JPL_test=sorted_JPL_test[['arrival_time', 'user_id']]

In [None]:
def convert_to_hours(time_str):
    time_parts = time_str.split()[1].split(':') # Splitting to get only the time part
    hours = int(time_parts[0]) + int(time_parts[1])/60 + int(time_parts[2])/3600
    return round(hours, 2) # Rounding to 2 decimal places


last_n_rows_per_user['arrival_time_hours'] = last_n_rows_per_user['arrival_time'].apply(convert_to_hours)
last_n_rows_per_user = last_n_rows_per_user.drop(columns=['arrival_time'])
last_n_rows_per_user

In [None]:
sorted_JPL_test['arrival_time_hours'] = sorted_JPL_test['arrival_time'].apply(convert_to_hours)
sorted_JPL_test = sorted_JPL_test.drop(columns=['arrival_time'])
sorted_JPL_test

In [None]:
arrival_time_per_user = last_n_rows_per_user.groupby('user_id')['arrival_time_hours'].apply(list)
arrival_time_per_user

In [None]:
arrival_time_per_user_test = sorted_JPL_test.groupby('user_id')['arrival_time_hours'].apply(list)
arrival_time_per_user_test

In [None]:
time_series = torch.tensor(arrival_time_per_user.tolist()).unsqueeze(0)

time_series.shape, time_series.dtype

In [None]:
transposed_time_series = time_series.transpose(1, 2)  # (batch, lookback len, variates)

transposed_time_series.shape, transposed_time_series.dtype

In [None]:
array_list_test = [np.array(lst) for lst in arrival_time_per_user_test]

# Find the maximum length among the arrays
max_length = max(len(arr) for arr in array_list_test)

# Pad each array to have the same length
padded_array_list_test = [np.pad(arr, (0,max_length - len(arr)), 'constant') for arr in array_list_test]


In [None]:
padded_array_list_test

#Train iTransformer model

#Evaluate model

##Modify test dataset in the correct format

In [None]:
# Combine the arrays into a single 2D array and convert to a tensor
combined_array = torch.tensor(padded_array_list_test).T  # Transpose to get the shape [20, 43]

# Reshape to [1, 20, 43]
test_tensor = combined_array.unsqueeze(0)

# Verify the shape
print(test_tensor.shape)  # Should output torch.Size([1, 20, 43])

In [None]:
# Assuming predictions and ground_truth are your tensors of shape [1, 19, 43]
# and both are already in the same dtype, preferably float32

# Step 1: Create a mask where the ground_truth is not zero
mask = test_tensor_adjusted_float != 0

# Step 2: Apply the mask to both tensors
masked_predictions = predictions[mask]
masked_ground_truth = test_tensor_adjusted_float[mask]

# Step 3: Compute the MSE
# Ensure that masked_predictions and masked_ground_truth are not empty
if masked_predictions.numel() > 0 and masked_ground_truth.numel() > 0:
    mse = torch.mean((masked_predictions - masked_ground_truth) ** 2)
else:
    mse = torch.tensor(float('nan'))  # Or handle this case as per your requirement

print(f"Mean Squared Error: {mse}")


In [None]:
import torch
import torch.optim as optim
from iTransformer import iTransformer
from torch.utils.data import DataLoader
import copy
import torch.nn as nn
import torch.nn.init as init
import random

# Function to calculate SMAPE
def calculate_smape(predictions, ground_truth):
    predictions = predictions.float()
    ground_truth = ground_truth.float()
    mask = ground_truth != 0
    masked_predictions = predictions[mask]
    masked_ground_truth = ground_truth[mask]
    numerator = torch.abs(masked_predictions - masked_ground_truth)
    denominator = torch.abs(masked_predictions) + torch.abs(masked_ground_truth)
    smape = torch.mean(numerator / denominator)
    return smape.item()

# Initialization methods
def xavier_init(m):
    if isinstance(m, nn.Linear):
        init.xavier_uniform_(m.weight)
        if m.bias is not None:
            init.zeros_(m.bias)

def kaiming_init(m):
    if isinstance(m, nn.Linear):
        init.kaiming_uniform_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            init.zeros_(m.bias)

def uniform_init(m):
    if isinstance(m, nn.Linear):
        init.uniform_(m.weight, -0.1, 0.1)
        if m.bias is not None:
            init.zeros_(m.bias)

initialization_methods = [xavier_init, kaiming_init, uniform_init]

# Constants
lookback_len = 19
num_variates = 43
pred_length = 19
num_iterations = 200

best_smape_error = float('inf')
best_model_state = None
best_init_method = None

for iteration in range(num_iterations):
    # Randomly select an initialization method
    init_method = random.choice(initialization_methods)

    # Model initialization
    model = iTransformer(
        num_variates=num_variates,
        lookback_len=lookback_len,
        dim=32,
        depth=4,
        heads=8,
        dim_head=32,
        pred_length=pred_length,
        num_tokens_per_variate=2,
    )

    # Apply selected initialization
    model.apply(init_method)

    # Loss function and optimizer
    loss_function = torch.nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=1000)

    # DataLoader for training data
    train_loader = DataLoader(transposed_time_series, batch_size=2, shuffle=False)

    # Training loop
    model.train()
    for epoch in range(1000):
        total_loss = 0
        for inputs in train_loader:
            x = inputs[:, :lookback_len, :]
            y = inputs[:, 1:lookback_len+1, :]
            optimizer.zero_grad()
            outputs = model(x)
            loss = loss_function(outputs[pred_length], y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
    # Evaluation
    test_tensor_adjusted = transposed_time_series[:, :19, :]
    test_tensor_adjusted_float = test_tensor_adjusted.float()
    model.eval()

    with torch.no_grad():
        model_output = model(test_tensor_adjusted_float)
        if pred_length in model_output:
            predictions = model_output[pred_length]
            smape_error = calculate_smape(predictions, test_tensor_adjusted_float)
            if smape_error < best_smape_error:
                best_smape_error = smape_error
                best_model_state = copy.deepcopy(model.state_dict())
                best_init_method = init_method.__name__

  # Print the best initialization method and SMAPE error
                print(f"Best Initialization Method: {best_init_method}")
                print(f"Best SMAPE Error(%): {best_smape_error*100}")

  # Optionally, save the best model state
  # if best_model_state is not None:
  #     torch.save(best_model_state, 'best_model.pth')
