In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import spacy
import datasets
import torchtext
import tqdm
from datasets import Dataset, DatasetDict, load_dataset
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, processors
from tokenizers.normalizers import NFKC
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import WordPieceTrainer
from torchtext.vocab import build_vocab_from_iterator
from collections import Counter
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
import os
import streamlit as st
import nltk
from nltk.translate.bleu_score import corpus_bleu

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
seed = 1234

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True

In [4]:
# Télécharger la base sur Hugging Face 
ds = load_dataset("emuchogu/swahili-english-translation")

Using the latest cached version of the dataset since emuchogu/swahili-english-translation couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'default' at C:\Users\PC\.cache\huggingface\datasets\emuchogu___swahili-english-translation\default\0.0.0\08b59ca54fb915d8547392c79c609029fc4c4d8e (last modified on Thu Mar  6 13:20:48 2025).


In [9]:
ds

DatasetDict({
    train: Dataset({
        features: ['prompt', 'input', 'output'],
        num_rows: 1115700
    })
})

In [13]:
# Convertir en DataFrame 
df = pd.DataFrame(ds["train"])
# Sauvegarder en fichier CSV
df.to_csv("swahili_english_translation.csv", index=False, encoding="utf-8")

In [11]:
donnees = pd.read_csv("swahili_english_translation.csv")

In [12]:
df = donnees.head(50000)

In [13]:
df_p = pd.DataFrame(columns=["en", "ye"])
df_p["en"] = df.apply(lambda row: row["input"] if row.name % 2 == 0 else row["output"], axis=1)
df_p["ye"] = df.apply(lambda row: row["output"] if row.name % 2 == 0 else row["input"], axis=1)


In [14]:
df_p.describe()

Unnamed: 0,en,ye
count,50000,50000
unique,1349,1343
top,The people are outside.,Watu wako nje.
freq,6272,8004


In [15]:
df_p

Unnamed: 0,en,ye
0,A person on a horse jumps over a broken down a...,Mtu aliyepanda farasi anaruka juu ya ndege ili...
1,A person on a horse jumps over a broken down a...,Mtu aliyepanda farasi anaruka juu ya ndege ili...
2,Children smiling and waving at camera,Watoto wakitabasamu na kutikisa kamera
3,Children smiling and waving at camera,Watoto wakitabasamu na kutikisa kamera
4,A boy is jumping on skateboard in the middle o...,Mvulana anakimbia kwenye ubao wa kuteleza kati...
...,...,...
49995,A man is sleeping.,Mwanamume fulani amelala.
49996,A man is sleeping.,Mwanamume fulani amelala.
49997,A man is sleeping.,Mwanamume fulani amelala.
49998,A man is sleeping.,Mwanamume fulani amelala.


In [16]:
# Convertir le DataFrame en une liste de dictionnaires
data_list = df_p.to_dict(orient='records')

# Séparer les données en ensembles d'entraînement, validation et test
train, test = train_test_split(data_list, test_size=0.2, random_state=42)  # 80% train, 20% test
train, validation = train_test_split(train, test_size=0.2, random_state=42)  # 80% train, 20% validation

print(f"Taille de l'échantillon d'entraînement: {len(train)}")
print(f"Taille de l'échantillon de validation: {len(validation)}")
print(f"Taille de l'échantillon de test: {len(test)}")

Taille de l'échantillon d'entraînement: 32000
Taille de l'échantillon de validation: 8000
Taille de l'échantillon de test: 10000


In [17]:
train_dataset = Dataset.from_list(train)
validation_dataset = Dataset.from_list(validation)
test_dataset = Dataset.from_list(test)

In [18]:
# Construire le DatasetDict
dataset = DatasetDict({
    "train": train_dataset,
    "validation": validation_dataset,
    "test": test_dataset
})
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['en', 'ye'],
        num_rows: 32000
    })
    validation: Dataset({
        features: ['en', 'ye'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['en', 'ye'],
        num_rows: 10000
    })
})


In [195]:
corpus = df_p["ye"]
pd.DataFrame(corpus).to_csv('corpus.txt', index=False)

In [196]:
corpus2 = df_p["en"]
pd.DataFrame(corpus2).to_csv('corpus2.txt', index=False)

In [None]:
# Initialiser un tokenizer basé sur WordPiece
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))

# Ajouter des normalisations et des pré-tokenizers
tokenizer.normalizer = NFKC()
tokenizer.pre_tokenizer = Whitespace()

# Configurer un entraîneur pour générer un vocabulaire
trainer = WordPieceTrainer(special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"], vocab_size=30000)

# Charger un corpus et entraîner le tokenizer
files = ["corpus.txt"]  
tokenizer.train(files, trainer)

# Définir un post-traitement 
tokenizer.post_processor = processors.TemplateProcessing(
    single="[CLS] $A [SEP]",
    pair="[CLS] $A [SEP] $B:1 [SEP]:1",
    special_tokens=[("[CLS]", 1), ("[SEP]", 2)]
)
# Sauvegarder le tokenizer
tokenizer.save("TokenizerSW.json")

In [None]:
# Pour l'anglais
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))

tokenizer.normalizer = NFKC()
tokenizer.pre_tokenizer = Whitespace()
trainer = WordPieceTrainer(special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"], vocab_size=30000)

files = ["corpus2.txt"] 
tokenizer.train(files, trainer)

tokenizer.post_processor = processors.TemplateProcessing(
    single="[CLS] $A [SEP]",
    pair="[CLS] $A [SEP] $B:1 [SEP]:1",
    special_tokens=[("[CLS]", 1), ("[SEP]", 2)]
)
tokenizer.save("TokenizerEN.json")

In [None]:
# Tester le tokenizer
encoded = tokenizer.encode("Mtu aliyepanda farasi anaruka juu ya ndege iliyovunjika.")
print(encoded.tokens)  


['[CLS]', 'M', '##tu', 'al', '##i', '##y', '##ep', '##and', '##a', 'far', '##a', '##si', 'an', '##ar', '##u', '##ka', 'ju', '##u', 'y', '##a', 'n', '##de', '##ge', 'il', '##i', '##y', '##o', '##v', '##un', '##j', '##ik', '##a', '.', '[SEP]']


In [19]:
ye_nlp = Tokenizer.from_file("TokenizerSW.json")
en_nlp = Tokenizer.from_file("TokenizerEN.json")

In [20]:
train_data, valid_data, test_data = (
    dataset["train"],
    dataset["validation"],
    dataset["test"],
)

In [21]:
def yield_tokens(dataset, lang):
    """Générateur de tokens pour le vocabulaire."""
    for example in dataset:
        yield example[lang].split()  # Tokenisation naïve par espaces

# Construire le vocabulaire pour chaque langue
unk_token = "<unk>"
pad_token = "<pad>"
bos_token = "<bos>"
eos_token = "<eos>"
en_vocab = build_vocab_from_iterator(yield_tokens(train_data, "en"), specials= [unk_token, pad_token, bos_token, eos_token])
ye_vocab = build_vocab_from_iterator(yield_tokens(train_data, "ye"), specials= [unk_token, pad_token, bos_token, eos_token])

# Configurer l'index spécial pour <unk>
en_vocab.set_default_index(en_vocab["<unk>"])
ye_vocab.set_default_index(ye_vocab["<unk>"])

# Vérification
print("Taille du vocabulaire Anglais:", len(en_vocab))
print("Taille du vocabulaire Yemba:", len(ye_vocab))


Taille du vocabulaire Anglais: 2396
Taille du vocabulaire Yemba: 2258


In [22]:
# Exemple d'indexation de mots
print("Index de 'the':", en_vocab["the"])  # Retourne l'index si le mot existe, sinon <unk>
print("Index de 'anaruka':", ye_vocab["anaruka"])  # Mot en swalli

Index de 'the': 14
Index de 'anaruka': 551


In [23]:
en_vocab.get_itos()[:10]

['<unk>',
 '<pad>',
 '<bos>',
 '<eos>',
 'is',
 'man',
 'The',
 'outside.',
 'A',
 'are']

In [24]:
ye_vocab.get_itos()[:10]

['<unk>',
 '<pad>',
 '<bos>',
 '<eos>',
 'Mwanamume',
 'nje.',
 'huyo',
 'Watu',
 'wako',
 'fulani']

In [25]:
assert ye_vocab[unk_token] == en_vocab[unk_token]
assert ye_vocab[pad_token] == en_vocab[pad_token]

unk_index = en_vocab[unk_token]
pad_index = en_vocab[pad_token]

In [26]:
en_vocab.set_default_index(unk_index)
ye_vocab.set_default_index(unk_index)

In [27]:
tokens = ["our", "riparian", "field", "is", "near", "the", "bridge"]
en_vocab.lookup_indices(tokens)

[0, 0, 128, 4, 170, 14, 594]

In [28]:
# Fonction pour convertir une phrase en indices
def numericalize(sentence, vocab):
    return [vocab[bos_token]] + [vocab[token] for token in sentence.split()] + [vocab[eos_token]]

# Fonction de traitement d'un lot
def collate_fn(batch):
    en_batch = [torch.tensor(numericalize(item["en"], en_vocab)) for item in batch]
    ye_batch = [torch.tensor(numericalize(item["ye"], ye_vocab)) for item in batch]
    
    en_batch = pad_sequence(en_batch, padding_value=en_vocab[pad_token])
    ye_batch = pad_sequence(ye_batch, padding_value=ye_vocab[pad_token])
    
    return {"en": en_batch, "ye": ye_batch}



In [29]:
# Création des chargeurs de données
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_data, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, collate_fn=collate_fn)


In [31]:
# Définition de l'encodeur
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        outputs, (hidden, cell) = self.rnn(embedded)
        return hidden, cell


In [32]:
# Définition du décodeur
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, input, hidden, cell):
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell

In [33]:
# Définition du modèle Seq2Seq
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        hidden, cell = self.encoder(src)
        input = trg[0, :]
        
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[t] = output
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        
        return outputs

In [34]:
# Initialisation des hyperparamètres
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_dim = len(en_vocab)
output_dim = len(ye_vocab)
emb_dim = 256
hid_dim = 512
n_layers = 2
dropout = 0.5

# Initialisation du modèle
encoder = Encoder(input_dim, emb_dim, hid_dim, n_layers, dropout).to(device)
decoder = Decoder(output_dim, emb_dim, hid_dim, n_layers, dropout).to(device)
model = Seq2Seq(encoder, decoder, device).to(device)

# Définition de la fonction de perte et de l'optimiseur
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=en_vocab[pad_token])

print("Modèle initialisé!")

Modèle initialisé!


In [35]:
# Fonction d'entraînement
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    
    for batch in iterator:
        src, trg = batch["en"].to(device), batch["ye"].to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        output = output[1:].view(-1, output.shape[-1])
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)

In [36]:
# Fonction d'évaluation
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for batch in iterator:
            src, trg = batch["en"].to(device), batch["ye"].to(device)
            output = model(src, trg, 0)
            output = output[1:].view(-1, output.shape[-1])
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

In [None]:
# Boucle principale d'entraînement et d'évaluation
epochs = 5
clip = 1

for epoch in range(epochs):
    train_loss = train(model, train_loader, optimizer, criterion, clip)
    valid_loss = evaluate(model, valid_loader, criterion)
    print(f"Époque {epoch+1}: Perte entraînement = {train_loss:.3f}, Perte validation = {valid_loss:.3f}")


Époque 1: Perte entraînement = 1.825, Perte validation = 1.197
Époque 2: Perte entraînement = 0.751, Perte validation = 0.787
Époque 3: Perte entraînement = 0.458, Perte validation = 0.637
Époque 4: Perte entraînement = 0.385, Perte validation = 0.524
Époque 5: Perte entraînement = 0.338, Perte validation = 0.513


In [43]:
import matplotlib.pyplot as plt

# Données
los = [1.825, 0.751, 0.458, 0.385, 0.338]
val = [1.197, 0.787, 0.637, 0.524, 0.513]
epochs = range(1, 6)

# Tracé du graphique
plt.figure(figsize=(8, 5))
plt.plot(epochs, los, label='perte entraînement', marker='o')
plt.plot(epochs, val, label='perte validation', marker='s')

# Ajout des labels et titre
plt.xlabel("nombre d'épochs")
plt.ylabel("valeur de la perte")
plt.title("Évolution de la perte en fonction des épochs")
plt.legend()
plt.grid()
plt.ion()
plt.savefig("loss_plot.png")



In [None]:
# Dossier pour sauvegarder le modèle
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

In [46]:
model_path = os.path.join(save_dir, "best_model.pth")
torch.save({
    'epoch': epoch + 1,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': valid_loss,
}, model_path)
print(f"Modèle sauvegardé à l'époque {epoch+1} avec une perte de validation de {valid_loss:.3f}")


Modèle sauvegardé à l'époque 5 avec une perte de validation de 0.513


In [45]:
# Chargement du modèle sauvegardé
def load_model(model, optimizer, model_path="saved_models/best_model.pth"):
    checkpoint = torch.load(model_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    print(f"Modèle chargé depuis l'époque {epoch} avec une perte de validation de {loss:.3f}")
    return model, optimizer


In [46]:
model_path="saved_models/best_model.pth"
model, optimizer = load_model(model, optimizer, model_path)


Modèle chargé depuis l'époque 5 avec une perte de validation de 0.513


In [49]:
def translate_sentence(sentence, model, en_vocab, ye_vocab, device, max_length=50):
    """Traduit une phrase de l'anglais vers le yemba."""
    model.eval()
    
    # Tokenisation et conversion en indices
    tokens = sentence.lower().split()
    numericalized = [en_vocab["<bos>"]]
    
    # Ajouter chaque token avec un fallback sur <unk> si le mot n'existe pas
    for token in tokens:
        try:
            numericalized.append(en_vocab[token])  # Tentative d'accès direct au vocabulaire
        except KeyError:
            numericalized.append(en_vocab["<unk>"]) # Si le mot n'est pas trouvé, ajouter <unk>
    numericalized.append(en_vocab["<eos>"])
    
    # Conversion en tenseur PyTorch
    src_tensor = torch.LongTensor(numericalized).unsqueeze(1).to(device)
    
    # Passage dans l'encodeur
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
    
    # Décodage itératif
    trg_indexes = [ye_vocab["<bos>"]]
    
    for _ in range(max_length):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        
        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
        
        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        
        if pred_token == ye_vocab["<eos>"]:
            break
  
    trg_tokens = [ye_vocab.lookup_token(idx) for idx in trg_indexes]   # Conversion des indices en mots
    
    return " ".join(trg_tokens[1:-1])  # Exclure <bos> et <eos>


In [50]:
phrases = test_data['en'][:50]
traduction = test_data['ye'][:50]
df_phrases = pd.DataFrame({"Phrases en anglais": phrases, "Phrases en swahili": traduction})
df_phrase = df_phrases.drop_duplicates()

In [51]:
sentence = test_data[1]["en"]
expected_translation = test_data[1]["ye"]

sentence, expected_translation

('The people are outside.', 'Watu wako nje.')

In [52]:
translation = translate_sentence(sentence, model, en_vocab, ye_vocab, device)
print("Traduction:", translation)

Traduction: Watu wako nje.


In [53]:
# Exemple de traduction
translation = translate_sentence(sentence, model, en_vocab, ye_vocab, device)
print("Traduction:", translation)


Traduction: Watu wako nje.


In [62]:
predictions = []
references = []
for i in range(20):
    sentence = test_data[i]["en"]
    expected_translation = test_data[i]["ye"]
    translation = translate_sentence(sentence, model, en_vocab, ye_vocab, device)
    predictions.append(translation)
    references.append(expected_translation)
    print("Phrase:", sentence)
    print("Réponse:", expected_translation)
    print("Traduction:", translation)

Phrase: The man is sitting.
Réponse: Mwanamume huyo ameketi.
Traduction: Mwanamume huyo ameketi.
Phrase: The people are outside.
Réponse: Watu wako nje.
Traduction: Watu wako nje.
Phrase: The woman is wearing white.
Réponse: Mwanamke huyo amevaa mavazi meupe.
Traduction: Mwanamke huyo amevaa mavazi meupe.
Phrase: The people are outside.
Réponse: Watu wako nje.
Traduction: Watu wako nje.
Phrase: A man is standing.
Réponse: Mwanamume mmoja amesimama.
Traduction: Mwanamume mmoja amesimama.
Phrase: A man wearing a white shirt with black dot is holding a microphone as he stands in front of a background of black with white symbols on it.
Réponse: Mwanamume aliyevalia shati jeupe lenye nukta nyeusi anashikilia kipaza sauti huku akiwa amesimama mbele ya mandhari nyeusi yenye alama nyeupe.
Traduction: kijana aliyevalia suruali ya rangi ya kahawia na shati la kijani-kibichi anatazama juu ya ukuta wenye urefu wa kiuno unaotenganisha jikoni na chumba cha kuishi cha nyumba.
Phrase: The people are o

In [55]:
import evaluate
bleu = evaluate.load("bleu")

Using the latest cached version of the module from C:\Users\PC\.cache\huggingface\modules\evaluate_modules\metrics\evaluate-metric--bleu\9e0985c1200e367cce45605ce0ecb5ede079894e0f24f54613fca08eeb8aff76 (last modified on Tue Feb 25 07:41:18 2025) since it couldn't be found locally at evaluate-metric--bleu, or remotely on the Hugging Face Hub.


In [63]:
predictions[0], references[0]


('Mwanamume huyo ameketi.', 'Mwanamume huyo ameketi.')

In [67]:
def get_tokenizer_fn(nlp, lower):
    def tokenizer_fn(s):
        tokens = nlp.encode(s).tokens
        if lower:
            tokens = [token.lower() for token in tokens]
        return tokens

    return tokenizer_fn

In [68]:
lower = True
tokenizer_fn = get_tokenizer_fn(ye_nlp, lower)

In [69]:
results = bleu.compute(
    predictions=predictions, references=references, tokenizer=tokenizer_fn
)

In [70]:
results

{'bleu': 0.6867548255809744,
 'precisions': [0.8066666666666666,
  0.7153846153846154,
  0.6545454545454545,
  0.5888888888888889],
 'brevity_penalty': 1.0,
 'length_ratio': 1.056338028169014,
 'translation_length': 150,
 'reference_length': 142}

In [None]:
# Interface utilisateur avec Streamlit
st.title("Chatbot de Traduction Anglais → Swahili")
st.write("Entrez une phrase en anglais et obtenez la traduction en Swahili.")

# Saisie utilisateur
sentence = st.text_input("Entrez votre texte en anglais :", "")

if st.button("Traduire"):
    if sentence:
        translation = translate_sentence(sentence)
        st.success(f"**Traduction en Swahili :** {translation}")
    else:
        st.warning("Veuillez entrer une phrase en anglais.")