In [117]:
# Import libraries
import matplotlib.pyplot as plt
%matplotlib inline
plt.rcParams.update({'font.size': 14})
import numpy as np 
import pandas as pd 
from sklearn.metrics import mean_absolute_percentage_error,mean_squared_error,mean_absolute_error
import statsmodels.api as sm
from IPython.display import display, Markdown
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense, SimpleRNN

In [118]:
def data_preprocessing(data_src):
    """Load a price-history CSV and return only its ``Price`` column as floats.

    The first CSV column is used as the index (parsed as dates where pandas
    is able to), the rows are ordered ascending by ``Date``, and ``Price`` is
    converted from text that may contain commas (e.g. ``"1,234.5"``) to float.

    Parameters
    ----------
    data_src : str, path or file-like
        Passed straight to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        A frame holding the single float column ``Price``.
    """
    frame = pd.read_csv(data_src, index_col=0, parse_dates=True)
    frame = frame.sort_values(by='Date', ascending=True)
    # Remove every comma before casting to float.
    frame['Price'] = frame['Price'].replace(',', '', regex=True).astype(float)
    return frame[['Price']]

In [119]:
def scale_data(df,scaler):
    """Fit ``scaler`` on the ``Price`` column and return the transformed values.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame exposing ``Price`` after ``reset_index()``.
    scaler : object
        Anything with a ``fit_transform(array)`` method; it is fitted here,
        on every row of ``df``.

    Returns
    -------
    Whatever ``scaler.fit_transform`` returns for the ``(n_rows, 1)`` input.
    """
    prices = df.reset_index()['Price']
    # The scaler is handed a single-feature column: shape (n_rows, 1).
    price_column = np.array(prices).reshape(-1, 1)
    return scaler.fit_transform(price_column)

In [120]:
# Number of rows in the train, test and validation splits
def get_split_size(df1, train_ratio, test_ratio):
    """Return ``(train_size, test_size, val_size)`` for a sequence.

    The train and test sizes are the ratios multiplied by ``len(df1)`` and
    truncated with ``int``; the validation split receives every remaining row.
    """
    total = len(df1)
    n_train = int(train_ratio * total)
    n_test = int(test_ratio * total)
    return n_train, n_test, total - n_train - n_test

In [121]:
# Split the data
def split_data(df1,train_size, test_size):
    """Slice ``df1`` into consecutive train, test and validation parts.

    Returns ``(train_data, test_data, val_data)``: the first ``train_size``
    items, the next ``test_size`` items, and everything after them.
    """
    test_end = train_size + test_size
    return df1[:train_size], df1[train_size:test_end], df1[test_end:]

In [122]:
# 5. Create-dataset helper
import numpy
# convert an array of values into a dataset matrix
def create_dataset(dataset, time_step=1):
    """Turn a 2-D series into sliding-window inputs and next-step targets.

    Sample ``k`` consists of ``dataset[k:k + time_step, 0]`` as input and
    ``dataset[k + time_step, 0]`` as target.

    Note: ``len(dataset) - time_step - 1`` samples are produced, so the last
    row of ``dataset`` is never used as a target. When that count is zero or
    negative both returned arrays are empty with shape ``(0,)``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Inputs of shape ``(n_samples, time_step)`` and targets of shape
        ``(n_samples,)``.
    """
    n_samples = len(dataset) - time_step - 1
    windows = [dataset[start:start + time_step, 0] for start in range(n_samples)]
    targets = [dataset[start + time_step, 0] for start in range(n_samples)]
    return numpy.array(windows), numpy.array(targets)

In [123]:
def reshape_data(train_data,test_data,val_data,time_step):
    """Window each split and give the inputs a trailing feature axis.

    Every split is passed through ``create_dataset`` with ``time_step``; the
    resulting input arrays are then reshaped to
    ``(samples, time steps, 1)``, the layout handed to the recurrent model.

    Returns
    -------
    tuple
        ``(X_train, y_train, X_test, ytest, X_val, yval)``.
    """
    def _add_feature_axis(windows):
        # (samples, time steps) -> (samples, time steps, 1)
        return windows.reshape(windows.shape[0], windows.shape[1], 1)

    # 6. Build the window / next-value pairs for each split
    X_train, y_train = create_dataset(train_data, time_step)
    X_val, yval = create_dataset(val_data, time_step)
    X_test, ytest = create_dataset(test_data, time_step)

    # 7. Reshape the inputs to [samples, time steps, features]
    return (_add_feature_axis(X_train), y_train,
            _add_feature_axis(X_test), ytest,
            _add_feature_axis(X_val), yval)

In [124]:
# 8. Define the stacked SimpleRNN model
def build_model(time_step,unit_model):
    """Build and compile the recurrent regression network.

    The network is three ``SimpleRNN`` layers of ``unit_model`` units each
    (ReLU activation; the first two return full sequences) followed by a
    single-unit ``Dense`` output. It is compiled with mean-squared-error loss
    and the Adam optimizer.

    Parameters
    ----------
    time_step : int
        Length of each input window; the input shape is ``(time_step, 1)``.
    unit_model : int
        Number of units in every ``SimpleRNN`` layer.
    """
    layers = [
        SimpleRNN(unit_model, activation='relu', return_sequences=True,
                  input_shape=(time_step, 1)),
        SimpleRNN(unit_model, activation='relu', return_sequences=True),
        SimpleRNN(unit_model, activation='relu'),
        Dense(1),
    ]
    model = Sequential(layers)
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

In [125]:
def fit(model,X_train,y_train,X_test,ytest,epochs=100,batch_size=32,verbose=1):
    """Train ``model`` on the training windows and return it.

    Parameters
    ----------
    model : object
        Anything exposing a Keras-style ``fit`` method.
    X_train, y_train
        Training inputs and targets.
    X_test, ytest
        Passed to ``model.fit`` as ``validation_data``; note that this is the
        test split, so the per-epoch validation loss is measured on it.
    epochs, batch_size, verbose : int, optional
        Forwarded to ``model.fit``. The defaults (100, 32, 1) are the values
        that used to be hard-coded here.

    Returns
    -------
    The same ``model`` object, after training.
    """
    # 9. Fit the model on the training data
    model.fit(
        X_train,
        y_train,
        validation_data=(X_test, ytest),
        epochs=epochs,
        batch_size=batch_size,
        verbose=verbose,
    )
    return model

In [126]:
def predict_test_and_val(model,X_train,X_test,X_val):
    """Run ``model.predict`` on the train, test and validation inputs.

    Returns
    -------
    tuple
        ``(train_predict, y_pred, y_pred_val)``, computed in that order.
    """
    # 10. Predict on each split in turn
    return tuple(model.predict(batch) for batch in (X_train, X_test, X_val))

In [127]:
def inverse_data(train_predict,y_pred,y_pred_val,scaler):
    """Map the three prediction arrays back through ``scaler.inverse_transform``.

    Returns
    -------
    tuple
        ``(train_predict, y_pred, y_pred_val)`` after the inverse transform,
        applied in that order.
    """
    # 11. Undo the scaling on every prediction array
    scaled_arrays = (train_predict, y_pred, y_pred_val)
    return tuple(scaler.inverse_transform(values) for values in scaled_arrays)

In [None]:
def mda_cal(actual: np.ndarray, predicted: np.ndarray):
    """Mean directional accuracy of ``predicted`` against ``actual``.

    For every step after the first, the sign of the real change
    (``actual[t] - actual[t-1]``) is compared with the sign of the predicted
    change measured from the same previous actual value
    (``predicted[t] - actual[t-1]``). The result is the fraction of steps on
    which the two signs are equal.
    """
    previous = actual[:-1]
    actual_direction = np.sign(actual[1:] - previous)
    predicted_direction = np.sign(predicted[1:] - previous)
    hits = (actual_direction == predicted_direction).astype(int)
    return np.mean(hits)

In [128]:
def deviation(ytest, pred, scaler):
    """Print RMSE, MAPE and MDA of ``pred`` against the unscaled ``ytest``.

    Parameters
    ----------
    ytest : numpy.ndarray
        Scaled targets; reshaped to one column and passed through
        ``scaler.inverse_transform`` before being compared.
    pred : array-like
        Predictions, used as given (no inverse transform is applied here).
    scaler : object
        Provides ``inverse_transform``.

    Nothing is returned; the three metrics are written to stdout.
    """
    actual = scaler.inverse_transform(ytest.reshape(-1, 1))

    rmse = np.sqrt(mean_squared_error(actual, pred))
    print(f"RMSE: {rmse:.2f}")

    mape = mean_absolute_percentage_error(actual, pred)
    print(f"MAPE: {mape*100:.2f}%")

    mda = mda_cal(actual, pred)
    print(f"MDA: {mda:.2f}")

In [129]:
def predict30days(val_data, model, n_days=30, time_step=None, verbose=True):
    """Forecast future values by repeatedly feeding the model its own output.

    The most recent ``time_step`` values of ``val_data`` form the first input
    window. After each prediction the window slides forward: the predicted
    value(s) are appended and the same number of oldest values are dropped.

    Parameters
    ----------
    val_data : array-like
        Scaled series; it is flattened, so a single column ``(n, 1)`` is the
        expected layout.
    model : object
        Anything with ``predict(x, verbose=0)`` accepting an input of shape
        ``(1, time_step, 1)``.
    n_days : int, optional
        Number of steps to forecast (default 30).
    time_step : int, optional
        Window length. It should equal the window length the model was built
        with. When omitted, the historical behaviour is kept: the first 60
        rows are skipped and everything after them is the window
        (``len(val_data) - 60``).
    verbose : bool, optional
        Print each day's prediction. The per-step dump of the whole input
        window that this function used to print has been dropped.

    Returns
    -------
    list
        One entry per forecast day, each being the list form of that day's
        prediction row (e.g. ``[[0.51], [0.52], ...]``), still in scaled units.

    Raises
    ------
    ValueError
        If the window length is not between 1 and ``len(val_data)``.
    """
    history = np.asarray(val_data)
    val_len = len(history)
    if time_step is None:
        # Backward-compatible default: use every row after the first 60.
        time_step = val_len - 60
    if not 0 < time_step <= val_len:
        raise ValueError(
            "window length {} is not usable with {} rows of data".format(time_step, val_len)
        )

    # 13. Roll the window forward one prediction at a time
    window = history[val_len - time_step:].reshape(-1).tolist()
    lst_output = []
    for day in range(n_days):
        x_input = np.array(window).reshape((1, time_step, 1))
        yhat = model.predict(x_input, verbose=0)
        if verbose:
            print("{} day output {}".format(day, yhat))
        step = yhat[0].tolist()
        # Append the new value(s) and drop as many old ones so the window
        # keeps exactly `time_step` entries.
        window.extend(step)
        window = window[len(step):]
        lst_output.extend(yhat.tolist())
    return lst_output

In [130]:
#Visualization
def visualize_overview(df,train_size,test_size,val_size,train_data,test_data,val_data,y_pred,y_pred_val,scaler,time_step=None):
    """Plot the three splits and the test/validation predictions on one figure.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame; only its index is used, as the x axis.
    train_size, test_size, val_size : int
        Row counts of the consecutive splits inside ``df``.
    train_data, test_data, val_data : array-like
        Scaled values of each split; passed through
        ``scaler.inverse_transform`` before plotting.
    y_pred, y_pred_val : array-like
        Predictions for the test and validation splits, plotted as given.
    scaler : object
        Provides ``inverse_transform``.
    time_step : int, optional
        Window length used to build the samples. When given, prediction ``k``
        of a split is drawn at row ``split_start + time_step + k``, the row
        ``create_dataset`` takes as that sample's target. When omitted, the
        predictions are aligned to the end of their split, which reproduces
        the previous fixed offset of 41 rows whenever a split has 41 more
        rows than predictions.

    The x positions of the predictions used to be computed with a hard-coded
    offset of 41 rows, so any other window length produced x/y arrays of
    different lengths; the offset is now derived from the prediction count.
    The 30-day forecast from ``predict30days`` is not drawn here.
    """
    test_end = train_size + test_size
    val_end = test_end + val_size

    def _prediction_index(split_start, split_end, n_pred):
        # Index labels for the n_pred predictions made inside df[split_start:split_end].
        if time_step is None:
            first = split_end - n_pred
        else:
            first = split_start + time_step
        return df[first:first + n_pred].index

    plt.figure(figsize=(12,6))
    # train
    plt.plot(df[:train_size].index, scaler.inverse_transform(train_data))
    # test
    plt.plot(df[train_size:test_end].index, scaler.inverse_transform(test_data))
    # test predictions
    plt.plot(_prediction_index(train_size, test_end, len(y_pred)), y_pred)
    # validation
    plt.plot(df[test_end:val_end].index, scaler.inverse_transform(val_data))
    # validation predictions
    plt.plot(_prediction_index(test_end, val_end, len(y_pred_val)), y_pred_val)
    plt.legend(['Train','Test','Predict','Validate','ValidatePred'])
    plt.show()

In [131]:
# End-to-end run for one bank: takes the data source, the train/test/val ratios, time_step and unit_model
def RNN_with_bank_and_ratio(bank,train_ratio,test_ratio,val_ratio,time_step,unit_model):
    """Train and evaluate the SimpleRNN pipeline for one data source.

    Steps: show a Markdown heading, load and scale the prices, split them,
    build the windowed samples, build and fit the model, predict on every
    split, undo the scaling, plot, and print the error metrics for the test
    and validation splits.

    Parameters
    ----------
    bank : str
        Used both as the heading text and as the source passed to
        ``data_preprocessing``.
    train_ratio, test_ratio : float
        Fractions of the rows given to the train and test splits.
    val_ratio : float
        Shown in the heading only; the validation split actually receives
        whatever rows remain after train and test.
    time_step : int
        Window length of each sample.
    unit_model : int
        Units per recurrent layer.

    Nothing is returned; results are displayed, plotted and printed.
    """
    # Heading such as "## <bank> 7-2-1". The ratios are formatted with 'g'
    # rather than by taking the first character of str(ratio*10), which
    # mislabelled values affected by float rounding (0.9999... became "0").
    split_label = '-'.join(
        format(ratio * 10, 'g') for ratio in (train_ratio, test_ratio, val_ratio)
    )
    display(Markdown('## ' + bank + ' ' + split_label))

    # Read the data from the CSV source
    df = data_preprocessing(bank)

    # Scale the data
    scaler = MinMaxScaler(feature_range=(0,1))
    df1 = scale_data(df, scaler)

    # Split into train, test and validation sets
    train_size, test_size, val_size = get_split_size(df1, train_ratio, test_ratio)
    train_data, test_data, val_data = split_data(df1, train_size, test_size)

    # Build the windowed samples
    X_train, y_train, X_test, ytest, X_val, yval = reshape_data(train_data, test_data, val_data, time_step)

    # Create and fit the model
    model = build_model(time_step, unit_model)
    model = fit(model, X_train, y_train, X_test, ytest)

    # Predict on every split
    train_predict, y_pred, y_pred_val = predict_test_and_val(model, X_train, X_test, X_val)

    # Bring the predictions back to the original price scale
    train_predict, y_pred, y_pred_val = inverse_data(train_predict, y_pred, y_pred_val, scaler)

    # Plot
    visualize_overview(df, train_size, test_size, val_size, train_data, test_data, val_data, y_pred, y_pred_val, scaler)

    # Report accuracy
    print('Test set')
    deviation(ytest, y_pred, scaler)
    print('--------')
    print('Validation set')
    deviation(yval, y_pred_val, scaler)