In [1]:
# Mount Google Drive at /content/drive (Colab-only helper module).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Switch the working directory to the project folder on the mounted Drive and list its contents.
%cd /content/drive/MyDrive/ri-projekat
%ls

/content/drive/MyDrive/ri-projekat
bible.txt  main.ipynb         model-1661265670/  model-1661294627/
input.txt  model-1660904954/  model-1661265700/  test.pickle


In [3]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.utils.extmath import cartesian
import pickle
import time
import os

import warnings
# Escalate every RuntimeWarning to an exception, so that a warning of this
# category (NumPy reports floating-point overflow / invalid-value problems
# this way) aborts the run instead of being printed and ignored.
warnings.simplefilter('error', RuntimeWarning)

In [4]:
def saveModel(model,filename):
  """Serialize `model` with pickle into the file `filename`.

  The file is opened in binary write mode, so an existing file of the same
  name is overwritten. Returns None.
  """
  with open(filename, "wb") as sink:
    pickle.dump(model, sink)

def loadModel(filename):
  """Return the object stored in the pickle file `filename`.

  NOTE: unpickling can execute arbitrary code, so only load files that come
  from a trusted source (e.g. checkpoints written by saveModel).
  """
  with open(filename, "rb") as source:
    restored = pickle.load(source)
  return restored

In [5]:
class ActivationLinear:
  """Identity activation: values and gradients pass through unchanged."""

  def forward(self,X):
    """Return the pre-activation `X` itself (no copy is made)."""
    return X

  def backward(self,dh,output):
    """Return the upstream gradient `dh` itself; `output` is not used."""
    return dh

class ActivationReLU:
  """Rectified linear unit, max(0, x)."""

  def forward(self,X):
    """Return `X` with every negative entry replaced by 0."""
    return np.maximum(0,X)

  def backward(self,dh,output):
    """Gradient of the loss with respect to the pre-activation.

    dh:     upstream gradient (with respect to the activation output).
    output: the value forward() produced for the same input.

    Entries where the unit was inactive (output <= 0) get gradient 0. A new
    array is returned; the caller's `dh` is no longer modified in place.
    """
    return np.where(output <= 0, 0, dh)

class ActivationTanh:
  """Hyperbolic tangent activation."""

  def forward(self,X):
    """Apply tanh element-wise."""
    return np.tanh(X)

  def backward(self,dh,output):
    """Scale the upstream gradient by tanh'(x) = 1 - tanh(x)^2.

    `output` must be the value forward() returned, i.e. tanh(x) itself.
    """
    return dh * (1 - np.square(output))

class ActivationSigmoid:
  """Logistic sigmoid activation, 1 / (1 + exp(-x))."""

  def forward(self,X):
    """Apply the sigmoid element-wise without overflowing.

    The naive 1/(1+exp(-X)) overflows in exp() for large negative X, which
    raises a RuntimeWarning (an exception in this notebook). Using
    sigmoid(x) = exp(-log(1 + exp(-x))) with np.logaddexp(0, -x) for the
    logarithm keeps every intermediate finite.
    """
    return np.exp(-np.logaddexp(0, -X))

  def backward(self,dh,output):
    """Scale the upstream gradient by sigmoid'(x) = s * (1 - s).

    `output` must be the value forward() returned, i.e. s = sigmoid(x).
    """
    dh = output*(1-output) * dh
    return dh

class ActivationSoftmax:
  """Row-wise softmax over a 2-D array (one sample per row)."""

  def forward(self,X):
    """Return exp(X) normalised so that every row sums to 1.

    The row maximum is subtracted before exponentiating. This leaves the
    result mathematically unchanged but prevents exp() from overflowing
    (which would give inf/inf = NaN) for large logits.
    """
    shifted = X - np.max(X, axis=1, keepdims=True)
    eX = np.exp(shifted)
    return eX / np.sum(eX, axis=1, keepdims=True)

  def backward(self,dh,output):
    """Pass the upstream gradient through unchanged.

    No softmax Jacobian is applied here: `dh` is treated as already being
    the gradient with respect to the logits (LossCrossEntropy.derivative in
    this file returns probabilities minus the one-hot target, which is that
    combined gradient).
    """
    return dh

In [6]:
class Dense:
  """Fully connected layer computing activation(X @ W + b)."""

  def __init__(self,input_size,layer_size,activation=None):
    """Create a layer mapping `input_size` features to `layer_size` units.

    Weights and biases are drawn from N(0, 0.1^2). `activation` is any
    object with forward(X) and backward(dh, output); when omitted a fresh
    ActivationLinear is created for this layer (the default is no longer a
    single instance built at class-definition time and shared by all layers).
    """
    self.input_size = input_size
    self.layer_size = layer_size
    self.W = 0.1*np.random.randn(input_size,layer_size)
    self.b = 0.1*np.random.randn(1,layer_size)
    self.activation = ActivationLinear() if activation is None else activation

  def reset(self):
    """No-op: a dense layer carries no state between sequences."""
    pass

  def forward(self,X):
    """Compute the layer output for `X` (one sample per row).

    The input and the output are cached on the instance for backward().
    """
    self.input = X
    val = np.dot(X,self.W) + self.b
    self.output = self.activation.forward(val)
    return self.output

  def backward(self,d):
    """Back-propagate `d`, the gradient with respect to this layer's output.

    Must be called after forward(). Returns (dh, dW, db, dW):
      dh - gradient with respect to the layer input,
      dW - gradient with respect to W, clipped to [-1, 1],
      db - gradient with respect to b, clipped to [-1, 1],
    and dW once more as a placeholder in the slot where recurrent layers
    return their hidden-weight gradient.
    """
    d = self.activation.backward(d,self.output)
    dW = np.dot(self.input.T,d)
    db = np.sum(d,axis=0,keepdims=True)
    dh = np.dot(d,self.W.T)

    dW = np.clip(dW,-1,1)
    db = np.clip(db,-1,1)

    return dh, dW, db, dW

class Recurrent:
  """Simple (Elman-style) recurrent layer.

  Rows of the input are treated as consecutive time steps:
      output[i] = activation(X[i] @ W + h[i] @ Wh + b),   h[i+1] = output[i]
  """

  def __init__(self,input_size,layer_size,activation=None):
    """Create the layer with N(0, 0.1^2) weights and a zero hidden state.

    `activation` is any object with forward(X) and backward(dh, output);
    when omitted a fresh ActivationLinear is created for this layer.
    """
    self.input_size = input_size
    self.layer_size = layer_size
    self.W = 0.1*np.random.randn(input_size,layer_size)
    self.Wh = 0.1*np.random.randn(layer_size,layer_size)
    self.b = 0.1*np.random.randn(1,layer_size)
    self.hprev = np.zeros((1,self.layer_size))
    self.activation = ActivationLinear() if activation is None else activation

  def reset(self):
    """Forget the hidden state carried over from the previous forward() call."""
    self.hprev = np.zeros((1,self.layer_size))

  def forward(self,X):
    """Run the recurrence over the rows of `X` and return all outputs.

    h[0] is the hidden state left by the previous call (zeros after
    reset()); the last output is stored in self.hprev for the next call.
    Inputs, hidden states and outputs are cached for backward().
    """
    steps = X.shape[0]
    self.input = X
    self.output = np.zeros((steps,self.layer_size))
    self.h = np.zeros((steps,self.layer_size))
    if steps == 0:
      # Nothing to process; leave the carried hidden state untouched.
      return self.output
    self.h[0] = self.hprev
    for i in range(steps):
      val = np.dot(X[i],self.W) + np.dot(self.h[i], self.Wh) + self.b
      self.output[i] = self.activation.forward(val)
      if(i+1 < steps):
        self.h[i+1] = self.output[i]
      else:
        self.hprev = self.output[i]
    return self.output

  def backward(self,d):
    """Back-propagation through time over the sequence seen by forward().

    d: gradient with respect to self.output, one row per time step. It is
       copied, so the caller's array is not modified.

    Returns (dh, dW, db, dWh): the gradient with respect to the input and
    the gradients of W, b and Wh, the latter three clipped to [-1, 1].
    """
    dW = np.zeros_like(self.W)
    db = np.zeros_like(self.b)
    dWh = np.zeros_like(self.Wh)
    d = np.array(d, dtype=float)  # private copy: rows are rewritten below
    for i in reversed(range(d.shape[0])):
      # d[i] now also contains the gradient that arrived from step i+1;
      # turn it into the gradient with respect to the pre-activation.
      d[i] = self.activation.backward(d[i],self.output[i])
      dW += np.outer(self.input[i],d[i])
      # One bias gradient per unit (previously the sum over all units was
      # broadcast into every bias).
      db += d[i]
      dWh += np.outer(self.h[i],d[i])
      if i > 0:
        # h[i] is output[i-1], so step i-1 also receives gradient through Wh.
        d[i-1] += np.dot(d[i],self.Wh.T)
    dh = np.dot(d,self.W.T)

    dW = np.clip(dW,-1,1)
    db = np.clip(db,-1,1)
    dWh = np.clip(dWh,-1,1)

    return dh, dW, db, dWh

class LSTM:
  """Long short-term memory layer; rows of the input are consecutive time steps.

  The four gate pre-activations are computed at once from [x_i, h_prev] @ W + b
  and stored in four consecutive column blocks of width layer_size:
    block 0 - candidate values (tanh),
    block 1 - input gate (sigmoid),
    block 2 - forget gate (sigmoid),
    block 3 - output gate (sigmoid).
  """

  def __init__(self,input_size,layer_size,activation=None):
    """Create the layer with N(0, 0.1^2) weights and zero hidden/cell state.

    `activation` is applied to the hidden state to form the layer output;
    when omitted a fresh ActivationLinear is created for this layer.
    """
    self.input_size = input_size
    self.layer_size = layer_size
    self.W = 0.1*np.random.randn(input_size + layer_size,4*layer_size)
    self.b = 0.1*np.random.randn(1,4*layer_size)
    self.hprev = np.zeros((1,self.layer_size))
    self.cprev = np.zeros((1,self.layer_size))
    self.activation = ActivationLinear() if activation is None else activation
    self.tanh = ActivationTanh()
    self.sigmoid = ActivationSigmoid()

  def reset(self):
    """Forget the hidden and cell state carried over from the previous call."""
    self.hprev = np.zeros((1,self.layer_size))
    self.cprev = np.zeros((1,self.layer_size))

  def forward(self,X):
    """Run the LSTM over the rows of `X` and return one output row per step.

    Cached for backward():
      self.h[i]     - concatenated [x_i, previous hidden state],
      self.c[i]     - cell state entering step i,
      self.ct[i]    - tanh of the cell state leaving step i,
      self.IFOG[i]  - gate pre-activations, self.IFOGf[i] - gate activations.
    The final hidden/cell state is kept in self.hprev / self.cprev.
    """
    n_in, L = self.input_size, self.layer_size
    steps = X.shape[0]
    self.input = X
    self.output = np.zeros((steps,L))
    self.h = np.zeros((steps,n_in + L))
    self.c = np.zeros((steps,L))
    self.ct = np.zeros((steps,L))
    self.IFOG = np.zeros((steps,self.W.shape[1]))
    self.IFOGf = np.zeros((steps,self.W.shape[1]))
    if steps == 0:
      # Nothing to process; leave the carried state untouched.
      return self.output
    self.h[0,n_in:] = self.hprev
    self.c[0] = self.cprev
    for i in range(steps):
      self.h[i,:n_in] = X[i]
      self.IFOG[i] = np.dot(self.h[i],self.W) + self.b
      self.IFOGf[i,:L] = self.tanh.forward(self.IFOG[i,:L])
      self.IFOGf[i,L:] = self.sigmoid.forward(self.IFOG[i,L:])
      # new cell state = candidate * input gate + forget gate * old cell state
      ctmp = self.IFOGf[i,:L] * self.IFOGf[i,L:2*L]
      ctmp += self.IFOGf[i,2*L:3*L] * self.c[i]
      self.ct[i] = self.tanh.forward(ctmp)
      htmp = self.IFOGf[i,3*L:] * self.ct[i]
      if(i+1 < steps):
        self.c[i+1] = ctmp
        self.h[i+1,n_in:] = htmp
      else:
        self.cprev = ctmp
        self.hprev = htmp

      self.output[i] = self.activation.forward(htmp)
    return self.output

  def backward(self,d):
    """Back-propagation through time over the sequence seen by forward().

    d: gradient with respect to self.output, one row per time step. It is
       copied, so the caller's array is not modified.

    Returns (dh, dW, db, dW):
      dh - gradient with respect to the input X (same shape as X),
      dW - gradient with respect to W, clipped to [-1, 1],
      db - gradient with respect to b, clipped to [-1, 1],
    and dW once more as a placeholder for the hidden-weight slot that the
    optimizers unpack.
    """
    n_in, L = self.input_size, self.layer_size
    dW = np.zeros_like(self.W)
    db = np.zeros_like(self.b)

    dIFOG = np.zeros(self.IFOG.shape)
    dIFOGf = np.zeros(self.IFOGf.shape)
    dc = np.zeros(self.c.shape)  # dc[i]: gradient w.r.t. the cell state leaving step i

    # Private copy: rows receive the recurrent gradient below.
    d = np.array(self.activation.backward(d,self.output), dtype=float)

    for i in reversed(range(d.shape[0])):
      cand = self.IFOGf[i,:L]
      gate_in = self.IFOGf[i,L:2*L]
      gate_forget = self.IFOGf[i,2*L:3*L]
      gate_out = self.IFOGf[i,3*L:]

      dIFOGf[i,3*L:] = self.ct[i] * d[i]
      dc[i] += self.tanh.backward(gate_out*d[i],self.ct[i])

      dIFOGf[i,:L] = gate_in * dc[i]
      dIFOGf[i,L:2*L] = cand * dc[i]
      dIFOGf[i,2*L:3*L] = self.c[i] * dc[i]
      if i > 0:
        dc[i-1] += gate_forget * dc[i]

      dIFOG[i,:L] = self.tanh.backward(dIFOGf[i,:L],cand)
      dIFOG[i,L:] = self.sigmoid.backward(dIFOGf[i,L:],self.IFOGf[i,L:])

      dW += np.outer(self.h[i],dIFOG[i])
      # One bias gradient per gate unit (previously the sum over all units
      # was broadcast into every bias).
      db += dIFOG[i]

      if i > 0:
        # The hidden-state columns of [x_i, h_prev] feed step i-1's output.
        d[i-1] += np.dot(dIFOG[i],self.W.T)[n_in:]

    # Gradient w.r.t. the inputs of every time step: keep only the x columns.
    dh = np.dot(dIFOG,self.W.T)[:,:n_in]

    dW = np.clip(dW,-1,1)
    db = np.clip(db,-1,1)

    return dh, dW, db, dW


In [7]:
class LossMSE:
  """Mean squared error, averaged over every element of the prediction."""

  def calculate(self,y_pred,y_true):
    """Return mean((y_pred - y_true)^2) over all elements."""
    return np.mean((y_pred - y_true)**2)

  def derivative(self,y_pred,y_true):
    """Return the gradient of calculate() with respect to y_pred.

    The divisor is the total number of elements, matching the mean taken in
    calculate(). For single-column targets this equals the number of rows,
    i.e. the previous behaviour; for multi-column targets the gradient was
    previously too large by a factor of the column count.
    """
    diff = y_pred - y_true
    return 2*diff/np.size(diff)

class LossCrossEntropy:
  """Cross-entropy between predicted class probabilities and integer labels.

  y_pred holds one probability row per sample; y_true holds the index of
  the correct class for each sample.
  """

  def calculate(self,y_pred,y_true):
    """Return the mean negative log-probability assigned to the true classes.

    Probabilities are floored at the smallest positive normal float so that
    an exact 0 yields a large finite loss instead of log(0), which would be
    a divide-by-zero RuntimeWarning (an error in this notebook) and an
    infinite loss.
    """
    rows = np.arange(y_true.shape[0])
    true_prob = np.maximum(y_pred[rows,y_true], np.finfo(float).tiny)
    return np.mean(-np.log(true_prob))

  def derivative(self,y_pred,y_true):
    """Return (y_pred - one_hot(y_true)) / number_of_samples.

    This is the gradient with respect to the logits of a softmax output
    layer, so it is meant to be used with an output activation whose
    backward() passes the gradient through unchanged. y_pred is not modified.
    """
    d = np.copy(y_pred)
    d[np.arange(y_true.shape[0]),y_true] -= 1
    return d/y_true.shape[0]

In [8]:
class OptimizerGD:
  """Plain (mini-batch) gradient descent."""

  def __init__(self,n_epochs=10000,learning_rate=0.05,batch_size=0,loss=None,displayProgress=None, printEvery = 100):
    """Store the training hyper-parameters.

    n_epochs:        optimize() runs n_epochs + 1 passes over the data.
    learning_rate:   step size applied to every gradient.
    batch_size:      rows per update; 0 means the whole data set.
    loss:            object with calculate(y_pred, y_true) and
                     derivative(y_pred, y_true); a fresh LossMSE when omitted.
    displayProgress: stored but not used by this optimizer.
    printEvery:      the loss is printed every `printEvery` epochs.
    """
    self.n_epochs = n_epochs
    self.learning_rate = learning_rate
    self.batch_size = batch_size
    self.loss_function = LossMSE() if loss is None else loss
    self.displayProgress=displayProgress
    self.printEvery = printEvery

  def optimize(self,X,y,layers):
    """Train `layers` in place on (X, y).

    Each epoch walks over X in consecutive, non-overlapping batches, runs
    the forward pass through all layers, back-propagates the loss gradient
    and updates W, b (and Wh where a layer has it) after every batch.
    """
    n_samples = X.shape[0]
    if n_samples == 0:
      # No data: nothing to train on (also avoids a zero batch size below).
      return
    batch_size = self.batch_size
    if batch_size == 0:
      batch_size = n_samples
    for i in range(self.n_epochs+1):
      for j, batch_start in enumerate(range(0, n_samples, batch_size)):
        # batch_end is exclusive; the previous "+1" made batches overlap.
        batch_end = min(batch_start + batch_size, n_samples)
        output = X[batch_start:batch_end]
        for layer in layers:
          output = layer.forward(output)

        yt = y[batch_start:batch_end]

        loss = self.loss_function.calculate(output,yt)
        if(i % self.printEvery == 0 and j==0):
          print('epoch %d/%d, loss: %f' % (i, self.n_epochs, loss))

        dh = self.loss_function.derivative(output,yt)
        for layer in reversed(layers):
          dh, dW, db, dWh = layer.backward(dh)
          layer.W -= self.learning_rate*dW
          layer.b -= self.learning_rate*db

          if hasattr(layer, 'Wh'):
            layer.Wh -= self.learning_rate*dWh

class OptimizerAdam:
  """Adam optimizer trained on sliding windows of consecutive rows.

  Every update uses the rows X[j : j + batch_size]; j then advances by one
  row, so successive windows overlap. Progress (self.i epoch, self.j window
  start, self.t update count) and the per-layer moment estimates live on the
  optimizer / the layers, so an optimizer restored from a checkpoint
  continues where it stopped.
  """

  def __init__(self,n_epochs=10000,learning_rate=0.05,batch_size=0,beta1=0.9,beta2=0.999,delta=1e-7,loss=None,displayProgress=None, printEvery = 100):
    """Store the training hyper-parameters.

    n_epochs:        optimize() runs n_epochs + 1 passes over the data.
    learning_rate:   Adam step size (also stored as self.alpha).
    batch_size:      rows per window; 0 means the whole data set.
    beta1, beta2:    decay rates of the first / second moment estimates.
    delta:           constant added to the denominator for stability.
    loss:            object with calculate() and derivative(); a fresh
                     LossMSE when omitted.
    displayProgress: optional callable invoked with the attached model every
                     `printEvery` epochs.
    printEvery:      period (in epochs) of loss printing, progress display
                     and checkpointing.
    """
    self.n_epochs = n_epochs
    self.learning_rate = learning_rate
    self.batch_size = batch_size
    self.alpha = learning_rate
    self.beta1 = beta1
    self.beta2 = beta2
    self.delta = delta
    self.loss_function = LossMSE() if loss is None else loss
    self.displayProgress=displayProgress
    self.printEvery = printEvery
    self.layersInitialized = False

  def _adamUpdate(self,layer,name,grad):
    """Apply one Adam step to the parameter `layer.<name>` given its gradient.

    The moment estimates are stored on the layer as `m<name>` / `v<name>`;
    bias correction uses the global update counter self.t. The parameter
    array is modified in place.
    """
    m = self.beta1 * getattr(layer,'m' + name) + (1-self.beta1) * grad
    v = self.beta2 * getattr(layer,'v' + name) + (1-self.beta2) * np.square(grad)
    setattr(layer,'m' + name,m)
    setattr(layer,'v' + name,v)
    m_hat = m / (1 - self.beta1**self.t)
    v_hat = v / (1 - self.beta2**self.t)
    param = getattr(layer,name)
    param -= self.learning_rate*m_hat/(np.sqrt(v_hat) + self.delta)

  def optimize(self,X,y,layers):
    """Train `layers` in place on (X, y).

    On the first call the moment estimates are created on every layer and
    the progress counters are zeroed. After each epoch the layers are reset;
    every `printEvery` epochs the optional displayProgress callback runs and,
    if a model is attached as self.model, a checkpoint is written with
    saveModel into the model's directory.
    """
    if not self.layersInitialized:
      for layer in layers:
        layer.mW = np.zeros_like(layer.W)
        layer.vW = np.zeros_like(layer.W)
        layer.mb = np.zeros_like(layer.b)
        layer.vb = np.zeros_like(layer.b)
        if hasattr(layer, 'Wh'):
          layer.mWh = np.zeros_like(layer.Wh)
          layer.vWh = np.zeros_like(layer.Wh)
      if self.batch_size == 0:
        self.batch_size = X.shape[0]

      if isinstance(self.loss_function, LossCrossEntropy):
        self.smooth_loss = -np.log(1.0/X.shape[1])*self.batch_size
      else:
        self.smooth_loss = 0
      self.i = 0
      self.j = 0
      self.t = 0
      self.layersInitialized = True

    batch_size = self.batch_size
    n_windows = X.shape[0] - batch_size + 1
    if not hasattr(self, 't'):
      # Optimizer restored from a checkpoint written before the update
      # counter existed: estimate it from the stored epoch / window position.
      self.t = self.i * max(n_windows, 0) + self.j
    # Set by Sequential; may be absent when the optimizer is used on its own.
    model = getattr(self, 'model', None)

    while self.i < self.n_epochs+1:
      while self.j < n_windows:
        batch_start = self.j
        batch_end = self.j + batch_size
        output = np.copy(X[batch_start:batch_end])

        for layer in layers:
          layer.reset()
          output = layer.forward(output)

        yt = np.copy(y[batch_start:batch_end])

        loss = self.loss_function.calculate(output,yt)
        if isinstance(self.loss_function, LossCrossEntropy):
          self.smooth_loss = self.smooth_loss * 0.999 + loss * 0.001
        else:
          self.smooth_loss = loss
        if(self.i % self.printEvery == 0 and self.j==0):
          print('epoch %d/%d, loss: %f' % (self.i, self.n_epochs, self.smooth_loss))

        dh = self.loss_function.derivative(output,yt)
        # Adam's bias correction counts parameter updates, not epochs.
        self.t += 1
        for layer in reversed(layers):
          dh, dW, db, dWh = layer.backward(dh)
          self._adamUpdate(layer,'W',dW)
          # b and Wh previously received a plain SGD step although their
          # Adam estimates had been computed.
          self._adamUpdate(layer,'b',db)
          if hasattr(layer, 'Wh'):
            self._adamUpdate(layer,'Wh',dWh)
        self.j += 1
      self.j = 0
      if(self.i % self.printEvery == 0 and self.displayProgress):
         self.displayProgress(model)
      for layer in layers:
        layer.reset()
      self.i += 1
      if(self.i % self.printEvery == 0 and model is not None):
        saveModel(model, "model-" + str(model.timestamp) + "/" + "checkpoint-" + str(self.i) + "-" + str(round(self.smooth_loss,6)) + ".pickle")
    self.i = 0

In [9]:
class Sequential:
  """A stack of layers applied one after another, trained by an optimizer."""

  def __init__(self,optimizer=None):
    """Create an empty model and attach `optimizer` to it.

    A directory "model-<timestamp>" is created in the current working
    directory, and the optimizer receives a back-reference to this model as
    `optimizer.model`. When no optimizer is given, a new OptimizerGD is
    created for this model; the default is no longer one shared instance
    whose `model` attribute each new Sequential would overwrite.
    """
    if optimizer is None:
      optimizer = OptimizerGD()
    self.timestamp = round(time.time())
    os.makedirs("model-" + str(self.timestamp), exist_ok=True)
    self.layers = []
    self.optimizer = optimizer
    optimizer.model = self
    print("Model " + str(self.timestamp) + " initialized.")

  def add(self,layer):
    """Append `layer` to the end of the stack."""
    self.layers.append(layer)

  def train(self,X,y):
    """Fit the layers to (X, y) with the attached optimizer."""
    self.optimizer.optimize(X,y,self.layers)

  def predict(self,X):
    """Return the output of the stack for `X`.

    The input is copied and every layer is reset before its forward pass,
    so state carried over from earlier calls does not affect the result.
    """
    output = np.copy(X)
    for layer in self.layers:
      layer.reset()
      output = layer.forward(output)
    return output

In [10]:
def TestRegression():
  """Fit a 1-10-1 network (tanh hidden layer) to sin(3*pi*x) on [-1, 1).

  Trains with Adam on the full data set and plots the target curve together
  with the model's prediction.
  """
  inputs = np.arange(-1,1,0.01).reshape((-1,1))
  targets = np.sin(inputs*3*np.pi)

  adam = OptimizerAdam(learning_rate=0.05,n_epochs=100000,batch_size = 0,loss=LossMSE(),printEvery=10000)
  net = Sequential(adam)
  hidden = Dense(1,10,activation = ActivationTanh())
  net.add(hidden)
  readout = Dense(10,1,activation = ActivationLinear())
  net.add(readout)

  net.train(inputs,targets)

  # target first, prediction second
  plt.plot(inputs,targets)
  plt.plot(inputs,net.predict(inputs))
# TestRegression()

In [11]:
def TestClassification():
  """Train a 2-100-3 ReLU/softmax classifier on the three-arm spiral data set.

  Shows the raw data, trains with Adam and cross-entropy, then plots the
  predicted class over a grid covering [-1, 1) x [-1, 1) with the training
  points drawn on top.
  """
  # Dataset code from https://cs231n.github.io/neural-networks-case-study/#data
  points_per_class = 100
  n_dims = 2
  n_classes = 3
  X = np.zeros((points_per_class*n_classes,n_dims)) # one example per row
  y = np.zeros(points_per_class*n_classes, dtype='uint8') # class labels
  radius = np.linspace(0.0,1,points_per_class)
  for label in range(n_classes):
    rows = slice(points_per_class*label,points_per_class*(label+1))
    theta = np.linspace(label*4,(label+1)*4,points_per_class) + np.random.randn(points_per_class)*0.2
    X[rows] = np.c_[radius*np.sin(theta), radius*np.cos(theta)]
    y[rows] = label
  # raw data
  plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral)
  plt.show()

  adam = OptimizerAdam(learning_rate=0.01,n_epochs=10000,batch_size = 0,loss=LossCrossEntropy(),printEvery = 1000)

  net = Sequential(adam)
  net.add(Dense(2,100,activation = ActivationReLU()))
  net.add(Dense(100,3,activation = ActivationSoftmax()))

  net.train(X,y)

  # decision regions on a regular grid, training points on top
  axis = np.arange(-1,1,0.01)
  grid = cartesian([axis, axis])
  grid_labels = np.argmax(net.predict(grid), axis=1)
  plt.scatter(grid[:, 0], grid[:, 1], c=grid_labels, s=40, cmap=plt.cm.gray,marker='.')
  plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral, marker='o')
  plt.show()
# TestClassification()

In [12]:
# some code here is taken from https://gist.github.com/karpathy/d4dee566867f8291f086
# data = "The quick brown fox jumps over the lazy dog. "
# Read the training text; the context manager closes the file again.
with open('input.txt', 'r') as infile:
  data = infile.read()
chars = sorted(set(data))
data_size, vocab_size = len(data), len(chars)
print('data has %d characters, %d unique.' % (data_size, vocab_size))
# Character <-> integer index lookup tables.
char_to_ix = { ch:i for i,ch in enumerate(chars) }
ix_to_char = { i:ch for i,ch in enumerate(chars) }

# Next-character prediction: inputs are all characters but the last,
# targets are the same sequence shifted by one position.
Xt = np.array([char_to_ix[ch] for ch in data[:-1]]).reshape((-1,1))
yt = np.array([char_to_ix[ch] for ch in data[1:]]).reshape((-1,1))

# One-hot encode the inputs: row i has a single 1 in column Xt[i].
X = np.zeros((data_size-1,vocab_size))
X[np.arange(data_size-1),Xt[:,0]] = 1

# Targets stay integer class indices (1-D), as LossCrossEntropy expects.
y = yt.T[0]

data has 12430 characters, 54 unique.


In [13]:
def showSample(model):
  """Print a 101-character sample generated by `model`.

  Generation starts from the first training character (row 0 of the global
  one-hot matrix X). In each of 100 rounds the whole sequence so far is fed
  to the model and the last output row is appended as the next input row;
  at the end every row is decoded by taking its arg-max.
  """
  sequence = np.copy(X[0][np.newaxis])
  for _ in range(100):
    prediction = model.predict(sequence)
    sequence = np.vstack((sequence,prediction[-1,:]))

  indices = np.argmax(sequence, axis=1)
  print(''.join(ix_to_char[ix] for ix in indices))

In [14]:
def TestTextGeneration():
  """Train a character-level LSTM language model on the global (X, y) data.

  The network is LSTM(vocab_size -> 128) followed by a softmax Dense layer
  back to vocab_size, trained by Adam with cross-entropy on windows of 64
  characters; showSample is passed as the progress callback.
  """
  hidden_size = 128

  adam = OptimizerAdam(learning_rate=0.01,n_epochs=1000,batch_size = 64,loss=LossCrossEntropy(),displayProgress = showSample, printEvery=1)

  net = Sequential(adam)
  net.add(LSTM(vocab_size,hidden_size,activation = ActivationLinear()))
  # Alternative stacks that were tried:
  # net.add(Recurrent(vocab_size,hidden_size,activation = ActivationTanh()))
  # net.add(Recurrent(hidden_size,hidden_size,activation = ActivationTanh()))
  # net.add(Dense(hidden_size,hidden_size,activation = ActivationTanh()))
  net.add(Dense(hidden_size,vocab_size,activation = ActivationSoftmax()))

  net.train(X,y)
# TestTextGeneration()

In [21]:
def TestContinueTraining():
  """Load a saved checkpoint and resume training it on the global (X, y)."""
  checkpoint = "model-1661265700/checkpoint-57-0.15962206.pickle"
  restored = loadModel(checkpoint)
  restored.train(X,y)
# TestContinueTraining()

In [16]:
def samplePrompt(model,prompt,length):
  """Print `prompt` followed by `length` characters generated by `model`.

  The prompt is one-hot encoded with the global char_to_ix table (a
  character missing from that table raises KeyError). In each round the
  whole sequence so far is fed to the model and the last output row is
  appended as the next input row; finally every row is decoded by arg-max.
  """
  n_chars = len(prompt)
  char_ids = np.asarray([char_to_ix[ch] for ch in prompt], dtype=int)

  onehot = np.zeros((n_chars,vocab_size))
  onehot[np.arange(n_chars),char_ids] = 1

  sequence = np.copy(onehot)
  for _ in range(length):
    prediction = model.predict(sequence)
    sequence = np.vstack((sequence,prediction[-1,:]))

  decoded = np.argmax(sequence, axis=1)
  print(''.join(ix_to_char[ix] for ix in decoded))

In [22]:
def TestSampleModel():
  """Load a saved checkpoint and print 100 characters continuing a fixed prompt."""
  checkpoint = "model-1661265700/checkpoint-57-0.15962206.pickle"
  opening = "I am writing this under an appreciable mental strain, "
  samplePrompt(loadModel(checkpoint),opening,100)
TestSampleModel()

I am writing this under an appreciable mental strain, sharmly id meriniul.
The end is near. I hear a noise at the door, as of some immense slippery body l
