In [None]:
#Input parameters
startDate = "01/01/2021" # {"startDate": "dd/mm/YYYY"}
endDate = "31/12/2021" # {"startDate": "dd/mm/YYYY"}
assetSn = "UPS A8"


In [None]:
#pip install  git+https://bitbucket.org/freemens/ion_sdk.git@assembly --upgrade

In [None]:
AMAZON_FACTORY_API= 'https://amazon.altergo.io/'
AMAZON_IOT_API= 'https://iot.amazon.altergo.io/'
apiKey="c65cd9531694143e4a747c544fe1ae7c"

In [None]:
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from functools import reduce
import pandas as pd
from getpass import getpass
import ion_sdk.edison_api.edison_api as eapi
from ion_sdk.edison_api.models.factoryModel import EdisonGenericComponent, Model,CurrentState
from typing import List, Optional, Union, Iterable, Callable


## Lair Functions

In [None]:
#@title Base Call for the Calendar Ageing Model (LAIR)

class CalendarAging():

    def __init__(self, params_dict: dict):
        # Initialise the Cycle Aging class.
        self._alpha = params_dict.get("alpha", None)
        if self.alpha is None:
            self.alpha = self.alpha_function
        self._integrate = params_dict.get("integrate", None)
        if self.integrate is None:
            self.integrate = self.bdf1
        self._Ea = params_dict["Ea"]
        self._z = params_dict["z"]
        if len(params_dict["k"]) == 2:
            self._k = params_dict["k"]
        else:
            raise ValueError("Length of k should be 2.")

    def calibrate(self, train_df: pd.DataFrame):
        # Function to calibrate the values of ks given the training data (x, y)
        # Steps Description:
        # Function to get the estimation and returning the MSE
        # Run this in a minimizer.
        
        pass
    
    def _get_Q(self, x_data: pd.DataFrame, params):
        # Function to calculate Q from the input_df that has been split into cycles.
        df = x_data.copy()
        df["alpha"] = [self.alpha(params[:11], params[11], row["SoC"], row["DoD"], row["C_rate"], row["T"]) for _, row in df.iterrows()]
        output_df = self.integrate(df, params[12])
        return output_df["Q"]

    def estimate(self, input_df: pd.DataFrame, init_SoH: float = 1.0):
        # Estimate the value of SoH given the input dataframe.
        # Steps Description:
        # The input dataframe contains the characteristic values (SoC, T)
        # The dataframe is used to calculate the RHS of the differential equation.
        # Since the characteristic values are not continuous, but rather (x,y) pairs, we cannot use a direct integration method from scipy.
        # Thus, we manually implement BDF Order 1 (Forward Euler, easy to implement, everybody understands it). 
        # One thing to immediately note is that since the function is non-linear, BDF1 may face stability issues. 
        # Given the "soft" nature of the differential equation, I do not see this as a problem. However, as a back-up, we can do a higher order BDF to aid in stability, or go for AB/AB2/AM/AM2.
        df = input_df.copy()
        df["alpha"] = [self._get_alpha(row["SoC"], row["T"]) for _, row in df.iterrows()]
        output_df = self.integrate(df, self.z)
        output_df.loc[:,"Q"] += init_SoH
        output_df.loc[:,"Q"].fillna(init_SoH, inplace=True)
        return output_df

    def predict(self, input_df: pd.DataFrame):
        # Predict the future value of SoH given the historic input dataframe till a particular time in the future.
        # Steps Description:
        # The input dataframe is already split into cycles, and the characteristic values of the cycle (SoC, DoD, C_rate, T)
        # The cycles dataframe is extended (duplicated) further to some point in the future.
        # The extended cycles dataframe is used to calculate the RHS of the differential equation.
        # The steps are similar to estimate now.
        pass
    
    @property
    def alpha(self):
        return self._alpha

    @alpha.setter
    def alpha(self, func: Callable):
        if isinstance(func, Callable):
            self._alpha = func
        else:
            raise TypeError("Provided alpha must be callable.")
            
    @property
    def integrate(self):
        return self._integrate

    @integrate.setter
    def integrate(self, func: Callable):
        if isinstance(func, Callable):
            self._integrate = func
        else:
            raise TypeError("Provided integrate must be callable.")
    
    @property
    def Ea(self):
        return self._Ea

    @Ea.setter
    def Ea(self, val: float):
        self._Ea = val 
    
    @property
    def z(self):
        return self._z

    @z.setter
    def z(self, val: float):
        self._z = val

    @property
    def k(self):
        return self._k

    @k.setter
    def k(self, val: list):
        if isinstance(val, Iterable):
            if len(val) == 2:
                self._k = val
            else:
                raise ValueError("Length of Iterable provided should be 2.")
        else:
            raise TypeError(f"Cannot set k to a {type(val)}. It must be an Iterable.")

    def _get_alpha(self, SoC, T):
        # This can be sped up by using a full-blown matrix multiplication. However, I believe this goes against the notion of 
        # having an extremely flexible function for alpha. If such is not the case, I will vectorise it.
        return self.alpha(self.k, self.Ea, SoC, T)

    # Default functions (if User does not provide a function to obtain alpha or perform integration)
    @staticmethod
    def bdf1(input_df: pd.DataFrame, z):
        # Method to integrate the given the dataframe with the values of alpha and Ah throughput. The method is customised for this particular case
        df = input_df.copy()
        df["dt"] = df["t"].diff().fillna(0)
        df["dQ"] = -1 * (df["t"] ** (z - 1)) * df["alpha"] * z * df["dt"]
        df["dQ"].fillna(0, inplace=True)
        df["Q"] = df["dQ"].cumsum()
        return df

    @staticmethod
    def alpha_function(k, Ea, SoC, T):
        R = 8.314462
        # Switch Case to make the piecewise SoC Function
        if SoC < 0:
            raise ValueError("SoC cannot be less than 0.")
        elif (SoC >=0) and (SoC <= 0.3):
            A = k[0] * 1
        elif (SoC > 0.3) and (SoC <= 0.6):
            A = k[0] * 2
        elif (SoC > 0.6) and (SoC <= 1):
            A = k[0] * 3
        else:
            raise ValueError("SoC cannot be greater than 1.")
        return A * k[1] * np.exp(-Ea / R / T)

In [None]:
#@title Functional Execution Definitions (LAIR)

def estimate_calendar_SoH(params_dict: dict, input_df: pd.DataFrame):
    # Creates a CalendarAging class, and estimates the SoH given the input dataframe.
    pass

def predict_calendar_SoH(params_dict: dict, input_df: pd.DataFrame):
    # Creates a CalendarAging class, and predicts the SoH given the input dataframe till a particular time in the future.
    pass

def calibrate_calendar_SoH(params_dict: dict, train_df: pd.DataFrame):
    pass

## Asset fetching 

In [None]:
## Connect the client
edApi=eapi.Client(apiKey,AMAZON_FACTORY_API,AMAZON_IOT_API)

In [None]:
# Fetch the asset via serial number
print('test')
assets = edApi.getAssets(assetSn,20)
asset = None
for a in assets:
    if a.serial_number == assetSn:
        asset = a
        
print(asset.serial_number)

In [None]:
# Check if there are children

batteries = []

if len(asset.current_state.child_components) > 0:
    childs = asset.current_state.child_components
    for c in childs:
        if c.model.category.name == 'Battery':
            batteries.append(c)

for b in batteries:
    print(b.serial_number)
    

In [None]:
startDate = "01/01/2021" # {"startDate": "dd/mm/YYYY"}
endDate = "31/12/2021" # {"startDate": "dd/mm/YYYY"}

startDate = list(map(int,startDate.split('/')))


endDate = list(map(int,endDate.split('/')))


In [None]:
print(startDate)

In [None]:
startDate = eapi.edisonDate(startDate[2],startDate[1],startDate[0],00,00)
endDate = eapi.edisonDate(endDate[2],endDate[1],endDate[0],00,00)

In [None]:
sensorNameList=["SoC","Average Cell Temperature"]

req={
    "assets":batteries,
    "sensorNames":sensorNameList,
    "startDate":startDate,
    "endDate":endDate
        }
edApi.getAssetDataFrame(**req)

In [None]:
def fix_soc(x):
    if x < 0:
        return 0
    elif x > 1:
        return 1
    else:
        return x

def fix_T(x):
    if x < 253.2:
        return 253.2 # Guarding against floating point error
    elif x > 373.1:
        return 373.1
    else:
        return x


In [None]:
for bat in batteries:
    bat.df["Average Cell Temperature"] = bat.df["Average Cell Temperature"] + 273.15 # Adjust Temperature
    bat.df["SoC"] /= 100 # Adjust SoC
    bat.df["SoC"] = bat.df["SoC"].apply(lambda x: fix_soc(x)) # Fix SoC
    bat.df["Average Cell Temperature"] = bat.df["Average Cell Temperature"].apply(lambda x: fix_T(x)) # Fix T
    bat.df = bat.df[~((bat.df["SoC"].isna()) | (bat.df["Average Cell Temperature"].isna()))] # Drop any NaN rows
    bat.df["T"] = bat.df["Average Cell Temperature"]

#add time column in no. of days
    if "t" not in bat.df.columns:
                # Convert the index to a time column
                bat.df.index = pd.to_datetime(bat.df.index)
                bat.df["t"] = bat.df.index.to_series().diff().dt.total_seconds().fillna(0).cumsum() / 3600 / 24 + 0.0

    bat.df.drop(["Average Cell Temperature"], axis = 1,inplace= True)

In [None]:
print(bat)


# Calendar aging parameters

In [None]:
#@title Calendar Aging Parametersa
#  soc parameters
a = 0.22773873
b = 0.05369682
c = 2.30882327
d = -0.15441163
e = -0.83467311
f = 1.41733655

x = np.array([ a,  b,  c, d, e , f])

def f_soc(soc):
    if soc < 0:
        raise ValueError()
    elif soc <= 0.1:
        return x[0:2].dot([soc, 1])
    elif soc <= 0.50:
        return x[2:4].dot([soc, 1])
    elif soc <= 1.0:
        return x[4:6].dot([soc, 1])
    else:
        raise ValueError()

def f_T(Ea, T):
    R = 8.314462
    return np.exp(-Ea / R / T) / temp_scale 

def new_alpha_function(k, Ea, SoC, T):
    return k[0] * f_soc(SoC) * k[1] * f_T(Ea, T)

In [None]:


for bar in batteries:
    Ea = 69505.17765
    true_SoC_scale = 275495513.62328136
    temp_scale = 1.8537064913229408e-10
    CalModel = CalendarAging(
        {
            "alpha": new_alpha_function,
            "k": [true_SoC_scale, temp_scale],
            "Ea": Ea, 
            "z": 1.0,
        }
    )
    bar.df = CalModel.estimate(bar.df, init_SoH=1.0)

In [None]:
def getDtSn(sn):
    dtSn = sn +('-DT')
    return dtSn

In [None]:
for bat in batteries:
    newSn = bat.serial_number + '-DT'
    print(newSn)
    edApi.createAssetBySerial('rack',newSn)

In [None]:
digitalTwinAssets = []
for bat in batteries :
    dtSn = getDtSn(bat.serial_number) 
    digitalTwinAssets.append(edApi.getAsset(dtSn))

for dt in digitalTwinAssets:
    print(dt.serial_number)

In [None]:
for bat in batteries:
    for dt in digitalTwinAssets:
        if bat.serial_number in dt.serial_number:
            bat.df["SoH"] = bat.df["Q"]
            dt.df = bat.df
            uploadSensorList = ["SoH"]
            edApi.updateSensorDataByFile(dt, uploadSensorList)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib import pyplot as plt
import matplotlib as mpl
from matplotlib.offsetbox import AnchoredText
%matplotlib inline

In [None]:
for bat in batteries:
    #@title Plotting
    fig = make_subplots(rows = 1, cols = 1, shared_xaxes=True)
    # Plot everything
    fig.add_trace(
        go.Scatter(
            x = bat.df.index,
            y = bat.df["SoH"],
            name = " Calendar Aging"
        ),
        row = 1,
        col = 1,
    )
    fig.show()