# Spacy Text Categorizer

The objective here is to build a French-language text categorizer able to distinguish the different kinds of wage agreement components.

Some inspiring sources:
- https://medium.com/@johnidouglasmarangon/building-a-text-classification-model-with-spacy-3-x-57e59fa50547
- https://www.machinelearningplus.com/nlp/custom-text-classification-spacy/
- https://www.width.ai/post/spacy-text-classification

In [None]:
import json
import numpy as np
import os

In [None]:
# Root folder onto which the data and model paths built with os.path.join are appended.
# NOTE(review): machine-specific absolute path - adjust it before running elsewhere.
ROOT_PATH = r"C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files"

We first need to convert the data exported from Label Studio into the format required by the spaCy TextCategorizer.

In [None]:
def import_label_studio_data(filename, target_labels):
    """
    Load a Label Studio JSON export and return it as spaCy-style training tuples.

    Each record of the export is expected to carry a 'text' entry and,
    optionally, a 'label' list whose items hold 'start', 'end' and 'labels'.

    Args:
        filename: Path of the Label Studio JSON export.
        target_labels: List of label names to keep. A span is kept when at
            least one of its labels is in this list; the stored label is the
            first one of the span.

    Returns:
        A list of ``(text, {"entities": [(start, end, label), ...]})`` tuples.
        Records that end up without any entity are left out.

    Raises:
        ValueError: If ``target_labels`` is not a list.
    """
    if not isinstance(target_labels, list):
        raise ValueError("The 'target_labels' argument must be a list of strings.")

    with open(filename, 'rb') as fp:
        records = json.load(fp)

    train_data = []
    for record in records:
        # A missing, null or empty 'label' entry all mean "no annotation".
        annotations = record.get('label') or []
        spans = [(ann.get('start'), ann.get('end')) for ann in annotations]

        entities = []
        for idx, ann in enumerate(annotations):
            start = spans[idx][0]
            # A span is discarded when its start offset falls inside
            # (bounds included) any other span of the same record.
            overlaps = any(
                other_start <= start <= other_end
                for other_idx, (other_start, other_end) in enumerate(spans)
                if other_idx != idx
            )
            if overlaps:
                continue
            if any(target in ann.get('labels') for target in target_labels):
                entities.append((ann.get('start'), ann.get('end'), ann.get('labels')[0]))

        if entities:  # Keep only the records that still have entities
            train_data.append((record.get('text'), {"entities": entities}))

    return train_data

# Every label used in the annotation project. It is stored under a dedicated
# name so that the builtin all() is not shadowed for the rest of the session.
ALL_LABELS = [
    'OUV', 'INT', 'CAD', 'NOUV', 'NCAD', 'AG', 'AI', 'TOUS',
    'AG OUV', 'AG INT', 'AG CAD', 'AI OUV', 'AI INT', 'AI CAD',
    'NOUV AG', 'NCAD AG', 'NOUV AI', 'NCAD AI', 'ATOT',
    'ATOT OUV', 'ATOT INT', 'ATOT CAD', 'PPV', 'PPVm', 'DATE',
]

# Labels kept at import time; replace with a narrower list to work on a subset.
target_labels = list(ALL_LABELS)

# Call the function with the filename and the list of target labels
data = import_label_studio_data(os.path.join(ROOT_PATH, r"data\training_json\data449.json"), target_labels)
data

In [None]:
import pandas as pd

def spacy_to_dataframe(data):
    """
    Turn a sequence of (text, annotation) pairs into a two-column DataFrame.

    Args:
        data: Pairs in the format returned by import_label_studio_data.

    Returns:
        A pandas DataFrame with a "text" column (first element of each pair)
        and a "label" column (second element of each pair).
    """
    texts = []
    annotations = []
    for text, annotation in data:
        texts.append(text)
        annotations.append(annotation)

    return pd.DataFrame({'text': texts, 'label': annotations})

# Build the text/label DataFrame from the imported annotations and preview it.
df = spacy_to_dataframe(data)
df.head()

In [None]:
def dummy_label(df, target_label="OUV"):
    """
    Add a binary "label_dummy" column flagging rows that contain a given entity label.

    The DataFrame is modified in place and also returned.

    Args:
        df (DataFrame): Must have a "label" column whose values are dicts with
            an "entities" list of (start, end, label) tuples.
        target_label (str): Entity label to look for (default: "OUV").

    Returns:
        The same DataFrame, with "label_dummy" set to 1 for rows having at
        least one entity labelled ``target_label`` and 0 otherwise.
    """
    def has_target(annotation):
        # The entity label is the third element of each entity tuple
        return any(entity[2] == target_label for entity in annotation["entities"])

    # Explicit int64 keeps the column dtype stable, even for an empty DataFrame
    df["label_dummy"] = df["label"].apply(has_target).astype("int64")

    # Print the class balance to verify the result
    print(df["label_dummy"].value_counts())
    return df

# Add the binary "label_dummy" column (1 when an "OUV" entity is present) to df.
df = dummy_label(df)

In [None]:
#cleaning function : 
def clean_dataset(data):
    """
    Drop incomplete rows and promote "label_dummy" to be the "label" column.

    The DataFrame is modified in place and also returned.

    Args:
        data (DataFrame): The DataFrame containing the text, label and
            (optionally) label_dummy columns.

    Returns:
        The same DataFrame. When a "label_dummy" column is present, the
        original "label" column is removed and "label_dummy" takes its name;
        otherwise only the rows with missing values are removed.
    """
    # Remove every row holding at least one missing value
    data.dropna(axis=0, how='any', inplace=True)

    has_dummy = 'label_dummy' in data.columns
    if has_dummy:
        # The binary column replaces the raw annotation column
        data.drop(columns="label", inplace=True)
        data.rename(columns={"label_dummy": "label"}, inplace=True)

    print(data.head())
    return data

df = clean_dataset(df)
# The relative part must not start with a backslash: os.path.join discards the
# directories of ROOT_PATH when a later component is rooted, which would write
# the CSV outside of ROOT_PATH instead of where it is read back later on.
df.to_csv(os.path.join(ROOT_PATH, r"data\training_csv\data449_cleaned.csv"), index=False)

In [None]:
# Invert the labels so that 0 stands for PPV and 1 for everything else
def inverse_label(data):
    """
    Flip the binary "label" column in place: 1 becomes 0, any other value becomes 1.

    Args:
        data (DataFrame): The DataFrame containing the text and label columns.

    Returns:
        The same DataFrame with its "label" column flipped.
    """
    def flip(value):
        if value == 1:
            return 0
        return 1

    data["label"] = data["label"].map(flip)
    print(data["label"].value_counts())
    return data

# df = inverse_label(df)
# df.head()

In [None]:
# Create the dataset: shuffle the rows (no random_state is given, so the order
# changes on every run) and turn them into a list of (text, label) tuples.
dataset = list(df[["text", "label"]].sample(frac=1).itertuples(index=False, name=None))
dataset

Now that our dataset is cleaned, we can split it into three sets: training, validation and test.

In [None]:
import numpy as np

def split_data(data, train_ratio=0.75, val_ratio=0.15, test_ratio=0.10, random_seed=None):
    """
    Shuffle a dataset of (text, label) pairs and split it into train/val/test sets.

    Parameters:
    - data: The dataset to be split, as a sequence of (text, label) pairs.
    - train_ratio: The ratio of data to be allocated to the training set (default: 0.75).
    - val_ratio: The ratio of data to be allocated to the validation set (default: 0.15).
    - test_ratio: Kept for interface compatibility. The test set always receives
      every sample left after the training and validation sets are filled, so
      this value only matches the real share when the three ratios sum to 1.
    - random_seed: Seed for the shuffling. With None (default) the global numpy
      random state is used; with a value, a private generator is seeded so the
      global numpy random state is left untouched.

    Returns:
    - A tuple containing three lists: (train_set, val_set, test_set). Every
      element is a (str, int) tuple.

    Raises:
    - ValueError: If a ratio is negative or train_ratio + val_ratio exceeds 1.
    """
    if min(train_ratio, val_ratio, test_ratio) < 0:
        raise ValueError("Split ratios must be non-negative.")
    # Small tolerance so that float sums such as 0.85 + 0.15 are accepted
    if train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError("train_ratio + val_ratio must not exceed 1.")

    items = list(data)
    total_size = len(items)

    # Calculate the sizes of the first two splits; the test set is the remainder
    train_size = int(total_size * train_ratio)
    val_size = int(total_size * val_ratio)

    # Shuffle positions rather than the samples themselves, so the texts are
    # never copied into a fixed-width numpy string array
    rng = np.random if random_seed is None else np.random.RandomState(random_seed)
    order = rng.permutation(total_size)
    shuffled_data = [(str(items[i][0]), int(items[i][1])) for i in order]

    # Split the data into three sets
    train_set = shuffled_data[:train_size]
    val_set = shuffled_data[train_size:train_size + val_size]
    test_set = shuffled_data[train_size + val_size:]

    # Print the size of each set
    print("Training set size:", len(train_set))
    print("Validation set size:", len(val_set))
    print("Test set size:", len(test_set))

    return train_set, val_set, test_set

# Shuffle and split the (text, label) pairs with the default 75/15/10 ratios;
# no seed is passed, so the split differs between runs.
train_data, val_data, test_data = split_data(dataset)

To train, spaCy needs the data as binary files (serialized `Doc` objects stored in a `DocBin`), so we need to convert the data to this format.

We build the model to obtain a classifier

In [None]:
import spacy

# Create a blank French pipeline. No component is added to it here: the
# add_pipe/add_label calls below are commented out.
nlp = spacy.blank("fr")

# We add the text classifier to the pipeline
# textcat = nlp.add_pipe("tok2vec")
# textcat = nlp.add_pipe("textcat")
# textcat.add_label("PPV")
# textcat.add_label("NPPV")

# Show which components the pipeline currently contains
print(nlp.pipe_names)

In [None]:
from spacy.tokens import DocBin

def convert(data, outfile):
    """
    Serialize (text, label) pairs into a spaCy DocBin file.

    Each text is run through the module-level ``nlp`` pipeline; the resulting
    doc gets two categories: "PPV" is True when the label equals 0 and "NPPV"
    is True when the label equals 1.

    Args:
        data: Iterable of (text, label) tuples.
        outfile: Path of the binary file to write.
    """
    doc_bin = spacy.tokens.DocBin()

    annotated_docs = nlp.pipe(data, as_tuples=True)
    for doc, label in annotated_docs:
        doc.cats.update({"PPV": label == 0, "NPPV": label == 1})
        doc_bin.add(doc)

    doc_bin.to_disk(outfile)
    print("Data saved to:", outfile)

# Serialize the three splits, one DocBin file per split.
for split, target in (
    (train_data, r"model\classifyer\PPV\train.spacy"),
    (val_data, r"model\classifyer\PPV\val.spacy"),
    (test_data, r"model\classifyer\PPV\test.spacy"),
):
    convert(split, os.path.join(ROOT_PATH, target))

In [None]:
# Generate a French text-categorizer training config (optimized for efficiency), overwriting any existing config.cfg.
!python -m spacy init config --lang fr --pipeline textcat --optimize efficiency --force C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\PPV\config.cfg 

In [None]:
!python -m spacy train C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV\config.cfg --paths.train C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV\train.spacy  --paths.dev C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV\val.spacy --output C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV --verbose

In [None]:
!python -m spacy evaluate C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV\model-best C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\model\classifyer\OUV\test.spacy

Test

In [None]:
# Sample sentence ("The contract does not contain any value-added sharing bonus") used to try out the trained model.
texts = ["Le contrat ne contient pas de prime de partage de la valeur ajoutée"]

In [None]:
# Load the best checkpoint from the PPV model folder; this rebinds `nlp`.
nlp = spacy.load(os.path.join(ROOT_PATH, r"model\classifyer\PPV\model-best"))

# Print the category scores next to each sample text
for text in texts:
    doc = nlp(text)
    print(doc.cats,  "-",  text)

## Demo

Demonstration: we test the model's ability to sort a set of documents.

In [None]:
import pandas as pd
import spacy

# Load the best checkpoint from the PPV model folder
nlp = spacy.load(os.path.join(ROOT_PATH, r"model\classifyer\PPV\model-best"))

# Load the cleaned CSV; this rebinds `data`, which previously held the imported annotations
data = pd.read_csv(os.path.join(ROOT_PATH, r"data\training_csv\data449_cleaned.csv"))
data.head()

In [None]:
# Score every text of the CSV and print its category scores next to it.
texts = data["text"].tolist()

for text in texts:
    doc = nlp(text)
    print(doc.cats,  "-",  text)

In [None]:
# Sort the texts into two buckets according to the higher-scoring category;
# a missing category counts as 0.0 and ties go to the NPPV bucket.
ppv_list, nppv_list = [], []

for text in texts:
    doc = nlp(text)
    scores = doc.cats
    bucket = ppv_list if scores.get("PPV", 0.0) > scores.get("NPPV", 0.0) else nppv_list
    bucket.append(text)

In [None]:
# One single-column frame per predicted class
df_ppv = pd.DataFrame(ppv_list, columns=["PPV"])
df_nppv = pd.DataFrame(nppv_list, columns=["NPPV"])

# Put the two columns side by side. When the lists differ in length, the
# shorter column is padded with NaN by the column-wise concat.
df_data = pd.concat([df_ppv, df_nppv], axis=1)
df_data.info()

In [None]:
# Preview the first rows of the side-by-side table.
df_data.head()

In [None]:
import sys
# Make the helper script folder importable.
sys.path.append(r"C:\Users\garsonj\Desktop\spacy_finetuning\spacy_files\script")
# NOTE: import_label_studio_data, spacy_to_dataframe and clean_dataset imported
# here replace the functions of the same name defined earlier in this notebook.
from confusion_matrix_spacy_def import print_statistics, import_label_studio_data, spacy_to_dataframe, dummy_label_true, clean_dataset, dummy_label_pred

# Ground truth: re-import the annotated export and run it through the helper
# pipeline (presumably yielding a text/label table - confirm in the script).
true_data = import_label_studio_data(os.path.join(ROOT_PATH, r"data\training_json\data449.json"), target_labels)
df_true = spacy_to_dataframe(true_data)
df_true = dummy_label_true(df_true)
df_true = clean_dataset(df_true)

# for the predicted json
# NOTE(review): the path below is an empty placeholder, so open() fails with
# FileNotFoundError; set it to the predictions JSON file before running.
with open(r"", "r", encoding="utf-8") as f:
    data_pred = json.load(f)

# Same helper pipeline for the predictions
df_pred = spacy_to_dataframe(data_pred)
df_pred = dummy_label_pred(df_pred)
df_pred = clean_dataset(df_pred)