In [7]:
import glob
import math
import os
import random
from datetime import datetime
from math import sqrt

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.io
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
# import transformers
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from tqdm.notebook import tqdm

from data_preprocess import get_failure_idx

%matplotlib inline


In [2]:
# convert a six-component date vector to datetime
def convert_to_time(hmm):
    """Build a ``datetime`` from the first six entries of ``hmm``.

    The entries are read in the order year, month, day, hour, minute, second,
    and each one is truncated to an integer with ``int()`` before use, so any
    fractional part (e.g. of the seconds) is dropped.
    """
    fields = ('year', 'month', 'day', 'hour', 'minute', 'second')
    parts = {field: int(hmm[pos]) for pos, field in enumerate(fields)}
    return datetime(**parts)


# load .mat data
def loadMat(matfile):
    """Load a battery ``.mat`` file into a list of per-record dicts.

    The MATLAB variable is looked up under the file's base name, i.e. the
    part of the file name before the first dot.

    Parameters
    ----------
    matfile : str
        Path to the ``.mat`` file.

    Returns
    -------
    list of dict
        One dict per record, with the keys ``'type'`` (str), ``'temp'``
        (int), ``'time'`` (``str`` of the converted datetime) and ``'data'``
        (dict mapping each field name of the record's data struct to a list
        of its values; left empty for ``'impedance'`` records).
    """
    data = scipy.io.loadmat(matfile)
    # os.path.basename honours the platform's path separator instead of
    # assuming "/".
    filename = os.path.basename(matfile).split(".")[0]
    col = data[filename]
    # Unwrap the nested struct arrays down to the 1-D array of records.
    col = col[0][0][0][0]
    size = col.shape[0]

    records = []
    for i in range(size):
        record = col[i]
        kind = str(record[0][0])
        measurements = {}
        if kind != 'impedance':
            payload = record[3][0]
            for j, field in enumerate(payload.dtype.fields.keys()):
                measurements[field] = list(payload[0][j][0])
        records.append({
            'type': kind,
            # Take the first element explicitly: calling int() on an array
            # with ndim > 0 is deprecated in NumPy >= 1.25.
            'temp': int(np.asarray(record[1]).ravel()[0]),
            'time': str(convert_to_time(record[2][0])),
            'data': measurements,
        })

    return records


# get capacity data
def getBatteryCapacity(Battery):
    """Collect the capacity of every discharge record in ``Battery``.

    Returns ``[cycle, capacity]``: ``capacity`` holds the first
    ``'Capacity'`` value of each record whose ``'type'`` is ``'discharge'``
    (in input order) and ``cycle`` numbers those records 1, 2, 3, ...
    """
    capacity = [
        record['data']['Capacity'][0]
        for record in Battery
        if record['type'] == 'discharge'
    ]
    cycle = list(range(1, len(capacity) + 1))
    return [cycle, capacity]


# get the data of every record of one type (charge by default)
def getBatteryValues(Battery, Type='charge'):
    """Return the ``'data'`` entry of each record whose ``'type'`` equals ``Type``.

    Records are returned in input order; the result is an empty list when no
    record matches.
    """
    return [record['data'] for record in Battery if record['type'] == Type]

In [19]:
batteries = ['B0005', 'B0006', 'B0007', 'B0018']
data_dir = 'data/NASA/'

# Build one frame per battery and concatenate once at the end. Growing a
# DataFrame with pd.concat inside the loop copies all rows on every pass, and
# starting from an empty pd.DataFrame() relies on concat's deprecated handling
# of empty entries.
battery_frames = []
for bat in batteries:
    print('Load Dataset ' + bat + '.mat ...')
    path = data_dir + bat + '.mat'
    data = loadMat(path)
    cycles_and_capacity = getBatteryCapacity(data)
    temp_df = pd.DataFrame({
        'battery': bat,
        'cycle': cycles_and_capacity[0],
        'capacity': cycles_and_capacity[1]
    })
    battery_frames.append(temp_df)
df = pd.concat(battery_frames, ignore_index=True)

display(df)
# Persist the combined per-cycle capacities next to the raw .mat files.
df.to_csv(data_dir + 'nasa.csv', index=False)

Load Dataset B0005.mat ...


  d1['type'], d1['temp'], d1['time'], d1['data'] = str(col[i][0][0]), int(col[i][1][0]), str(convert_to_time(col[i][2][0])), d2


Load Dataset B0006.mat ...
Load Dataset B0007.mat ...
Load Dataset B0018.mat ...


Unnamed: 0,battery,cycle,capacity
0,B0005,1,1.856487
1,B0005,2,1.846327
2,B0005,3,1.835349
3,B0005,4,1.835263
4,B0005,5,1.834646
...,...,...,...
631,B0018,128,1.362737
632,B0018,129,1.363405
633,B0018,130,1.351865
634,B0018,131,1.354797


In [22]:
# NOTE(review): the wildcard import hides which names (e.g. read_and_norm) come
# from data_preprocess; it is kept because other cells may rely on it.
from data_preprocess import *

# Read the CSV exactly where the NASA loading cell writes it
# (data_dir + 'nasa.csv'); the previous 'data/NASA.csv' path is not produced
# anywhere in this notebook, so a clean Restart & Run All could not find it.
data_path = 'data/NASA/nasa.csv'
# NOTE(review): 2.0 and 0.7 are presumably the rated capacity used for
# normalisation and the failure threshold - confirm against data_preprocess.
norm_data_df = read_and_norm(data_path, 2.0, 0.7)
norm_data_df

Unnamed: 0,battery,cycle,capacity,failure_cycle
0,B0005,1,0.928244,125.0
1,B0005,2,0.923164,125.0
2,B0005,3,0.917675,125.0
3,B0005,4,0.917631,125.0
4,B0005,5,0.917323,125.0
...,...,...,...,...
631,B0018,128,0.681369,97.0
632,B0018,129,0.681703,97.0
633,B0018,130,0.675932,97.0
634,B0018,131,0.677398,97.0


In [40]:
# Pull the selected feature column(s) of one battery out of df and turn them
# into a 2-D array with one row per cycle and one column per feature.
bat = 'B0005'
features = np.atleast_1d('capacity')
condition = df['battery'].eq(bat)
seq_df = df.loc[condition, features]
print(type(seq_df))
display(seq_df)
seq_arr = seq_df.to_numpy()
seq_arr.shape

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,capacity
0,1.856487
1,1.846327
2,1.835349
3,1.835263
4,1.834646
...,...
163,1.293464
164,1.288003
165,1.287453
166,1.309015


(168, 1)

In [3]:
def drop_outlier(array, count, bins):
    """Return the indices of ``array`` that lie within two standard deviations of their bin.

    The array is scanned in consecutive windows of ``bins`` samples whose
    start positions are ``1, 1 + bins, 1 + 2 * bins, ...`` below ``count``.
    Index 0 is therefore never examined, and the last start position is
    discarded, so the samples of the final window are not returned either.
    Inside each window a sample is kept only when it lies strictly between
    ``mean - 2 * std`` and ``mean + 2 * std`` of that window.

    Returns a NumPy array with the kept indices (relative to ``array``).
    """
    kept = []
    window_starts = np.arange(1, count, bins)[:-1]
    for start in window_starts:
        window = array[start:start + bins]
        center = np.mean(window)
        spread = np.std(window)
        upper = center + spread * 2
        lower = center - spread * 2
        inside = (window < upper) & (window > lower)
        # Shift window-relative positions back to positions in `array`.
        kept.extend(list(np.nonzero(inside)[0] + start))
    return np.array(kept)


def build_sequences(text, window_size):
    """Slice ``text`` into sliding windows and their next-step targets.

    text: list of capacity values.

    For every start position ``i`` in ``range(len(text) - window_size)`` the
    window ``text[i:i + window_size]`` becomes one row of ``x`` and the value
    right after it, ``text[i + window_size]``, becomes the matching entry of
    ``y``. Both are returned as NumPy arrays.
    """
    starts = range(len(text) - window_size)
    windows = [text[start:start + window_size] for start in starts]
    targets = [text[start + window_size] for start in starts]
    return np.array(windows), np.array(targets)


# leave-one-out evaluation: battery `name` is held out; the remaining batteries are used for training.
def get_train_test(data_dict, name, window_size=8):
    """Assemble training windows and the held-out series for battery ``name``.

    The first ``window_size + 1`` capacity values of the held-out battery
    contribute training windows; the rest of its series is returned as test
    data. Every other entry of ``data_dict`` contributes all of its windows
    to the training set.

    Returns ``(train_x, train_y, train_data, test_data)`` where the last two
    are plain lists holding the two parts of the held-out series.
    """
    held_out = data_dict[name]['capacity']
    split = window_size + 1
    train_data = held_out[:split]
    test_data = held_out[split:]

    first_x, first_y = build_sequences(text=train_data, window_size=window_size)
    x_parts, y_parts = [first_x], [first_y]
    for key, value in data_dict.items():
        if key == name:
            continue
        other_x, other_y = build_sequences(text=value['capacity'], window_size=window_size)
        x_parts.append(other_x)
        y_parts.append(other_y)

    # Stack the held-out windows first, followed by each other battery in dict order.
    train_x = np.concatenate(x_parts)
    train_y = np.concatenate(y_parts)
    return train_x, train_y, list(train_data), list(test_data)


def relative_error(y_test, y_predict, threshold):
    """Relative error between the true and predicted threshold-crossing positions.

    The true position is ``i - 1`` for the first ``i`` where both
    ``y_test[i]`` and ``y_test[i + 1]`` are at or below ``threshold``
    (``len(y_test)`` when no such pair exists). The predicted position is
    ``i - 1`` for the first ``i``, excluding the last element, where
    ``y_predict[i]`` is at or below ``threshold`` (0 when there is none).

    Returns ``abs(true - pred) / true`` capped at 1. When the true position
    is not positive (empty ``y_test`` or a crossing within the first two
    samples) the ratio is undefined; the function then returns 0.0 if both
    positions agree and 1 otherwise, instead of raising ZeroDivisionError or
    returning a negative value.
    """
    true_re, pred_re = len(y_test), 0
    for i in range(len(y_test) - 1):
        if y_test[i] <= threshold and y_test[i + 1] <= threshold:
            true_re = i - 1
            break
    for i in range(len(y_predict) - 1):
        if y_predict[i] <= threshold:
            pred_re = i - 1
            break

    gap = abs(true_re - pred_re)
    if true_re <= 0:
        # A zero or negative denominator cannot yield a meaningful ratio.
        return 0.0 if gap == 0 else 1
    ratio = gap / true_re
    return ratio if ratio <= 1 else 1


def evaluation(y_test, y_predict):
    """Return the root-mean-square error between ``y_test`` and ``y_predict``."""
    # Compute the MSE once; it was previously evaluated twice with one result unused.
    return sqrt(mean_squared_error(y_test, y_predict))
    
    

In [5]:
# CALCE cells to load; each one has its own folder of .xlsx logs under dir_path.
Battery_list = ['CS2_35', 'CS2_36', 'CS2_37', 'CS2_38']

dir_path = 'data/CALCE/'
batteries = []
Battery = {}
for name in Battery_list:
    print('Load Dataset ' + name + ' ...')
    path = glob.glob(dir_path + name + '/*.xlsx')

    # Read every workbook once and keep the frames of this battery in memory,
    # so ordering the files by date does not need a second pd.read_excel pass.
    frames = []
    dates = []
    for p in path:
        df = pd.read_excel(p, sheet_name=1)
        print('Load ' + str(p) + ' ...')
        frames.append(df)
        dates.append(df['Date_Time'][0])
    # Process the files in order of the first timestamp logged in each one.
    idx = np.argsort(dates)
    path_sorted = np.array(path)[idx]

    count = 0
    discharge_capacities = []
    health_indicator = []
    internal_resistance = []
    CCCT = []
    CVCT = []
    for file_pos in idx:
        df = frames[file_pos]
        # sorted(): iterating a bare set gives no ordering guarantee, and the
        # cycles of a file must be visited in ascending order.
        cycles = sorted(set(df['Cycle_Index']))
        for c in cycles:
            df_lim = df[df['Cycle_Index'] == c]

            #Discharging
            df_d = df_lim[df_lim['Step_Index'] == 7]
            d_v = df_d['Voltage(V)']
            d_c = df_d['Current(A)']
            d_t = df_d['Test_Time(s)']
            d_im = df_d['Internal_Resistance(Ohm)']

            # At least two samples are needed to integrate current over time;
            # a single-sample step used to raise IndexError below.
            if len(d_c) > 1:
                #CC or CV
                # Recorded only for cycles that also yield a capacity value, so
                # CCCT / CVCT stay aligned with discharge_capacities and can be
                # indexed with the same outlier mask.
                df_cc = df_lim[df_lim['Step_Index'] == 2]
                df_cv = df_lim[df_lim['Step_Index'] == 4]
                CCCT.append(np.max(df_cc['Test_Time(s)'])-np.min(df_cc['Test_Time(s)']))
                CVCT.append(np.max(df_cv['Test_Time(s)'])-np.min(df_cv['Test_Time(s)']))

                time_diff = np.diff(list(d_t))
                d_c = np.array(list(d_c))[1:]
                increments = time_diff*d_c/3600 # Q = A*h
                # Running total in which entry n sums the first n increments
                # (the last increment is never included, as before), computed
                # in O(n) with cumsum instead of one np.sum per prefix.
                discharge_capacity = np.concatenate(([0.0], np.cumsum(increments)[:-1]))
                discharge_capacities.append(-1*discharge_capacity[-1])

                # Capacity delivered between the samples closest to 3.8 V and 3.4 V.
                dec = np.abs(np.array(d_v) - 3.8)[1:]
                start = discharge_capacity[np.argmin(dec)]
                dec = np.abs(np.array(d_v) - 3.4)[1:]
                end = discharge_capacity[np.argmin(dec)]
                health_indicator.append(-1 * (end - start))

                internal_resistance.append(np.mean(np.array(d_im)))
                count += 1

    discharge_capacities = np.array(discharge_capacities)
    health_indicator = np.array(health_indicator)
    internal_resistance = np.array(internal_resistance)
    CCCT = np.array(CCCT)
    CVCT = np.array(CVCT)

    # Keep only the cycles whose capacity is not flagged as an outlier.
    idx = drop_outlier(discharge_capacities, count, 40)
    df_result = pd.DataFrame({
        'battery': name,
        'cycle': np.linspace(1,idx.shape[0],idx.shape[0]),
        'capacity': discharge_capacities[idx],
        # 'soh': health_indicator[idx],
        # 'resistance': internal_resistance[idx],
        # 'CCCT': CCCT[idx],
        # 'CVCT': CVCT[idx]
    })
    Battery[name] = df_result
    batteries.append(df_result)
batteries_df = pd.concat(batteries, ignore_index=True)
batteries_df.to_csv(dir_path + 'calce.csv', index=False)
batteries_df

Load Dataset CS2_35 ...
Load data/CALCE/CS2_35/CS2_35_1_10_11.xlsx ...
Load data/CALCE/CS2_35/CS2_35_8_30_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_10_29_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_12_23_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_9_7_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_1_18_11.xlsx ...
Load data/CALCE/CS2_35/CS2_35_11_01_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_8_17_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_11_08_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_2_4_11.xlsx ...
Load data/CALCE/CS2_35/CS2_35_12_20_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_10_15_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_12_06_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_10_22_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_11_24_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_1_24_11.xlsx ...
Load data/CALCE/CS2_35/CS2_35_1_28_11.xlsx ...
Load data/CALCE/CS2_35/CS2_35_9_21_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_8_18_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_9_8_10.xlsx ...
Load data/CALCE/CS2_35/CS2_35_

Unnamed: 0,battery,cycle,capacity,soh,resistance,CCCT,CVCT
0,CS2_35,1.0,1.126385,0.825175,0.094009,6613.059052,2251.498033
1,CS2_35,2.0,1.126160,0.815965,0.091661,6612.402800,2231.967052
2,CS2_35,3.0,1.125966,0.815977,0.094649,6608.560673,2228.216959
3,CS2_35,4.0,1.118508,0.825194,0.091413,6604.732222,2247.561061
4,CS2_35,5.0,1.117210,0.806900,0.091413,6629.211049,2077.692393
...,...,...,...,...,...,...,...
3781,CS2_38,992.0,0.366656,0.137492,0.122835,1485.021686,3244.713049
3782,CS2_38,993.0,0.366665,0.137503,0.123425,1400.679296,3255.944861
3783,CS2_38,994.0,0.357495,0.137495,0.124349,1356.867640,3243.913667
3784,CS2_38,995.0,0.357480,0.137490,0.123536,1327.852449,3242.275622


In [11]:
# Failure index of CS2_35: scale its capacity by 1/1.1 and apply the 0.7 threshold.
# NOTE(review): 1.1 is presumably the rated capacity of the CS2 cells - confirm.
cs2_35_rows = batteries_df['battery'] == 'CS2_35'
cs2_35_capacity = batteries_df.loc[cs2_35_rows, 'capacity'].to_numpy()
get_failure_idx(cs2_35_capacity / 1.1, 0.7)

640