<a href="https://colab.research.google.com/github/arnisafazla/Neural_Networks_Implementations/blob/main/encoder_decoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import h5py
import os
import numpy as np
import matplotlib.pyplot as plt
import math
import random
from numpy.random import seed
from numpy.random import randn
from numpy import linspace
import cv2
from google.colab.patches import cv2_imshow

In [None]:
from google.colab import drive 
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
file = h5py.File("/content/drive/MyDrive/EEE443/hw3/assign3_data1.h5", "r")
data = file['data']
data = np.array([np.stack((sample[0], sample[1], sample[2]), axis=2) for sample in data])

In [None]:
class Dataset(object):
  def __init__(self, data, to_grayscale=True, normalize=True):
    self.raw_data = np.array(data)
    processed_data = data
    if to_grayscale:
      processed_data = self.to_grayscale(processed_data)
    if normalize:
      processed_data = self.normalize_data(processed_data)
    self.data = np.array(processed_data)
  def to_grayscale(self, data):
    R = data.T[0].T
    G = data.T[1].T
    B = data.T[2].T
    return R * 0.2126 + G * 0.7152 + B * 0.0722
  def normalize_data(self, data):
    data = data - np.mean(data)
    data = np.clip(data, data.mean() - 3 * data.std(), data.mean() + 3 * data.std())
    data = (data - data.min()) / (data.max() - data.min())
    return data * (0.9 - 0.1) + 0.1
  def visualize(self, indices):
    # no_of_images should be 10 * k
    img_ver_raw = []
    img_hor_raw = []
    img_ver = []
    img_hor = []
    for sample in range(indices.shape[0]):
      normed = cv2.normalize(self.raw_data[indices[sample]], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
      resized = cv2.resize(normed, (100, 100))
      img_hor_raw.append(resized)

      normed = cv2.normalize(self.data[indices[sample]], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
      resized = cv2.resize(normed, (100, 100))
      img_hor.append(resized)
      if (sample + 1) % 10 == 0:
        img_ver_raw.append(img_hor_raw)
        img_hor_raw = []
        img_ver.append(img_hor)
        img_hor = []

    cv2.imwrite('raw_images.png', cv2.vconcat([cv2.hconcat(row) for row in img_ver_raw]))
    cv2.imwrite('processed_images.png', cv2.vconcat([cv2.hconcat(row) for row in img_ver]))

In [None]:
dataset = Dataset(data)

In [None]:
indices = np.random.randint(0,data.shape[0],200)
dataset.visualize(indices)

In [None]:
class DenseLayer (object):
  def __init__(self, no_of_units, input_shape, init_type, init_std, init_range, learning_rate):
    self.no_of_units = no_of_units
    if init_type == 'Gaussian':
      self.W = np.random.normal(0, init_std, no_of_units * (input_shape + 1)).reshape((no_of_units, input_shape + 1))
    elif init_type == 'uniform':
      self.W = np.random.uniform(low=init_range[0], high=init_range[1], size=no_of_units * input_shape).reshape((no_of_units, input_shape))
      self.b = np.random.uniform(low=init_range[0], high=init_range[1], size=no_of_units)
    # W has b as well
    self.learning_rate = learning_rate
    self.alpha = alpha
    self.update = np.zeros(self.W.shape)
    self.r = np.zeros(self.W.shape)
    return
    # alpha and r for RMSProp optimization.

  def forward_pass(self, X):
    # print('in forward pass')
    # print("x: ", X.shape)
    X_e = np.array([np.append(row, -1) for row in X])
    # print("X_e, self.W.T: ")
    # print(X_e.shape, self.W.T.shape)
    v = np.matmul(X_e, self.W.T)
    # save for backpropagation
    self.X = X_e
    self.v = v
    # print("tanh(v): ")
    # print(tanh(v).shape)
    return tanh(v)
  def back_propogate(self, error, batch_size):
    # print("error: ", error)
    delta = np.array([error[i] * tanh_derivative(self.v[i]) for i in range(self.v.shape[0])])
    # print("delta: ", delta)
    # print("X: ", self.X)
    gradient = np.matmul(delta.T, self.X)
    self.r = self.alpha * self.r + (1 - self.alpha) * gradient ** 2
    gradient_sum = np.sum(np.abs(gradient))
    # print("     W: ", self.W.shape)
    # print("gradient: ", self.gradient)
    self.W_update = self.learning_rate * gradient / (0.0000000000000001 + np.sqrt(self.r))
    self.W_update = self.W_update / batch_size  # take average of the W updates
    # print("W_update: ", self.W_update)
    W_raw = self.W.T[0:self.W.shape[-1] - 1].T
    self.W = self.W + self.W_update 
    # print("     W_raw: ", W_raw.shape)
    # print("     delta: ", delta.shape)
    next_error = np.matmul(delta, W_raw)
    return next_error, gradient_sum

In [None]:
class AutoEncoderModel(object):
  def __init__(self, layers, units, input_shape, init_type='Gaussian', init_std=0.1, init_range=(-0.1,0.1), learning_rate=0.1):
    self.layers = []
    self.layers.append(DenseLayer(units[0], input_shape=input_shape, init_type=init_type, init_std=init_std, init_range=init_range, learning_rate=learning_rate))
    for i in range(layers-1):
      self.layers.append(DenseLayer(units[i+1], input_shape=units[i], init_type=init_type, init_std=init_std, init_range=init_range, learning_rate=learning_rate))
    self.no_of_layers = layers
    self.input_shape = input_shape

  def train(self, X_train, Y_train, X_test, Y_test, batch_size, max_epochs, verbose=0):
    MSE_train_hist = []
    MCE_train_hist = []
    MSE_test_hist = []
    MCE_test_hist = []

    n = X_train.shape[0]
    no_of_batches = n // batch_size
    extra = n % batch_size

    # shuffle X_test, Y_Test once
    index = [*range(Y_test.shape[0])]
    random.shuffle(index)
    X_test_shuffled = np.array([X_test[i] for i in index])
    Y_test_shuffled = np.array([Y_test[i] for i in index])

    for epoch in range(max_epochs):   # ??
      # for loop to see the smallest error
      gradient = 0
      MSE_train = 0
      MCE_train = 0

      # shuffle X_train, Y_train
      index = [*range(n)]
      random.shuffle(index)
      X_shuffled = np.array([X_train[i] for i in index])
      Y_shuffled = np.array([Y_train[i] for i in index])

      for i in range(no_of_batches):
        if i < no_of_batches - 1:
          net_batch_size = batch_size
          batch_X = X_shuffled[i * batch_size:(i+1) * batch_size]
          batch_Y = Y_shuffled[i * batch_size:(i+1) * batch_size]
          
        else:
          net_batch_size = batch_size + extra
          batch_X = X_shuffled[i * batch_size:(i+1) * batch_size + extra]
          batch_Y = Y_shuffled[i * batch_size:(i+1) * batch_size + extra]
          
        for layer in self.layers:
          batch_X = layer.forward_pass(batch_X)
        MSE_train = MSE_train + MSE(batch_X, batch_Y)
        MCE_train = MCE_train + MCE(batch_X, batch_Y)
        # return batch_X, batch_Y
        layer_error = error(batch_X, batch_Y)
        # print("output layer error: ", layer_error)
        for i in range(self.no_of_layers-1, -1, -1):
          # try:
          layer_error, grad = self.layers[i].back_propogate(layer_error, net_batch_size)
          gradient += grad
          #except:
          #  return 
          # print("hidden layer error: ", layer_error)
          # in backpropogate update the Ws and return W.T * delta for the next layer
      MSE_train_hist.append(MSE_train / no_of_batches)
      MCE_train_hist.append(MCE_train / no_of_batches)
      MSE_test, MCE_test = self.test(X_test_shuffled, Y_test_shuffled)
      MSE_test_hist.append(MSE_test)
      MCE_test_hist.append(MCE_test)
      if verbose:
        print("Epoch ", epoch, " => MSE_train: ", MSE_train, ", MCE_train: ", MCE_train, ", MSE_test: ", MSE_test, ", MCE_test: ", MCE_test, " gradient: ", gradient)
    return MSE_train_hist, MCE_train_hist, MSE_test_hist, MCE_test_hist

  def test(self, X, Y):
    X_tmp = X
    for layer in self.layers:
      X_tmp = layer.forward_pass(X_tmp)
      # X_tmp has the output now
    return MSE(X_tmp, Y), MCE(X_tmp, Y)

In [None]:
L_pre = L_hid = 64
L_post = 50
l = 5 * 10 ** (-4)
beta = 0.1
rho = 0.1
L_in = input_shape = data.shape[1] * data.shape[2]
w0 = np.sqrt(6 / (L_pre + L_post))
learning_rate = 0.1
autoencoder = Model(layers=2, units=[L_pre, L_post], input_shape=input_shape, init_type='uniform', init_range=(-w0,w0), learning_rate=learning_rate)

In [None]:
W1 = np.random.uniform(low=-w0, high=w0, size=L_pre * Lin).reshape((L_pre, Lin))
b1 = np.random.uniform(low=-w0, high=w0, size=L_pre)
W2 = np.random.uniform(low=-w0, high=w0, size=L_post * Lhid).reshape((L_pre, Lin))
b2 = np.random.uniform(low=-w0, high=w0, size=L_post)

In [None]:
def aeCost(We, data, params):
  # We = [W1 W2 b1 b2]
  # data: (input_shape, N) : literally data
  # params: Lin, Lhid, l (lambda), beta, rho
  # Lin : input_shape, Lhid: L_pre, Lout: L_post
  # assume linear activation function for now
  Lin, Lhid, l, beta, rho = params
  W1, W2, b1, b2 = We
  N = data.shape[0]
  X = data.reshape((N, data[1] * data[2]))
  hidden_signal = np.matmul(X, W1.T) - b1
  # hidden_signal[hidden_signal < 0] = 0        # relu activation
  output = np.matmul(hidden_signal, W2.T) - b2
  first_term = np.sum((output - X) ** 2) / (2 * N)
  second_term = (np.sum(W1 ** 2) + np.sum(np.concatenate(W2 ** 2))) * l / 2
  rho_b = np.array([np.average(row) for row in hidden_signal.T])
  # rho is already given
  third_term = kl_divergence(rho, rho_b) * beta
  J = first_term + second_term + third_term

  # calculate gradients for the layers separately
  first_gradient_common = -(1/N) * np.sum(X - output)
  first_gradient_2 = np.matmul(first_gradient_common, hidden_signal)
  first_gradient_1 = np.matmul(np.matmul(first_gradient_common, W2), X)
  second_gradient_2 = l * W2
  second_gradient_1 = l * W1
  rho_over_b = np.sum(rho / rho_b)
  third_gradient_1 = np.array([-beta * rho_over_b[i] * X[i] for i in range(Lhid)])
  J_grad_1 = first_gradient_1 + second_gradient_1
  J_grad_2 = first_gradient_1 + second_gradient_1


In [None]:
def rvs(p,size=1):
  uniform = np.random.uniform(low=0, high=1, size=size)
  print(uniform)
  return np.array([uniform < p], dtype=int)
def kl_divergence(p, q):
	return sum(p[i] * log2(p[i]/q[i]) for i in range(p.shape[0]))