In [None]:
# -*- coding: utf-8 -*-
# Afficher les appareils disponibles
print("GPU disponible :", tf.config.list_physical_devices('GPU'))


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Bidirectional, Dense, Embedding, TimeDistributed, Attention
from sklearn.model_selection import train_test_split
import re


In [None]:
!wget http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip
!unzip -o cornell_movie_dialogs_corpus.zip
!ls "cornell movie-dialogs corpus"

In [None]:
def load_and_preprocess_data():
    # Charger le dataset Cornell Movie Dialogs
    # (Vous devrez adapter ceci selon la structure exacte de vos fichiers)
    lines = open(lines_path, encoding='utf-8', errors='ignore').read().split('\n')
    conv_lines = open(convs_path, encoding='utf-8', errors='ignore').read().split('\n')

    # Créer un dictionnaire pour mapper les ID de ligne au texte
    id2line = {}
    for line in lines:
        parts = line.split(' +++$+++ ')
        if len(parts) == 5:
            id2line[parts[0]] = parts[4]

    # Créer des paires question-réponse
    pairs = []
    for conv in conv_lines[:-1]:
        parts = conv.split(' +++$+++ ')
        if len(parts) == 4:
            line_ids = eval(parts[3])  # Convertir la string en liste
            for i in range(len(line_ids)-1):
                pairs.append((id2line[line_ids[i]], id2line[line_ids[i+1]]))

    return pairs

def clean_text(text):
    """Nettoyer le texte"""
    text = text.lower()
    text = re.sub(r"i'm", "i am", text)
    text = re.sub(r"he's", "he is", text)
    text = re.sub(r"she's", "she is", text)
    text = re.sub(r"that's", "that is", text)
    text = re.sub(r"what's", "what is", text)
    text = re.sub(r"where's", "where is", text)
    text = re.sub(r"\'ll", " will", text)
    text = re.sub(r"\'ve", " have", text)
    text = re.sub(r"\'re", " are", text)
    text = re.sub(r"\'d", " would", text)
    text = re.sub(r"won't", "will not", text)
    text = re.sub(r"can't", "cannot", text)
    text = re.sub(r"[-()\"#/@;:<>{}`+=~|.!?,]", "", text)
    return text

# Charger et nettoyer les données
pairs = load_and_preprocess_data()
cleaned_pairs = [(clean_text(q), clean_text(a)) for q, a in pairs]

In [None]:
# Paramètres
# Paramètres du modèle
MAX_LENGTH = 15  # Réduire de 20 à 15 (35% moins de calculs)
VOCAB_SIZE = 5000 # Limiter à 5000 mots (au lieu de 8000)
EMBEDDING_DIM = 64  # Réduire de 96 à 64
LSTM_UNITS = 64  # Réduire de 128 à 64 (4x moins de paramètres)
BATCH_SIZE = 256  # Augmenter le batch size
EPOCHS = 10  # Réduire le nombre d'epochs

# Création du tokenizer
tokenizer = Tokenizer(num_words=VOCAB_SIZE, oov_token='<OOV>')
all_text = [q for q, a in cleaned_pairs] + [a for q, a in cleaned_pairs]
tokenizer.fit_on_texts(all_text)

# Ajout des tokens spéciaux
word_index = tokenizer.word_index
word_index['<start>'] = len(word_index) + 1
word_index['<end>'] = len(word_index) + 1
tokenizer.word_index = word_index
VOCAB_SIZE = len(word_index) + 1

In [None]:
def tokenize_and_pad(pairs):
    input_seqs = []
    target_seqs = []

    for q, a in pairs:
        # Tokeniser la question
        q_seq = tokenizer.texts_to_sequences([q])[0]
        q_seq = q_seq[:MAX_LENGTH-1]  # Tronquer si nécessaire
        input_seqs.append(q_seq)

        # Tokeniser la réponse avec tokens start/end
        a_seq = tokenizer.texts_to_sequences([a])[0]
        a_seq = a_seq[:MAX_LENGTH-1]
        a_seq = [word_index['<start>']] + a_seq + [word_index['<end>']]
        target_seqs.append(a_seq)

    # Appliquer le padding
    input_seqs = pad_sequences(input_seqs, maxlen=MAX_LENGTH, padding='post')
    target_seqs = pad_sequences(target_seqs, maxlen=MAX_LENGTH+1, padding='post')

    return input_seqs, target_seqs

input_seqs, target_seqs = tokenize_and_pad(cleaned_pairs)


In [None]:
# Encodeur
encoder_inputs = Input(shape=(MAX_LENGTH,))
encoder_embedding = Embedding(VOCAB_SIZE, EMBEDDING_DIM)(encoder_inputs)
encoder_lstm = Bidirectional(LSTM(LSTM_UNITS, return_sequences=True, return_state=True))
encoder_outputs, forward_h, forward_c, backward_h, backward_c = encoder_lstm(encoder_embedding)

state_h = tf.keras.layers.Concatenate()([forward_h, backward_h])
state_c = tf.keras.layers.Concatenate()([forward_c, backward_c])
encoder_states = [state_h, state_c]

# Décodeur
decoder_inputs = Input(shape=(None,))
decoder_embedding = Embedding(VOCAB_SIZE, EMBEDDING_DIM)(decoder_inputs)
decoder_lstm = LSTM(LSTM_UNITS*2, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)

# Couche d'attention
attention = Attention()([decoder_outputs, encoder_outputs])
decoder_concat = tf.keras.layers.Concatenate()([decoder_outputs, attention])

# Couche de sortie
decoder_dense = TimeDistributed(Dense(VOCAB_SIZE, activation='softmax'))
decoder_outputs = decoder_dense(decoder_concat)

# Modèle complet
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])


In [None]:
# Séparation train/test
X_train, X_test, y_train, y_test = train_test_split(input_seqs, target_seqs, test_size=0.2)

# Les entrées du décodeur sont les réponses sans le dernier token
decoder_input_data = y_train[:, :-1]
# Les sorties du décodeur sont les réponses décalées d'un token
decoder_output_data = y_train[:, 1:]

# Idem pour les données de test
decoder_input_test = y_test[:, :-1]
decoder_output_test = y_test[:, 1:]


In [None]:
import os

if os.path.exists('chatbot_model.h5'):
    print("Model already trained. Loading the model...")
    model.load_weights('chatbot_model.h5')
else:
    print("Training the model...")
    history = model.fit(
        [X_train, decoder_input_data],
        np.expand_dims(decoder_output_data, -1),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=([X_test, decoder_input_test], np.expand_dims(decoder_output_test, -1)),
        callbacks=[
            tf.keras.callbacks.EarlyStopping(patience=3),
            tf.keras.callbacks.ModelCheckpoint('chatbot_model.h5', save_best_only=True)
        ]
    )


In [None]:
# Encodeur pour l'inférence
encoder_model = Model(encoder_inputs, [encoder_outputs, encoder_states])

# Décodeur pour l'inférence
decoder_state_input_h = Input(shape=(LSTM_UNITS*2,))
decoder_state_input_c = Input(shape=(LSTM_UNITS*2,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

decoder_outputs, state_h, state_c = decoder_lstm(decoder_embedding, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]

# Attention pour l'inférence
attention = Attention()([decoder_outputs, encoder_outputs])
decoder_concat = tf.keras.layers.Concatenate()([decoder_outputs, attention])

decoder_outputs = decoder_dense(decoder_concat)
decoder_model = Model(
    [decoder_inputs] + [encoder_outputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states
)


In [None]:
def generate_response(input_seq):
    # Encoder l'input
    enc_out, enc_states = encoder_model.predict(input_seq)

    # Démarrer avec le token <start>
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = word_index['<start>']

    stop_condition = False
    response = []

    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + [enc_out] + enc_states)

        # Sélectionner le token avec la plus haute probabilité
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_word = None

        for word, index in word_index.items():
            if index == sampled_token_index:
                sampled_word = word
                response.append(word)

        # Condition d'arrêt: <end> ou longueur max atteinte
        if sampled_word == '<end>' or len(response) >= MAX_LENGTH:
            stop_condition = True

        # Mettre à jour la séquence cible
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

        # Mettre à jour les états
        enc_states = [h, c]

    return ' '.join(response[:-1])  # Exclure le token <end>

In [None]:
# prompt: appeal generate_response with sentence "how are you?"

input_sentence = "it's over anakin! i have the high ground!"
input_seq = tokenizer.texts_to_sequences([clean_text(input_sentence)])
input_seq = pad_sequences(input_seq, maxlen=MAX_LENGTH, padding='post')
response = generate_response(input_seq)
response