In [1]:
import random
random.seed(10)

In [2]:
import re
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from random import *

# **Loading Text**

In [3]:
# Mount the user's Google Drive inside the Colab VM so the corpus file can be read from it.
from google.colab import drive

drive.mount('/content/drive')

import os

# Drive folder that holds the corpus file read in the next cell.
path = '/content/drive/MyDrive/Colab Notebooks/Cap11'

Mounted at /content/drive


In [4]:
# Read the whole corpus into memory. The context manager closes the file handle as soon
# as the read finishes instead of leaving it open until garbage collection.
with open(os.path.join(path, 'texto.txt'), 'r') as corpus_file:
    text = corpus_file.read()

# **Data PreProcessing**

In [5]:
# Lower-case the corpus, delete the characters , . ! ? - and split on newlines,
# giving one list entry per line of the file.
sentences = re.sub('[,.!?\\-]', '', text.lower()).split('\n')

In [None]:
print(sentences)

In [6]:
# Vocabulary: every distinct whitespace-separated word of the corpus.
# Sorted so that the word order (and therefore every id derived from it) is identical on
# each kernel restart; the iteration order of a set of strings is not stable across
# Python processes because string hashing is randomised.
word_list = sorted(set(' '.join(sentences).split()))

In [None]:
print(word_list)

In [7]:
# Token -> id vocabulary, seeded with the four special tokens. Id 0 ([PAD]) is the value
# used to pad sequences and the value the attention mask treats as padding; [CLS] and
# [SEP] frame the sentence pair; [MASK] replaces tokens chosen for masked-LM prediction.
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}

In [None]:
word_dict

In [8]:
# Give every corpus word the next free id after the four special tokens (hence the + 4).
for i, w in enumerate(word_list):
    word_dict[w] = i + 4

In [None]:
print(word_dict)

In [9]:
# Inverse vocabulary (id -> token), built from the ids actually stored in word_dict
# rather than from enumeration position, so the two mappings cannot drift apart.
number_dict = {token_id: token for token, token_id in word_dict.items()}

In [None]:
print(number_dict)

In [10]:
# Total vocabulary size (special tokens + corpus words); it sizes the token embedding table.
vocab_size = len(word_dict)
print(vocab_size)

70


In [11]:
token_list = list()

In [12]:
# Encode every sentence as the list of vocabulary ids of its words.
# Building the list in a single assignment keeps this cell idempotent: re-running it
# replaces token_list instead of appending a second copy of every sentence.
token_list = [[word_dict[word] for word in sentence.split()] for sentence in sentences]

In [None]:
text[0:29]

In [None]:
token_list[0]

# **Hyperparameters**

In [13]:
# Number of examples make_batch() produces (split between next-sentence and random pairs).
batch_size = 6
# Number of rows in the segment embedding table (sentence A = 0, sentence B = 1).
n_segments = 2
# Dropout probability; not referenced by any of the cells shown in this notebook section.
dropout = 0.2

# Length every input sequence is padded to; also the size of the position embedding table.
max_len = 100

# Upper bound on masked positions per example; masked_tokens/masked_pos are padded to it.
max_pred = 7

# Number of stacked EncoderLayer modules in BERT.
n_layers = 6

# Number of attention heads.
n_heads = 12

# Width of the embeddings and of every encoder layer's input/output.
d_model = 768

# Hidden width of the position-wise feed-forward network.
d_ff = d_model * 4

# Per-head query/key and value dimensions (n_heads * d_k == d_model with these values).
d_k = d_v = 64

# Number of training epochs; not referenced by any of the cells shown in this notebook section.
NUM_EPOCHS = 50

In [14]:
def make_batch():
  """Build one batch for masked-LM + next-sentence-prediction pre-training.

  Sentence pairs are drawn at random from the module-level ``token_list`` until the batch
  holds ``batch_size // 2`` consecutive pairs (label ``True``) and
  ``batch_size - batch_size // 2`` non-consecutive pairs (label ``False``).

  Returns:
    A list of ``[input_ids, segment_ids, masked_tokens, masked_pos, is_next]`` entries.
    ``input_ids`` and ``segment_ids`` are padded with 0 to ``max_len``;
    ``masked_tokens`` and ``masked_pos`` are padded with 0 to ``max_pred``.

  Raises:
    ValueError: if a consecutive pair is required but fewer than two sentences exist,
      or if an encoded pair is longer than ``max_len``.
  """
  # Integer quotas. Comparing the counters with the float batch_size / 2 can never
  # succeed for an odd batch_size, which made the sampling loop spin forever.
  n_positive = batch_size // 2
  n_negative = batch_size - n_positive

  if n_positive > 0 and len(sentences) < 2:
    raise ValueError('make_batch needs at least two sentences to build a next-sentence pair')

  # First id that belongs to a real word; everything below it is a special token.
  first_word_id = 1 + max(word_dict['[PAD]'], word_dict['[CLS]'], word_dict['[SEP]'], word_dict['[MASK]'])

  batch = []
  positive = negative = 0

  while positive < n_positive or negative < n_negative:
    tokens_a_index, tokens_b_index = randrange(len(sentences)), randrange(len(sentences))
    tokens_a, tokens_b = token_list[tokens_a_index], token_list[tokens_b_index]

    # [CLS] A [SEP] B [SEP]; concatenation creates a new list, so token_list is never mutated.
    input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']]
    if len(input_ids) > max_len:
      # Padding cannot shorten a sequence; fail here rather than emit a ragged batch.
      raise ValueError('sentence pair encodes to %d tokens, more than max_len=%d' % (len(input_ids), max_len))

    # Segment 0 covers [CLS] + A + [SEP], segment 1 covers B + [SEP].
    segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (1 + len(tokens_b))

    # Mask about 15% of the tokens: at least one, at most max_pred.
    n_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15))))

    # Only real words may be masked, never [CLS] or [SEP].
    cand_maked_pos = [i for i, token in enumerate(input_ids) if token != word_dict['[CLS]'] and token != word_dict['[SEP]']]
    shuffle(cand_maked_pos)

    masked_tokens, masked_pos = [], []
    for pos in cand_maked_pos[:n_pred]:
      masked_pos.append(pos)
      masked_tokens.append(input_ids[pos])

      if random() < 0.8:
        # 80%: replace with [MASK].
        input_ids[pos] = word_dict['[MASK]']
      elif random() < 0.5:
        # 10%: replace with a random *word*. Special tokens are excluded because a
        # random [PAD] (id 0) would be treated as padding by the attention mask.
        input_ids[pos] = randint(first_word_id, vocab_size - 1)
      # remaining 10%: keep the original token.

    n_pad = max_len - len(input_ids)
    input_ids.extend([0] * n_pad)
    segment_ids.extend([0] * n_pad)

    # Pad by what was actually collected: with fewer candidates than n_pred
    # (e.g. two empty sentences) padding by max_pred - n_pred left the lists short.
    n_pad = max_pred - len(masked_tokens)
    masked_tokens.extend([0] * n_pad)
    masked_pos.extend([0] * n_pad)

    if tokens_a_index + 1 == tokens_b_index and positive < n_positive:
      batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True])
      positive += 1
    elif tokens_a_index + 1 != tokens_b_index and negative < n_negative:
      batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False])
      negative += 1

  return batch

In [15]:
def get_attn_pad_masked(seq_q, seq_k):
  """Return a boolean mask that flags the padding positions (id 0) of ``seq_k``.

  Both arguments are 2-D tensors of token ids. The result has shape
  (rows of seq_k, columns of seq_q, columns of seq_k) and is True wherever the
  key token id equals 0, repeated for every query position.
  """
  _, query_len = seq_q.size()
  n_rows, key_len = seq_k.size()
  key_is_pad = seq_k.data.eq(0)
  # Insert the query axis, then broadcast it without copying.
  return key_is_pad.unsqueeze(1).expand(n_rows, query_len, key_len)

In [16]:
# Draw one sample batch; the cells below unpack and inspect it.
batch = make_batch()

In [17]:
# Transpose the batch (list of examples -> one tuple per field) and convert each field
# to an integer LongTensor.
input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))

In [None]:
input_ids[0]

In [None]:
segment_ids[0]

In [None]:
masked_tokens[0]

In [None]:
masked_pos[0]

In [None]:
isNext[0]

In [None]:
get_attn_pad_masked(input_ids, input_ids)[0][0], input_ids[0]

# **Creating Model**

In [19]:
# GeLU Activation Function
def gelu(x):
  """Exact (erf-based) GELU, ``0.5 * x * (1 + erf(x / sqrt(2)))``, applied element-wise."""
  half_x = x * 0.5
  erf_term = torch.erf(x / math.sqrt(2.0))
  return half_x * (1.0 + erf_term)

In [26]:
class Embedding(nn.Module):
  """BERT input embedding: token + position + segment embeddings, then LayerNorm."""

  def __init__(self):
    super(Embedding, self).__init__()

    # One d_model-wide vector per vocabulary id.
    self.tok_embed = nn.Embedding(vocab_size, d_model)

    # One learned vector per position 0 .. max_len - 1.
    self.pos_embed = nn.Embedding(max_len, d_model)

    # One vector per segment id.
    self.seg_embed = nn.Embedding(n_segments, d_model)

    self.norm = nn.LayerNorm(d_model)

  def forward(self, x, seg):
    """Embed a batch of token ids.

    Args:
      x: 2-D tensor of token ids; its second dimension is the sequence length.
      seg: segment ids, added element-wise so it must match ``x`` in shape.

    Returns:
      The normalised sum of the three embeddings, with a trailing ``d_model`` axis.
    """
    seq_len = x.size(1)

    # Create the position ids on the same device as the input. A bare torch.arange
    # lives on the default device and breaks the lookup once the model is moved to GPU.
    pos = torch.arange(seq_len, dtype = torch.long, device = x.device)

    # (seq_len,) -> (1, seq_len) -> broadcast to the shape of x.
    pos = pos.unsqueeze(0).expand_as(x)

    embedding = self.tok_embed(x) + self.pos_embed(pos) + self.seg_embed(seg)

    return self.norm(embedding)

In [32]:
class ScaledDotProductAttention(nn.Module):
  """Parameter-free attention: softmax(Q K^T / sqrt(d_k)) applied to V."""

  def __init__(self):
    super().__init__()

  def forward(self, Q, K, V, attn_mask):
    """Return ``(context, attn)``.

    ``attn_mask`` is a boolean tensor broadcastable to the score matrix; positions where
    it is True are set to -1e9 before the softmax so they get (near) zero weight.
    """
    scale = np.sqrt(d_k)
    scores = torch.matmul(Q, K.transpose(-1, -2)) / scale

    # In-place fill on the freshly computed score tensor.
    scores.masked_fill_(attn_mask, -1e9)

    # Normalise over the key axis.
    attn = F.softmax(scores, dim = -1)

    return torch.matmul(attn, V), attn

In [34]:
class MultiHeadAttention(nn.Module):
  """Multi-head attention with output projection, residual connection and LayerNorm."""

  def __init__(self):
    super(MultiHeadAttention, self).__init__()

    # Per-head projections, computed for all heads at once.
    self.W_Q = nn.Linear(d_model, d_k * n_heads)

    self.W_K = nn.Linear(d_model, d_k * n_heads)

    self.W_V = nn.Linear(d_model, d_v * n_heads)

    # The output projection and the LayerNorm must be registered here. Building them
    # inside forward() created new, randomly initialised layers on every call, so their
    # weights were never trained, never saved and never moved with the module.
    self.linear = nn.Linear(n_heads * d_v, d_model)

    self.norm = nn.LayerNorm(d_model)

    # Parameter-free; created once instead of on every forward pass.
    self.attention = ScaledDotProductAttention()

  def forward(self, Q, K, V, attn_mask):
    """Attend from ``Q`` to ``K``/``V``.

    Args:
      Q, K, V: tensors whose last dimension is ``d_model`` and whose first is the batch.
      attn_mask: boolean mask with one (query, key) matrix per batch element; True marks
        key positions to ignore.

    Returns:
      ``(output, attn)``: the normalised ``projection + Q`` residual sum and the
      per-head attention weights.
    """
    residual, batch_size = Q, Q.size(0)

    # (batch, seq, n_heads * d) -> (batch, n_heads, seq, d)
    q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)

    k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)

    v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)

    # Give every head its own copy of the mask.
    attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

    context, attn = self.attention(q_s, k_s, v_s, attn_mask)

    # (batch, n_heads, seq, d_v) -> (batch, seq, n_heads * d_v): concatenate the heads.
    context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)

    output = self.linear(context)

    return self.norm(output + residual), attn

In [27]:
emb = Embedding()

In [28]:
embeds = emb(input_ids, segment_ids)

In [30]:
attenM = get_attn_pad_masked(input_ids, input_ids)

In [35]:
MHA = MultiHeadAttention()(embeds, embeds, embeds, attenM)

In [36]:
output, A = MHA

In [None]:
A[0][0]

In [38]:
class PoswiseFeedForward(nn.Module):
  """Two-layer MLP applied to the last axis: d_model -> d_ff -> d_model, GELU in between."""

  def __init__(self):
    super().__init__()

    # Expansion layer followed by the projection back to the model width.
    self.fc1 = nn.Linear(d_model, d_ff)
    self.fc2 = nn.Linear(d_ff, d_model)

  def forward(self, x):
    """Return ``fc2(gelu(fc1(x)))``."""
    hidden = self.fc1(x)
    activated = gelu(hidden)
    return self.fc2(activated)

In [None]:
class EncoderLayer(nn.Module):
  """One encoder block: multi-head self-attention followed by the feed-forward network."""

  def __init__(self):
    super().__init__()

    self.enc_self_attn = MultiHeadAttention()
    self.pos_ffn = PoswiseFeedForward()

  def forward(self, enc_inputs, enc_self_attn_mask):
    """Return ``(layer_output, attention_weights)`` for a batch of hidden states."""
    # Self-attention: the same tensor serves as query, key and value.
    attended, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)

    # NOTE(review): the feed-forward output is returned without a residual connection or
    # LayerNorm around it - confirm this is intended.
    return self.pos_ffn(attended), attn

In [None]:
class BERT(nn.Module):
  """Encoder stack with a next-sentence classification head and a masked-LM head."""

  def __init__(self):
    super(BERT, self).__init__()

    self.embedding = Embedding()

    self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    # Pooler for the first ([CLS]) position: Linear + Tanh.
    self.fc = nn.Linear(d_model, d_model)

    self.activ1 = nn.Tanh()

    # Transform applied to the gathered masked positions: Linear + GELU + LayerNorm.
    self.linear = nn.Linear(d_model, d_model)

    # Store the function itself. Calling gelu() here, with no argument, raised
    # TypeError and made the model impossible to construct.
    self.activ2 = gelu

    self.norm = nn.LayerNorm(d_model)

    # Two-way classifier: is sentence B the continuation of sentence A?
    self.classifier = nn.Linear(d_model, 2)

    # The masked-LM decoder shares its weight matrix with the token embedding table.
    embed_weight = self.embedding.tok_embed.weight

    n_vocab, n_dim = embed_weight.size()

    self.decoder = nn.Linear(n_dim, n_vocab, bias = False)

    self.decoder.weight = embed_weight

    # Separate bias, because the tied decoder is created without one.
    self.decoder_bias = nn.Parameter(torch.zeros(n_vocab))

  def forward(self, input_ids, segment_ids, masked_pos):
    """Run the encoder and both pre-training heads.

    Args:
      input_ids: 2-D tensor of token ids (batch, sequence); id 0 is treated as padding.
      segment_ids: segment ids with the same shape as ``input_ids``.
      masked_pos: 2-D tensor (batch, n_masked) of sequence positions to predict.

    Returns:
      ``(logits_lm, logits_clsf)``: vocabulary logits for every entry of ``masked_pos``
      and two-class next-sentence logits for every batch element.
    """
    output = self.embedding(input_ids, segment_ids)

    enc_self_attn_mask = get_attn_pad_masked(input_ids, input_ids)

    # The per-layer attention weights are not used by either head.
    for layer in self.layers:
      output, _ = layer(output, enc_self_attn_mask)

    # Next-sentence head: pooled hidden state of position 0.
    h_pooled = self.activ1(self.fc(output[:, 0]))

    logits_clsf = self.classifier(h_pooled)

    # Expand the positions over the hidden axis so gather can pick whole hidden vectors.
    masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1))

    h_masked = torch.gather(output, 1, masked_pos)

    h_masked = self.norm(self.activ2(self.linear(h_masked)))

    logits_lm = self.decoder(h_masked) + self.decoder_bias

    return logits_lm, logits_clsf