In [7]:

import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.dataset import random_split
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
import numpy as np
import torch.nn.functional as func
import pandas as pd
import warnings
import plotly.io as pio
import plotly.graph_objects as go
import plotly.express as px

pio.renderers.default = "browser"
plotly_config = dict({"scrollZoom": True,'modeBarButtonsToAdd':[
                                        'drawline',
                                        'drawopenpath',
                                        'drawclosedpath',
                                        'drawcircle',
                                        'drawrect',
                                        'eraseshape'
                                       ]})

warnings.filterwarnings("ignore", ".*")

data_len = 3 #要輸入的時序訊號長度
cap_05 = pd.read_csv("features\charge\B0005\B0005_capacity.csv")
cap_07 = pd.read_csv("features\charge\B0007\B0007_capacity.csv")
scaler = MinMaxScaler(feature_range = (-2, 2))
# features_list = ["CCCT","CVCT","V37_419","V38_419","V37_41","V38_41","Temperature","cap"]
features_list = ["CCCT","CVCT","V37_419","V38_419","V37_41","V38_41","cap"]
f_len = len(features_list)

class temp(Dataset):
    def __init__(self, battery_list):
        featurelist=[]
        capacitylist=[]
        for battery in battery_list:
            featurelist.append(f"features\charge\{battery}\{battery}_Features.csv")
            capacitylist.append(f"features\charge\{battery}\{battery}_capacity.csv")
        
        self.feadf = pd.concat((pd.read_csv(f) for f in featurelist), ignore_index=True)
        self.capdf = pd.concat((pd.read_csv(f) for f in capacitylist), ignore_index=True)
        self.feadf["cap"] = self.capdf["Capacity"]
        self.feadf = pd.DataFrame(scaler.fit_transform(self.feadf),
                                columns=self.feadf.keys())
        self.features = self.feadf.loc[:,features_list].to_numpy()
        # print(self.features)
        self.capacity = self.capdf["Capacity"].to_numpy()
        self.sample_len = data_len

    def __len__(self):
        if len(self.capdf) > self.sample_len:
            return len(self.capdf) - self.sample_len
        
        else:
            return 0

    def __getitem__(self, index):
        target = self.capacity[self.sample_len + index]
        target = np.array(target).astype(np.float32)

        input = self.features[index : (index + self.sample_len)]
        input = torch.from_numpy(input).float()   
        # print(input)
        input.reshape(-1, f_len)
        target = torch.from_numpy(target).float()

        return input, target
       
battery_list = ["B0005","B0038","B0028","B0033","B0053","B0007","B0028","B0036","B0047","B0029","B0026","B0018","B0039","B0055","B0046"]
train_battery = battery_list
train_data = temp(train_battery)

test_battery = ["B0006"]
test_data = temp(test_battery)

train_loader = DataLoader(train_data, batch_size = 3)
test_loader = DataLoader(test_data, batch_size = 3)
dataiter = iter(train_loader)
data = next(dataiter)
features, labels = data

class GRU(nn.Module):
    def __init__(self):
        super(GRU, self).__init__()

        self.input_size = f_len # input size : 輸入維度
        self.hidden_size = 400 # hidden_size : 隱藏層的特徵維度
        self.num_layers = 8 # hidden_size : LSTM隱藏層的層數
        self.dropout = 0.1 # dropout : 每一層過後丟棄特定比例神經元

        self.gru = nn.GRU(input_size = self.input_size, hidden_size  = self.hidden_size, bidirectional=True,
                            num_layers = self.num_layers, dropout = self.dropout, batch_first = True)
        for m in self.modules():
            if type(m) in [nn.GRU, nn.GRU, nn.RNN]:
                for name, param in m.named_parameters():
                    if 'weight_ih' in name:
                        torch.nn.init.xavier_uniform_(param.data)
                    elif 'weight_hh' in name:
                        torch.nn.init.orthogonal_(param.data)
                    elif 'bias' in name:
                        param.data.fill_(0.1)

        self.fc1 = nn.Linear(self.hidden_size*2, 600)#設定全連接層
        self.fc2 = nn.Linear(600, 600)#設定全連接層
        self.fc3 = nn.Linear(600, 600)#設定全連接層
        self.fc4 = nn.Linear(600, 1)#設定全連接層


    def forward(self, x):
        # print(x.shape)
        h_0 = torch.zeros([self.num_layers*2, x.shape[0], self.hidden_size], device = x.device)
        c_0 = torch.zeros([self.num_layers*2, x.shape[0], self.hidden_size], device = x.device)

        out, _ = self.gru(x,h_0)# x:新資料輸入 h0:上個隱藏層狀態 c0:上個細胞狀態
        # print(out.shape)
        out = self.fc1(func.tanh(out[:, -1, :]))#接收LSTM單元的輸出，並讓最後一層輸出
        out = self.fc2(func.tanh(out))
        out = self.fc3(func.tanh(out))
        out = self.fc4(func.tanh(out))
        # out3 = self.fc3(out2)
        # out1 = func.tanh(self.fc1(out))
        # out2 = func.tanh(self.fc2(out1))
        # out3 = self.fc3(out2)#接收LSTM單元的輸出，並讓最後一層輸出

        return out

model = GRU()
print(model)

device = torch.device('cuda')
model = model.to(device)

#設定誤差的計算方法
loss_f = nn.MSELoss()
#設定調整誤差的方法
opt = optim.Adam(model.parameters(), lr = 5e-7)

def train():
    # train_loss = 0
    model.train()

    for _, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        pred = model(data)

        pred = pred.view(-1)

        loss = loss_f(pred, target)

        opt.zero_grad()
        loss.backward()
        opt.step()

    return loss.item()

def test():
    model.eval()

    for _, (data, target) in enumerate(test_loader):
        data, target = data.to(device), target.to(device)

        pred = model(data)
        pred = pred.view(-1)
        loss = loss_f(pred, target)

    return loss.item()

def pred(data):
    model.eval()

    with torch.no_grad():
        pred = model(data)
        return pred
    
train_losses = []
test_losses = []
epoch = 50
for i in range(epoch):
    train_loss = train()
    test_loss = test()

    train_losses.append(train_loss)
    test_losses.append(test_loss)

    print("epoch:{}, train_loss:{:0.6f}, test_loss:{:0.6f}".format(i, train_loss, test_loss))

pred_temps = []
for i in range(len(test_data)):
    nor_temp, taget = test_data[i]
    temps = nor_temp
    temps = nor_temp.view(1, data_len, f_len)
    temps = temps.to(device)
    pred_temp = pred(temps)
    pred_temp = pred_temp.detach().cpu().numpy().squeeze()

    # real_temp = scaler.inverse_transform(pred_temp.reshape(-1, 1))
    pred_temps.append(pred_temp)

pred_trains = []
for i in range(len(train_data)):
    nor_temp, taget = train_data[i]
    temps = nor_temp
    temps = nor_temp.view(1, data_len, f_len)
    temps = temps.to(device)
    pred_train = pred(temps)
    pred_train = pred_train.detach().cpu().numpy().squeeze()

    # real_temp = scaler.inverse_transform(pred_temp.reshape(-1, 1))
    pred_trains.append(pred_train)

# month = data_2.Time
capacitytestlist=[]
for battery in test_battery:
    capacitytestlist.append(f"features\charge\{battery}\{battery}_capacity.csv")
captestdf = pd.concat((pd.read_csv(f) for f in capacitytestlist), ignore_index=True)

capacitytrainlist=[]
for battery in train_battery:
    capacitytrainlist.append(f"features\charge\{battery}\{battery}_capacity.csv")
captraindf = pd.concat((pd.read_csv(f) for f in capacitytrainlist), ignore_index=True)

mean_temp = captestdf.Capacity.reset_index(drop=True)
train_cap = captraindf.Capacity.reset_index(drop=True)

torch.save(model.state_dict(),"./GRU_model.pth")

fig1 = px.line(train_losses)
fig1.update_layout(
    dragmode='drawopenpath',
    newshape_line_color='cyan',
    title_text=f'loss'
)
# val = pd.DataFrame(pred_trains)
# fig.add_scatter(val,x=val.index,y=["0"])
fig1.add_scatter(y=test_losses)
fig1.show(config=plotly_config)

fig = px.line(train_cap)
fig.update_layout(
    dragmode='drawopenpath',
    newshape_line_color='cyan',
    title_text=f'train_capacity'
)
# val = pd.DataFrame(pred_trains)
# fig.add_scatter(val,x=val.index,y=["0"])
fig.add_scatter(y=pred_trains)
fig.show(config=plotly_config)

fig2 = px.line(mean_temp)
fig2.update_layout(
    dragmode='drawopenpath',
    newshape_line_color='cyan',
    title_text=f'test_capacity'
)
fig2.add_scatter(y=pred_temps)
fig2.show(config=plotly_config)

featurelist=[]
capacitylist=[]
for battery in battery_list:
    featurelist.append(f"features\charge\{battery}\{battery}_Features.csv")
    capacitylist.append(f"features\charge\{battery}\{battery}_capacity.csv")

feadf = pd.concat((pd.read_csv(f) for f in featurelist), ignore_index=True)
capdf = pd.concat((pd.read_csv(f) for f in capacitylist), ignore_index=True)
feadf["cap"] = capdf["Capacity"]*5000
feadf["Temperature"] = feadf["Temperature"]*100
# fig3 = px.line(feadf,y= list(feadf.columns))
# fig3.update_layout(
#     dragmode='drawopenpath',
#     newshape_line_color='cyan',
#     title_text=f'test_capacity'
# )
# fig3.show(config=plotly_config)
# plt.figure(1)
# plt.plot(train_losses, label = 'train_loss')
# plt.plot(test_losses, label = 'test_loss')
# plt.legend()

# plt.figure(2)
# plt.plot(mean_temp, label = "org_data", color = 'b')
# plt.plot(pred_temps, label = "pred_data", color = 'r')
# plt.title("test")
# plt.legend()

# plt.figure(3)
# plt.plot(train_cap, label = "org_data", color = 'b')
# plt.plot(pred_trains, label = "pred_data", color = 'r')
# plt.title("train")
# plt.legend()

# plt.show()

GRU(
  (gru): GRU(7, 400, num_layers=8, batch_first=True, dropout=0.1, bidirectional=True)
  (fc1): Linear(in_features=800, out_features=600, bias=True)
  (fc2): Linear(in_features=600, out_features=600, bias=True)
  (fc3): Linear(in_features=600, out_features=600, bias=True)
  (fc4): Linear(in_features=600, out_features=1, bias=True)
)
epoch:0, train_loss:0.679359, test_loss:0.702382
epoch:1, train_loss:0.031303, test_loss:0.037600
epoch:2, train_loss:0.050159, test_loss:0.045376
epoch:3, train_loss:0.061752, test_loss:0.057745
epoch:4, train_loss:0.057216, test_loss:0.053310
epoch:5, train_loss:0.049079, test_loss:0.048300
epoch:6, train_loss:0.033993, test_loss:0.042610
epoch:7, train_loss:0.030072, test_loss:0.037010
epoch:8, train_loss:0.026166, test_loss:0.030981
epoch:9, train_loss:0.017067, test_loss:0.024228
epoch:10, train_loss:0.006830, test_loss:0.017199
epoch:11, train_loss:0.005056, test_loss:0.010865
epoch:12, train_loss:0.001661, test_loss:0.005617
epoch:13, train_loss:

In [8]:
featurelist=[]
capacitylist=[]
for battery in ["B0005"]:
    featurelist.append(f"processed_data\charge\{battery}\{battery}_Features.csv")
    capacitylist.append(f"processed_data\charge\{battery}\{battery}_capacity.csv")

feadf = pd.concat((pd.read_csv(f) for f in featurelist), ignore_index=True)
capdf = pd.concat((pd.read_csv(f) for f in capacitylist), ignore_index=True)
feadf["cap"] = capdf["Capacity"]*5000
feadf["Temperature"] = feadf["Temperature"]*100
fig3 = px.line(feadf,y= list(feadf.columns))
fig3.update_layout(
    dragmode='drawopenpath',
    newshape_line_color='cyan',
    title_text=f'test_capacity'
)
fig3.show(config=plotly_config)