In [7]:
import os
import re
import noisereduce as nr
from pydub.utils import mediainfo
from scipy.io import wavfile
import csv
import threading
import queue
# Capture the kernel's working directory and echo it, so the reader can see
# which folder the later os.getcwd()-based data/output paths are built from.
current_directory = os.getcwd()
print("Current working directory:", current_directory)

Current working directory: c:\Users\zijun0502\Codes\Python\ML_Final


In [8]:
def get_length_time_stamp(time_stamp):
    """
    Return the summed length of every (start, end) pair in ``time_stamp``.

    An empty sequence yields 0.
    """
    total_length = 0
    for start_time, end_time in time_stamp:
        total_length += end_time - start_time
    return total_length
        
def get_timestamp(file_path, min_length_ms=3000) -> list[list[int]]:
    """
    Collect the intervals during which the participant (``*PAR:``) is speaking.

    The transcript is scanned line by line. While a participant turn is active,
    the last whitespace-separated token of each line is inspected; after its
    first and last characters are dropped (presumably the delimiters wrapping
    the time code - confirm against the transcript format) it must look like
    ``<start>_<end>``. Consecutive participant time codes are merged into one
    interval, which is closed when an ``*INV:`` or ``*OTH:`` line appears or the
    file ends.

    Parameters
    ----------
    file_path : str
        Path of the transcript file to read.
    min_length_ms : int, optional
        Intervals whose length is not strictly greater than this are dropped
        (default 3000, the threshold the original code hard-coded).

    Returns
    -------
    list[list[int]]
        ``[start, end]`` pairs in file order.
    """
    time_code = re.compile(r'\d+_\d+')
    par_intervals = []

    def keep_if_long_enough(candidate):
        # Same length filter for every interval, including the one still open
        # when the file ends.
        if candidate and candidate[1] - candidate[0] > min_length_ms:
            par_intervals.append(candidate)

    # errors='replace' keeps a stray undecodable byte in the utterance text
    # from aborting the parse; the time codes themselves are plain digits.
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        par_speaking = False
        interval = []  # [start, end] of the participant interval being built
        for line in file:
            if line.startswith("*PAR:"):
                par_speaking = True
            elif line.startswith("%"):
                par_speaking = False
            elif line.startswith(("*INV:", "*OTH:")):
                keep_if_long_enough(interval)
                interval = []
                # Another speaker took the turn: their time code (and any
                # continuation lines) must not be read as participant speech.
                par_speaking = False

            if not par_speaking:
                continue
            tokens = line.split()
            if not tokens:
                # Blank line inside a participant turn - nothing to parse.
                continue
            candidate = tokens[-1][1:-1]
            # fullmatch so that only a clean "<digits>_<digits>" reaches int().
            if time_code.fullmatch(candidate) is None:
                continue
            start, end = map(int, candidate.split('_'))
            if interval:
                interval[1] = end
            else:
                interval = [start, end]

    keep_if_long_enough(interval)
    return par_intervals
def get_patient_info(file_path) -> list[str]:
    """
    Return the non-empty ``|``-separated fields of the participant's ``@ID:`` line.

    The first ``@ID:`` line that contains a field equal to ``"PAR"`` is used.
    Everything after the ``@ID:`` prefix is parsed, so a field that contains a
    space is kept whole instead of being cut at the first blank.

    Parameters
    ----------
    file_path : str
        Path of the transcript file to read.

    Returns
    -------
    list[str]
        The fields with empty entries removed, or an empty list when no such
        line exists. The original returned ``''`` in that case; both are falsy
        and behave the same under slicing and ``' '.join``.
    """
    prefix = "@ID:"
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            if not line.startswith(prefix):
                continue
            fields = [field.strip() for field in line[len(prefix):].split('|')]
            if "PAR" in fields:
                return [field for field in fields if field != ""]
    return []

In [9]:
from pydub import AudioSegment, effects

In [10]:
class NamedAudioSegment:
    """
    Plain container that keeps an audio segment together with its name,
    info and timestamp values.
    """
    def __init__(self, name, info, timestamp, audio_segment):
        # Identifying data
        self.name, self.info = name, info
        # Interval list and the audio it refers to
        self.timestamp, self.audio_segment = timestamp, audio_segment

def load_file(file_path):
    """
    Load an audio file and the data parsed from its sibling ``.cha`` transcript.

    The transcript is expected next to the audio file, with the same base name
    and a ``.cha`` extension.

    Parameters
    ----------
    file_path : str
        Path of the audio file.

    Returns
    -------
    NamedAudioSegment
        The decoded audio together with the base name, the participant info
        fields and the participant speaking intervals.
    """
    parent_path, file_name = os.path.split(file_path)
    # splitext keeps dots that are part of the base name and works with either
    # path separator, unlike splitting on a literal backslash.
    name, extension = os.path.splitext(file_name)

    # Decode according to the real extension: the pipeline also feeds .wav
    # files, which must not be forced through the mp3 decoder. ``None`` lets
    # pydub/ffmpeg detect the container when there is no extension.
    audio_format = extension[1:].lower() or None
    aud = AudioSegment.from_file(file_path, format=audio_format)

    cha_file_path = os.path.join(parent_path, name + '.cha')
    timestamp = get_timestamp(cha_file_path)
    info = get_patient_info(cha_file_path)

    return NamedAudioSegment(name, info, timestamp, aud)
def normalize_audio(audio, target_dBFS = -20):
    """
    Apply a constant gain so that ``audio.dBFS`` becomes ``target_dBFS``.

    Parameters
    ----------
    audio : AudioSegment
        Audio to adjust; only its ``dBFS`` attribute and ``+`` (gain in dB)
        are used.
    target_dBFS : float, optional
        Desired loudness in dBFS (default -20).

    Returns
    -------
    AudioSegment
        The gain-adjusted audio. Completely silent (or empty) audio reports a
        loudness of ``-inf``; no finite gain can reach the target, so it is
        returned unchanged instead of being multiplied by an infinite factor.
    """
    current_dBFS = audio.dBFS
    if current_dBFS == float("-inf"):
        return audio
    return audio + (target_dBFS - current_dBFS)
def average_channels(audio):
    """
    Down-mix ``audio`` to a single channel by averaging its channels.

    The previous implementation overlaid the channels on top of each other,
    which adds the samples and can clip on loud material. ``set_channels(1)``
    performs the down-mix with each channel scaled by ``1 / channel_count``,
    which is what the function name promises. Mono input is returned as is.

    Parameters
    ----------
    audio : AudioSegment
        Audio with one or more channels.

    Returns
    -------
    AudioSegment
        A mono version of ``audio``.
    """
    return audio.set_channels(1)

def get_labels(infos):
    """
    Map each info string to a class label: Alzheimer's as 1 and MCI as 0.

    Every info string is lower-cased and split on whitespace; the first token
    that names a known diagnosis decides the label. Exactly one label is
    produced per entry, so the result always lines up index-for-index with
    ``infos`` (the previous version appended one label per matching token and
    could return a longer list).

    Parameters
    ----------
    infos : iterable of str
        Space-separated info fields, one string per recording.

    Returns
    -------
    list[int]
        One label per entry of ``infos``.
    """
    MCI = {"mci"}
    AD = {"alzheimer", "ad", "alzheimer's", "possiblead", "probablead"}
    MCI_LABEL = 0
    AD_LABEL = 1
    # NOTE(review): entries with no recognised diagnosis get the same value as
    # MCI, so the two cannot be told apart downstream - confirm this is intended.
    NONE_LABEL = 0

    labels = []
    for info in infos:
        label = NONE_LABEL
        for token in info.lower().split():
            if token in MCI:
                label = MCI_LABEL
                break
            if token in AD:
                label = AD_LABEL
                break
        labels.append(label)
    return labels


In [11]:
# Folder layout, all relative to the kernel's working directory:
#   data/          input folder listed by main()
#   Audio_Output/  destination of the exported audio and the CSV
current_directory = os.getcwd()
data_folder = os.path.join(current_directory, 'data')
output_directory = os.path.join(current_directory, "Audio_Output")
os.makedirs(output_directory, exist_ok=True)

# Module-level accumulators: segmentize() appends to them and main() reads
# them when writing the CSV.
names = []
infos = []

def open_raw_audio(filepath: str) -> NamedAudioSegment:
    """
    Stage function used by the loader threads in main(): delegate to
    load_file and hand back whatever it produces for ``filepath``.
    """
    loaded_segment = load_file(filepath)
    return loaded_segment

# segmentize() runs on several threads at once. Its two appends (to ``infos``
# and ``names``) must happen as one step, otherwise another thread can slip in
# between them and the lists stop lining up index-for-index.
_SEGMENT_RECORD_LOCK = threading.Lock()

def segmentize(named_audio_segment: NamedAudioSegment) -> "tuple[AudioSegment, str] | None":
    """
    Cut the participant's speech out of a recording and clean it up.

    Each interval in ``named_audio_segment.timestamp`` is sliced out of the
    audio with 20 units of padding on both sides (pydub slices are indexed in
    milliseconds). Slices longer than 2000 are down-mixed to mono,
    noise-reduced, peak-normalised, loudness-normalised and concatenated.

    Side effects: prints a progress line and, once the audio has been built,
    appends the recording's info string and name to the module-level ``infos``
    and ``names`` lists (under a lock, so the two lists stay paired).

    Parameters
    ----------
    named_audio_segment : NamedAudioSegment
        Recording plus its parsed transcript data.

    Returns
    -------
    tuple[AudioSegment, str] or None
        ``(combined_audio, name)``, or ``None`` when the intervals add up to
        less than 10000 in total, in which case nothing is recorded.
    """
    print(f"Processing {named_audio_segment.name}", flush=True)

    if get_length_time_stamp(named_audio_segment.timestamp) < 10000:
        return None

    source_audio = named_audio_segment.audio_segment
    combined_audio = AudioSegment.empty()
    for start, end in named_audio_segment.timestamp:
        seg = source_audio[max(0, start-20) : min(len(source_audio), end+20)]
        if len(seg) > 2000:
            seg = average_channels(seg)
            samples = seg.get_array_of_samples()
            rate = seg.frame_rate
            reduced_aud = nr.reduce_noise(y=samples, sr=rate)
            # Re-wrap the processed samples using the slice's own audio metadata.
            seg = effects.normalize(seg._spawn(reduced_aud))
            seg = normalize_audio(seg)
            combined_audio += seg

    # Record the entry only after processing succeeded, and as one atomic pair.
    with _SEGMENT_RECORD_LOCK:
        infos.append(' '.join(named_audio_segment.info[1::]))
        names.append(named_audio_segment.name)
    return combined_audio, named_audio_segment.name

def write_audio(audio_segment: "tuple[AudioSegment, str]") -> None:
    """
    Export one processed recording as ``<name>.wav`` into ``output_directory``.

    Parameters
    ----------
    audio_segment : sequence
        ``(audio, name)`` as produced by segmentize: element 0 is the audio to
        export, element 1 the base name of the output file.
    """
    audio, name = audio_segment[0], audio_segment[1]
    # os.path.join instead of a hard-coded backslash, so the file lands inside
    # the output folder on every platform.
    file_path = os.path.join(output_directory, f"{name}.wav")
    audio.export(file_path, format="wav")

def worker_function(worker_id, input_queue, output_queue, stage_function):
    """
    Thread body for one pipeline stage.

    Items are taken from ``input_queue`` until a ``None`` sentinel arrives.
    Each item is passed to ``stage_function`` and a non-``None`` result is
    forwarded to ``output_queue``. On the sentinel, one ``None`` is forwarded
    downstream (when there is a downstream queue) and the worker returns.

    A failure in ``stage_function`` is reported and the item is skipped. The
    worker must stay alive to forward its sentinel: if it died on an exception,
    a downstream worker would wait on its queue forever and the final
    ``join`` would never return.

    Parameters
    ----------
    worker_id : int
        Identifier used in error messages.
    input_queue : queue.Queue
        Source of work items, terminated by ``None``.
    output_queue : queue.Queue or None
        Destination for results; ``None`` for the last stage.
    stage_function : callable
        Function applied to every item.
    """
    stage_name = getattr(stage_function, "__name__", repr(stage_function))
    while True:
        item = input_queue.get()
        if item is None:
            if output_queue is not None:
                output_queue.put(None)
            break
        try:
            result = stage_function(item)
        except Exception as error:
            print(f"[worker {worker_id}] {stage_name} failed on {item!r}: {error!r}", flush=True)
            continue
        if output_queue is not None and result is not None:
            output_queue.put(result)

def main():
    """
    Run the three-stage threaded pipeline and write the summary CSV.

    Stages, each served by ``num_threads`` workers connected through queues:
    open_raw_audio -> segmentize -> write_audio. Every ``.mp3`` / ``.wav``
    file in ``data_folder`` is fed into the first queue. After all workers
    have finished, ``patient_info.csv`` (columns name, label, infos) is
    written to ``output_directory``.

    The module-level ``infos`` / ``names`` lists are emptied first, so calling
    main() again does not append duplicate rows from the previous run.
    """
    num_threads = 8  # Adjust this based on your system and workload

    # Start from a clean slate; segmentize() appends to these lists.
    infos.clear()
    names.clear()

    open_pool = queue.Queue()
    segmentize_pool = queue.Queue()
    write_pool = queue.Queue()

    # (input queue, output queue, stage function) for each stage, in order.
    stages = (
        (open_pool, segmentize_pool, open_raw_audio),
        (segmentize_pool, write_pool, segmentize),
        (write_pool, None, write_audio),
    )
    threads = [
        threading.Thread(target=worker_function, args=(i, source, sink, stage))
        for source, sink, stage in stages
        for i in range(num_threads)
    ]

    # Enqueue every audio file for the first stage.
    audio_file_paths = [
        os.path.join(data_folder, file)
        for file in os.listdir(data_folder)
        if file.lower().endswith((".mp3", ".wav"))
    ]
    for path in audio_file_paths:
        open_pool.put(path)

    # One sentinel per loader thread; each worker forwards a sentinel to the
    # next stage when it stops, which shuts the stages down in order.
    for _ in range(num_threads):
        open_pool.put(None)

    for thread in threads:
        thread.start()
    # Wait for all threads to finish
    for thread in threads:
        thread.join()

    print("Writing csv")
    labels = get_labels(infos)
    csv_file_name = "patient_info.csv"
    with open(os.path.join(output_directory, csv_file_name), 'w', newline='') as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(["name", "label", "infos"])

        # Completion order depends on thread scheduling; sort the rows so the
        # CSV is identical from run to run.
        csv_writer.writerows(sorted(zip(names, labels, infos)))
        

In [12]:
# Run the pipeline defined above: process the audio files found in the data
# folder and write the exported audio plus patient_info.csv to the output folder.
main()

Processing Baycrest2103
Processing Baycrest12814
Processing Baycrest7352
Processing Baycrest11633
Processing Baycrest11976
Processing Baycrest12813
Processing Baycrest12257
Processing Baycrest8538
Processing Baycrest11634
Processing Baycrest8961
Writing csv
