In [None]:
# Mount Google Drive inside the Colab VM so that files under
# /content/drive/MyDrive (helper module, checkpoints) can be reached.
# Colab-only: the google.colab package is not available in other environments.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy the project's helper module from Drive into the working directory so that
# `from rolling_and_plot import *` can find it.
!cp /content/drive/MyDrive/pytorch_colab/rolling_and_plot.py .

In [None]:
# import os
# assert os.environ['COLAB_TPU_ADDR'], 'Make sure to select TPU from Edit > Notebook settings > Hardware accelerator'
# # !pip install cloud-tpu-client==0.10 torch==1.11.0 https://storage.googleapis.com/tpu-pytorch/wheels/colab/torch_xla-1.11-cp37-cp37m-linux_x86_64.whl

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import ToTensor, Lambda

# import torch_xla
# import torch_xla.core.xla_model as xm

from sklearn.preprocessing import MaxAbsScaler, MinMaxScaler
from sklearn.model_selection import train_test_split

from dataclasses import dataclass

In [None]:
import numpy as np
import pandas as pd
from rolling_and_plot import *

%reload_ext autoreload
%autoreload 2

In [None]:
# Select the device used for tensors and the model: the GPU when CUDA is
# available, otherwise the CPU, so the notebook still runs on a runtime
# without a GPU instead of failing at the first `.to(device)`.
# device = xm.xla_device()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

# TOC

* [Preprocessing](#pre)

* [Data Loading](#dload)

* [Models](#model)

<a id="pre"></a>
# Preprocessing

## <a id="g">G class</a>

In [None]:
@dataclass
class G:
    """Global hyper-parameters shared by the preprocessing, model and training cells.

    Values are read straight from the class (e.g. ``G.window_size``). The
    attributes are annotated so that ``@dataclass`` registers them as real
    fields; without annotations the decorator generates no fields at all.
    """

    # Number of input features per time step fed to the LSTM.
    # NOTE(review): the previous comment listed four quantities (delta_t, current,
    # voltage, soc at t-1) for a value of 3 — confirm which three are actually used.
    num_features: int = 3
    # Window length handed to rolling_split.
    window_size: int = 512
    # Mini-batch size for the DataLoaders; the model also reuses it as its hidden width.
    batch_size: int = 16
    # Number of passes over the training data.
    epochs: int = 100

In [None]:
# from google.colab import files
# Read the raw battery log into a DataFrame.
# NOTE(review): "/content/" is a directory, not a CSV file — the file name appears
# to be missing; restore the real path before running this cell.
file = pd.read_csv("/content/")

In [None]:
# Plot the "soc" column against "test time (sec)" using data_plot
# (presumably provided by rolling_and_plot — confirm).
# NOTE(review): the title says "OCV v SOC" while the axes are test time vs. SOC —
# confirm which one is intended.
data_plot(data = [file],
          title="OCV v SOC",
          x = ["test time (sec)"],
          y = ["soc"],
          markers = "lines",
          color = "darkorchid",
          x_title = "Test Time (sec)",
          y_title = "SOC"
         )

In [None]:
# Keep only the current/voltage/soc columns, take every 7th row, and pass the
# result to normalize (project helper; per the note below it relies on
# sklearn.preprocessing).
# NOTE: this rebinds `file`, so re-running the cell does not start from the raw data again.
file = normalize(file.loc[:,["current","voltage","soc"]].iloc[::7])
#uses sklearn.preprocessing

In [None]:
# Split the normalized data into train/test inputs and targets using windows of
# G.window_size rows (rolling_split is a project helper; exact output shapes —
# TODO confirm). The second line displays the four shapes as the cell output.
x_train, x_test, y_train, y_test = rolling_split(file, G.window_size)
x_train.shape, x_test.shape, y_train.shape, y_test.shape
#uses sklearn.model_selection

In [None]:
# np.array_split(train,len(train)//6) not applicable here

# Data Loader <a id="dload"></a>

In [None]:
class BatterySet(Dataset):
    """In-memory dataset pairing input windows with their targets.

    Args:
        x: Input windows, one sample per entry along the first axis. Size-1
            axes after the first are dropped.
        y: Targets, one per sample along the first axis.
        target_device: Device the tensors are moved to. When omitted, the
            notebook-level ``device`` is used.

    Raises:
        ValueError: If ``x`` and ``y`` do not hold the same number of samples.

    The attributes ``logits`` (the inputs, despite the name) and ``labels`` are
    read directly by other cells, so their names are kept.
    """

    def __init__(self, x: np.ndarray, y: np.ndarray, target_device=None):
        if len(x) != len(y):
            raise ValueError(
                f"x and y must contain the same number of samples, got {len(x)} and {len(y)}"
            )
        if target_device is None:
            target_device = device  # module-level device selected earlier in the notebook

        # Drop singleton axes but never the sample axis: a bare squeeze() would
        # also remove axis 0 for a one-sample dataset and break len()/indexing.
        kept_shape = (x.shape[0],) + tuple(dim for dim in x.shape[1:] if dim != 1)
        inputs = torch.from_numpy(np.ascontiguousarray(x).reshape(kept_shape))
        targets = torch.from_numpy(np.ascontiguousarray(y))

        self.logits = self._as_float32(inputs).to(target_device)
        self.labels = self._as_float32(targets).to(target_device)

    @staticmethod
    def _as_float32(tensor):
        """Cast floating-point tensors to float32, the default dtype of nn layers."""
        return tensor.float() if tensor.is_floating_point() else tensor

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (self.logits[idx], self.labels[idx])

In [None]:
# Wrap the train/test arrays in BatterySet datasets.
# NOTE: despite the *_dataloader names, these are Dataset objects at this point;
# the same names are bound to DataLoader instances afterwards.
train_dataloader = BatterySet(x_train, y_train)
test_dataloader = BatterySet(x_test, y_test)

In [None]:
# Wrap the datasets in DataLoaders. The *_dataloader names initially hold
# BatterySet objects; if this cell has already run they hold DataLoaders, so
# unwrap those first. That keeps the cell idempotent instead of nesting one
# DataLoader inside another on a re-run.
train_dataset = train_dataloader.dataset if isinstance(train_dataloader, DataLoader) else train_dataloader
test_dataset = test_dataloader.dataset if isinstance(test_dataloader, DataLoader) else test_dataloader

train_dataloader = DataLoader(train_dataset, batch_size=G.batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=G.batch_size, shuffle=False)

In [None]:
# Print the shape/dtype of the first training sample. Iterating `.dataset`
# yields individual (input, target) pairs, not batches assembled by the DataLoader.
for X,y in train_dataloader.dataset:
    print(f"Shape of X [window, features]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Print the first test sample. Because the loop walks `.dataset`, `batch` is
# really the sample index and (x, y) is a single input/target pair.
for batch, (x,y) in enumerate(test_dataloader.dataset):
    print(batch,x,y)
    break

# Creating Models <a id="model"></a>

Go to [G class](#g)

In [None]:
# # Get cpu or gpu device for training.
# device = torch.device("mps")
# print(f"Using {device} device")

# Define model
class LSTMNetwork(nn.Module):
    """Three-layer LSTM followed by a two-layer dense head producing one value per window.

    Args:
        num_features: Size of the feature vector at each time step. Defaults to
            ``G.num_features``.
        hidden_size: Width of the LSTM hidden state and of the first dense
            layer. Defaults to ``G.batch_size``, the value this network has
            always used, so previously saved checkpoints keep loading.

    ``forward`` accepts either a single window ``(seq_len, num_features)`` or a
    batch ``(batch, seq_len, num_features)`` and returns ``(1,)`` or
    ``(batch, 1)`` respectively.
    """

    def __init__(self, num_features=None, hidden_size=None):
        super().__init__()
        if num_features is None:
            num_features = G.num_features
        if hidden_size is None:
            hidden_size = G.batch_size

        # Not used by forward(). It stays registered only so that state dicts
        # saved earlier (which contain batch_norm1.* entries) still load.
        self.batch_norm1 = nn.BatchNorm1d(hidden_size, momentum=0.15, eps=1.0e-5)
        self.dropout = nn.Dropout(0.4)

        self.lstm = nn.LSTM(num_features, hidden_size, 3, batch_first=True, dropout=0.25)
        for name, param in self.lstm.named_parameters():
            if 'bias' in name:
                nn.init.uniform_(param, a=0.001, b=0.04)
            elif 'weight_ih' in name:
                nn.init.kaiming_normal_(param, nonlinearity="relu")
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param)

        self.linear1 = nn.Linear(hidden_size, hidden_size)
        self._init_dense(self.linear1, bias_low=0.01, bias_high=0.09)

        self.linear2 = nn.Linear(hidden_size, 1)
        self._init_dense(self.linear2, bias_low=0.05, bias_high=0.09)

    @staticmethod
    def _init_dense(layer, bias_low, bias_high):
        """Kaiming-normal weights and a small positive uniform bias for a dense layer."""
        nn.init.kaiming_normal_(layer.weight, nonlinearity="relu")
        nn.init.uniform_(layer.bias, a=bias_low, b=bias_high)

    def forward(self, x):
        """Return the prediction computed from the top LSTM layer's final hidden state."""
        # The previous forward() ran batch_norm1 over the LSTM outputs but threw
        # the results away, so it never influenced the prediction and only made
        # batched input fail. That dead step is gone; the numbers are unchanged.
        _, (h_n, _) = self.lstm(x)
        last_hidden = h_n[-1]  # final hidden state of the last (top) LSTM layer

        out = self.dropout(self.linear1(last_hidden))
        out = nn.functional.relu(out)
        return self.linear2(out)

In [None]:
# Instantiate the network, move its parameters to the selected device, and
# print the layer summary.
model = LSTMNetwork().to(device)
print(model)

In [None]:
import os

# Resume from a previous run when checkpoints are present on Drive; otherwise
# keep the freshly initialised weights instead of crashing.
model_ckpt = "drive/MyDrive/pytorch_colab/model_state_dict.pth"
optimizer_ckpt = "drive/MyDrive/pytorch_colab/optimizer_state_dict.pth"

# NOTE: torch.load unpickles the file — only load checkpoints you created yourself.
if os.path.exists(model_ckpt):
    model.load_state_dict(torch.load(model_ckpt, map_location = device))
else:
    print(f"No model checkpoint found at {model_ckpt}; using freshly initialised weights")

# The optimizer is created in a later cell, so on a top-to-bottom run it does
# not exist yet. Restore its state only when it is already defined (this cell
# re-run after the optimizer cell) rather than raising NameError.
if "optimizer" in globals() and os.path.exists(optimizer_ckpt):
    optimizer.load_state_dict(torch.load(optimizer_ckpt, map_location = device))

model.train()
# model.eval()

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, scheduler=None):
    """Train ``model`` for one epoch, one sample at a time.

    The loop walks ``dataloader.dataset`` directly, so every optimizer step
    uses a single (input, target) pair; the DataLoader's ``batch_size`` and
    ``shuffle`` settings have no effect here.

    Args:
        dataloader: Object exposing the training samples as ``.dataset``.
        model: Network to train; switched to training mode.
        loss_fn: Callable ``loss_fn(prediction, target)``. Its result is
            averaged to a scalar before back-propagation.
        optimizer: Optimizer holding ``model``'s parameters.
        scheduler: Optional learning-rate scheduler, stepped once at the end
            of the epoch with the mean training loss. When omitted, a
            notebook-level ``scheduler`` is used if one is defined.

    Returns:
        The mean training loss of the epoch as a float, or NaN when the
        dataset is empty or a NaN loss aborted the epoch.
    """
    if scheduler is None:
        scheduler = globals().get("scheduler")

    size = len(dataloader.dataset)
    report_every = max(1, size // 4)  # size // 4 is 0 for fewer than four samples
    running_loss, steps = 0.0, 0
    model.train()

    for batch, (x, y) in enumerate(dataloader.dataset):
        optimizer.zero_grad()  # gradients accumulate across backward() calls unless cleared

        # forward
        predict = model(x)
        loss = loss_fn(predict, y).mean()

        # Stop before back-propagating: stepping on a NaN loss would write NaNs
        # into every parameter.
        if torch.isnan(loss):
            print("loss was NaN")
            return float("nan")

        # backward
        # (Back-propagation through the 1-D dense output reportedly fails on the
        # Apple "mps" device before PyTorch 1.13.)
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        running_loss += loss_value
        steps += 1

        if batch % report_every == 0:
            print(f"loss: {loss_value:>7f}  [{batch:4d}/{size:4d}]")

    if steps == 0:
        return float("nan")

    epoch_loss = running_loss / steps
    if scheduler is not None:
        # Use the epoch average rather than the last sample's loss, which is
        # far too noisy to drive a plateau-based schedule.
        scheduler.step(epoch_loss)
    return epoch_loss

def test_loop(dataloader, model, loss_fn, prev_loss:list):
    size = len(dataloader.dataset)
    num_batches = size // G.batch_size
    test_loss, perc_error = 0.0, 0.0
    counter = 0
    with torch.no_grad(): #doesnt update parameters (we are testing not training)
        for x,y in dataloader.dataset:
            predict = model(x).reshape(y.shape)
            test_loss += loss_fn(predict, y).item()
            perc_error += torch.mean(torch.abs(predict - y) / (y+ 1e-7) * 100, 0)
            #### will have to change the way correct is computed!!
           
            counter += 1
            if counter % (size // 4) == 0:
                print(f"{counter} / {size} tested")
            
            
    test_loss /= num_batches
    prev_loss.append(test_loss)
    perc_error /= size
    print(f"Test Error: \nAverage Accuracy: {100 - perc_error}%, Avg Loss: {test_loss:>8f}\n")
    return perc_error, test_loss

## Log Cosh Loss Function

In [None]:
_LOG_2 = float(np.log(2.0))  # plain Python float so tensor dtypes are never promoted


def log_cosh_loss(y_pred: torch.Tensor, y_ground: torch.Tensor) -> torch.Tensor:
    """Return the element-wise ``log(cosh(y_pred - y_ground))``.

    Uses the identity ``log(cosh(x)) = x + softplus(-2x) - log(2)``, which
    stays finite for large ``|x|`` where ``cosh`` itself would overflow.
    The result has the broadcast shape of the inputs (no reduction).
    """
    x = y_pred - y_ground
    return x + torch.nn.functional.softplus(-2. * x) - _LOG_2


class LogCoshLoss(torch.nn.Module):
    """Log-cosh regression loss as an ``nn.Module``.

    Args:
        reduction: ``"mean"`` (default) or ``"sum"`` reduce the element-wise
            loss to a scalar, as the built-in PyTorch losses do, so
            ``backward()`` also works on multi-element predictions.
            ``"none"`` returns the element-wise values.

    Raises:
        ValueError: If ``reduction`` is not one of the three accepted values.
    """

    def __init__(self, reduction: str = "mean"):
        super().__init__()
        if reduction not in ("mean", "sum", "none"):
            raise ValueError(f"reduction must be 'mean', 'sum' or 'none', got {reduction!r}")
        self.reduction = reduction

    def forward(self,
                y_pred: torch.Tensor,
                y_true: torch.Tensor) -> torch.Tensor:
        elementwise = log_cosh_loss(y_pred, y_true)
        if self.reduction == "mean":
            return elementwise.mean()
        if self.reduction == "sum":
            return elementwise.sum()
        return elementwise

**Literature**

The log-cosh loss is the loss function used by Hannan et al. in their *Scientific Reports* article (hosted on nature.com): [Deep learning approach towards accurate state of charge estimation for lithium-ion batteries using self-supervised transformer model](https://www.nature.com/articles/s41598-021-98915-8).

However, they used a Transformer network, which I am also implementing in a separate TensorFlow project.

In [None]:
# Loss function. The alternatives tried earlier are kept commented out for reference.
# loss_fn = nn.SmoothL1Loss()
# loss_fn = nn.HuberLoss()
# # loss_fn = nn.MSELoss()
loss_fn = LogCoshLoss()
# loss_fn = nn.KLDivLoss()

# Adam over all model parameters.
# NOTE(review): an earlier note here said weight decay was being switched off for a
# while, but weight_decay=1e-4 is still passed below — confirm which is intended.
optimizer = torch.optim.Adam(model.parameters(),
                             lr = 0.005,
                             weight_decay= 1e-4
                            )

# Plateau scheduler in "min" mode with factor 0.15, patience 2 and cooldown 1;
# the value it monitors is whatever train_loop passes to scheduler.step().
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", factor = 0.15, patience = 2, cooldown = 1, verbose=True)
# scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 8, gamma=0.4, last_epoch=-1, verbose=True)

# Training

In [None]:
# Sanity check before training: print the bias parameters of the first dense layer.
for name, param in model.linear1.named_parameters():
    if "bias" in name:
        print(name, param)

In [None]:
# Main training run: one train_loop pass followed by one test_loop pass per epoch.
prev_loss = []  # test_loop appends its average test loss here after every epoch
for epoch in range(G.epochs):
    print(f"Epoch {epoch+1}/{G.epochs}\n----------------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn, prev_loss)
    # Disabled early-stopping check (stop when the last four test losses are identical):
    # if len(prev_loss) > 4 and [prev_loss[-1]] * 3 == [prev_loss[-i] for i in range(2,5)]:
    #     print("Test ended early because predictions are not changing")
    #     break

    # Checkpoint model and optimizer to Drive. `epoch` is zero-based, so this
    # fires after the 1st, 26th, 51st, ... epoch and overwrites the previous files.
    if epoch % 25 == 0:
        torch.save(model.state_dict(), "drive/MyDrive/pytorch_colab/model_state_dict.pth")
        torch.save(optimizer.state_dict(), "drive/MyDrive/pytorch_colab/optimizer_state_dict.pth")

print("Completed")

In [None]:
# Save the final model and optimizer state to Drive, overwriting the periodic
# checkpoints written during training (same file names).
torch.save(model.state_dict(), "drive/MyDrive/pytorch_colab/model_state_dict.pth")
torch.save(optimizer.state_dict(), "drive/MyDrive/pytorch_colab/optimizer_state_dict.pth")

In [None]:
# Predict every test window. The model is switched to evaluation mode for the
# duration of the loop so dropout does not randomly perturb the predictions;
# its previous mode is restored afterwards.
pred = []
was_training = model.training
model.eval()
try:
    with torch.no_grad():
        for window in test_dataloader.dataset.logits:
            pred.append(model(window))
finally:
    model.train(was_training)

print(max(pred), min(pred))

In [None]:
# Move predictions and test labels to the CPU as NumPy arrays and place them
# side by side in a DataFrame for inspection.
np_pred = np.array([p.detach().cpu().numpy() for p in pred])
np_labels = torch.clone(test_dataloader.dataset.labels).detach().cpu().numpy()

# squeeze() drops the size-1 axes so both columns are one-dimensional.
visualize = pd.DataFrame(data={"pred": np_pred.squeeze(),
                               "labels": np_labels.squeeze()})
visualize

In [None]:
# Spot-check a single training sample (index 100) against its label. Run the
# model in evaluation mode so dropout is disabled, then restore its previous mode.
was_training = model.training
model.eval()
try:
    with torch.no_grad():
        predi = model(train_dataloader.dataset.logits[100])
finally:
    model.train(was_training)
print(predi, train_dataloader.dataset.labels[100])