# **1: Network Testing**

This tutorial demonstrates how to evaluate the double LSTM-based model for the sequence-to-scalar future hysteresis step prediction. The testset seqeunces include data that has the same sampling time steps as the training dataset. However the code is able to decipher the sequence length (provided that it has a sampling frequency), and upsample or downsample as needed to fit the model criteria. If the sequence is longer than the designed memory, then only the most recent time steps (amount to the total training memory time) are taken. If the data length is shorter, then the extra empty memory will repeat the constant of the most distant memory data point. 


# **Step 0: Import Packages**

In this step we import the important packages that are necessary for the testing.

In [7]:
# from google.colab import drive
# drive.mount('/content/drive')

import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import numpy as np
import json
import h5py
import math
import csv
import time
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# **Step 1: Define Network Structure**
The structure of the duel LSTM-based encoder-projector neural network are defined here. The network structure does not change from the training structure. Refer to the PyTorch document for more details.

In [8]:
# Define model structures and functions

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        self.lstm_B = nn.LSTM(1, 6, num_layers=1, batch_first=True, bidirectional=False)

        self.lstm_H = nn.LSTM(1, 6, num_layers=1, batch_first=True, bidirectional=False)

        self.projector = nn.Sequential(
            nn.Linear(6 *2 + 2 , 6 *2 + 2),
            nn.Tanh(),
            nn.Linear(6 *2 + 2, 8),
            nn.Tanh(),
            nn.Linear(8 , 1)
        )

    def forward(self, seq_B: Tensor, seq_H: Tensor, scal: Tensor, T: Tensor, device) -> Tensor:

        seq_B = seq_B.float()
        seq_H = seq_H.float()
        scal = scal.float()
        T = T.float()

        x_B, _ = self.lstm_B(seq_B)
        x_B = x_B[:, -1, :]

        x_H, _ = self.lstm_H(seq_H)
        x_H = x_H[:, -1, :]


        output = self.projector(torch.cat((scal, T, x_B, x_H), dim=1))
        output = output.to(device)

        return output
    
    



# **Step 2: Load the Testing Dataset**

Dataset needs to be processed before testing. The provided dataset includes multiple segment split ratios. Three ratios, 10:90%, 50:50%, and 90:10% data ratios are provided. Output H sequence is post processed again to provide autoregressive function. User input should provide the information of sampling time, which is the same as the model in this Challenge.

In [9]:
#Define parameters
def get_dataset(sample_freq_model, B, H, H_true, T, data_length, path_root):

    # Load from JSON
    Norm_file_name = 'Normalization_Params.json'
    with open(path_root+Norm_file_name, 'r') as f:
        Param = json.load(f)
        
    # Initialization of the normalization Data for each material
    print("Normalization Initiated")
    mean_B = np.array(Param['mean_B'])
    std_B = np.array(Param['std_B'])
    mean_H = np.array(Param['mean_H'])
    std_H = np.array(Param['std_H'])
    mean_out = np.array(Param['mean_out'])
    std_out = np.array(Param['std_out'])
    mean_Scal = np.array(Param['mean_Scal'])
    std_Scal = np.array(Param['std_Scal'])    
    mean_T = np.array(Param['mean_T'])    
    std_T = np.array(Param['std_T'])   

    H_len = H[0].size
    sample_freq_test = sample_freq_model  # Sampling frequency for testing sequence set the same as model sampling frequency.
    sample_time_test = 1/sample_freq_test
    total_time_H = (H_len - 1) * sample_time_test

    data_length_model = data_length
    sample_time_model = 1/sample_freq_model

    data_length_test = H_len              # memory data length of testing dataset (as B is a full sequence including past and future points, and H has only past memory sequence)
    
    # Time step difference
    if total_time_H != sample_time_model * (data_length_model-1):
        if data_length_model > data_length_test: # when testing data length is shorter
            # Fill the empty elements with the same value as the first value of the original sequence
            B_ftr = B[:,data_length_test:]
            H_ftr = H_true[:,data_length_test:]
            B = np.concatenate([np.tile(B[:,0].reshape(-1,1),(1,data_length_model - data_length_test)),B[:, : data_length_test]],axis = 1) 
            H = np.concatenate([np.tile(H[:,0].reshape(-1,1),(1,data_length_model - data_length_test)),H[:, : data_length_test]],axis = 1)
        else: # when testing data length is longer
            # Use only the recent data_length values
            B_ftr = B[:,data_length_test:]
            H_ftr = H_true[:,data_length_test:]
            B = B[:,data_length_test - data_length_model:data_length_test]
            H = H[:,data_length_test - data_length_model:data_length_test]


    in_B = B
    in_H = H
    in_B_next = B
    B_scal = B_ftr[:, [0]]
    H_out = H_ftr[:, [0]]
    T_scal = T


    for i in range(len(B_ftr[0])-1):
        in_B_next = np.roll(in_B_next, -1, axis = 1)
        in_B_next[:, -1] = B_ftr[:, i]

        B_scal = np.vstack([B_scal, B_ftr[:, [i+1]]])
        H_out = np.vstack([H_out, H_ftr[:, [i+1]]])
        T_scal = np.vstack([T_scal, T])
        in_B = np.vstack([in_B, in_B_next])
        in_H = np.vstack([in_H, np.zeros_like(H)])



    B_scal = (B_scal-mean_Scal)/std_Scal
    T_scal = (T_scal-mean_T)/std_T
    in_B = (in_B-mean_B)/std_B
    in_H = (in_H-mean_H)/std_H
    H_out = (H_out-mean_out)/std_out

    normH = [mean_out, std_out]

    in_B = torch.from_numpy(in_B).float().view(-1,data_length_model, 1)
    in_H = torch.from_numpy(in_H).float().view(-1,data_length_model, 1)
    out = torch.from_numpy(H_out).float().view(-1,1)
    B_scal = torch.from_numpy(B_scal).float().view(-1,1)
    T_scal = torch.from_numpy(T_scal).float().view(-1,1)

    return torch.utils.data.TensorDataset(in_B, in_H, B_scal, T_scal, out), normH

# **Step 3: Testing the Model**

The loaded dataset is directly used as the test set. The model state dictionary file (.sd) containing all the trained parameter values is loaded and tested.

In [10]:
#Defind parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


# Config the model testing
def Test(sample_freq_model, B, H, H_true, T, data_length, path_root, state_dict):

    # Load dataset in iterations
    idx_nan = np.argmax(np.isnan(H), axis = 1) #  Find the column idx for each row where NaN occurs. idx_nan is the future B input, memory should precede this point
    has_nan = np.any(np.isnan(H), axis = 1) # Find all column idx where NaN occurs
    idx_nan[~has_nan] = -1 
    lgt_change = np.where(np.diff(idx_nan))[0] + 1  # Find the row idx where the NaN changes occur
    start = np.insert(lgt_change, 0 , 0) # Start idx for each ratios of partial data
    stop = np.append(lgt_change, len(idx_nan))  

# Perform the evulation iteratively by each data segment split ratio
    for i, (start, stop) in enumerate(zip(start, stop)):
        idx_st = idx_nan[start]  # Row starting idx 
        B_seq = B[start:stop, :]
        H_mem = H[start:stop, 0 : idx_st]
        T_scal = T[start:stop, :]
        H_meas = H_true[start:stop, :]

        # Hyperparameters
        BATCH_SIZE = len(idx_nan[start : stop])

        # Load dataset
        dataset, normH = get_dataset(sample_freq_model, B_seq, H_mem, H_meas, T_scal, data_length, path_root)
    
        print(f"Data Iteration {i+1}")
    
       # Reproducibility
        random.seed(1)
        np.random.seed(1)
        torch.manual_seed(1)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    
        # Select GPU as default device
        device = torch.device("cuda")

        # Split the dataset
        kwargs = {'num_workers': 0, 'pin_memory': True, 'pin_memory_device': "cuda"}
        test_loader =  torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False, **kwargs)
        testData = list(test_loader)


        # Setup network
        net = Net().to(device)

        # Log the number of parameters
        print("Number of parameters: ", count_parameters(net))

        # Load trained parameters
        net.load_state_dict(state_dict, strict=True)
        net.eval()
        print("Model is loaded!")


        #Test
        net.eval()
        out_pred = []
        out_pred = torch.tensor(out_pred)
        out_meas = []
        out_meas = torch.tensor(out_meas)
        outputs = torch.empty((0,1))
        outputs = outputs.to(device)

  
    
        previous_in_H= None
        with torch.no_grad():
            for in_B, in_H, B_scal, T_scal, out in testData:  # Batch level

                if  previous_in_H is None:
                    outputs = net(seq_B = in_B.to(device), seq_H = in_H.to(device), scal = B_scal.to(device), T = T_scal.to(device), device = device)
                    out_pred = out_pred.to(outputs.device)
                    out_pred = torch.cat((out_pred, outputs), dim = 1)
                    out_meas = out_meas.to(out.device)
                    out_meas = torch.cat((out_meas, out), dim = 1)


                else:
                    # Update in_H autoregressively with previous prediction
                    in_H = np.roll(previous_in_H.cpu(),-1,axis = 1)
                    in_H[:, -1] = out_pred[:, -1].cpu().numpy().reshape(-1, 1)  # Set the new value at the end
                    in_H = torch.from_numpy(in_H).float()

                    outputs = net(seq_B = in_B.to(device), seq_H = in_H.to(device), scal = B_scal.to(device), T = T_scal.to(device), device = device)
                    out_pred = out_pred.to(outputs.device)
                    out_pred = torch.cat((out_pred, outputs), dim = 1)
                    out_meas = out_meas.to(out.device)
                    out_meas = torch.cat((out_meas, out), dim = 1)


                previous_in_H = in_H

            # Obtain the tested data
            y_meas = out_meas.cpu().numpy()
            y_meas = y_meas *normH[1]+normH[0]
            y_pred = out_pred.cpu().numpy()
            y_pred = y_pred *normH[1]+normH[0]
            

            # Save the test data into csv files
            with open(path_root + "Testing/pred.csv", "a") as f:
                np.savetxt(f, y_pred, delimiter=',')
                f.close()
            with open(path_root + "Testing/meas.csv", "a") as f:
                np.savetxt(f, y_meas, delimiter=',')
                f.close()

            print("Testing finished! Results are saved!")


        print("Test iteration completed!")




# **Step 4: Main Function**

In [None]:
if __name__ == "__main__":
    Material = '3C90'
    path_root = 'C:/Users/Labadmin/' + Material + '/'

    # File name
    file_B = f"{Material}_Testing_True_B_seq.csv"
    file_H = f"{Material}_Testing_Padded_H_seq.csv"
    file_T = f"{Material}_Testing_True_T.csv"
    file_H_true = f"{Material}_Testing_True_H_seq.csv"

    state_dict = torch.load(path_root + 'Model_LSTM_3C90.sd')

    B = pd.read_csv(path_root + file_B, header=None).to_numpy()
    H = pd.read_csv(path_root + file_H, header=None).to_numpy()
    T_scal = pd.read_csv(path_root + file_T, header=None).to_numpy().reshape(-1, 1)
    H_true = pd.read_csv(path_root + file_H_true, header=None).to_numpy()
    
    print("Data Loading Initiated (CSV)")

    data_length = 80 # Memory data length
    sample_freq_model = 16e6 # Sampling frequency from model training
    # Run the test function
    Test(sample_freq_model, B, H, H_true, T_scal, data_length, path_root, state_dict)

  state_dict = torch.load(path_root + 'Model_LSTM_3C90.sd')


Data Loading Initiated (CSV)
Normalization Initiated
Data Iteration 1
Number of parameters:  771
Model is loaded!
Testing finished! Results are saved!
Test iteration completed!
Normalization Initiated
Data Iteration 2
Number of parameters:  771
Model is loaded!
Testing finished! Results are saved!
Test iteration completed!
Normalization Initiated
Data Iteration 3
Number of parameters:  771
Model is loaded!
Testing finished! Results are saved!
Test iteration completed!
