# Kwere Character-Level Language Iterative N-Gram Model

For my from scratch implementation I originally tried to do a vanilla n-gram model, but ran into speed issues with anything beyond a history of 5-characters, which had very poor performance.

Instead, I decided to try to implement a sort of "Iterative" N-Gram Model that works backwards to find the longest possible relatively frequent sequence. I explain more in the documentation below but the important thing to note is the model works *backwards* from the target character when computing probabilities.

So for example if an input sequence is [1,0,1,2,1], it'll first count the number of times [1] occurs. If it's greater than `threshold`, it'll "overflow" and begin counting sequences of [X, 1]. 

It will recursively so this until a less common sequence is reached. So if [1, 2, 1] is a very common sequence, it'll make the next prediction based on [0, 1, 2, 1].

### Results
As you'll see below, I reached a cross entropy of 2.228 While this seems high, it's not too far off from my anything goes implementation, so either it's not bad or I messed that up as well, which is fairly likely. The accuracy was (what seems) a reasonable 60%, considering the lack of Kwere training data and the messiness of the Swahili data.

### Parameters
Dictionary containing all parameters for ease of tuning. These will be logged to the neptune logger below.

**To add test data, enter the test file name in the `test_data` parameter.**

In [85]:
PARAMS = {
    'experiment_name': "Kwere",
    'tags': ["kwere", "from scratch"],
    'n': 1000,
    'threshold': 750,
    'train_iterations': 10,
    'carry_hidden_state': False,
    'val_split': 0.3,
    'kwere_train': "./cwe-train.txt",
    'pretrain_iterations': 5,
    'pretrain_percentage': 0.01, 
    'swahili': "./sw-train.txt",
    'test_data': "./cwe-test.txt"
}

Only import. Used for the log function to compute cross entropy.

In [86]:
import math

### Dataset Class
The `Dataset` class generates a list of all unique characters found in the supplied data, number of total characters, number of unique characters, mappings from characters to their respective ID, mappings from chracter IDs to characters for making outputs readable, and a data tensor of every character converted to its ID.

The `Dataset` will also generate a `~` character to be used in place of any characters unknown to the model (i.e. anything not in the training set). See the `clean_data` function below.

Inputs:
 - `raw_data`: `string` of all characters from the provided data in order

In [87]:
class Dataset():
    def __init__(self, raw_data: str):
        self.chars = set(list(set(raw_data)))
        self.chars.add('~')
        self.data_size, self.vocab_size = len(raw_data), len(self.chars)
        print("{} characters, {} unique".format(self.data_size, self.vocab_size))
        
        self.char_to_idx = { char: idx for idx, char in enumerate(self.chars) }
        self.idx_to_char = { idx: char for idx, char in enumerate(self.chars) }
        
        self.data = [self.char_to_idx[char] for char in list(raw_data)]
    
    def __len__(self):
        return self.data_size
    
    def __getitem__(self, index):
        return self.data[index]

### Data Cleaning
The `clean_data` function removes any unknown chracters in the provided data and replaces them with the deisgnated unknown chracter of `~`. I'm essentially forfeiting these characters if they ever appear in the testing data, since I likely couldn't get them correct anyway considering the model did not see them during training (unless they appear in the Kwere data, but see my explanation below for that decision).

Inputs:
 - `raw_data`: `string` of raw data read directly from file
 - `known_chars`: `list` of `string` to be included in the data. Everything not in this list will be replaced.

In [88]:
def clean_data(raw_data: str, known_chars: str) -> str:
    cleaned = ""
    for char in raw_data:
        if char not in known_chars:
            cleaned += "~"
        else:
            cleaned += char
    return cleaned

### Data Loading
Load the Swahili training data and split based on the provided ratio. Then load the percentage of the Kwere data requested (see `PARAMS`). Finally, if a test file is provided in `PARAMS`, load the test data.

The validation, Kwere, and test data are all cleaned of unknown chracters. I chose to exclude any chracters found in the Swahili data but not found in the Swahili training data for the sake of staying as true to the Swahili language as possible (in the event Kwere uses a character that Kwere does not).

In [89]:
print("Loading Kwere training data:", end="\n\t")
raw_kwere = open(PARAMS['kwere_train'], 'r').read()
kwere_train_size, kwere_val_size = int(len(raw_kwere)*(1-PARAMS['val_split'])), int(len(raw_kwere)*PARAMS['val_split'])

train_data = Dataset(raw_kwere[:kwere_train_size])

print("Loading Kwere validation data:", end="\n\t")
cleaned_kwere_val_data = clean_data(raw_kwere[kwere_train_size:], train_data.chars)
val_data = Dataset(cleaned_kwere_val_data)


if PARAMS['pretrain_percentage'] > 0:
    print("Loading Swahili data:", end="\n\t")
    raw_swahili = open(PARAMS['swahili'], 'r').read()
    swahili_size = int(len(raw_swahili) * PARAMS['pretrain_percentage'])

    cleaned_swahili_data = clean_data(raw_swahili[:swahili_size], train_data.chars)
    pretrain_data = Dataset(cleaned_swahili_data)


if len(PARAMS['test_data']) > 0:
    print("Loading Testing data:", end="\n\t")
    raw_test = open(PARAMS['test_data'], 'r').read()

    cleaned_test_data = clean_data(raw_test, train_data.chars)
    test_data = Dataset(cleaned_test_data)

Loading Kwere training data:
	422402 characters, 32 unique
Loading Kwere validation data:
	181030 characters, 32 unique
Loading Swahili data:
	392610 characters, 32 unique
Loading Testing data:
	61717 characters, 32 unique


### Count Matrix
The count matrix acts as a "node" in a web of matrices. The `counts` variable represents the counts of each character following the path to the current count matrix. The `next` variable contains the count matrices further down the path if an overflow has occurred. Otherwise, it's merely a dictionary of `None`.

In [90]:
class CountMatrix:
    def __init__(self, vocab: list, init_matrix=None):
        self.counts = init_matrix if init_matrix is not None else {i:0 for i in vocab}
        self.next = {i:None for i in vocab}

### Increment Count
The `increment_count` function increases the counts of characters following sequences while also controlling for overflow.

Overflow:
If a sequence has occured `threshold` number of times, the algorithm begins tracking the sequence's children, i.e. the combination of all possible preceding characters followed by the current sequence.

In [91]:
def increment_count(char: str, sequence: list, count_matrix: CountMatrix) -> list:
    next_char = sequence[-1]
    
    count_matrix.counts[char] += 1
    if count_matrix.next[next_char] is not None:
        count_matrix.next[next_char] = increment_count(char, sequence[:-1], count_matrix.next[next_char])
    elif sum(count_matrix.counts.values()) > PARAMS['threshold']:
        vocab = count_matrix.next.keys()
        initial_matrix = {i:0 for i in vocab}
        initial_matrix[char] += 1
        count_matrix.next = {i:CountMatrix(vocab, initial_matrix) for i in vocab}
    
    return count_matrix

`iterate_counts` simply calls `increment_count` for each character in the provided dataset.

In [92]:
def iterate_counts(data: Dataset, n: int, count_matrix: CountMatrix):
    for idx, char in enumerate(data[n:]):
        idx = n + idx
        sequence = data[idx-n:idx]
        
        count_matrix = increment_count(data[idx], sequence, count_matrix)
    return count_matrix

Build matrix using the above functions. I iterate over each dataset multiple times to account for the ratio in how much each dataset is taken into account. Since the pretrain set is so large, I iterate over the train set more so the pretrain set's probabilities don't overshadow the training set.

In [94]:
print("Building Matrix...")
count_matrix = CountMatrix(vocab=train_data.idx_to_char.keys())

print("Fitting...")
if PARAMS['pretrain_percentage'] > 0:
    for i in range(PARAMS['pretrain_iterations']):
        count_matrix = iterate_counts(pretrain_data, PARAMS['n'], count_matrix)

for i in range(PARAMS['train_iterations']):
    count_matrix = iterate_counts(train_data, PARAMS['n'], count_matrix)

Building Matrix...
Fitting...


Helper function for computing probabilities based on a dictionary of counts. Uses +1 smoothing.

Also contains an assertion to ensure all probabilities sum to 1.

In [95]:
def probabilities_from_counts(counts: dict):
    # add one smoothing
    counts = {key:counts[key]+1 for key in counts.keys()}
    
    probabilities = {key: counts[key] / sum(counts.values()) for key in counts.keys()}
    prob_sum = sum(probabilities.values())
    assert(abs(prob_sum - 1) < 0.0001), "Probabilities should sum to 1.0 but got {}".format(prob_sum)
    
    return probabilities

### Retrieve Probabilities
1. Start with a given sequence of N characters
2. Go to the `CountMatrix` of the last letter in the sequence (immediately preceding the target character)
3. If this CountMatrix has no further path (`count_matrix.next` is all `None`), then the longest tracked path has been reached so determine the most likely character from the current matrix
4. If more paths are possible, make a recursive call to get more history

In [96]:
def get_probabilities_for_sequence(sequence: list, count_matrix: CountMatrix):
    # return counts if sequence has been exhausted
    if len(sequence) == 0:
        return count_matrix.counts
    
    next_char = sequence[-1]
    
    if count_matrix.next[next_char] is not None:
        return get_probabilities_for_sequence(sequence[:-1], count_matrix.next[next_char])
    else:
        return probabilities_from_counts(count_matrix.counts)

### Cross Entropy Loss

In [97]:
def calc_loss(target_prob):
    return -math.log(target_prob, 2)

### Evaluate

In [102]:
def eval(data: Dataset, n: int, count_matrix: CountMatrix):
    print("Evaluating...")
    
    counter = 0
    running_loss = 0
    running_acc = 0
    
    for idx, char in enumerate(data[n:]):
        idx = n + idx
        sequence = data[idx-n:idx]

        probabilities: dict = get_probabilities_for_sequence(sequence, count_matrix)
        pred: str = max(probabilities, key=probabilities.get)
        target: str = data[idx]
        target_prob: float = probabilities[target]
        
        running_loss += calc_loss(target_prob)
        running_acc += 1 if target == pred else 0
        counter += 1
        
    return running_loss / counter, running_acc / counter

In [105]:
train_loss, train_acc = eval(train_data, PARAMS['n'], count_matrix)
print("Train Loss: {:.3f}\t\t|\tTrain Accuracy: {:.2f}%".format(train_loss, train_acc*100))

val_loss, val_acc = eval(val_data, PARAMS['n'], count_matrix)
print("Validation Loss: {:.3f}\t\t|\tValidation Accuracy: {:.2f}%".format(val_loss, val_acc*100))

Evaluating...
Train Loss: 1.631		|	Train Accuracy: 67.65%
Evaluating...
Validation Loss: 2.228		|	Validation Accuracy: 59.98%


In [109]:
if len(PARAMS['test_data']) > 0:
    test_loss, test_acc = eval(test_data, PARAMS['n'], count_matrix)
    print("Test Loss: {:.3f}\t\t|\tTest Accuracy: {:.2f}%".format(test_loss, test_acc*100))

Evaluating...
Test Loss: 2.228		|	Test Accuracy: 59.55%
