In [2]:
# #安裝套件
# !pip install tensorflow
# !pip install keras

In [3]:
# # 使用gpu
# import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()

# device_name = tf.test.gpu_device_name()
# # if device_name != '/device:GPU:0':
# #   raise SystemError('GPU device not found')
# print('Found GPU at: {}'.format(device_name))

In [4]:
# 掛載雲端硬碟
from google.colab import drive

drive.mount("/content/drive", force_remount=True)


Mounted at /content/drive


In [5]:
!ls "/content/drive/My Drive"

 AIinFQA			    '國立臺北大學 (1).png'   財經犯罪偵查心得報告.docx
'Colab Notebooks'		    '國立臺北大學 (2).png'   財經犯罪手法與審理心得報告.docx
 NTPUAcc112			     國立臺北大學.png
 不動產價格趨勢與估價心得報告.docx   國際租稅


In [6]:
# 全域變數
googlepath = "drive/MyDrive/Colab Notebooks/EarlyWarningStocksGAN/EWSGAN/"

In [7]:
import time
import os
import matplotlib.pyplot as plt
import tensorflow as tf
from math import sqrt
import numpy as np
from keras.layers import GRU, Dense, Flatten, Conv1D, LeakyReLU, Dropout
from keras import Sequential
from pickle import load
from sklearn.metrics import mean_squared_error

In [8]:
#global variable
get_last_days = 182 #30
googlepath = "drive/MyDrive/Colab Notebooks/EarlyWarningStocksGAN/EWSGAN"

In [9]:
def LoadDataToList(filespath):
  data_list = []
  for file_name in os.listdir(filespath):
    file_path = os.path.join(filespath, file_name)
    loaded_data = np.load(file_path, allow_pickle=True)
    #print(f"{file_name} shape: {loaded_data.shape}")
    for data in loaded_data:
      data_list.insert(0,data)
  # print(data_list)
  return np.array(data_list)

In [10]:
# Load data
X_train = LoadDataToList(f'{googlepath}/X_train')
y_train = LoadDataToList(f'{googlepath}/Y_train')
X_test = LoadDataToList(f'{googlepath}/X_test')
y_test = LoadDataToList(f'{googlepath}/Y_test')
yc_train = LoadDataToList(f'{googlepath}/YC_train')
yc_test = LoadDataToList(f'{googlepath}/YC_test')

In [11]:
def make_generator_model(input_dim, output_dim, feature_size) -> tf.keras.models.Model:

  model = Sequential()
  model.add(GRU(units=1024, return_sequences = True, input_shape=(input_dim, feature_size),
                recurrent_dropout=0.2))
  model.add(GRU(units=512, return_sequences = True, recurrent_dropout=0.2)) # 256, return_sequences = True
  model.add(GRU(units=256, recurrent_dropout=0.2)) #, recurrent_dropout=0.1
  # , recurrent_dropout = 0.2
  model.add(Dense(128))
  # model.add(Dense(128))
  model.add(Dense(64))
  #model.add(Dense(16))
  model.add(Dense(units=output_dim))
  return model

In [12]:
def make_discriminator_model():

  cnn_net = tf.keras.Sequential()
  cnn_net.add(Conv1D(32, input_shape=(4, 1), kernel_size=3, strides=2, padding='same', activation=LeakyReLU(alpha=0.01)))
  cnn_net.add(Conv1D(64, kernel_size=5, strides=2, padding='same', activation=LeakyReLU(alpha=0.01)))
  cnn_net.add(Conv1D(128, kernel_size=5, strides=2, padding='same', activation=LeakyReLU(alpha=0.01)))
  cnn_net.add(Flatten())
  cnn_net.add(Dense(220, use_bias=False))
  cnn_net.add(LeakyReLU())
  cnn_net.add(Dense(220, use_bias=False, activation='relu'))
  cnn_net.add(Dense(1, activation='sigmoid'))
  return cnn_net

In [13]:
class GAN:
  def __init__(self, generator, discriminator, opt):
    self.opt = opt
    self.lr = opt["learning_rate"]
    self.generator = generator
    self.discriminator = discriminator
    self.cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
    self.generator_optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr)
    self.discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr)
    self.batch_size = self.opt['bs']
    self.checkpoint_dir = '../training_checkpoints'
    self.checkpoint_prefix = os.path.join(self.checkpoint_dir, "ckpt")
    self.checkpoint = tf.train.Checkpoint(generator_optimizer=self.generator_optimizer,
                                          discriminator_optimizer=self.discriminator_optimizer,
                                          generator=self.generator,
                                          discriminator=self.discriminator)

  def discriminator_loss(self, real_output, fake_output):
    real_loss = self.cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = self.cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

  def generator_loss(self, fake_output):
    return self.cross_entropy(tf.ones_like(fake_output), fake_output)

  @tf.function
  def train_step(self, real_x, real_y, yc):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      generated_data = self.generator(real_x, training=True)
      generated_data_reshape = tf.reshape(generated_data, [generated_data.shape[0], generated_data.shape[1], 1])
      d_fake_input = tf.concat([tf.cast(generated_data_reshape, tf.float64), yc], axis=1)
      real_y_reshape = tf.reshape(real_y, [real_y.shape[0], real_y.shape[1], 1])
      d_real_input = tf.concat([real_y_reshape, yc], axis=1)

      # #Reshape for MLP
      # d_fake_input = tf.reshape(d_fake_input, [d_fake_input.shape[0], d_fake_input.shape[1]])
      d_fake_input = d_fake_input[:, :4, :]
      # d_real_input = tf.reshape(d_real_input, [d_real_input.shape[0], d_real_input.shape[1]])
      d_real_input = d_real_input[:, :4, :]

      real_output = self.discriminator(d_real_input, training=True)
      fake_output = self.discriminator(d_fake_input, training=True)

      gen_loss = self.generator_loss(fake_output)
      disc_loss = self.discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)

    self.generator_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))
    self.discriminator_optimizer.apply_gradients(
        zip(gradients_of_discriminator, self.discriminator.trainable_variables))
    return real_y, generated_data, {'d_loss': disc_loss, 'g_loss': gen_loss}

  def train(self, real_x, real_y, yc, opt):
    train_hist = {}
    train_hist['D_losses'] = []
    train_hist['G_losses'] = []
    train_hist['per_epoch_times'] = []
    train_hist['total_ptime'] = []

    epochs = opt["epoch"]
    for epoch in range(epochs):
      start = time.time()

      real_price, fake_price, loss = self.train_step(real_x, real_y, yc)

      G_losses = []
      D_losses = []

      Real_price = []
      Predicted_price = []

      D_losses.append(loss['d_loss'].numpy())
      G_losses.append(loss['g_loss'].numpy())

      Predicted_price.append(fake_price.numpy())
      Real_price.append(real_price.numpy())

      # Save the model every 15 epochs
      if (epoch + 1) % 15 == 0:
        tf.keras.models.save_model(generator, 'gen_model_%d.h5' % epoch)
        self.checkpoint.save(file_prefix=self.checkpoint_prefix + f'-{epoch}')
        print('epoch', epoch + 1, 'd_loss', loss['d_loss'].numpy(), 'g_loss', loss['g_loss'].numpy())
      print('Time for epoch {} is {} sec'.format(epoch + 1, time.time() - start))
      # For printing loss
      epoch_end_time = time.time()
      per_epoch_ptime = epoch_end_time - start
      train_hist['D_losses'].append(D_losses)
      train_hist['G_losses'].append(G_losses)
      train_hist['per_epoch_times'].append(per_epoch_ptime)

    # Reshape the predicted result & real
    Predicted_price = np.array(Predicted_price)
    Predicted_price = Predicted_price.reshape(Predicted_price.shape[1], Predicted_price.shape[2])
    Real_price = np.array(Real_price)
    Real_price = Real_price.reshape(Real_price.shape[1], Real_price.shape[2])

    plt.plot(train_hist['D_losses'], label='D_loss')
    plt.plot(train_hist['G_losses'], label='G_loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return Predicted_price, Real_price,sqrt(mean_squared_error(Real_price, Predicted_price)) , np.sqrt(mean_squared_error(Real_price, Predicted_price)) / np.mean(
        Real_price)

In [None]:
input_dim = X_train.shape[1]
feature_size = X_train.shape[2]
output_dim = y_train.shape[1]

## For Bayesian
opt = {"learning_rate": 0.001, "epoch": 50, 'bs': 64}

generator = make_generator_model(X_train.shape[1], output_dim, X_train.shape[2])
discriminator = make_discriminator_model()
gan = GAN(generator, discriminator, opt)
Predicted_price, Real_price, RMSE, RMSPE = gan.train(X_train, y_train, yc_train, opt)

print(f'RMSE:{RMSE}')
print(f'RMSPE:{RMSPE}')

  output, from_logits = _get_logits(
