In [1]:
import numpy as np
from scipy.io import wavfile
import textgrid
import contextlib
import wave
import collections
import webrtcvad
import os
from pydub import AudioSegment
import sys



In [2]:
# Charger le fichier TextGrid
# tg = textgrid.TextGrid.fromFile('../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG.TextGrid')

# Charger le fichier wav
# wav_file = wave.open('../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG.wav', 'r')

# Extraire les annotations existantes
# for tier in tg.tiers:
#     if tier.name == 'trans':
#         for interval in tier.intervals:
#             print(f"Début: {interval.minTime}, Fin: {interval.maxTime}, Texte: {interval.mark}")


In [3]:
# Fonction pour convertir le fichier wav à un taux d'échantillonnage de 16000 Hz
def convert_wav(input_path: str, output_path: str, target_sample_rate=16000):
    audio = AudioSegment.from_wav(input_path)
    audio = audio.set_frame_rate(target_sample_rate)
    audio = audio.set_channels(1)
    audio.export(output_path, format="wav")

# Chemins des fichiers
input_wav_path = '../../../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG.wav'
converted_wav_path = '../../../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG_16000Hz.wav'

# Conversion du fichier wav
convert_wav(input_wav_path, converted_wav_path)

In [4]:
# Initialiser VAD avec la sensibilité la plus élevée
vad = webrtcvad.Vad(3)

# Lire le fichier wav converti
def read_wave(path: str):
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        assert num_channels == 1, "Le fichier doit être mono"
        sample_width = wf.getsampwidth()
        assert sample_width == 2, "La largeur d'échantillon doit être de 2 octets"
        sample_rate = wf.getframerate()
        assert sample_rate in (8000, 16000, 32000, 48000), f"Taux d'échantillonnage non supporté: {sample_rate}"
        pcm_data = wf.readframes(wf.getnframes())
        return pcm_data, sample_rate

In [5]:
class Frame(object):
    def __init__(self, bytes, timestamp, duration):
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration

def frame_generator(frame_duration_ms, audio, sample_rate):
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    offset = 0
    timestamp = 0.0
    duration = (float(n) / sample_rate) / 2.0
    while offset + n < len(audio):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration
        offset += n

In [6]:
def vad_collector(sample_rate, frame_duration_ms, padding_duration_ms, vad, frames):
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)
    ring_buffer = collections.deque(maxlen=num_padding_frames)
    triggered = False

    voiced_frames = []
    segments = []
    for frame in frames:
        is_speech = vad.is_speech(frame.bytes, sample_rate)

        sys.stdout.write('1' if is_speech else '0')
        if not triggered:
            ring_buffer.append((frame, is_speech))
            num_voiced = len([f for f, speech in ring_buffer if speech])
            if num_voiced > 0.6 * ring_buffer.maxlen:
                triggered = True
                sys.stdout.write('+(%s)' % (ring_buffer[0][0].timestamp,))
                for f, s in ring_buffer:
                    voiced_frames.append(f)
                ring_buffer.clear()
        else:
            voiced_frames.append(frame)
            ring_buffer.append((frame, is_speech))
            num_unvoiced = len([f for f, speech in ring_buffer if not speech])
            if num_unvoiced > 0.6 * ring_buffer.maxlen:
                sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
                triggered = False
                segments.append(voiced_frames)
                ring_buffer.clear()
                voiced_frames = []
    if triggered:
        sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
    sys.stdout.write('\n')
    if voiced_frames:
        segments.append(voiced_frames)
    return segments

In [7]:
# Utiliser VAD pour détecter les segments de parole
audio, sample_rate = read_wave(converted_wav_path)
frames = frame_generator(10, audio, sample_rate)
segments = vad_collector(sample_rate, 10, 40, vad, list(frames))

# Convertir les segments détectés en une liste de tuples (début, fin)
detected_segments = []
for segment in segments:
    start_time = segment[0].timestamp
    end_time = segment[-1].timestamp + segment[-1].duration
    detected_segments.append((start_time, end_time))

# Fusionner les segments chevauchants ou adjacents
merged_segments = []
for start, end in detected_segments:
    if not merged_segments:
        merged_segments.append([start, end])
    else:
        prev_start, prev_end = merged_segments[-1]
        if start <= prev_end:  # Fusionner les segments chevauchants ou adjacents
            merged_segments[-1][1] = max(prev_end, end)
        else:
            merged_segments.append([start, end])

print("Segments de parole détectés:", len(merged_segments))

00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000111+(1.360000000000001)111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111000-(3.1999999999999758)00000111+(3.239999999999975)1111000-(3.3499999999999726)00000000000000000000000000000000000000000000000000111+(3.839999999999962)11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111000-(4.949999999999939)111+(4.949999999999939)1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111000-(6.459999999999907)0000000000000000000000000000000000000000000000000000000000000000000000000000111+(7.209999999999891)111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111

In [8]:
# Affiner les segments basés sur l'amplitude
def refine_segments(audio, segments, sample_rate, threshold_ratio=0.4):
    refined_segments = []
    audio_data = np.frombuffer(audio, dtype=np.int16)
    global_threshold = np.mean(np.abs(audio_data)) * threshold_ratio  # Calcul d'un seuil global basé sur l'amplitude moyenne

    for start, end in segments:
        segment_audio = audio_data[int(start * sample_rate):int(end * sample_rate)]
        local_threshold = np.mean(np.abs(segment_audio)) * threshold_ratio  # Seuil local basé sur le segment
        non_silent = np.where(np.abs(segment_audio) > min(global_threshold, local_threshold))[0]

        if len(non_silent) > 0:
            refined_start = start + non_silent[0] / sample_rate
            refined_end = start + non_silent[-1] / sample_rate
            refined_segments.append((refined_start, refined_end))

    return refined_segments

refined_segments = refine_segments(audio, merged_segments, sample_rate)

print("Segments de parole affinés:", len(refined_segments))

Segments de parole affinés: 297


In [9]:
# Ajouter les segments de silence basés sur les niveaux sonores
silence_threshold = -40  # Ajuster le seuil en dB pour détecter les silences
min_silence_duration = 0.1  # Durée minimale des silences en secondes

silence_segments = []
for i in range(len(refined_segments) - 1):
    end_current = refined_segments[i][1]
    start_next = refined_segments[i + 1][0]
    if start_next - end_current >= min_silence_duration:
        segment = AudioSegment.from_wav(converted_wav_path)[int(end_current * 1000):int(start_next * 1000)]
        if segment.dBFS < silence_threshold:
            silence_segments.append((end_current, start_next))

# Ajouter un segment de silence au début si le premier segment ne commence pas à 0
if refined_segments and refined_segments[0][0] > 0:
    silence_segments.insert(0, (0, refined_segments[0][0]))

# Ajouter un segment de silence à la fin si le dernier segment ne se termine pas à la fin du fichier
if refined_segments and refined_segments[-1][1] < wave.open(converted_wav_path).getnframes() / sample_rate:
    silence_segments.append((refined_segments[-1][1], wave.open(converted_wav_path).getnframes() / sample_rate))

print("Segments de silence:", len(silence_segments))

Segments de silence: 172


In [10]:
# Créer un nouveau TextGrid avec les segments détectés
new_tg = textgrid.TextGrid()
interval_tier = textgrid.IntervalTier(name="IPUs", minTime=0, maxTime=wave.open(converted_wav_path).getnframes() / sample_rate)
new_tg.append(interval_tier)

# Ajouter les segments de parole détectés au TextGrid
i = 0
for start, end in refined_segments:
    if start < end:  # Vérifier que la durée est positive
        interval_tier.add(start, end, f"IPU_{i}")
        i += 1

# Ajouter les segments de silence détectés au TextGrid
for start, end in silence_segments:
    if start < end:  # Vérifier que la durée est positive
        interval_tier.add(start, end, "#")

# Sauvegarder le nouveau TextGrid
new_textgrid_path = "../../../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG_detected.TextGrid"
new_tg.write(new_textgrid_path)
print(f"Nouveau fichier TextGrid sauvegardé: {new_textgrid_path}")


Nouveau fichier TextGrid sauvegardé: ../../TEXTGRID_WAV_gold_non_gold_TALN/KAD_09/KAD_09_Kabir-Gymnasium_MG_detected.TextGrid


In [173]:
# Fonction pour convertir le fichier wav à un taux d'échantillonnage de 16000 Hz
def convert_wav(input_path: str, output_path: str, target_sample_rate=16000):
    audio = AudioSegment.from_wav(input_path)
    audio = audio.set_frame_rate(target_sample_rate)
    audio = audio.set_channels(1)
    audio.export(output_path, format="wav")

# Lire le fichier wav converti
def read_wave(path: str):
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        num_channels = wf.getnchannels()
        assert num_channels == 1, "Le fichier doit être mono"
        sample_width = wf.getsampwidth()
        assert sample_width == 2, "La largeur d'échantillon doit être de 2 octets"
        sample_rate = wf.getframerate()
        assert sample_rate in (8000, 16000, 32000, 48000), f"Taux d'échantillonnage non supporté: {sample_rate}"
        pcm_data = wf.readframes(wf.getnframes())
        return pcm_data, sample_rate

class Frame(object):
    def __init__(self, bytes, timestamp, duration):
        self.bytes = bytes
        self.timestamp = timestamp
        self.duration = duration

def frame_generator(frame_duration_ms, audio, sample_rate):
    n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    offset = 0
    timestamp = 0.0
    duration = (float(n) / sample_rate) / 2.0
    while offset + n < len(audio):
        yield Frame(audio[offset:offset + n], timestamp, duration)
        timestamp += duration
        offset += n

def vad_collector(sample_rate, frame_duration_ms, padding_duration_ms, vad, frames):
    num_padding_frames = int(padding_duration_ms / frame_duration_ms)
    ring_buffer = collections.deque(maxlen=num_padding_frames)
    triggered = False

    voiced_frames = []
    segments = []
    for frame in frames:
        is_speech = vad.is_speech(frame.bytes, sample_rate)

        sys.stdout.write('1' if is_speech else '0')
        if not triggered:
            ring_buffer.append((frame, is_speech))
            num_voiced = len([f for f, speech in ring_buffer if speech])
            if num_voiced > 0.6 * ring_buffer.maxlen:
                triggered = True
                sys.stdout.write('+(%s)' % (ring_buffer[0][0].timestamp,))
                for f, s in ring_buffer:
                    voiced_frames.append(f)
                ring_buffer.clear()
        else:
            voiced_frames.append(frame)
            ring_buffer.append((frame, is_speech))
            num_unvoiced = len([f for f, speech in ring_buffer if not speech])
            if num_unvoiced > 0.6 * ring_buffer.maxlen:
                sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
                triggered = False
                segments.append(voiced_frames)
                ring_buffer.clear()
                voiced_frames = []
    if triggered:
        sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
    sys.stdout.write('\n')
    if voiced_frames:
        segments.append(voiced_frames)
    return segments

# Affiner les segments basés sur l'amplitude
def refine_segments(audio, segments, sample_rate, threshold_ratio=0.4):
    refined_segments = []
    audio_data = np.frombuffer(audio, dtype=np.int16)
    global_threshold = np.mean(np.abs(audio_data)) * threshold_ratio  # Calcul d'un seuil global basé sur l'amplitude moyenne

    for start, end in segments:
        segment_audio = audio_data[int(start * sample_rate):int(end * sample_rate)]
        local_threshold = np.mean(np.abs(segment_audio)) * threshold_ratio  # Seuil local basé sur le segment
        non_silent = np.where(np.abs(segment_audio) > min(global_threshold, local_threshold))[0]

        if len(non_silent) > 0:
            refined_start = start + non_silent[0] / sample_rate
            refined_end = start + non_silent[-1] / sample_rate
            refined_segments.append((refined_start, refined_end))

    return refined_segments

In [174]:
def process_file(textgrid_path, wav_path):
    # Charger le fichier TextGrid
    tg = textgrid.TextGrid.fromFile(textgrid_path)


    # Convertir le fichier wav
    converted_wav_path = wav_path.replace('_MG.wav', '_MG_16000Hz.wav')
    convert_wav(wav_path, converted_wav_path)

    # Utiliser VAD pour détecter les segments de parole
    audio, sample_rate = read_wave(converted_wav_path)
    frames = frame_generator(10, audio, sample_rate)
    segments = vad_collector(sample_rate, 10, 40, vad, list(frames))

    # Convertir les segments détectés en une liste de tuples (début, fin)
    detected_segments = []
    for segment in segments:
        start_time = segment[0].timestamp
        end_time = segment[-1].timestamp + segment[-1].duration
        detected_segments.append((start_time, end_time))

    # Fusionner les segments chevauchants ou adjacents
    merged_segments = []
    for start, end in detected_segments:
        if not merged_segments:
            merged_segments.append([start, end])
        else:
            prev_start, prev_end = merged_segments[-1]
            if start <= prev_end:  # Fusionner les segments chevauchants ou adjacents
                merged_segments[-1][1] = max(prev_end, end)
            else:
                merged_segments.append([start, end])

    print("Segments de parole détectés:", len(merged_segments))

    refined_segments = refine_segments(audio, merged_segments, sample_rate)

    print("Segments de parole affinés:", len(refined_segments)) 

    # Ajouter les segments de silence basés sur les niveaux sonores
    silence_threshold = -40  # Ajuster le seuil en dB pour détecter les silences
    min_silence_duration = 0.1  # Durée minimale des silences en secondes

    silence_segments = []
    for i in range(len(refined_segments) - 1):
        end_current = refined_segments[i][1]
        start_next = refined_segments[i + 1][0]
        if start_next - end_current >= min_silence_duration:
            segment = AudioSegment.from_wav(converted_wav_path)[int(end_current * 1000):int(start_next * 1000)]
            if segment.dBFS < silence_threshold:
                silence_segments.append((end_current, start_next))

    # Ajouter un segment de silence au début si le premier segment ne commence pas à 0
    if refined_segments and refined_segments[0][0] > 0:
        silence_segments.insert(0, (0, refined_segments[0][0]))

    # Ajouter un segment de silence à la fin si le dernier segment ne se termine pas à la fin du fichier
    if refined_segments and refined_segments[-1][1] < wave.open(converted_wav_path).getnframes() / sample_rate:
        silence_segments.append((refined_segments[-1][1], wave.open(converted_wav_path).getnframes() / sample_rate))

    print("Segments de silence:", len(silence_segments))

    # Créer un nouveau TextGrid avec les segments détectés
    new_tg = textgrid.TextGrid()
    interval_tier = textgrid.IntervalTier(name="IPUs", minTime=0, maxTime=wave.open(converted_wav_path).getnframes() / sample_rate)
    new_tg.append(interval_tier)

    # Ajouter les segments de parole détectés au TextGrid
    i = 0
    for start, end in refined_segments:
        if start < end:  # Vérifier que la durée est positive
            interval_tier.add(start, end, f"IPU_{i}")
            i += 1

    # Ajouter les segments de silence détectés au TextGrid
    for start, end in silence_segments:
        if start < end:  # Vérifier que la durée est positive
            interval_tier.add(start, end, "#")

    # Sauvegarder le nouveau TextGrid
    new_textgrid_path = textgrid_path.replace('.TextGrid', '_detected.TextGrid')
    new_tg.write(new_textgrid_path)
    print(f"Nouveau fichier TextGrid sauvegardé: {new_textgrid_path}")

In [175]:
# Initialiser VAD avec la sensibilité la plus élevée
vad = webrtcvad.Vad(3)

# Parcourir les sous-dossiers dans TEXTGRID_WAV et traiter les fichiers
base_path = '../../../../TEXTGRID_WAV_gold_non_gold_TALN'
for root, dirs, files in os.walk(base_path):
    for file in files:
        if file.endswith('_MG.TextGrid'):
            textgrid_path = os.path.join(root, file)
            wav_path = textgrid_path.replace('_MG.TextGrid', '_MG.wav')
            if os.path.exists(wav_path):
                print(f"Traitement du fichier: {textgrid_path} et {wav_path}")
                process_file(textgrid_path, wav_path)

Traitement du fichier: ../../TEXTGRID_WAV_gold_non_gold_TALN/IBA_32/IBA_32_Tori-By-Samuel_MG.TextGrid et ../../TEXTGRID_WAV_gold_non_gold_TALN/IBA_32/IBA_32_Tori-By-Samuel_MG.wav
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000111+(2.289999999999995)1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111000-(6.359999999999909)0000000000111+(6.449999999999907)1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111