### Load in Data

In [None]:
import os
import pandas as pd
import torch
from torch import nn
from hampel import hampel
from tqdm import tqdm
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
import numpy as np


# Root folders holding the pre-split charging / discharging DataFrames
C_ROOT_DIR = 'data/charging/'
D_ROOT_DIR = 'data/discharging/'

# NOTE: read_pickle unpickles the files; only point it at data you trust.

# Charging
C_train_df, C_val_df, C_test_df = (
    pd.read_pickle(os.path.join(C_ROOT_DIR, f'{split}_df.pkl'))
    for split in ('train', 'val', 'test')
)

# Discharging
D_train_df, D_val_df, D_test_df = (
    pd.read_pickle(os.path.join(D_ROOT_DIR, f'{split}_df.pkl'))
    for split in ('train', 'val', 'test')
)


# Input columns fed to the models and the column they are trained to predict
features = ['I', 'V', 'T', 'c', 'IR', 'dV/dt']
label = 'Q'

# Standardize the inputs (a fresh scaler is fitted per dataset); the label is
# kept unscaled as an (n, 1) NumPy array.
C_y_train = C_train_df[[label]].to_numpy()
C_X_train = StandardScaler().fit_transform(C_train_df[features])

D_y_train = D_train_df[[label]].to_numpy()
D_X_train = StandardScaler().fit_transform(D_train_df[features])

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

### Model Architecture

In [None]:
num_hidden_units = 50
dropout = 0.2547


class charging_model(nn.Module):
    """Feed-forward regressor for the charging data.

    Four sigmoid-activated hidden layers (dropout after the first three)
    followed by a linear layer with a single output.

    The layer sizes are bound as default arguments, so they are fixed when the
    class is defined. Later cells reassign the module-level
    ``num_hidden_units`` / ``dropout``; re-instantiating the class afterwards
    still yields the same architecture.

    Args:
        hidden_units: Width of every hidden layer.
        dropout_p: Dropout probability used after hidden layers 1-3.
        in_features: Number of input features per sample.
    """

    def __init__(self, hidden_units=num_hidden_units, dropout_p=dropout, in_features=6):
        super().__init__()

        # Attribute names are kept unchanged so existing pickled models and
        # state dicts stay loadable.
        self.layer1 = nn.Linear(in_features, hidden_units).float()
        self.act1 = nn.Sigmoid()
        self.dropout1 = nn.Dropout(dropout_p)

        self.layer2 = nn.Linear(hidden_units, hidden_units).float()
        self.act2 = nn.Sigmoid()
        self.dropout2 = nn.Dropout(dropout_p)

        self.layer3 = nn.Linear(hidden_units, hidden_units).float()
        self.act3 = nn.Sigmoid()
        self.dropout3 = nn.Dropout(dropout_p)

        # The last hidden layer has no dropout
        self.layer4 = nn.Linear(hidden_units, hidden_units).float()
        self.act4 = nn.Sigmoid()

        self.output = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return one prediction per input row of ``x``."""
        x = self.dropout1(self.act1(self.layer1(x)))
        x = self.dropout2(self.act2(self.layer2(x)))
        x = self.dropout3(self.act3(self.layer3(x)))
        x = self.act4(self.layer4(x))
        return self.output(x)

# Build the charging feed-forward network, then move it to the selected device
C_ff = charging_model()
C_ff = C_ff.to(device)


num_hidden_units = 100
dropout = 0.1545


class discharging_model(nn.Module):
    """Feed-forward regressor for the discharging data.

    Three sigmoid-activated hidden layers, each followed by dropout, and a
    linear layer with a single output.

    The layer sizes are bound as default arguments, so they are fixed when the
    class is defined. Later cells reassign the module-level
    ``num_hidden_units`` / ``dropout``; re-instantiating the class afterwards
    still yields the same architecture.

    Args:
        hidden_units: Width of every hidden layer.
        dropout_p: Dropout probability used after every hidden layer.
        in_features: Number of input features per sample.
    """

    def __init__(self, hidden_units=num_hidden_units, dropout_p=dropout, in_features=6):
        super().__init__()

        # Attribute names are kept unchanged so existing pickled models and
        # state dicts stay loadable.
        self.layer1 = nn.Linear(in_features, hidden_units).float()
        self.act1 = nn.Sigmoid()
        self.dropout1 = nn.Dropout(dropout_p)

        self.layer2 = nn.Linear(hidden_units, hidden_units).float()
        self.act2 = nn.Sigmoid()
        self.dropout2 = nn.Dropout(dropout_p)

        self.layer3 = nn.Linear(hidden_units, hidden_units).float()
        self.act3 = nn.Sigmoid()
        self.dropout3 = nn.Dropout(dropout_p)

        self.output = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return one prediction per input row of ``x``."""
        x = self.dropout1(self.act1(self.layer1(x)))
        x = self.dropout2(self.act2(self.layer2(x)))
        x = self.dropout3(self.act3(self.layer3(x)))
        return self.output(x)

# Build the discharging feed-forward network, then move it to the selected device
D_ff = discharging_model()
D_ff = D_ff.to(device)

In [None]:
# Look-back window lengths (in time steps) used by the charging and
# discharging LSTMs
C_window_size = 19
D_window_size = 2

num_hidden_units = 200


class discharging_LSTM(nn.Module):
    """LSTM regressor for the discharging data.

    A single batch-first LSTM layer, one sigmoid-activated linear layer and a
    linear output layer. A prediction is produced for every time step of the
    input window.

    The hidden size is bound as a default argument, so it is fixed when the
    class is defined and unaffected by later reassignments of the module-level
    ``num_hidden_units``.

    Args:
        hidden_units: LSTM hidden size and width of the dense layer.
        in_features: Number of input features per time step.
    """

    def __init__(self, hidden_units=num_hidden_units, in_features=6):
        super().__init__()

        # Attribute names are kept unchanged so existing pickled models and
        # state dicts stay loadable.
        self.lstm1 = nn.LSTM(in_features, hidden_units, batch_first=True).float()
        self.lstm1.flatten_parameters()

        self.layer2 = nn.Linear(hidden_units, hidden_units).float()
        self.act2 = nn.Sigmoid()

        self.output = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return per-time-step predictions; the LSTM state is discarded."""
        sequence_out, _ = self.lstm1(x)
        hidden = self.act2(self.layer2(sequence_out))
        return self.output(hidden)
        
# Build the discharging LSTM, then move it to the selected device
D_lstm = discharging_LSTM()
D_lstm = D_lstm.to(device)


num_hidden_units = 100


class charging_LSTM(nn.Module):
    """LSTM regressor for the charging data.

    A single batch-first LSTM layer, one ReLU-activated linear layer and a
    linear output layer. A prediction is produced for every time step of the
    input window.

    The hidden size is bound as a default argument, so it is fixed when the
    class is defined and unaffected by later reassignments of the module-level
    ``num_hidden_units``.

    Args:
        hidden_units: LSTM hidden size and width of the dense layer.
        in_features: Number of input features per time step.
    """

    def __init__(self, hidden_units=num_hidden_units, in_features=6):
        super().__init__()

        # Attribute names are kept unchanged so existing pickled models and
        # state dicts stay loadable.
        self.lstm1 = nn.LSTM(in_features, hidden_units, batch_first=True).float()
        self.lstm1.flatten_parameters()

        self.layer2 = nn.Linear(hidden_units, hidden_units).float()
        self.act2 = nn.ReLU()

        self.output = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return per-time-step predictions; the LSTM state is discarded."""
        sequence_out, _ = self.lstm1(x)
        hidden = self.act2(self.layer2(sequence_out))
        return self.output(hidden)


# Build the charging LSTM, then move it to the selected device
C_lstm = charging_LSTM()
C_lstm = C_lstm.to(device)

### Helper functions for training

In [None]:
from tqdm import tqdm
import torch.optim as optim

# change the data to sequence to sequence data for the lstm
def lstm_transform_data(data, target, lookback):
    """Cut ``data`` / ``target`` into overlapping windows of ``lookback`` steps.

    Window ``s`` holds ``data[s : s + lookback]``; its target is the same span
    shifted one step ahead, ``target[s + 1 : s + lookback + 1]``. There are
    ``len(data) - lookback`` windows.

    Args:
        data: Sliceable sequence of input rows.
        target: Sliceable sequence of labels; may be empty (e.g. ``[]``) when
            only the input windows are needed.
        lookback: Number of time steps per window.

    Returns:
        Tuple ``(X, Y)`` of NumPy arrays stacking the input and target windows.
    """
    n_windows = len(data) - lookback

    windows = [np.array(data[s:s + lookback]) for s in range(n_windows)]
    shifted_targets = [np.array(target[s + 1:s + lookback + 1]) for s in range(n_windows)]

    return np.array(windows), np.array(shifted_targets)



# helper function for training
def train(model, X_train, y_train, lr=0.01, n_epochs=1, batch_size=128, lookback=None, silent= False):
    """Train ``model`` in place with Adam and an MSE loss, then return it.

    Args:
        model: ``nn.Module`` to optimise; it is modified in place.
        X_train: NumPy array of inputs, one row per sample.
        y_train: NumPy array of labels aligned with ``X_train``.
        lr: Adam learning rate.
        n_epochs: Number of passes over the data.
        batch_size: Number of samples (or windows) per optimisation step.
        lookback: Window length for sequence models. ``None``, ``0`` or a
            negative value (the file uses ``-1`` as "no lookback" elsewhere)
            trains on the rows as they are.
        silent: When true, no progress bar is shown.

    Returns:
        The same ``model`` object, switched to evaluation mode.
    """
    # LSTM transform: window s covers rows [s, s + lookback) and is paired
    # with the labels shifted one step ahead.
    if lookback is not None and lookback > 0:
        n_windows = len(X_train) - lookback
        X_train = np.array([X_train[s:s + lookback] for s in range(n_windows)])
        y_train = np.array([y_train[s + 1:s + lookback + 1] for s in range(n_windows)])

    loss_fn = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Send batches to wherever the model lives instead of relying on a
    # module-level device that may not match it.
    model_device = next(model.parameters()).device
    batch_starts = range(0, len(X_train), batch_size)

    model.train()
    for _ in range(n_epochs):
        # Only build a progress bar when one is actually wanted
        progress = batch_starts if silent else tqdm(batch_starts)
        for start in progress:
            stop = start + batch_size
            X_batch_tensor = torch.from_numpy(X_train[start:stop]).to(model_device).float()
            y_batch_tensor = torch.from_numpy(y_train[start:stop]).to(model_device).float()

            loss = loss_fn(model(X_batch_tensor), y_batch_tensor)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    model.eval()
    return model


### Preload Models

In [None]:
# To use the models used in the paper, load the pre-trained models.
# The rest of the notebook uses the loaded objects directly as models
# (deepcopy / .to / .eval), so the files presumably hold whole pickled modules.
#   * map_location=device lets checkpoints saved on a GPU load on a CPU-only host.
#   * weights_only=False is needed on PyTorch >= 2.6, whose default
#     (weights_only=True) refuses to unpickle whole modules.
# NOTE: this unpickles arbitrary objects - only load checkpoint files you trust.
D_trained_lstm = torch.load("preTrainedModels/discharging_lstm.pth", map_location=device, weights_only=False)
C_trained_lstm = torch.load("preTrainedModels/charging_lstm.pth", map_location=device, weights_only=False)
C_trained_ff = torch.load("preTrainedModels/charging_ff.pth", map_location=device, weights_only=False)
D_trained_ff = torch.load("preTrainedModels/discharging_ff.pth", map_location=device, weights_only=False)

### Optional: Retrain the models to replicate our process

In [None]:
# To train the models yourself, execute the below code
C_trained_ff = train(C_ff, C_X_train, C_y_train, n_epochs=7, batch_size=64, lr=0.005251)
D_trained_ff = train(D_ff, D_X_train, D_y_train, n_epochs=7, batch_size=256, lr=0.02625)

# The LSTMs are trained with the same window sizes that are used at inference
# time (C_window_size = 19, D_window_size = 2), so the two cannot drift apart.
C_trained_lstm = train(C_lstm, C_X_train, C_y_train, n_epochs=2, batch_size=64, lr=0.01733,
                       lookback=C_window_size)
D_trained_lstm = train(D_lstm, D_X_train, D_y_train, n_epochs=2, batch_size=128, lr=0.01783,
                       lookback=D_window_size)

### Continuous Learning

For simplicity, we separate true-label fine-tuning from pseudo-label fine-tuning. The former uses the ground-truth values for the test set, while the latter generates its own labels for the test set (referred to as pseudo-labels).

In [None]:
import copy
from hampel import hampel
import warnings
from sklearn.metrics import mean_squared_error

# Silence every warning raised from here on (this also hides deprecation
# notices from the libraries in use).
warnings.simplefilter("ignore")

def generate_pseudo_labels(trained_model, X_unlabeled, batch_size=128, lookback=-1):
    """Predict labels for ``X_unlabeled`` with ``trained_model``.

    Args:
        trained_model: ``nn.Module`` used for inference; it is switched to
            evaluation mode.
        X_unlabeled: NumPy array of inputs, one row per sample.
        batch_size: Number of samples (or windows) per forward pass.
        lookback: Window length for sequence models. ``-1`` (the default),
            ``None`` or ``0`` feeds the rows to the model as they are. With a
            positive value the rows are cut into sliding windows and only the
            prediction for the last step of each window is kept.

    Returns:
        NumPy array with the batch predictions concatenated along axis 0.

    Raises:
        ValueError: If there is nothing to predict (``np.concatenate`` is
            given no batches).
    """
    use_windows = lookback is not None and lookback > 0
    if use_windows:
        X_unlabeled, _ = lstm_transform_data(X_unlabeled, [], lookback)

    # Feed batches to wherever the model lives instead of relying on a
    # module-level device that may not match it.
    try:
        model_device = next(trained_model.parameters()).device
    except StopIteration:
        # A parameter-free module has no device of its own
        model_device = torch.device('cpu')

    pseudo_labels = []

    trained_model.eval()
    with torch.no_grad():
        for start in range(0, len(X_unlabeled), batch_size):
            X_batch = X_unlabeled[start:start + batch_size]
            X_batch_tensor = torch.from_numpy(X_batch).to(model_device).float()

            y_pseudo_label = trained_model(X_batch_tensor).cpu().numpy()
            if use_windows:
                # keep the last time step of every window
                y_pseudo_label = y_pseudo_label[:, -1, :]
            pseudo_labels.append(y_pseudo_label)

    return np.concatenate(pseudo_labels, axis=0)

def rolling_median_filter(data, window_size=100):
    """Replace upward spikes with the median of their neighbourhood.

    For every position the median of the values within ``window_size`` steps
    on either side (clipped at the array ends) is computed from the original
    data. A value is replaced by that median only when it is larger than 1.5
    times the median; everything else is kept.

    Args:
        data: Array to filter; it is not modified.
        window_size: Number of neighbours taken on each side.

    Returns:
        A filtered copy of ``data``.
    """
    smoothed = data.copy()
    total = len(data)

    for idx, value in enumerate(data):
        # Clip the neighbourhood to the valid index range
        lo = max(0, idx - window_size)
        hi = min(total, idx + window_size + 1)
        local_median = np.median(data[lo:hi])

        # Only values above 1.5x the local median are treated as outliers
        if value > local_median * 1.5:
            smoothed[idx] = local_median

    return smoothed

def customFilter(data, window_size,mu, medianFilter=False):
    """Hampel-filter every column of ``data``, optionally followed by the
    rolling median filter.

    Args:
        data: DataFrame whose columns are filtered independently.
        window_size: Window size handed to ``hampel``.
        mu: Threshold handed to ``hampel`` as ``n_sigma``.
        medianFilter: When true, the Hampel output is converted to a NumPy
            array and passed through ``rolling_median_filter``.

    Returns:
        The Hampel-filtered DataFrame, or a NumPy array when ``medianFilter``
        is true.
    """
    def _hampel_column(column):
        return hampel(column, window_size=window_size, n_sigma=mu).filtered_data

    smoothed = data.apply(_hampel_column, axis=0)

    if not medianFilter:
        return smoothed
    return rolling_median_filter(np.array(smoothed))


def true_label_finetune(model, X_train, y_train, X_test, y_test,
                    n_epochs=2, batch_size=128, lookback = -1):
    """Fine-tune a copy of ``model`` on training data plus labelled test data.

    Args:
        model: Model to fine-tune; it is deep-copied and left untouched.
        X_train, y_train: Training inputs and labels.
        X_test, y_test: Test inputs and their ground-truth labels, appended
            after the training rows.
        n_epochs: Number of fine-tuning epochs.
        batch_size: Fine-tuning batch size.
        lookback: Window length for sequence models; ``-1`` means the model is
            trained on plain rows.

    Returns:
        The fine-tuned copy returned by ``train``.
    """
    # Stack the labelled test rows after the training rows
    X_all = np.concatenate([X_train, X_test], axis=0)
    y_all = np.concatenate([y_train, y_test], axis=0)

    # Only forward a lookback to train() when one was requested
    train_kwargs = dict(n_epochs=n_epochs, batch_size=batch_size, silent=True)
    if lookback != -1:
        train_kwargs['lookback'] = lookback

    return train(copy.deepcopy(model), X_all, y_all, **train_kwargs)
 
def hybrid_model_finetune(model1, model2, X_train, y_train, X_unlabeled, model1_preds, model2_preds, val_df,
                          n_epochs=2, batch_size=128, lookback=0, cycle=-1):
    """Fine-tune two models on each other's pseudo-labels, with rollback.

    Copies of ``model1`` (feed-forward) and ``model2`` (LSTM) are trained on
    the training data plus the unlabeled data, where each model is given the
    *other* model's predictions as labels. The averaged prediction of the
    pair is then scored on one cycle of ``val_df`` before and after
    fine-tuning, and the better pair is returned.

    Args:
        model1: Feed-forward model; not modified.
        model2: LSTM model; not modified.
        X_train, y_train: Labelled training inputs and labels.
        X_unlabeled: Inputs for which only pseudo-labels exist.
        model1_preds: ``model1`` predictions for ``X_unlabeled`` (used as
            labels for ``model2``).
        model2_preds: ``model2`` predictions for ``X_unlabeled[lookback:]``
            (used as labels for ``model1``).
        val_df: Validation DataFrame containing the cycle column ``'c'``, the
            feature columns and the label column.
        n_epochs: Number of fine-tuning epochs.
        batch_size: Fine-tuning batch size.
        lookback: Window length of the LSTM.
        cycle: Cycle to validate on; capped at the largest cycle in
            ``val_df``. Only a single cycle is supported.

    Returns:
        ``(model1, model2)`` when the original pair has the strictly lower
        validation RMSE, otherwise the fine-tuned pair.
    """
    # Each model is supervised by the opposite model's predictions
    pseudo_labels_for_model1 = model2_preds
    pseudo_labels_for_model2 = model1_preds

    # finetune model1 (feedforward); the LSTM predictions start at row `lookback`
    X_train_combined = np.concatenate([X_train, X_unlabeled[lookback:]], axis=0)
    y_train_combined = np.concatenate([y_train, pseudo_labels_for_model1], axis=0)
    ft_model1 = train(copy.deepcopy(model1), X_train_combined, y_train_combined, n_epochs=n_epochs,
                      batch_size=batch_size, silent=True)

    # finetune model2 (LSTM)
    X_train_combined = np.concatenate([X_train, X_unlabeled], axis=0)
    y_train_combined = np.concatenate([y_train, pseudo_labels_for_model2], axis=0)
    ft_model2 = train(copy.deepcopy(model2), X_train_combined, y_train_combined, n_epochs=n_epochs,
                      batch_size=batch_size, lookback=lookback, silent=True)

    # if the test set has a greater cycle life than anything
    # in the validation set, use the max cycle in the validation
    cycle = min(cycle, val_df['c'].max())

    # obtain target cycle from validation; note: currently only supports single cycle
    mask = (val_df['c'] == cycle)
    X_val_subset = val_df.loc[mask, features]
    y_val_subset = val_df.loc[mask, [label]].to_numpy()

    # Filter and transform validation subset
    X_val_subset = customFilter(X_val_subset, 100, 2.5)
    X_val_subset = StandardScaler().fit_transform(X_val_subset)
    lstm_X_val_subset, _ = lstm_transform_data(X_val_subset, [], lookback)

    ff_input = torch.from_numpy(X_val_subset).to(device).float()
    lstm_input = torch.from_numpy(lstm_X_val_subset).to(device).float()

    def _hybrid_prediction(ff_model, lstm_model):
        # Average the two models on the rows both cover, then smooth the output
        with torch.no_grad():
            ff_out = ff_model(ff_input).cpu().numpy()
            lstm_out = lstm_model(lstm_input).cpu().numpy()
        averaged = (lstm_out[:, -1, :] + ff_out[lookback:]) / 2
        return customFilter(pd.DataFrame(averaged), 100, 2.5, medianFilter=True)

    y_pred = _hybrid_prediction(model1, model2)
    ft_y_pred = _hybrid_prediction(ft_model1, ft_model2)

    # Compare the RMSE on the validation cycle before and after fine-tuning.
    # sqrt(MSE) is used instead of the `squared=False` argument, which recent
    # scikit-learn releases no longer accept.
    y_true = y_val_subset[lookback:]
    base_error = np.sqrt(mean_squared_error(y_true, y_pred))
    ft_error = np.sqrt(mean_squared_error(y_true, ft_y_pred))

    # "Rollback" only when fine-tuning made things strictly worse
    if base_error < ft_error:
        return model1, model2
    return ft_model1, ft_model2
    

#### True-Label

In [None]:
import copy
import numpy as np
from sklearn.metrics import mean_squared_error
import csv

raw_predictions = {}

# CL hyperparameters
#   bs             - batch size for fine-tuning
#   e              - fine-tuning epochs
#   cycle_interval - number of cycles between fine-tuning rounds
#   ratio          - portion of the target cycle's training data to use
bs, e = 256, 2
cycle_interval, ratio = 1, 0.6

# Hampel hyperparameters
#   window - window size for input smoothing
#   mu     - Hampel mu value for input smoothing
window, mu = 10, 2.5


# fine-tuning: independent working copies of the trained models
C_finetuned_ff, D_finetuned_ff, C_finetuned_lstm, D_finetuned_lstm = (
    copy.deepcopy(trained).to(device)
    for trained in (C_trained_ff, D_trained_ff, C_trained_lstm, D_trained_lstm)
)


# baseline: the trained models themselves, moved to the selected device
C_trained_ff.to(device)
D_trained_ff.to(device)
C_trained_lstm.to(device)
D_trained_lstm.to(device)


# Only cycles present in both test sets are evaluated
C_max_cycle, D_max_cycle = C_test_df['c'].max(), D_test_df['c'].max()
max_cycle = int(min(C_max_cycle, D_max_cycle))


def _cycle_subset(df, first_cycle):
    """Return the ``(features, label)`` frames of ``df`` for the cycles in
    ``[first_cycle, first_cycle + cycle_interval)``."""
    in_range = (df['c'] >= first_cycle) & (df['c'] < first_cycle + cycle_interval)
    return df.loc[in_range, features], df.loc[in_range, [label]]


def _predict(model, X, lookback=None):
    """Run ``model`` on the NumPy array ``X`` without tracking gradients.

    With ``lookback`` set, ``X`` is first cut into sliding windows and only
    the prediction for the last step of every window is kept.
    Returns a NumPy array.
    """
    if lookback is not None:
        X, _ = lstm_transform_data(X, [], lookback)
    with torch.no_grad():
        y = model(torch.from_numpy(X).to(device).float()).cpu().numpy()
    return y if lookback is None else y[:, -1, :]


def _soc_rmse_pct(C_true, D_true, C_pred, D_pred):
    """RMSE over the charging and discharging rows together, in percent and
    rounded to two decimals.

    Every value is divided by 1.1 before scoring (presumably the nominal
    capacity that turns Q into a state-of-charge fraction - TODO confirm).
    sqrt(MSE) is used instead of the `squared=False` argument, which recent
    scikit-learn releases no longer accept.
    """
    y_true = np.concatenate([C_true, D_true], axis=0) / 1.1
    y_pred = np.concatenate([C_pred, D_pred], axis=0) / 1.1
    return round(100 * float(np.sqrt(mean_squared_error(y_true, y_pred))), 2)


print("True-Label SoC% error per cycle")
print("*" * 85)
for cycle in range(1, max_cycle + 1, cycle_interval):

    # Extract the target cycles from the training sets
    C_X_train_subset, C_y_train_subset = _cycle_subset(C_train_df, cycle)
    D_X_train_subset, D_y_train_subset = _cycle_subset(D_train_df, cycle)

    # Apply Hampel filter and transform training data to NumPy arrays
    C_X_train_subset = StandardScaler().fit_transform(customFilter(C_X_train_subset, window, mu))
    D_X_train_subset = StandardScaler().fit_transform(customFilter(D_X_train_subset, window, mu))
    C_y_train_subset = C_y_train_subset.to_numpy()
    D_y_train_subset = D_y_train_subset.to_numpy()

    # Only use `ratio` (60%) of the cycle's train data based on HP-tuning with
    # true-label finetuning. The cutoff is computed per dataset so that the
    # discharging subset is not truncated with the charging subset's length.
    C_cutoff = int(len(C_X_train_subset) * ratio)
    D_cutoff = int(len(D_X_train_subset) * ratio)
    C_X_train_subset, C_y_train_subset = C_X_train_subset[:C_cutoff], C_y_train_subset[:C_cutoff]
    D_X_train_subset, D_y_train_subset = D_X_train_subset[:D_cutoff], D_y_train_subset[:D_cutoff]

    # Extract the target cycles from the test sets
    C_X_test_cycle, C_y_test_cycle = _cycle_subset(C_test_df, cycle)
    D_X_test_cycle, D_y_test_cycle = _cycle_subset(D_test_df, cycle)

    # Get non-filtered version for baseline (models not using CL)
    nf_C_X_test_cycle = StandardScaler().fit_transform(copy.deepcopy(C_X_test_cycle))
    nf_D_X_test_cycle = StandardScaler().fit_transform(copy.deepcopy(D_X_test_cycle))

    # Apply Hampel filter and transform testing data to NumPy arrays
    C_X_test_cycle = StandardScaler().fit_transform(customFilter(C_X_test_cycle, window, mu))
    D_X_test_cycle = StandardScaler().fit_transform(customFilter(D_X_test_cycle, window, mu))
    C_y_test_cycle = C_y_test_cycle.to_numpy()
    D_y_test_cycle = D_y_test_cycle.to_numpy()

    # Set the baseline and the fine-tuning models to evaluation mode
    for _model in (C_trained_ff, D_trained_ff, C_trained_lstm, D_trained_lstm,
                   C_finetuned_ff, D_finetuned_ff, C_finetuned_lstm, D_finetuned_lstm):
        _model.eval()

    # Predict with base models (non-filtered inputs)
    C_y_pred_base_ff = _predict(C_trained_ff, nf_C_X_test_cycle)
    D_y_pred_base_ff = _predict(D_trained_ff, nf_D_X_test_cycle)
    C_y_pred_base_lstm = _predict(C_trained_lstm, nf_C_X_test_cycle, C_window_size)
    D_y_pred_base_lstm = _predict(D_trained_lstm, nf_D_X_test_cycle, D_window_size)

    # Predict with finetuned models (filtered inputs). This happens before the
    # models are fine-tuned on this cycle's labels below.
    C_y_pred_ft_ff = _predict(C_finetuned_ff, C_X_test_cycle)
    D_y_pred_ft_ff = _predict(D_finetuned_ff, D_X_test_cycle)
    C_y_pred_ft_lstm = _predict(C_finetuned_lstm, C_X_test_cycle, C_window_size)
    D_y_pred_ft_lstm = _predict(D_finetuned_lstm, D_X_test_cycle, D_window_size)

    # average predictions (hybrid model approach); the LSTM output starts at
    # row `window_size`, so the feed-forward output is trimmed to match
    C_y_pred_ft_hybrid = (C_y_pred_ft_lstm + C_y_pred_ft_ff[C_window_size:]) / 2
    D_y_pred_ft_hybrid = (D_y_pred_ft_lstm + D_y_pred_ft_ff[D_window_size:]) / 2

    # Apply Hampel and median filter to individual model outputs. Dedicated
    # names are used so the model objects C_ff / D_ff / C_lstm / D_lstm defined
    # earlier in the notebook are not overwritten with arrays.
    C_ff_filtered = customFilter(pd.DataFrame(C_y_pred_ft_ff), 100, mu, medianFilter=True)
    D_ff_filtered = customFilter(pd.DataFrame(D_y_pred_ft_ff), 100, mu, medianFilter=True)
    C_lstm_filtered = customFilter(pd.DataFrame(C_y_pred_ft_lstm), 100, mu, medianFilter=True)
    D_lstm_filtered = customFilter(pd.DataFrame(D_y_pred_ft_lstm), 100, mu, medianFilter=True)

    # Apply Hampel and median filter to hybrid outputs
    C_y_pred_ft_hybrid = customFilter(pd.DataFrame(C_y_pred_ft_hybrid), 100, mu, medianFilter=True)
    D_y_pred_ft_hybrid = customFilter(pd.DataFrame(D_y_pred_ft_hybrid), 100, mu, medianFilter=True)

    # perform true-label fine-tuning
    C_finetuned_ff = true_label_finetune(copy.deepcopy(C_finetuned_ff).to(device),
                                         C_X_train_subset, C_y_train_subset, C_X_test_cycle, C_y_test_cycle,
                                         n_epochs=e, batch_size=bs)

    D_finetuned_ff = true_label_finetune(copy.deepcopy(D_finetuned_ff).to(device),
                                         D_X_train_subset, D_y_train_subset, D_X_test_cycle, D_y_test_cycle,
                                         n_epochs=e, batch_size=bs)

    C_finetuned_lstm = true_label_finetune(copy.deepcopy(C_finetuned_lstm).to(device),
                                           C_X_train_subset, C_y_train_subset, C_X_test_cycle, C_y_test_cycle,
                                           n_epochs=e, batch_size=bs, lookback=C_window_size)

    D_finetuned_lstm = true_label_finetune(copy.deepcopy(D_finetuned_lstm).to(device),
                                           D_X_train_subset, D_y_train_subset, D_X_test_cycle, D_y_test_cycle,
                                           n_epochs=e, batch_size=bs, lookback=D_window_size)

    C_y_pred_base_hybrid = (C_y_pred_base_lstm + C_y_pred_base_ff[C_window_size:]) / 2
    D_y_pred_base_hybrid = (D_y_pred_base_lstm + D_y_pred_base_ff[D_window_size:]) / 2

    # Ground truth aligned with the rows the LSTMs produce predictions for
    C_truth = C_y_test_cycle[C_window_size:]
    D_truth = D_y_test_cycle[D_window_size:]

    print(f"{cycle} --",
          "FF:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_ff[C_window_size:], D_y_pred_base_ff[D_window_size:]),
          "LSTM:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_lstm, D_y_pred_base_lstm),
          "Hybrid:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_hybrid, D_y_pred_base_hybrid),

          "FT-FF:",
          _soc_rmse_pct(C_truth, D_truth, C_ff_filtered[C_window_size:], D_ff_filtered[D_window_size:]),
          "FT-LSTM:",
          _soc_rmse_pct(C_truth, D_truth, C_lstm_filtered, D_lstm_filtered),
          "FT-Hybrid:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_ft_hybrid, D_y_pred_ft_hybrid),
          )

#### Pseudo-Label

In [None]:
import copy
import numpy as np
from sklearn.metrics import mean_squared_error
import csv

raw_predictions = {}

# CL hyperparameters
#   bs             - batch size for fine-tuning
#   e              - fine-tuning epochs
#   cycle_interval - number of cycles between fine-tuning rounds
#   ratio          - portion of the target cycle's training data to use
bs, e = 256, 2
cycle_interval, ratio = 1, 0.6

# Hampel hyperparameters
#   window - window size for input smoothing
#   mu     - Hampel mu value for input smoothing
window, mu = 10, 2.5


# hybrid fine-tuning: independent working copies of the trained models
C_finetuned_ff, D_finetuned_ff, C_finetuned_lstm, D_finetuned_lstm = (
    copy.deepcopy(trained).to(device)
    for trained in (C_trained_ff, D_trained_ff, C_trained_lstm, D_trained_lstm)
)


# baseline: the trained models themselves, moved to the selected device
C_trained_ff.to(device)
D_trained_ff.to(device)
C_trained_lstm.to(device)
D_trained_lstm.to(device)


# Only cycles present in both test sets are evaluated
C_max_cycle, D_max_cycle = C_test_df['c'].max(), D_test_df['c'].max()
max_cycle = int(min(C_max_cycle, D_max_cycle))


def _cycle_subset(df, first_cycle):
    """Return the ``(features, label)`` frames of ``df`` for the cycles in
    ``[first_cycle, first_cycle + cycle_interval)``."""
    in_range = (df['c'] >= first_cycle) & (df['c'] < first_cycle + cycle_interval)
    return df.loc[in_range, features], df.loc[in_range, [label]]


def _predict(model, X, lookback=None):
    """Run ``model`` on the NumPy array ``X`` without tracking gradients.

    With ``lookback`` set, ``X`` is first cut into sliding windows and only
    the prediction for the last step of every window is kept.
    Returns a NumPy array.
    """
    if lookback is not None:
        X, _ = lstm_transform_data(X, [], lookback)
    with torch.no_grad():
        y = model(torch.from_numpy(X).to(device).float()).cpu().numpy()
    return y if lookback is None else y[:, -1, :]


def _soc_rmse_pct(C_true, D_true, C_pred, D_pred):
    """RMSE over the charging and discharging rows together, in percent and
    rounded to two decimals.

    Every value is divided by 1.1 before scoring (presumably the nominal
    capacity that turns Q into a state-of-charge fraction - TODO confirm).
    sqrt(MSE) is used instead of the `squared=False` argument, which recent
    scikit-learn releases no longer accept.
    """
    y_true = np.concatenate([C_true, D_true], axis=0) / 1.1
    y_pred = np.concatenate([C_pred, D_pred], axis=0) / 1.1
    return round(100 * float(np.sqrt(mean_squared_error(y_true, y_pred))), 2)


print("Psuedo-label SoC% error per cycle")
print("*" * 65)
for cycle in range(1, max_cycle + 1, cycle_interval):

    # Extract the target cycles from the training sets
    C_X_train_subset, C_y_train_subset = _cycle_subset(C_train_df, cycle)
    D_X_train_subset, D_y_train_subset = _cycle_subset(D_train_df, cycle)

    # Apply Hampel filter and transform training data to NumPy arrays
    C_X_train_subset = StandardScaler().fit_transform(customFilter(C_X_train_subset, window, mu))
    D_X_train_subset = StandardScaler().fit_transform(customFilter(D_X_train_subset, window, mu))
    C_y_train_subset = C_y_train_subset.to_numpy()
    D_y_train_subset = D_y_train_subset.to_numpy()

    # Only use `ratio` (60%) of the cycle's train data based on HP-tuning with
    # true-label finetuning. The cutoff is computed per dataset so that the
    # discharging subset is not truncated with the charging subset's length.
    C_cutoff = int(len(C_X_train_subset) * ratio)
    D_cutoff = int(len(D_X_train_subset) * ratio)
    C_X_train_subset, C_y_train_subset = C_X_train_subset[:C_cutoff], C_y_train_subset[:C_cutoff]
    D_X_train_subset, D_y_train_subset = D_X_train_subset[:D_cutoff], D_y_train_subset[:D_cutoff]

    # Extract the target cycles from the test sets
    C_X_test_cycle, C_y_test_cycle = _cycle_subset(C_test_df, cycle)
    D_X_test_cycle, D_y_test_cycle = _cycle_subset(D_test_df, cycle)

    # Get non-filtered version for baseline (models not using CL)
    nf_C_X_test_cycle = StandardScaler().fit_transform(copy.deepcopy(C_X_test_cycle))
    nf_D_X_test_cycle = StandardScaler().fit_transform(copy.deepcopy(D_X_test_cycle))

    # Apply Hampel filter and transform testing data to NumPy arrays
    C_X_test_cycle = StandardScaler().fit_transform(customFilter(C_X_test_cycle, window, mu))
    D_X_test_cycle = StandardScaler().fit_transform(customFilter(D_X_test_cycle, window, mu))
    C_y_test_cycle = C_y_test_cycle.to_numpy()
    D_y_test_cycle = D_y_test_cycle.to_numpy()

    # Set the baseline and the CL-WR models to evaluation mode
    for _model in (C_trained_ff, D_trained_ff, C_trained_lstm, D_trained_lstm,
                   C_finetuned_ff, D_finetuned_ff, C_finetuned_lstm, D_finetuned_lstm):
        _model.eval()

    # Predict with base models (non-filtered inputs)
    C_y_pred_base_ff = _predict(C_trained_ff, nf_C_X_test_cycle)
    D_y_pred_base_ff = _predict(D_trained_ff, nf_D_X_test_cycle)
    C_y_pred_base_lstm = _predict(C_trained_lstm, nf_C_X_test_cycle, C_window_size)
    D_y_pred_base_lstm = _predict(D_trained_lstm, nf_D_X_test_cycle, D_window_size)

    # Predict with models finetuned using CL-WR (filtered inputs). This happens
    # before the models are fine-tuned on this cycle below.
    C_y_pred_ft_ff = _predict(C_finetuned_ff, C_X_test_cycle)
    D_y_pred_ft_ff = _predict(D_finetuned_ff, D_X_test_cycle)
    C_y_pred_ft_lstm = _predict(C_finetuned_lstm, C_X_test_cycle, C_window_size)
    D_y_pred_ft_lstm = _predict(D_finetuned_lstm, D_X_test_cycle, D_window_size)

    # average predictions (hybrid model approach); the LSTM output starts at
    # row `window_size`, so the feed-forward output is trimmed to match
    C_y_pred_ft_hybrid = (C_y_pred_ft_lstm + C_y_pred_ft_ff[C_window_size:]) / 2
    D_y_pred_ft_hybrid = (D_y_pred_ft_lstm + D_y_pred_ft_ff[D_window_size:]) / 2

    # Apply Hampel and median filter to hybrid outputs
    C_y_pred_ft_hybrid = customFilter(pd.DataFrame(C_y_pred_ft_hybrid), 100, mu, medianFilter=True)
    D_y_pred_ft_hybrid = customFilter(pd.DataFrame(D_y_pred_ft_hybrid), 100, mu, medianFilter=True)

    # Apply Hampel and median filter to individual model outputs for dual-model
    # CL-WR. Dedicated names are used so the model objects C_ff / D_ff /
    # C_lstm / D_lstm defined earlier in the notebook are not overwritten.
    C_ff_filtered = customFilter(pd.DataFrame(C_y_pred_ft_ff), 100, mu, medianFilter=True)
    D_ff_filtered = customFilter(pd.DataFrame(D_y_pred_ft_ff), 100, mu, medianFilter=True)
    C_lstm_filtered = customFilter(pd.DataFrame(C_y_pred_ft_lstm), 100, mu, medianFilter=True)
    D_lstm_filtered = customFilter(pd.DataFrame(D_y_pred_ft_lstm), 100, mu, medianFilter=True)

    # perform CL-WR: each model is fine-tuned on the other's filtered predictions
    C_finetuned_ff, C_finetuned_lstm = hybrid_model_finetune(copy.deepcopy(C_finetuned_ff).to(device), copy.deepcopy(C_finetuned_lstm).to(device),
                                                             C_X_train_subset, C_y_train_subset, C_X_test_cycle, C_ff_filtered, C_lstm_filtered, C_val_df,
                                                             n_epochs=e, batch_size=bs, lookback=C_window_size, cycle=cycle)

    D_finetuned_ff, D_finetuned_lstm = hybrid_model_finetune(copy.deepcopy(D_finetuned_ff).to(device), copy.deepcopy(D_finetuned_lstm).to(device),
                                                             D_X_train_subset, D_y_train_subset, D_X_test_cycle, D_ff_filtered, D_lstm_filtered, D_val_df,
                                                             n_epochs=e, batch_size=bs, lookback=D_window_size, cycle=cycle)

    C_y_pred_base_hybrid = (C_y_pred_base_lstm + C_y_pred_base_ff[C_window_size:]) / 2
    D_y_pred_base_hybrid = (D_y_pred_base_lstm + D_y_pred_base_ff[D_window_size:]) / 2

    # Ground truth aligned with the rows the LSTMs produce predictions for
    C_truth = C_y_test_cycle[C_window_size:]
    D_truth = D_y_test_cycle[D_window_size:]

    print(f"{cycle} --",
          "FF:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_ff[C_window_size:], D_y_pred_base_ff[D_window_size:]),
          "LSTM:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_lstm, D_y_pred_base_lstm),
          "Hybrid:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_base_hybrid, D_y_pred_base_hybrid),
          "CL-WR:",
          _soc_rmse_pct(C_truth, D_truth, C_y_pred_ft_hybrid, D_y_pred_ft_hybrid)
          )

    # Prediction array format:
    #
    #     truth_charging, truth_discharging,
    #     feedforward_charging, feedforward_discharging,
    #     lstm_charging, lstm_discharging,
    #     hybrid_charging, hybrid_discharging,
    #     CL_WR_charging, CL_WR_discharging
    raw_items = [
        C_truth, D_truth,
        C_y_pred_base_ff[C_window_size:], D_y_pred_base_ff[D_window_size:],
        C_y_pred_base_lstm, D_y_pred_base_lstm,
        C_y_pred_base_hybrid, D_y_pred_base_hybrid,
        C_y_pred_ft_hybrid, D_y_pred_ft_hybrid
    ]
    # Flatten to plain Python lists so the predictions can be serialised
    raw_predictions[cycle] = [item.flatten().tolist() for item in raw_items]


In [None]:
import json
# Optionally, save the predictions for later analysis.
# The cycle numbers used as dictionary keys are written as JSON strings.
serialized_predictions = json.dumps(raw_predictions, indent=4)
with open("raw_psuedo_label_predictions.json", "w") as f:
    f.write(serialized_predictions)