In [568]:
import torch
import numpy as np
import torch.nn as nn
import torch.utils
import torch.utils.data
import time
import torch.nn.functional as F

In [569]:


def load_text(file_path = 'shakespeare.txt'):
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    return text

def create_vocab(text):
    vocab = sorted(set(text))
    char2idx = {char_i:i for i, char_i in enumerate(vocab)}
    idx2char = np.array(vocab)
    return vocab, char2idx, idx2char

def text_to_int(text, char2idx):
    return np.array([char2idx[i] for i in text])

def int_to_text(index, idx2char):
    return ''.join(idx2char[index])

def create_dataset(text_as_int, seq_length, batch_size):
    total_num_seq = len(text_as_int) - seq_length
    inputs = []
    targets = []

    for i in range(0,total_num_seq):
        inputs.append(text_as_int[i:i+seq_length])
        targets.append(text_as_int[i+1:i+seq_length+1])
    
    inputs = torch.tensor(np.array(inputs))
    targets = torch.tensor(np.array(targets))
    
    dataset = torch.utils.data.TensorDataset(inputs, targets)
    dataloader = torch.utils.data.DataLoader(dataset,batch_size=batch_size, shuffle=True, drop_last=True)

    return dataloader

# Testing code
""" text = load_text()[:100]
vocab, char2idx, idx2char = create_vocab(text)
int_text = text_to_int(text, char2idx)
text_int = int_to_text(int_text, idx2char)
dataLoad = create_dataset(int_text, 20, 50)
for batch in dataLoad:
    inputs, targets = batch
    for inp, trg in zip(inputs,targets):
        print('--------------------\n')
        print(f'The input: {int_to_text(inp, idx2char)}, corresponds to the output {int_to_text(trg, idx2char)}\n')
        print('--------------------\n')  """

" text = load_text()[:100]\nvocab, char2idx, idx2char = create_vocab(text)\nint_text = text_to_int(text, char2idx)\ntext_int = int_to_text(int_text, idx2char)\ndataLoad = create_dataset(int_text, 20, 50)\nfor batch in dataLoad:\n    inputs, targets = batch\n    for inp, trg in zip(inputs,targets):\n        print('--------------------\n')\n        print(f'The input: {int_to_text(inp, idx2char)}, corresponds to the output {int_to_text(trg, idx2char)}\n')\n        print('--------------------\n')  "

In [570]:
# Let's create all the basis for the RNN architecture implemented with PyTorch
# The size of the feature in nn.RNN(hidden_size, feature) was selected as 'hidden size'
#       for simplicity

class CharRNN(nn.Module):
    def __init__(self, vocab_size, hidden_size, seq_length):
        super(CharRNN, self).__init__()
        self.hidden_size = hidden_size
        self.seq_legth = seq_length

        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
    
    def forward(self, x, hidden):
        x = self.embed(x)
        out, hidden = self.rnn(x, hidden)
        out = self.fc(out)
        return out, hidden
    
    def init_hidden(self, batch_size):
        return torch.zeros(1, batch_size, self.hidden_size)


In [571]:
def get_dataloader(seq_length, batch_size, path_file = 'shakespeare.txt', amount_chars = None):
    if amount_chars:
        text = load_text(path_file)[:amount_chars]
    else:
        text = load_text(path_file)
    print(f'Text of len {len(text)} is being processed.\n')
    vocab, char2idx, idx2char = create_vocab(text)
    text_as_int = text_to_int(text, char2idx)
    dataloader = create_dataset(text_as_int, seq_length, batch_size)

    return dataloader, vocab, char2idx, idx2char, text_as_int

In [572]:
# Let's see how this architecture works

'''
seq_length = 100
batch_size = 64
hidden_size = 128
epochs = 5
learning_rate = 0.003
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
'''

def train_RNN(seq_length, batch_size, hidden_size, epochs, learning_rate, device, amount_chars = None):
    
    # Get data
    dataloader, vocab, char2idx, idx2char, text_as_int = get_dataloader(seq_length, batch_size, amount_chars= amount_chars)

    # Model
    model = CharRNN(len(vocab), hidden_size, seq_length).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    print(f'Training the RNN vanilla network.')

    initial_run_time = time.time()

    # Training
    for epoch in range(epochs):
        
        start_time = time.time()

        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            hidden = model.init_hidden(batch_size).to(device)
            optimizer.zero_grad()
            output, hidden = model(x_batch, hidden)
            loss = criterion(output.view(-1, len(vocab)), y_batch.view(-1))
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, batch duration {time.time() - start_time:.2f} seconds.\n')
    
    print(f'Total training time {time.time()-initial_run_time:.2f}.\n')


    return model, char2idx, idx2char

In [573]:
# Let's create a function to evaluate the model

def generat_text(model, start_string, char2idx, idx2char, length = 200, device = 'cpu', is_lstm = False):

    model.eval()
    input_eval = torch.tensor([char2idx[i] for i in start_string]).unsqueeze(0).to(device)

    if not is_lstm:
        hidden = model.init_hidden(1).to(device)
    elif is_lstm:
        hidden = model.init_hidden(1, device)

    generated = list(start_string)

    with torch.no_grad():
        for i in range(length):

            output, hidden = model(input_eval, hidden)
            logits = output[:,-1, :] # In this line we can add temperature
            probs = torch.softmax(logits, dim = 1).squeeze()

            next_idx = torch.multinomial(probs,1).item()
            next_char = idx2char[next_idx]

            generated.append(next_char)

            input_eval = torch.tensor([[next_idx]]).to(device)
    
    return ''.join(generated)

In [574]:
seq_length = 10
batch_size = 20
hidden_size = 128
epochs = 5
learning_rate = 0.003
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [575]:
# Train the vanilla RNN model
model_rnn, char2idx, idx2char = train_RNN(seq_length, batch_size, hidden_size, epochs, learning_rate, device, amount_chars=10000)
text = generat_text(model_rnn, start_string='ROMEO: ', char2idx= char2idx, idx2char=idx2char, device=device)
print("\nGenerated text:\n")
print(text)

Text of len 10000 is being processed.

Training the RNN vanilla network.
Epoch 1/5, Loss: 1.7044, batch duration 0.66 seconds.

Epoch 2/5, Loss: 1.5352, batch duration 0.63 seconds.

Epoch 3/5, Loss: 1.5348, batch duration 0.61 seconds.

Epoch 4/5, Loss: 1.4427, batch duration 0.62 seconds.

Epoch 5/5, Loss: 1.4925, batch duration 0.59 seconds.

Total training time 3.12.


Generated text:

ROMEO: you do plespor'd shop,
Your moremay am! you musts and that down cit toe.
Hang aboud dearter But even up and pllobble coof me who cive
But is nos know appleat I dourselves. What he is a kind guly: they


In [576]:
# Let's create all the basis for the RNN architecture implemented with PyTorch
# The size of the feature in nn.RNN(hidden_size, feature) was selected as 'hidden size'
#       for simplicity

class CharRNN_with_Temperature(nn.Module):
    def __init__(self, vocab_size, hidden_size, seq_length,T):
        super(CharRNN_with_Temperature, self).__init__()
        self.hidden_size = hidden_size
        self.seq_legth = seq_length
        self.T = T

        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
    
    def forward(self, x, hidden):
        x = self.embed(x)
        out, hidden = self.rnn(x, hidden)
        out = self.fc(out)/self.T
        return out, hidden
    
    def init_hidden(self, batch_size):
        return torch.zeros(1, batch_size, self.hidden_size)


In [577]:
# Let's see how this architecture works

'''
seq_length = 100
batch_size = 64
hidden_size = 128
epochs = 5
learning_rate = 0.003
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
'''

def train_RNN_with_Temperature_scaling(seq_length, batch_size, hidden_size,T, epochs, learning_rate, device, amount_chars = None):
    
    # Get data
    dataloader, vocab, char2idx, idx2char, text_as_int = get_dataloader(seq_length, batch_size, amount_chars= amount_chars)

    # Model
    model = CharRNN_with_Temperature(len(vocab), hidden_size, seq_length,T).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    print(f'Training the RNN vanilla network with temperatue scaling .')

    initial_run_time = time.time()

    # Training
    for epoch in range(epochs):
        
        start_time = time.time()

        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            hidden = model.init_hidden(batch_size).to(device)
            optimizer.zero_grad()
            output, hidden = model(x_batch, hidden)
            loss = criterion(output.view(-1, len(vocab)), y_batch.view(-1))
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, batch duration {time.time() - start_time:.2f} seconds.\n')
    
    print(f'Total training time {time.time()-initial_run_time:.2f}.\n')


    return model, char2idx, idx2char

In [578]:
# Train the vanilla RNN model
model_rnn, char2idx, idx2char = train_RNN_with_Temperature_scaling(seq_length, batch_size, hidden_size, 0.1, epochs, learning_rate, device, amount_chars=10000)
text = generat_text(model_rnn, start_string='ROMEO: ', char2idx= char2idx, idx2char=idx2char, device=device)
print("\nGenerated text:\n")
print(text)

Text of len 10000 is being processed.

Training the RNN vanilla network with temperatue scaling .
Epoch 1/5, Loss: 2.0851, batch duration 0.67 seconds.

Epoch 2/5, Loss: 1.8266, batch duration 0.62 seconds.

Epoch 3/5, Loss: 1.5535, batch duration 0.62 seconds.

Epoch 4/5, Loss: 1.6689, batch duration 0.60 seconds.

Epoch 5/5, Loss: 1.5753, batch duration 0.59 seconds.

Total training time 3.10.


Generated text:

ROMEO: sir; wherained; hee will olly.

MENENIUS:
The stinous, killy. was evil.

MENENIUS:
Their rus, evil. He the ot, the belly's,
What are seeesond felay. But, I am thirflubs,
That wark mished agaings a thu


In [579]:
# Train the vanilla RNN model
model_rnn, char2idx, idx2char = train_RNN_with_Temperature_scaling(seq_length, batch_size, hidden_size, 0.95, epochs, learning_rate, device, amount_chars=10000)
text = generat_text(model_rnn, start_string='ROMEO: ', char2idx= char2idx, idx2char=idx2char, device=device)
print("\nGenerated text:\n")
print(text)

Text of len 10000 is being processed.

Training the RNN vanilla network with temperatue scaling .
Epoch 1/5, Loss: 1.7806, batch duration 0.61 seconds.

Epoch 2/5, Loss: 1.4406, batch duration 0.60 seconds.

Epoch 3/5, Loss: 1.3916, batch duration 0.59 seconds.

Epoch 4/5, Loss: 1.2649, batch duration 0.59 seconds.

Epoch 5/5, Loss: 1.3443, batch duration 0.60 seconds.

Total training time 2.99.


Generated text:

ROMEO: the store-hould hele down are use the pion one to eac, you haths. He receive him first Citizen:
Well:
Speak thrighs granted ther?

Second Citizen:
Conjrates, my gide him,
Sexate, you munterflicts here


In [875]:
# Let's create a function to evaluate the model

def generat_text_nucleus_sampling(model, start_string, char2idx, idx2char, length = 200, device = 'cpu', is_lstm = False):

    model.eval()
    p = 0.95
    input_eval = torch.tensor([char2idx[i] for i in start_string]).unsqueeze(0).to(device)

    if not is_lstm:
        hidden = model.init_hidden(1).to(device)
    elif is_lstm:
        hidden = model.init_hidden(1, device)

    generated = list(start_string)

    with torch.no_grad():
        for i in range(length):

            output, hidden = model(input_eval, hidden)
            logits = output[:,-1, :] # In this line we can add temperature
            probs = torch.softmax(logits, dim = 1).squeeze()
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            sorted_probs = F.softmax(sorted_logits,dim=-1)
            new_probs = torch.zeros_like(probs)
            cum = 0
            index = 0
            for v in range(sorted_logits.size(1)):
                cum+=sorted_probs[0,v:v+1]
                index=v
                if (cum>=p):
                    break
            new_probs[sorted_indices[0,:index+1]]=probs[sorted_indices[0,:index+1]]/cum
            #print(new_probs)

            next_idx = torch.multinomial(new_probs,1).item()
            next_char = idx2char[next_idx]

            generated.append(next_char)

            input_eval = torch.tensor([[next_idx]]).to(device)
    
    return ''.join(generated)

In [876]:
# Let's create all the basis for the RNN architecture implemented with PyTorch
# The size of the feature in nn.RNN(hidden_size, feature) was selected as 'hidden size'
#       for simplicity

class CharRNN_with_Nucleus_Sampling(nn.Module):
    def __init__(self, vocab_size, hidden_size, seq_length,p):
        super(CharRNN_with_Nucleus_Sampling, self).__init__()
        self.hidden_size = hidden_size
        self.seq_legth = seq_length
        self.p = p
        self.vocab_size = vocab_size

        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
    
    def forward(self, x, hidden):
        x = self.embed(x)
        out, hidden = self.rnn(x, hidden)
        logits = self.fc(out)


        return logits, hidden
    
    def init_hidden(self, batch_size):
        return torch.zeros(1, batch_size, self.hidden_size)


In [879]:


def train_RNN_with_nucleus_sampling(seq_length, batch_size, hidden_size,p, epochs, learning_rate, device, amount_chars = None):
    
    # Get data
    dataloader, vocab, char2idx, idx2char, text_as_int = get_dataloader(seq_length, batch_size, amount_chars= amount_chars)

    # Model
    model = CharRNN_with_Nucleus_Sampling(len(vocab), hidden_size, seq_length,p).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    print(f'Training the RNN vanilla network with temperatue scaling .')

    initial_run_time = time.time()

    # Training
    for epoch in range(epochs):
        
        start_time = time.time()

        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            hidden = model.init_hidden(batch_size).to(device)
            optimizer.zero_grad()
            output, hidden = model(x_batch, hidden)
            loss = criterion(output.view(-1, len(vocab)), y_batch.view(-1))
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, batch duration {time.time() - start_time:.2f} seconds.\n')
    
    print(f'Total training time {time.time()-initial_run_time:.2f}.\n')


    return model, char2idx, idx2char


In [880]:
# Train the vanilla RNN model
model_rnn, char2idx, idx2char = train_RNN_with_nucleus_sampling(seq_length, batch_size, hidden_size, 0.8, 5, learning_rate, device, amount_chars=100000)
text = generat_text_nucleus_sampling(model_rnn, start_string='ROMEO: ', char2idx= char2idx, idx2char=idx2char, device=device)
print("\nGenerated text:\n")
print(text)

Text of len 100000 is being processed.

Training the RNN vanilla network with temperatue scaling .
Epoch 1/5, Loss: 1.6557, batch duration 6.04 seconds.

Epoch 2/5, Loss: 1.7329, batch duration 6.03 seconds.

Epoch 3/5, Loss: 1.6524, batch duration 6.03 seconds.

Epoch 4/5, Loss: 1.6755, batch duration 5.74 seconds.

Epoch 5/5, Loss: 1.6929, batch duration 5.96 seconds.

Total training time 29.79.


Generated text:

ROMEO: be masted, I carry to armse senate well, no, hear are do that cannot wing.

Second Senator:
Conselfery make shall wards good ray!

CORIOLANUS:
We conole rece?

CORIOLANUS:
O gare, and catccoveral, sir
