In [None]:
import re
from typing import List, Tuple

from utils.names import get_file_names

# Names of the programs to process. Each entry is used below to build paths of
# the form `{file}.srt`, so entries are expected to come without the extension.
files = get_file_names()

## Preprocessing of ground-truth files

### 1) Check the validity of the srt
Rules:

- The identifiers of the blocks must be consecutive, starting from 1.
- The end time of a block must be later than its start time.
- The start time of a block must be later than the end time of the preceding block.

In [7]:
def check_srt_correctness(srt_text: str) -> bool:
    """
    Validate the structure of an .srt text, printing a message for the first problem found.

    Blocks are separated by blank lines and are checked in order. Each block must:
      * have at least two lines;
      * start with an integer identifier equal to its 1-based position;
      * have a second line beginning with ``HH:MM:SS,mmm --> HH:MM:SS,mmm``;
      * end strictly after it starts;
      * start strictly after the end of the preceding block.

    Args:
        srt_text: the text of the .srt file.

    Returns:
        bool: True if every block passes, False as soon as one check fails.
    """
    timing_re = re.compile(r'(\d{2}:\d{2}:\d{2},\d{3})\s+-->\s+(\d{2}:\d{2}:\d{2},\d{3})')

    def to_ms(t):
        # "HH:MM:SS,mmm" -> total milliseconds
        hh, mm, tail = t.split(':')
        ss, millis = tail.split(',')
        return int(hh)*3600000 + int(mm)*60000 + int(ss)*1000 + int(millis)

    last_end_ms = None
    for idx, block in enumerate(re.split(r'\n\s*\n', srt_text.strip())):
        rows = block.strip().splitlines()
        if len(rows) < 2:
            print(f"Errore: blocco {idx+1} incompleto.")
            return False

        # Identifier: must be an integer and match the block position.
        try:
            block_id = int(rows[0].strip())
        except ValueError:
            print(f"Errore: identificatore non numerico nel blocco {idx+1}.")
            return False
        if block_id != idx + 1:
            print(f"Errore: identificatore non consecutivo nel blocco {idx+1} (trovato {block_id}, atteso {idx+1}).")
            return False

        # Timing line: only its beginning has to match the expected format.
        timing = timing_re.match(rows[1].strip())
        if timing is None:
            print(f"Errore: formato tempo non valido nel blocco {idx+1}.")
            return False

        start_ms, end_ms = (to_ms(stamp) for stamp in timing.groups())
        if start_ms >= end_ms:
            print(f"Errore: tempo di fine non successivo a quello di inizio nel blocco {idx+1}.")
            return False
        if last_end_ms is not None and last_end_ms >= start_ms:
            print(f"Errore: inizio del blocco {idx+1} non successivo alla fine del blocco precedente.")
            return False
        last_end_ms = end_ms
    return True

In [8]:
def parse_srt_blocks(srt_content: str) -> List[Tuple[str, str, str]]:
    """
    Extract ``(start, end, text)`` triples from an .srt text, discarding the identifiers.

    A block is recognised as: a line of digits, a line of the form
    ``HH:MM:SS,mmm --> HH:MM:SS,mmm``, then the subtitle text up to the next blank
    line or the end of the input. Anything that does not fit this shape is skipped.

    Args:
        srt_content: the text of the .srt file.

    Returns:
        One ``(start, end, text)`` tuple per recognised block, in file order,
        with the text stripped of surrounding whitespace.
    """
    pattern = re.compile(
        r"\s*(\d+)\s*\n"
        r"(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})\s*\n"
        # A block ends at a blank line. The blank line may contain whitespace
        # (spaces, tabs, '\r'), which is the same separator check_srt_correctness
        # splits on; requiring two adjacent '\n' would let a block run past a
        # separator such as "\n \n" or "\r\n\r\n".
        r"(.*?)(?=\n\s*\n|\Z)", re.DOTALL
    )
    return [
        (start, end, text.strip())
        for _, start, end, text in pattern.findall(srt_content.strip())
    ]

def rebuild_srt(blocks: List[Tuple[str,str,str]]) -> str:
    """
    Serialise ``(start, end, text)`` triples back into .srt text.

    Blocks are numbered consecutively from 1, separated by one blank line, and
    the result always ends with exactly one newline.

    Args:
        blocks: the ``(start, end, text)`` triples, in output order.

    Returns:
        The .srt text.
    """
    # Trailing whitespace of each text is dropped so that the blank-line
    # separator is the only gap between two blocks.
    body = "\n\n".join(
        f"{i}\n{start} --> {end}\n{text.rstrip()}"
        for i, (start, end, text) in enumerate(blocks, start=1)
    )
    return body.strip() + "\n"

In [None]:
# Names of the files that fail the structural validation.
non_correct_files = []

for file in files:
    with open(f'../data/srt/ground-truth-original/{file}.srt', 'r', encoding='utf-8') as f:
        srt_text = f.read()

    # The check only needs the text, so it runs once the file has been closed.
    if not check_srt_correctness(srt_text):
        non_correct_files.append(file)
        print(f"Il file .srt {file} presenta degli errori.")
        continue

    print(f"Il file .srt {file} è corretto.")

In [11]:
# Rewrite, in place, every file that failed validation: its blocks are parsed
# and written back with consecutive identifiers. Timecodes are copied as they
# are, so timing errors reported by the check are not repaired by this step.
for filename in non_correct_files:
    file_path = f"../data/srt/ground-truth-original/{filename}.srt"
    with open(file_path, 'r', encoding='utf-8') as f:
        srt_content = f.read()

    parsed_blocks = parse_srt_blocks(srt_content=srt_content)
    if not parsed_blocks:
        # Nothing could be parsed: writing the rebuilt text would replace the
        # original ground truth with a lone newline, so the file is left alone.
        print(f"Attenzione: nessun blocco riconosciuto in {filename}.srt, file lasciato invariato.")
        continue
    rebuilt_srt = rebuild_srt(parsed_blocks)

    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(rebuilt_srt)

### 2) Normalize the text
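Rules:

- Remove markup tags (e.g. `<font ...>`, `<i>`), keeping the text they enclose.
- Remove any text enclosed in square brackets `[...]`.
- Drop the blocks whose text starts with "Sottotitoli RAI Pubblica Utilità" or "Sottotitoli a cura" (the subtitling credits, usually the last block of a file).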

In [12]:
def preprocess_ground_truth(srt_text: str) -> str:
    """
    Clean a ground-truth .srt text and renumber the blocks that survive.

    Blocks are separated by blank lines. The text lines of each block are cleaned by:
      * unwrapping ``<font color="...">...</font>`` and removing any other ``<...>`` tag;
      * removing text enclosed in square brackets, including an annotation that
        opens on one line and closes on a later line of the same block;
      * dropping any line that, after the removals above, still starts with ``[``
        or ends with ``]``;
      * collapsing runs of whitespace into a single space.

    A block is dropped when it has fewer than three lines, when no text is left
    after cleaning, or when its text starts (case-insensitively) with
    'sottotitoli rai pubblica utilità' or 'sottotitoli a cura'. The remaining
    blocks are numbered consecutively from 1; timecode lines are copied unchanged.

    Args:
        srt_text: the text of the .srt file.

    Returns:
        The cleaned blocks joined by one blank line, with no trailing newline.
    """
    font_tag = re.compile(r'<font color="#?[A-Fa-f0-9]+">(.*?)</font>')
    html_tag = re.compile(r'<[^>]+>')
    bracketed = re.compile(r'\[[^\]]*\]')
    # Used across lines: no '[' is allowed inside, so a stray unmatched '['
    # cannot swallow the text up to a later, unrelated annotation.
    spanning_bracketed = re.compile(r'\[[^\[\]]*\]')
    credit_prefixes = ('sottotitoli rai pubblica utilità', 'sottotitoli a cura')

    cleaned_blocks = []
    for block in re.split(r'\n\s*\n', srt_text.strip()):
        lines = block.strip().splitlines()
        if len(lines) < 3:
            continue
        timecode = lines[1]

        # Pass 1, line by line: tags first, then annotations closed on the same line.
        untagged = [
            bracketed.sub('', html_tag.sub('', font_tag.sub(r'\1', line)))
            for line in lines[2:]
        ]
        # Pass 2, on the whole block: annotations that continue on a later line.
        # The line break they contain is kept so the surrounding text stays on
        # separate lines.
        merged = spanning_bracketed.sub(
            lambda m: '\n' if '\n' in m.group(0) else '',
            '\n'.join(untagged),
        )

        cleaned_text_lines = []
        for line in merged.split('\n'):
            stripped = line.strip()
            # A bracket still dangling at either edge: drop the whole line.
            if stripped.startswith('[') or stripped.endswith(']'):
                continue
            line = re.sub(r'\s+', ' ', line).strip()
            if line:
                cleaned_text_lines.append(line)
        if not cleaned_text_lines:
            continue

        # Skip the subtitling-credit blocks.
        joined_text = ' '.join(cleaned_text_lines).lower()
        if joined_text.startswith(credit_prefixes):
            continue

        block_number = len(cleaned_blocks) + 1
        cleaned_blocks.append('\n'.join([str(block_number), timecode] + cleaned_text_lines))
    return '\n\n'.join(cleaned_blocks)

In [13]:
def extract_text_from_srt(srt_text):
    """
    Return the subtitle text of an .srt document as a single space-joined string.

    Blank lines, timecode lines (those containing '-->') and block identifiers
    are left out. A numeric line counts as an identifier only when the line
    right after it is a timecode, so subtitle lines made only of digits
    (e.g. "2021") are kept as text.

    Args:
        srt_text: the text of the .srt file.

    Returns:
        The text lines, stripped and joined by single spaces.
    """
    stripped_lines = [line.strip() for line in srt_text.splitlines()]
    text_lines = []
    for pos, line in enumerate(stripped_lines):
        if not line or '-->' in line:
            continue
        following = stripped_lines[pos + 1] if pos + 1 < len(stripped_lines) else ''
        if line.isdigit() and '-->' in following:
            continue
        text_lines.append(line)
    return ' '.join(text_lines)

In [6]:
def extract_text_from_srt_with_shift(srt_text):
    """
    Return the subtitle text of an .srt document, one subtitle line per output line.

    Blank lines, timecode lines (those containing '-->') and block identifiers
    are left out. A numeric line counts as an identifier only when the line
    right after it is a timecode, so subtitle lines made only of digits
    (e.g. "2021") are kept as text.

    Args:
        srt_text: the text of the .srt file.

    Returns:
        The text lines, stripped and joined by newlines.
    """
    stripped_lines = [line.strip() for line in srt_text.splitlines()]
    text_lines = []
    for pos, line in enumerate(stripped_lines):
        if not line or '-->' in line:
            continue
        following = stripped_lines[pos + 1] if pos + 1 < len(stripped_lines) else ''
        if line.isdigit() and '-->' in following:
            continue
        text_lines.append(line)
    return '\n'.join(text_lines)

In [16]:
# Clean every original ground-truth file and save the result under the same
# name in the ground-truth-cleaned directory. open(..., 'w') does not create
# missing directories, so that directory must already exist.
for file in files:
    with open(f'../data/srt/ground-truth-original/{file}.srt', 'r', encoding='utf-8') as f:
        srt_text = f.read()
    new_srt = preprocess_ground_truth(srt_text=srt_text)
    with open(f'../data/srt/ground-truth-cleaned/{file}.srt', 'w', encoding='utf-8') as f:
        f.write(new_srt)

### 3) Also save the plain text

In [7]:
# For every cleaned file, write its text with one subtitle line per output line
# to ../data/text_shift/{file}.txt.
for file in files:
    with open(f'../data/srt/ground-truth-cleaned/{file}.srt', 'r', encoding='utf-8') as f:
        srt_text = f.read()
    # Disabled: the space-joined variant, which would be written to ../data/text/{file}.txt.
    # text = extract_text_from_srt(srt_text=srt_text)
    text_shift = extract_text_from_srt_with_shift(srt_text=srt_text)
    # with open(f'../data/text/{file}.txt', 'w', encoding='utf-8') as f:
    #     f.write(text)
    with open(f'../data/text_shift/{file}.txt', 'w', encoding='utf-8') as f:
        f.write(text_shift)

## Exploration of ground truth files

Program duration

In [1]:
import json

# Load the per-program durations: a JSON object whose values are numbers
# (they are summed right below).
with open("../raw_results/program_duration.json", "r", encoding="utf-8") as f:
    duration_dict = json.load(f)

# Total duration over all programs; the following cell reports it as seconds.
tot_durata = sum(duration_dict.values())


In [2]:
# Report the total input duration in seconds, minutes and hours.
print(f"Durata totale audio in input: {tot_durata} secondi = {tot_durata/60} minuti = {tot_durata/60/60} ore")

Durata totale audio in input: 180291.34669527932 secondi = 3004.8557782546554 minuti = 50.08092963757759 ore


### Segments

In [None]:
from standardization.standardization_utils import load_all_subtitles

# Directories containing the .srt files (a list, although only one is given here)
srt_directory = ["../data/srt/ground-truth-cleaned"]


all_subtitles = load_all_subtitles(srt_directory)


# Sanity check: number of loaded files, then name and segment count of the first one.
# Each entry is read as entry[1] = file name and entry[2] = list of segments.
print(f"Caricati {len(all_subtitles)} file SRT")
print(f"Esempio: primo file = {all_subtitles[0][1]}, numero segmenti = {len(all_subtitles[0][2])}")

Caricati 30 file SRT
Esempio: primo file = MEZZORAINPIU_10_10_21.srt, numero segmenti = 540


In [3]:
# tot_length accumulates the time span of every file, in hours; it is computed
# here but not printed in this cell. tot_segments counts all subtitle segments.
tot_length = 0
tot_segments = 0

for _, filename , subtitle_list in all_subtitles:
    # Span from the start of the first segment to the end of the last one.
    # NOTE(review): the /1000/60/60 conversion assumes start_time/end_time are in
    # milliseconds — confirm against the subtitle class. An empty subtitle_list
    # would raise IndexError on the [-1] / [0] accesses.
    duration = subtitle_list[-1].end_time - subtitle_list[0].start_time    
    duration_in_hours = duration/1000/60/60
    tot_length += duration_in_hours
    tot_segments += len(subtitle_list)

print(f"Numero totale di segmenti: {tot_segments}")

Numero totale di segmenti: 42177
