In [1]:
# --- Special tokens (kept lower-case) ---
START_TOKEN = '<sos>'
END_TOKEN = '<eos>'

# --- Data ---
DATA_PATH = 'data/poetry.txt'
CUTOFF = 0.8            # fraction of the samples assigned to the training split
BATCH_SIZE = 128        # batch size used by both data loaders

# --- Model ---
NUM_LAYERS = 1          # number of stacked LSTM layers
LATENT_DIM = 25         # LSTM hidden size
EMBEDDING_DIM = 50      # size of each token embedding

# --- Optimisation ---
EPOCHES = 2000          # number of training epochs
LEARNING_RATE = 0.01    # learning rate passed to the Adam optimiser

In [2]:
import os
import torch
from tqdm import tqdm
from torchnlp.encoders.text import *
from torch.utils.data import TensorDataset, DataLoader

In [3]:
def load_data():
    """Read DATA_PATH and build the input / target text lines.

    Every line of the file is stripped of surrounding whitespace and turned
    into a pair:

    * input line  : ``START_TOKEN`` followed by the line
    * target line : the line followed by ``END_TOKEN``

    Returns:
        tuple[list[str], list[str]]: ``(input_lines, target_lines)``, both in
        file order and of equal length.
    """
    input_lines = []
    target_lines = []

    # The context manager guarantees the file handle is closed, even if
    # reading fails part-way (the bare ``open`` previously leaked the handle).
    with open(DATA_PATH) as data_file:
        for line in data_file:
            line = line.strip()

            input_lines.append('{} {}'.format(START_TOKEN, line))
            target_lines.append('{} {}'.format(line, END_TOKEN))

    return input_lines, target_lines

In [4]:
def tokenize_line(line, encoder):
    """Encode one text line with ``encoder.encode`` and return the result."""
    encoded = encoder.encode(line)
    return encoded

def tokenize_lines(lines, encoder):
    """Encode every line in ``lines`` and return the results as a list, in order."""
    encoded_lines = []
    for text in lines:
        encoded_lines.append(tokenize_line(text, encoder))
    return encoded_lines

def extract_max_length(all_seq_lengths):
    """Return the largest entry of the ``all_seq_lengths`` tensor as a Python number."""
    longest = torch.max(all_seq_lengths)
    return longest.item()
    
def padding_line(tokens, MAX_LENGTH):
    """Pad ``tokens`` to ``MAX_LENGTH`` via ``pad_tensor`` and return the result."""
    padded_tokens = pad_tensor(tokens, length=MAX_LENGTH)
    return padded_tokens

def padding_lines(all_tokens):
    """Pad and stack ``all_tokens`` via ``stack_and_pad_tensors``.

    The return value is passed through unchanged; callers in this notebook
    unpack it as ``(sequences, lengths)``.
    """
    stacked = stack_and_pad_tensors(all_tokens)
    return stacked

def process_line(line, encoder, MAX_LENGTH):
    """Tokenize a single line and pad the tokens to ``MAX_LENGTH``."""
    return padding_line(tokenize_line(line, encoder), MAX_LENGTH)

def process_lines(lines, encoder):
    """Tokenize every line in ``lines`` and return them padded and stacked."""
    return padding_lines(tokenize_lines(lines, encoder))

In [5]:
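'''
*** Make the target sequences one-hot ***

    Each target is a whole sequence of token ids (one per time step), so the
    targets are expanded to one-hot vectors of shape
    (num_sequences, MAX_LENGTH, vocabulary_size) before being handed to the
    loss, instead of relying on a sparse (class-index) categorical
    cross-entropy on the raw sequences.
'''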

def make_one_hot_target_sequences(MAX_LENGTH, vocabulary, target_sequences):
    """One-hot encode every target sequence.

    Args:
        MAX_LENGTH: length of the time axis of the result.
        vocabulary: sized collection; ``len(vocabulary)`` is the number of classes.
        target_sequences: iterable of integer token-id tensors, one per sequence.

    Returns:
        A float32 tensor of shape
        ``(len(target_sequences), MAX_LENGTH, len(vocabulary))``.
    """
    num_classes = len(vocabulary)
    num_sequences = len(target_sequences)

    # Pre-allocate the result; every slot is overwritten in the loop below.
    encoded = torch.empty(num_sequences, MAX_LENGTH, num_classes, dtype=torch.float32)

    for row, token_ids in enumerate(target_sequences):
        # The integer one-hot matrix is cast to float32 on assignment.
        encoded[row, :, :] = torch.nn.functional.one_hot(token_ids, num_classes=num_classes)

    return encoded

In [6]:
def process_data():
    """Load the corpus and turn it into padded model inputs and one-hot targets.

    Steps: read the input/target lines, fit a whitespace tokenizer encoder on
    all of them, encode and pad inputs and targets, derive ``MAX_LENGTH`` from
    the lengths of all encoded lines, then one-hot encode the targets.

    Returns:
        tuple: ``(input_sequences, input_seq_lengths, target_sequences,
        target_seq_lengths, encoder, MAX_LENGTH)`` where ``target_sequences``
        is the one-hot encoded version.

    Raises:
        AssertionError: if START_TOKEN or END_TOKEN is missing from the
            encoder vocabulary.
    """
    def _split_on_whitespace(text):
        return text.split()

    input_lines, target_lines = load_data()
    all_lines = input_lines + target_lines

    # The encoder is fitted on inputs and targets together so both special
    # tokens end up in a single shared vocabulary.
    encoder = StaticTokenizerEncoder(all_lines, tokenize=_split_on_whitespace)

    input_sequences, input_seq_lengths = process_lines(input_lines, encoder)
    target_sequences, target_seq_lengths = process_lines(target_lines, encoder)
    # Only the lengths of the combined set are needed (for MAX_LENGTH).
    _, all_seq_lengths = process_lines(all_lines, encoder)

    MAX_LENGTH = extract_max_length(all_seq_lengths)
    vocabulary = encoder.vocab

    assert START_TOKEN in vocabulary, 'START_TOKEN NOT FOUND'
    assert END_TOKEN in vocabulary, 'END_TOKEN NOT FOUND'

    onehot_targets = make_one_hot_target_sequences(MAX_LENGTH, vocabulary, target_sequences)

    return (
        input_sequences,
        input_seq_lengths,
        onehot_targets,
        target_seq_lengths,
        encoder,
        MAX_LENGTH,
    )

In [7]:
class LanguageGenerator(torch.nn.Module):
    """Embedding -> LSTM -> Linear network that scores the vocabulary at every time step."""

    def __init__(self, VOCAB_SIZE, DEVICE):
        """Build the layers.

        Args:
            VOCAB_SIZE: number of rows in the embedding table and number of
                output scores per time step.
            DEVICE: stored on the instance as ``self.DEVICE``; not otherwise
                used inside this class.
        """
        super().__init__()

        self.embedding_layer = torch.nn.Embedding(VOCAB_SIZE, EMBEDDING_DIM)
        self.lstm_layer = torch.nn.LSTM(
            input_size=EMBEDDING_DIM,
            hidden_size=LATENT_DIM,
            num_layers=NUM_LAYERS,
            batch_first=True,
        )
        self.linear_layer = torch.nn.Linear(LATENT_DIM, VOCAB_SIZE)

        self.DEVICE = DEVICE
        self.VOCAB_SIZE = VOCAB_SIZE

    def forward(self, x, memory_init, CHUNK_SIZE):
        """Compute vocabulary scores for every position of ``x``.

        Args:
            x: token ids; cast to ``long`` before the embedding lookup.
            memory_init: initial ``(h, c)`` state handed to the LSTM.
            CHUNK_SIZE: accepted for interface compatibility; not used here.

        Returns:
            The linear layer's output for every LSTM time step.
        """
        token_ids = x.long()
        embedded = self.embedding_layer(token_ids)

        # The LSTM always yields the full output sequence (one vector per time
        # step); per the original note its shape is
        # (CHUNK_SIZE, MAX_LENGTH, LATENT_DIM). The final state is discarded.
        lstm_out, _ = self.lstm_layer(embedded, memory_init)

        return self.linear_layer(lstm_out)

In [8]:
## TEST MODEL
# device = torch.device('cpu' if not torch.cuda.is_available() else 'cuda:0')
# model = LanguageGenerator(3921, device).to(device)
# sample_seq = torch.tensor([[1,2,3,4,5,6]]).to(device)
# sample_seq.shape

In [9]:
class LanguageModelingPyTorch(object):
    """Trains a ``LanguageGenerator`` on the data produced by ``process_data``.

    Attributes set in ``__init__``:
        train_loader / valid_loader: batched train / validation data.
        VOCAB_SIZE: size of the encoder vocabulary.
        DEVICE: ``cuda:0`` when available, otherwise ``cpu``.
        memory_init: zero ``(h, c)`` LSTM state sized for ``BATCH_SIZE`` rows.

    Attributes set later:
        model, optimizer, criterion: created by ``language_generator``.
        train_loss, valid_loss: per-epoch loss histories filled by ``train_loop``.
    """

    def __init__(self):
        """Load the data, split it into train/validation loaders and build the initial LSTM state."""
        input_sequences, input_seq_lengths, target_sequences, target_seq_lengths, encoder, MAX_LENGTH = process_data()

        dataset = TensorDataset(
                        input_sequences,
                        target_sequences
                                )

        num_train = int(len(input_sequences) * CUTOFF)
        num_valid = len(input_sequences) - num_train
        train_data, valid_data = torch.utils.data.random_split(dataset, [num_train, num_valid])

        train_loader = DataLoader(
                            train_data,
                            shuffle=False,
                            drop_last=True,
                            batch_size=BATCH_SIZE
                                )

        valid_loader = DataLoader(
                            valid_data,
                            shuffle=False,
                            drop_last=True,
                            batch_size=BATCH_SIZE
                                )

        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.VOCAB_SIZE = len(encoder.vocab)
        self.DEVICE = torch.device('cpu' if not torch.cuda.is_available() else 'cuda:0')

        # Both loaders use drop_last=True, so every batch has exactly
        # BATCH_SIZE rows and the initial state is sized accordingly.
        # (This previously referenced CHUNK_SIZE, which is not defined at
        # this point and raised NameError on a fresh kernel.)
        state_shape = (NUM_LAYERS, BATCH_SIZE, LATENT_DIM)
        h_init = torch.zeros(state_shape, device=self.DEVICE)
        c_init = torch.zeros(state_shape, device=self.DEVICE)

        self.memory_init = (h_init, c_init)

    def _memory_for(self, batch_size):
        """Return a zero ``(h, c)`` state whose batch dimension equals ``batch_size``.

        Reuses ``self.memory_init`` when it already matches; otherwise builds
        fresh zero tensors with the same layer/hidden sizes, dtype and device.
        """
        h_init, c_init = self.memory_init
        if h_init.shape[1] == batch_size:
            return self.memory_init
        shape = (h_init.shape[0], batch_size, h_init.shape[2])
        return (h_init.new_zeros(shape), c_init.new_zeros(shape))

    def _sequence_loss(self, logits, onehot_targets):
        """Cross-entropy over the vocabulary axis, averaged over all positions.

        ``CrossEntropyLoss`` treats dimension 1 as the class axis, so passing
        ``(batch, time, vocab)`` tensors directly would normalise over time
        steps instead of tokens. Both tensors are therefore flattened to
        ``(batch * time, vocab)`` and the one-hot targets are converted to
        class indices.
        """
        vocab_size = logits.shape[-1]
        flat_logits = logits.reshape(-1, vocab_size)
        flat_targets = onehot_targets.reshape(-1, vocab_size).argmax(dim=-1)
        return self.criterion(flat_logits, flat_targets)

    @staticmethod
    def _mean_loss(total, num_batches):
        """Average ``total`` over ``num_batches``; NaN when there were no batches."""
        if num_batches == 0:
            return float('nan')
        return total / num_batches

    def language_generator(self):
        """Create the model, the Adam optimizer and the cross-entropy criterion."""
        model = LanguageGenerator(
                                    self.VOCAB_SIZE,
                                    self.DEVICE
                                     )

        self.model = model.to(self.DEVICE)

        self.optimizer = torch.optim.Adam(
                                    self.model.parameters(),
                                    lr = LEARNING_RATE
                                        )
        self.criterion = torch.nn.CrossEntropyLoss()

    def train_epoch(self):
        """Run one training pass followed by one validation pass.

        Returns:
            tuple[float, float]: ``(train_loss_epoch, valid_loss_epoch)``, each
            the mean batch loss over its own loader (NaN if that loader
            yielded no batches).
        """
        train_loss_epoch = 0
        valid_loss_epoch = 0

        self.model.train()

        for Xbatch, Ybatch in self.train_loader:
            self.optimizer.zero_grad()

            Xbatch = Xbatch.to(self.DEVICE)
            Ybatch = Ybatch.to(self.DEVICE)

            CHUNK_SIZE = Xbatch.shape[0]

            Pbatch = self.model(Xbatch, self._memory_for(CHUNK_SIZE), CHUNK_SIZE)

            train_loss = self._sequence_loss(Pbatch, Ybatch)
            train_loss.backward()
            self.optimizer.step()

            train_loss_epoch += train_loss.item()

        train_loss_epoch = self._mean_loss(train_loss_epoch, len(self.train_loader))

        self.model.eval()

        # No gradients are needed for validation; skipping them saves memory.
        with torch.no_grad():
            for Xbatch, Ybatch in self.valid_loader:

                Xbatch = Xbatch.to(self.DEVICE)
                Ybatch = Ybatch.to(self.DEVICE)

                CHUNK_SIZE = Xbatch.shape[0]

                Pbatch = self.model(Xbatch, self._memory_for(CHUNK_SIZE), CHUNK_SIZE)

                valid_loss = self._sequence_loss(Pbatch, Ybatch)

                valid_loss_epoch += valid_loss.item()

        # Average over the validation loader's own batch count (this used to
        # divide by the number of training batches).
        valid_loss_epoch = self._mean_loss(valid_loss_epoch, len(self.valid_loader))

        return train_loss_epoch, valid_loss_epoch

    def train_loop(self):
        """Train for EPOCHES epochs, logging every 100th epoch and storing the loss histories."""
        train_loss, valid_loss = [], []
        for epoch in range(EPOCHES):
            train_loss_epoch, valid_loss_epoch = self.train_epoch()

            train_loss.append(train_loss_epoch)
            valid_loss.append(valid_loss_epoch)

            if (epoch + 1) % 100 == 0:
                print('EPOCH : {}, TRAIN LOSS : {}, VALID LOSS : {}'.format(epoch+1, train_loss_epoch, valid_loss_epoch))

        self.train_loss = train_loss
        self.valid_loss = valid_loss

    def plot_cross_entropy(self):
        """Plot the per-epoch training and validation loss curves."""
        # NOTE(review): `plt` is not imported in the cells visible here;
        # presumably matplotlib.pyplot. Confirm and add the import to the
        # imports cell, otherwise this method raises NameError.
        plt.plot(self.train_loss, label='loss')
        plt.plot(self.valid_loss, label='val_loss')
        plt.legend()
        plt.show()

    def train(self):
        """Build the model, run the full training loop and plot the loss curves."""
        self.language_generator()
        self.train_loop()
        self.plot_cross_entropy()

In [10]:
# Build the trainer (loads and splits the data), then create the model,
# run the training loop and plot the loss curves.
LMP = LanguageModelingPyTorch()
LMP.train()