In [None]:
import csv
import os

from datasets import Dataset
from dotenv import load_dotenv
from huggingface_hub import login
from tokenizers import Tokenizer
from tokenizers import pre_tokenizers
from tokenizers.models import BPE
from tokenizers.normalizers import Sequence
from tokenizers.pre_tokenizers import Split, Metaspace
from tqdm import tqdm
from transformers import AutoTokenizer, PreTrainedTokenizerFast

#RETRAIN TOKENIZERS FOR CONVERSATIONAL PURPOSES

#Hugging Face login: key.env is loaded first, then the token is read from the HF_TOKEN environment variable
load_dotenv("key.env")
hf_token = os.getenv("HF_TOKEN")
login(hf_token)

#parameters for the experiment:

#model whose tokenizer is retrained; one model is processed per execution, for example:
#"mistralai/Mistral-7B-v0.1"
#"meta-llama/Llama-3.1-8B"
#"google/gemma-2-9b"
#"deepseek-ai/DeepSeek-R1"
#"bigscience/bloom"
#"microsoft/phi"
model = "mistralai/Mistral-7B-v0.1"

#which text is used for training: "input", "output" or "conversation"
mode_training = "output"
#number of texts per batch when processing the corpus
batch_size = 100

#model name without the organisation prefix (everything after the last slash):
model_name = model.rpartition("/")[2]

In [None]:
#train and test splits (both parquet files are generated with create_corpus.ipynb):
train_parquet_path = "train_randomsplit.parquet"
test_parquet_path = "test_randomsplit.parquet"

train_corpus = Dataset.from_parquet(train_parquet_path)
test_corpus = Dataset.from_parquet(test_parquet_path)

In [None]:
#to avoid loading all the texts into memory:
def batch_iterator(dataset, batch_size=1000, mode = "conversation"): #used for training
    """Yield the training texts of `dataset` in consecutive batches.

    Args:
        dataset: object supporting len() and slicing, where a slice can be
            indexed by column name (presumably a datasets.Dataset; confirm
            against the caller).
        batch_size: number of rows taken from the dataset per batch.
        mode: "input", "output" or "conversation"; selects the column
            "clean_input", "clean_output" or "clean_conversation".

    Yields:
        The selected column of each slice dataset[i : i + batch_size].

    Raises:
        KeyError: if `mode` is not one of the three supported values. As this
            is a generator, the error surfaces when iteration starts.
    """
    column_map = {
        "input": "clean_input",
        "output": "clean_output",
        "conversation": "clean_conversation",
    }
    column = column_map[mode] #depending on the training mode, one of these three columns is used

    #tqdm only wraps the batch start offsets, so the bar advances once per batch
    for start in tqdm(range(0, len(dataset), batch_size), desc="Training Progress"):
        yield dataset[start : start + batch_size][column] #batch of texts to train the tokenizer


#to test the number of tokens generated for input and output separately:
def _iter_column_batches(dataset, column, batch_size):
    """Return a generator over `column` of `dataset`, one slice of `batch_size` rows at a time."""
    return (
        dataset[start : start + batch_size][column]
        for start in range(0, len(dataset), batch_size)
    )


def get_test_corpus_input(test_dataset, batch_size=100):
    """Return a generator of batches of the "clean_input" column (input of the conversations).

    Args:
        test_dataset: object supporting len() and slicing, where a slice can be
            indexed by column name (presumably a datasets.Dataset; confirm
            against the caller).
        batch_size: number of rows per batch; defaults to 100.
    """
    return _iter_column_batches(test_dataset, "clean_input", batch_size)


def get_test_corpus_output(test_dataset, batch_size=100):
    """Return a generator of batches of the "clean_output" column (output of the conversations).

    Args:
        test_dataset: object supporting len() and slicing, where a slice can be
            indexed by column name (presumably a datasets.Dataset; confirm
            against the caller).
        batch_size: number of rows per batch; defaults to 100.
    """
    return _iter_column_batches(test_dataset, "clean_output", batch_size)
    
#to count the number of tokens in a corpus from a generator (like get_test_corpus_input(test_dataset) or get_test_corpus_output(test_dataset)):
def tokenize_and_count_from_generator(generator, tokenizer):
    """Count the tokens `tokenizer` produces for every text yielded by `generator`.

    Args:
        generator: iterable of batches, each batch being a list of texts.
        tokenizer: callable that accepts a list of texts plus the keyword
            arguments `truncation` and `add_special_tokens`, and returns a
            mapping whose "input_ids" entry holds one sequence of ids per text.

    Returns:
        Total number of token ids over all batches (0 for an empty generator).
    """
    total_tokens = 0
    for fragment_list in generator:
        batch_tokenized = tokenizer(
            fragment_list,
            #truncation must stay off: cutting long texts at the tokenizer's
            #maximum length would undercount their tokens
            truncation=False,
            #special tokens are excluded so only tokens of the text are counted
            add_special_tokens=False
        )
        total_tokens += sum(len(ids) for ids in batch_tokenized["input_ids"]) #numerical identifiers of the tokens
    return total_tokens

#to organise the different tokenizers that are going to be generated:
def create_filename(base_name, mode):
    """Build the CSV file name for a training mode.

    The result is `base_name`, an underscore, the suffix that belongs to
    `mode` ("only_input", "only_output" or "conversation") and ".csv".
    An unknown `mode` raises KeyError.
    """
    suffix_by_mode = {
        "input": "only_input",
        "output": "only_output",
        "conversation": "conversation",
    }
    suffix = suffix_by_mode[mode]
    return base_name + "_" + suffix + ".csv"

In [None]:
#report which model is handled in this run
print(f"Processing: {model}")

#load the fast tokenizer of the selected model:
original_tokenizer = AutoTokenizer.from_pretrained(
    model,
    use_fast=True,
)

In [None]:
#training of the new tokenizer using the iterator from the training corpus:

#for these models, the configuration of the tokenizer needs to be changed before retraining
models_needing_reconfiguration = ("google/gemma-2-9b", "mistralai/Mistral-7B-v0.1")

if model in models_needing_reconfiguration:
    backend_tokenizer = original_tokenizer.backend_tokenizer #access the internal tokenizer
    #modify normalizer and pretokenizer (note: this changes original_tokenizer in place)
    backend_tokenizer.normalizer = Sequence(normalizers=[])
    #pre_tokenizers is the tokenizers submodule; it has to be imported at the top of the notebook
    backend_tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
        pre_tokenizers.Whitespace(),  #divides by words
        Metaspace(replacement="▁")   #adds '▁' at the start of each word
    ])

    #create the directory
    modified_dir = f"tokenizer_mod_{model_name}"
    os.makedirs(modified_dir, exist_ok=True)

    #save the new configuration and load it back as a fast tokenizer
    modified_file = f"{modified_dir}/tokenizer.json"
    backend_tokenizer.save(modified_file)
    original_tokenizer_mod = PreTrainedTokenizerFast(tokenizer_file=modified_file)
    base_tokenizer = original_tokenizer_mod
else:
    #the remaining models are retrained from their unmodified tokenizer
    base_tokenizer = original_tokenizer

#retrain tokenizer: same vocabulary size as the original; length is passed as the number of rows in the training corpus
tokenizer = base_tokenizer.train_new_from_iterator(
    batch_iterator(train_corpus, batch_size, mode_training),
    original_tokenizer.vocab_size,
    length=len(train_corpus),
)


In [None]:
#the retrained tokenizer gets its own sub-folder inside 'retrained_conversational_tokenizers':
tokenizer_folder = f"tokenizer_{model_name}_{mode_training}"
output_dir = os.path.join("retrained_conversational_tokenizers", tokenizer_folder)
os.makedirs(output_dir, exist_ok=True)

#write the tokenizer files to that folder:
tokenizer.save_pretrained(output_dir)
print(f"Retrained tokenizer completed and saved in {output_dir}")

In [None]:
#reload the original tokenizer: for gemma and mistral the configuration change above modified the copy loaded earlier
original_tokenizer = AutoTokenizer.from_pretrained(
    model,
    use_fast=True,
)

In [None]:
#number of tokens the retrained tokenizer produces for the input texts of the test corpus
test_input_batches = get_test_corpus_input(test_corpus)
tokens_new_input = tokenize_and_count_from_generator(test_input_batches, tokenizer)

In [None]:
#number of tokens the original tokenizer produces for the input texts of the test corpus
test_input_batches = get_test_corpus_input(test_corpus)
tokens_old_input = tokenize_and_count_from_generator(test_input_batches, original_tokenizer)

#gain achieved by the retraining for the input (user) texts, as a percentage of the original token count
input_ratio = tokens_new_input / tokens_old_input
gain_input = (1 - input_ratio) * 100

In [None]:
#number of tokens the retrained tokenizer produces for the output texts of the test corpus
test_output_batches = get_test_corpus_output(test_corpus)
tokens_new_output = tokenize_and_count_from_generator(test_output_batches, tokenizer)

In [None]:
#number of tokens the original tokenizer produces for the output texts of the test corpus
test_output_batches = get_test_corpus_output(test_corpus)
tokens_old_output = tokenize_and_count_from_generator(test_output_batches, original_tokenizer)

#gain achieved by the retraining for the output (assistant) texts, as a percentage of the original token count
output_ratio = tokens_new_output / tokens_old_output
gain_output = (1 - output_ratio) * 100

In [None]:
#print results
print(f"Model: {model}, Tokens old input: {tokens_old_input}, Tokens new input: {tokens_new_input}, Gain input: {gain_input:.2f}%, Tokens old output: {tokens_old_output}, Tokens new output: {tokens_new_output}, Gain output: {gain_output:.2f}% ")

#create the CSV file name (one file per training mode)
csv_file = create_filename("conversational_tokenizers", mode_training)

#the header is needed when the file is missing, and also when it exists but is still empty
file_exists = os.path.isfile(csv_file)
needs_header = not file_exists or os.path.getsize(csv_file) == 0

#append mode creates the file when it does not exist, so one mode covers both cases
with open(csv_file, mode="a", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=[
        "model",
        "tokens_old_input", "tokens_new_input", "gain_input",
        "tokens_old_output", "tokens_new_output", "gain_output"
    ])
    if needs_header:
        writer.writeheader()

    #write the single result row of this run
    writer.writerow({
        "model": model,
        "tokens_old_input": tokens_old_input,
        "tokens_new_input": tokens_new_input,
        "gain_input": round(gain_input, 2),  #round the gain to 2 decimal places
        "tokens_old_output": tokens_old_output,
        "tokens_new_output": tokens_new_output,
        "gain_output": round(gain_output, 2),  #round the gain to 2 decimal places
    })

print(f"Results saved in {csv_file}")
