In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
%matplotlib inline

INFO:tensorflow:Enabling eager execution
INFO:tensorflow:Enabling v2 tensorshape
INFO:tensorflow:Enabling resource variables
INFO:tensorflow:Enabling tensor equality
INFO:tensorflow:Enabling control flow v2


In [2]:
gpu_options = tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=0.3)
sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(gpu_options=gpu_options))
tf.compat.v1.keras.backend.set_session(sess)

In [3]:
def read(path):
    return pd.read_csv(path)

In [4]:
def buildTrain(train, pastWeek=4, futureWeek=1, defaultWeek=1):
    X_train, Y_train = [], []
    for i in range(train.shape[0]-futureWeek-pastWeek):
        X = np.array(train.iloc[i:i+defaultWeek])
        X = np.append(X,train["CCSP"].iloc[i+defaultWeek:i+pastWeek])
        X_train.append(X.reshape(X.size))
        Y_train.append(np.array(train.iloc[i+pastWeek:i+pastWeek+futureWeek]["CCSP"]))
    return np.array(X_train), np.array(Y_train)

In [5]:
sc = MinMaxScaler(feature_range = (0, 1))

In [6]:
def get_data():
    
    ## Read weekly copper price data
    path = "WeeklyFinalData.csv"
    data = read(path)
    
    date = data["Date"]
    data.drop("Date", axis=1, inplace=True)
    
    ## Add time lag (pastWeek=4, futureWeek=1)
    x_data, y_data = buildTrain(data)
    
    ## Data split
    x_train = x_data[0:int(x_data.shape[0]*0.8)]
    x_test = x_data[int(x_data.shape[0]*0.8):]
    
    y_train = y_data[0:int(y_data.shape[0]*0.8)]
    y_test = y_data[int(y_data.shape[0]*0.8):]
    
    ## Normalize
    x_train_scaled = sc.fit_transform(x_train)
    x_test_scaled = sc.transform(x_test)
    
    y_train_scaled = sc.fit_transform(y_train)
    y_test_scaled = sc.transform(y_test)
    
    ## Other information
    nb_output = 1
    batch_size = 10
    input_shape = (data.shape[1],)
    
    return (nb_output, batch_size, input_shape, x_train_scaled, x_test_scaled, y_train_scaled, y_test_scaled)

### 3-2 a

In [57]:
class network_first():
    
    
    def __init__(self, nb_neuro):
        nb_output, batch_size, input_shape, x_train_scaled, x_test_scaled, y_train_scaled, y_test_scaled = get_data()

        # Stop criteria - threshold
        self.threshold_for_error = 1e-2
        self.threshold_for_lr = 1e-4

        # Input data
        self.x = tf.convert_to_tensor(x_train_scaled, np.float32)
        self.y = tf.convert_to_tensor(y_train_scaled, np.float32)

        # Learning rate
        self.learning_rate = 1e-2
        
        # Optimizer
        self.optimizer = tf.optimizers.SGD(self.learning_rate)

        # Hidden layer I
        self.n_neurons_in_h1 = nb_neuro
        self.W1 = tf.Variable(tf.random.truncated_normal([self.x.shape[1], self.n_neurons_in_h1], mean=0, stddev=1), name='weights1')
        self.b1 = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1], mean=0, stddev=1), name = "biases1")

        # Output layer
        self.Wo = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1, self.y.shape[1]], mean=0, stddev=1), name='weightsOut')
        self.bo = tf.Variable(tf.random.truncated_normal([self.y.shape[1]], mean=0, stddev=1), name='biasesOut')

#     loss_list = np.array([])
#     predicted_values = np.array([])

    # forward operation
    def forward(self):
        with tf.GradientTape() as tape:

            y1 = tf.nn.relu((tf.matmul(self.x, self.W1)+self.b1), name='activationLayer1')
            yo = (tf.matmul(y1,self.Wo)+self.bo)
            
            # performance measure
            diff = yo-self.y
            loss = tf.reduce_mean(diff**2)
    
        return(yo, loss, tape)
    
    # backward operation
    def backward(self,tape,loss):
        self.optimizer = tf.optimizers.SGD(self.learning_rate)
        gradients = tape.gradient(loss, [self.W1, self.Wo, self.b1, self.bo])
        self.optimizer.apply_gradients(zip(gradients, [self.W1, self.Wo, self.b1, self.bo]))
        
    
    # tunning the parameter
    def tuning(self, epoch):
        
        for i in range(1,epoch+1):
            
            yo, loss, tape = self.forward()
            
            if tf.reduce_all(tf.math.abs(yo-self.y) <= self.threshold_for_error):
                print("Acceptable SLFN")
                return (self.W1, self.b1, self.Wo, self.bo)

            else:
                    
                # tuning and check the loss performance of the next step
                self.backward(tape,loss)

                # Identify whether the current adjustment times are greater than the specified times
                if i >= epoch:

                    # Return the parameter of the current model
                    print("Unacceptable SLFN (i >= specific number of times)")
                    return (self.W1, self.b1, self.Wo, self.bo)

In [58]:
# print(build_algorithm(10))
network = network_first(32)
W1, b1, Wo, bo = network.tuning(100)

Unacceptable SLFN (i >= specific number of times)


### 3-2 b

In [61]:
class network_second():
    
    
    def __init__(self, nb_neuro):
        nb_output, batch_size, input_shape, x_train_scaled, x_test_scaled, y_train_scaled, y_test_scaled = get_data()

        # Stop criteria - threshold
        self.threshold_for_error = 1e-2
        self.threshold_for_lr = 1e-4

        # Input data
        self.x = tf.convert_to_tensor(x_train_scaled, np.float32)
        self.y = tf.convert_to_tensor(y_train_scaled, np.float32)

        # Learning rate
        self.learning_rate = 1e-2
        
        
        # Optimizer
        self.optimizer = tf.optimizers.SGD(self.learning_rate)

        # Hidden layer I
        self.n_neurons_in_h1 = nb_neuro
        self.W1 = tf.Variable(tf.random.truncated_normal([self.x.shape[1], self.n_neurons_in_h1], mean=0, stddev=1), name='weights1')
        self.b1 = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1], mean=0, stddev=1), name = "biases1")

        # Output layer
        self.Wo = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1, self.y.shape[1]], mean=0, stddev=1), name='weightsOut')
        self.bo = tf.Variable(tf.random.truncated_normal([self.y.shape[1]], mean=0, stddev=1), name='biasesOut')

#     loss_list = np.array([])
#     predicted_values = np.array([])

    # forward operation
    def forward(self):
        with tf.GradientTape() as tape:

            y1 = tf.nn.relu((tf.matmul(self.x, self.W1)+self.b1), name='activationLayer1')
            yo = (tf.matmul(y1,self.Wo)+self.bo)
            
            # performance measure
            diff = yo-self.y
            loss = tf.reduce_mean(diff**2)
    
        return(yo, loss, tape)
    
    # backward operation
    def backward(self,tape,loss):
        self.optimizer = tf.optimizers.SGD(self.learning_rate)
        gradients = tape.gradient(loss, [self.W1, self.Wo, self.b1, self.bo])
        self.optimizer.apply_gradients(zip(gradients, [self.W1, self.Wo, self.b1, self.bo]))
        
    
    # tunning the parameter
    def tuning(self, epoch):
        
        for i in range(1,epoch+1):
            
            yo, loss, tape = self.forward()
            
            if tf.reduce_all(tf.math.abs(yo-self.y) <= self.threshold_for_error):
                print("Acceptable SLFN")
                return (self.W1, self.b1, self.Wo, self.bo)

            else:
                while True:
                    
                    # Save the current papameter
                    W1_pre = self.W1
                    Wo_pre = self.Wo
                    b1_pre = self.b1
                    bo_pre = self.bo
                    loss_pre = loss

                    # tuning and check the loss performance of the next step
                    self.backward(tape,loss)
                    yo, loss, tape = self.forward()
                    
                    # Confirm whether the adjusted loss value is smaller than the current one
                    if loss < loss_pre:
                        
                        # Multiply the learning rate by 1.2
                        self.learning_rate *= 1.2

                        # Identify whether the current adjustment times are greater than the specified times
                        if i >= epoch:
                            
                            # Return the parameter of the current model
                            print("Unacceptable SLFN (i >= specific number of times)")
                            return (self.W1, self.b1, self.Wo, self.bo)
                        
                        # On the contrary, it jumps out of the loop of adjusting the learning rate
                        else:
                            break

                    # On the contrary, reduce the learning rate
                    else:
                        
                        # Identify whether the current learning rate is less than the threshold
                        if self.learning_rate < self.threshold_for_lr:
                            
                            # If true, return the current model parameters
                            print("Unacceptable SLFN (learning_rate < threshold)")
                            return (self.W1, self.b1, self.Wo, self.bo)
                        
                        # On the contrary, maintain the original parameter and adjust the learning rate
                        else:
                            self.W1 = W1_pre
                            self.Wo = Wo_pre
                            self.b1 = b1_pre
                            self.bo = bo_pre
                            self.learning_rate *= 0.7

In [63]:
network = network_second(32)
W1, b1, Wo, bo = network.tuning(100)

Unacceptable SLFN (i >= specific number of times)


### 3-2 c

In [79]:
class network_third():
    
    
    def __init__(self, nb_neuro):
        nb_output, batch_size, input_shape, x_train_scaled, x_test_scaled, y_train_scaled, y_test_scaled = get_data()

        # Stop criteria - threshold
        self.threshold_for_error = 1e-2*2
        self.threshold_for_lr = 1e-2

        # Input data
        self.x = tf.convert_to_tensor(x_train_scaled, np.float32)
        self.y = tf.convert_to_tensor(y_train_scaled, np.float32)

        # Learning rate
        self.learning_rate = 1e-2
        
        # Optimizer
        self.optimizer = tf.optimizers.SGD(self.learning_rate)

        # Hidden layer I
        self.n_neurons_in_h1 = nb_neuro
        self.W1 = tf.Variable(tf.random.truncated_normal([self.x.shape[1], self.n_neurons_in_h1], mean=0, stddev=1), name='weights1')
        self.b1 = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1], mean=0, stddev=1), name = "biases1")

        # Output layer
        self.Wo = tf.Variable(tf.random.truncated_normal([self.n_neurons_in_h1, self.y.shape[1]], mean=0, stddev=1), name='weightsOut')
        self.bo = tf.Variable(tf.random.truncated_normal([self.y.shape[1]], mean=0, stddev=1), name='biasesOut')

#     loss_list = np.array([])
#     predicted_values = np.array([])

    # forward operation
    def forward(self):
        with tf.GradientTape() as tape:

            y1 = tf.nn.relu((tf.matmul(self.x, self.W1)+self.b1), name='activationLayer1')
            yo = (tf.matmul(y1,self.Wo)+self.bo)
            
            # performance measure
            diff = yo-self.y
            loss = tf.reduce_mean(diff**2)
    
        return(yo, loss, tape)
    
    # backward operation
    def backward(self,tape,loss):
        
        self.optimizer = tf.optimizers.SGD(self.learning_rate)
        gradients = tape.gradient(loss, [self.W1, self.Wo, self.b1, self.bo])
        self.optimizer.apply_gradients(zip(gradients, [self.W1, self.Wo, self.b1, self.bo]))
        
    
    # tunning the parameter
    def tuning(self):
        
        while True:
            
            yo, loss, tape = self.forward()
            
            if tf.reduce_all(tf.math.abs(yo-self.y) <= self.threshold_for_error):
                print("Acceptable SLFN")
                return (self.W1, self.b1, self.Wo, self.bo)

            else:
                while True:
                    
                    # Save the current papameter
                    W1_pre = self.W1
                    Wo_pre = self.Wo
                    b1_pre = self.b1
                    bo_pre = self.bo
                    loss_pre = loss

                    # tuning and check the loss performance of the next step
                    self.backward(tape,loss)
                    yo, loss, tape = self.forward()
                    
                    # Confirm whether the adjusted loss value is smaller than the current one
                    if loss < loss_pre:
                        
                        # Multiply the learning rate by 1.2
                        self.learning_rate *= 1.2
                        break

                    # On the contrary, reduce the learning rate
                    else:
                        
                        # Identify whether the current learning rate is less than the threshold
                        if self.learning_rate < self.threshold_for_lr:
                            
                            # If true, return the current model parameters
                            print("Unacceptable SLFN (learning_rate < threshold)")
                            return (W1, b1, Wo, bo)
                        
                        # On the contrary, maintain the original parameter and adjust the learning rate
                        else:
                            self.W1 = W1_pre
                            self.Wo = Wo_pre
                            self.b1 = b1_pre
                            self.bo = bo_pre
                            self.learning_rate *= 0.7

In [80]:
network = network_third(32)
W1, b1, Wo, bo = network.tuning()

Unacceptable SLFN (learning_rate < threshold)
