<a name='0'></a>
## Import

In [None]:
from google.colab import drive
# Mount Google Drive at the absolute path that the `!cp` commands in the next
# cell read from (/content/drive/MyDrive/...). A relative mount point such as
# 'content/drive' is resolved against the current working directory instead.
drive.mount('/content/drive')

In [None]:
# Copy the helper module and the SOC dataset from the mounted Drive folder
# into the current working directory so they can be imported / read locally.
# NOTE(review): `transformer_helper` is imported further down but is not copied
# here -- confirm it is already available in the working directory.
!cp /content/drive/MyDrive/transformer_soc/rolling_and_plot.py .
!cp /content/drive/MyDrive/transformer_soc/220617_soc.csv .

In [59]:
from os import environ
environ["TF_CPP_MIN_LOG_LEVEL"] = "1"
# removes tensorflow warnings triggered because of Tensorflow incompatibility with my Apple M1 chip.
# ignore this when using a non Apple Silicon device, ie. Google Colab or the likes.

import tensorflow as tf
from tensorflow.keras.layers import MultiHeadAttention, Dense, Input, Dropout, BatchNormalization

from sklearn.preprocessing import MaxAbsScaler, MinMaxScaler
from sklearn.model_selection import train_test_split

from dataclasses import dataclass

In [None]:
import numpy as np
import pandas as pd
from transformer_helper import *
from rolling_and_plot import data_plot, rolling_split, normalize, validate

%reload_ext autoreload
%autoreload 2

**TODO:** Figure out how to select the GPU (CUDA) device in TensorFlow.

## Table of Contents

- [Import](#0)
- [Preprocessing](#win)
- [Encoder](#enc)
    - [Encoder Layer](#enc-lay)
    - [Full Encoder](#full-enc)
- [Decoder](#dec)
    - [Decoder Layer](#dec-lay)
    - [Full Decoder](#full-dec)
- [Transformer](#transform)
- [Training](#train)

# Literature:


According to [A Transformer-based Framework for Multivariate Time Series Representation Learning](https://dl.acm.org/doi/abs/10.1145/3447548.3467401):
Using **Batch Normalization is significantly more effective** for multivariate time-series than using the traditional Layer Normalization method found in NLP.

In addition, according to [Deep learning approach towards accurate state of charge estimation for lithium-ion batteries using self-supervised transformer model](https://www.nature.com/articles/s41598-021-98915-8#Sec9):
Using a transformer network while **forgoing the Decoder Layer** is more effective for the application of State-of-Charge estimation.

$\large{Self\ Attention}$
$$
\text { Attention }(Q, K, V)=\operatorname{softmax}\left(\frac{Q K^{T}}{\sqrt{d_{k}}}+{M}\right) V
$$

$\large{Input}$

Voltage, Current, SOC at times:
$$t - window\_size \rightarrow t - 1 $$

**Note**

Embedding layers cannot be used with battery data: an embedding is a lookup table indexed by non-negative integers, whereas the measurements are floating-point values that can also be negative.

In [8]:
@dataclass
class G:
    """
    Global hyperparameters shared by the preprocessing and model cells.

    Every attribute carries a type annotation so that ``@dataclass`` actually
    registers it as a field (un-annotated class attributes are ignored by the
    decorator). Class-level access such as ``G.window_size`` keeps working,
    and ``G(window_size=32)`` can now be used to build an overridden copy.
    """
    num_features: int = 3  # current, voltage, and soc at t minus G.window_size -> t minus 1
    window_size: int = 64  # length of each rolling window; also the positional-encoding length
    batch_size: int = 16
    epochs: int = 10
    dense_dim: int = 32  # width of the hidden Dense layers
    model_dim: int = 128
    num_heads: int = 16  # attention heads per encoder layer
    num_layers: int = 6  # number of stacked encoder layers
<a id="win"></a>
## Preprocessing

In [None]:
# from google.colab import files
# Read the SOC dataset that an earlier cell copied from Drive; the absolute
# path assumes the notebook's working directory is /content -- TODO confirm
# when running outside Colab.
file = pd.read_csv("/content/220617_soc.csv")

In [None]:
# Plot SOC over the duration of the test. The title now matches the plotted
# axes (x = test time, y = SOC); it previously read "OCV v SOC".
data_plot(data = [file],
          title="SOC v Test Time",
          x = ["test time (sec)"],
          y = ["soc"],
          markers = "lines",
          color = "darkorchid",
          x_title = "Test Time (sec)",
          y_title = "SOC"
         )

In [None]:
# Keep only the current / voltage / soc columns, take every 5th row, and pass
# the result to `normalize` (imported from rolling_and_plot).
# NOTE(review): this overwrites `file`, so re-running the cell without
# re-loading the CSV down-samples the already down-sampled frame again.
file = normalize(file.loc[:,["current","voltage","soc"]].iloc[::5])
#uses sklearn.preprocessing

In [None]:
# `rolling_split` (imported from rolling_and_plot) is called with the window
# length and returns the four arrays unpacked here.
x_train, x_test, y_train, y_test = rolling_split(file, G.window_size)
# Display the shapes as a quick sanity check of the split.
x_train.shape, x_test.shape, y_train.shape, y_test.shape
#uses sklearn.model_selection

<a name='enc'></a>
## Encoder

In [183]:
def FullyConnected(dense_dim = G.dense_dim, num_features = G.num_features):
    """
    Build the position-wise feed-forward network used inside an encoder layer.

    Arguments:
        dense_dim -- number of units in each of the two hidden ReLU layers
        num_features -- number of units in the linear output layer; it must
                        equal the last dimension of the tensor the output is
                        added to in the residual connection
    Returns:
        A tf.keras.Sequential of three Dense layers. Dense acts on the last
        axis only, so the leading (batch, window) axes are preserved.
    """
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dense_dim, activation='relu'),  # (G.batch_size, G.window_size, dense_dim)
        tf.keras.layers.Dense(dense_dim, activation='relu'),  # (G.batch_size, G.window_size, dense_dim)
        tf.keras.layers.Dense(num_features)  # (G.batch_size, G.window_size, num_features)
    ])

<a name='enc-lay'></a>
###  Encoder Layer

In [186]:
class EncoderLayer(tf.keras.layers.Layer):
    """
    The encoder layer is composed of a multi-head self-attention mechanism,
    followed by a simple, position-wise fully connected feed-forward network.
    This architecture includes a residual connection around each of the two
    sub-layers, followed by batch normalization.

    Arguments:
        num_heads -- number of attention heads
        num_features -- size of each attention head's query/key projection
                        (passed to MultiHeadAttention as key_dim)
        dense_dim -- hidden width of the feed-forward network
        dropout_rate -- dropout rate used inside the attention layer and
                        after the feed-forward network
        batchnorm_eps -- epsilon used by both BatchNormalization layers
    """
    def __init__(self,
                 num_heads,
                 num_features,
                 dense_dim,
                 dropout_rate=0.1,
                 batchnorm_eps=1e-6):
        super().__init__()

        self.mha = MultiHeadAttention(num_heads = num_heads,
                                      key_dim = num_features,
                                      dropout = dropout_rate,
                                     )
        # feed-forward network; its output width is FullyConnected's default,
        # which has to match the last dimension of the layer input for the
        # residual addition in call() to be valid
        self.ffn = FullyConnected(dense_dim = dense_dim)

        self.batchnorm1 = BatchNormalization(epsilon=batchnorm_eps)
        self.batchnorm2 = BatchNormalization(epsilon=batchnorm_eps)

        self.dropout_ffn = Dropout(dropout_rate)

    def call(self, x, training=None):
        """
        Forward pass for the Encoder Layer

        Arguments:
            x -- Tensor of shape (G.batch_size, G.window_size, G.num_features)
            training -- Boolean, set to true to activate the training mode of
                        the dropout and batch-normalization layers
        Returns:
            encoder_layer_out -- Tensor of shape (G.batch_size, G.window_size, G.num_features)
        """
        # `training` is forwarded explicitly to every sub-layer so that
        # dropout and batch-norm behaviour never depends on implicit
        # propagation through the Keras call context.
        attn_output = self.mha(query = x,
                               value = x,
                               training = training) # Self attention

        # residual connection + batch norm
        out1 = self.batchnorm1(tf.add(x, attn_output),
                               training = training)  # (G.batch_size, G.window_size, G.num_features)

        ffn_output = self.ffn(out1, training = training)

        ffn_output = self.dropout_ffn(ffn_output, training = training)

        # residual connection + batch norm
        encoder_layer_out = self.batchnorm2(tf.add(ffn_output, out1),
                                            training = training)
        # (G.batch_size, G.window_size, G.num_features)
        return encoder_layer_out

<a name='full-enc'></a>
### Full Encoder

In [196]:
class Encoder(tf.keras.layers.Layer):
    """
    The Encoder adds a positional encoding to the raw input window (no
    embedding layer is used), applies dropout, and passes the result through
    a stack of encoder layers. Only the last time step of the final layer's
    output is returned.

    Arguments:
        num_layers -- number of stacked EncoderLayer instances
        num_heads -- attention heads per encoder layer
        num_features -- forwarded to each EncoderLayer as num_features
        dense_dim -- hidden width of each layer's feed-forward network
        input_size -- second argument given to positional_encoding
        maximum_position_encoding -- first argument given to positional_encoding
        dropout_rate -- dropout rate for the input dropout and encoder layers
        batchnorm_eps -- epsilon for the encoder layers' batch normalization
    """
    def __init__(self,
                 num_layers = G.num_layers,
                 num_heads = G.num_heads,
                 num_features = G.num_features,
                 dense_dim = G.dense_dim,
                 input_size = G.num_features,
                 maximum_position_encoding = G.window_size,
                 dropout_rate=0.1,
                 batchnorm_eps=1e-6):

        super().__init__()

        self.num_layers = num_layers

        # NOTE(review): positional_encoding comes from transformer_helper; the
        # slicing in call() assumes it returns a tensor laid out as
        # (1, positions, input_size) -- TODO confirm.
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                input_size)

        self.enc_layers = [EncoderLayer(num_heads = num_heads,
                                        num_features = num_features,
                                        dense_dim = dense_dim,
                                        dropout_rate = dropout_rate,
                                        batchnorm_eps = batchnorm_eps)
                           for _ in range(self.num_layers)]

        self.dropout = Dropout(dropout_rate)

    def call(self, x, training=None):
        """
        Forward pass for the Encoder

        Arguments:
            x -- Tensor of shape (G.batch_size, G.window_size, G.num_features)
            training -- Boolean, set to true to activate the training mode of
                        the dropout and batch-normalization layers
        Returns:
            Tensor of shape (G.batch_size, G.num_features): the encoder output
            at the last time step of the window
        """
        seq_len = tf.shape(x)[1]
        x = x + self.pos_encoding[:, :seq_len, :]
        # forward `training` explicitly so dropout is only active when requested
        x = self.dropout(x, training = training)

        for enc_layer in self.enc_layers:
            x = enc_layer(x, training = training)

        # only need the final time's data : time = t-1 from the window
        # x has shape (G.batch_size, G.window_size, G.num_features)
        # but only time t-1 is returned:
        return x[:, -1, :] # (G.batch_size, G.num_features)

<a name='transform'></a> 
## Transformer

In [198]:
class Transformer(tf.keras.Model):
    """
    Encoder-only transformer: an Encoder followed by a small dense head that
    produces one value per input window. The decoder is intentionally left out.

    Arguments:
        num_layers -- number of encoder layers
        num_heads -- attention heads per encoder layer
        dense_dim -- hidden width of the encoder feed-forward networks and of
                     the first layer of the output head
        max_positional_encoding_input -- forwarded to the Encoder as
                     maximum_position_encoding
        max_positional_encoding_target -- unused; kept so the signature stays
                     compatible with a decoder-based variant
        dropout_rate -- dropout rate forwarded to the Encoder
        batchnorm_eps -- batch-norm epsilon forwarded to the Encoder
    """
    def __init__(self,
                 num_layers = G.num_layers,
                 num_heads = G.num_heads,
                 dense_dim = G.dense_dim,
                 max_positional_encoding_input = G.window_size,
                 max_positional_encoding_target = G.window_size,
                 dropout_rate=0.1,
                 batchnorm_eps=1e-6):
        super().__init__()

        # Forward the constructor arguments; previously Encoder() was built
        # with its own defaults and silently ignored them.
        self.encoder = Encoder(num_layers = num_layers,
                               num_heads = num_heads,
                               dense_dim = dense_dim,
                               maximum_position_encoding = max_positional_encoding_input,
                               dropout_rate = dropout_rate,
                               batchnorm_eps = batchnorm_eps)

        # No Decoder is the method used by Hannan et al in the Journal Nature

        # Sequential expects its layers as a single list; passing them as two
        # positional arguments would bind the second layer to `name`.
        self.final_stack = tf.keras.Sequential([
            tf.keras.layers.Dense(dense_dim, activation = "relu"),
            tf.keras.layers.Dense(1, activation = "relu"),
        ])

    def call(self, x, y=None, training=None, look_ahead_mask=None, dec_padding_mask=None):
        """
        Forward pass for the entire Transformer
        Arguments:
            x -- Tensor of shape (G.batch_size, G.window_size, G.num_features)
                 holding the windowed input data
            y -- unused; placeholder for the decoder target
            training -- Boolean, set to true to activate the training mode of
                        the dropout and batch-normalization layers
            look_ahead_mask -- unused; placeholder for the decoder
            dec_padding_mask -- unused; placeholder for the decoder
        Returns:
            final_output -- Tensor of shape (G.batch_size, 1): SOC prediction at time t
        """
        enc_output = self.encoder(x, training = training) # (G.batch_size, G.num_features)

        # the head is stored as `final_stack` in __init__
        final_output = self.final_stack(enc_output, training = training) # (G.batch_size, 1)

        return final_output

In [None]:

# Smoke test: push one random batch through an untrained Transformer.
x = tf.random.uniform((G.batch_size, G.window_size, G.num_features))
y = tf.random.normal((G.batch_size,1))
# Don't need padding masks for battery data because the zeros are important values
# enc_padding_mask = create_padding_mask(x)
# dec_padding_mask = create_padding_mask(y)
# The call below still takes a decoder padding mask positionally, so define it
# explicitly; leaving it undefined raises a NameError.
dec_padding_mask = None
look_ahead_mask = create_look_ahead_mask(x.shape[1])


t = Transformer()
x = t(x,y,True, look_ahead_mask, dec_padding_mask)
x

<a id = "train"></a>
# Training

In [200]:
# The interactive `tf.keras.optimizers.Adam?` help lookup was removed: it only
# opens a docstring pager and has no effect on the notebook's results.

In [None]:
# Keras spells Adam's moment-decay arguments `beta_1` / `beta_2`; the
# un-underscored `beta1` / `beta2` are not accepted keyword arguments.
optimizer = tf.keras.optimizers.Adam(learning_rate = 0.1,
                                     beta_1 = 0.9,
                                     beta_2 = 0.999
                                    )
# Log-cosh loss used for the SOC regression target.
loss_fn = tf.keras.losses.LogCosh()