In [None]:
import numpy as np
np.random.seed(1234)

import torch
import torch.nn as nn

torch.set_default_device('cuda')
torch.manual_seed(1234)

my_dtype = torch.float64
torch.set_default_dtype(my_dtype)

import matplotlib.pyplot as plt
from tqdm import tqdm
import scipy.io

In [None]:
def read_nasa_cycle(cycle_num):
    data_file = scipy.io.loadmat(file_path, squeeze_me=False)
    
    data = {}
    data['Imeas'] = torch.tensor(data_file[file_name]['cycle'][0][0][0, cycle_num]['data'][0, 0]['Current_measured'][0, :], requires_grad=True).unsqueeze(-1)
    data['Vmeas'] = torch.tensor(data_file[file_name]['cycle'][0][0][0, cycle_num]['data'][0, 0]['Voltage_measured'][0, :], requires_grad=True).unsqueeze(-1)
    data['T'] = torch.tensor(data_file[file_name]['cycle'][0][0][0, cycle_num]['data'][0, 0]['Temperature_measured'][0, :], requires_grad=True).unsqueeze(-1)
    data['time'] = torch.tensor(data_file[file_name]['cycle'][0][0][0, cycle_num]['data'][0, 0]['Time'][0, :], requires_grad=True).unsqueeze(-1)
    data['cycle_type'] = data_file[file_name]['cycle'][0][0][0, cycle_num]['type'][0]
    
    # Capacity is measured on discharge cycles so only available for these
    if data['cycle_type'] == 'discharge':
        data['capacity'] = data_file[file_name]['cycle'][0][0][0, cycle_num]['data'][0, 0]['Capacity'][0, 0]
        data['Q_nominal'] = torch.tensor(data['capacity']*3600.0, requires_grad=True)
    
    return data

def read_nissan_cycle(filepath, uniform_timestep=True, uniform_dt=1.0):
    data = {}
    
    csv_file = np.loadtxt(filepath, delimiter=',', skiprows=1, usecols=(1,7,8,9,11))
    
    data['capacity']  = 30.0
    data['Q_nominal'] = torch.tensor(data['capacity']*3600.0, requires_grad=True)
    
    data['SOC_t0'] = csv_file[0,4] / data['capacity']
    
    if uniform_timestep:
        num_steps     = int((csv_file[-1,0] - csv_file[0,0]) / uniform_dt)
        data['time']  = np.linspace(csv_file[0,0], csv_file[-1,0], num_steps)
        data['Imeas'] = torch.from_numpy(np.interp(data['time'], csv_file[:,0], csv_file[:,2])).unsqueeze(-1).to('cuda')
        data['Vmeas'] = torch.from_numpy(np.interp(data['time'], csv_file[:,0], csv_file[:,3])).unsqueeze(-1).to('cuda')
        data['time']  = torch.from_numpy(data['time']).unsqueeze(-1).to('cuda')
        data['dt']    = torch.diff(data['time'], dim=0, prepend=torch.zeros((1,1))).to('cuda')
        
        data['time'].requires_grad=True
        data['dt'].requires_grad=True
        data['Imeas'].requires_grad=True
        data['Vmeas'].requires_grad=True

        
    else:
        data['time']  = torch.tensor(csv_file[:,0], requires_grad=True).unsqueeze(-1)
        data['dt']    = torch.tensor(csv_file[:,1], requires_grad=True).unsqueeze(-1)
        data['Imeas'] = torch.tensor(csv_file[:,2], requires_grad=True).unsqueeze(-1)
        data['Vmeas'] = torch.tensor(csv_file[:,3], requires_grad=True).unsqueeze(-1)
    
    return data

In [None]:
# Import data
N_file_start = 0
N_file_end = 500 #2287  # Number of timesteps to take from file (starting at beginning)
n_skips    = 1      # Sample data from file at this interval
N = N_file_end - N_file_start

file_type = 'nissan'
file_name = 'B0005'
file_path = [
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-discharge-bitrode-1c.csv",
              "name" : "cell-discharge-bitrode-1c"},
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-discharge-bitrode-2c.csv",
              "name" : "cell-discharge-bitrode-2c"},
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-discharge-bitrode-3c.csv",
              "name" : "cell-discharge-bitrode-3c"},
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-low-current-hppc-10c.csv",
              "name" : "cell-low-current-hppc-10c"},
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-low-current-hppc-25c-2.csv",
              "name" : "cell-low-current-hppc-25c-2"},
             {"path" : "D:/OneDrive - Queen's University Belfast/Documents/Nissan_Data/cell-low-current-hppc-40c-1.csv",
              "name" : "cell-low-current-hppc-40c-1"},
            ]

# Load data file and cut down to desired length
if file_type == 'nasa':
    data_file = read_nasa_cycle(1)
    print(data_file['cycle_type'])
    for var in data_file:
        if var == 'Imeas' or var == 'Vmeas' or var == 'T' or var == 'time':
            data_file[var] = data_file[var][N_file_start:N_file_end]
            data_file[var] = torch.tensor(data_file[var][::n_skips], requires_grad=True)

    dt = torch.tensor(torch.diff(data_file['time'][0,:,0].cpu().detach()), requires_grad=True, dtype=my_dtype)
    dt = torch.concat((dt, dt[-1:,:]), dim=0)
    
elif file_type == 'nissan':
    
    #Loop through cycle files
    file_num = 0
    data_file = {}
    for file in file_path:
        
        # Read this cycle file
        data_file[file_num] = read_nissan_cycle(file['path'], uniform_timestep=False)

        for var in data_file[file_num]:
            if var == 'Imeas' or var == 'Vmeas' or var == 'T' or var == 'time':
                data_file[file_num][var] = data_file[file_num][var][N_file_start:N_file_end]
                data_file[file_num][var] = torch.tensor(data_file[file_num][var][::n_skips], requires_grad=True)

        data_file[file_num]['dt'] = torch.diff(data_file[file_num]['time'], dim=0, prepend=torch.zeros((1,1)))
                
        if file['name'] == "cell-discharge-bitrode-2c":
            data_file[file_num]['SOC_t0'] = torch.tensor(1.0, requires_grad=True)
            
        elif file['name'] == "cell-discharge-bitrode-3c":
            data_file[file_num]['SOC_t0'] = torch.tensor(1.0, requires_grad=True)
            
        else:
            data_file[file_num]['SOC_t0'] = torch.tensor(0.0, requires_grad=True)
            
        file_num += 1

            
with torch.no_grad():
    plot_n = 0
    
    dSOC = torch.zeros((len(data_file), N, 1))
    SOC  = torch.zeros_like(dSOC)
            
    if file_type == 'nasa':
        if data_file[plot_n]['cycle_type'] == 'discharge':
            SOC[:,0,0] = 1.0  
            
    elif file_type == 'nissan':
        for ii in range(len(data_file)):
            dSOC[ii] = (data_file[ii]['Imeas'] / data_file[ii]['Q_nominal']) * data_file[ii]['dt']
            SOC[ii,0,0] = data_file[ii]['SOC_t0']
            
    for i in tqdm(range(1, SOC.size(1))):
        SOC[:,i,0] = SOC[:,i-1,0] + dSOC[:,i-1,0]
    
    for ii in range(len(data_file)):
        plt.plot(SOC[ii,:,0].cpu())
    plt.ylabel('SOC')
    plt.xlabel('Timestep')
    plt.show()
    plt.close()

In [None]:
class ECM_standard(nn.Module):
    def __init__(self, H1, H2, H3, n_lstm):
        super().__init__()
        
        self.H1 = H1
        self.H2 = H2
        self.H3 = H3
        self.n_lstm = n_lstm
        
        self.act = torch.tanh
        
        # Training records
        self.epochs = 0
        self.data_loss = []
        self.theory_loss = []

        self.inp = nn.Linear(2, self.H2)

        self.encoder  = nn.Linear(self.H1, self.H2)
        self.encoder2 = nn.Linear(self.H2, self.H2)
        
        self.rnn     = nn.LSTM(input_size=self.H2,
                               hidden_size=self.H3,
                               num_layers=self.n_lstm,
                               batch_first=True,
                               bidirectional=False)
        
        self.decoder = nn.Linear(self.H3, self.H2)
        self.decoder2 = nn.Linear(self.H2, 1)
        
        self.bn_inp = nn.BatchNorm1d(H2)
        self.bn_enc1 = nn.BatchNorm1d(H2)
        self.bn_enc2 = nn.BatchNorm1d(H2)
        self.bn_lstm = nn.BatchNorm1d(H3)
        self.bn_dec1 = nn.BatchNorm1d(H2)
        self.bn_dec2 = nn.BatchNorm1d(2)
    
    def forward(self, x_in, hidden_states):
        x = self.act(self.inp(x_in))
        x_lstm, (h1, c1) = self.rnn(x, hidden_states)
        x = self.act(self.decoder(x_lstm))
        x = self.decoder2(x)
        return x, (h1, c1)
    
    def closure(self, y_true, x, hidden_states1, I1_t0, I2_t0):
        h1, c1 = hidden_states1

        # predict
        y_pred, (h1, c1) = model(x, (h1, c1))

        # calculate loss
        loss = nn.MSELoss()(y_pred, y_true)
        
        self.data_loss.append(loss.item())
        
        _ = torch.empty(1,) # empty tensor to make this compatible with the PINN closure

        return loss, (h1, c1), _, _

In [None]:
class ECM_PINN(nn.Module):
    def __init__(self, H1, H2, H3, n_lstm):
        super().__init__()
        
        self.L_data = torch.tensor(1.0, requires_grad=True)
        self.L_theory = torch.tensor(1.0, requires_grad=True)
        self.L_ECM = torch.tensor(1.0e2, requires_grad=True)
        
        self.H1 = H1
        self.H2 = H2
        self.H3 = H3
        self.n_lstm = n_lstm
        
        self.act = torch.tanh

        self.inp = nn.Linear(3, self.H2)

        self.encoder = nn.Linear(self.H1, self.H2)
        self.encoder2 = nn.Linear(self.H2, self.H2)
        
        self.rnn     = nn.LSTM(input_size=self.H2,
                               hidden_size=self.H3,
                               num_layers=self.n_lstm,
                               batch_first=True,
                               bidirectional=False)
        
        
        self.decoder = nn.Linear(self.H3, self.H2)
        self.decoder2 = nn.Linear(self.H2, 1)
        self.out_bias = torch.tensor(2.8, requires_grad=True)
            
        self.soc_decoder = nn.Linear(self.H2, 1)
        self.v_decoder   = nn.Linear(self.H2, 1)

        self.bn_inp = nn.BatchNorm1d(H2)
        self.bn_enc1 = nn.BatchNorm1d(H2)
        self.bn_enc2 = nn.BatchNorm1d(H2)
        self.bn_lstm = nn.BatchNorm1d(H3)
        self.bn_dec1_V = nn.BatchNorm1d(H2)
        self.bn_dec1_soc = nn.BatchNorm1d(H2)
        
        '''
            ECM Parameters
        '''        
        self.Q_nominal = torch.tensor(25*3600.0, requires_grad=True)
        
        # LUT values
        self.R0_0     = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R1_0     = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R2_0     = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R0_1_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R1_1_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R2_1_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R0_2_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R1_2_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.R2_2_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        
        self.C1_0     = nn.Parameter(torch.tensor(1.0, requires_grad=True))
        self.C2_0     = nn.Parameter(torch.tensor(1.0, requires_grad=True))
        self.C1_1_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.C2_1_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.C1_2_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        self.C2_2_SOC = nn.Parameter(torch.tensor(1.0e-5, requires_grad=True))
        
        self.Vocv_0      = nn.Parameter(torch.tensor(2.8, requires_grad=True))

        N_LUT = 3
        self.Vocv_l1 = nn.Linear(1, N_LUT, bias=True)
        self.Vocv_l2 = nn.Linear(N_LUT, 1, bias=True)
        
        self.R0_l1 = nn.Linear(1, N_LUT, bias=True)
        self.R0_l2 = nn.Linear(N_LUT, 1, bias=True)
        
        self.R1_l1 = nn.Linear(1, N_LUT, bias=True)
        self.R1_l2 = nn.Linear(N_LUT, 1, bias=True)
        
        self.R2_l1 = nn.Linear(1, N_LUT, bias=True)
        self.R2_l2 = nn.Linear(N_LUT, 1, bias=True)
        
        self.C1_l1 = nn.Linear(1, N_LUT, bias=True)
        self.C1_l2 = nn.Linear(N_LUT, 1, bias=True)
        
        self.C2_l1 = nn.Linear(1, N_LUT, bias=True)
        self.C2_l2 = nn.Linear(N_LUT, 1, bias=True)

        
        # Training records
        self.epochs = 0
        self.theory_loss = []
        self.data_loss = []
        self.ecm_loss = []
        self.theory_loss_batch = []
        self.data_loss_batch = []
        self.ecm_loss_batch = []
        
        
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            #module.weight.data.normal_(mean=0.0, std=0.1)
            y = 1.0 / torch.sqrt(torch.tensor(torch.numel(module.weight.data), dtype=my_dtype))
            module.weight.data.normal_(0.0, y)
            
            if module.bias is not None:
                module.bias.data.fill_(0.01)
    
    
    '''
        FORWARD PASS
    '''
    
    def forward(self, x_in, hidden_states):
        x = self.act(self.inp(x_in))
        x_lstm, (h1, c1) = self.rnn(x, hidden_states)
        x = self.act(self.decoder(x_lstm))
        x = self.decoder2(x) + self.out_bias
        return x, (h1, c1)
    
    
    '''
        CLOSURE FUNCTION
    '''
    def criterion(self, y_pred, y_true):
        #return torch.sum(torch.square(y_pred - y_true))
        return nn.MSELoss()(y_pred, y_true)
        
    
    def Vocv_func(self, SOC):
        Vocv_SOC = torch.tanh(self.Vocv_l1(SOC))
        Vocv_SOC = torch.square(self.Vocv_l2(Vocv_SOC))
        return self.Vocv_0 + Vocv_SOC
    
    def R0_func(self, SOC):
        R0_SOC = torch.tanh(self.R0_l1(SOC))
        R0_SOC = torch.square(self.R0_l2(R0_SOC))
        return R0_SOC
    
    def R1_func(self, SOC):
        R1_SOC = torch.tanh(self.R1_l1(SOC))
        R1_SOC = torch.square(self.R1_l2(R1_SOC))
        return R1_SOC
    
    def R2_func(self, SOC):
        R2_SOC = torch.tanh(self.R2_l1(SOC))
        R2_SOC = torch.square(self.R2_l2(R2_SOC))
        return R2_SOC
    
    def C1_func(self, SOC):
        C1_SOC = torch.tanh(self.C1_l1(SOC))
        C1_SOC = torch.square(self.C1_l2(C1_SOC))
        return C1_SOC
    
    def C2_func(self, SOC):
        C2_SOC = torch.tanh(self.C2_l1(SOC))
        C2_SOC = torch.square(self.C2_l2(C2_SOC))
        return C2_SOC
    
    def compute_ECM(self, I, SOC_t0, dt_, I1_t0, I2_t0):
        
        # Initialise values
        nt = I.size(1)
        SOC = torch.zeros_like(I, requires_grad=True)
        I1 = torch.zeros_like(I, requires_grad=True)
        I2 = torch.zeros_like(I, requires_grad=True)
        V0 = torch.zeros_like(I, requires_grad=True)
        V1 = torch.zeros_like(I, requires_grad=True)
        V2 = torch.zeros_like(I, requires_grad=True)
        #Vocv = torch.zeros_like(I, requires_grad=True)
        
        # Make indexing tensor to allow specific values to change each timestep
        idx = torch.zeros_like(I, requires_grad=False)
        idx[:,0] = 1.0
        idx.requires_grad=True
        
        # Change in SOC at each timestep
        dSOC = torch.cumsum((I[:,:,:] * dt_[:,:,:]) / self.Q_nominal, dim=1)
        dSOC = torch.concat((torch.zeros_like(dSOC[:,0:1,:]), dSOC[:,:-1,:]), dim=1)
        
        # Initial SOC
        SOC = SOC + SOC_t0.reshape(-1, 1, 1).tile(1, nt, 1) + dSOC
                
        R0 = self.R0_func(SOC)
        R1 = self.R1_func(SOC)
        R2 = self.R2_func(SOC)
        C1 = self.C1_func(SOC)
        C2 = self.C2_func(SOC)
        
        # Calculate values at each timestep
        for t in range(nt):
            
            # Make indexing tensor to allow specific values to change each timestep
            idx = torch.zeros_like(I, requires_grad=False)
            idx[:,t,:] = 1.0
            idx.requires_grad=True
            
            if t > 0:
                I1 = I1 + idx * ((torch.exp(-dt_[:,t,:]/(R1[:,t,:]*C1[:,t,:])) * I1[:,t-1,:]) + 
                                 ((torch.ones_like(I[:,0,:], requires_grad=True) - torch.exp(-dt_[:,t,:]/(R1[:,t,:]*C1[:,t,:]))) * I[:,t-1,:])
                                ).unsqueeze(1)

                I2 = I2 + idx * ((torch.exp(-dt_[:,t,:]/(R2[:,t,:]*C2[:,t,:])) * I2[:,t-1,:]) + 
                                 ((torch.ones_like(I[:,0,:], requires_grad=True) - torch.exp(-dt_[:,t,:]/(R2[:,t,:]*C2[:,t,:]))) * I[:,t-1,:])
                                ).unsqueeze(1)
            else:
                I1 = I1 + idx * ((torch.exp(-dt_[:,t,:]/(R1[:,t,:]*C1[:,t,:])) * I1_t0.reshape(-1, 1)) + 
                                 ((torch.ones_like(I[:,0,:], requires_grad=True) - torch.exp(-dt_[:,t,:]/(R1[:,t,:]*C1[:,t,:]))) * I[:,t,:])
                                ).unsqueeze(1)

                I2 = I2 + idx * ((torch.exp(-dt_[:,t,:]/(R2[:,t,:]*C2[:,t,:])) * I2_t0.reshape(-1, 1)) + 
                                 ((torch.ones_like(I[:,0,:], requires_grad=True) - torch.exp(-dt_[:,t,:]/(R2[:,t,:]*C2[:,t,:]))) * I[:,t,:])
                                ).unsqueeze(1)
            
        V0 = I * R0
        V1 = I * R1
        V2 = I * R2
            
        Vocv = self.Vocv_func(SOC)
            
        V = torch.relu(Vocv + V0 + V1 + V2)  # Total voltage across cell
        
        # Return data
        return(SOC, V, I1, I2)
    
    
    def closure(self, y_data, x, hidden_states, I1_t0, I2_t0):
        
        h1, c1 = hidden_states

        # predict
        y_pred, (h1, c1) = self.forward(x, (h1, c1))
        
        SOC_theory, V_theory, I1_theory, I2_theory = self.compute_ECM(x[:,:,0:1], x[:,0:1,1:2], x[:,:,2:3], I1_t0, I2_t0)
        
        # calculate loss
        loss_theory = self.criterion(y_pred, V_theory)
        loss_data   = self.criterion(y_pred, y_data)
        loss_ecm    = self.criterion(V_theory, y_data)
        
        loss = (self.L_data * loss_data) + (self.L_theory * loss_theory) + (self.L_ECM * loss_ecm)
        
        self.data_loss_batch.append(self.L_data.item() * loss_data.item())
        self.theory_loss_batch.append(self.L_theory.item() * loss_theory.item())
        self.ecm_loss_batch.append(self.L_ECM.item() * loss_ecm.item())
        
        return loss, (h1, c1), I1_theory[:,-1,:], I2_theory[:,-1,:]
    
    
    def closure_ecm_only(self, y_data, x, hidden_states, I1_t0, I2_t0):
        
        SOC_theory, V_theory, I1_theory, I2_theory = self.compute_ECM(x[:,:,0:1], x[:,0:1,1:2], x[:,:,2:3], I1_t0, I2_t0)
        
        # calculate loss
        loss_ecm = self.criterion(V_theory, y_data) + self.criterion(torch.diff(V_theory, dim=1), torch.diff(y_data, dim=1)) * self.L_ECM

        self.ecm_loss_batch.append(self.L_ECM.item() * loss_ecm.item())
        
        return loss_ecm, hidden_states, I1_theory[:,-1,:], I2_theory[:,-1,:]

In [None]:
model = ECM_PINN(2,25,25,1)
model.Q_nominal = data_file[0]['Q_nominal']

optim     = torch.optim.Adam(model.parameters(), lr=1.0e-3)
ECM_optim = torch.optim.Adam(model.parameters(), lr=1.0e-3)

ECM_path = "D:/OneDrive - Queen's University Belfast/Documents/Thesis/Chapter2_ECM/Saved models/Nissan_data/trained_ECM.pth"
model.load_state_dict(torch.load(ECM_path)['model'].state_dict())

loss_train = []

In [None]:
def plot_tests_standard(model, cycle_num, steps, ecm_only=False):
    model.eval()
    
    n_tests = 1  # number of test batches
    
    with torch.no_grad():
        V_pred   = np.asarray([])
        V_ecm    = np.asarray([])
        SOC_ecm  = np.asarray([])
        SOC_pred = np.asarray([])
        I1_pred  = np.asarray([])
        I2_pred  = np.asarray([])
        
        h0 = torch.zeros(model.n_lstm, n_tests, model.H3)
        c0 = torch.zeros(model.n_lstm, n_tests, model.H3)
        
        I1 = torch.zeros((1,1,1))
        I2 = torch.zeros((1,1,1))

        for step in tqdm(range(steps)):

            Time  = torch.tensor(data_file[cycle_num]['time'][step*step_size:(step+1)*step_size],  requires_grad=False, dtype=my_dtype).unsqueeze(0)
            dt_  = Time[:,1:,:] - Time[:,:-1,:]
            dt_  = torch.concat((dt_, dt_[:,-1:,:]), dim=1)
            
            Imeas = torch.tensor(data_file[cycle_num]['Imeas'][step*step_size:(step+1)*step_size], requires_grad=False, dtype=my_dtype).unsqueeze(0)
            Vmeas = torch.tensor(data_file[cycle_num]['Vmeas'][step*step_size:(step+1)*step_size], requires_grad=False, dtype=my_dtype).unsqueeze(0)
            SOC_batch = SOC[cycle_num,step*step_size:(step+1)*step_size,:].unsqueeze(0).type(my_dtype)

            x_batch = torch.concat((Imeas, SOC_batch, dt_), dim=-1)
            
            if not ecm_only:
                y_pred, (h1, c1) = model(x_batch, (h0, c0))
                V_pred = np.append(V_pred, y_pred[0,:,0].cpu().numpy(), axis=0)

                h0 = h1
                c0 = c1
                        
            SOC_ecm_, V_ecm_, I1, I2 = model.compute_ECM(Imeas, SOC_batch[:,0:1,:], dt_, I1[:,-1:,:], I2[:,-1:,:])
            
            V_ecm  = np.append(V_ecm,  V_ecm_[0,:,0].cpu().numpy(), axis=0)
            SOC_ecm  = np.append(SOC_ecm,  SOC_ecm_[0,:,0].cpu().numpy(), axis=0)
            I1_pred = np.append(I1_pred, I1[0,:,0].cpu().numpy(), axis=0)
            I2_pred = np.append(I2_pred, I2[0,:,0].cpu().numpy(), axis=0)

            
        if ecm_only:
            # Plot voltage
            plt.figure(figsize=(12, 4))
            plt.plot(data_file[cycle_num]['time'][:len(V_ecm)].cpu(), data_file[cycle_num]['Vmeas'][:len(V_ecm)].cpu(), 'k--', alpha=0.8, label='Measured')
            plt.plot(data_file[cycle_num]['time'][:len(V_ecm)].cpu(), V_ecm, 'b', label='ECM Prediction')
            plt.ylabel('Voltage (V)')
            plt.xlabel('Time (s)')
            plt.show()
            plt.close()
            
            # Plot currents through branches
            plt.figure(figsize=(12, 4))
            plt.plot(data_file[cycle_num]['time'][:len(V_ecm)].cpu(), data_file[cycle_num]['Imeas'][:len(V_ecm)].cpu(), 'k', alpha=0.8, label='Measured total I')
            plt.plot(data_file[cycle_num]['time'][:len(V_ecm)].cpu(), I1_pred, label='I1')
            plt.plot(data_file[cycle_num]['time'][:len(V_ecm)].cpu(), I2_pred, label='I2')
            plt.ylabel('Voltage (V)')
            plt.xlabel('Time (s)')
            plt.show()
            plt.close()
            
        else:
            # Plot voltage
            plt.figure(figsize=(12, 4))
            plt.plot(data_file[cycle_num]['time'][:len(V_pred)].cpu(), data_file[cycle_num]['Vmeas'][:len(V_pred)].cpu(), 'k--', alpha=0.8, label='Measured')
            plt.plot(data_file[cycle_num]['time'][:len(V_pred)].cpu(), V_pred, 'r', label='Prediction')
            plt.plot(data_file[cycle_num]['time'][:len(V_pred)].cpu(), V_ecm, 'b', label='ECM Prediction')
            plt.ylabel('Voltage (V)')
            plt.xlabel('Time (s)')
            plt.show()
            plt.close()

In [None]:
#n_samples = nData
batch_size = len(data_file)
batches = 1 #-(-n_samples // batch_size)

step_size = 100
steps = N // step_size #-(-N // step_size)    # Round up to nearest integer (so end point that don't make a complete batch are still included)

increase_steps = False
step_gain = 10

random_steps = False

In [None]:
# Record learning process
model.L_data   = torch.tensor(0.0, requires_grad=True)
model.L_theory = torch.tensor(0.0, requires_grad=True)
model.L_ECM    = torch.tensor(1.0, requires_grad=True)

Epochs = 10000 #- len(loss_train)

model.train()

for ep in tqdm(range(Epochs)):
    
    model.epochs += 1
    loss_batch = []
    model.data_loss_batch = []
    model.theory_loss_batch = []
    model.ecm_loss_batch = []
    
    # Shuffle input and output values
    idx = torch.randperm(len(data_file))
    
    if increase_steps:
        if step_gain > steps:
            steps_this_ep = steps
        else:
            if model.epochs > 1:
                if loss_train[-1] < 1.0:
                    step_gain += 1
                    print(f'Training steps increased to {step_gain}')
            steps_this_ep = step_gain

    else:
        steps_this_ep = steps
    
    if random_steps:
        steps_this_ep = int(steps_this_ep * torch.rand((1,)).item())
        if steps_this_ep < 1:
            steps_this_ep = 1
    else:
        steps_this_ep = steps_this_ep
    


        
    for batch in range(batches):
        
        batch_start = batch * batch_size
        batch_end = (batch+1) * batch_size

        h1 = torch.zeros(model.n_lstm, batch_end - batch_start, model.H3)
        c1 = torch.zeros(model.n_lstm, batch_end - batch_start, model.H3)
        
        I1_t0 = torch.zeros((batch_size, 1), requires_grad=True)
        I2_t0 = torch.zeros((batch_size, 1), requires_grad=True)
                
        for step in range(steps_this_ep):

            hidden_states = (h1, c1)
            
            Time      = torch.zeros((batch_size, step_size, 1))
            Imeas     = torch.zeros_like(Time)
            Vmeas     = torch.zeros_like(Time)
            SOC_batch = torch.zeros_like(Time)

            if (step+1)*step_size < len(data_file[0]['time']):
                
                for ii in range(batch_size):                
                    Time[ii]      = data_file[ii]['time'][step*step_size:(step+1)*step_size].unsqueeze(0)
                    Imeas[ii]     = data_file[ii]['Imeas'][step*step_size:(step+1)*step_size].unsqueeze(0)
                    Vmeas[ii]     = data_file[ii]['Vmeas'][step*step_size:(step+1)*step_size].unsqueeze(0)
                    SOC_batch[ii] = SOC[ii, step*step_size:(step+1)*step_size, :].type(my_dtype)
                    
            else:
                for ii in range(batch_size):    
                    Time[ii]      = data_file[ii]['time'][step*step_size:].unsqueeze(0)
                    Imeas[ii]     = data_file[ii]['Imeas'][step*step_size:].unsqueeze(0)
                    Vmeas[ii]     = data_file[ii]['Vmeas'][step*step_size:].unsqueeze(0)
                    SOC_batch[ii] = SOC[ii, step*step_size:, :].type(my_dtype)
                
            dt_  = Time[:,1:,:] - Time[:,:-1,:]
            dt_  = torch.concat((dt_, dt_[:,-1:,:]), dim=1)
                
            x_batch = torch.concat((Imeas.detach(), SOC_batch.detach(), dt_.detach()), dim=-1)
            y_batch = Vmeas.detach()

            x_batch.requires_grad = True
            y_batch.requires_grad = True

            model.zero_grad()
            optim.zero_grad()

            loss_, hidden_states, I1_t0, I2_t0 = model.closure(y_batch, x_batch, hidden_states, I1_t0, I2_t0)
            #loss_ecm, hidden_states, I1_t0, I2_t0 = model.closure_ecm_only(y_batch, x_batch, hidden_states, I1_t0, I2_t0)
            
            loss_.backward()
            optim.step()

            loss_batch = np.append(loss_batch, loss_.clone().item())

            h1, c1 = hidden_states
            h1 = h1.clone().detach()
            c1 = c1.clone().detach()
            I1_t0 = I1_t0.clone().detach()
            I2_t0 = I2_t0.clone().detach()

            h1.requires_grad = True
            c1.requires_grad = True
            I1_t0.requires_grad = True
            I2_t0.requires_grad = True
            
        #if batch % 1 == 0:
            #print(f"Batch {batch+1}/{batches}\t|  Loss: {np.mean(loss_batch)}")
            
    loss_train.append(np.mean(loss_batch))
    model.data_loss.append(np.mean(model.data_loss_batch))
    model.theory_loss.append(np.mean(model.theory_loss_batch))
    model.ecm_loss.append(np.mean(model.ecm_loss_batch))

In [None]:
# Plot training progress
plt.figure(figsize=(12, 4))
plt.plot(loss_train, 'tab:blue', label='Total')
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.yscale('log')
plt.legend()
plt.show()
plt.close()

plt.figure(figsize=(12, 4))
plt.plot(model.data_loss,  'tab:red',  label='Data', alpha=1.0)
plt.plot(model.theory_loss,  'tab:blue',  label='Theory', alpha=1.0)
plt.plot(model.ecm_loss,  'tab:orange',  label='ECM params', alpha=1.0)
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.yscale('log')
plt.xlim([0, len(model.data_loss)])
plt.legend()
plt.show()
plt.close()

plt.figure(figsize=(12, 4))
plt.plot(model.ecm_loss,  'tab:orange',  label='ECM params', alpha=1.0)
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.yscale('log')
plt.xlim([0, len(model.data_loss)])
plt.legend()
plt.show()
plt.close()

In [None]:
for cycle_num in range(len(data_file)):
    print(file_path[cycle_num]['name'])
    plot_tests_standard(model, cycle_num, steps, ecm_only=False)

In [None]:
with torch.no_grad():
    v = model.Vocv_func(torch.linspace(0,1,50).reshape(-1,1))
    
    plt.plot(torch.linspace(0,1,50).cpu(), v[:,0].cpu())
    plt.show()
    plt.close()
    
    v = model.R0_func(torch.linspace(0,1,50).reshape(-1,1))
    
    plt.plot(torch.linspace(0,1,50).cpu(), v[:,0].cpu())
    plt.show()
    plt.close()
    
    v = model.R1_func(torch.linspace(0,1,50).reshape(-1,1))
    
    plt.plot(torch.linspace(0,1,50).cpu(), v[:,0].cpu())
    plt.show()
    plt.close()
    
    v = model.R2_func(torch.linspace(0,1,50).reshape(-1,1))
    
    plt.plot(torch.linspace(0,1,50).cpu(), v[:,0].cpu())
    plt.show()
    plt.close()

In [None]:
save_path = 'ECM_PINN_v4-6_Corrections.pth'

torch.save({
            'model': model,
            'optimizer_state_dict': optim.state_dict(),
            'loss': loss_train,
            }, save_path)