In [2]:
"""
mounting the google drive to use text data and to clone GItHub repositories
"""

from google.colab import drive
drive.mount('/content/drive/')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive/


In [0]:
"""
important libraries imported
"""

from __future__ import print_function
import os, io, sys, random
import numpy as np
import tensorflow as tf
from tensorflow import one_hot
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.layers import Dense, LSTM, Activation, Dropout, Input, Lambda, Reshape, Masking
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.utils import to_categorical, plot_model

In [4]:
"""
making sure tensorflow's version2 is used in this notebook
"""

%tensorflow_version 2.x

import os

if int(tf.__version__[0]) < 2:
  !pip install tensorflow==2.1

print("Tensorflow version: " + tf.__version__)

Tensorflow version: 2.2.0-rc3


In [5]:
"""
testing if connected to TPU and/or GPU
"""

import pprint

if 'COLAB_TPU_ADDR' not in os.environ:
  print('Not connected to a TPU runtime.')
else:
  tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
  print ('Connected to TPU.\n\nTPU address is', tpu_address)

  with tf.compat.v1.Session((tpu_address)) as session:
    devices = session.list_devices()
    
  print('TPU devices:')
  pprint.pprint(devices)

if tf.test.gpu_device_name() == '':
  print('\n\nNot connected to a GPU runtime.')
else:
  print('\n\nConnected to GPU: ' + tf.test.gpu_device_name())

Not connected to a TPU runtime.


Connected to GPU: /device:GPU:0


In [6]:
"""
cleaning the data and forming the examples
"""

import numpy as np
import io


path = "/content/drive/My Drive/data_1.txt"

with io.open(path, encoding='utf-8') as corpus:
    text = corpus.read()
#print(text)

LENGTH = len(text)
Tx = 11 # length of each example (characters)
vocab = sorted(set([word for line in text for word in line.split()]))
#vocab = sorted(set(list(text))) # list (a set actually) of all the characters in the corpus
print(vocab)
char_to_indices = dict((ch, idx) for idx, ch in enumerate(vocab))
index_to_char = dict((idx, ch) for idx, ch in enumerate(vocab))

# pretty much temporary variables just for the sake of splitting the huge corpus
sentences = [] # X
mapped_chars = [] # Y

step = 3

for i in range(0, LENGTH - Tx, step):
    temp_text = text[i: i+Tx]
    sentences.append(temp_text[:-1])
    mapped_chars.append(temp_text[-1])

m = len(sentences)
print("Lenght:", len(vocab))
X = np.zeros((m, Tx - 1, len(vocab)))
Y = np.zeros((m, len(vocab)))

for i, example in enumerate(sentences):
    X[i, :, :] = one_hot([char_to_indices[ch] for ch in example], depth=len(vocab))
    Y[i, :] = one_hot(char_to_indices[mapped_chars[i]], depth=len(vocab))

# a nuisance is fixed by turning X and Y into numpy arrays
X = np.asarray(X)
Y = np.asarray(Y)

#==============printing data dimesions=========================================
print(f"Length of corpus: {LENGTH}")
print(f"X.shape = {X.shape}")
print(f"Y.shape = {Y.shape}")
print(f"Number of examples: {m}") 

['"', '%', '(', ')', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '=', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'Ü', 'í', 'ó', 'ú', 'ü', 'ş', '–', '’', '“', '”', '…', '三', '倍', '安', '晋']
Lenght: 88


KeyError: ignored

In [0]:
"""
read the function docstring below
"""

def get_example(index=None):
    """
    retrieves the example at index position in X is index is passed, otherwise random example is obtained
    :param index: index of example desired to be retrieved
    :return: string of text
    """

    if index is None:
        index = np.random.randint(low=0, high=m)

    curr_x = [index_to_char[idx] for idx in np.argmax(X[index, :, :], axis=1)]
    curr_y = index_to_char[np.argmax(Y[index, :])]

    x_y = (''.join(curr_x), curr_y)

    return x_y

In [0]:
"""
testing a single example and the time it took to retrieve it
"""

import time

start = time.process_time()
example = get_example()
end = time.process_time()

print(f"Sample X: {example[0]}\nCorresponding Y: {example[1]}")
print(f"\nTime taken for acquiring this example: {end - start} seconds")

In [0]:
"""
network architecture creation
model creation
plot_model allows me to see what my neural network looks like
"""

def Ram_Says(Tx, vocab, output_length):
  # network architecture LSTM -> Dropout -> Reshape -> LSTM -> Dropout -> Dense

  # define the initial hidden state a0 and initial cell state c0
  a0 = Input(shape=(output_length,), name='a0')
  c0 = Input(shape=(output_length,), name='c0')
  a = a0
  c = c0

  X = Input(shape=(Tx, len(vocab)), name='X')
  
  a, _, c = LSTM(units=output_length, activation='tanh', return_state=True, dtype='float32', name=f'lstm_1')(X, [a, c])
  a = Dropout(rate=0.2, name=f'dropout_1')(a)
  a = Reshape((1, output_length), name='reshape_1')(a) # needed after a dropout layer for another LSTM layer
  a = LSTM(units=output_length, activation='tanh', dtype='float32', name=f'lstm_2')(a)
  a = Dropout(rate=0.2, name=f'dropout_2')(a)
  out = Dense(units=len(vocab), activation='softmax', name=f'dense')(a)
    
  model = Model(inputs=[X, a0, c0], outputs=out, name='Ram')

  return model

In [0]:
"""
creating the model and the summary of it
"""
#====================Creating important variables===============================
n_a = 256 # number of hidden state dimensions for each LSTM cell

a0 = np.zeros((m, n_a))
c0 = np.zeros((m, n_a))
#===============================================================================

model = Ram_Says(Tx=Tx - 1, vocab=vocab, output_length=n_a)

plot_model(model, to_file='/content/drive/My Drive/nn_graph.png')

model.summary()

In [0]:
"""
configuring optimizations for the model
fitting the model
"""

learning_rate = 0.01
learning_rate_decay = 0.001

optimizer = Adam(learning_rate=learning_rate, beta_1=0.9, beta_2=0.999, decay=learning_rate_decay)

model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

batch_size = 16384

model.fit([X, a0, c0], Y, batch_size=batch_size, epochs=100)

In [0]:
"""
filepath
"""

filepath = '/content/drive/My Drive/dkp_model.h5'

In [0]:
"""
this code takes care of saving the new model only if its accuracy is better than
that of the last model
"""

# if os.path.exists(filepath):
#   prev_model = load_model(filepath)
#   prev_acc = prev_model.evaluate([X, a0, c0], Y, verbose=0)[1]
#   curr_acc = model.evaluate([X, a0, c0], Y, verbose=0)[1]
#   if curr_acc > prev_acc:
#     print("There was a previous model saved.")
#     print(f"Previous accuracy: {round(prev_acc*100, 2)}%")
#     print(f"Current accuracy: {round(curr_acc*100, 2)}%")
#     model.save(filepath)
#     print('New model is saved.')
#   else:
#     print(f"Previous accuracy: {round(prev_acc*100, 2)}%")
#     print(f"Current accuracy: {round(curr_acc*100, 2)}%")
#     print('Old model is kept.')
# else: # if this is the first time saving the model
model.save(filepath)
print('First time model is saved.')

In [0]:
"""
loading the model for sampling
"""

Ram_says = load_model(filepath)

In [0]:
"""
testing the accuracy of the model on X and Y
"""

accuracy = Ram_says.evaluate([X, a0, c0], Y, verbose=0)[1]
print(f"Accuracy on the training set: {round(accuracy*100, 2)}%")

In [0]:
"""
text sampling time
"""

def sample(preds, temperature=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds) 
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)


def generate_output():
      diversity = random.choice([0.2, 0.5, 0.7, 1.0, 1.2, 1.4])
      diversity = 0.2
      print(f'Diversity: {diversity}')
      generated = ''
      sentence = input('Your text: ')
      generated += sentence
      if len(sentence) > 10:
        sentence = sentence[:10]
      elif len(sentence) < 10:
        rem = 10 - len(sentence)
        sentence += ' ' * rem
      sys.stdout.write(generated + ' ')
      a0 = np.zeros((1, n_a))
      c0 = np.zeros((1, n_a))
      sys.stdout.write(sentence)
      for i in range(1000):
          x_pred = np.zeros((1, Tx-1, len(vocab)))
          for t, char in enumerate(sentence):
              if char != '0':
                  x_pred[0, t, char_to_indices[char]] = 1.
          preds = Ram_says.predict([x_pred, a0, c0], verbose=0)[0]
          next_index = sample(preds, temperature = 1.0)
          next_char = index_to_char[next_index]

          generated += next_char
          sentence = sentence[1:] + next_char

          sys.stdout.write(next_char)
          sys.stdout.flush()

          if next_char == '\n':
              print('\n')
          elif next_char == '\t':
              print('\t')

generate_output()