# Traitement du corpus TCOF - CNRTL

**Imports**

In [None]:
import os
import pandas as pd
import re
import requests
import xml.etree.ElementTree as ET
import numpy as np
from bs4 import BeautifulSoup
import chardet
from bs4.element import NavigableString, Tag

# **Téléchargement des corpus depuis le site CNRTL**

1. Depuis le site du CNRTL, télécharge chaque corpus dans un fichier compressé .zip (avec les fichiers .trs, .xml, .wav), dans le dossier indiqué par la variable filename
2. Nomme chaque fichier compressé d'après le nom du corpus, récupéré depuis la page HTML du site

In [None]:
def corpus_name_cnrtl(number):
    """Return the corpus name shown on the TCOF "view" page of corpus *number*.

    The name is the text of the first ``<h1>`` element of the page with every
    occurrence of ``'Corpus '`` removed.

    Raises:
        requests.RequestException: on network failure, timeout or HTTP error.
        ValueError: if the page contains no ``<h1>`` element.
    """
    url = "https://tcof.atilf.fr/index.php?r=corpus%2Fview&id={}".format(number)
    # A timeout keeps one unresponsive request from blocking the caller forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on an HTTP error instead of scraping an error page.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    h1_tag = soup.find('h1')
    if h1_tag is None:
        raise ValueError("No <h1> title found on corpus page {}".format(url))
    return h1_tag.get_text().replace('Corpus ', '')

In [None]:
# Try every corpus id from 1 to 799 and save each archive as
# E:/Corpus/TCOF_CNRTL/<corpus name>.zip. A failing id is reported and
# skipped so the remaining ids are still processed.
for i in range(1, 800):
    url = 'https://tcof.atilf.fr/index.php?r=corpus/download-corpus&id={}'.format(i)
    try:
        # stream=True avoids loading the whole archive in memory; the timeout
        # prevents a stalled connection from hanging the loop.
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            filename = 'E:/Corpus/TCOF_CNRTL/{}.zip'.format(corpus_name_cnrtl(i))
            with open(filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
    except Exception as err:
        # Best effort: keep going, but say what was skipped. Unlike a bare
        # ``except``, this lets KeyboardInterrupt stop the loop.
        print('Corpus id {} skipped: {!r}'.format(i, err))

# **Fichiers dans le dossier**
> **Avant de lancer** : dézipper manuellement tous les fichiers zip

Le script : 
1. Recense les fichiers existants pour chaque corpus
2. Output un dataframe avec pour chaque corpus, les fichiers existants

In [None]:
def get_files_cnrtl(path):
    """Build a dataframe listing the files found in each corpus folder of *path*.

    Each row is one sub-folder of *path*: the ``corpus`` column holds the
    folder path and the other columns hold the file names reported by
    ``get_corpus_files`` for that folder.
    """
    corpus_dirs = [add_path2corpus(path, name) for name in get_all_folder(path)]
    files_by_corpus = {
        corpus_dir: get_corpus_files(corpus_dir) for corpus_dir in corpus_dirs
    }
    table = pd.DataFrame.from_dict(files_by_corpus, orient='index')
    return table.rename_axis('corpus').reset_index()

In [None]:
def get_corpus_files(path):
    """Classify the files of one corpus folder by kind.

    Returns a dict whose possible keys are ``'xml'``, ``'trs'``,
    ``'anonymise_trs'``, ``'wav'`` and ``'anonymise_wav'``, each mapped to a
    file name from *path*. When several files share a kind, the last one
    listed by ``os.listdir`` is kept.
    """
    # Pattern that marks a .trs / .wav file as the anonymised variant.
    anonymised_patterns = {
        '.trs': r'(anonymise_[\S]*.trs)',
        '.wav': r'(anonymise_[\S]*.wav)',
    }
    found = {}
    for entry in os.listdir(path):
        if entry.endswith('.xml'):
            found['xml'] = entry
        for suffix, pattern in anonymised_patterns.items():
            if not entry.endswith(suffix):
                continue
            kind = suffix[1:]
            key = 'anonymise_' + kind if re.search(pattern, entry) else kind
            found[key] = entry
    return found

In [None]:
def add_path2corpus(path, corpus):
    """Return *path* and *corpus* joined by a single backslash.

    The separator is always a backslash (Windows style), whatever the host
    platform is.
    """
    # '\\' spells the backslash explicitly; the former '\{' was an invalid
    # escape sequence that recent Python versions warn about.
    return '{}\\{}'.format(path, corpus)

In [None]:
def get_all_folder(path):
    """Return the names of the entries of *path* that are directories."""
    return [
        entry
        for entry in os.listdir(path)
        if os.path.isdir(os.path.join(path, entry))
    ]

In [None]:
def add_corpus_names_col(df):
    """Add a ``corpus_name`` column derived from the ``xml`` column of *df*.

    The corpus name is the ``xml`` file name without its ``.xml`` suffix.
    Rows whose ``xml`` value is missing (e.g. NaN) or does not end in
    ``.xml`` get the placeholder ``'NAN'``.

    *df* is modified in place and also returned.
    """
    xml_name = re.compile(r'^(.*)\.xml$')
    corpus_names = []
    for file in df['xml'].tolist():
        # Non-string cells (NaN for folders without an .xml file) cannot be
        # matched; treat them like a non-matching name.
        match = xml_name.search(file) if isinstance(file, str) else None
        # Resolve every row independently so a failed match never reuses the
        # name found for the previous row.
        corpus_names.append(match.group(1) if match else 'NAN')
    df['corpus_name'] = corpus_names
    return df

In [None]:
# Folder that contains the corpus sub-folders.
path_cnrtl = r'E:\Corpus\TCOF_CNRTL'
# Build the dataframe holding the names of the files found in that folder.
df_cnrtl = get_files_cnrtl(path_cnrtl)

In [None]:
# Add the 'corpus_name' column, derived from each corpus' .xml file name.
df_cnrtl = add_corpus_names_col(df_cnrtl)

In [None]:
# List of all the files found for each corpus.
df_cnrtl 

# **Extraction des métadonnées (fichier .xml)**

1. Extrait toutes les métadonnées depuis le fichier .xml
2. Détecte si le fichier est encodé en utf8 ou iso, et le décode selon le format.
3. Output un dataframe avec pour chaque corpus, les métadonnées associées extraites depuis le fichier .xml

In [None]:
def detect_encoding(file):
    """Parse *file* as XML after guessing its text encoding with chardet.

    Despite its name, the function returns the parsed ``BeautifulSoup``
    document (built with the ``lxml-xml`` parser), not the encoding.
    """
    with open(file, 'rb') as raw_handle:
        raw_bytes = raw_handle.read()
    guessed = chardet.detect(raw_bytes)['encoding']
    # Re-open in text mode so the content is decoded with the guessed encoding.
    with open(file, 'r', encoding=guessed) as text_handle:
        return BeautifulSoup(text_handle, "lxml-xml")

In [None]:
def xml2df(xml_files):
    """Concatenate the metadata of every file in *xml_files* into one dataframe.

    Each file yields one row built from the metadata dict returned by
    ``parsexml``. Its ``locuteur`` column holds the speakers of that file as a
    list of ``(identifier, attributes)`` pairs.
    """
    frames = []
    for path in xml_files:
        metadata, speakers = parsexml(detect_encoding(path))
        frame = get_df(metadata)
        # Repeat the same list once per row so it fits in a single cell.
        frame['locuteur'] = [list(speakers.items())] * len(frame)
        frames.append(frame)
    return pd.concat(frames)

In [None]:
def get_df(my_dict):
    """Return a one-row dataframe whose columns are the keys of *my_dict*."""
    # A shallow copy is wrapped so the caller's dict is not handed to pandas.
    return pd.DataFrame([dict(my_dict)])

In [None]:
def get_general(soup, my_dict, general):
    """Store in *my_dict* the stripped text of each tag of *general* found in *soup*.

    For every name, only the first matching element is used; names with no
    match are skipped. The key is the name of the element found. *my_dict* is
    updated in place and returned.
    """
    for wanted in general:
        node = soup.find(wanted)
        if node is None:
            continue
        my_dict[node.name] = node.text.strip()
    return my_dict

def get_enregistrement(soup, my_dict, enregistrement):
    """Store in *my_dict* the stripped text of each recording tag found in *soup*.

    For every name of *enregistrement*, only the first matching element is
    used; names with no match are skipped. *my_dict* is updated in place and
    returned.
    """
    matches = (soup.find(wanted) for wanted in enregistrement)
    my_dict.update(
        (node.name, node.text.strip()) for node in matches if node is not None
    )
    return my_dict

def get_referencement(soup, my_dict, referencement):
    """Store in *my_dict* the stripped text of each referencing tag found in *soup*.

    For every name of *referencement*, only the first matching element is
    used; names with no match are skipped. *my_dict* is updated in place and
    returned.
    """
    for wanted in referencement:
        node = soup.find(wanted)
        if node is None:
            continue
        my_dict[node.name] = node.text.strip()
    return my_dict

def get_transcription(soup, my_dict, transcription):
    """Store in *my_dict* the stripped text of each transcription tag found in *soup*.

    For every name of *transcription*, only the first matching element is
    used; names with no match are skipped. *my_dict* is updated in place and
    returned.
    """
    matches = (soup.find(wanted) for wanted in transcription)
    my_dict.update(
        (node.name, node.text.strip()) for node in matches if node is not None
    )
    return my_dict

def get_locuteurs(soup, my_dict, locuteur_tag):
    """Collect the speakers described in *soup*, keyed by their identifier.

    Every ``locuteur`` element except the first match is turned into a dict
    holding its ``nombre_tours`` and ``locuteurPrincipal`` attributes plus the
    stripped text of each child tag of *locuteur_tag* that it contains. That
    dict is stored in *my_dict* under the element's ``identifiant`` attribute.
    *my_dict* is updated in place and returned.
    """
    # NOTE(review): the first match is skipped — presumably it is not a real
    # speaker entry; confirm against the XML schema.
    for speaker_node in soup.find_all('locuteur')[1:]:
        profile = {
            'nb_tour': speaker_node.get('nombre_tours'),
            'locuteur_principal': speaker_node.get('locuteurPrincipal'),
        }
        children = (speaker_node.find(wanted) for wanted in locuteur_tag)
        profile.update(
            (child.name, child.text.strip())
            for child in children
            if child is not None
        )
        my_dict[speaker_node.get('identifiant')] = profile
    return my_dict

In [None]:
def parsexml(soup):
    """Extract corpus metadata and speaker descriptions from a parsed XML file.

    Returns a ``(metadata, speakers)`` pair: *metadata* is a flat dict of
    tag name -> text filled by the four ``get_*`` extractors in turn, and
    *speakers* is the dict produced by ``get_locuteurs``.

    NOTE(review): 'nom_fichier', 'format' and 'responsable' are listed for two
    extractors, and every extractor receives the whole document — confirm
    that the second lookup does not simply return the same first match.
    """
    locuteur_tag = [
        'age', 'sexe', 'etude', 'formation', 'profession_actuelle',
        'profession_anterieure', 'role', 'degre', 'statut_francais',
        'autre_langue', 'relation_locuteur', 'naissance', 'residence',
        'appartenance', 'particularite', 'nombre_mots', 'temps_parole',
    ]
    # (extractor, tag names) pairs, applied in this order on the same dict.
    sections = (
        (get_general, [
            "nomDossier", "responsable_corpus", "droit_acces",
            "lien_autre_corpus", "mail", 'logiciel_alignement',
            'anonymisation', "nombre_locuteurs", "relation", "type_corpus",
            "modalite_recueil", "canal", "cadre", "degre",
            'situation_enonciation', "genre", "support_dialogue",
            "document_annexe", "resume", "commentaire", "createur_fiche",
            'date_creation_fiche',
        ]),
        (get_enregistrement, [
            "nom_fichier", "responsable", "autorisation", "qualite", "taille",
            "duree", "date", "duree_transcription",
            "debut_timecode_transcription", "dernier_timecode_transcription",
            "pays", "region", "ville", "arrondissement", "description_lieu",
            "format",
        ]),
        (get_transcription, [
            "nom_fichier", "transcripteur", "reviseur", "format",
            "nombre_mots", "convention_transcription",
        ]),
        (get_referencement, [
            "nom_corpus", "responsable", "titre", "laboratoire",
        ]),
    )
    metadata = {}
    for extract, tag_names in sections:
        metadata.update(extract(soup, metadata, tag_names))
    speakers = get_locuteurs(soup, {}, locuteur_tag)
    return metadata, speakers

In [None]:
# Expected location of each corpus' metadata file: <corpus folder>/<corpus name>.xml,
# written with forward slashes.
xml_files_cnrtl = [
    r'{}\{}.xml'.format(corpus_dir, corpus_name).replace('\\', '/')
    for corpus_dir, corpus_name in zip(
        df_cnrtl['corpus'].tolist(), df_cnrtl['corpus_name'].tolist()
    )
]

In [None]:
# Parse every metadata file and stack the results into a single dataframe.
df_cnrtl_metadonnees = xml2df(xml_files_cnrtl)

In [None]:
# Metadata of each corpus.
df_cnrtl_metadonnees

# **Analyse des tag.names pour les fichiers .trs**

1. Récupère tous les noms des balises utilisées dans le fichier trs
2. Compare les noms utilisés dans tous les corpus
3. Output un dataframe avec pour chaque corpus, le fichier .trs correspondant, tous les tags utilisés, la différence avec les autres corpus

In [None]:
def get_all_tagsname(xml_file):
    """Return the set of element tag names used in the XML file *xml_file*.

    If the file cannot be opened or parsed, the French message
    ``"Pas de fichier trouvé à cet emplacement"`` is returned instead of a
    set, so callers must be ready to receive a string.
    """
    try:
        root = ET.parse(xml_file).getroot()
    except Exception:
        # Best effort: one unreadable file must not stop the whole scan.
        # Catching Exception rather than using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate.
        return "Pas de fichier trouvé à cet emplacement"
    # root.iter() walks the root element and all of its descendants.
    return {elem.tag for elem in root.iter()}

In [None]:
def get_corpus_name(path):
    """Return the file name of *path* without its directory and last extension.

    Both ``/`` and ``\\`` are treated as directory separators. An
    ``AttributeError`` is raised when *path* has no extension to strip.
    """
    stem_pattern = re.compile(r'[^\\\/]*?(?=\.\w+$)')
    found = stem_pattern.search(path)
    return found.group()

In [None]:
def get_df_tags(xml_files):
    """Return a dataframe with one row per file of *xml_files*.

    Column ``corpus_name`` holds the result of ``get_corpus_name`` and column
    ``tags`` the result of ``get_all_tagsname`` for that file.
    """
    names = list(map(get_corpus_name, xml_files))
    tag_sets = list(map(get_all_tagsname, xml_files))
    rows = list(zip(names, tag_sets))
    return pd.DataFrame(rows, columns=['corpus_name', 'tags'])

In [None]:
def get_difference(df):
    """Compare the tag set of every file with the union of all tag sets.

    Adds an ``is_same`` column to *df* (in place) holding the result of
    ``is_same(tags, union)`` for each row, and returns ``(union, df)``.
    """
    # get_all_tagsname returns an error-message string for files it could not
    # read. Leave those out: passing a string to set().union() would add each
    # of its characters to the union as if it were a tag.
    tag_sets = [tags for tags in df['tags'].tolist() if not isinstance(tags, str)]
    results_union = set().union(*tag_sets)
    # Compare each row with the union (not the intersection) of all tags.
    df['is_same'] = df['tags'].apply(lambda x: is_same(x, results_union))
    return results_union, df

In [None]:
def is_same(list1, list2):
    """Tell whether every item of *list2* also appears in *list1*.

    Returns ``True`` when nothing is missing, otherwise the list of the items
    of *list2* that are absent from *list1*, in *list2*'s iteration order.
    """
    known = set(list1)
    missing = [item for item in list2 if item not in known]
    return missing or True

In [None]:
# Expected location of each corpus' transcription: <corpus folder>/<corpus name>.trs,
# written with forward slashes.
trs_files_cnrtl = [
    r'{}\{}.trs'.format(corpus_dir, corpus_name).replace('\\', '/')
    for corpus_dir, corpus_name in zip(
        df_cnrtl['corpus'].tolist(), df_cnrtl['corpus_name'].tolist()
    )
]

In [None]:
# One row per .trs file: corpus name and the tag names used in that file.
df_cnrtl_tagstrs = get_df_tags(trs_files_cnrtl)

Comparaison des tags

In [None]:
# get_difference returns the union of every file's tags and the annotated dataframe.
# NOTE(review): the message printed below says "tags communs" (common tags),
# but the set it prints is that union.
results_union_trs, df_cnrtl_tagstrs = get_difference(df_cnrtl_tagstrs)
print('Nb de tags communs :', len(results_union_trs), '\n Liste des tags communs :\n', results_union_trs)

In [None]:
# Show the per-file tag comparison.
df_cnrtl_tagstrs

Tous les fichiers trs ont les mêmes tags

# Parsage fichier .trs 

1. extrait toutes les données du fichier .trs (possible de récupérer également des données qui n'apparaissent pas dans le dataframe final, comme le topic)
2. Output un dataframe pour chaque fichier .trs avec à chaque ligne un tour de parole, son début, sa fin, le locuteur et ses caractéristiques, le fichier audio, l'encoding et *le texte synchronisé*

*NB : le texte synchronisé n'est pas encore optimal : j'ai du mal à récupérer le texte entre deux balises Sync, car d'autres balises Event sont aussi utilisées, et l'encodage est toujours en ISO.*

In [None]:
def detect_encoding2(file):
    """Parse *file* as XML after guessing its text encoding with chardet.

    Returns a ``(soup, encoding)`` pair: the ``BeautifulSoup`` document built
    with the ``lxml-xml`` parser, and the encoding name reported by chardet.
    """
    with open(file, 'rb') as raw_handle:
        raw_bytes = raw_handle.read()
    guessed = chardet.detect(raw_bytes)['encoding']
    # Re-open in text mode so the content is decoded with the guessed encoding.
    with open(file, 'r', encoding=guessed) as text_handle:
        document = BeautifulSoup(text_handle, "lxml-xml")
    return document, guessed

In [None]:
def parse_trs2(file):
    """Parse one .trs transcription into a dataframe with one row per turn.

    The rows come from ``get_turn2``; two extra columns are added:
    ``audio_filename`` (from the ``Trans`` element) and ``encoding`` (the
    encoding detected for the file).

    Returns ``None`` when the file cannot be read or parsed. The failure is
    printed so it does not go unnoticed.
    """
    try:
        soup, encoding = detect_encoding2(file)
        # File-level information (speakers, topics, sections). It is gathered
        # here but is not part of the returned dataframe.
        my_dict = {}
        dict_trans, audio_filename = get_trans2(soup, my_dict)
        my_dict.update(dict_trans)
        dict_loc = get_speaker2(soup, my_dict)
        my_dict.update(dict_loc)
        my_dict.update(get_topics2(soup, my_dict))
        my_dict.update(get_section2(soup, my_dict))
        dict_turns = get_turn2(soup)
        df = pd.DataFrame.from_dict(dict_turns, orient='index')
        df['audio_filename'] = audio_filename
        df['encoding'] = encoding
        return df
    except Exception as err:
        # Best effort: a broken file yields None instead of aborting a batch.
        # Unlike a bare ``except``, KeyboardInterrupt still propagates.
        print('parse_trs2: could not parse {!r}: {!r}'.format(file, err))
        return None
    

In [None]:
def get_trans2(soup, my_dict):
    """Read the ``audio_filename`` attribute of the first ``Trans`` element.

    When a ``Trans`` element exists, its ``audio_filename`` attribute is stored
    in *my_dict* under ``'audio_filename'``. Returns ``(my_dict,
    audio_filename)``; ``audio_filename`` is ``None`` when there is no
    ``Trans`` element or the attribute is absent.
    """
    # Defined up front so the return below is valid without a Trans element.
    audio_filename = None
    trans = soup.find('Trans')
    if trans is not None:
        audio_filename = trans.get('audio_filename')
        my_dict['audio_filename'] = audio_filename
    return my_dict, audio_filename

def get_speaker2(soup, my_dict):
    """Record every ``Speaker`` element of *soup* in *my_dict*, keyed by its id.

    Each entry is a dict of the element's ``scope``, ``accent``, ``dialect``,
    ``check``, ``name`` and ``id`` attributes (``None`` for absent ones).
    *my_dict* is updated in place and returned.
    """
    fields = ('scope', 'accent', 'dialect', 'check', 'name', 'id')
    for node in soup.find_all('Speaker'):
        record = {field: node.get(field) for field in fields}
        my_dict[record['id']] = record
    return my_dict

def get_topics2(soup, my_dict):
    """Record every ``Topic`` element of *soup* in *my_dict*, keyed by its id.

    Each entry is a dict with the element's ``desc`` and ``id`` attributes.
    *my_dict* is updated in place and returned.
    """
    for node in soup.find_all('Topic'):
        topic_id = node.get('id')
        my_dict[topic_id] = {'desc': node.get('desc'), 'id': topic_id}
    return my_dict

def get_section2(soup, my_dict):
    """Record every ``Section`` element of *soup* in *my_dict*.

    Each entry is a dict of the element's ``topic``, ``endTime``,
    ``startTime`` and ``type`` attributes, stored under the key
    ``'section_<topic>'``; sections sharing a topic therefore overwrite each
    other. *my_dict* is updated in place and returned.
    """
    attributes = ('topic', 'endTime', 'startTime', 'type')
    for node in soup.find_all('Section'):
        record = {attribute: node.get(attribute) for attribute in attributes}
        my_dict['section_{}'.format(record['topic'])] = record
    return my_dict

def _repair_mojibake(text):
    """Undo UTF-8 text that was decoded as ISO-8859-1; return other text unchanged."""
    try:
        return text.encode('iso-8859-1').decode('utf-8')
    except UnicodeError:
        # The text was not such mojibake (already decoded correctly, or it
        # holds characters outside ISO-8859-1): keep it as it is.
        return text


def get_turn2(soup):
    """Return the turns of a parsed .trs document as a dict of dicts.

    Keys are the turn positions as strings (``'0'``, ``'1'``, ...). Each value
    holds ``startTime``, ``endTime`` and ``speaker`` (the ``Turn`` attributes),
    ``speaker_characteristic`` (the ``Speaker`` entry whose id equals the
    turn's ``speaker`` attribute, or ``None``), ``text`` (the turn text with
    newlines removed) and ``text_synchronisé`` (the result of
    ``get_sync_times2`` for the turn).
    """
    dict_loc = get_speaker2(soup, {})
    dict_turns = {}
    for index, turn in enumerate(soup.find_all('Turn')):
        speaker = turn.get('speaker')
        dict_turns['{}'.format(index)] = {
            'startTime': turn.get('startTime'),
            'endTime': turn.get('endTime'),
            'speaker': speaker,
            'speaker_characteristic': dict_loc.get('{}'.format(speaker)),
            # Repair the encoding only when the text really is mojibake, so a
            # correctly decoded file no longer raises here.
            'text': _repair_mojibake(turn.text).replace('\n', ''),
            'text_synchronisé': get_sync_times2(turn),
        }
    return dict_turns

In [None]:
def get_sync_times(turn):
    """Pair the ``Sync`` times of *turn* with the lines of its text.

    Returns a list of ``(time, line)`` tuples, where ``time`` is the ``time``
    attribute of a ``Sync`` element as a float. Pairing is positional and
    stops at the shorter of the two sequences.
    """
    times = [float(node['time']) for node in turn.find_all('Sync')]
    lines = turn.get_text().replace('\n\n', '\n').strip().splitlines()
    return list(zip(times, lines))

In [None]:
def get_sync_times2(turn):
    """Return the text that follows each ``Sync`` element of *turn*.

    The result is a list of ``(time, text)`` tuples, one per ``Sync`` element:
    ``time`` is the element's ``time`` attribute as found, and ``text`` joins,
    with spaces, the text nodes met among the following siblings up to the
    next ``Sync``. Element siblings (e.g. ``Event``) are skipped, so text
    nested inside them is not included.
    """
    list_sync = []
    for sync_tag in turn.find_all('Sync'):
        contenu = []
        # Walk the siblings until the next Sync element or the end of the turn.
        current_tag = sync_tag.next_sibling
        while current_tag is not None and current_tag.name != 'Sync':
            if isinstance(current_tag, NavigableString):
                fragment = current_tag.get_text().replace('\n', '')
                try:
                    # Repair UTF-8 text that was decoded as ISO-8859-1.
                    fragment = fragment.encode('iso-8859-1').decode('utf-8')
                except UnicodeError:
                    # Not mojibake: keep the fragment instead of failing.
                    pass
                contenu.append(fragment)
            current_tag = current_tag.next_sibling
        list_sync.append((sync_tag['time'], ' '.join(contenu)))
    return list_sync

In [None]:
# Try the parser on a single file (index 8 of the .trs list).
df = parse_trs2(trs_files_cnrtl[8])

In [None]:
# Show the first .trs path.
trs_files_cnrtl[0]

In [None]:
# Parse every .trs file; an entry is None when parse_trs2 failed on that file.
list_df_turn= [parse_trs2(file) for file in trs_files_cnrtl]

In [None]:
# First parsed per-turn dataframe.
df_essai = list_df_turn[0]

In [None]:
def add_meta(df_essai, dict_meta, dict_path):
    """Add speaker, corpus and path columns to one per-turn dataframe.

    Args:
        df_essai: dataframe returned by ``parse_trs2`` (may be ``None``).
        dict_meta: corpus name -> metadata dict (keys such as ``'resume'``,
            ``'duree'``, ``'genre'``, ``'date'``, ``'locuteur'``).
        dict_path: corpus name -> dict holding the ``'corpus'`` folder and
            the ``'trs'`` / ``'wav'`` file names.

    The corpus name is the first ``audio_filename`` value without ``'.wav'``.

    Returns:
        *df_essai*, modified in place, or ``None`` when the input is ``None``
        or empty, or when a required value is missing. Failures are printed.
    """
    if df_essai is None or df_essai.empty:
        # Nothing to enrich (parse_trs2 yields None for unparsable files).
        return None
    try:
        # .iloc[0] is explicitly positional: the index labels are strings.
        nom = df_essai['audio_filename'].iloc[0].replace('.wav', '')
        dict_corpus = dict_meta['{}'.format(nom)]
        paths = dict_path['{}'.format(nom)]

        df_essai['speaker_name'] = df_essai.apply(lambda row: row['speaker_characteristic']['name'], axis=1)
        df_essai['speaker_native'] = df_essai.apply(lambda row: row['speaker_characteristic']['dialect'], axis=1)
        # NOTE(review): these lookups do not use ``row``; get_sexe / get_age /
        # get_prof read the speaker id from a global named ``L`` — confirm
        # which speaker is intended for each turn.
        df_essai['sexe_speaker'] = df_essai.apply(lambda row: get_sexe(dict_meta, nom), axis=1)
        df_essai['age_speaker'] = df_essai.apply(lambda row: get_age(dict_meta, nom), axis=1)
        df_essai['profession_speaker'] = df_essai.apply(lambda row: get_prof(dict_meta, nom), axis=1)

        responsable = str(dict_corpus['responsable_corpus']) + ' ' + str(dict_corpus['responsable'])
        # Country, region and city, separated by single spaces.
        lieu_enregistrement = ' '.join(
            str(dict_corpus[key]) for key in ('pays', 'region', 'ville')
        )

        df_essai['description'] = dict_corpus['genre'] + ' ' + dict_corpus['resume']
        df_essai['duree'] = dict_corpus['duree']
        df_essai['responsable'] = responsable
        df_essai['date_enregistrement'] = dict_corpus['date']
        df_essai['langue_enregistrement'] = 'français'
        df_essai['lieu_enregistrement'] = lieu_enregistrement

        df_essai['path_trs'] = str(paths['corpus']) + '\\' + str(paths['trs'])
        df_essai['path_wav'] = str(paths['corpus']) + '\\' + str(paths['wav'])
        df_essai['sous_corpus'] = nom

        return df_essai
    except Exception as err:
        # Best effort: skip this corpus but report why. Unlike a bare
        # ``except``, KeyboardInterrupt still propagates.
        print('add_meta: corpus skipped: {!r}'.format(err))
        return None

In [None]:
# Map each 'nomDossier' value to its metadata row, as a dict of column -> value.
dict_meta = df_cnrtl_metadonnees.set_index('nomDossier').T.to_dict('dict')

In [None]:
# Map each 'corpus_name' value to its row of file names, as a dict of column -> value.
dict_path = df_cnrtl.set_index('corpus_name').T.to_dict('dict')

In [None]:
def get_age(dict_meta, nom, speaker=None):
    """Return the ``'age'`` of one speaker of corpus *nom*.

    Args:
        dict_meta: corpus name -> metadata dict whose ``'locuteur'`` entry is
            a list of ``(identifier, attributes)`` pairs.
        nom: corpus name.
        speaker: identifier of the speaker to look up. When omitted, the
            notebook-level global ``L`` is used, as before.

    Returns ``None`` when the speaker is not listed or has no age.
    """
    for identifiant, profile in dict_meta['{}'.format(nom)]['locuteur']:
        # ``L`` is only read when no speaker was given (legacy call style).
        if identifiant == (L if speaker is None else speaker):
            return profile.get('age')
    return None

In [None]:
def get_sexe(dict_meta, nom, speaker=None):
    """Return the ``'sexe'`` of one speaker of corpus *nom*.

    Args:
        dict_meta: corpus name -> metadata dict whose ``'locuteur'`` entry is
            a list of ``(identifier, attributes)`` pairs.
        nom: corpus name.
        speaker: identifier of the speaker to look up. When omitted, the
            notebook-level global ``L`` is used, as before.

    Returns ``None`` when the speaker is not listed or has no ``'sexe'``.
    """
    for identifiant, profile in dict_meta['{}'.format(nom)]['locuteur']:
        # ``L`` is only read when no speaker was given (legacy call style).
        if identifiant == (L if speaker is None else speaker):
            return profile.get('sexe')
    return None

In [None]:
def get_prof(dict_meta, nom, speaker=None):
    """Return the ``'profession_actuelle'`` of one speaker of corpus *nom*.

    Args:
        dict_meta: corpus name -> metadata dict whose ``'locuteur'`` entry is
            a list of ``(identifier, attributes)`` pairs.
        nom: corpus name.
        speaker: identifier of the speaker to look up. When omitted, the
            notebook-level global ``L`` is used, as before.

    Returns ``None`` when the speaker is not listed or has no profession.
    """
    for identifiant, profile in dict_meta['{}'.format(nom)]['locuteur']:
        # ``L`` is only read when no speaker was given (legacy call style).
        if identifiant == (L if speaker is None else speaker):
            return profile.get('profession_actuelle')
    return None

In [None]:
# Enrich every per-turn dataframe; an entry is None when add_meta failed.
final = [add_meta(df, dict_meta, dict_path) for df in list_df_turn]

In [None]:
# Stack all corpora into a single table of turns.
df_tours = pd.concat(final)
# Every row comes from the same parent corpus.
df_tours['corpus'] = 'TCOF'
# Replace the speaker id by the speaker's name.
df_tours['speaker'] = df_tours['speaker_name']
# Drop the helper columns that are not exported.
df_tours = df_tours.drop(columns=['speaker_characteristic', 'audio_filename'])


In [None]:
# Export the turns as a tab-separated UTF-8 file. The raw string keeps the
# backslashes of the Windows path literal (no invalid escape sequences).
df_tours.to_csv(r'E:\Corpus\TCOF_CNRTL\TCOF_TOTAL.csv', sep='\t', encoding='utf-8', index=False)