In [69]:
%%time
import torch
from torch import nn
import torch.nn.functional as F

CPU times: total: 0 ns
Wall time: 0 ns


In [70]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [71]:
with open(r"D:\MegaSync\MEGAsync\MegaSync\Machine Learning Projects\pytoch_learnings\PYTORCH_NOTEBOOKS\Data\shakespeare.txt","r",encoding='utf-8') as f:
    text = f.read()
    print(len(text))
    import re
    text = re.sub(' +',' ',text)

5445609


In [72]:
print(len(text))

5053317


In [73]:
all_characters = set(text)

In [74]:
decoder = dict(enumerate(all_characters))

In [75]:
encoder = {char: ind for ind,char in decoder.items()}

In [76]:
encoded_text = np.array([encoder[char] for char in text])

In [77]:
def one_hot_encoder(encoded_text,num_uni_chars):
    one_hot = np.zeros((encoded_text.size,num_uni_chars))
    one_hot = one_hot.astype(np.float32)
    one_hot[np.arange(one_hot.shape[0]),encoded_text.flatten()] = 1.0
    one_hot = one_hot.reshape((*encoded_text.shape,num_uni_chars))
    return one_hot

In [78]:
arr = np.array([1,2,0])
arr

array([1, 2, 0])

In [79]:
one_hot_encoder(arr,3)

array([[0., 1., 0.],
       [0., 0., 1.],
       [1., 0., 0.]], dtype=float32)

In [80]:
def generate_batches(encoded_text,samp_per_batch=10,seq_len=50):
    char_per_batch =samp_per_batch * seq_len
    num_batches_avail = int(len(encoded_text)/char_per_batch)
    encoded_text = encoded_text[:num_batches_avail*char_per_batch]
    encoded_text = encoded_text.reshape((samp_per_batch,-1))
    
    for n in range(0,encoded_text.shape[1],seq_len):
        x = encoded_text[:,n:n+seq_len]
        y = np.zeros_like(x)
        
        try:
            y[:,:-1] = x[:,1:]
            y[:,-1] = encoded_text[:,n+seq_len]
        
        except:
            y[:,:-1] = x[:,1:]
            y[:,-1] = encoded_text[:,0]
        yield x,y

In [81]:
sample_text = encoded_text[:20]

In [82]:
batch_generator = generate_batches(encoded_text=sample_text,samp_per_batch=2,seq_len=5)

In [83]:
for i in batch_generator:
    print(i[0])

[[42 38 18 42 38]
 [ 9 40 45 68 63]]
[[ 4 68 41 64 38]
 [65  0 38 47 68]]


In [84]:
sample_text

array([42, 38, 18, 42, 38,  4, 68, 41, 64, 38,  9, 40, 45, 68, 63, 65,  0,
       38, 47, 68])

In [85]:
class CharModel(nn.Module):
    
    def __init__(self,all_chars,num_hidden=256,num_layers=4,drop_prob=0.5,use_gpu=True):
        
        super().__init__()
        
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu
        
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char:ind for ind,char in decoder.items()}
        
        self.lstm = nn.LSTM(len(self.all_chars),num_hidden,num_layers,dropout=drop_prob,batch_first = True)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc_linear = nn.Linear(num_hidden,len(self.all_chars))
        
    def forward(self,x,hidden):
        
        lstm_output,hidden = self.lstm(x,hidden)
        
        drop_output = self.dropout(lstm_output)
        
        drop_output = drop_output.contiguous().view(-1,self.num_hidden)
        
        final_out = self.fc_linear(drop_output)
        
        return final_out, hidden
        
    def hidden_state(self,batch_size):
        
        if self.use_gpu:
            
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda(),
                      torch.zeros(self.num_layers,batch_size,self.num_hidden).cuda())
            
        else:
            
            hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden),
                      torch.zeros(self.num_layers,batch_size,self.num_hidden))

        return hidden


In [86]:
model =CharModel(all_chars=all_characters,
                 num_hidden=512,
                 num_layers=3,
                 drop_prob=0.5,
                 use_gpu=True
                )

In [87]:
total_param = []

for p in model.parameters():
    total_param.append(int(p.numel()))

In [88]:
sum(total_param)

5470292

In [89]:
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

In [90]:
train_percent = 0.1

In [91]:
train_ind = int(len(encoded_text)*train_percent)

In [92]:
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

In [93]:
len(train_data)

505331

In [94]:
len(val_data)

4547986

In [95]:
train_data = encoded_text[:train_ind]

In [96]:
val_data = encoded_text[train_ind:]

In [97]:
epochs = 1
batch_size = 100
tracker = 0
seq_len = 100

num_char = max(encoded_text)+1


In [98]:
# Set model to train
model.train()


# Check to see if using GPU
if model.use_gpu:
    model.cuda()

for i in range(epochs):
    
    hidden = model.hidden_state(batch_size)
    
    
    for x,y in generate_batches(train_data,batch_size,seq_len):
        
        tracker += 1
        
        # One Hot Encode incoming data
        x = one_hot_encoder(x,num_char)
        
        # Convert Numpy Arrays to Tensor
        
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)
        
        # Adjust for GPU if necessary
        
        if model.use_gpu:
            
            inputs = inputs.cuda()
            targets = targets.cuda()
            
        # Reset Hidden State
        # If we dont' reset we would backpropagate through all training history
        hidden = tuple([state.data for state in hidden])
        
        model.zero_grad()
        
        lstm_output, hidden = model.forward(inputs,hidden)
        loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
        loss.backward()
        
        # POSSIBLE EXPLODING GRADIENT PROBLEM!
        # LET"S CLIP JUST IN CASE
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)
        
        optimizer.step()
        
        
        
        ###################################
        ### CHECK ON VALIDATION SET ######
        #################################
        
        if tracker % 25 == 0:
            
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()
            
            for x,y in generate_batches(val_data,batch_size,seq_len):
                
                # One Hot Encode incoming data
                x = one_hot_encoder(x,num_char)
                

                # Convert Numpy Arrays to Tensor

                inputs = torch.from_numpy(x)
                targets = torch.from_numpy(y)

                # Adjust for GPU if necessary

                if model.use_gpu:

                    inputs = inputs.cuda()
                    targets = targets.cuda()
                    
                # Reset Hidden State
                # If we dont' reset we would backpropagate through 
                # all training history
                val_hidden = tuple([state.data for state in val_hidden])
                
                lstm_output, val_hidden = model.forward(inputs,val_hidden)
                val_loss = criterion(lstm_output,targets.view(batch_size*seq_len).long())
        
                val_losses.append(val_loss.item())
            
            # Reset to training model after val for loop
            model.train()
            
            print(f"Epoch: {i} Step: {tracker} Val Loss: {val_loss.item()}")

Epoch: 0 Step: 25 Val Loss: 3.300767660140991
Epoch: 0 Step: 50 Val Loss: 3.2924044132232666


In [99]:
model_name = 'hidden512_layers3_shakes.net'

In [100]:
torch.save(model.state_dict(),model_name)
model.load_state_dict(torch.load(r"D:\MegaSync\MEGAsync\MegaSync\Machine Learning Projects\pytoch_learnings\PYTORCH_NOTEBOOKS\06-NLP-with-PyTorch\Final_Shakespeare.net"))

<All keys matched successfully>

In [101]:
def predict_next_char(model, char, hidden=None, k=1):
        
        # Encode raw letters with model
        encoded_text = model.encoder[char]
        
        # set as numpy array for one hot encoding
        # NOTE THE [[ ]] dimensions!!
        encoded_text = np.array([[encoded_text]])
        
        # One hot encoding
        encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
        
        # Convert to Tensor
        inputs = torch.from_numpy(encoded_text)
        
        # Check for CPU
        if(model.use_gpu):
            inputs = inputs.cuda()
        
        
        # Grab hidden states
        hidden = tuple([state.data for state in hidden])
        
        
        # Run model and get predicted output
        lstm_out, hidden = model(inputs, hidden)

        
        # Convert lstm_out to probabilities
        probs = F.softmax(lstm_out, dim=1).data
        
        
        
        if(model.use_gpu):
            # move back to CPU to use with numpy
            probs = probs.cpu()
        
        
        # k determines how many characters to consider
        # for our probability choice.
        # https://pytorch.org/docs/stable/torch.html#torch.topk
        
        # Return k largest probabilities in tensor
        probs, index_positions = probs.topk(k)
        
        
        index_positions = index_positions.numpy().squeeze()
        
        # Create array of probabilities
        probs = probs.numpy().flatten()
        
        # Convert to probabilities per index
        probs = probs/probs.sum()
        
        # randomly choose a character based on probabilities
        char = np.random.choice(index_positions, p=probs)
       
        # return the encoded value of the predicted char and the hidden state
        return model.decoder[char], hidden

In [102]:
def generate_text(model, size, seed='The', k=1):
        
      
    
    # CHECK FOR GPU
    if(model.use_gpu):
        model.cuda()
    else:
        model.cpu()
    
    # Evaluation mode
    model.eval()
    
    # begin output from initial seed
    output_chars = [c for c in seed]
    
    # intiate hidden state
    hidden = model.hidden_state(1)
    
    # predict the next character for every character in seed
    for char in seed:
        char, hidden = predict_next_char(model, char, hidden, k=k)
    
    # add initial characters to output
    output_chars.append(char)
    
    # Now generate for size requested
    for i in range(size):
        
        # predict based off very last letter in output_chars
        char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        
        # add predicted character
        output_chars.append(char)
    
    # return string of predicted text
    return ''.join(output_chars)

In [103]:
print(generate_text(model, 1000, seed='Hi my name is', k=3))

Hi my name isl]OJ)7JPJX)"s'l>e}JJJJG9sJKl""JPJXN)> JN)JX)JX)JX)Jw)>lJ)7JOl>JXllwe}JJJJ?OPNJNO)pJP>NJXll9JNOPNJXO)p"sJXN>P9zlJN>P'wl9NX}JJJJ?OPNJNO)pJXP JXOlJXl9sJNOlJXp9J)7JNOllB}JJJJ?OlJw)NOl>J)7JNOlJXNP>XJ)7JNO'XJK)>sXJK'NO}JJJJGJXl"7J)7JOPNlJ)7Jw JOP9sB}JJvR[&SG?(GBJHXJO'wJX)J")g'9zJNOPNJHJO)pXle}JJJJ?)JNO'XJ'9JNOlJ])p"NJ)7JOlP>NJN)JNOlJ])9X'sl>N}JJJJGXJHJK)p"sJXNP JNOlJK'9sJPNJNOlJ])9Nl9NB}}JJJJJJJJJJJJJJJJJJJJJ[9Nl>JGQ?&QC}}JJvR[&SG?(GBJaOPNJK'NOJNO JXl"7JKOl>lJ'XJw }JJJJ?)JNOlJXp>XJ)7JN'wlXJP9sJX)9JNOlJK)>"sJIl}JJJJ?)JNOlJN)JOPNlJN)JNOlJ7P>NOJ)7JNOlJXN>'9zJ'9N)}JJJJG9sJI>lPNOl>J)7JwlB}JJGM(HSSGBJal""eJwPsPweJX'>eJHJKPXJNO'9VJ)7JNOlJK)>"se}JJJJ?OPNJNO)pJP>NJXl>g.sJN)JwPVlJwlJNOlJXp]OJPJX)"s}JJJJ?OP9JXOlJK'NOJNO'XJK)>NOJN>P'X)9XJ)9JNOlJ]P>lB}JJJJHJO)pXNJNO)pJOPsJOl>JX)9JN'""JOlJXN'""JIlJX)9e}JJJJ?OPNJNO)pJP>NJNO)pJP>NJPJw)>lB}JJv_G(yHGQBJHJOPglJ9)NJXl>g'9ze}JJJJG9sJN)JNOlJKPXNlJ)7JOl>JK)>sJNOP9JXOlJK'NOJNOllB}JJJJH7JNO)pJXO)p"sXNJXNP9sJOl>JXOPwleJNOlJK'7lJ)pN}JJJJa'NOJP""JNOlJXNPNl