In [1]:
import logging 
import itertools
import random
import torch 
import torch.random

import numpy as np
import pandas as pd 

from pathlib import Path 
from typing import Union, Generator
from collections.abc import Iterable

# Notebook-wide logging: every record is prefixed with its level and a timestamp
logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s - %(asctime)s] %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p')

# Location of the input data, relative to the notebook's working directory
DATASET_PATH = "../data/text_emotion.csv"
# Seed shared by every random number generator used in this notebook
RANDOM_STATE = 42

# Number of tokens per n-gram (the model conditions on NGRAM_SIZE - 1 tokens)
NGRAM_SIZE = 4
# Default upper bound on the number of tokens generated per response
MAX_RESPONSE_LENGTH = 100
# String inserted between consecutive dataset rows when building the corpus
SENTENCE_SEPARATOR = ". "

# Seed all RNGs up front so that a fresh "Restart & Run All" reproduces the
# sampled responses. torch.manual_seed() returns a Generator object, so it is
# deliberately not the last statement of the cell (it would be echoed as output).
torch.manual_seed(RANDOM_STATE)
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

# Load the dataset 

In [2]:
# Resolve the dataset location, fail fast when it is not an existing regular file,
# then load the CSV and preview its first rows as the cell output
dataset_path = Path(DATASET_PATH).resolve()
dataset_is_available = dataset_path.exists() and dataset_path.is_file()
if not dataset_is_available:
    logging.error(f'The dataset file could not be found at the specified path: \'{dataset_path}\'')
    raise FileNotFoundError('The dataset file could not be found at the specified path')

logging.info(f'Loading dataset from \'{dataset_path}\'')
dataset = pd.read_csv(dataset_path)
dataset.head()

[INFO - 04/10/2023 09:02:15 AM] Loading dataset from '/Users/kadeem/Spaces/Projects/ANa/ana-core/data/text_emotion.csv'


Unnamed: 0,tweet_id,sentiment,author,content
0,1956967341,empty,xoshayzers,@tiffanylue i know i was listenin to bad habi...
1,1956967666,sadness,wannamama,Layin n bed with a headache ughhhh...waitin o...
2,1956967696,sadness,coolfunky,Funeral ceremony...gloomy friday...
3,1956967789,enthusiasm,czareaquino,wants to hang out with friends SOON!
4,1956968416,neutral,xkilljoyx,@dannycastillo We want to trade with someone w...


# Create a text corpus from the dataset 

In [3]:
# Concatenate the 'content' column into one string, placing the sentence
# separator between consecutive rows, and preview its first 100 characters
tweets = dataset['content']
corpus = tweets.str.cat(sep=SENTENCE_SEPARATOR)
corpus_preview = corpus[:100]
print(f"Sample of corpus: '{corpus_preview}'")

Sample of corpus: '@tiffanylue i know  i was listenin to bad habit earlier and i started freakin at his part =[. Layin '


# Create the tokenizer

In [4]:
class Tokenizer:
    """Character-level tokenizer: every character of the text is one token."""

    @staticmethod
    def tokenize(text: str) -> Union[Generator[str, None, None], None]:
        """Lazily yield the characters of the input text, in order.

        The result is a single-pass generator, not a list: it can be iterated
        only once and does not support ``len()`` or indexing. Wrap it in
        ``list(...)`` when the tokens must be materialized.

        Args:
            text: the text to tokenize.

        Returns:
            A generator over the characters of ``text``, or ``None`` when
            ``text`` is ``None``.
        """
        if text is None:
            return None
        return (char for char in text)

In [5]:
# Peek at the first ten tokens without consuming the whole token stream
token_stream = Tokenizer.tokenize(corpus)
first_tokens = list(itertools.islice(token_stream, 10))
print(f"Sample of training tokens: {first_tokens}")

Sample of training tokens: ['@', 't', 'i', 'f', 'f', 'a', 'n', 'y', 'l', 'u']


# Create the token codec

In [6]:
class TokenCodec:
    """Bidirectional mapping between tokens and integer ids.

    Attributes:
        alphabet_size: number of distinct tokens seen in the token stream.
    """

    def __init__(self, token_stream: Generator[str, None, None]):
        """Build the vocabulary from every token produced by ``token_stream``.

        Ids are assigned in sorted token order. Enumerating the raw set would
        give string tokens an order that changes between interpreter runs
        (string hashes are randomized per process), so the same corpus could
        map to different ids on every kernel restart.
        """
        vocabulary = sorted(set(token_stream))
        self.alphabet_size = len(vocabulary)
        # map each unique token to an id
        self._id_by_token = {token: _id for _id, token in enumerate(vocabulary)}
        # create the reverse mapping from ids to tokens
        self._token_by_id = dict(enumerate(vocabulary))

    def encode(self, text: Union[str, Iterable[str]]) -> list[int]:
        """Return the id of every token in ``text``, in order.

        Raises:
            KeyError: if a token was not part of the stream given to ``__init__``.
        """
        return [self._id_by_token[token] for token in text]

    def decode(self, encoded_text: Iterable[int]) -> str:
        """Return the string obtained by concatenating the tokens of the given ids.

        Raises:
            KeyError: if an id is not part of the vocabulary.
        """
        return "".join(self._token_by_id[_id] for _id in encoded_text)

# Create the ngram model

In [7]:
def to_ngrams(corpus, tokenizer, ngram_size=NGRAM_SIZE):
    """Yield every run of ``ngram_size`` consecutive tokens of the corpus.

    The corpus is tokenized ``ngram_size`` times; the stream at position ``pos``
    skips its first ``pos`` tokens, so reading one token from each stream in
    lockstep produces a sliding window over the token sequence.

    Args:
        corpus: the text to split into n-grams.
        tokenizer: object exposing ``tokenize(corpus)`` that returns an iterable
            of tokens (a fresh iterable on every call).
        ngram_size: number of tokens per n-gram; must be at least 1.

    Yields:
        Lists of ``ngram_size`` tokens, in corpus order. Nothing is yielded when
        the corpus has fewer than ``ngram_size`` tokens.

    Raises:
        ValueError: if ``ngram_size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if ngram_size < 1:
        # without this guard there are no streams to exhaust and the generator
        # would produce empty n-grams forever
        raise ValueError(f"ngram_size must be at least 1, got {ngram_size}")
    # use the tokenizer that was passed in rather than a hard-coded global one
    token_sequences = [itertools.islice(tokenizer.tokenize(corpus), pos, None) for pos in range(ngram_size)]
    # zip stops as soon as the most advanced stream runs out of tokens
    for ngram in zip(*token_sequences):
        yield list(ngram)

In [8]:
# Demonstrate the n-gram extraction on the first ten characters of the corpus;
# the resulting list is the cell's displayed value
corpus_sample = corpus[:10]
print(f"Text sample: {corpus_sample}", "n-grams:", sep="\n")
[*to_ngrams(corpus_sample, Tokenizer)]

Text sample: @tiffanylu
n-grams:


[['@', 't', 'i', 'f'],
 ['t', 'i', 'f', 'f'],
 ['i', 'f', 'f', 'a'],
 ['f', 'f', 'a', 'n'],
 ['f', 'a', 'n', 'y'],
 ['a', 'n', 'y', 'l'],
 ['n', 'y', 'l', 'u']]

In [9]:
class NGramModel:
    """N-gram language model that generates text one token at a time.

    Occurrence counts are kept in a dense tensor with ``ngram_size`` axes of
    ``codec.alphabet_size`` entries each, so memory use grows as
    ``alphabet_size ** ngram_size``.
    """

    def __init__(self, codec: TokenCodec, ngram_size:int=NGRAM_SIZE, max_response_length=MAX_RESPONSE_LENGTH):
        """Create an untrained model.

        Args:
            codec: codec used to convert between tokens and ids.
            ngram_size: number of tokens per n-gram; the model conditions on the
                previous ``ngram_size - 1`` tokens. Must be at least 1.
            max_response_length: maximum number of tokens generated per call
                (the prompt is not counted).

        Raises:
            ValueError: if ``ngram_size`` is smaller than 1.
        """
        if ngram_size < 1:
            raise ValueError(f"ngram_size must be at least 1, got {ngram_size}")
        logging.info(f"Initializing a {ngram_size}-gram model")
        self.codec = codec
        self.ngram_size = ngram_size
        self.max_response_length = max_response_length
        # populated by train(); None marks a model that has not been trained yet
        self._ngram_frequencies = None
        self._ngram_proba = None

    def train(self, corpus):
        """Train the model on the tokens from the training corpus.

        Returns:
            The model itself, so construction and training can be chained.
        """
        # count the occurrences of each ngram
        shape = self.ngram_size * [self.codec.alphabet_size]
        self._ngram_frequencies = torch.zeros(shape, dtype=torch.int32)
        for ngram in to_ngrams(corpus, Tokenizer, ngram_size=self.ngram_size):
            encoded_ngram = tuple(self.codec.encode(ngram))
            self._ngram_frequencies[encoded_ngram] += 1
        # build a probability distribution of the last token given the leading
        # ones; a context that never occurred has a zero total and therefore
        # ends up as a NaN row (0 / 0)
        context_totals = self._ngram_frequencies.sum(dim=-1, keepdim=True)
        self._ngram_proba = self._ngram_frequencies / context_totals
        return self

    def _next_token_counts(self, context):
        """Return how often each token followed ``context`` in the training corpus.

        When ``context`` holds fewer than ``ngram_size - 1`` tokens, the missing
        leading positions are summed out, i.e. any token is accepted there.
        """
        counts = self._ngram_frequencies
        missing = self.ngram_size - 1 - len(context)
        if missing > 0:
            counts = counts.sum(dim=tuple(range(missing)))
        return counts[tuple(context)]

    def __call__(self, prompt=None):
        """Generate a response that starts with ``prompt``.

        Tokens are sampled one at a time, each conditioned on the last
        ``ngram_size - 1`` tokens of the response so far. Generation stops after
        a '.' token is produced, after ``max_response_length`` tokens, or when
        the current context was never observed during training.

        Args:
            prompt: leading text of the response. When ``None`` or empty, the
                response starts from an empty context, so its first token is
                drawn according to the token frequencies of the training corpus.

        Returns:
            The prompt followed by the generated tokens, decoded to a string.

        Raises:
            RuntimeError: if the model has not been trained.
            KeyError: if the prompt contains a token unknown to the codec.
        """
        if self._ngram_frequencies is None:
            raise RuntimeError("The model must be trained before it can generate a response")
        # initialize the response with the prompt as the lead
        if prompt is None or prompt == "":
            response = []
        else:
            response = list(self.codec.encode(prompt))
        # resolve the end-of-sentence token once; without a '.' in the alphabet
        # generation is bounded by max_response_length only
        try:
            stop_token = self.codec.encode('.')[0]
        except KeyError:
            stop_token = None
        context_size = self.ngram_size - 1
        # build the remainder of the response one token at a time using the ngram model
        for _step in range(self.max_response_length):
            # response[-0:] would be the whole response, so a unigram model
            # needs an explicitly empty context
            leading_tokens = response[-context_size:] if context_size > 0 else []
            counts = self._next_token_counts(leading_tokens)
            if counts.sum().item() == 0:
                # unseen context: there is no distribution to sample from
                break
            # multinomial accepts unnormalized non-negative weights
            next_token = torch.multinomial(counts.to(torch.float32), num_samples=1).item()
            response.append(next_token)
            if next_token == stop_token:
                break
        return self.codec.decode(response)
        

In [10]:
# Build the vocabulary from the full corpus, then fit an n-gram model
# (responses capped at 50 generated tokens) on that same corpus
codec = TokenCodec(Tokenizer.tokenize(corpus))
untrained_model = NGramModel(codec, max_response_length=50)
model = untrained_model.train(corpus)

[INFO - 04/10/2023 09:02:15 AM] Initializing a 4-gram model


# Generate a few responses using the model 

In [17]:
# Sample several completions of one prompt to show how much the output varies
prompt = 'Today'
num_responses = 10
print(f"Responses from the prompt '{prompt}':")
for attempt in range(num_responses):
    response = model(prompt)
    print(f'  {response} [{len(response)} chars]')

Responses from the prompt 'Today':
  Today new u remer cool press backs to sepin whold!. [51 chars]
  Today oftw. [11 chars]
  Today. [6 chars]
  Today like the flately traces in 2 getty Oh, I littere  [55 chars]
  Today ule anday. [16 chars]
  Todays there take stuffice is hat one he globby_rence n [55 chars]
  Today. [6 chars]
  Today havey and reat!?!? Reebodi Hi btw. [40 chars]
  Today get a why. [16 chars]
  Today moviewedding like conder 18 One o2 - is time!. [52 chars]
