In [0]:
# Mount Google Drive at /drive; later cells write chart images and saved
# models under '/drive/My Drive/Colab Notebooks/'.
from google.colab import drive
# force_remount=False: do not force a remount on re-run.
drive.mount('/drive', force_remount=False)

In [0]:
# load data: download the S50IF_CON price file into the working directory.
# Despite the .xls extension it is read further down with pd.read_csv as
# tab-delimited text.
!wget https://github.com/jetanaso/datasets/raw/master/S50IF_CON.xls

In [0]:
# import necessary libraries
!pip install mpl_finance

%tensorflow_version 1.x
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.dates as mpl_dates
import matplotlib
from mpl_finance import candlestick_ohlc
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

from sklearn.metrics import accuracy_score
from keras import applications, Input
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers, regularizers
from keras.models import load_model, Sequential, Model
from keras.layers import Dropout, Flatten, Dense, Activation, Conv2D, MaxPooling2D, BatchNormalization
from tensorflow import set_random_seed

In [0]:
# data preparation
# The header row of the file is skipped and replaced by our own column names.
cols = ['Date','Open','High','Low','Close','Volume']
df = pd.read_csv('S50IF_CON.xls', names=cols, index_col=False, skiprows=1, delimiter='\t')

# Parse day-first date strings, then convert them to matplotlib date numbers.
df['Date'] = pd.to_datetime(df['Date'], dayfirst=True)
df['Date'] = df['Date'].apply(mpl_dates.date2num)

# Every remaining column is text: strip the commas before casting to float.
for numeric_col in cols[1:]:
  df[numeric_col] = df[numeric_col].str.replace(',','').astype('float')

df.head()

In [0]:
# n_steps is later passed as the chart window size (rows per picture);
# forecast_horizon is how many rows ahead the label looks.
n_steps = 88
forecast_horizon = 22

dataset = df.copy()

# Label = 1 when the close forecast_horizon rows later is strictly higher than
# the current close, otherwise 0 (a missing future close also yields 0).
future_close = dataset['Close'].shift(-forecast_horizon)
dataset['Up/Down'] = (future_close > dataset['Close']).astype(int)

# The last forecast_horizon rows have no future close to compare with: drop them.
dataset = dataset.head(-forecast_horizon)

print(dataset.tail(10))
print(dataset.shape)

In [0]:
# split train/validate/test datasets (90:5:5)
# Each chart is built from n_steps consecutive rows and labelled by its LAST
# row. For the first validate/test chart to end on the first row of its own
# period, the frame must start n_steps-1 rows before the boundary. Starting
# n_steps rows before (as previously done) makes that first chart an exact
# duplicate of the last chart of the preceding split.
# NOTE: chart images generated with a different split must be regenerated,
# otherwise the number of image files will not match the label vector.
# NOTE(review): labels look forecast_horizon rows ahead, so the labels of the
# last training rows are computed from closes inside the validation period —
# confirm whether a gap between splits is wanted.
n_rows = len(dataset.index)
train_end = int(n_rows*.9)
validate_end = int(n_rows*.95)
lookback = n_steps - 1

train_dataset = dataset.iloc[:train_end,:]
# max(..., 0) prevents a negative start from wrapping around to the end of the frame.
# reset_index returns a fresh frame instead of mutating a slice in place.
validate_dataset = dataset.iloc[max(train_end-lookback, 0):validate_end,:].reset_index(drop=True)
test_dataset = dataset.iloc[max(validate_end-lookback, 0):,:].reset_index(drop=True)

print(train_dataset.shape)
print(validate_dataset.shape)
print(test_dataset.shape)

In [0]:
# prepare chart pictures (2D) as inputs
def chart_to_png(dataset, n_pics='default', window_size=100, train=True, validate=False, test=False):
  """Render one candlestick + volume chart per rolling window and save it as an RGB PNG.

  Window ``i`` covers rows ``i .. i+window_size-1`` of ``dataset``. Every column
  except the last is passed to ``candlestick_ohlc``; the last column of the
  window's final row is the class label (1 -> ``up/``, 0 -> ``down/``). When
  ``test`` is set the label is ignored and images go to ``all_classes/``.

  Args:
    dataset: DataFrame with 'Date' and 'Volume' columns and the label in its
      last column.
    n_pics: number of windows to render, or 'default' for every full window.
    window_size: number of rows drawn in each chart.
    train, validate, test: choose the output split folder; they are checked in
      that order, so the first truthy flag wins.

  Raises:
    ValueError: if no split flag is set, or if a train/validate window carries
      a label other than 0 or 1.
  """
  import os  # local import so this cell does not depend on an edit to the import cell

  if n_pics=='default':
    n_pics = int(len(dataset)-window_size+1)
  else:
    n_pics = int(n_pics)

  # Resolve the split folder once; previously an all-False call crashed later
  # on an unbound variable.
  if train:
    split_dir = '/drive/My Drive/Colab Notebooks/chart/train/'
  elif validate:
    split_dir = '/drive/My Drive/Colab Notebooks/chart/validate/'
  elif test:
    split_dir = '/drive/My Drive/Colab Notebooks/chart/test/'
  else:
    raise ValueError('one of train, validate or test must be True')

  for i in range(n_pics):
    # reset_index returns a new frame; the in-place variant mutated a slice of `dataset`.
    prep_df = dataset.iloc[i:window_size+i,:].reset_index(drop=True)

    # Pick the class folder before drawing so a bad label fails fast.
    label = prep_df.iloc[-1,-1]
    if test:
      class_dir = split_dir+'all_classes/'
    elif label==1:
      class_dir = split_dir+'up/'
    elif label==0:
      class_dir = split_dir+'down/'
    else:
      raise ValueError('unexpected label %r in window %d' % (label, i))
    os.makedirs(class_dir, exist_ok=True)
    png_path = class_dir+'chart'+str('{0:04d}'.format(i))+'.png'

    fig = plt.figure(figsize=(3,3), facecolor='w')
    try:
      # Price panel: candlesticks with 10% extra room below for the volume bars.
      ax1 = plt.subplot2grid((5,5), (0,0), rowspan=4, colspan=4, facecolor='w')
      candlestick_ohlc(ax1, prep_df.iloc[:,:-1].values, width=.6, colorup='g', colordown='r')
      pad = 0.1
      yl = ax1.get_ylim()
      ax1.set_ylim(yl[0]-(yl[1]-yl[0])*pad,yl[1])
      ax1.grid(False)
      ax1.axis('off')

      # Volume panel on a twin axis; the 12x upper limit keeps the bars in the
      # bottom twelfth of the plot.
      ax1v = ax1.twinx()
      ax1v.bar(prep_df['Date'].values, prep_df['Volume'].values, color='blue')
      ax1v.set_ylim(0, 12*prep_df['Volume'].max())
      ax1v.grid(False)
      ax1v.axis('off')

      plt.show(block=True)

      fig.savefig(png_path, facecolor=fig.get_facecolor())
    finally:
      # Release the figure explicitly so thousands of windows do not pile up in memory.
      plt.close(fig)

    # Re-open the saved PNG and rewrite it as plain RGB (drops any alpha channel).
    with Image.open(png_path) as rgba:
      rgb = rgba.convert('RGB')
    print(np.array(rgb).shape)
    rgb.save(png_path)
if __name__ == '__main__':
  # One rendering pass per split: (frame, train flag, validate flag, test flag).
  split_jobs = (
    (train_dataset, True, False, False),
    (validate_dataset, False, True, False),
    (test_dataset, False, False, True),
  )
  for split_frame, is_train, is_validate, is_test in split_jobs:
    chart_to_png(split_frame, n_pics='default', window_size=int(n_steps),
                 train=is_train, validate=is_validate, test=is_test)

In [0]:
# model / training configuration
# Size that chart images are resized to when loaded from disk.
img_width = 216
img_height = 216

# One folder per split; each contains the class sub-folders.
train_data_dir = '/drive/My Drive/Colab Notebooks/chart/train/'
validation_data_dir = '/drive/My Drive/Colab Notebooks/chart/validate/'
test_data_dir = '/drive/My Drive/Colab Notebooks/chart/test/'

# Training schedule and number of independently trained models.
epochs, batch_size, num_models = 20, 16, 30

################################################################################

def _build_cnn():
  """Assemble and compile the three-block convolutional binary classifier."""
  model = Sequential()

  # Block 1 declares the input shape from the module-level img_width/img_height.
  model.add(Conv2D(16, (11, 11), padding='same', kernel_regularizer=regularizers.l2(0.001), input_shape=(img_width, img_height, 3)))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.2))

  # NOTE(review): blocks 2 and 3 use activity_regularizer while block 1 uses
  # kernel_regularizer — confirm the mix is intentional.
  model.add(Conv2D(32, (11, 11), activity_regularizer=regularizers.l2(0.001)))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.2))

  model.add(Conv2D(128, (11, 11), activity_regularizer=regularizers.l2(0.001)))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.2))

  # Single sigmoid unit: output is read as the probability of class 1.
  model.add(Flatten())
  model.add(Dense(1, activation='sigmoid'))

  model.compile(loss='binary_crossentropy',
                optimizer=optimizers.Adam(lr=1e-4, beta_1=0.9, beta_2=0.999, amsgrad=False),
                metrics=['accuracy'])
  return model


def _plot_learning_curves(history, epochs):
  """Plot train/validate loss and accuracy per epoch from a Keras History object."""
  logs = history.history
  # Older Keras logs the metric as 'acc'; newer releases log it as 'accuracy'.
  # Looking the key up avoids a KeyError that would discard a finished training run.
  acc_key = 'acc' if 'acc' in logs else 'accuracy'

  for title, key in (('Loss Development', 'loss'), ('Accuracy Development', acc_key)):
    plt.plot(logs[key], label='train')
    plt.plot(logs['val_'+key], label='validate')
    plt.title(title)
    plt.xticks(np.arange(0,epochs+1,step=2))
    plt.legend()
    plt.show()


def fit_model(train_data_dir, validation_data_dir, epochs, batch_size):
  """Train a fresh CNN on the chart images and return the fitted model.

  Args:
    train_data_dir: directory with one sub-folder per class, used for training.
    validation_data_dir: directory with one sub-folder per class, used for validation.
    epochs: number of training epochs.
    batch_size: training batch size (validation always uses batches of 1).

  Returns:
    The trained Keras model. Loss and accuracy curves are plotted as a side effect.
  """
  model = _build_cnn()

  # prepare data augmentation configuration
  # NOTE(review): vertical_flip mirrors charts top-to-bottom while keeping the
  # up/down label — confirm this augmentation is intended.
  # NOTE(review): validation_split is set but no `subset` is requested below,
  # so it appears to have no effect — confirm before relying on it.
  train_datagen = ImageDataGenerator(rescale=1./255,
                                    shear_range=0.2,
                                    zoom_range=0.2,
                                    horizontal_flip=False,
                                    vertical_flip=True,
                                    validation_split=0.2)

  # Validation images are only rescaled, never augmented.
  validate_datagen = ImageDataGenerator(rescale=1./255)

  train_generator = train_datagen.flow_from_directory(train_data_dir,
                                                      target_size=(img_height, img_width),
                                                      batch_size=batch_size,
                                                      class_mode='binary',
                                                      shuffle=True,
                                                      seed=1)

  validation_generator = validate_datagen.flow_from_directory(validation_data_dir,
                                                              target_size=(img_height, img_width),
                                                              batch_size=1,
                                                              class_mode='binary',
                                                              shuffle=False)

  history = model.fit_generator(train_generator,
                                steps_per_epoch=train_generator.n // train_generator.batch_size,
                                epochs=epochs,
                                verbose=0,
                                validation_data=validation_generator,
                                validation_steps=validation_generator.n // validation_generator.batch_size)

  # plotting learning curve
  _plot_learning_curves(history, epochs)

  return model

################################################################################

# Train num_models independent models; each is saved under a 1-based file name.
for i in range(num_models):
  filename = '/drive/My Drive/Colab Notebooks/2dcnn_model_' + str(i+1) + '.h5'
  # fit, then persist the model
  model = fit_model(train_data_dir, validation_data_dir, epochs, batch_size)
  model.save(filename)
  print('>Saved %s' % filename)

In [0]:
def load_all_models(n_models):
  """Load saved models 1..n_models from Drive and return them as a list, in order."""
  loaded = []
  for model_no in range(1, n_models + 1):
    filename = '/drive/My Drive/Colab Notebooks/2dcnn_model_' + str(model_no) + '.h5'
    loaded.append(load_model(filename))
    print('>loaded %s' % filename)
  return loaded

################################################################################

# load all models at once
members = load_all_models(num_models)
print('Loaded %d models' % len(members))
print('='*55)

# evaluate standalone models on test dataset
acc_list = []
for i, model in enumerate(members):
  # A fresh, unshuffled generator per model so predictions follow file order.
  test_datagen = ImageDataGenerator(rescale=1./255)
  test_generator = test_datagen.flow_from_directory(
      test_data_dir,
      target_size=(img_height, img_width),
      batch_size=1,
      class_mode='binary',
      shuffle=False)

  pred = model.predict_generator(test_generator, steps=len(test_generator), verbose=1)

  # File names line up with predictions only because shuffle=False above.
  filenames = test_generator.filenames

  # One row per test image: probability and rounded class.
  probs = pred[:,0]
  results = pd.DataFrame({'file':filenames, 'prediction':probs, 'y_hat':np.round(probs)})

  # True label = label of the last row of each n_steps-row window.
  results['y_true'] = test_dataset['Up/Down'].values[n_steps-1:]

  acc = accuracy_score(results['y_true'], results['y_hat'])
  acc_list.append(acc)
  print('Model#%s Accuracy on test dataset: %.3f%%' % (i+1, acc*100))

print('='*55)
print('Average 2DCNN Accuracy on test dataset: %.3f%%' % (np.mean(acc_list)*100))