## 題目

1. Use LSTM & CNN model to classify MNIST dataset with at least 90%
2. Use LSTM & CNN model to classify customized candlestick pattern (at least 3 classes)

### 執行

所有檔案: mnist_lstm.py、mnist_cnn.py candlestick_cnn.py

#### HW2. Use LSTM & CNN model to classify MNIST


* 2.1 mnist_lstm.py

In [2]:
from sklearn.metrics import confusion_matrix
import keras
from keras.layers import LSTM
from keras.layers import Dense, Activation, Conv2D, MaxPool2D, Dropout, Flatten
from keras.datasets import mnist
from keras.models import Sequential
from keras.optimizers import Adam


def lstm_preprocess(x_train, x_test, y_train, y_test, n_step, n_input, n_classes):
    x_train = x_train.reshape(-1, n_step, n_input)
    x_test = x_test.reshape(-1, n_step, n_input)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    y_train = keras.utils.to_categorical(y_train, n_classes)
    y_test = keras.utils.to_categorical(y_test, n_classes)
    return (x_train, x_test, y_train, y_test)


def lstm_model(n_input, n_step, n_hidden, n_classes):
    model = Sequential()
    model.add(LSTM(n_hidden, batch_input_shape=(None, n_step, n_input), unroll=True))
    model.add(Dense(n_classes))
    model.add(Activation('softmax'))
    return model


def trainning(model, x_train, y_train, x_test, y_test, 
              learning_rate, training_iters, batch_size):
    adam = Adam(lr=learning_rate)
    model.summary()
    model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(x_train, y_train,
              batch_size=batch_size, epochs=training_iters,
              verbose=1, validation_data=(x_test, y_test))

def print_confusion_result(x_train, x_test, y_train, y_test, model):
    # get train & test predictions
    train_pred = model.predict_classes(x_train)
    test_pred = model.predict_classes(x_test)
    
    # get train & test true labels
    train_label = y_train
    test_label =  y_test
    
    # confusion matrix
    train_result_cm = confusion_matrix(train_label, train_pred, labels=range(10))
    test_result_cm = confusion_matrix(test_label, test_pred, labels=range(10))
    print(train_result_cm, '\n'*2, test_result_cm)

def mnist_lstm_main():
    # training parameters
    learning_rate = 0.001
    training_iters = 1
    batch_size = 128

    # model parameters
    n_input = 28
    n_step = 28
    n_hidden = 256
    n_classes = 10

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test, y_train_o, y_test_o = lstm_preprocess(x_train, x_test, y_train, y_test, n_step, n_input, n_classes)

    model = lstm_model(n_input, n_step, n_hidden, n_classes)
    trainning(model, x_train, y_train_o, x_test, y_test_o, learning_rate, training_iters, batch_size)
    scores = model.evaluate(x_test, y_test_o, verbose=0)
    print('LSTM test accuracy:', scores[1])
    print_confusion_result(x_train, x_test, y_train, y_test, model)

if __name__ == '__main__':
    mnist_lstm_main()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_1 (LSTM)                (None, 256)               291840    
_________________________________________________________________
dense_3 (Dense)              (None, 10)                2570      
_________________________________________________________________
activation_1 (Activation)    (None, 10)                0         
Total params: 294,410
Trainable params: 294,410
Non-trainable params: 0
_________________________________________________________________
Train on 60000 samples, validate on 10000 samples
Epoch 1/1
LSTM test accuracy: 0.9492
[[5710    1   15    1   13   52   70    1   34   26]
 [   2 6638   30   20    6    3    2   19   15    7]
 [  25   24 5567   25   36   28   10  143   84   16]
 [  25   21   91 5431    3  167    3   40  335   15]
 [   6    9    9    0 5323   15   44   15   66  355]
 [   5   14   13   13   11 5259   32    1   67  

* 2.2 mnist_cnn.py

In [8]:
import numpy as np
import keras
from keras.datasets import mnist
from keras.utils import np_utils
from keras.models import Sequential
from keras.layers import Dense, Activation, Conv2D, MaxPooling2D, Flatten
from keras.optimizers import SGD
import os
import matplotlib.pyplot as plt
 
os.environ['CUDA_VISIBLE_DEVICES']='0' 
 

def lenet():
    model = Sequential()
    #first conv and pool
    model.add(Conv2D(input_shape=(28, 28, 1), kernel_size=(5, 5), filters=20, activation='relu')) 
    model.add(MaxPooling2D(pool_size=(2,2), strides=2, padding='same'))
    #second conv and pool
    model.add(Conv2D(kernel_size=(5, 5), filters=50,  activation='relu', padding='same'))
    model.add(MaxPooling2D(pool_size=(2,2), strides=2, padding='same'))
 
    model.add(Flatten()) 
    model.add(Dense(500, activation='relu')) #fc1
    model.add(Dense(10, activation='softmax')) #fc2
    sgd = SGD(lr=0.01,decay=1e-6,momentum=0.9,nesterov=True) 
    model.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy']) 
    return model 
 
if __name__ == '__main__':
    
    #data
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    X_train = X_train.reshape(-1, 28, 28, 1)
    X_test = X_test.reshape(-1, 28, 28, 1)
    X_train = X_train / 255 
    X_test = X_test / 255
    y_train = np_utils.to_categorical(y_train, num_classes=10) #label onehot化
    y_test = np_utils.to_categorical(y_test, num_classes=10)
 
    
    #train and test
    model = lenet() 
    print('Training')
    history = model.fit(X_train, y_train, epochs=1, batch_size=32,validation_split=0.2) 
    print('\nTesting')
    text_loss, text_accuracy = model.evaluate(X_test, y_test) 
 
    print('\ntest loss: ', text_loss)
    print('\ntest accuracy: ', text_accuracy)
    
    #confusion
    
    print_confusion_result(X_train, X_test, np.argmax(y_train, axis=1), np.argmax(y_test, axis=1), model)
 
    
    #save model
    model.save('./model/lenet.h5')  

Training
Train on 48000 samples, validate on 12000 samples
Epoch 1/1

Testing

test loss:  0.04306174347337801

test accuracy:  0.9852
[[5889    5    7    2    1    3    7    1    3    5]
 [   1 6710   10    1    1    0    3   10    5    1]
 [   4   18 5805   45    7    0    1   56   18    4]
 [   2    1    7 6061    0   19    1   16    9   15]
 [   1   12    7    2 5731    0   18    9    6   56]
 [   4    2    2   32    3 5334   16    2   18    8]
 [  15    7    2    2    2   15 5860    0   15    0]
 [   4   15    7    8    5    3    0 6193    6   24]
 [  10   33   15   33    6   15    5    7 5682   45]
 [   8    5    2   14   12   15    1   28    8 5856]] 

 [[ 975    1    1    0    0    0    1    1    1    0]
 [   0 1132    1    0    0    0    1    1    0    0]
 [   0    3 1010    8    0    0    0    9    2    0]
 [   0    0    0 1002    0    3    0    3    2    0]
 [   0    0    3    0  963    0    1    1    3   11]
 [   2    1    0    6    0  878    2    1    1    1]
 [   7    3  

#### HW3. Use CNN model to classify customized candlestick pattern (at least 3 classes)


* candlestick_train_cnn.py

In [None]:
#把HW1的輸出數據，拿來使用，也就是我用了'EveningStar','ShootingStar','BearishHarami'這三個指標。

In [172]:
import pandas as pd
import numpy as np
from pyts.image import GASF
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

from keras import backend as K
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, Activation, MaxPool2D

In [190]:
#load data and process to the dict like pickle shows
def load_csv(path='./data/eurusd_2010_2017_1T_rulebase.csv'):
    
    #get X and Y
    df=pd.read_csv(path)
    cols=['low','high','close','open','EveningStar','ShootingStar','BearishHarami','None']
    temp=df.loc[:,cols]
    data_temp=temp[ (df[cols[-4]]+df[cols[-3]]+df[cols[-2]]) ==1 ]
    index=data_temp.index.values

    X1=temp.loc[:,cols[0]].values
    X2=temp.loc[:,cols[1]].values
    X3=temp.loc[:,cols[2]].values
    X4=temp.loc[:,cols[3]].values
    
    #use GASF transform
    image_size = 10
    gasf = GASF(image_size)
    print("the  data  processing's iter  start")
    for num,i in enumerate(index):
        n=(num+1)%1000
        if n==0:
            print("the  data  processing's iter is ",num+1)
        s=i-9
        e=i+1
        X_gasf1 = gasf.fit_transform(X1[s:e].reshape(1, -1))
        X_gasf2 = gasf.fit_transform(X2[s:e].reshape(1, -1))
        X_gasf3 = gasf.fit_transform(X3[s:e].reshape(1, -1))
        X_gasf4 = gasf.fit_transform(X4[s:e].reshape(1, -1))
    
        if num==0:
            A1=np.stack([X_gasf1,X_gasf2,X_gasf3,X_gasf4]).reshape(1,4,10,10)
        else:
            A2=np.stack([X_gasf1,X_gasf2,X_gasf3,X_gasf4]).reshape(1,4,10,10)
            A1=np.concatenate([A1,A2])
    print("the  data  processing's iter  over")
    
    #get data
    X=A1.reshape(A1.shape[0],10,10,4)
    Y=data_temp.loc[:,cols[-4:-1]].values
    train_x,test_x,train_y,test_y=train_test_split(X,Y,test_size=0.2,random_state=0)
    data={}
    data['train_x']=train_x
    data['train_y']=train_y
    data['test_x']=test_x
    data['test_y']=test_y
    
    return data



#model
def get_cnn_model(params):
    model = Sequential()
    model.add(Conv2D(filters=32, kernel_size=(5,5), padding='same', activation='relu', input_shape=(10, 10, 4)))
    model.add(Conv2D(filters=48, kernel_size=(5,5), padding='valid', activation='relu'))
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dense(84, activation='relu'))
    model.add(Dense(3, activation='softmax'))
    return model

#train
def train_model(params, data):
    model = get_cnn_model(params)
    model.compile(loss='categorical_crossentropy', optimizer=params['optimizer'], metrics=['accuracy'])
    hist = model.fit(x=data['train_x'], y=data['train_y'],
                     batch_size=params['batch_size'], epochs=params['epochs'], verbose=2)
    return (model, hist)

#show
def print_result(data, model):
    # get train & test pred-labels
    train_pred = model.predict_classes(data['train_x'])
    test_pred = model.predict_classes(data['test_x'])
    # get train & test true-labels
    train_label = np.argmax(data['train_y'],axis=1)
    test_label = np.argmax(data['test_y'],axis=1)
    # confusion matrix
    train_result_cm = confusion_matrix(train_label, train_pred, labels=range(3))
    test_result_cm = confusion_matrix(test_label, test_pred, labels=range(3))
    print(train_result_cm, '\n'*2, test_result_cm)

In [194]:
PARAMS = {}
PARAMS['classes'] = 3
PARAMS['lr'] = 0.01
PARAMS['epochs'] = 10
PARAMS['batch_size'] = 64
PARAMS['optimizer'] = optimizers.SGD(lr=PARAMS['lr'])


# # load data & keras model
data = load_csv()
# train cnn model
model, hist = train_model(PARAMS, data)
# train & test result
scores = model.evaluate(data['test_x'], data['test_y'], verbose=0)
print('CNN test accuracy:', scores[1])
print_result(data, model)

the  data  processing's iter  start
the  data  processing's iter is  1000
the  data  processing's iter is  2000
the  data  processing's iter is  3000
the  data  processing's iter is  4000
the  data  processing's iter is  5000
the  data  processing's iter  over
Epoch 1/10
 - 2s - loss: 0.8502 - acc: 0.6026
Epoch 2/10
 - 1s - loss: 0.4655 - acc: 0.8531
Epoch 3/10
 - 1s - loss: 0.3168 - acc: 0.9028
Epoch 4/10
 - 1s - loss: 0.2482 - acc: 0.9234
Epoch 5/10
 - 1s - loss: 0.2129 - acc: 0.9303
Epoch 6/10
 - 1s - loss: 0.1882 - acc: 0.9404
Epoch 7/10
 - 1s - loss: 0.1670 - acc: 0.9493
Epoch 8/10
 - 1s - loss: 0.1537 - acc: 0.9507
Epoch 9/10
 - 1s - loss: 0.1352 - acc: 0.9579
Epoch 10/10
 - 1s - loss: 0.1229 - acc: 0.9615
CNN test accuracy: 0.952020202020202
[[ 762   15   30]
 [  15 2139  139]
 [  18   15 1618]] 

 [[170   1  10]
 [  2 539  29]
 [  9   6 422]]
