In [1]:
from collections import defaultdict
import pandas as pd
from transformers import BartTokenizer
import pickle

class BPEModel:
    
    def __init__(self, vocab_size=20000):
        """
        Description: Initializes the BPEModel with an initial vocabulary size.
        Inputs: 
            - vocab_size: (Optional) The desired vocabulary size. Default is 20000.
        Outputs: None
        """
        self.vocab_size = vocab_size
        self.tokenizer = BartTokenizer.from_pretrained("facebook/bart-large")
        self.word_freqs = defaultdict(int)
        self.bpe_merges = {}
        self.vocab = []

    def load_data_from_csv(self, filename, column_name):
        """
        Description: Loads data from a CSV file and returns a list of sentences.
        Inputs: 
            - filename: The path to the CSV file.
            - column_name: The column name which contains the sentences.
        Outputs: 
            - List of sentences from the specified column.
        """
        df = pd.read_csv(filename)
        return df[column_name].tolist()

    def compute_pair_freqs(self, splits):
        """
        Description: Computes the frequency of character pairs in the current vocabulary.
        Inputs: 
            - splits: Tokenized words split into characters.
        Outputs: 
            - Dictionary of character pairs and their frequencies.
        """
        pair_freqs = defaultdict(int)
        for word, freq in self.word_freqs.items():
            split = splits[word]
            for i in range(len(split) - 1):
                pair = (split[i], split[i + 1])
                pair_freqs[pair] += freq
        return pair_freqs

    def merge_pair(self, a, b, splits):
        """
        Description: Merges a character pair in the given word splits.
        Inputs: 
            - a: First character.
            - b: Second character.
            - splits: Tokenized words split into characters.
        Outputs: 
            - Updated splits with merged character pairs.
        """
        for word in self.word_freqs:
            split = splits[word]
            i = 0
            while i < len(split) - 1:
                if split[i] == a and split[i + 1] == b:
                    split = split[:i] + [a + b] + split[i + 2 :]
                else:
                    i += 1
            splits[word] = split
        return splits

    def train_bpe(self, corpus):
        """
        Description: Trains the BPE model on a given corpus.
        Inputs: 
            - corpus: List of sentences.
        Outputs: 
            - Dictionary of merged character pairs.
            - Updated vocabulary.
        """
        for text in corpus:
            new_words = self.tokenizer.tokenize(text)
            for word in new_words:
                self.word_freqs[word] += 1

        splits = {word: [c for c in word] for word in self.word_freqs.keys()}
        self.vocab = list(set(''.join(corpus)))

        while len(self.vocab) < self.vocab_size:
            pair_freqs = self.compute_pair_freqs(splits)
            if not pair_freqs:
                break
            best_pair = max(pair_freqs, key=pair_freqs.get)
            splits = self.merge_pair(*best_pair, splits)
            self.bpe_merges[best_pair] = best_pair[0] + best_pair[1]
            self.vocab.append(best_pair[0] + best_pair[1])
        return self.bpe_merges, self.vocab

    def incremental_train(self, new_corpus):
        """
        Description: Incrementally trains the BPE model on a new corpus.
        Inputs: 
            - new_corpus: List of new sentences.
        Outputs: None (Updates the BPE merges and vocabulary in-place).
        """
        for text in new_corpus:
            new_words = self.tokenizer.tokenize(text)
            for word in new_words:
                self.word_freqs[word] += 1

        splits = {word: [c for c in word] for word in self.word_freqs.keys()}

        while len(self.vocab) < self.vocab_size:
            pair_freqs = self.compute_pair_freqs(splits)
            if not pair_freqs:
                break
            best_pair = max(pair_freqs, key=pair_freqs.get)
            splits = self.merge_pair(*best_pair, splits)
            self.bpe_merges[best_pair] = best_pair[0] + best_pair[1]
            self.vocab.append(best_pair[0] + best_pair[1])
        return

    def add_special_tokens(self, tokens):
        """
        Description: Adds special tokens to the vocabulary.
        Inputs: 
            - tokens: List of special tokens to be added.
        Outputs: 
            - Updated vocabulary.
        """
        self.vocab.extend(tokens)
        return self.vocab

    def save_bpe(self, model_path):
        """
        Description: Saves the current BPE merges and vocabulary to a file.
        Inputs: 
            - model_path: The path where the model should be saved.
        Outputs: None
        """
        with open(model_path, 'wb') as f:
            pickle.dump((self.bpe_merges, self.vocab), f)

    def load_bpe(self, model_path):
        """
        Description: Loads the BPE merges and vocabulary from a file.
        Inputs: 
            - model_path: The path from where the model should be loaded.
        Outputs: None
        """
        with open(model_path, 'rb') as f:
            self.bpe_merges, self.vocab = pickle.load(f)

    def bpe_tokenize(self, text):
        """
        Description: Tokenizes a given text using the current BPE merges and vocabulary.
        Inputs: 
            - text: The text to be tokenized.
        Outputs: 
            - List of tokens.
        """
        words = self.tokenizer.tokenize(text)
        splits = {word: [c for c in word] for word in words}
        tokens = []
        for word in words:
            split = splits[word]
            tokenized_word = []
            i = 0
            while i < len(split):
                merge_found = False
                for j in range(len(split) - 1, i, -1):
                    if tuple(split[i:j]) in self.bpe_merges:
                        tokenized_word.append(''.join(split[i:j]))
                        i = j
                        merge_found = True
                        break
                if not merge_found:
                    tokenized_word.append(split[i])
                    i += 1
            tokens.extend(tokenized_word)
        return tokens

## Example 1

In [2]:
# Define your Spanglish corpus
spanglish_corpus = [
    "This is a Spanglish sentence.",
    "Hola, cómo estás?",
    "I don't hablar español very well.",
    "Spanglish is a combination of English y español.",
]

# Initialize the BPEModel
bpeModel = BPEModel()

# Train the BPE model with the given spanglish_corpus
bpeModel.train_bpe(spanglish_corpus)

# Tokenize a new Spanglish sentence
sentence = "Hello, cómo estás today?"
tokenized_text = bpeModel.bpe_tokenize(sentence)
print(tokenized_text)

['H', 'e', 'l', 'l', 'o', ',', 'Ġ', 'c', 'Ã', '³', 'm', 'o', 'Ġe', 's', 't', 'Ã¡', 's', 'Ġ', 't', 'o', 'd', 'a', 'y', '?']


# Train Model

In [3]:
# Initialize the BPE model
bpeModel = BPEModel()

# Load data from the CSV
filename = "spanglish.csv"  # Path to your CSV file
column_name = "sentences"  # Name of the column containing the data
spanglish_corpus = bpeModel.load_data_from_csv(filename, column_name)

# Train the BPE model with the loaded spanglish_corpus
bpe_merges, vocab = bpeModel.train_bpe(spanglish_corpus)

# Save Model

In [None]:
# Save the trained BPE model to a file
model_path = "saved_bpe_model.pkl"  # Path where you want to save the model
bpeModel.save_bpe(model_path)

# Load Saved Model

In [4]:
# Load the saved BPE model from the file
bpeModel = BPEModel()
bpeModel.load_bpe(model_path)

# Now, you can use the loaded BPE model as before

### Test loaded model

In [5]:
# Tokenize a new Spanglish sentence
sentence = "Hello, cómo estás today?"
tokenized_text = bpeModel.bpe_tokenize(sentence)
print(tokenized_text)

['H', 'el', 'l', 'o', ',', 'Ġ', 'c', 'Ã', '³', 'm', 'o', 'Ġe', 's', 't', 'Ã¡', 's', 'Ġt', 'od', 'a', 'y', '?']
