In [14]:
# -- Chargement et préparation des données pour Keras (Seq2Seq avec Attention) --
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import json
import re
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from pathlib import Path
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping

plt.rcParams["font.family"] = "Noto Sans CJK JP"

In [15]:
# --- Nettoyage et normalisation ---
def clean_romanized(text):
    text = text.lower()
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def clean_sinogram(text):
    punctuations = r"[，。！？、：；「」『』《》〈〉（）(){}【】\[\]\"\'“”‘’.,!?;:…\-—~·•◦→←«»]"
    text = re.sub(punctuations, "", text)
    text = re.sub(r"\s+", "", text)
    return text.strip()

In [16]:
# --- Chargement du corpus ---
def load_data(path):
    data = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            obj = json.loads(line.strip())
            if "r" in obj and "j" in obj:
                romanized = clean_romanized(obj["r"])
                sinograms = clean_sinogram(obj["j"])
                if romanized and sinograms:
                    data.append((romanized, sinograms))
    return data

In [17]:
# --- Tokenisation ---
def tokenize_romanized(s):
    return s.strip().split(" ")

def tokenize_sinogrammes(s):
    return list(s.strip())

# --- Construction des vocabulaires ---
def build_vocab(sequences, special_tokens=["<PAD>", "<BOS>", "<EOS>", "<UNK>"]):
    vocab = set(token for seq in sequences for token in seq)
    vocab = special_tokens + sorted(vocab)
    token_to_id = {tok: idx for idx, tok in enumerate(vocab)}
    id_to_token = {idx: tok for tok, idx in token_to_id.items()}
    return token_to_id, id_to_token

In [18]:
# --- Conversion en indices ---
def convert_to_ids(sequences, token_to_id, bos=False, eos=False):
    out = []
    for seq in sequences:
        ids = []
        if bos:
            ids.append(token_to_id["<BOS>"])
        for token in seq:
            ids.append(token_to_id.get(token, token_to_id["<UNK>"]))
        if eos:
            ids.append(token_to_id["<EOS>"])
        out.append(ids)
    return out

In [19]:
# --- Pipeline complet ---
def prepare_data(jsonl_path, maxlen_r=30, maxlen_j=30, test_size=0.2):
    data = load_data(jsonl_path)

    romanized_seqs = [tokenize_romanized(r) for r, _ in data]
    sinogram_seqs = [tokenize_sinogrammes(j) for _, j in data]

    tok2id_r, id2tok_r = build_vocab(romanized_seqs)
    tok2id_j, id2tok_j = build_vocab(sinogram_seqs)

    X = convert_to_ids(romanized_seqs, tok2id_r)
    y = convert_to_ids(sinogram_seqs, tok2id_j, bos=True, eos=True)

    X_pad = pad_sequences(X, maxlen=maxlen_r, padding="post", truncating="post", value=tok2id_r["<PAD>"])
    y_pad = pad_sequences(y, maxlen=maxlen_j, padding="post", truncating="post", value=tok2id_j["<PAD>"])

    X_train, X_test, y_train, y_test = train_test_split(X_pad, y_pad, test_size=test_size, random_state=42)

    return {
        "X_train": X_train, "X_test": X_test,
        "y_train": y_train, "y_test": y_test,
        "tok2id_r": tok2id_r, "id2tok_r": id2tok_r,
        "tok2id_j": tok2id_j, "id2tok_j": id2tok_j
    }

In [20]:
# Charger et préparer les données
data = prepare_data("taigi_ime_data.jsonl")

# --- Séparation des séquences cible ---
def split_decoder_inputs_outputs(y, pad_token=0):
    decoder_input = y[:, :-1]
    decoder_output = y[:, 1:]
    return decoder_input, decoder_output

decoder_input_train, decoder_target_train = split_decoder_inputs_outputs(data["y_train"])
decoder_input_test, decoder_target_test = split_decoder_inputs_outputs(data["y_test"])

# --- Modèle Seq2Seq avec Attention ---
def build_seq2seq_attention_model(input_vocab_size, target_vocab_size, embedding_dim=128, encoder_units=256, decoder_units=256, maxlen_input=30, maxlen_target=30):
    encoder_inputs = keras.Input(shape=(maxlen_input,), name="encoder_inputs")
    enc_emb = layers.Embedding(input_vocab_size, embedding_dim)(encoder_inputs)
    encoder_lstm = layers.LSTM(encoder_units, return_sequences=True, return_state=True)
    encoder_outputs, state_h, state_c = encoder_lstm(enc_emb)

    decoder_inputs = keras.Input(shape=(maxlen_target,), name="decoder_inputs")
    dec_emb = layers.Embedding(target_vocab_size, embedding_dim)(decoder_inputs)
    decoder_lstm = layers.LSTM(decoder_units, return_sequences=True, return_state=True)
    decoder_outputs, _, _ = decoder_lstm(dec_emb, initial_state=[state_h, state_c])

    attention_layer = layers.AdditiveAttention(name="attention")
    attention_output = attention_layer([decoder_outputs, encoder_outputs])
    concat_attention = layers.Concatenate(axis=-1)([decoder_outputs, attention_output])
    output = layers.TimeDistributed(layers.Dense(target_vocab_size, activation="softmax"))(concat_attention)

    model = keras.Model([encoder_inputs, decoder_inputs], output)
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

In [21]:
# Paramètres et modèle
input_vocab_size = len(data["tok2id_r"])
target_vocab_size = len(data["tok2id_j"])
maxlen_input = data["X_train"].shape[1]
maxlen_target = data["y_train"].shape[1] - 1

model = build_seq2seq_attention_model(
    input_vocab_size, target_vocab_size,
    maxlen_input=maxlen_input,
    maxlen_target=maxlen_target
)


In [22]:
# Entraînement
early_stop = EarlyStopping(patience=3, restore_best_weights=True)
history = model.fit(
    [data["X_train"], decoder_input_train],
    decoder_target_train,
    validation_split=0.2,
    batch_size=32,
    epochs=20,
    callbacks=[early_stop]
)

Epoch 1/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m963s[0m 592ms/step - accuracy: 0.6026 - loss: 2.7889 - val_accuracy: 0.6631 - val_loss: 2.0567
Epoch 2/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m680s[0m 406ms/step - accuracy: 0.6855 - loss: 1.8663 - val_accuracy: 0.7549 - val_loss: 1.3573
Epoch 3/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m634s[0m 391ms/step - accuracy: 0.7923 - loss: 1.1023 - val_accuracy: 0.8620 - val_loss: 0.7269
Epoch 4/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m611s[0m 378ms/step - accuracy: 0.8971 - loss: 0.5076 - val_accuracy: 0.9119 - val_loss: 0.4530
Epoch 5/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m611s[0m 378ms/step - accuracy: 0.9446 - loss: 0.2612 - val_accuracy: 0.9323 - val_loss: 0.3437
Epoch 6/20
[1m1619/1619[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m597s[0m 369ms/step - accuracy: 0.9642 - loss: 0.1617 - val_accuracy: 0.9468 - val_loss:

In [None]:
def plot_seq2seq_learning_curves(history):
    plt.figure(figsize=(12, 5))

    # Sous-plot 1 : loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history["loss"], label="train")
    plt.plot(history.history["val_loss"], label="val")
    plt.title("Seq2Seq - Loss au fil des époques")
    plt.xlabel("Époques")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)

    # Sous-plot 2 : accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history["accuracy"], label="train")
    plt.plot(history.history["val_accuracy"], label="val")
    plt.title("Seq2Seq - Accuracy (caractère) au fil des époques")
    plt.xlabel("Époques")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

In [23]:
# --- Création des modèles encoder / decoder pour l'inférence ---
encoder_inputs = model.input[0]
encoder_embedding_layer = model.layers[1]
encoder_lstm = model.layers[2]
enc_emb = encoder_embedding_layer(encoder_inputs)
encoder_outputs, state_h_enc, state_c_enc = encoder_lstm(enc_emb)
encoder_model = keras.Model(encoder_inputs, [encoder_outputs, state_h_enc, state_c_enc])

decoder_inputs = model.input[1]
decoder_embedding_layer = model.layers[4]
decoder_lstm = model.layers[5]
attention_layer = model.get_layer("attention")
concat_layer = model.layers[7]
dense_layer = model.layers[8]

decoder_input_single = keras.Input(shape=(1,), name="decoder_input_t")
enc_outputs_input = keras.Input(shape=(None, encoder_outputs.shape[-1]), name="encoder_outputs")
state_h_input = keras.Input(shape=(decoder_lstm.units,), name="h_input")
state_c_input = keras.Input(shape=(decoder_lstm.units,), name="c_input")

dec_emb = decoder_embedding_layer(decoder_input_single)
dec_out, state_h_new, state_c_new = decoder_lstm(dec_emb, initial_state=[state_h_input, state_c_input])
attn_out = attention_layer([dec_out, enc_outputs_input])
concat_out = concat_layer([dec_out, attn_out])
final_output = dense_layer(concat_out)

decoder_model = keras.Model(
    inputs=[decoder_input_single, enc_outputs_input, state_h_input, state_c_input],
    outputs=[final_output, state_h_new, state_c_new]
)

TypeError: too many positional arguments

In [None]:
# --- Décodage greedy avec attention ---
def decode_sequence_infer(input_seq, max_len=30):
    enc_outs, h, c = encoder_model.predict(input_seq)
    tok2id_j = data["tok2id_j"]
    id2tok_j = data["id2tok_j"]
    BOS, EOS = tok2id_j["<BOS>"], tok2id_j["<EOS>"]
    target_seq = np.array([[BOS]])
    decoded_tokens = []

    for _ in range(max_len):
        output, h, c = decoder_model.predict([target_seq, enc_outs, h, c])
        token_id = np.argmax(output[0, -1, :])
        if token_id == EOS:
            break
        decoded_tokens.append(id2tok_j.get(token_id, "<UNK>"))
        target_seq = np.array([[token_id]])

    return "".join(decoded_tokens)

In [None]:
# --- Affichage sur un exemple ---
def infer_with_encoder_decoder(example_idx):
    x_input = data["X_test"][example_idx:example_idx+1]
    gold_ids = data["y_test"][example_idx]
    pad_id = data["tok2id_j"]["<PAD>"]
    r_tok = [data["id2tok_r"][i] for i in x_input[0] if i != data["tok2id_r"]["<PAD>"]]
    j_gold = [data["id2tok_j"][i] for i in gold_ids if i not in (pad_id, data["tok2id_j"]["<BOS>"], data["tok2id_j"]["<EOS>"])]
    j_pred = decode_sequence_infer(x_input)
    print("🔤 Romanisé :", " ".join(r_tok))
    print("✅ Réel     :", "".join(j_gold))
    print("🤖 Prédit   :", j_pred)

# Exemple
infer_with_encoder_decoder(0)
infer_with_encoder_decoder(1)

In [None]:
# --- Évaluation globale : CER (Character Error Rate) et BLEU ---
from jiwer import cer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction


def evaluate_model_on_test(data, max_examples=200):
    pad_id = data["tok2id_j"]["<PAD>"]
    bos_id = data["tok2id_j"]["<BOS>"]
    eos_id = data["tok2id_j"]["<EOS>"]
    id2tok_j = data["id2tok_j"]

    cer_scores = []
    bleu_scores = []
    smoothie = SmoothingFunction().method4

    for i in range(min(max_examples, len(data["X_test"]))):
        x_input = data["X_test"][i:i+1]
        gold_ids = data["y_test"][i]
        gold = [id2tok_j[tok] for tok in gold_ids if tok not in (pad_id, bos_id, eos_id)]
        pred = list(decode_sequence_infer(x_input))

        ref_str = "".join(gold)
        hyp_str = "".join(pred)

        cer_scores.append(cer(ref_str, hyp_str))
        bleu_scores.append(sentence_bleu([gold], pred, smoothing_function=smoothie))

    avg_cer = np.mean(cer_scores)
    avg_bleu = np.mean(bleu_scores)

    print(f"📏 CER moyen  : {avg_cer:.4f} ({avg_cer * 100:.2f}%)")
    print(f"🟦 BLEU moyen : {avg_bleu:.4f}")
    return avg_cer, avg_bleu

# Évaluation
evaluate_model_on_test(data, max_examples=200)

In [None]:
# --- Phase 7 : Baseline simple par lookup mot-à-mot ---
def build_lookup_table(data):
    table = {}
    for x_seq, y_seq in zip(data["X_train"], data["y_train"]):
        for x_id, y_id in zip(x_seq, y_seq[1:]):  # y_seq[1:] pour ignorer BOS
            if x_id == data["tok2id_r"]["<PAD>"] or y_id in (
                data["tok2id_j"]["<PAD>"], data["tok2id_j"]["<EOS>"]):
                continue
            if x_id not in table:
                table[x_id] = Counter()
            table[x_id][y_id] += 1
    return {x: counter.most_common(1)[0][0] for x, counter in table.items()}


def lookup_baseline_predict(x_seq, lookup_table, unk_id):
    return [lookup_table.get(tok, unk_id) for tok in x_seq if tok != 0]


def evaluate_lookup_baseline(data, max_examples=200):
    id2tok_j = data["id2tok_j"]
    pad_id = data["tok2id_j"]["<PAD>"]
    bos_id = data["tok2id_j"]["<BOS>"]
    eos_id = data["tok2id_j"]["<EOS>"]
    unk_id = data["tok2id_j"]["<UNK>"]

    table = build_lookup_table(data)

    cer_scores = []
    bleu_scores = []
    smoothie = SmoothingFunction().method4

    for i in range(min(max_examples, len(data["X_test"]))):
        x_seq = data["X_test"][i]
        y_gold_ids = data["y_test"][i]

        gold = [id2tok_j[i] for i in y_gold_ids if i not in (pad_id, bos_id, eos_id)]
        pred_ids = lookup_baseline_predict(x_seq, table, unk_id)
        pred = [id2tok_j.get(i, "<UNK>") for i in pred_ids]

        cer_scores.append(cer("".join(gold), "".join(pred)))
        bleu_scores.append(sentence_bleu([gold], pred, smoothing_function=smoothie))

    avg_cer = np.mean(cer_scores)
    avg_bleu = np.mean(bleu_scores)

    print(f"🔁 Baseline CER  : {avg_cer:.4f} ({avg_cer * 100:.2f}%)")
    print(f"🔁 Baseline BLEU : {avg_bleu:.4f}")
    return avg_cer, avg_bleu

# Évaluer la baseline
evaluate_lookup_baseline(data, max_examples=200)

In [None]:
# --- Phase 8 : Modèle autoregressif simplifié (LSTM unidirectionnel) ---
def build_autoregressive_model(vocab_size, embedding_dim=128, rnn_units=256, maxlen_input=30):
    model = keras.Sequential([
        layers.Input(shape=(maxlen_input,), name="input_seq"),
        layers.Embedding(vocab_size, embedding_dim),
        layers.LSTM(rnn_units, return_sequences=True),
        layers.TimeDistributed(layers.Dense(vocab_size, activation="softmax"))
    ])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

# Préparation des données autoregressives (X = sinogrammes[:-1], y = sinogrammes[1:])
def prepare_autoregressive_data(data):
    Y = data["y_train"]
    X_auto = Y[:, :-1]
    y_auto = Y[:, 1:]
    return X_auto, y_auto

X_auto, y_auto = prepare_autoregressive_data(data)

# Modèle
vocab_size_j = len(data["tok2id_j"])
auto_model = build_autoregressive_model(vocab_size=vocab_size_j, maxlen_input=X_auto.shape[1])

# Entraînement
history_auto = auto_model.fit(
    X_auto, y_auto,
    validation_split=0.2,
    batch_size=32,
    epochs=10,
    callbacks=[EarlyStopping(patience=2, restore_best_weights=True)]
)

In [None]:
# --- Visualisation des courbes d'apprentissage pour le modèle autoregressif ---
def plot_autoregressive_learning_curves(history):
    plt.figure(figsize=(12, 5))

    # Sous-plot 1 : loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history["loss"], label="train")
    plt.plot(history.history["val_loss"], label="val")
    plt.title("Autoregressif - Loss au fil des époques")
    plt.xlabel("Époques")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)

    # Sous-plot 2 : accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history["accuracy"], label="train")
    plt.plot(history.history["val_accuracy"], label="val")
    plt.title("Autoregressif - Accuracy au fil des époques")
    plt.xlabel("Époques")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

# Tracer les courbes d'entraînement
plot_autoregressive_learning_curves(history_auto)


In [None]:

# Exemple d'inférence (auto-génération à partir du BOS)
def generate_from_bos(model, tok2id, id2tok, maxlen=30):
    BOS = tok2id["<BOS>"]
    EOS = tok2id["<EOS>"]
    PAD = tok2id["<PAD>"]

    input_seq = [BOS]
    for _ in range(maxlen):
        padded = pad_sequences([input_seq], maxlen=maxlen, padding="post", value=PAD)
        preds = model.predict(padded, verbose=0)
        next_id = np.argmax(preds[0, len(input_seq)-1])
        if next_id == EOS:
            break
        input_seq.append(next_id)

    return "".join(id2tok[i] for i in input_seq[1:] if i != PAD)

# Exemple
print("🌀 Exemple génération autoregressive:")
print(generate_from_bos(auto_model, data["tok2id_j"], data["id2tok_j"]))

In [None]:
def plot_comparative_learning_curves(history_seq2seq, history_auto):
    plt.figure(figsize=(14, 5))

    # === Perte (loss)
    plt.subplot(1, 2, 1)
    plt.plot(history_seq2seq.history["loss"], label="Seq2Seq - train", color="blue")
    plt.plot(history_seq2seq.history["val_loss"], label="Seq2Seq - val", color="blue", linestyle="--")
    plt.plot(history_auto.history["loss"], label="AutoReg - train", color="green")
    plt.plot(history_auto.history["val_loss"], label="AutoReg - val", color="green", linestyle="--")
    plt.title("Courbes de perte (Loss)")
    plt.xlabel("Époques")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)

    # === Précision (accuracy)
    plt.subplot(1, 2, 2)
    plt.plot(history_seq2seq.history["accuracy"], label="Seq2Seq - train", color="blue")
    plt.plot(history_seq2seq.history["val_accuracy"], label="Seq2Seq - val", color="blue", linestyle="--")
    plt.plot(history_auto.history["accuracy"], label="AutoReg - train", color="green")
    plt.plot(history_auto.history["val_accuracy"], label="AutoReg - val", color="green", linestyle="--")
    plt.title("Accuracy caractère par caractère")
    plt.xlabel("Époques")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()