##IMPORT

In [1]:
import numpy as np
from PIL import Image
# from matplotlib.patheffects import withSimplePatchShadow
import os
import cv2
# from operator import le
from sklearn.model_selection import train_test_split, KFold

## Fungsi Aktivasi

In [2]:
def relu(input):
    return np.max(input,0)

def sigmoid(input):
    return 1/(1+np.exp(-input))

## Layer

In [3]:
class DenseLayer:
    def __init__(self, n_unit, activation):
        self.n_unit = n_unit
        self.activation = activation
        self.bias = np.zeros(n_unit)
        self.weight = []
        self.deltaW = np.zeros(n_unit)

    def init_weights(self, n_inputs):
        self.n_inputs = n_inputs
        self.weight = np.random.randn(self.n_unit,n_inputs)
        self.deltaW = np.zeros((self.n_unit))

    def forward(self,inputs):
        if len(self.weight) == 0:
           self.init_weights(len(inputs))
        self.input = inputs
        multisum = np.array([])
        for i in range(self.n_unit):
            multisum = np.append(multisum, 
            np.sum(np.multiply(self.weight[i], inputs)) + self.bias[i])

        if self.activation == 'sigmoid':
            matrixsigmoid = np.vectorize(sigmoid)
            self.output = matrixsigmoid(multisum)
        elif self.activation == 'linear':
            self.output = multisum
        else:
            self.output = np.maximum(multisum, 0)
        return self.output
    
    def d_sigmoid(self, inputs):
        sigm = sigmoid(inputs)
        return sigm * (1 - sigm)
    
    def d_relu(self,input):
        return 1.0 if input >= 0 else 0

    def d_act_funct(self,activation,inputs):
        if (activation=='sigmoid'):
            return self.d_sigmoid(inputs)
        else:
            return self.d_relu(inputs)

    def backward(self,inputs):
        derivative = np.array([])
        for i in self.output:
            derivative = np.append(derivative, self.d_act_funct(self.activation, i))
        self.deltaW += np.multiply(derivative, inputs)
        dE = np.matmul(inputs, self.weight)
        return dE
    
    def update_weights(self, learn_rate, momentum):
        for i in range(self.n_unit):
            self.weight[i] = self.weight[i] - ((momentum * self.weight[i]) + (learn_rate * self.deltaW[i] * self.input))

        self.bias = self.bias - ((momentum * self.bias) + (learn_rate * self.deltaW))
        self.deltaW = np.zeros((self.n_unit))

class ConvolutionLayer:
    def __init__(self, nb_channel, nb_filter, filter_size, padding=0, stride=1):
        self.nb_channel = nb_channel
        self.nb_filter = nb_filter
        self.filter_size = filter_size
        self.padding = padding
        self.stride = stride
        self.weight = np.random.randn(nb_filter, nb_channel, filter_size, filter_size)
        self.bias = np.zeros((nb_filter))
        self.dw = np.zeros((nb_filter, nb_channel, filter_size, filter_size))
        self.db = np.zeros((nb_filter))
    
    def add_zero_padding(self, inputs):
        w_pad = inputs.shape[1] + self.padding * 2
        h_pad = inputs.shape[2] + self.padding * 2
        
        inputs_padded = np.zeros((inputs.shape[0], w_pad, h_pad))
        for s in range(inputs.shape[0]):
            inputs_padded[s, self.padding:w_pad-self.padding, self.padding:h_pad-self.padding] = inputs[s, :, :]

        return inputs_padded
    
    def update_weights(self, learn_rate, momentum):
        self.weight -= learn_rate * self.dw
        self.bias -= learn_rate * self.db

        # Reset error
        self.dw = np.zeros((self.nb_filter, self.nb_channel, self.filter_size, self.filter_size))
        self.db = np.zeros((self.nb_filter))
    
    def forward(self, inputs):
        w, h = inputs.shape[1], inputs.shape[2]
        v_w = int((w - self.filter_size + 2*self.padding)/self.stride + 1)
        v_h = int((h - self.filter_size + 2*self.padding)/self.stride + 1)
        
        self.inputs = self.add_zero_padding(inputs)
        featureMap = np.zeros((self.nb_filter, v_w, v_h))

        for k in range(self.nb_filter):
            for i in range(v_w):
                for j in range(v_h):
                    recField = self.inputs[:, i:i+self.filter_size, j:j+self.filter_size]
                    featureMap[k, i, j] = np.sum(recField * self.weight[k, :, :, :] + self.bias[k])
        
        return self.detector(featureMap)

    def backward(self, prev_errors):
        dx = np.zeros(self.inputs.shape)
        dw = np.zeros(self.weight.shape)
        db = np.zeros(self.bias.shape)

        f, w, h = prev_errors.shape

        for k in range(f):
            db[k] = np.sum(prev_errors[k, :, :])
        
        for k in range(f):
            for i in range(w):
                for j in range(h):
                    dw[k, :, :, :] += prev_errors[k, i, j] * self.inputs[:, i:i+self.filter_size, j:j+self.filter_size]
                    dx[:, i:i+self.filter_size, j:j+self.filter_size] += prev_errors[k, i, j] * self.weight[k, :, :, :]

        self.dw += dw
        self.db += db
        
        return self.detector(dx)

    def detector(self,input):
        return np.maximum(input, 0)

class Pooling:
    def __init__(self,filter_size,stride_size,mode):
        self.filter_size = filter_size
        self.stride_size = stride_size
        self.mode = mode

    def forward(self, inputs):
        self.input = inputs
        depth = inputs.shape[0]
        length = ((inputs.shape[1] - self.filter_size) // self.stride_size) + 1
        pooled_map = np.zeros([depth, length, length], dtype=np.double)

        for d in range(0, depth):
            for w in range(0, length):
                for h in range(0, length):
                    if (self.mode.lower() == 'average'):
                        pooled_map[d,w,h] = self.average(inputs,d,w,h)
                    elif (self.mode.lower() == 'max'):
                        pooled_map[d,w,h] = self.max(inputs,d,w,h)
        return pooled_map
    
    def average(self,inputs,d,r_pos,b_pos):
        return np.average(inputs[d, 
                                r_pos*self.filter_size:(r_pos*self.filter_size + self.filter_size),
                                b_pos*self.filter_size:(b_pos*self.filter_size + self.filter_size)
                                ])

    def max(self,inputs,d,r_pos,b_pos):
        return np.max(inputs[d, 
                                r_pos*self.filter_size:(r_pos*self.filter_size + self.filter_size),
                                b_pos*self.filter_size:(b_pos*self.filter_size + self.filter_size)
                                ])

    def backward(self, prev_errors):
        F, W, H = self.input.shape
        dx = np.zeros(self.input.shape)
        for i in range(0, F):
            for j in range(0, W, self.filter_size):
                for k in range(0, H, self.filter_size):
                    st = np.argmax(self.input[i, j : j + self.filter_size, k : k + self.filter_size])
                    (idx, idy) = np.unravel_index(st, (self.filter_size, self.filter_size))
                    if ((j + idx) < W and (k + idy) < H):
                        dx[i, j + idx, k + idy] = prev_errors[i, int(j / self.filter_size) % prev_errors.shape[1], int(k / self.filter_size) % prev_errors.shape[2]]
        return dx
    
    def update_weights(self, learn_rate, momentum):
        # Todo
        pass

class FlattenLayer:
    def init(self):
        pass

    def forward(self, inputs):
        self.channel, self.width, self.height = inputs.shape
        output = inputs.flatten()
        return output
    
    def backward(self, inputs):
        return inputs.reshape(self.channel, self.width, self.height)

    def update_weights(self, learning_rate, momentum):
        pass

class LSTMLayer:
    def __init__(self, input_size, n_cells):
        self.input_size = input_size
        self.n_cells = n_cells

        self.c_prev = np.zeros((n_cells,1))
        self.h_prev = np.zeros((n_cells,1))

        self.sigmoid = np.vectorize(sigmoid) # allow function to receive input in form of vector

        #parameternya dijadiin class
        self.input_param = self.LSTMParameter(self.input_size, self.n_cells)
        self.cell_param = self.LSTMParameter(self.input_size, self.n_cells)
        self.forget_param = self.LSTMParameter(self.input_size, self.n_cells)
        self.output_param = self.LSTMParameter(self.input_size, self.n_cells)
        self.training_param = {}
    
    class LSTMParameter:
        def __init__(self,size,n_cells):
            self.u = np.random.rand(n_cells, size)
            self.w = np.random.rand(n_cells)
            self.b = np.random.rand(n_cells)


    def forgetGate(self, timestep):
        self.training_param['f'+str(timestep)] = self.sigmoid(
                np.dot(self.forget_param.u, self.x[timestep]) + 
                np.dot(self.forget_param.w, self.h_prev) + 
                self.forget_param.b
            )

    def inputGate(self,timestep):
        self.training_param['i' + str(timestep)] = self.sigmoid(
            np.matmul(self.input_param.u,self.x[timestep]) 
            + np.multiply(self.input_param.w,self.h_prev[timestep]) 
            + self.input_param.b)
    
    def cellState(self,timestep):
        self.training_param['Caccent'+str(timestep)] = np.tanh(
            np.dot(self.cell_param.u, self.x[timestep]) + np.dot(self.cell_param.w, self.h_prev) + self.cell_param.b)
        self.training_param['C'+str(timestep)] = (np.multiply(
            self.training_param['f'+str(timestep)], self.c_prev) + 
            np.multiply(self.training_param['i'+str(timestep)], self.training_param['Caccent'+str(timestep)]))

    def outputGate(self, timestep):
        self.training_param['o'+str(timestep)] = self.sigmoid(
                np.dot(self.output_param.u, self.x[timestep]) + 
                np.dot(self.output_param.w, self.h_prev) + 
                self.output_param.b
            )
        self.training_param['h'+str(timestep)] = np.multiply(self.training_param['o'+str(timestep)], np.tanh(self.training_param['C'+str(timestep)]))
    
    def forward(self, inputs):
        self.x = inputs
        for i in range(self.n_cells):
            self.forgetGate(i)
            self.inputGate(i)
            self.cellState(i)
            self.outputGate(i)
            self.c_prev = self.training_param['C'+str(i)]
            self.h_prev = self.training_param['h'+str(i)]
        
        output = self.training_param['h'+str(self.n_cells-1)]
        return output

    def backward(self, inputs):
        # engga ada di spek
        pass

    def update_weights(self, learning_rate, momentum):
        # engga ada di spek
        pass

## Model

In [4]:
import pickle
from sklearn import metrics

class Model:
    def __init__(self, layers=[]):
        self.layers = layers
    
    def save_model(self, namefile):
        with open(namefile, 'wb') as f:
            pickle.dump(self.layers,  f, protocol=pickle.HIGHEST_PROTOCOL)
    
    def load_model(self,namefile):
        with open(namefile, 'rb') as f:
            temp = pickle.load(f)
        self.layers = temp.copy()
    
    def forward(self, inputs):
        out = inputs.copy()
        for layer in self.layers:
            out = layer.forward(out)
        return out
    
    def _forward(self,inputs):
        out = inputs.copy()
        result = [out]
        for layer in self.layers:
            out = layer.forward(out)
            result.append(out)

        return result
    
    def fit(self,features, target, batch_size, epoch, learn_rate, momentum=1):
        y = np.array([])
        output = np.array([])

        for e in range(epoch):
            print("Epoch: ", e+1)
            sum_loss = 0
            
            for b in range(batch_size):
                current_idx = (batch_size * e + b) % len(features)
                res = self._forward(features[current_idx])
                current_output = res[len(res) - 1][0]
                current_y = target[current_idx]
                
                dE = np.array([current_y - current_output]) * -1
                for i in reversed(range(len(self.layers))):
                    dE = self.layers[i].backward(dE)
                sum_loss += 0.5 * (current_y - current_output)**2
                
                y = np.append(y, current_y)
                output = np.rint(np.append(output, current_output))

            for l in reversed(range(len(self.layers))):
                self.layers[l].update_weights(learn_rate, momentum)
            avg_loss = sum_loss/batch_size
            
            print("Loss: ", avg_loss)
            print("Accuracy: ", metrics.accuracy_score(y, output))

class ModelLSTM:

    def __init__(self, layers=[]):
        self.layers = layers
    
    def forward(self, inputs):
        out = inputs.copy()
        for layer in self.layers:
            out = layer.forward(out)
        return out
    
    def _forward(self,inputs):
        out = inputs.copy()
        result = [out]
        for layer in self.layers:
            out = layer.forward(out)
            result.append(out)
        return result
    
    def fit(self,features):
        return self.forward(features)


## Tubes 1 CNN

## Read Image

In [5]:

# function to read image to numpy array of file name
def read_dataset(dataset_path):
    folder_list = os.listdir(dataset_path)
    folder_list = sorted(folder_list)
    folder_path = []; class_label = np.ndarray(shape=(0)); class_dictionary = {}
    
    for i in range(len(folder_list)):#loop for all class folders
        class_folder_path = os.path.join(dataset_path, folder_list[i])
        list_image_name = os.listdir(class_folder_path)
        list_image_name = sorted(list_image_name)
        temp_folder_path = [os.path.join(class_folder_path, j) for j in list_image_name]
        folder_path += temp_folder_path
        temp_class_label = np.full((len(list_image_name)),np.int16(i))
        class_label = np.concatenate((class_label, temp_class_label), axis=0)
        class_dictionary[str(i)] = folder_list[i]
    return np.asarray(folder_path), class_label, class_dictionary

# function to read all images in a folder to numpy array of image matrix
def list_img_to_matrix(folder_path, size = (400, 400)):
    list_of_image_matrix = []
    for file_img in folder_path:
        image = cv2.imread(file_img, 1)

        image_matrix = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_matrix = cv2.resize(image_matrix, size)
        image_matrix = np.array(image_matrix)

        list_of_image_matrix.append(image_matrix)
    
    list_of_image_matrix = np.array(list_of_image_matrix, dtype="object")
    list_of_image_matrix = np.transpose(list_of_image_matrix, (0,3,1,2))
    return list_of_image_matrix


In [6]:
#download dataset
!gdown 1NgRAihCd-2B0YsFOtF1r8VaSBwJJHBaT
!!unzip /content/DATASET_TUBES1.zip -d test

Downloading...
From: https://drive.google.com/uc?id=1NgRAihCd-2B0YsFOtF1r8VaSBwJJHBaT
To: /content/DATASET_TUBES1.zip
  0% 0.00/888k [00:00<?, ?B/s]100% 888k/888k [00:00<00:00, 147MB/s]


['Archive:  /content/DATASET_TUBES1.zip',
 '   creating: test/cats/',
 '  inflating: test/cats/cat.0.jpg     ',
 '  inflating: test/cats/cat.15.jpg    ',
 '  inflating: test/cats/cat.17.jpg    ',
 '  inflating: test/cats/cat.19.jpg    ',
 '  inflating: test/cats/cat.2.jpg     ',
 '  inflating: test/cats/cat.21.jpg    ',
 '  inflating: test/cats/cat.23.jpg    ',
 '  inflating: test/cats/cat.26.jpg    ',
 '  inflating: test/cats/cat.36.jpg    ',
 '  inflating: test/cats/cat.38.jpg    ',
 '  inflating: test/cats/cat.40.jpg    ',
 '  inflating: test/cats/cat.45.jpg    ',
 '  inflating: test/cats/cat.48.jpg    ',
 '  inflating: test/cats/cat.49.jpg    ',
 '  inflating: test/cats/cat.50.jpg    ',
 '  inflating: test/cats/cat.58.jpg    ',
 '  inflating: test/cats/cat.60.jpg    ',
 '  inflating: test/cats/cat.61.jpg    ',
 '  inflating: test/cats/cat.71.jpg    ',
 '  inflating: test/cats/cat.9.jpg     ',
 '   creating: test/dogs/',
 '  inflating: test/dogs/dog.0.jpg     ',
 '  inflating: test/

In [7]:
DATA_PATH = "/content/test"

dataset_path = DATA_PATH
folder_path, class_label, class_dictionary = read_dataset(dataset_path)
Layers = [
    ConvolutionLayer(3,10,2,0,2),
    Pooling(2,2,"max"),
    FlattenLayer(),
    DenseLayer(2,"relu"),
    DenseLayer(4,"relu"),
    DenseLayer(1,"sigmoid")
]

matrix_images = list_img_to_matrix(folder_path)


X_train, X_test, y_train, y_test = train_test_split(matrix_images, class_label, test_size=0.1)

cnn = Model(Layers)
cnn.fit(features=X_train, target=y_train, batch_size=5, epoch=3, learn_rate=0.1)

filename = "model1"
cnn.save_model(filename)

cnn2 = Model()
cnn2.load_model(filename)

output = np.array([])
for data in X_test:
    forward_cnn = cnn2.forward(data)
    output = np.append(output, np.rint(forward_cnn))

print("\nPredicted:", output)
print("\nAccuracy:", metrics.accuracy_score(y_test, output))
print("\nConfusion matrix:\n", metrics.confusion_matrix(y_test, output))

kf = KFold(n_splits=10,shuffle=True)
best_accuracy = 0
best_model = None
for train_index, test_index in kf.split(matrix_images):
    X_train, X_test = matrix_images[train_index], matrix_images[test_index]
    y_train, y_test = class_label[train_index], class_label[test_index]

    cnnKfold = Model(layers=Layers)
    cnnKfold.fit(features=X_train, target=y_train, batch_size=5, epoch=3, learn_rate=0.1)
    output = np.array([])
    for data in X_test:
        forward_cnn = cnnKfold.forward(data)
        output = np.append(output, np.rint(forward_cnn))
    
    accuracy = metrics.accuracy_score(y_test, output)
    print("\nAccuracy:", accuracy)
    print("Confusion matrix:\n", metrics.confusion_matrix(y_test, output))
    if accuracy > best_accuracy:
        best_accuracy = accuracy
    
print("\nBest Accuracy:", best_accuracy)


Epoch:  1


  """


Loss:  0.25
Accuracy:  0.2
Epoch:  2
Loss:  0.1306787176146603
Accuracy:  0.2
Epoch:  3
Loss:  0.1259741615752928
Accuracy:  0.26666666666666666

Predicted: [1. 1. 1. 1.]

Accuracy: 0.0

Confusion matrix:
 [[0 4]
 [0 0]]
Epoch:  1
Loss:  0.12661428005104558
Accuracy:  0.0
Epoch:  2
Loss:  0.11772587648905235
Accuracy:  0.5
Epoch:  3
Loss:  0.11795177231696716
Accuracy:  0.6666666666666666

Accuracy: 0.25
Confusion matrix:
 [[1 0]
 [3 0]]
Epoch:  1
Loss:  0.11794588491276932
Accuracy:  1.0
Epoch:  2
Loss:  0.11794603825290602
Accuracy:  1.0
Epoch:  3
Loss:  0.11794603425902409
Accuracy:  1.0

Accuracy: 0.25
Confusion matrix:
 [[1 0]
 [3 0]]
Epoch:  1
Loss:  0.11794603436304825
Accuracy:  1.0
Epoch:  2
Loss:  0.1179460343603389
Accuracy:  1.0
Epoch:  3
Loss:  0.1179460343604094
Accuracy:  1.0

Accuracy: 0.75
Confusion matrix:
 [[3 0]
 [1 0]]
Epoch:  1
Loss:  0.11794603436040756
Accuracy:  1.0
Epoch:  2
Loss:  0.11794603436040763
Accuracy:  1.0
Epoch:  3
Loss:  0.11794603436040763
Accurac

## TUBES 2

In [8]:
!gdown 1b_C6n6XcDwtQ8nmC-dUGmyAw2B2DrShV
!gdown 1uqVXJcLlCssqIqtR85unU2Z6uiBxqZNT

Downloading...
From: https://drive.google.com/uc?id=1b_C6n6XcDwtQ8nmC-dUGmyAw2B2DrShV
To: /content/ETH-USD-Train.csv
100% 191k/191k [00:00<00:00, 114MB/s]
Downloading...
From: https://drive.google.com/uc?id=1uqVXJcLlCssqIqtR85unU2Z6uiBxqZNT
To: /content/ETH-USD-Test.csv
100% 5.60k/5.60k [00:00<00:00, 10.2MB/s]


In [9]:
from sklearn.model_selection import train_test_split, KFold
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

train_path = '/content/ETH-USD-Train.csv'
test_path = '/content/ETH-USD-Test.csv'

df_train = pd.read_csv(train_path, infer_datetime_format=True)
df_test = pd.read_csv(test_path, infer_datetime_format=True)
df_train['Date'] = pd.to_datetime(df_train.Date)
dataset = df_train.loc[:, ['Close']].values
dataset = dataset.reshape(-1, 1)
scaler = MinMaxScaler(feature_range = (0, 1))
data_scaled = scaler.fit_transform(dataset)

X_train = []
y_train = []
time_step = 32
for i in range(len(data_scaled) - time_step - 1):
    a = data_scaled[i:(i + time_step), 0]
    X_train.append(a)
    y_train.append(data_scaled[i + time_step, 0])
X_train = np.array(X_train)
y_train = np.array(y_train)
print("trainX shape: {}\ntrainY shape: {}". format(X_train.shape, y_train.shape))
print(X_train[0])

trainX shape: (1717, 32)
trainY shape: (1717,)
[0.0500395  0.0454642  0.04872747 0.04729487 0.0491579  0.05358176
 0.05267774 0.05216312 0.05247405 0.05569289 0.0571257  0.05973666
 0.05839797 0.06268138 0.06892405 0.08261865 0.08079221 0.0818612
 0.08377014 0.08219371 0.07259534 0.07673914 0.08084805 0.08019425
 0.08070273 0.08162304 0.08015872 0.0728206  0.07405162 0.07862523
 0.08232062 0.07559844]


In [10]:
layer = [
    LSTMLayer(32, 256),
    DenseLayer(1,"linear")
]
model = ModelLSTM(layer)
model.fit(X_train)

array([860.70039794])