# Processing subtitles

This notebook is used to obtain one sentence per subtitle in the `.vtt` files of the **Matignon_LSF** dataset.

In [1]:
import os
import re
import module_traitement as m

## Loading the folders

Here, we load the folder containing the subtitles (.vtt files).
1. file_with_path : list of `.vtt` files with their relative path
2. folder : list of `.vtt` files with only their file names

We create **two output** folders : 
1. One for the cleaned subtitles (`.vtt`) files 
1. One for the segmented files based on punctuation


In [42]:
# TEST -  Matignon-LSF with one file
# Output folders: cleaned subtitles, and subtitles segmented on punctuation.
output_cleaning = "test_cleaning_matignon1"
output_seg = "test_new_seg_matignon1"
# The input folder is listed twice: bare file names, then names with their path.
folder = m.lister_fichiers("test_matignon/")
file_with_path = m.lister_fichiers_with_path("test_matignon/")

### Check if the folders exist

In [43]:
# Report, for each output folder, the status message returned by the helper.
for dossier in (output_cleaning, output_seg):
    print(m.verifier_ou_creer_dossier(dossier))

Le dossier 'test_cleaning_matignon1' existe déjà.
Le dossier 'test_new_seg_matignon1' existe déjà.


# Files normalisation

First, we use regular expressions to clean the subtitle files. This makes it easier to produce the "one sentence per subtitle" files at the end. We handle all the punctuation that could cause trouble for the segmentation. The cleaned files are stored in the **output_cleaning** folder.

In [64]:
def convertir_grand_nombre(nombre_texte):
    """Remove every dot that sits directly between two digits.

    Used to drop the thousands separators of large numbers
    ("1.000.000" -> "1000000") so that those dots are not mistaken for
    sentence-ending punctuation later on.

    Parameters
    ----------
    nombre_texte : str
        Text that may contain dotted numbers.

    Returns
    -------
    str
        The text with the digit-to-digit dots removed; all other dots are
        left untouched.
    """
    # Lookarounds test the neighbouring digits without consuming them. The
    # previous pattern `(\d)\.(\d)` consumed the right-hand digit, so it could
    # not serve as the left-hand digit of the next match: "1.2.3" became
    # "12.3" instead of "123".
    return re.sub(r'(?<=\d)\.(?=\d)', '', nombre_texte)

def remplacer_points_adresses_email(texte):
    """Mask the dots of every e-mail address found in ``texte``.

    Each "." inside a matched address is rewritten as the literal token
    "POINT"; text outside of the matches is returned unchanged.
    """
    motif_email = re.compile(r'\b[\w.%+-]+@[\w.-]+\.[a-zA-Z]{2,}\b')
    return motif_email.sub(
        lambda trouve: trouve.group().replace('.', 'POINT'),
        texte,
    )

def remplacer_points_adresses(texte):
    """Mask the dots of every website-like address found in ``texte``.

    An address is an optional "http://" / "https://" scheme, an optional
    "www." prefix, then a dotted name ending with at least two ASCII letters.
    Each "." inside a match is rewritten as the literal token "POINT".
    """
    motif_site = re.compile(r'\b(?:https?://)?(?:www\.)?[\w.-]+\.[a-zA-Z]{2,}\b')
    return motif_site.sub(
        lambda trouve: trouve.group().replace('.', 'POINT'),
        texte,
    )

def normaliser_points_de_suspension(texte):
    """Normalise the spacing around "..." in ``texte``.

    Two rewrites are applied in this order:
    1. a space is inserted between "..." and a word character that follows it;
    2. any whitespace between a word character and a following "..." is removed.
    """
    etapes = (
        (r'\.\.\.(\w)', r'... \1'),
        (r'(\w)\s*\.\.\.', r'\1...'),
    )
    for motif, remplacement in etapes:
        texte = re.sub(motif, remplacement, texte)
    return texte

def remplacer_ponctuation_html(texte):
    """Encode the punctuation found inside parentheses as HTML entities.

    For every "(...)" group matched by ``\\(([^)]*)\\)``, the characters
    "(", ")", "?", "!", "." and the sequence "..." occurring *inside* the
    group are replaced by numeric HTML entities. The enclosing parentheses
    are kept literal and text outside of parentheses is left untouched.

    Parameters
    ----------
    texte : str
        Text to process.

    Returns
    -------
    str
        The text with the parenthesised punctuation encoded.
    """
    # "..." must be handled before ".": substitutions are applied one after
    # the other, so encoding single dots first turned every ellipsis into
    # "&#46;&#46;&#46;" and the "&#8230;" entry could never match.
    substitutions = [
        ("...", "&#8230;"),
        ("(", "&#40;"),
        (")", "&#41;"),
        ("?", "&#63;"),
        ("!", "&#33;"),
        (".", "&#46;"),
    ]

    def remplacer_match(match):
        contenu_parentheses = match.group(1)
        for recherche, remplacement in substitutions:
            contenu_parentheses = contenu_parentheses.replace(recherche, remplacement)
        return f'({contenu_parentheses})'

    texte_modifie = re.sub(r'\(([^)]*)\)', remplacer_match, texte)
    return texte_modifie

def get_dict_vtt_clean(input):
    """Read a subtitle file and return its cues with normalised text.

    A line starting with "00:", "01:" or "02:" is treated as a timing line
    and split on ' --> ' into a start and an end time. Every following line,
    up to the next timing line, is stripped and appended (space-separated)
    to the cue text. After each appended line the whole substitution chain
    below is run again on the accumulated text.

    Parameters
    ----------
    input : path of the file to read (opened as UTF-8). The name shadows the
        ``input`` builtin inside this function.

    Returns
    -------
    dict
        Maps a running index (0, 1, ...) to
        ``{'start': ..., 'end': ..., 'text': ...}``.

    Raises
    ------
    ValueError
        From the tuple unpacking, when a line starting with one of the three
        prefixes does not split into exactly two parts on ' --> '.

    NOTE(review): only the prefixes "00:", "01:" and "02:" are recognised, so
    a timing line starting with "03:" or more would be appended to the
    previous cue's text instead of opening a new cue.
    NOTE(review): the order of the substitutions matters (sentinel tokens are
    created before the generic rules that would otherwise hit the same
    characters); reorder with care.
    """
    with open(input,encoding="utf-8") as f:
        lines = f.readlines()

    dict_sub = {}
    i = 0  # index of the next cue stored in dict_sub
    j = 0  # index of the line currently examined

    while j < len(lines): 
        element = lines[j]
        if element.startswith("00:") or element.startswith("01:") or element.startswith("02:"):
            # Extract start and end time
            timing_line = element.strip().split(' --> ')
            start_time, end_time = timing_line

            text = ""
            # Consume every line up to (not including) the next timing line.
            while j + 1 < len(lines) and not lines[j + 1].startswith("00:") and not lines[j+1].startswith("01:") and not lines[j+1].startswith("02:"):
                j += 1
                content = lines[j]
                text = text + " " + content.strip()
                # Everything below runs on the whole accumulated text, not just on the new line.
                text = normaliser_points_de_suspension(text)
                # Bracketed sentinel tokens stand in for punctuation patterns.
                text = text.replace("(...)","[SUSPENSIONP]") # TODO: fix this problem.
                text = text.replace("....","[POINTS4]")
                text = remplacer_ponctuation_html(text)
                ### change
                # Unicode ellipsis -> three dots, then three dots -> sentinel followed by one dot.
                text = text.replace("…","...")
                text = text.replace("...","[SUSPENSION].") ### handle the double ellipsis!
                # Drop straight, curly and angle quotation marks.
                text = re.sub(r'["“”«»]', '', text)
                text = text.replace("(???)","[INTERROGATION3]")
                text = text.replace("?!","[INTEREXCL].")
                text = text.replace("(?)","[INTERROGATION1]")
                text = text.replace("(??)","[INTERROGATION2]")
                text = text.replace("( ?)","[INTERROGATION1]")
                text = text.replace("?,","[INTERROGATION],")
                text = text.replace("!,","[EXCLAMATION],")
                # NOTE(review): "!." is rewritten with a trailing comma, not a dot;
                # reverse_cleaning maps "[EXCLAMPOINT]," back to "!.".
                text = text.replace("!.","[EXCLAMPOINT],")
                text = text.replace("etc.,","[ETC],")
                text = text.replace("etc.)","[ETC])")
                text = text.replace("etc. .","[ETC].")
                # NOTE(review): "Etc" is lower-cased only after the "etc." rules above have run.
                text = text.replace("Etc","etc")
                text = text.replace("PAM !","[PAM]")
                # Normalise the curly apostrophe first so the variants below can match.
                text = text.replace("Média’Pi","Média'Pi")
                text = text.replace("Média'Pi!","[NOM_MEDIA]")
                text = text.replace("Média'Pi !","[NOM_MEDIA]")
                text = text.replace("Media'Pi !","[NOM_MEDIA]")
                text = text.replace("Média'Pi&nbsp;!","[NOM_MEDIA]")
                text = text.replace("Média' Pi !","[NOM_MEDIA]")
                
                # Presumably strips inclusive-writing suffixes — TODO confirm.
                # NOTE(review): the bare ".e" rule removes ANY ".e" in the text, and it
                # runs before the address masking below, so an address such as
                # "www.exemple.fr" loses its ".e" before it can be masked.
                text = text.replace(".e.s","")
                text = text.replace(".ne.s","")
                text = text.replace(".e.","")
                text = text.replace(".e","")
                # Collapse doubled marks (one pass: "!!!" becomes "!!").
                text = text.replace("!!","!")
                text = text.replace("??","?")
                # Mask the dots of e-mail and web addresses with the token "POINT".
                text = remplacer_points_adresses_email(text)
                text = remplacer_points_adresses(text)
                text = text.replace(".,","[POINT],")
                text = text.replace("y.a","y a")
                ### 
                # Remove the hard-coded speaker label "G. Attal : " in its three variants.
                text=text.replace("... -G. Attal : ","")
                text=text.replace("-G. Attal : ","")
                text=text.replace("G. Attal : ","")
                text = text.replace("-Bonjour", "Bonjour")
                text = text.replace("Bonjour.", "Bonjour,")
                # Expand the titles; " M." loses its dot.
                text = text.replace(" M."," Monsieur")
                text = text.replace(" Mme"," Madame")
                # Every remaining hyphen becomes a space.
                text = text.replace("-"," ")
                text = convertir_grand_nombre(text)
                

            dict_sub[i] = {'start': start_time, 'end': end_time, 'text': text.strip()}
            i += 1

        j += 1

    return dict_sub



def remplacer_entites_html(chaine):
    """Decode the numeric HTML entities used by the cleaning step.

    The entities for "(", ")", "?", "!" and "." are turned back into their
    character, and "&#8230;" is turned into three dots. The replacements are
    applied one after the other, in the order listed below.
    """
    entites_vers_ponctuation = (
        ("&#40;", "("),
        ("&#41;", ")"),
        ("&#63;", "?"),
        ("&#33;", "!"),
        ("&#46;", "."),
        ("&#8230;", "..."),
    )
    for entite, ponctuation in entites_vers_ponctuation:
        chaine = chaine.replace(entite, ponctuation)
    return chaine

def reverse_cleaning(input):
    """Read a cleaned subtitle file and restore the original punctuation.

    The file is parsed like in the cleaning step: a line starting with
    "00:", "01:" or "02:" opens a cue and is split on ' --> '; the following
    lines, up to the next timing line, are stripped and appended
    (space-separated) to the cue text. After each appended line the sentinel
    tokens are turned back into punctuation on the whole accumulated text,
    then the HTML entities are decoded.

    Returns a dict mapping a running index (0, 1, ...) to
    ``{'start': ..., 'end': ..., 'text': ...}``.
    """
    # Applied in this exact order: the bracketed tokens containing "POINT"
    # must be restored before the bare "POINT" token is turned into a dot.
    restaurations = (
        ("[POINT],", ".,"),
        ("[SUSPENSIONP]", "(...)"),
        ("[POINTS4]", "...."),
        ("…", "..."),
        ("[INTERROGATION3]", "(???)"),
        ("[INTEREXCL].", "?!"),
        ("[INTERROGATION1]", "(?)"),
        ("[INTERROGATION2]", "(??)"),
        ("[INTERROGATION],", "?,"),
        ("[EXCLAMATION],", "!,"),
        ("[EXCLAMPOINT],", "!."),
        ("[ETC],", "etc.,"),
        ("[ETC])", "etc.)"),
        ("[ETC].", "etc."),
        ("[PAM]", "PAM !"),
        ("[NOM_MEDIA]", "Média'Pi !"),
        ("[SUSPENSION].", "..."),
        ("POINT", "."),
    )
    prefixes_horodatage = ("00:", "01:", "02:")

    with open(input, encoding="utf-8") as f:
        lines = f.readlines()

    dict_sub = {}
    nb_lignes = len(lines)
    position = 0
    while position < nb_lignes:
        ligne = lines[position]
        if ligne.startswith(prefixes_horodatage):
            # Extract start and end time
            start_time, end_time = ligne.strip().split(' --> ')

            text = ""
            while position + 1 < nb_lignes and not lines[position + 1].startswith(prefixes_horodatage):
                position += 1
                text = text + " " + lines[position].strip()
                for jeton, ponctuation in restaurations:
                    text = text.replace(jeton, ponctuation)
                text = remplacer_entites_html(text)

            # The next free index is always the number of cues stored so far.
            dict_sub[len(dict_sub)] = {'start': start_time, 'end': end_time, 'text': text.strip()}

        position += 1

    return dict_sub



In [45]:
# Clean every input subtitle file and write the result, under the same
# file name, into the output_cleaning folder.
for chemin_source, nom_fichier in zip(file_with_path, folder):
    m.create_vtt_file(get_dict_vtt_clean(chemin_source), f"{output_cleaning}/{nom_fichier}")

# Get a sentence file

From the **clean subtitle** files (**output_cleaning** folder), we generate text files that contain one sentence per line. Note that these files currently keep the file name of their source subtitle file, `.vtt` extension included. They are put in a new folder: **output_sent**.

In [46]:
# Folder that receives the one-sentence-per-line files.
output_sent = "test_sent_file_matignon1"
# From here on the inputs are the cleaned subtitles: bare names, then names with path.
folder = m.lister_fichiers(output_cleaning)
file_with_path = m.lister_fichiers_with_path(output_cleaning)

### check existing output_sent folder

In [47]:
# Print the status message returned by the folder helper for output_sent.
print(m.verifier_ou_creer_dossier(output_sent))

Le dossier 'test_sent_file_matignon1' existe déjà.


### Create sentences files (.txt)

In [48]:
# Build one sentence-per-line file for every cleaned subtitle file.
# (The compiled regex that used to be assigned to `pattern` here was never
# used by this cell, so it has been dropped.)
for file,name in zip(file_with_path,folder):
    dict_sub = get_dict_vtt_clean(file)
    # Each cue text is stripped and followed by one space; str.join avoids
    # rebuilding the whole string at every cue.
    new_text = "".join(f"{cue['text'].strip()} " for cue in dict_sub.values())
    phrases = m.segmenter_texte_en_phrases(new_text)
    with open(f"{output_sent}/{name}", 'w', encoding='utf-8') as f:
        # One stripped sentence per line, without a trailing newline.
        f.write("\n".join(element.strip() for element in phrases))


# Get an oversegmented subtitle's file (.vtt)

From the clean data, we segment each clean subtitle file (`.vtt`) at strong punctuation. Over-segmentation is acceptable here, because the subunits can be concatenated again in a later step. The new subtitle files are written to the **output_seg** folder.

In [49]:
# Folder that receives the over-segmented subtitle files.
output_seg = "test_new_seg_matignon1"
# Inputs: the cleaned subtitles, as bare names and as names with path.
folder = m.lister_fichiers(output_cleaning)
file_with_path = m.lister_fichiers_with_path(output_cleaning)

### Check existing output_seg folder

In [50]:
# Print the status message returned by the folder helper for output_seg.
print(m.verifier_ou_creer_dossier(output_seg))

Le dossier 'test_new_seg_matignon1' existe déjà.


### File's (over)segmentation

In [51]:
# A run of strong punctuation (. ! ?) ends a sub-unit unless it is directly
# followed by ")". The look-ahead only inspects the following character:
# the previous pattern `([.!?]+)[^)]` consumed it, so re.split silently
# deleted that character whenever it was not a space ("Oui.Non" lost its "N").
pattern = r'([.!?]+)(?=[^)])'
# Stand-in for the dot of an acronym ("A.B") while splitting. It is put back
# as a dot afterwards; the previous '#' placeholder was never restored and
# ended up in the written subtitles.
acronym_dot = "\x00"

for file,name in zip(file_with_path,folder):
    print(f"TRAITEMENT {file} ---- {name}")
    dict_sub = get_dict_vtt_clean(file)
    new_dict = {}
    mm = 0  # index of the next sub-unit written to new_dict
    for v in dict_sub.values():
        # Protect the dot between two capital letters so it does not split.
        protected_text = re.sub(r'(?<=[A-Z])\.(?=[A-Z])', acronym_dot, v["text"])
        # With one capture group, re.split alternates text and punctuation.
        sentences = re.split(pattern, protected_text)
        # Re-attach each piece of text to the punctuation that follows it.
        pieces = [sentences[i] + sentences[i + 1] if i < len(sentences) - 1 else sentences[i] for i in range(0, len(sentences), 2)]
        # Drop empty pieces and restore the acronym dots.
        result = [piece.strip().replace(acronym_dot, ".") for piece in pieces if piece.strip()]
        if not result:
            # Cue without text: nothing to write.
            continue
        if len(result) == 1:
            # Single sentence: keep the cue as it is.
            new_dict[mm] = v
            mm = mm + 1
            continue
        print(f"resultat : {result}")
        start_time_str = v["start"]
        end_time_str = v["end"]
        # Share the cue duration between the sub-units in proportion to
        # their number of characters.
        duration = m.time_to_seconds(end_time_str) - m.time_to_seconds(start_time_str)
        print(duration)
        sec_par_letter = duration / len(v["text"])
        for match in result:
            end_time = m.ajouter_secondes(start_time_str, len(match) * sec_par_letter)
            print(end_time)
            print(f"start time ({type(start_time_str)}) : {start_time_str}, end time ({type(end_time)}) : {end_time}, text : {match}")
            new_dict[mm] = {'start': start_time_str, "end": end_time, 'text': match}
            # The next sub-unit starts where this one ends.
            start_time_str = end_time
            mm = mm + 1
    m.create_vtt_file(new_dict,f"{output_seg}/{name}")

                    
    

TRAITEMENT test_cleaning_matignon1/1AjRdJ5d_Ww.vtt ---- 1AjRdJ5d_Ww.vtt
resultat : ['Bien.', 'Bonjour,']
1.7690000000000001
00:00:08.390
start time (<class 'str'>) : 00:00:07.759, end time (<class 'str'>) : 00:00:08.390, text : Bien.
00:00:09.400
start time (<class 'str'>) : 00:00:08.390, end time (<class 'str'>) : 00:00:09.400, text : Bonjour,
resultat : ['ne veut pas dire satisfaisante.', "L'épidémie ne recule plus."]
3.835000000000008
00:01:55.247
start time (<class 'str'>) : 00:01:53.198, end time (<class 'str'>) : 00:01:55.247, text : ne veut pas dire satisfaisante.
00:01:56.966
start time (<class 'str'>) : 00:01:55.247, end time (<class 'str'>) : 00:01:56.966, text : L'épidémie ne recule plus.
resultat : ['Le travail[SUSPENSION].', 'Je veux insister sur le fait']
2.835000000000008
00:02:20.847
start time (<class 'str'>) : 00:02:19.594, end time (<class 'str'>) : 00:02:20.847, text : Le travail[SUSPENSION].
00:02:22.373
start time (<class 'str'>) : 00:02:20.847, end time (<class '

# Concatenate the subtitle subunits into sentences

We now have two main folders : 
1. One containing `.txt` files, with a sentence per line
2. One containing `.vtt` files, with the subtitles segmented at strong punctuation

We can now use the sentence files to concatenate the subunits. The final subtitles are saved in the **output_folder** folder.

In [52]:
# test Matignon-LSF one file
pre_seg_file = output_seg
sentence_file = output_sent
# The four listings are zipped together further down, so they must line up
# file by file. Sorting makes that pairing independent of the order in which
# the folders happen to be listed, as the verification cells already do.
# NOTE(review): assumes both folders are flat and hold files with the same
# names — confirm against the listing helpers.
resultat = sorted(m.lister_fichiers_with_path(pre_seg_file))
path_sentence = sorted(m.lister_fichiers_with_path(sentence_file))
resultat_output = sorted(m.lister_fichiers(pre_seg_file))
sentences_only = sorted(m.lister_fichiers(sentence_file))
output_folder = "test_sent_seg_matignon1/"

### Check existing output_folder

In [53]:
# Print the status message returned by the folder helper for output_folder.
print(m.verifier_ou_creer_dossier(output_folder))

Le dossier 'test_sent_seg_matignon1/' existe déjà.


### Create new subtitles files - sent based

In [54]:
# Creation of the new subtitle's files.
# Each segmented subtitle file is walked in parallel with its sentence file:
# consecutive sub-units contained in the current sentence are merged into one
# cue that runs from the start of the first unit to the end of the last one.
for sub, sub_name, phr, phr_name in zip(resultat, resultat_output, path_sentence, sentences_only):
    print(f"TRAITEMENT : {sub_name} && {phr_name}")
    print(f"TRAITEMENT : {sub} && {phr}")
    dict_sub = m.get_dict_vtt(sub)
    sentences = m.get_sentences(phr)
    keys = list(dict_sub.keys())
    i = j = mm = 0  # unit index, sentence index, output cue index
    new_dict = {}
    content = ""
    while i < len(keys) and j < len(sentences):
        value = dict_sub[keys[i]]
        unite = value['text'].strip()
        sent = sentences[j].strip()

        if unite not in sent:
            # The unit belongs to a later sentence: close the cue being built
            # (if any) with the end time of the previous unit, then move on
            # to the next sentence.
            if mm in new_dict and 'end' not in new_dict[mm]:
                new_dict[mm]['end'] = dict_sub[keys[i - 1]]['end']
                mm += 1
            j += 1
            continue

        # The unit is part of the current sentence: open the cue on its first
        # unit, then keep appending the text of the following ones.
        if mm not in new_dict:
            new_dict[mm] = {"start": value['start']}
        cue = new_dict[mm]
        if 'text' in cue:
            cue['text'] += f" {unite}" # adding space here, problem to check
        else:
            cue['text'] = unite
        i += 1

        if i == len(keys):
            # Last unit of the file: it also closes the current cue.
            cue['end'] = value['end']
    m.create_vtt_file(new_dict, f"{output_folder}/{sub_name}")

TRAITEMENT : 1AjRdJ5d_Ww.vtt && 1AjRdJ5d_Ww.vtt
TRAITEMENT : test_new_seg_matignon1/1AjRdJ5d_Ww.vtt && test_sent_file_matignon1/1AjRdJ5d_Ww.vtt


# Another segmentation if necessary
<div style="border: 1px solid red; padding: 10px; background-color: #333;">
<strong>⚠️ Warning:</strong> if there are many errors due to the segmentation, it is possible to run this step once again. Please check your results before doing so. If you need to, convert the following raw cells into Python code cells.
</div>

# Manual Verification

We can check if the **output_folder** matches the **output_sent** folder. In other words, we check if the subtitle units match the corresponding lines in the sentence file.


In [55]:
# NOTE: output_folder held the output folder name up to this point; from here
# on it holds the sorted list of the files found in that folder.
output_folder = sorted(m.lister_fichiers_with_path("test_sent_seg_matignon1")) # change folder
print(output_folder)

['test_sent_seg_matignon1/1AjRdJ5d_Ww.vtt']


In [57]:
# Sorted list of the sentence files, to be paired with the sorted subtitle files.
path_sentence = sorted(m.lister_fichiers_with_path(output_sent))
print(path_sentence)

['test_sent_file_matignon1/1AjRdJ5d_Ww.vtt']


In [58]:
def supprimer_derniere_chaine_vide(liste):
    """Drop the last element of ``liste`` when it is an empty string.

    The list is modified in place and also returned. Only one trailing empty
    string is removed; an empty (or falsy) ``liste`` is returned as is.
    """
    if not liste:
        return liste
    dernier = liste[-1]
    if isinstance(dernier, str) and len(dernier) == 0:
        del liste[-1]
    return liste


### Check the files

In [59]:
nb_file = 0      # number of files with at least one difference
name_file = []   # subtitle files in which a difference was found
good_file = []   # subtitle files that match their sentence file
for sub_file, sent_file in zip(output_folder, path_sentence):
    sub_dict = m.get_dict_vtt(sub_file)
    sentence = m.get_sentences(sent_file)
    # Collect the cue texts; str.replace leaves a text without "(... )" as it is.
    new_sent = [
        cue["text"].replace("(... )", "(...)")
        for cue in sub_dict.values()
        if "text" in cue
    ]
    new_sent = supprimer_derniere_chaine_vide(new_sent)
    differences = m.comparer_listes(new_sent, sentence)
    if not differences:
        good_file.append(sub_file)
        continue
    print(f"TRAITEMENT : {sub_file} && {sent_file}")
    # Only the first difference of each file is reported and counted.
    for position, element_a, element_b in differences:
        nb_file += 1
        print(f"Différence à la position {position}: {element_a} vs {element_b}")
        name_file.append(sub_file)
        break

print(f"Nombres de fichiers contenant une erreur : {nb_file}")

Nombres de fichiers contenant une erreur : 0


# Reverse cleaning

We used special tokens to replace some challenging character sequences in the subtitles. We now reverse the cleaning in order to restore the original subtitle text.

In [60]:
# Folder that receives the subtitles once the cleaning has been reversed.
reverse_cleaning_file = "reverse_cleaning_test1"
print(m.verifier_ou_creer_dossier(reverse_cleaning_file))

Le dossier 'reverse_cleaning_test1' existe déjà.


In [61]:
# Superseded draft, kept for reference only: remplacer_entites_html is defined
# in the normalisation cell with the substitutions list built into the function.
# substitutions = [
#     ("(", "&#40;"),
#     (")", "&#41;"),
#     ("?", "&#63;"),
#     ("!", "&#33;"),
#     (".", "&#46;"),
#     ("...", "&#8230;")
# ]

# def remplacer_entites_html(chaine, substitutions):
#     for new, old in substitutions:
#         chaine = chaine.replace(old, new)
#     return chaine

In [65]:
# Sentence-based subtitles: names with path, then bare names.
files_with_path = m.lister_fichiers_with_path("test_sent_seg_matignon1")
name = m.lister_fichiers("test_sent_seg_matignon1")

# Restore the original punctuation of every file and write the result, under
# the same file name, into the reverse-cleaning folder.
for chemin_vtt, nom_vtt in zip(files_with_path, name):
    m.create_vtt_file(reverse_cleaning(chemin_vtt), f"{reverse_cleaning_file}/{nom_vtt}")