In [1]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from bs4 import BeautifulSoup
import os
import chardet

In [2]:
path = "data"
textes = []
labels = []

for file in os.listdir(path):
    path_file = os.path.join(path, file)

    if os.path.isfile(path_file):
        # Détecter l'encodage du fichier
        with open(path_file, 'rb') as f:
            encoding = chardet.detect(f.read())['encoding']

        # Ouvrir le fichier avec l'encodage détecté
        with open(path_file, encoding=encoding) as fp:
            soup = BeautifulSoup(fp, "html.parser")

        paragraphes = soup.select("p")
        for p in paragraphes:
            texte = p.get_text()
            textes.append(texte)
            labels.append(file.split("-")[0])


In [3]:
def rassembler_textes_et_labels(textes, labels, taille_minimale=1000):
    textes_rassembles = []
    labels_rassembles = []

    buffer_texte = ""
    buffer_label = ""

    for texte, label in zip(textes, labels):
        if buffer_label == "":
            buffer_label = label

        if buffer_label == label:
            buffer_texte += " " + texte
            if len(buffer_texte) >= taille_minimale:
                textes_rassembles.append(buffer_texte)
                labels_rassembles.append(buffer_label)
                buffer_texte = ""
                buffer_label = ""
        else:
            if len(buffer_texte) >= taille_minimale:
                textes_rassembles.append(buffer_texte)
                labels_rassembles.append(buffer_label)
            buffer_texte = texte
            buffer_label = label

    # Ajoute le dernier échantillon s'il n'a pas été ajouté précédemment et s'il est assez long
    if buffer_label and len(buffer_texte) >= taille_minimale:
        textes_rassembles.append(buffer_texte)
        labels_rassembles.append(buffer_label)

    return textes_rassembles, labels_rassembles

textes_rassembles, labels_rassembles = rassembler_textes_et_labels(textes, labels)

In [4]:
print("Nombre d'échantillons rassemblés")
print("Balzac :",labels_rassembles.count("balzac"))
print("Flaubert :",labels_rassembles.count("flaubert"))
print("Maupassant :",labels_rassembles.count("maupassant"))
print("Sand :",labels_rassembles.count("sand"))
print("Zola :",labels_rassembles.count("zola"))

Nombre d'échantillons rassemblés
Balzac : 1667
Flaubert : 1887
Maupassant : 966
Sand : 1922
Zola : 3826


In [5]:
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Tokenisation
max_len = 500  # Longueur maximale des séquences
max_words = 10000  # Nombre maximum de mots à considérer

tokenizer = Tokenizer(num_words=max_words, oov_token="<OOV>")
tokenizer.fit_on_texts(textes_rassembles)
sequences = tokenizer.texts_to_sequences(textes_rassembles)

word_index = tokenizer.word_index
print(f"{len(word_index)} mots uniques trouvés.")

# Padding
data = pad_sequences(sequences, maxlen=max_len)

# Encodage des labels
le = LabelEncoder()
labels_encoded = le.fit_transform(labels_rassembles)
labels_categorical = to_categorical(labels_encoded)

# Train-test split
x_train, x_test, y_train, y_test = train_test_split(data, labels_categorical, test_size=0.2, random_state=42)


70037 mots uniques trouvés.


Construction et entraînement du modèle LSTM

In [6]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

model = Sequential([
    Embedding(max_words, 128, input_length=max_len),
    LSTM(128, dropout=0.2, recurrent_dropout=0.2),
    Dense(len(le.classes_), activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

print(model.summary())

history = model.fit(x_train, y_train, epochs=20, batch_size=32, validation_split=0.2)


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, 500, 128)          1280000   
_________________________________________________________________
lstm (LSTM)                  (None, 128)               131584    
_________________________________________________________________
dense (Dense)                (None, 5)                 645       
Total params: 1,412,229
Trainable params: 1,412,229
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [7]:
loss, accuracy = model.evaluate(x_test, y_test)
print(f"Accuracy: {accuracy * 100:.2f}%")
loss, accuracy = model.evaluate(x_test, y_test)
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 89.44%
Accuracy: 89.44%


Utilisation de BERT

In [9]:
import numpy as np
import tensorflow as tf
from transformers import BertTokenizer, TFBertForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [15]:
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

def encode(text, label):
    return tokenizer(text, padding="max_length", truncation=True, max_length=512, return_tensors="tf"), label

encoded_data = [encode(text, label) for text, label in zip(textes_rassembles, labels_rassembles)]

input_ids = np.array([data[0]["input_ids"].numpy() for data in encoded_data])
attention_mask = np.array([data[0]["attention_mask"].numpy() for data in encoded_data])

le = LabelEncoder()
numeric_labels = le.fit_transform(labels_rassembles)

X_train, X_test, y_train, y_test = train_test_split(input_ids, numeric_labels, test_size=0.2, random_state=42)

train_masks, test_masks, _, _ = train_test_split(attention_mask, input_ids, test_size=0.2, random_state=42)

In [19]:
import numpy as np

X_train = np.squeeze(X_train)
train_masks = np.squeeze(train_masks)

print("X_train shape:", X_train.shape)
print("train_masks shape:", train_masks.shape)


X_train shape: (8214, 512)
train_masks shape: (8214, 512)


In [11]:
num_labels = len(np.unique(labels_rassembles))
model = TFBertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)

Downloading: 100%|██████████| 536M/536M [00:14<00:00, 37.6MB/s] 
All model checkpoint layers were used when initializing TFBertForSequenceClassification.

Some layers of TFBertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [23]:
from tensorflow.keras import backend as K

# Custom training loop
def train_with_gradient_accumulation(model, dataset, epochs, steps_per_update, optimizer):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    metric = tf.keras.metrics.SparseCategoricalAccuracy()

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        step = 0
        total_loss = 0.0
        total_steps = 0

        for inputs, targets in dataset:
            with tf.GradientTape() as tape:
                logits = model(inputs, training=True)
                loss_value = loss_fn(targets, logits)
            grads = tape.gradient(loss_value, model.trainable_weights)
            if step % steps_per_update == 0:
                optimizer.apply_gradients(zip(grads, model.trainable_weights))
                grads = [tf.zeros_like(w) for w in model.trainable_weights]
            else:
                grads = [g + w for g, w in zip(grads, model.trainable_weights)]
            step += 1
            total_loss += loss_value
            total_steps += 1

            metric.update_state(targets, logits)

        print(f"Loss: {total_loss / total_steps}, Accuracy: {metric.result().numpy()}")
        metric.reset_states()

# Convert your data to a TensorFlow dataset
train_data = tf.data.Dataset.from_tensor_slices(({"input_ids": X_train, "attention_mask": train_masks}, y_train))
train_data = train_data.batch(1)  # Use a smaller batch size, e.g. 2

# Train the model with gradient accumulation
train_with_gradient_accumulation(model, train_data, epochs=20, steps_per_update=4, optimizer=optimizer)

def train_with_gradient_accumulation(model, train_dataset, val_dataset, epochs, steps_per_update, optimizer):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    train_metric = tf.keras.metrics.SparseCategoricalAccuracy()
    val_metric = tf.keras.metrics.SparseCategoricalAccuracy()

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        step = 0
        total_loss = 0.0
        total_steps = 0

        # Training loop
        for inputs, targets in train_dataset:
            with tf.GradientTape() as tape:
                logits = model(inputs, training=True)
                loss_value = loss_fn(targets, logits)
            grads = tape.gradient(loss_value, model.trainable_weights)
            if step % steps_per_update == 0:
                optimizer.apply_gradients(zip(grads, model.trainable_weights))
                grads = [tf.zeros_like(w) for w in model.trainable_weights]
            else:
                grads = [g + w for g, w in zip(grads, model.trainable_weights)]
            step += 1
            total_loss += loss_value
            total_steps += 1

            train_metric.update_state(targets, logits)

        # Validation loop
        for val_inputs, val_targets in val_dataset:
            val_logits = model(val_inputs, training=False)
            val_metric.update_state(val_targets, val_logits)

        print(f"Train Loss: {total_loss / total_steps}, Train Accuracy: {train_metric.result().numpy()}, Validation Accuracy: {val_metric.result().numpy()}")
        train_metric.reset_states()
        val_metric.reset_states()

# Convert your data to TensorFlow datasets
train_data = tf.data.Dataset.from_tensor_slices(({"input_ids": X_train, "attention_mask": train_masks}, y_train)).batch(2)
val_data = tf.data.Dataset.from_tensor_slices(({"input_ids": X_test, "attention_mask": test_masks}, y_test)).batch(2)

# Train the model with gradient accumulation and validation
train_with_gradient_accumulation(model, train_data, val_data, epochs=20, steps_per_update=4, optimizer=optimizer)


Epoch 1/20


ResourceExhaustedError: OOM when allocating tensor with shape[1,512,768] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc [Op:ResourceGather]

In [None]:
eval_results = model.evaluate([X_test, test_masks], y_test, batch_size=8)
print(f"Accuracy: {eval_results[1] * 100:.2f}%")