In [1]:
from collections import Counter
import numpy as np
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as Function
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

NN implementation and architecture closely follows the tutorial at: https://www.kaggle.com/code/purvasingh/text-generation-via-rnn-and-lstms-pytorch#Pre-processing-Stock-News

In [2]:
# Check if a GPU is available for training
train_on_gpu = torch.cuda.is_available()

# Data parameters
sequence_length = 10  # The number of characters in a sequence, "10-gram"?
batch_size = 128  # The batch size for training the model.

# Training parameters
number_epochs = 10  # The number of times to iterate over the entire dataset during training.
learning_rate = 0.001  # The rate at which the model adjusts its parameters during training.

# Model parameters
embedding_dimension = 200  # The number of dimensions in the embedding layer.
hidden_dimension = 250  # The number of hidden units in each LSTM layer.
number_layers = 2  # The number of LSTM layers in the model.
show_every_n_batches = 100  # How often to print training statistics, such as loss and accuracy, during training. Set to every 100 batches.


In [3]:
# Dictionaries for special characters in the text
# >>> Not really necessary any more with the GPT HSK corpus.
SPECIAL_WORDS: dict = {'PADDING': '<PAD>'}

punctuation_dict: dict = {"。": "<PERIOD>",
                        "．": "<DECIMAL>",
                        "，": "<COMMA>",
                        "！": "<EXCLAMATION>",
                        "）": "<RIGHT_PARENTHESIS>",
                        "（": "<LEFT_PARENTHESIS>",
                        '"': "<QUOTE>",
                        "”": "<RIGHT_QUOTE>",
                        "“": "<LEFT_QUOTE>",
                        "？": "<QUESTION_MARK>",
                        "：": "<COLON>",
                        "；": "<SEMICOLON>",
                        "》": "<RIGHT_BRACKETS>",
                        "《": "<LEFT_BRACKETS>",
                        "‘": "<RIGHT_APOSTROPHE>",
                        "’": "<LEFT_APOSTROPHE>",
                        "\t": "<TAB>",
                        "\n": "<NEW_LINE>",
                        "＊": "<ASTERISK>",
                        "%": "<PERCENT>",
                        "＄": "<DOLLAR_SIGN>",
                        "＆": "<AMPRISAND>"
                        }

In [4]:
class RNN(nn.Module):
    """
    A recurrent neural network class that inherits from the PyTorch nn.Module class.

    Args:
        vocabulary_size (int): number of unique characters in the vocabulary.
        output_size (int): size of the output, which is equal to the vocabulary size.
        embedding_dimension (int): size of the feature vector for mapping characters.
        hidden_dimension (int): number of nodes in each hidden layer.
        number_layers (int): number of layers in the LSTM model.
        dropout (float, optional): dropout rate. Default is 0.5.

    Attributes:
        embedding (nn.Embedding): embedding layer that maps characters to feature vectors.
        lstm (nn.LSTM): LSTM layer that processes the embedded characters.
        vocabulary_size (int): number of unique characters in the vocabulary.
        output_size (int): size of the output, which is equal to the vocabulary size.
        embedding_dimension (int):size of the feature vector for mapping characters.
        hidden_dimension (int): number of nodes in each hidden layer.
        number_layers (int): number of layers in the LSTM model.
        fully_connected_layer (nn.Linear): fully connected layer that produces the final output.

    Methods:
        forward(input, hidden_state): Forward pass of the network.
        init_hidden(batch_size): Initialize the hidden state of the LSTM.
    """
    def __init__(self, vocabulary_size, output_size, embedding_dimension, hidden_dimension, number_layers, dropout=0.5):
        super().__init__() # call __init__ method of parent class to inherit its attributes
        self.embedding = nn.Embedding(vocabulary_size, embedding_dimension) # create an embedding layer
        self.lstm = nn.LSTM(embedding_dimension, hidden_dimension, number_layers, dropout=dropout, batch_first=True) # create an LSTM layer
        self.vocabulary_size = vocabulary_size # set the number of unique characters in the vocabulary
        self.output_size = output_size # set the output size (which is equal to the vocabulary size)
        self.embedding_dimension = embedding_dimension # set the size of the feature vector for mapping characters
        self.hidden_dimension = hidden_dimension # set the number of nodes in each hidden layer
        self.number_layers = number_layers # set the number of layers in the LSTM model
        self.fully_connected_layer = nn.Linear(hidden_dimension, output_size) # create a fully connected layer
    
    def forward(self, input, hidden_state):
        batch_size = input.size(0) # get the batch size
        input = input.long() # convert the input to a LongTensor
        embedddings = self.embedding(input) # apply the embedding layer to the input
        lstm_out, hidden_state = self.lstm(embedddings, hidden_state) # apply the LSTM layer to the embeddings and the hidden state
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dimension) # stack LSTM outputs
        output = self.fully_connected_layer(lstm_out) # apply the fully connected layer to the LSTM outputs
        output = output.view(batch_size, -1, self.output_size) # reshape the output to batch_size * sequence_length * output_size
        output = output[:, -1] # get the final batch
        return output, hidden_state # return the final batch word scores and the final hidden state
    
    def init_hidden(self, batch_size):
        # create two new zero tensors of size number_layers * batch_size * hidden_dimension
        # initialize hidden state with zero weights, and run on GPU if possible
        weights = next(self.parameters()).data # get the weights of the model
        if(train_on_gpu) == True: # if we are training on a GPU
            # create a tuple of two tensors, both of size number_layers * batch_size * hidden_dimension, and initialize them with zero weights, and run them on the GPU if available
            hidden_state = (weights.new(self.number_layers, batch_size, self.hidden_dimension).zero_().cuda(), 
                     weights.new(self.number_layers, batch_size, self.hidden_dimension).zero_().cuda()) 
        else:
            hidden_state = (weights.new(self.number_layers, batch_size, self.hidden_dimension).zero_(),
                     weights.new(self.number_layers, batch_size, self.hidden_dimension).zero_()) 
        return hidden_state # return the hidden state


In [5]:
def preprocess_data(filename):
    """
    Preprocesses data for use in a language model.

    Args: filename (str):name of the file to preprocess.

    Returns: None
    """
    # Read in the data from the file.
    with open(f"./data/{filename}.txt", "r", encoding="GBK") as file_in:
        data = file_in.readlines()
    # Strip whitespace from the data.
    data = [line.strip() for line in data]
    # Join all the lines into one big long string.
    long_data = ''.join(data)
    # Count the occurrences of each character in the data.
    character_counts = Counter(long_data)
    # Filter out any punctuation characters (for some reason?).
    trimmed_counts = {item: count for item, count in character_counts.items() if item not in punctuation_dict.keys()}
    # Sort the remaining characters in descending order of frequency.
    sorted_characters = sorted(trimmed_counts, key=trimmed_counts.get, reverse=True)
    # Create dictionaries to convert between characters and their indices in the sorted list.
    index_to_character = {index: character for index, character in enumerate(sorted_characters + list(SPECIAL_WORDS.values()))}
    character_to_index = {character: index for index, character in index_to_character.items()}
    # Convert the long string of data into a list of indices.
    indexed_data = list()
    for character in long_data:
        try:
            indexed_data.append(character_to_index[character])
        except KeyError:
            # If a character is not in the vocabulary, skip it.
            pass
    # Save the preprocessed data and dictionaries using pickle.
    pickle.dump((indexed_data, character_to_index, index_to_character, punctuation_dict), open(f"./data/save/{filename}_preprocess.p", 'wb'))

    return

In [6]:
def batch_data(characters, sequence_length, batch_size):
    """
    Batches input/target pairs for a given set of characters.
    returns a DataLoader object that can be used to iterate over the batches during training or testing.
    
    Args:
        characters (list): list of characters to be used as input for the RNN.
        sequence_length (int): length of each input sequence.
        batch_size (int): number of input/target pairs per batch.
    
    Returns: DataLoader: PyTorch DataLoader object that contains batches of input/target pairs.
    """
    # Calculate the number of batches to make
    number_batches = len(characters)//batch_size
    # Only consider characters up to the last full batch
    characters = characters[:number_batches * batch_size]
    # Initialize empty lists to hold inputs and targets
    x, y = [], []

    # Iterate through the characters, creating input/target pairs
    for i in range(0, len(characters)-sequence_length):
        # Get the starting and ending indices for the input sequence
        i_end = i + sequence_length        
        # Extract the input sequence and append it to the list of inputs
        batch_x = characters[i: i + sequence_length]
        x.append(batch_x)
        # Extract the target character and append it to the list of targets
        batch_y = characters[i_end]
        y.append(batch_y)
    
    # Create a TensorDataset from the input/target pairs
    data = TensorDataset(torch.from_numpy(np.asarray(x)), torch.from_numpy(np.asarray(y)))
    # Use DataLoader to create batches from the TensorDataset
    data_loader = DataLoader(data, shuffle=True, batch_size=batch_size)
        
    return data_loader # Return the DataLoader object

In [7]:
def forward_backward_propagation(rnn, optimizer, criterion, input_batch, target, hidden):
    """Runs forward and backward propagation on a given batch of input data and target labels.

    Args:
        rnn (torch.nn.Module): RNN model to use for the forward propagation.
        optimizer (torch.optim.Optimizer): optimizer to use for the backward propagation.
        criterion (torch.nn.modules.loss._Loss): loss function to use for computing the error.
        input_batch (torch.Tensor): input batch of data with shape (batch_size, seq_len).
        target (torch.Tensor): target batch of labels with shape (batch_size, seq_len).
        hidden (torch.Tensor): hidden state of the RNN with shape (num_layers * num_directions, batch_size, hidden_size).

    Returns: tuple containing the loss over the batch and the new hidden state of the RNN with shape (num_layers * num_directions, batch_size, hidden_size).
    """
    # Move data to GPU, if available
    if(train_on_gpu) == True:
        rnn.cuda()
    # Creating variables for hidden state to prevent back-propagation
    hidden_states = tuple([state.data for state in hidden]) # Historical states 
    rnn.zero_grad()
    # Move inputs and targets to GPU
    inputs, targets = input_batch.cuda(), target.type(torch.LongTensor).cuda() 
    # Get the output and new hidden state from the RNN model
    output, hidden_states = rnn(inputs, hidden_states)
    # Compute the loss between output and target
    loss = criterion(output, targets)
    # Perform backpropagation and optimization
    loss.backward()
    nn.utils.clip_grad_norm_(rnn.parameters(), 5) # Gradient clipping to prevent exploding gradients
    optimizer.step()

    # Return the loss over a batch and the hidden state produced by our model
    return loss.item(), hidden_states


In [8]:
def train_rnn(rnn, batch_size, optimizer, criterion, number_epochs, show_every_n_batches=100) -> RNN:
    """
    trains a given RNN model for a specified number of epochs using a specified optimizer and criterion. 
    It also prints the training loss every show_every_n_batches batches.

    Parameters:
        rnn (nn.Module): RNN model to be trained
        batch_size (int): batch size for training
        optimizer (torch.optim.Optimizer): optimizer to use for training the model
        criterion (torch.nn.modules.loss._Loss): loss criterion to use for training the model
        number_epochs (int): number of epochs to train the model
        show_every_n_batches (int, optional): number of batches after which to print the training loss. Default is 100.
    
    Returns:
        rnn (nn.Module): trained RNN model
    """

    batch_losses = [] # initialize an empty list to store the losses for each batch
    rnn.train() # set the model to training mode

    print("Training for %d epoch(s)..." % number_epochs) # print the number of epochs for training
    for epoch_i in range(1, number_epochs + 1): # loop over the epochs
        hidden = rnn.init_hidden(batch_size) # initialize the hidden state
        for batch_j, (inputs, labels) in enumerate(train_loader, 1): # loop over the batches in the training data
            number_batches = len(train_loader.dataset)//batch_size # calculate the total number of batches
            if batch_j > number_batches: # if the current batch number is greater than the total number of batches, break out of the loop
                break
            # forward + backward propagation
            loss, hidden = forward_backward_propagation(rnn, optimizer, criterion, inputs, labels, hidden) # perform forward and backward propagation on the current batch          
            batch_losses.append(loss) # record the loss for the current batch
            if batch_j % show_every_n_batches == 0: # if the current batch number is a multiple of show_every_n_batches, print the average loss for the previous batches
                print('Epoch: {:>4}/{:<4}  Loss: {}\n'.format(
                    epoch_i, number_epochs, np.average(batch_losses)))
                batch_losses = [] # reset the batch loss list for the next batches
   
    return rnn

In [9]:
def generate(rnn, prime_id, index_to_character, punctuation_dict, pad_value, predict_len=100) -> str:
    """
    Generates a sequence of characters using a trained RNN model.

    Args:
        rnn (nn.Module): trained RNN model.
        prime_id (int): index of the character to start the sequence with.
        index_to_character (dict): dictionary that maps character indices to characters.
        punctuation_dict (dict): dictionary that maps punctuation characters to their corresponding token.
        pad_value (int): value to use for padding the sequence.
        predict_len (int): number of characters to generate (default is 100).

    Returns: string containing the generated characters.
    """
    rnn.eval() # set the model to evaluation mode

    # create a sequence (batch_size=1) with the prime_id
    current_sequence = np.full((1, sequence_length), pad_value) # initialize the current sequence with the pad_value
    current_sequence[-1][-1] = prime_id # set the last element of the sequence to the prime_id
    predicted = [index_to_character[prime_id]] # initialize the predicted list with the character corresponding to the prime_id
    
    for _ in range(predict_len):
        if train_on_gpu == True:
            current_sequence = torch.LongTensor(current_sequence).cuda() # move the current sequence to the GPU if available
        else:
            current_sequence = torch.LongTensor(current_sequence) # convert the current sequence to a tensor
        # initialize the hidden state
        hidden = rnn.init_hidden(current_sequence.size(0))
        # get the output of the rnn
        output, hidden_state = rnn(current_sequence, hidden)
        # get the next word probabilities
        probability = Function.softmax(output, dim=1).data
        if train_on_gpu == True:
            probability = probability.cpu() # move the probability tensor to the CPU if available
        # use top_k sampling to get the index of the next word
        top_k = 5
        probability, top_i = probability.topk(top_k)
        top_i = top_i.numpy().squeeze()
        # select the likely next word index with some element of randomness
        probability = probability.numpy().squeeze()
        # choose the next character index based on the probability distribution
        character_i = np.random.choice(top_i, p=probability/probability.sum())
        # retrieve that character from the dictionary
        character = index_to_character[character_i]
        predicted.append(character)     
        # the generated character becomes the next "current sequence" and the cycle can continue
        current_sequence = np.roll(current_sequence.cpu(), -1, 1)
        current_sequence[-1][-1] = character_i
    
    gen_sentences = ''.join(predicted) # concatenate the predicted characters into a string
    
    return gen_sentences


In [10]:
def load(filename):
    """
    Loads preprocessed data from a file, or preprocesses the data if the file doesn't exist.

    Args: filename (str): The name of the file to load or create.

    Returns: tuple containing the following:
        - indexed_data (list): list of integers representing the preprocessed text data.
        - character_to_index (dict): dictionary that maps characters to their corresponding indices.
        - index_to_character (dict): dictionary that maps indices to their corresponding characters.
        - punctuation_dict (dict): dictionary that maps punctuation characters to their corresponding token.
    """
    
    while True:
        try:
            # Try to load preprocessed data from file
            indexed_data, character_to_index, index_to_character, punctuation_dict = pickle.load(open(f"./data/save/{filename}_preprocess.p", mode='rb'))
            return indexed_data, character_to_index, index_to_character, punctuation_dict
        except FileNotFoundError:
            # If the file doesn't exist, preprocess the data and try again
            print("Couldn't find file, rebuilding data...")
            preprocess_data(filename)  # This is assumed to be a function that preprocesses the data


In [11]:
def train_and_save(filename, finetune=False):
    """
    Train a RNN model on the preprocessed data and save the trained model to a file.

    Args: filename (str): filename to use for the saved model file.

    Returns: print statement confirming success.
    """
    # initailize vocab and output sizes for training 
    vocabulary_size = len(character_to_index)
    # Output size
    output_size = vocabulary_size
    # Create an RNN model with the specified architecture
    if finetune == False:
        rnn = RNN(vocabulary_size, output_size, embedding_dimension, hidden_dimension, number_layers, dropout=0.5)
    elif finetune == True:
        rnn = torch.load('./data/save/full_hsk_sentences_trained_rnn.pt')
    
    if train_on_gpu:
        rnn.cuda() # Move the model to the GPU if available

    # Define the loss and optimization functions for training
    optimizer = torch.optim.Adam(rnn.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    # Train the RNN model
    trained_rnn = train_rnn(rnn, batch_size, optimizer, criterion, number_epochs, show_every_n_batches)
    # Save the trained model to a file
    torch.save(trained_rnn, f'./data/save/{filename}_trained_rnn.pt')
    return print('Model Trained and Saved')


In [12]:
# Define the filename of the preprocessed data to be loaded and trained on
filename = f"full_hsk_sentences"
 # Load the preprocessed data if available or preprocess if not
indexed_data, character_to_index, index_to_character, punctuation_dict = load(filename)

# Train baseline rnn model
while True:
        # Try to load preprocessed data from file
        try:
            trained_rnn = torch.load(f'./data/save/{filename}_trained_rnn.pt')
            print(f"{filename} loaded.")
            break
        # If the file doesn't exist, train the model and try again
        except FileNotFoundError:
            # Prepare data by batching it into sequence_length chunks of batch_size
            train_loader = batch_data(indexed_data, sequence_length, batch_size)
            # Train RNN on the prepared data and save the trained model to file
            train_and_save(filename)

full_hsk_sentences loaded.


In [13]:
text_length = 10  # number of characters to generate

# Initialize empty list to store generated texts
text_collection = []
# Loop through each of the 6 HSK levels
for i in tqdm(range(1, 7)):
    # Load preprocessed data and the trained RNN model for the current HSK level
    filename = f"hsk{i}_sentences_filtered"
    # Load preprocessed data from file
    unused_indexed_data, vocab_to_int, int_to_vocab, punctuation_dict = load(filename)
    while True:
        # Try to load preprocessed data from file
        try:
            trained_rnn = torch.load(f'./data/save/{filename}_trained_rnn.pt')
            break
        # If the file doesn't exist, train the model and try again
        except FileNotFoundError:
            print(f"Training new rnn for HSK {i}")
            # Create batches of training data
            train_loader = batch_data(indexed_data, sequence_length, batch_size)
            # Train model and save it to a file
            train_and_save(filename, finetune=True)
    # Set the prime words to use for each text
    prime_words = ['我', '你', '他', '她', '这', '人', '的', '是', '那', '一']  # character(s) to use to start generation
    pad_word = SPECIAL_WORDS['PADDING']

    texts = [] # Initialize empty list to store text during generation
    for prime_word in prime_words: # Generate text based on each prime word
        # Generate text using the trained RNN and the current prime word
        text = generate(trained_rnn, vocab_to_int[prime_word], int_to_vocab, punctuation_dict, vocab_to_int[pad_word], text_length)
        texts.append(text)
    # append generated texts to the list of generated texts
    text_collection.append('\n'.join(texts))

# Print number of unique characters in each generated text as well as generated text
for i, output in enumerate(text_collection):
    print(f"Unique characters generated for HSK {i} model: {len(set(output))}")
    print(f"Generated text based on HSK {i} model: ")
    print(output)


100%|██████████| 6/6 [00:04<00:00,  1.45it/s]

Unique characters generated for HSK 0 model: 61
Generated text based on HSK 0 model: 
我们可以去公园散步他很
你想吃什么他喜欢吃水果
他是一个医生我会说一点
她你有没有兄弟姐妹你喜
这是你的铅笔吗请问你有
人我们可以去公园玩儿你
的你喜欢吃蔬菜吗我想要
是你住在哪里我们的学生
那我们可以坐地铁去吗我
一你喜欢吃甜食你喜欢旅
Unique characters generated for HSK 1 model: 61
Generated text based on HSK 1 model: 
我今天天气很好我们去公
你觉都很便宜我叫李明他
他很漂亮你要不要来看看
她我天美你要不要试试你
这我很漂天这个菜很热闹
人天我们可以在图书馆学
的我们要坐地铁去你觉得
是我的狗很可爱我们可以
那我们可以在那家书店买
一了你喜欢吃中国菜吗这
Unique characters generated for HSK 2 model: 62
Generated text based on HSK 2 model: 
我的学校有很多俱乐部比
你身有名这个地方很热闹
他有一个很好的福利制度
她妈样看电影我喜欢听音
这个地方很舒适他的中文
人身体有这个周末天气预
的身高他们在商场购物你
是身试试样我们可以在这
那有一个很好的福利制度
一名吗这个地方很舒适他
Unique characters generated for HSK 3 model: 61
Generated text based on HSK 3 model: 
我们可以在这里等一会儿
你要不非常努力我的爷爷
他道的对音乐会们个房间
她对让了我的家离学校不
这道这个项目我们每个月
人们多个问题我们需要更
的对道菜的味道很好我很
是春十我们需要更加注重
那道菜对让我们一起工作
一么个问题我们可以在这
Unique characters generated for HSK 4 model: 78
Generated text based on HSK 4 model: 
我们要始终保持谦虚谨慎
你保取市的文化遗产非常
他是个非常勤奋的学生总
她的成就令人钦佩是许多
这个公司的业务范围非常
人年们要保持积极的心态
的支持这个项目需要一个
是支与与外国


