In [2]:
import datetime
import numpy as np
import pandas as pd
from scipy.io import loadmat
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM

In [3]:
# 데이터 로드 함수 정의
def load_data(battery):
    mat = loadmat('C:/LEE/batterydata/' + battery + '.mat')
    print('Total data in dataset: ', len(mat[battery][0, 0]['cycle'][0]))
    counter = 0
    dataset = []
    capacity_data = []

    for i in range(len(mat[battery][0, 0]['cycle'][0])):
        row = mat[battery][0, 0]['cycle'][0, i]
        if row['type'][0] == 'discharge':
            ambient_temperature = row['ambient_temperature'][0][0]
            date_time = datetime.datetime(int(row['time'][0][0]),
                                          int(row['time'][0][1]),
                                          int(row['time'][0][2]),
                                          int(row['time'][0][3]),
                                          int(row['time'][0][4])) + datetime.timedelta(seconds=int(row['time'][0][5]))
            data = row['data']
            capacity = data[0][0]['Capacity'][0][0]
            for j in range(len(data[0][0]['Voltage_measured'][0])):
                voltage_measured = data[0][0]['Voltage_measured'][0][j]
                current_measured = data[0][0]['Current_measured'][0][j]
                temperature_measured = data[0][0]['Temperature_measured'][0][j]
                current_load = data[0][0]['Current_load'][0][j]
                voltage_load = data[0][0]['Voltage_load'][0][j]
                time = data[0][0]['Time'][0][j]
                dataset.append([counter + 1, ambient_temperature, date_time, capacity,
                                voltage_measured, current_measured,
                                temperature_measured, current_load,
                                voltage_load, time])
            capacity_data.append([counter + 1, ambient_temperature, date_time, capacity])
            counter += 1

    return [pd.DataFrame(data=dataset,
                         columns=['cycle', 'ambient_temperature', 'datetime',
                                  'capacity', 'voltage_measured',
                                  'current_measured', 'temperature_measured',
                                  'current_load', 'voltage_load', 'time']),
            pd.DataFrame(data=capacity_data,
                         columns=['cycle', 'ambient_temperature', 'datetime',
                                  'capacity'])]

In [4]:
# 데이터 로드 및 확인
dataset, capacity = load_data('B0005')
pd.set_option('display.max_columns', 10)
print(dataset.head())

# Capacity 데이터 프레임 통계량
print("Capacity 데이터프레임 주요 통계량:")
print(capacity['capacity'].describe())

Total data in dataset:  616
   cycle  ambient_temperature            datetime  capacity  voltage_measured  \
0      1                   24 2008-04-02 15:25:41  1.856487          4.191492   
1      1                   24 2008-04-02 15:25:41  1.856487          4.190749   
2      1                   24 2008-04-02 15:25:41  1.856487          3.974871   
3      1                   24 2008-04-02 15:25:41  1.856487          3.951717   
4      1                   24 2008-04-02 15:25:41  1.856487          3.934352   

   current_measured  temperature_measured  current_load  voltage_load    time  
0         -0.004902             24.330034       -0.0006         0.000   0.000  
1         -0.001478             24.325993       -0.0006         4.206  16.781  
2         -2.012528             24.389085       -1.9982         3.062  35.703  
3         -2.013979             24.544752       -1.9982         3.030  53.781  
4         -2.011144             24.731385       -1.9982         3.011  71.922  
Capac

In [5]:
# SoH 계산
attrib = ['cycle', 'datetime', 'capacity']
dis_ele = capacity[attrib]
C = dis_ele['capacity'][0]
dis_ele['SoH'] = dis_ele['capacity'] / C

In [6]:
# 학습 및 테스트 데이터 구성
attribs = ['capacity', 'voltage_measured', 'current_measured', 'temperature_measured', 'current_load', 'voltage_load', 'time']
train_dataset = dataset[attribs]

# 데이터 정규화
sc = MinMaxScaler(feature_range=(0, 1))
train_dataset = sc.fit_transform(train_dataset)

In [7]:
# 훈련 데이터 구성
X_train, y_train = [], []
for i in range(10, len(train_dataset) - 1):
    X_train.append(train_dataset[i-10:i, :])  # 전 주기 데이터를 사용하여 예측
    y_train.append(train_dataset[i, 0])  # capacity 값만 예측 대상
X_train, y_train = np.array(X_train), np.array(y_train)

In [8]:
# LSTM 모델 구성
regress = Sequential()
regress.add(LSTM(units=200, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
regress.add(Dropout(0.4))
regress.add(LSTM(units=200, return_sequences=True))
regress.add(Dropout(0.4))
regress.add(LSTM(units=200, return_sequences=True))
regress.add(Dropout(0.4))
regress.add(LSTM(units=200))
regress.add(Dropout(0.4))
regress.add(Dense(units=1))
regress.compile(optimizer='adam', loss='mean_squared_error')

  super().__init__(**kwargs)


In [9]:
# 모델 요약 및 학습
regress.summary()
regress.fit(X_train, y_train, epochs=200, batch_size=25)

Epoch 1/200
[1m2011/2011[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 55ms/step - loss: 0.0078
Epoch 2/200
[1m  64/2011[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:04[0m 64ms/step - loss: 0.0014

KeyboardInterrupt: 

In [None]:
# 테스트 데이터 준비
data_test = dataset.loc[(dataset['cycle'] >= 50)]
data_set_test = data_test[attribs].values
data_set_test = sc.transform(data_set_test)

X_test = []
for i in range(10, len(data_set_test)):
    X_test.append(data_set_test[i-10:i, :])
X_test = np.array(X_test)

# 예측 수행
pred = regress.predict(X_test)
pred = sc.inverse_transform(pred)
pred = pred[:, 0]

In [None]:
# 예측 결과 시각화
ln = len(data_test)
plt.figure(figsize=(16, 10))
plt.plot(dis_ele['cycle'], dis_ele['capacity'], label="Actual data", color='blue')
plt.plot(range(50, 50+len(pred)), pred, label="Prediction data", color='red')
plt.plot([0, len(dis_ele)], [1.485, 1.485], 'g--', label="Threshold (80%)")
plt.ylabel('Capacity')
plt.xlabel('Cycle')
plt.legend()
plt.title('Discharge B0005 (Prediction) start in cycle 50 - RULe=-8, window-size=10')
plt.show()

In [None]:
# 평가 지표
rmse = np.sqrt(mean_squared_error(data_test['capacity'].iloc[10:], pred))
r2 = r2_score(data_test['capacity'].iloc[10:], pred)
print('Test RMSE: %.3f' % rmse)
print('Test R^2 Score: %.3f' % r2)