In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Read the raw CSV, keep the four columns this notebook works with, and drop
# every row that has a missing value in any of them.
df = (
    pd.read_csv("synthetic_malaria_mutation_dataset.csv")
    .loc[:, ['Geo_Location', 'Tissue_Specimen_Source', 'Collection_Year', 'mutation_label']]
    .dropna()
)

# The label column is both the model input and the prediction target.
target = "mutation_label"
features = ["mutation_label"]

# Ordered 80/20 split (no shuffling): leading rows train, trailing rows test.
split = int(0.8 * len(df))
df_train = df.iloc[:split].copy()
df_test = df.iloc[split:].copy()

# Min-max scale the feature column. The scaler is fitted on the training rows
# only and then reused on the test rows, so test statistics never leak in.
scaler = MinMaxScaler()
df_train[features] = scaler.fit_transform(df_train[features])
df_test[features] = scaler.transform(df_test[features])


In [3]:
def normalize(a, min_a=None, max_a=None):
    """Min-max scale an array along axis 0.

    Each bound is resolved independently: a bound that is ``None`` is computed
    from ``a``; a bound that is supplied is used as given. (Previously a
    supplied ``max_a`` was overwritten whenever ``min_a`` was ``None``, and
    passing only ``min_a`` left ``max_a`` as ``None`` and raised ``TypeError``.)

    Note: a later cell in this notebook defines another ``normalize`` with a
    DataFrame-based signature, which replaces this one once that cell has run.

    Args:
        a: Array-like of numeric values.
        min_a: Lower bound(s) to scale with, or ``None`` to use ``np.min(a, axis=0)``.
        max_a: Upper bound(s) to scale with, or ``None`` to use ``np.max(a, axis=0)``.

    Returns:
        Tuple ``(scaled, min_a, max_a)`` where
        ``scaled = (a - min_a) / (max_a - min_a + 0.0001)``.
    """
    # Local import: the notebook only imports numpy in a much later cell.
    import numpy as np

    if min_a is None:
        min_a = np.min(a, axis=0)
    if max_a is None:
        max_a = np.max(a, axis=0)
    # The small constant keeps the denominator non-zero when max_a == min_a.
    return (a - min_a) / (max_a - min_a + 0.0001), min_a, max_a

In [4]:
# Min-max normalization helper that works on selected DataFrame columns
def normalize(df, features, min_val=None, max_val=None):
    """Return a copy of ``df`` with the ``features`` columns min-max scaled.

    Args:
        df: Input DataFrame; it is not modified.
        features: Column label(s) to scale.
        min_val: Lower bound(s) to scale with; taken from ``df[features].min()``
            when ``None``.
        max_val: Upper bound(s) to scale with; taken from ``df[features].max()``
            when ``None``.

    Returns:
        Tuple ``(scaled_df, min_val, max_val)``. Passing the returned bounds
        back in applies the same scaling to another frame.
    """
    selected = df[features]
    min_val = selected.min() if min_val is None else min_val
    max_val = selected.max() if max_val is None else max_val

    scaled = df.copy()
    # The small constant keeps the denominator non-zero for constant columns.
    scaled[features] = (selected - min_val) / (max_val - min_val + 0.0001)
    return scaled, min_val, max_val

# Rescale both splits using bounds taken from the training split only
# (`features` must already be defined).
# NOTE(review): the same columns were already scaled with MinMaxScaler in the
# data-loading cell, and this cell overwrites df_train/df_test, so running it
# again rescales data that has already been rescaled.
df_train, min_train, max_train = normalize(df_train, features)
df_test, _, _ = normalize(df_test, features, min_train, max_train)




In [5]:
from QLSTM import SequenceDataset
from torch.utils.data import DataLoader

import torch  # needed here so the seed is set before the first shuffled batch is drawn

# Seed torch's global RNG so the shuffled batch order, and anything drawn from
# the same RNG in later cells, is repeatable on Restart & Run All.
SEED = 42
torch.manual_seed(SEED)

sequence_length = 3
batch_size = 1

# Windowed datasets built from the scaled train/test frames.
train_dataset = SequenceDataset(df_train, target=target, features=features, sequence_length=sequence_length)
test_dataset = SequenceDataset(df_test, target=target, features=features, sequence_length=sequence_length)

# Training batches are shuffled; evaluation batches keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Sanity check: pull one batch and show its tensor shapes.
X, y = next(iter(train_loader))
print("✅ Features shape:", X.shape)
print("🎯 Target shape:", y.shape)


✅ Features shape: torch.Size([1, 3, 1])
🎯 Target shape: torch.Size([1])


In [6]:
def train_model(data_loader, model, loss_function, optimizer):
    """Run one optimisation pass over ``data_loader`` and report the mean batch loss.

    Args:
        data_loader: Sized iterable yielding ``(X, y)`` batches.
        model: Module being trained; it is switched to training mode.
        loss_function: Callable ``loss_function(output, y)`` returning a scalar loss tensor.
        optimizer: Optimizer that updates the parameters of ``model``.

    Returns:
        Sum of the per-batch loss values divided by ``len(data_loader)``.
        The same value is printed as ``Train loss: ...``.
    """
    num_batches = len(data_loader)
    model.train()

    running_loss = 0
    for inputs, targets in data_loader:
        batch_loss = loss_function(model(inputs), targets)

        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    avg_loss = running_loss / num_batches
    print(f"Train loss: {avg_loss}")
    return avg_loss


def test_model(data_loader, model, loss_function): 
    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    return avg_loss

def predict(data_loader, model):
    """Run ``model`` over ``data_loader`` and return all outputs as one tensor.

    Works like ``test_model`` but collects the model outputs instead of a loss.
    Per-batch outputs are gathered in a list and concatenated once at the end,
    rather than re-concatenating the growing result after every batch.

    Args:
        data_loader: Iterable yielding ``(X, y)`` batches; ``y`` is ignored.
        model: Module to run; it is switched to evaluation mode.

    Returns:
        The per-batch outputs concatenated along dimension 0, in the dtype the
        model produced. An empty 1-D tensor is returned when ``data_loader``
        yields no batches.
    """
    model.eval()
    batch_outputs = []
    with torch.no_grad():
        for X, _ in data_loader:
            batch_outputs.append(model(X))

    if not batch_outputs:
        # Same result as before for an empty loader.
        return torch.tensor([])
    return torch.cat(batch_outputs, 0)

In [7]:
import torch
import torch.nn as nn
from QLSTM import ShallowRegressionLSTM

# Hyperparameters for the LSTM run.
learning_rate, num_hidden_units = 0.01, 7

# One input channel per feature column and a single recurrent layer.
model = ShallowRegressionLSTM(num_sensors=len(features), hidden_units=num_hidden_units, num_layers=1)

# Mean-squared-error objective, optimised with Adam.
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
# Total number of scalar weights that receive gradients during training.
num_params = sum(
    weight.numel()
    for weight in model.parameters()
    if weight.requires_grad
)
print(f"Number of parameters: {num_params}")

Number of parameters: 288


: 

In [None]:
# Per-epoch loss history for this run.
classical_loss_train, classical_loss_test = [], []

# The header is still printed, but the pre-training evaluation is disabled.
print("Untrained test\n--------")
# test_loss = test_model(test_loader, model, loss_function)
print()

num_epochs = 50

# Each epoch: one training pass, then one evaluation pass on the test loader.
for ix_epoch in range(num_epochs):
    print(f"Epoch {ix_epoch}\n---------")
    train_loss = train_model(train_loader, model, loss_function, optimizer=optimizer)
    classical_loss_train.append(train_loss)
    test_loss = test_model(test_loader, model, loss_function)
    classical_loss_test.append(test_loss)
   


Untrained test
--------

Epoch 0
---------


In [None]:
ystar_col_Q = "Model Forecast"

# Unshuffled loaders for inference; presumably this keeps predictions in the
# same order as the rows of the frames they are written back to - TODO confirm
# against SequenceDataset.
train_eval_loader, test_eval_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=False)
    for dataset in (train_dataset, test_dataset)
)

# Store the model outputs next to the (scaled) labels.
df_train[ystar_col_Q] = predict(train_eval_loader, model).numpy()
df_test[ystar_col_Q] = predict(test_eval_loader, model).numpy()

In [None]:
import matplotlib.pyplot as plt

# Overlay the label column and the model forecast for the test split.
fig, ax = plt.subplots(figsize=(12, 7))
sample_idx = range(len(df_test))
ax.plot(sample_idx, df_test['mutation_label'], label="Real Data")
ax.plot(sample_idx, df_test["Model Forecast"], label="LSTM Test Prediction")
ax.set_xlabel('Samples')
ax.set_ylabel('Mutation Label')
ax.legend()
plt.show()


In [None]:
# Persist the per-epoch training losses as a one-column CSV.
pd.DataFrame(classical_loss_train).to_csv('LSTM_loss.csv', index=False)

# Only the training curve is drawn; classical_loss_test is not plotted.
# plt.plot(classical_loss_test, label='classical_loss_test')
plt.plot(classical_loss_train, label='classical_loss_train')
plt.title('Train loss')
plt.legend()

In [None]:
import math
from sklearn.metrics import mean_squared_error

# Root-mean-squared error between the label column and the model forecast,
# computed the same way for the train and the test frame.
train_rmse, test_rmse = (
    math.sqrt(mean_squared_error(frame["mutation_label"], frame["Model Forecast"]))
    for frame in (df_train, df_test)
)

print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")


In [None]:
import numpy as np  # ✅ Make sure this is included

# Tolerance-based accuracy for a regression forecast (default tolerance = 0.1)
def accuracy(y, y_star, tol=0.1):
    """Return the fraction of predictions within ``tol`` of the true value.

    Args:
        y: True values (array-like supporting element-wise subtraction).
        y_star: Predicted values, aligned element-wise with ``y``.
        tol: Largest absolute error that does NOT count as a hit; an error must
            be strictly smaller than ``tol``. Defaults to 0.1, the value that
            was previously hard-coded.

    Returns:
        Mean of the boolean mask ``|y - y_star| < tol``.
    """
    return np.mean(np.abs(y - y_star) < tol)

# Share of samples whose forecast lands within the tolerance of the label,
# computed the same way for the train and the test frame.
train_accuracy, test_accuracy = (
    accuracy(frame["mutation_label"], frame["Model Forecast"])
    for frame in (df_train, df_test)
)

print(f"Train accuracy: {train_accuracy:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")


In [None]:
import json
import numpy as np

# Export the model's test-split forecasts (the "Model Forecast" column written
# by the prediction cell) instead of hard-coded placeholder numbers.
# NOTE(review): these values are on the scaled label range, not the raw one,
# because the model was trained on the normalised column.
predicted_values = df_test["Model Forecast"].to_numpy()

# Save the predicted values to a JSON file as a flat list of floats.
with open('prediction.json', 'w') as f:
    json.dump(predicted_values.tolist(), f)
