In [1]:
from nltk.tokenize import word_tokenize
import pandas as pd
def read_file(path):
    """Return the whole content of the text file located at ``path``.

    The file is decoded as UTF-8; byte sequences that cannot be decoded are
    dropped (``errors="ignore"``) instead of raising an exception.
    """
    with open(path, encoding="utf-8", errors="ignore") as handle:
        return handle.read()

def get_list_lang(lang="en"):
    """Load the list of language names for ``lang``.

    Reads ``liste_langues_{lang}.txt`` (path relative to the working
    directory) and normalises it: newlines become commas, double spaces and
    double quotes are removed, and everything is lower-cased before the text
    is split on commas.

    Returns the resulting list of strings. Entries are not stripped, so a
    name may keep a single surrounding space and empty strings may be present.
    """
    brut = read_file(f"liste_langues_{lang}.txt")
    normalise = (
        brut.replace("\n", ",")
        .replace("  ", "")
        .replace('"', "")
        .lower()
    )
    return normalise.split(",")

def br_appliquee(liste_chemins,liste_langues,langue="french"):
    """Count the language names mentioned in each article.

    Parameters
    ----------
    liste_chemins : iterable of str
        Paths of the article text files (e.g. as returned by ``chemin_corpus``).
    liste_langues : iterable of str
        Lower-cased language names to look for (e.g. as returned by
        ``get_list_lang``).
    langue : str
        Language name passed to NLTK's ``word_tokenize``.

    Returns
    -------
    dict
        ``{file_name: {language_name: number_of_occurrences}}``. Articles in
        which no language name is found have no entry.
    """
    # A token used to be compared with every list entry, so a name listed
    # twice was counted twice per occurrence. Keeping the multiplicity here
    # preserves those counts while making the lookup O(1) per token.
    poids_langues = {}
    for lg in liste_langues:
        poids_langues[lg] = poids_langues.get(lg, 0) + 1

    dico_br_app = {}
    for chemin in liste_chemins:
        # File name only, whichever separator the path uses: splitting on
        # "\\" alone returned the whole path for POSIX-style paths.
        chemin_propre = chemin.replace("\\", "/").split("/")[-1]
        chaine = read_file(chemin).lower()
        # Replace every non-word character with a space before tokenising.
        chaine_l_propre = re.sub(r'[^\w]',' ', chaine)

        for mot in word_tokenize(chaine_l_propre, language=langue):
            poids = poids_langues.get(mot)
            if poids:
                compte = dico_br_app.setdefault(chemin_propre, {})
                compte[mot] = compte.get(mot, 0) + poids

    return dico_br_app
def chemin_corpus(chemin_dossier,langue="fr",taln_ou_acl="taln"):
    """List the corpus files written in ``langue`` that pass the corpus filter.

    Parameters
    ----------
    chemin_dossier : str
        Glob pattern of the corpus files (e.g. "archives_Boudin_txt/*/*/actes/*").
    langue : str
        Language code that ``langdetect.detect`` must return for a file to be kept.
    taln_ou_acl : str
        Corpus-specific filter: "taln", "acl" or "lrec". With any other value
        nothing is kept.

    Returns
    -------
    list of str
        The matching paths, as yielded by ``glob.glob``.
    """
    retenus = []

    for chemin in glob.glob(chemin_dossier):
        chaine = read_file(chemin)
        if detect(chaine) != langue:
            continue

        # The filters below exclude documents that are not real articles
        # (often very short ones).
        if taln_ou_acl == "taln":
            # The article type is the second-to-last "-"-separated field of the path.
            # NOTE(review): raises IndexError when the path contains no "-".
            type_art = chemin.split("-")[-2]
            if type_art not in ("demo", "invite"):
                retenus.append(chemin)
        elif taln_ou_acl == "acl":
            # NOTE(review): "P" anywhere in the path presumably selects a
            # paper-id prefix; confirm against the corpus layout.
            if "P" in chemin and "the" in chaine:
                retenus.append(chemin)
        elif taln_ou_acl == "lrec":
            if "the" in chaine:
                retenus.append(chemin)

    return retenus

def chemins_annotes(fichier_csv, cas_bender,nbheader=2,taln_ou_acl="taln"):
    """Return the text-file names of the articles annotated with a given case.

    Parameters
    ----------
    fichier_csv : str
        Path of the CSV file holding the manual annotation table
        (e.g. "annotation_csv.csv").
    cas_bender : str
        Name of the column for the Bender-rule case of interest
        ('Appliquée', 'Non-appliquée', ...); rows where it is not null are kept.
    nbheader : int
        Row index used as the CSV header (``header`` argument of ``pd.read_csv``).
    taln_ou_acl : str
        Corpus name: "taln", "acl" or "lrec". It selects the URL prefix that
        an article link must contain.

    Returns
    -------
    list of str
        Last path component of each kept URL, with ".pdf" replaced by ".txt".

    Raises
    ------
    ValueError
        If ``taln_ou_acl`` is not a known corpus name (the URL prefix used to
        be left undefined in that case).
    """
    urls_par_corpus = {
        "taln": "http://talnarchives.atala.org/TALN/",
        "acl": "https://aclanthology.org/",
        "lrec": "http://www.lrec-conf.org/proceedings/",
    }
    if taln_ou_acl not in urls_par_corpus:
        raise ValueError(
            "taln_ou_acl must be one of %s, got %r"
            % (sorted(urls_par_corpus), taln_ou_acl)
        )
    url = urls_par_corpus[taln_ou_acl]

    dataset = pd.read_csv(fichier_csv,header=nbheader)
    annotes_cas = dataset.loc[dataset[cas_bender].notnull()]

    chemins_annotes_cas = []
    # The article link is read from the second column of the table.
    for chemin in annotes_cas.iloc[:,1].values:
        if not isinstance(chemin, str):
            # Empty cells come back as NaN (float) and are skipped.
            continue
        if taln_ou_acl == "lrec" and url not in chemin:
            # LREC links without the proceedings prefix get it prepended.
            chemin = url + chemin
        if url in chemin:
            nom_fichier = chemin.split("/")[-1]
            chemins_annotes_cas.append(nom_fichier.replace(".pdf",".txt"))

    return chemins_annotes_cas

In [2]:
from langdetect import detect
import glob
import re

In [3]:
def dico_br_appliquee_phrases(liste_chemins_annotes,liste_langues,chemins_annotes_appliquee):
    """Collect the sentences that mention a language name, split in two groups.

    A "sentence" is a chunk of the lower-cased article text between two "."
    characters, and a mention is a whitespace-separated token equal to one of
    ``liste_langues``.

    Parameters
    ----------
    liste_chemins_annotes : iterable of str
        Paths of the article text files to scan.
    liste_langues : iterable of str
        Lower-cased language names to look for.
    chemins_annotes_appliquee : container of str
        File names of the articles considered "appliquée"; anything supporting
        the ``in`` operator works (list, set, dict keys).

    Returns
    -------
    dict
        ``{"appliquée": [...], "non-appliquée": [...]}``, each list holding
        the distinct sentences (first-seen order) from the articles of that
        group. The ten most frequent language names are also printed.
    """
    # Multiplicity of each name: a token used to be compared with every list
    # entry, so duplicated entries weighed more in the statistics.
    poids_langues = {}
    for nom in liste_langues:
        poids_langues[nom] = poids_langues.get(nom, 0) + 1

    dico_phrases_nomlg = {"appliquée":[], "non-appliquée": []}
    # Sets mirroring the two lists: O(1) duplicate check instead of scanning
    # an ever-growing list for every mention.
    deja_vues = {"appliquée": set(), "non-appliquée": set()}
    stats = {}
    for chemin in liste_chemins_annotes:
        # File name only, whichever separator the path uses, so it can be
        # compared with the names in chemins_annotes_appliquee.
        chemin_propre = chemin.replace("\\", "/").split("/")[-1]
        if chemin_propre in chemins_annotes_appliquee:
            groupe = "appliquée"
        else:
            groupe = "non-appliquée"
        chaine = read_file(chemin).lower()
        for phrase in chaine.split("."):
            for mot in phrase.split():
                poids = poids_langues.get(mot)
                if not poids:
                    continue
                stats[mot] = stats.get(mot, 0) + poids
                if phrase not in deja_vues[groupe]:
                    deja_vues[groupe].add(phrase)
                    dico_phrases_nomlg[groupe].append(phrase)
    L = [[eff, mot] for mot, eff in stats.items()]
    print("10 most frequent langues : ")
    print(sorted(L)[-10:])
    return dico_phrases_nomlg

# Paths under lrec_annotes2/ detected as English ("en") and kept by the "lrec" filter.
liste_chemins_annotes_lrec = chemin_corpus("lrec_annotes2/*/*","en","lrec")
# Lower-cased language names read from liste_langues_en.txt.
liste_lang = get_list_lang("en")


In [4]:
# Per-article counts of language-name mentions, tokenised as English.
dico_br_app_lrec_a = br_appliquee(liste_chemins_annotes_lrec,liste_lang,"english")
# NOTE(review): the third argument is the counts dict built just above, so the
# membership test inside dico_br_appliquee_phrases runs against its keys (file
# names of articles with at least one mention), not against a manually
# annotated list such as the one chemins_annotes builds -- confirm this is intended.
data_annote_all = dico_br_appliquee_phrases(liste_chemins_annotes_lrec,liste_lang, dico_br_app_lrec_a)

[[427, 'spanish'], [474, 'greek'], [478, 'japanese'], [598, 'czech'], [640, 'arabic'], [1047, 'dutch'], [1610, 'chinese'], [3180, 'french'], [3780, 'german'], [7329, 'english']]


In [5]:
import json
# Load the annotated examples; each item is indexed as [text, class].
with open("data_classif1_en420f.json") as f:
    data_annote = json.load(f)

# Split the pairs into parallel lists of texts and labels.
textes_en, classes_en = [], []
for exemple in data_annote:
    textes_en.append(exemple[0])
    classes_en.append(exemple[1])

In [6]:
from sklearn.linear_model import LogisticRegression
import time
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support

# Two logistic-regression variants that differ only by their penalty.
classifiers = []
for pen in ["l1", "l2"]:
    classifiers.append(["LogReg_saga_pen=%s"%pen, 
                        LogisticRegression(multi_class="auto", solver="saga", 
                                           max_iter=500, class_weight="balanced",
                                          penalty = pen)])

vectorizers=[
    ["Tfidf",TfidfVectorizer()],
    ["Count", CountVectorizer()]
]

# Train and evaluate every classifier / vectorizer combination.
# Later cells reuse `clf_en` and `Train_V`: after this loop they refer to the
# LAST combination trained (l2 penalty with the Count vectorizer).
for nom_classif, algo in classifiers:
    print(nom_classif)
    for name, V in vectorizers:
        # fit_transform already fits V on the whole corpus, so the second
        # V.fit(textes_en) that used to follow only repeated the same work.
        # NOTE(review): the vectorizer is fitted before the train/test split,
        # so the vocabulary (and IDF weights) also reflect the test texts.
        X1_en = V.fit_transform(textes_en)
        Train_V = V
        Y1_en = classes_en
        X_train_en, X_test_en, y_train_en, y_test_en = train_test_split(X1_en, Y1_en, test_size=0.3, random_state=0)
        start = time.perf_counter()
        # fit() returns the estimator itself, so clf_en is the same object as algo.
        clf_en = algo.fit(X_train_en, y_train_en)
        end = time.perf_counter()
        print(name, "... %.2f seconds"%(round(end-start,3)))
        pred_en = clf_en.predict(X_test_en)

        conf_matrix_en = confusion_matrix(y_test_en, pred_en)
        report_en = classification_report(y_test_en,pred_en)
        print(report_en)


LogReg_saga_pen=l1




Tfidf ... 1.51 seconds
               precision    recall  f1-score   support

    appliquée       0.94      0.91      0.93       704
non-appliquée       0.40      0.50      0.45        84

     accuracy                           0.87       788
    macro avg       0.67      0.71      0.69       788
 weighted avg       0.88      0.87      0.87       788





Count ... 2.67 seconds
               precision    recall  f1-score   support

    appliquée       0.94      0.92      0.93       704
non-appliquée       0.42      0.49      0.45        84

     accuracy                           0.87       788
    macro avg       0.68      0.70      0.69       788
 weighted avg       0.88      0.87      0.88       788

LogReg_saga_pen=l2




Tfidf ... 0.44 seconds
               precision    recall  f1-score   support

    appliquée       0.90      0.99      0.94       704
non-appliquée       0.50      0.12      0.19        84

     accuracy                           0.89       788
    macro avg       0.70      0.55      0.57       788
 weighted avg       0.86      0.89      0.86       788

Count ... 0.47 seconds
               precision    recall  f1-score   support

    appliquée       0.93      0.95      0.94       704
non-appliquée       0.52      0.44      0.48        84

     accuracy                           0.90       788
    macro avg       0.73      0.70      0.71       788
 weighted avg       0.89      0.90      0.89       788





In [8]:
# Article-level evaluation on the annotated ACL and LREC articles.
# Relies on `Train_V` (fitted vectorizer) and `clf_en` (sentence classifier)
# left in the namespace by the training cell, and on `liste_lang`.
with open("corpus_lremap.json")as f:
    names_LRE = json.load(f)

# Set of language names for O(1) token lookup.
noms_langues = set(liste_lang)

stats_LRE = {}  # resource name -> number of articles whose text contains it
data_annote = [["ACL", "420-acl_annotation_bender.json"], ["LREC", "550-lrec_annotation_bender.json"]]
resultats = {}  # conference -> system name -> list of [gold class, predicted class]
for conf_name, json_path in data_annote:
    print(conf_name)
    resultats[conf_name] = {"Sentence":[], "Baseline":[], "Baseline_LRE":[], "Sentence_LRE":[]}
    with open(json_path) as f:
        dic_conf = json.load(f)
    for path_article, infos_article in dic_conf.items():
        classe = infos_article["class"]
        # Both negative annotations are merged into a single "N/A" class.
        if classe in ("Non-déductible", "Non-Aplicable"):
            classe = "N/A"
        chaine = read_file(path_article)

        # Resource names are searched in the original-case text.
        has_LRE = False
        for name in names_LRE:
            if name in chaine:
                has_LRE = True
                stats_LRE[name] = stats_LRE.get(name, 0) + 1

        chaine = chaine.lower()
        phrases = chaine.split(".")
        # Sentences containing at least one language name. Each sentence is
        # kept once: it used to be appended once per matching token, which
        # made the classifier process duplicates and inflated NB_pos.
        a_tester = [
            phrase for phrase in phrases
            if any(mot in noms_langues for mot in phrase.split())
        ]

        if len(a_tester)>0:
            pred_baseline = "Appliquée"
            pred_LRE = pred_baseline
            # NOTE(review): the gold class is used here to produce the
            # "Déductible" prediction (original comment: "to also have the
            # déductibles") -- confirm this is acceptable for the evaluation.
            if classe =="Déductible":
                pred_baseline = classe
            X_test2 = Train_V.transform(a_tester)
            Y_pred2 = clf_en.predict(X_test2)
        else:
            pred_baseline = "N/A"
            # Placeholder without any "appliquée" entry.
            Y_pred2= [["N/A"]]
            pred_LRE = "Déductible" if has_LRE else pred_baseline

        # Number of candidate sentences the classifier labels "appliquée".
        NB_pos = sum(1 for x in Y_pred2 if x == "appliquée")
        if "appliquée" in Y_pred2:
            pred_sentence = "Appliquée"
            if classe =="Déductible":  # same use of the gold class as above
                pred_sentence = classe
        else:
            pred_sentence = "N/A"

        # The sentence system falls back on the resource-based "Déductible".
        pred_sentence_LRE = pred_sentence
        if pred_sentence != "Appliquée" and pred_LRE == "Déductible":
            pred_sentence_LRE = pred_LRE

        resultats[conf_name]["Sentence"].append([classe, pred_sentence])
        #, {"Phr_lg":len(a_tester), "Phr_lg_pos":NB_pos}])
        resultats[conf_name]["Baseline"].append([classe, pred_baseline])
        resultats[conf_name]["Baseline_LRE"].append([classe, pred_LRE])
        resultats[conf_name]["Sentence_LRE"].append([classe, pred_sentence_LRE])


ACL
LREC


In [None]:
from scipy.stats import spearmanr

# For every system stored in `resultats`, print the Spearman correlation per
# conference, then one LaTeX table row per class / average with the LREC
# columns followed by the ACL columns (precision, recall, F1, support).
# The "weighted avg" row of classification_report is the support-weighted mean
# of the per-class scores; it was previously printed under the label "micro avg".
dic_classes = {"Appliquée":"Applied\t", "Déductible":"Deducible", "N/A":"N/A\t", "macro avg":"macro avg", "weighted avg":"weighted avg"}

for class_name in resultats["LREC"]:
    print("-"*20)
    print(class_name+"\n")

    rapports = []
    for conf_name in ["LREC", "ACL"]:
        paires = resultats[conf_name][class_name]
        y_test, y_pred = [x[0] for x in paires], [x[1] for x in paires]
        rapports.append(classification_report(y_test, y_pred, output_dict=True))
        # NOTE(review): spearmanr is applied to nominal string labels here --
        # confirm that a rank correlation is meaningful for them.
        print(conf_name, str(spearmanr(y_test,y_pred)))

    # NOTE(review): a class absent from both gold and predicted labels has no
    # entry in the report, in which case the lookup below raises KeyError.
    for classe in ["Appliquée", "Déductible", "N/A", "macro avg", "weighted avg"]:
        liste_ligne = [dic_classes[classe]+"\t"]
        for this_res in rapports:
            for mesure in ["precision", "recall", "f1-score"]:
                liste_ligne.append(round(this_res[classe][mesure], 3))
            liste_ligne.append(this_res[classe]["support"])
        print(" & ".join([str(x) for x in liste_ligne])+"\\\\")
        


In [None]:
# Number of annotated articles in which each resource name was found.
print(stats_LRE)

In [None]:
#### Compare vectorizers and classifiers
algo = LogisticRegression(multi_class="auto", solver="lbfgs", max_iter=500, class_weight="balanced")
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
# class_weight="balanced" is essential
classifiers = [
    #["base+bal", LogisticRegression(multi_class="auto", solver="lbfgs", class_weight="balanced")
    #],
    #["base+bal+iter", LogisticRegression(multi_class="auto", solver="lbfgs", max_iter=1000, class_weight="balanced")
    #],
    ["base+bal+saga", LogisticRegression(multi_class="auto", solver="saga", class_weight="balanced")
    ],
    ["base+bal+iter+saga", LogisticRegression(multi_class="auto", solver="saga", max_iter=1000, class_weight="balanced")
    ],
    #["LogisticRegr_Bal_C=0.01", LogisticRegression(multi_class="auto", solver="saga", C=0.01, n_jobs=2, max_iter=1000, class_weight="balanced")],
    ["LogisticRegr_Bal_C=0.05", LogisticRegression(multi_class="auto", solver="saga", C=0.05, n_jobs=2, max_iter=1000, class_weight="balanced")],
    ["Decision Tree", DecisionTreeClassifier()],
    #["MNB", MultinomialNB()],
    #["RF_depth=10_estimators=100", RandomForestClassifier(max_depth=10, n_estimators = 100, random_state=0)],
]


#V = CountVectorizer(ngram_range=(3,5), analyzer="char")
list_vectorizer = [
    ["Count 1-1\t", CountVectorizer()],
    ["Count 3-7--char", CountVectorizer(ngram_range=(3,7), analyzer="char")],
    #["tfidf", TfidfVectorizer()],
    #["tfidf 3-7--char", TfidfVectorizer(ngram_range=(3,7), analyzer="char")],
]

# Macro-F1 of every classifier / vectorizer combination on a fixed 70/30 split.
list_res = []
for name, clf in classifiers:
    for nom, this_V in list_vectorizer:
        # The global `Train_V` is deliberately NOT reassigned here: other
        # cells call Train_V.transform() together with clf_en.predict(), and
        # replacing it with an experimental vectorizer (this cell ends on
        # character n-grams) would feed clf_en features it was not trained on.
        X1_en = this_V.fit_transform(textes_en)
        Y1_en = classes_en

        X_train_en, X_test_en, y_train_en, y_test_en = train_test_split(X1_en, Y1_en, test_size=0.3, random_state=0)
        start = time.perf_counter()
        clf_new = clf.fit(X_train_en, y_train_en)
        end = time.perf_counter()
        #print("... %.2f seconds"%(round(end-start,2)))
        pred_en = clf_new.predict(X_test_en)

        conf_matrix_en = confusion_matrix(y_test_en, pred_en)
        #print("Classifieur 1\n",conf_matrix_en) #na/app 229/3 2/9
        report_en = classification_report(y_test_en,pred_en,output_dict=True)
        print(name, nom, report_en["macro avg"]["f1-score"])

        list_res.append([report_en["macro avg"]["f1-score"], name, nom] )

# Ranking, from lowest to highest macro-F1.
for a in sorted(list_res):
    print(a)

In [11]:
### "ROC curve": predictions obtained when only the first `tranche` percent
### of each article's sentences is read (10%, 20%, ..., 100%).
# Relies on `Train_V`, `clf_en`, `liste_lang` and `read_file` defined in other cells.
data_annote = [["ACL", "420-acl_annotation_bender.json"], ["LREC", "550-lrec_annotation_bender.json"]]

# tranche -> conference -> system name -> list of [gold class, predicted class]
res_ROC = {}
for tranche in range(10, 110, 10):
  dic_resultats = {}
  for conf_name, json_path in data_annote:
    print(f"{conf_name} : first {tranche}%")
    dic_resultats[conf_name] = {"Sentence":[], "Baseline":[], "Baseline_LRE":[]}
    # NOTE(review): the annotation file and every article are re-read for each tranche.
    with open(json_path) as f:
        dic_conf = json.load(f)
    for path_article, infos_article in dic_conf.items():
        classe = infos_article["class"]
        # Both negative annotations are merged into a single "N/A" class.
        if classe =="Non-déductible" or classe =="Non-Aplicable":
            classe="N/A"
        chaine = read_file(path_article)
        # NOTE(review): has_LRE is never set to True in this cell, so the
        # `has_LRE==True` branch below cannot run -- confirm whether the
        # resource-name search was left out on purpose.
        has_LRE = False
        chaine = chaine.lower()
        phrases = chaine.split(".")
        a_tester = []
        for cpt, phrase in enumerate(phrases):
            # Stop once the index exceeds tranche% of the sentence count; the
            # sentence whose index equals that threshold is still included.
            if cpt>(len(phrases)*tranche/100):
                break
            # A sentence is appended once per token equal to a language name.
            for mot in phrase.split():
                for l in liste_lang:
                    if mot == l:
                        a_tester.append(phrase)
        if len(a_tester)>0:
            pred_baseline = "Appliquée"
            pred_LRE = pred_baseline
            if classe =="Déductible":# to also have the "Déductible" class (uses the gold label)
                pred_baseline = classe
            X_test2 = Train_V.transform(a_tester)
            Y_pred2 = clf_en.predict(X_test2)
        else:
            pred_baseline = "N/A"
            # Placeholder without any "appliquée" entry.
            Y_pred2= [["N/A"]]
            pred_LRE = pred_baseline
            if has_LRE==True:
                pred_LRE = "Déductible"
        if "appliquée" in Y_pred2:
            pred_sentence = "Appliquée"
            if classe =="Déductible":# to also have the "Déductible" class (uses the gold label)
                pred_sentence = classe
        else:
            pred_sentence = "N/A"        
        dic_resultats[conf_name]["Sentence"].append([classe, pred_sentence])
        dic_resultats[conf_name]["Baseline"].append([classe, pred_baseline])
        dic_resultats[conf_name]["Baseline_LRE"].append([classe, pred_LRE])
  res_ROC[tranche] = dic_resultats

ACL : first 10%
LREC : first 10%
ACL : first 20%
LREC : first 20%
ACL : first 30%
LREC : first 30%
ACL : first 40%
LREC : first 40%
ACL : first 50%
LREC : first 50%
ACL : first 60%
LREC : first 60%
ACL : first 70%
LREC : first 70%
ACL : first 80%
LREC : first 80%
ACL : first 90%
LREC : first 90%
ACL : first 100%
LREC : first 100%


In [None]:
# Keys of res_ROC are the tranche percentages it was filled with.
print(res_ROC.keys())

In [None]:
#with open("resultats_roc.json", "w") as w:
#    w.write(json.dumps(res_ROC, indent = 2))

In [None]:
# Show which estimator (and hyper-parameters) `clf_en` currently refers to.
print(clf_en)

In [None]:
liste_data_conf = [["LREC", "lrec/*/*"]]#, ["ACL"]]
for name_conf, path_conf in liste_data_conf:
    liste_path = glob.glob(path_conf):