In [None]:
%pip install scikit-learn nltk numpy pandas spacy
!python -m spacy download en_core_web_sm
!python -m spacy download en_core_web_lg

In [None]:
import numpy as np
import pandas as pd
import spacy
import nltk

from joblib import parallel_backend
from nltk.stem import LancasterStemmer, PorterStemmer, SnowballStemmer, WordNetLemmatizer
from numpy import ndarray
from pandas import DataFrame
from sklearn.compose import make_column_transformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_recall_fscore_support
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

In [None]:
# NLTK resources needed later: the POS tagger model, the Punkt tokenizer
# and the WordNet data.
for nltk_resource in ('averaged_perceptron_tagger', 'punkt', 'wordnet'):
    nltk.download(nltk_resource)

# spaCy pipeline used to parse every sentence (the small model is kept as a
# commented-out alternative).
#nlp = spacy.load('en_core_web_sm')
nlp = spacy.load('en_core_web_lg')

In [None]:
# Colab only: uncomment the two lines below to mount Google Drive before
# pointing data_path at the mounted folder.
#from google.colab import drive
#drive.mount('/content/drive')
# Prefix prepended verbatim to the CSV file names when loading the splits,
# so the trailing slash is required.
data_path = 'BUILD/'

In [None]:
# Load the three corpus splits from CSV files located under data_path.
train = pd.read_csv(f'{data_path}train.csv')
dev = pd.read_csv(f'{data_path}dev.csv')
test = pd.read_csv(f'{data_path}test.csv')

In [None]:
def sentence_position(df: DataFrame):
    """Compute each sentence's relative position inside its document.

    The position is ``sentence_index`` divided by the largest
    ``sentence_index`` found in ``df`` for the same ``doc_id``.

    Args:
        df: Frame with at least the columns ``doc_id`` and ``sentence_index``.
            It is left unmodified.

    Returns:
        A column vector of shape ``(len(df), 1)`` holding the ratios. Rows
        whose document maximum is 0 (e.g. a single sentence at index 0) get
        0.0 instead of the NaN that 0/0 would produce.
    """
    # transform('max') broadcasts the per-document maximum back onto every
    # row, so no helper column has to be written into the caller's frame.
    last_index = df.groupby('doc_id')['sentence_index'].transform('max')
    # Keep the ratio wherever the denominator is non-zero; use 0.0 otherwise.
    position = (df['sentence_index'] / last_index).where(last_index != 0, 0.0)
    return position.values.reshape(-1, 1)

In [None]:
def is_verb(tag: str):
    """Tell whether a POS tag marks a verb form or a modal.

    Args:
        tag: Part-of-speech tag string.

    Returns:
        True when ``tag`` is exactly 'MD' or begins with 'VB', else False.
    """
    if tag == 'MD':
        return True
    return tag.startswith('VB')

def verb_tense(sentence: str):
    """Return the POS tag of the first verb or modal found in ``sentence``.

    The sentence is tokenized and tagged with NLTK; tags are then scanned in
    order and the first one accepted by ``is_verb`` is returned.

    Args:
        sentence: Raw sentence text.

    Returns:
        The matching tag, or the empty string when no tag qualifies.
    """
    tagged_tokens = nltk.pos_tag(nltk.word_tokenize(sentence))
    for _token, tag in tagged_tokens:
        if is_verb(tag):
            return tag
    return ''

In [None]:
# Add a 'verb_tense' column (tag of the first verb in 'text') to every split.
for corpus in (train, dev, test):
    corpus['verb_tense'] = corpus['text'].map(verb_tense)

In [None]:
# Column order of the one-hot encoding produced by verb_tense_encode.
verb_tense_encoding = ['VB', 'VBC', 'VBD', 'VBF', 'VBG', 'VBN', 'VBP', 'VBZ', 'MD']

def verb_tense_encode(df: DataFrame):
    """One-hot encode the ``verb_tense`` column.

    Args:
        df: Frame (or mapping) whose ``'verb_tense'`` entry holds one tag per row.

    Returns:
        Integer array of shape ``(n_rows, len(verb_tense_encoding))``. A row
        has a 1 in the column whose tag equals its ``verb_tense`` value and 0
        elsewhere; values absent from ``verb_tense_encoding`` (such as the
        empty string) give an all-zero row.
    """
    rows = [
        [1 if code == tense else 0 for code in verb_tense_encoding]
        for tense in df['verb_tense']
    ]
    # The explicit reshape keeps the result two-dimensional even when there
    # are no rows; np.asarray([]) alone would collapse to shape (0,).
    return np.asarray(rows, dtype=int).reshape(len(rows), len(verb_tense_encoding))

In [None]:
# Run the spaCy pipeline over every split and store one parsed document per
# row in a 'doc' column.
# nlp.pipe() yields its results lazily, so they are materialised into a list
# before assignment: np.asarray() on a generator only builds a 0-d object
# array wrapping the generator itself, not an array of parsed documents.
for corpus in (train, dev, test):
    corpus['doc'] = list(nlp.pipe(corpus['text']))

In [None]:
def entity_count(entity_type: str):
    """Build a feature function counting entities of one label per document.

    Args:
        entity_type: Entity label to count (compared with ``ent.label_``).

    Returns:
        A callable taking a frame with a ``'doc'`` column and returning an
        array with one single-element row per document: the number of
        entries in ``doc.ents`` whose label equals ``entity_type``.
    """
    def count_entities(df):
        counts = []
        for doc in df['doc']:
            matching = [ent for ent in doc.ents if ent.label_ == entity_type]
            counts.append([len(matching)])
        return np.asarray(counts)

    return count_entities

In [None]:
def sentence_length(df: DataFrame):
    """Give the length of each sentence, counted in whitespace-separated tokens.

    Args:
        df (pd.DataFrame): Corpus; its ``'text'`` column holds the sentences.

    Returns:
        Column vector of shape ``(len(df), 1)`` with one token count per row.
    """
    def count_tokens(text):
        return len(text.split())

    token_counts = df['text'].map(count_tokens)
    return token_counts.values.reshape(-1, 1)

In [None]:
def contains_ldots(sentence: str):
    """Flag sentences containing three consecutive dots.

    Args:
        sentence: Raw sentence text.

    Returns:
        1 if the substring '...' occurs in ``sentence``, otherwise 0.
    """
    return int('...' in sentence)

In [None]:
# Add the 0/1 'contains_ldots' flag column to every split.
for corpus in (train, dev, test):
    corpus['contains_ldots'] = corpus['text'].map(contains_ldots)

In [None]:
# Word normaliser used by StemmedTfidfVectorizer; the commented-out lines are
# alternative choices.
# NOTE(review): the analyzer below calls stemmer.lemmatize(). The NLTK stemmer
# classes are normally driven through .stem(), so switching to one of them
# would presumably require changing that call as well — confirm before enabling.
#stemmer = LancasterStemmer()
#stemmer = PorterStemmer()
#stemmer = SnowballStemmer(language='english')
stemmer = WordNetLemmatizer()

class StemmedTfidfVectorizer(TfidfVectorizer):
    """TF-IDF vectorizer whose analyzer output is normalised with ``stemmer``."""

    def build_analyzer(self):
        """Wrap the parent analyzer so each term it emits is lemmatized.

        Returns:
            A callable mapping a raw document to the list of terms produced
            by the parent analyzer, each passed through ``stemmer.lemmatize``.
        """
        base_analyzer = super().build_analyzer()

        def analyze(doc):
            # The module-level `stemmer` is looked up at call time.
            terms = base_analyzer(doc)
            return [stemmer.lemmatize(term) for term in terms]

        return analyze

In [None]:
class LemmatizedTfidfVectorizer(TfidfVectorizer):
    """TF-IDF vectorizer over pre-parsed documents, using token lemmas as terms.

    The parent analyzer is not called, so whatever tokenisation, stop-word
    filtering and n-gram generation it would perform is bypassed: the terms
    are exactly one ``lemma_`` per item of the iterated document.
    """

    def build_analyzer(self):
        """Return a callable listing the ``lemma_`` of every token in a document."""
        def lemmas(doc):
            # Alternative kept from the original: use surface forms instead.
            #return [w.text for w in doc]
            return [token.lemma_ for token in doc]

        return lemmas

In [None]:
class PosTfidfVectorizer(TfidfVectorizer):
    """TF-IDF vectorizer whose terms are suffixed with their NLTK POS tag."""

    def build_analyzer(self):
        """Wrap the parent analyzer and tag its output.

        Returns:
            A callable mapping a raw document to a list of ``'term/TAG'``
            strings, where the tags come from ``nltk.pos_tag`` applied to
            the terms emitted by the parent analyzer.
        """
        base_analyzer = super().build_analyzer()

        def analyze(doc):
            tagged_terms = nltk.pos_tag(base_analyzer(doc))
            return [term + '/' + tag for term, tag in tagged_terms]

        return analyze

In [None]:
# Feature extraction + classifier. Every (transformer, columns) pair below
# contributes one group of feature columns; the commented-out pairs are
# alternatives that are currently disabled.
classifier = make_pipeline(
    make_column_transformer(
        # Word 1- to 3-gram TF-IDF over the raw sentence text.
        (
            TfidfVectorizer(stop_words='english', ngram_range=(1,3), min_df=10),
            'text'
        ),
        #(
        #    StemmedTfidfVectorizer(
        #        stop_words='english',
        #        ngram_range=(1,3),
        #        min_df=10
        #    ),
        #    'text'
        #),
        # TF-IDF over the lemmas of the pre-parsed documents in 'doc'.
        (
            LemmatizedTfidfVectorizer(
                stop_words='english',
                ngram_range=(1,3),
                min_df=10
            ),
            'doc'
        ),
        #(
        #    PosTfidfVectorizer(
        #        stop_words='english',
        #        ngram_range=(1,3),
        #        min_df=10
        #    ),
        #    'text'
        #),
        #(
        #    'passthrough',
        #    ['sentence_index']
        #),
        # Precomputed 0/1 flag, used as is.
        (
            'passthrough',
            ['contains_ldots']
        ),
        # Relative position of the sentence inside its document.
        (
            FunctionTransformer(sentence_position),
            ['doc_id', 'sentence_index']
        ),
        #(
        #    FunctionTransformer(entity_count('LOC')),
        #    ['doc']
        #),
        # Number of LAW / DATE entities found in each parsed document.
        (
            FunctionTransformer(entity_count('LAW')),
            ['doc']
        ),
        (
            FunctionTransformer(entity_count('DATE')),
            ['doc']
        ),
        #(
        #    FunctionTransformer(entity_count('PERSON')),
        #    ['doc']
        #),
        # One-hot encoding of the first verb tag.
        (
            FunctionTransformer(verb_tense_encode),
            ['verb_tense']
        ),
        # Sentence length in whitespace-separated tokens.
        (
            FunctionTransformer(sentence_length),
            ['text']
        ),
    ),
    # `multi_class` is deliberately not passed: the argument is deprecated
    # since scikit-learn 1.5, and the default already fits a multinomial
    # model when there are three or more classes.
    # NOTE(review): with only two distinct labels the default fits a binary
    # model instead — confirm the label set has at least three classes.
    LogisticRegression(
        max_iter=10000
    )
)

In [None]:
# Fit the whole pipeline on the training split, using its 'labels' column as
# the target. Inside this block joblib is configured to use the threading
# backend with 4 workers.
with parallel_backend('threading', n_jobs=4):
    classifier.fit(train, train['labels'])

In [None]:
# Store the fitted pipeline's predictions in a 'pred' column of every split.
for corpus in (train, dev, test):
    corpus['pred'] = classifier.predict(corpus)

In [None]:
def eval(df: DataFrame):
    """Print weighted precision, recall and F1 of ``df['pred']`` against ``df['labels']``.

    The three scores are printed on one line, formatted with three decimals
    and separated by ' & '.

    NOTE(review): this name shadows Python's builtin ``eval`` for the rest of
    the notebook.

    Args:
        df: Frame holding the gold labels in ``'labels'`` and the predictions
            in ``'pred'``.
    """
    scores = precision_recall_fscore_support(
        df['labels'], df['pred'], average='weighted'
    )
    # The fourth returned element is not reported.
    precision, recall, f1 = scores[:3]
    # https://pyformat.info/#number
    print(f'{precision:.3f} & {recall:.3f} & {f1:.3f}')

In [None]:
# Scores on the training split, i.e. the data the pipeline was fitted on.
eval(train)

In [None]:
# Scores on the development split.
eval(dev)

In [None]:
# Class labels as stored on the fitted logistic-regression step; they fix the
# row/column order of the confusion matrix and its tick labels.
index2label: ndarray = classifier.named_steps['logisticregression'].classes_

# Confusion matrix of the development-split predictions.
confusion = confusion_matrix(dev['labels'], dev['pred'], labels=index2label)
confusion_plot = ConfusionMatrixDisplay(confusion, display_labels=index2label)
# The trailing ';' keeps the notebook from echoing the returned display object.
# It replaces a bare `_`, which depends on IPython's output history (showing
# whatever an earlier cell last output) and is undefined in plain Python.
confusion_plot.plot(xticks_rotation=60);

In [None]:
# Write the submission files: one CSV per split with the annotation id and the
# predicted label, the latter exported under the column name 'labels'.
for corpus, out_name in ((dev, 'run1_dev.csv'), (test, 'run1_test.csv')):
    submission = corpus[['annotation_id', 'pred']].rename(columns={'pred': 'labels'})
    submission.to_csv(out_name, index=False)