<h1 style="text-align: center; font-size: 36px; color: #3498db; font-weight: bold;">Prosody Tools</h1>

## <h2 style="text-align: center; font-size: 28px; color: #2ecc71; font-weight: bold;">Prosody Diarization Tool</h2>
### <h3 style="text-align: center; font-size: 24px; color: #e74c3c; font-family: 'Arial', sans-serif; font-weight: bold;">Stage 1</h3>

In [1]:
# importing the libraries

import functools
import os

import pandas as pd
import torch
import torchaudio
import torchvision
from moviepy.video.io.VideoFileClip import VideoFileClip
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
from pydub import AudioSegment

In [2]:
# Select the compute device: CUDA when torch can see a GPU, otherwise the CPU.
gpu_available = torch.cuda.is_available()
device = torch.device('cuda' if gpu_available else 'cpu')

# Report which device was picked (GPU name is taken from device index 0).
print(f"Running on GPU: {torch.cuda.get_device_name(0)}" if gpu_available else "Running on CPU")

Running on GPU: NVIDIA GeForce RTX 4060 Laptop GPU


# Pyannote pipeline

In [3]:
# instantiate the pipeline
# The Hugging Face access token is read from the HF_TOKEN environment variable
# so that no credential is stored in the notebook. Any token that was ever
# committed in this file should be treated as leaked and revoked.
hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
    raise RuntimeError(
        "Set the HF_TOKEN environment variable to a Hugging Face access token "
        "that has been granted access to pyannote/speaker-diarization-3.1."
    )

pipeline = Pipeline.from_pretrained(
  "pyannote/speaker-diarization-3.1",
  use_auth_token=hf_token)

# NOTE(review): pyannote.audio 3.x appears to return None (instead of raising)
# when the gated model cannot be downloaded; fail here with a clear message.
if pipeline is None:
    raise RuntimeError(
        "Could not load pyannote/speaker-diarization-3.1; check the token and "
        "that the model's user conditions were accepted on Hugging Face."
    )

# Run inference on the device chosen in the CUDA check cell; without this call
# the `device` variable is never used by the pipeline.
pipeline.to(device)

#https://huggingface.co/pyannote/speaker-diarization-3.1

# Creating the RTTM files
## Stage 0: the number of speakers is unlimited

In [None]:
# Save RTTMs

import os
from pyannote.audio.pipelines import SpeakerDiarization

# Diarize every WAV interview of this stage and save one RTTM file per interview.

# Alias kept so that any cell still referring to `diarization_pipeline` keeps working.
diarization_pipeline = pipeline

# Path to WAV files. The stage folder is spelled "Stage_1", exactly as in the
# cells that read these files, so producer and consumer agree on
# case-sensitive file systems too.
input_dir = r".\Database\0.Interviews\wav\Stage_1"

# Create output if it doesn't exist
output_dir = r".\Database\1.rttms\Stage_1"
os.makedirs(output_dir, exist_ok=True)

# Sorted so that the processing order (and the log below) is deterministic;
# the extension check is case-insensitive so "X.WAV" is not silently skipped.
wav_names = sorted(
    name for name in os.listdir(input_dir) if name.lower().endswith(".wav")
)

for file_name in wav_names:
    wav_file_path = os.path.join(input_dir, file_name)

    # The number of speakers is left to the model here; pass num_speakers=<n>
    # to the pipeline call to fix it.
    diarization = pipeline(wav_file_path)

    # <interview>.wav -> <interview>.rttm
    rttm_file_path = os.path.join(output_dir, f"{os.path.splitext(file_name)[0]}.rttm")

    # Written as UTF-8 explicitly: pandas reads the file back as UTF-8, while
    # open() would otherwise use the platform's default encoding.
    with open(rttm_file_path, "w", encoding="utf-8") as rttm_file:
        diarization.write_rttm(rttm_file)
    print (file_name, ' Done...')


EXPRA_1_001.wav  Done...
EXPRA_1_002.wav  Done...
EXPRA_1_003.wav  Done...
EXPRA_3_012.wav  Done...
PBN_114.wav  Done...
PBN_166.wav  Done...


## Splitting the interview

In [5]:
import os
import pandas as pd

# Load every RTTM file of this stage into one table of speaker turns.

# directory RTTM files
rttm_dir = r".\Database\1.rttms\Stage_1"

# One name per space-separated RTTM field; the NA* columns are dropped below.
columns = ["Type", "Interview_ID", "Channel", "Start", "Duration", "NA1", "NA2", "Speaker_ID", "NA3", "NA4"]

# Read each file once and concatenate a single time: concatenating inside the
# loop re-copies all previous rows on every iteration, and seeding the result
# with an empty frame is a deprecated pandas pattern.
# IDs are read as text so an all-numeric or zero-padded interview name
# (e.g. "007") still matches its WAV file name later on.
# Files are visited in sorted order so row numbering is reproducible.
rttm_frames = [
    pd.read_csv(
        os.path.join(rttm_dir, file_name),
        sep=" ",
        header=None,
        names=columns,
        dtype={"Interview_ID": str, "Speaker_ID": str},
    )
    for file_name in sorted(os.listdir(rttm_dir))
    if file_name.endswith(".rttm")
]

# pd.concat() raises on an empty list, so fall back to an empty table with the
# same columns when the directory holds no RTTM file.
combined_df = (
    pd.concat(rttm_frames, ignore_index=True)
    if rttm_frames
    else pd.DataFrame(columns=columns)
).drop(columns=['NA1', 'NA2', 'NA3', 'NA4'])

combined_df


Unnamed: 0,Type,Interview_ID,Channel,Start,Duration,Speaker_ID
0,SPEAKER,EXPRA_1_001,1,2.073,16.065,SPEAKER_00
1,SPEAKER,EXPRA_1_001,1,2.107,0.236,SPEAKER_01
2,SPEAKER,EXPRA_1_001,1,18.982,0.017,SPEAKER_01
3,SPEAKER,EXPRA_1_001,1,18.998,0.624,SPEAKER_00
4,SPEAKER,EXPRA_1_001,1,20.568,1.029,SPEAKER_00
...,...,...,...,...,...,...
1673,SPEAKER,PBN_166,1,1149.387,0.017,SPEAKER_00
1674,SPEAKER,PBN_166,1,1152.070,2.565,SPEAKER_00
1675,SPEAKER,PBN_166,1,1157.268,0.489,SPEAKER_00
1676,SPEAKER,PBN_166,1,1159.023,0.287,SPEAKER_00


In [6]:
# Distinct speaker labels across all loaded interviews, in order of first appearance.
pd.unique(combined_df['Speaker_ID'])

array(['SPEAKER_00', 'SPEAKER_01', 'SPEAKER_02'], dtype=object)

In [None]:
# Distinct interview identifiers across all loaded RTTM files, in order of first appearance.
pd.unique(combined_df['Interview_ID'])

In [7]:
# Splitting each interview into per-speaker audio segments

import os
import pandas as pd
from pydub import AudioSegment

@functools.lru_cache(maxsize=1)
def _load_wav(input_path):
    """Decode a WAV file, keeping only the most recently used file in memory."""
    return AudioSegment.from_wav(input_path)


# split based on start and duration
def split_audio(input_path, output_path, start, duration, filename):
    """Cut one time span out of a WAV file and save it as its own WAV file.

    Args:
        input_path: Path of the source WAV file.
        output_path: Existing directory that receives the excerpt.
        start: Start of the excerpt, in seconds.
        duration: Length of the excerpt, in seconds.
        filename: File name of the excerpt inside ``output_path``.

    The decoded source is cached (one entry), so consecutive calls for the
    same ``input_path`` -- the way the split loop uses this function -- decode
    the interview once instead of once per excerpt. Re-running the defining
    cell, or calling ``_load_wav.cache_clear()``, drops the cached audio.
    """
    sound = _load_wav(input_path)

    # Pydub slices in milliseconds, the arguments are in seconds.
    start_ms = start * 1000
    end_ms = (start + duration) * 1000

    sound[start_ms:end_ms].export(os.path.join(output_path, filename), format="wav")

df = combined_df

input_dir = r".\Database\0.Interviews\wav\Stage_1"
output_dir = r".\Database\2.Speaker_Parts\Stage_1"
os.makedirs(output_dir, exist_ok=True)

# Walk the speaker turns in table order. The running position of a turn becomes
# the numeric suffix of its file name; each excerpt goes into a folder named
# after its speaker.
turns = zip(df['Interview_ID'], df['Speaker_ID'], df['Start'], df['Duration'])
for part_no, (interview_id, speaker_id, start, duration) in enumerate(turns):
    speaker_dir = os.path.join(output_dir, speaker_id)
    os.makedirs(speaker_dir, exist_ok=True)
    split_audio(
        os.path.join(input_dir, f"{interview_id}.wav"),
        speaker_dir,
        start,
        duration,
        f"{interview_id}_{speaker_id}_{part_no}.wav",
    )


# Join the Speakers

In [None]:
import os
import wave
from pydub import AudioSegment

# Function to concatenate wav files with a gap
def concatenate_wav_files_with_gap(directory, output, gap_duration=0.1):
    """Merge the WAV snippets in ``directory`` into one WAV file per interview.

    Snippets are grouped by interview prefix (``PBN_<x>`` or ``EXPRA_<x>_<y>``,
    taken from the start of the file name), ordered by the integer that ends
    the file name, and joined with ``gap_duration`` seconds of silence after
    every snippet, the last one included. Each group is exported to
    ``<output>/<prefix>_<last component of directory>.wav``.

    Args:
        directory: Folder holding the snippets. If it does not exist, a
            message is printed and nothing else happens.
        output: Folder receiving the merged files; created when missing.
        gap_duration: Length of the silence appended after each snippet,
            in seconds.

    Raises:
        ValueError: If a grouped file name does not end in ``_<integer>.wav``.
    """
    # A speaker without any snippet has no folder; skip instead of letting
    # os.listdir() raise FileNotFoundError.
    if not os.path.isdir(directory):
        print(f"Skipping {directory}: directory not found")
        return

    # Number of leading '_'-separated name parts that identify the interview.
    prefix_part_counts = {'PBN': 2, 'EXPRA': 3}

    # interview prefix -> snippet file names
    wave_files = {}
    for filename in os.listdir(directory):
        if not filename.endswith(".wav"):
            continue
        name_parts = filename.split('_')
        part_count = prefix_part_counts.get(name_parts[0])
        if part_count is None:
            print('Name ERROR..............')
            continue
        wave_files.setdefault('_'.join(name_parts[:part_count]), []).append(filename)

    # export() does not create folders, so make sure the target exists.
    if output:
        os.makedirs(output, exist_ok=True)

    # The gap is the same for every snippet: build it once (pydub takes milliseconds).
    # NOTE(review): silent() uses pydub's default frame rate; confirm that mixing
    # it with the snippets does not resample low-sample-rate recordings.
    gap = AudioSegment.silent(duration=gap_duration * 1000)

    for prefix, files in wave_files.items():
        # Order by the trailing integer of the file name, numerically.
        files.sort(key=lambda x: int(x.split('_')[-1].split('.')[0]))

        # Start from an empty segment and append "snippet + gap" for each file.
        combined = AudioSegment.silent(duration=0)
        for file in files:
            combined += AudioSegment.from_wav(os.path.join(directory, file)) + gap

        # Export the combined segment to the output directory
        output_filename = os.path.join(output, f"{prefix}_{os.path.basename(directory)}.wav")
        combined.export(output_filename, format="wav")
        print(f"Combined file saved: {output_filename}")

# Define directories and speakers
if __name__ == "__main__":
    stage = "Stage_1"
    parts_root = rf".\Database\2.Speaker_Parts\{stage}"

    # Process the speaker folders that actually exist on disk instead of a
    # hard-coded range: a fixed range(5) fails on the first speaker that has
    # no folder and has to be adjusted by hand for every data set.
    if os.path.isdir(parts_root):
        speakers = sorted(
            name
            for name in os.listdir(parts_root)
            if name.startswith("SPEAKER_") and os.path.isdir(os.path.join(parts_root, name))
        )
    else:
        speakers = []
        print(f"No speaker parts found in {parts_root}")

    for speaker in speakers:
        directory = rf".\Database\2.Speaker_Parts\{stage}\{speaker}"
        output = rf".\Database\3.Speaker_Combined\{stage}\{speaker}"
        # Create the output directory if it does not exist
        os.makedirs(output, exist_ok=True)

        print(f"Processing {speaker}...")
        # Run the concatenation function with the desired gap duration
        concatenate_wav_files_with_gap(directory, output, gap_duration=0.1)
