In [2]:
%load_ext kedro.ipython

In [None]:
%reload_ext kedro.ipython

In [3]:
data_set = catalog.load("s3_conc_aligned_df")

In [15]:
data_set.head()

Unnamed: 0,exp_no,timestamp_bin,A1_Resistance,A1_Resistance_diff,A1_Resistance_norm,A1_Sensor,A1_Sensor_diff,A1_Sensor_norm,SHT40_Humidity,SHT40_temp,index,resistance_ratio,ace_conc,expo_time
0,0,0.0,720650.75,0.0,1.0,4518.0,0.0,1.0,42.835,29.43,4481.5,1.768473,3.033e-07,3.0
1,0,1.0,720361.875,0.0,0.999599,4519.5,0.0,1.000332,42.84,29.435,4483.5,1.768473,3.033e-07,3.0
2,0,2.0,719688.47,-673.405,0.998665,4523.0,3.5,1.001107,42.845,29.445,4485.5,1.768473,3.033e-07,3.0
3,0,3.0,719015.94,-1634.81,0.997731,4526.5,8.5,1.001881,42.83,29.435,4487.5,1.768473,3.033e-07,3.0
4,0,4.0,718727.905,-1345.095,0.997332,4528.0,7.0,1.002213,42.845,29.44,4489.5,1.768473,3.033e-07,3.0


In [35]:
df = data_set[["exp_no","timestamp_bin","A1_Resistance","resistance_ratio"]]
df[df["exp_no"]==1].tail()

Unnamed: 0,exp_no,timestamp_bin,A1_Resistance,resistance_ratio
3642,1,1825.0,725202.97,1.816453
3643,1,1826.0,725690.25,1.816453
3644,1,1827.0,726275.345,1.816453
3645,1,1828.0,726275.345,1.816453
3646,1,1829.0,726470.785,1.816453


In [22]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler

In [36]:
features = torch.tensor(df[['timestamp_bin', 'A1_Resistance']].values).float()
targets = torch.tensor(df['resistance_ratio'].values).float().unsqueeze(1)  # Ensure target is 2D
groups = torch.tensor(df['exp_no'].values)

# Group-wise shuffle and split
gss = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=42)
train_idx, val_idx = next(gss.split(features.numpy(), groups=groups.numpy()))

# Normalize features
scaler = StandardScaler()
train_features = features[train_idx]
scaler.fit(train_features.numpy())  # Fit only on training data

scaled_train_features = torch.tensor(scaler.transform(train_features.numpy()))
scaled_val_features = torch.tensor(scaler.transform(features[val_idx].numpy()))

# Custom dataset
class ResistanceDataset(Dataset):
    def __init__(self, features, targets):
        # Ensure features are in the shape [batch_size, num_channels, sequence_length]
        self.features = features.unsqueeze(2)  # Add a new dimension for sequence_length
        self.targets = targets

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.targets[index]

# Update datasets with scaled features
train_dataset = ResistanceDataset(scaled_train_features, targets[train_idx])
val_dataset = ResistanceDataset(scaled_val_features, targets[val_idx])

train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=10, shuffle=False)

In [50]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=16, kernel_size=3, padding=1)
        # self.pool = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(32, 16)  # Adjust the input features depending on the output size after conv and pool layers
        self.fc2 = nn.Linear(16, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = torch.flatten(x, 1)  # Flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x




In [None]:
from torch.optim import Adam
from math import sqrt


device = torch.device("mps" if torch.cuda.is_available() else "cpu")

model = CNNModel().to(device)
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=0.001)

def calculate_rmse(outputs, labels):
    mse = torch.mean((outputs - labels) ** 2)  # Calculate MSE as a tensor
    return torch.sqrt(mse)  # Return RMSE as a tensor



def train_model(model, train_loader, val_loader, n_epochs):
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_loader.dataset)
        
        # Validation
        model.eval()

        # During validation
        val_rmse = 0.0
        total_samples = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                batch_rmse = calculate_rmse(outputs, labels) * inputs.size(0)
                val_rmse += batch_rmse.item()  # Properly use .item() to convert tensor to float
                total_samples += inputs.size(0)

        val_rmse /= total_samples
        print(f'Epoch {epoch+1} Train Loss: {epoch_loss:.4f} Val RMSE: {val_rmse:.4f}')

# Training the model
train_model(model, train_loader, val_loader, n_epochs=15)


Epoch 1 Train Loss: 0.0033 Val RMSE: 0.0267
Epoch 2 Train Loss: 0.0020 Val RMSE: 0.0279
Epoch 3 Train Loss: 0.0019 Val RMSE: 0.0270
Epoch 4 Train Loss: 0.0018 Val RMSE: 0.0261
Epoch 5 Train Loss: 0.0018 Val RMSE: 0.0244
Epoch 6 Train Loss: 0.0017 Val RMSE: 0.0236
Epoch 7 Train Loss: 0.0017 Val RMSE: 0.0246
Epoch 8 Train Loss: 0.0017 Val RMSE: 0.0236
Epoch 9 Train Loss: 0.0017 Val RMSE: 0.0251
Epoch 10 Train Loss: 0.0017 Val RMSE: 0.0249


24/05/09 19:38:12 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2078814 ms exceeds timeout 120000 ms
24/05/09 19:38:12 WARN SparkContext: Killing executors is not supported by current scheduler.
24/05/09 19:38:13 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

Epoch 11 Train Loss: 0.0017 Val RMSE: 0.0242


24/05/09 19:42:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

Epoch 12 Train Loss: 0.0017 Val RMSE: 0.0307


24/05/09 20:17:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

Epoch 13 Train Loss: 0.0016 Val RMSE: 0.0257


24/05/09 20:33:25 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

Epoch 14 Train Loss: 0.0016 Val RMSE: 0.0240


24/05/09 20:43:52 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

Epoch 15 Train Loss: 0.0016 Val RMSE: 0.0258


24/05/09 21:14:52 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

In [29]:
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)

Epoch 1: Train Loss: 0.0032, Val RMSE: 0.0400
Epoch 2: Train Loss: 0.0019, Val RMSE: 0.0365
Epoch 3: Train Loss: 0.0018, Val RMSE: 0.0406
Epoch 4: Train Loss: 0.0018, Val RMSE: 0.0381
Epoch 5: Train Loss: 0.0018, Val RMSE: 0.0394
Epoch 6: Train Loss: 0.0017, Val RMSE: 0.0438
Epoch 7: Train Loss: 0.0017, Val RMSE: 0.0400
Epoch 8: Train Loss: 0.0017, Val RMSE: 0.0373
Epoch 9: Train Loss: 0.0017, Val RMSE: 0.0445
Epoch 10: Train Loss: 0.0017, Val RMSE: 0.0439
