In [6]:
import os
import pandas as pd
import numpy as np
import torchaudio
import torchaudio.transforms as T
import soundfile as sf
from scipy.signal import resample_poly
import random

In [7]:
def generate_scenarios():
    '''
    Build the list of speaker-pair scenarios.

    Each scenario is a tuple (condition1, gender1, condition2, gender2).
    The list contains, in this order:
      * same-condition pairs (healthy/healthy, then sick/sick) with the gender
        pair taken without regard to order (male-male, male-female,
        female-female);
      * healthy-vs-sick pairs with every ordered gender combination.
    '''
    genders = ("male", "female")

    # Gender pairs where order does not matter: (m, m), (m, f), (f, f)
    unordered_gender_pairs = [
        (first, second)
        for position, first in enumerate(genders)
        for second in genders[position:]
    ]

    same_condition = [
        (condition, first, condition, second)
        for condition in ("healthy", "sick")
        for first, second in unordered_gender_pairs
    ]
    healthy_vs_sick = [
        ("healthy", first, "sick", second)
        for first in genders
        for second in genders
    ]
    return same_condition + healthy_vs_sick

In [8]:
def _health_condition_from_path(path):
    '''Return "healthy" / "sick" / None from the "hc"/"HC" and "pd"/"PD" markers in path.'''
    # "hc"/"HC" is tested first, so it wins when both markers are present.
    if "hc" in path or "HC" in path:
        return "healthy"
    if "pd" in path or "PD" in path:
        return "sick"
    return None


def get_audio_paths(base_dir, df):
    '''
    Group every .wav file found under base_dir by (health condition, gender).

    Parameters:
        base_dir: directory that is walked recursively.
        df: dataframe with the columns 'RECODING ORIGINAL NAME' (speaker ID)
            and 'SEX' ("M" or "F").

    Returns:
        A dictionary mapping (health_condition, gender) to the list of audio
        paths, where health_condition is "healthy" or "sick" and gender is
        "male" or "female". Only keys with at least one file are present.

    Raises:
        ValueError: if a file's speaker ID matches zero or several rows of df,
            if 'SEX' is neither "M" nor "F", or if no health marker is found
            in the file's directory path.

    The speaker ID is the part of the file name before the first underscore.
    The number of paths per key is printed before returning.
    '''
    gender_names = {"M": "male", "F": "female"}
    audio_paths = {}

    # Iterate through all the audio files in the directory
    for root, _, files in os.walk(base_dir):
        # Look for the health marker in the part of the path below base_dir
        # first: matching on the full path mislabels every file as soon as
        # base_dir (or one of its parents) happens to contain "hc" or "pd".
        # The full path is only used as a fallback when the sub-path carries
        # no marker (e.g. base_dir itself is the "HC" or "PD" folder).
        health_condition = (
            _health_condition_from_path(os.path.relpath(root, base_dir))
            or _health_condition_from_path(root)
        )

        for file in files:
            if not file.endswith(".wav"):
                continue
            audio_path = os.path.join(root, file)

            # Find the row in the dataframe that matches the ID of the audio file
            speaker_id = file.split("_")[0]
            matching_row = df[df['RECODING ORIGINAL NAME'] == speaker_id]

            # Exactly one metadata row is required for each speaker ID
            if matching_row.empty:
                raise ValueError(f"No matching rows found for ID: {speaker_id}, found in {audio_path} ")
            if len(matching_row) != 1:
                raise ValueError(f"Expected 1 matching row for ID: {speaker_id}, but found more.")

            # Find if the ID corresponds to a male or a female
            gender_letter = matching_row['SEX'].item()
            gender = gender_names.get(gender_letter)
            if gender is None:
                raise ValueError(f"Invalid gender: {gender_letter}")

            if health_condition is None:
                raise ValueError(f"Invalid health condition for {audio_path}")

            # Add the audio path to the dictionary
            audio_paths.setdefault((health_condition, gender), []).append(audio_path)

    # Print the number of audio paths for each key
    for key, audio_list in audio_paths.items():
        print(f"Key: {key}, Number of Audio Paths: {len(audio_list)}")

    return audio_paths

In [9]:
def create_text_files(text_file_dir, processed_audio_dir, audio_paths, scenarios):
    '''
    Create one pair-list text file per scenario, plus the processed-audio layout.

    For each scenario (condition1, gender1, condition2, gender2):
      * an empty file <scenario>.txt is created (or truncated) in
        processed_audio_dir/<scenario>/;
      * text_file_dir/<scenario>.txt is written with one line per retained
        pair of audio files, formatted as
            audio1 alpha audio2 beta SNR duration
        where alpha, beta, SNR and duration are "nan" placeholders.

    A pair is retained when the two files have different speaker IDs and
    different sentences (first and second underscore-separated fields of the
    file name). When both sides of the scenario are the same group, a pair
    and its mirror image are written only once.

    Parameters:
        text_file_dir: directory receiving the pair-list files (created if missing).
        processed_audio_dir: directory receiving one sub-directory per scenario
            (created if missing).
        audio_paths: dictionary mapping (condition, gender) to a list of audio paths.
        scenarios: iterable of (condition1, gender1, condition2, gender2) tuples.

    Raises:
        KeyError: if a scenario refers to a (condition, gender) key that is
            missing from audio_paths.

    The number of lines written to each file is printed.
    Note: fields are separated by single spaces, so audio paths must not
    contain whitespace.
    '''
    # Create the output directories if they do not exist
    os.makedirs(text_file_dir, exist_ok=True)
    os.makedirs(processed_audio_dir, exist_ok=True)

    # Placeholder values, written as "nan" until real values are computed
    alpha = beta = SNR = duration = np.nan

    # Create an output file for each scenario
    for condition1, gender1, condition2, gender2 in scenarios:
        output_filename = f"{condition1}_{gender1}_vs_{condition2}_{gender2}.txt"
        output_path = os.path.join(text_file_dir, output_filename)

        # Create the sub_processed_audio directory if it does not exist
        output_sub_dir = os.path.splitext(output_filename)[0]   # removes the .txt extension
        sub_processed_audio_dir = os.path.join(processed_audio_dir, output_sub_dir)
        os.makedirs(sub_processed_audio_dir, exist_ok=True)

        # Create the empty text file in processed_audio_dir for this scenario
        output_path_processed = os.path.join(sub_processed_audio_dir, output_filename)
        with open(output_path_processed, "w"):
            pass

        # (audio1, audio2) and (audio2, audio1) are the same pair in this case
        same_group = (condition1, gender1) == (condition2, gender2)
        number_of_lines = 0

        # Create the text file in text_file_dir for this scenario and fill it
        with open(output_path, "w") as output_file:
            audio_group_1 = audio_paths[(condition1, gender1)]
            audio_group_2 = audio_paths[(condition2, gender2)]
            written_pairs = set()

            # Iterate through all combinations of audio files
            for audio1 in audio_group_1:
                # Get the ID and sentence of the first audio file
                name_fields_1 = os.path.basename(audio1).split("_")
                id1, sentence1 = name_fields_1[0], name_fields_1[1]

                for audio2 in audio_group_2:
                    # Get the ID and sentence of the second audio file
                    name_fields_2 = os.path.basename(audio2).split("_")
                    id2, sentence2 = name_fields_2[0], name_fields_2[1]

                    # Keep only pairs with different speakers and different sentences
                    if id1 == id2 or sentence1 == sentence2:
                        continue

                    if same_group:
                        # Skip the pair if it (or its mirror) is already written
                        if (audio1, audio2) in written_pairs or (audio2, audio1) in written_pairs:
                            continue
                        written_pairs.add((audio1, audio2))

                    # Same field order on every line: beta comes before SNR
                    output_file.write(f"{audio1} {alpha} {audio2} {beta} {SNR} {duration}\n")
                    number_of_lines += 1

        print(f"Number of lines in {output_filename}: {number_of_lines}")
    

In [10]:
def downsample_audio(audio_paths, target_sample_rate):
    '''
    Resample, in place, every audio file listed in audio_paths to target_sample_rate.

    Parameters:
        audio_paths: dictionary whose values are lists of audio file paths
            (the keys are not used).
        target_sample_rate: sample rate, in Hz, the files are converted to.

    Each file that is not already at target_sample_rate is read, resampled
    with scipy.signal.resample_poly (up=target_sample_rate, down=file rate)
    and written back to the SAME path: the original audio is overwritten.
    Files already at the target rate are left untouched, so running this
    function again does not rewrite the data set.
    '''
    for paths in audio_paths.values():
        for audio_path in paths:
            audio, sr = sf.read(audio_path)

            # Nothing to do: avoids a needless rewrite of the file
            if sr == target_sample_rate:
                continue

            # resample_poly works along axis 0 by default
            audio = resample_poly(audio, target_sample_rate, sr)
            sf.write(audio_path, audio, target_sample_rate)         # Overwrite original file

In [11]:
def calculate_audio_values(audio1, audio2):
    '''
    Compute the weights alpha and beta used to mix two audio files.

    Both signals are truncated to the shorter of the two, normalised to unit
    mean-square level, weighted by 10**(SNR/20) and 10**(-SNR/20) for a
    random SNR drawn uniformly in [0, 5], then scaled by a common factor so
    that the largest absolute sample among the mix and the two weighted
    signals equals 0.9.

    Parameters:
        audio1: path of the first audio file.
        audio2: path of the second audio file.

    Returns:
        (alpha, beta, SNR, duration) where
          * alpha, beta are the overall factors to apply to the raw samples
            of audio1 and audio2 (normalisation, SNR weight and final scaling
            combined);
          * SNR is the drawn value in dB: audio1 is raised by SNR dB and
            audio2 lowered by SNR dB, so the level gap between them is 2*SNR dB;
          * duration is the common length in samples (not seconds).

    Raises:
        ValueError: if either file is empty or entirely silent, since the
            level normalisation would divide by zero.

    Note: the sample rates returned by sf.read are not compared here.
    '''
    # Load the audio files (sample rates are not used)
    signal1, _ = sf.read(audio1)
    signal2, _ = sf.read(audio2)

    # Ensure both audio signals have the same length
    duration = min(len(signal1), len(signal2))
    if duration == 0:
        raise ValueError(f"Cannot compute mixing weights for empty audio: {audio1}, {audio2}")
    signal1 = signal1[:duration]
    signal2 = signal2[:duration]

    # Mean-square level of each signal, used for normalisation
    lev1 = np.mean(np.square(signal1))
    lev2 = np.mean(np.square(signal2))
    if lev1 == 0 or lev2 == 0:
        raise ValueError(f"Cannot compute mixing weights for silent audio: {audio1}, {audio2}")

    # Generate a random SNR between 0 and 5 dB and calculate weights
    SNR = random.uniform(0, 5)
    alpha_temp = 10 ** (SNR/20)
    beta_temp = 10 ** (-SNR/20)

    # Normalise, then apply the weights (new arrays: the inputs are not modified)
    weighted1 = signal1 / np.sqrt(lev1) * alpha_temp
    weighted2 = signal2 / np.sqrt(lev2) * beta_temp

    # Scale so that neither the mix nor any single source exceeds 0.9
    mix_audio = weighted1 + weighted2
    max_amp = max(np.max(np.abs(mix_audio)), np.max(np.abs(weighted1)), np.max(np.abs(weighted2)))
    scaling_factor = 0.9 / max_amp

    # Calculate the final weights values
    alpha = alpha_temp * (scaling_factor / np.sqrt(lev1))
    beta = beta_temp * (scaling_factor / np.sqrt(lev2))

    return alpha, beta, SNR, duration

In [12]:
def update_text_files(text_file_dir):
    '''
    Recompute alpha, beta, SNR and duration for every pair listed in text_file_dir.

    Every .txt file of text_file_dir is read, each line
        audio1 alpha audio2 beta SNR duration
    is replaced by the same two paths followed by the values returned by
    calculate_audio_values(audio1, audio2), and the file is rewritten.
    The previous alpha, beta, SNR and duration values are discarded.

    Parameters:
        text_file_dir: directory containing the pair-list text files.

    Raises:
        ValueError: if a non-blank line does not contain exactly six
            whitespace-separated fields.

    Files are processed in sorted name order. Blank lines are dropped. A file
    is only rewritten once all of its lines have been processed, so an error
    leaves the file being processed unchanged.
    '''
    # Iterate through all the text files in the directory
    for filename in sorted(os.listdir(text_file_dir)):
        if not filename.endswith(".txt"):
            continue
        text_file_path = os.path.join(text_file_dir, filename)
        print(f"Updating {filename}")

        with open(text_file_path, "r") as text_file:
            lines = text_file.readlines()

        # Update the values of each line in the text file
        updated_lines = []
        for line_number, line in enumerate(lines, start=1):
            fields = line.split()
            if not fields:
                continue    # blank line: nothing to update
            if len(fields) != 6:
                raise ValueError(
                    f"{text_file_path}, line {line_number}: expected 6 fields, found {len(fields)}"
                )
            # Only the two paths are kept; the old values are recomputed
            audio1, audio2 = fields[0], fields[2]
            new_alpha, new_beta, new_SNR, new_duration = calculate_audio_values(audio1, audio2)
            updated_lines.append(f"{audio1} {new_alpha} {audio2} {new_beta} {new_SNR} {new_duration}\n")

        # Update the text file
        with open(text_file_path, 'w') as text_file:
            text_file.writelines(updated_lines)

In [13]:
# Configuration: every path below is derived from base_dir
base_dir = r"PC-GITA_per_task_16000Hz"
text_file_dir = os.path.join(base_dir, "text_files")
processed_audio_dir = os.path.join(base_dir, "processed_audio")
excel_file_path = os.path.join(base_dir, "PCGITA_allmetadata.xlsx")
target_sample_rate = 8000

# Speaker metadata: only the speaker ID and sex columns are needed
df = pd.read_excel(excel_file_path, sheet_name='PD+HC', usecols=['RECODING ORIGINAL NAME', 'SEX'])

scenarios = generate_scenarios()
audio_paths = get_audio_paths(base_dir, df)

# Pipeline steps, to be run once and in this order. They are left commented
# out because they write to disk: create_text_files and update_text_files
# (re)write the pair lists, and downsample_audio overwrites the audio files.
# NOTE(review): no random seed is set in the cells shown, so the SNR values
# drawn by calculate_audio_values differ from one run to the next.
#create_text_files(text_file_dir, processed_audio_dir, audio_paths, scenarios)
#downsample_audio(audio_paths, target_sample_rate)
#update_text_files(text_file_dir)


Key: ('healthy', 'female'), Number of Audio Paths: 250
Key: ('healthy', 'male'), Number of Audio Paths: 250
Key: ('sick', 'male'), Number of Audio Paths: 250
Key: ('sick', 'female'), Number of Audio Paths: 250
