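The purpose of this notebook is to set up a pipeline that uses parallelization to run all 4 codebases on multiple audio files. Results are archived in a specific directory structure (as discussed): `pipelineOutput/<unix timestamp>/<codebase name>/<audio file name>.csv`, with a `.json` metadata file of the same name next to each CSV.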

This code runs each full 5-minute audio file as a whole, without any chunking.

In [5]:
import multiprocessing
from multiprocessing import Pool
import pandas as pd
import os
import dask
import socket
from dask_jobqueue import SLURMCluster
from  distributed import Client
import subprocess
from dask import delayed, compute
import glob
import json
import numpy as np
import time
from sklearn.metrics import mean_squared_error  
import math  
import warnings
import pypandoc
import jiwer
warnings.simplefilter(action='ignore', category=FutureWarning)
!module load openmind/ffmpeg/20160310 

# Names of the four codebases, listed in codebase-id order (id 1 first).
codebasenameoptions = [
    'yinruiqing',
    'speechbox',
    'ashraf',
    'whisperx',
]
# 1-based codebase id -> codebase name, derived from the list above.
codebase_mapping_dict = dict(enumerate(codebasenameoptions, start=1))

In [2]:
 
# Explicit lists of input recordings for this pipeline.
# TODO (from the original notes): switch to a folder-based run using glob.

# Nine "box" recordings.
box_set_9 = [
    '/om/user/arjunp/rawAudioFiles/34_KS_Recording4-LR_2165-2465_CT.wav',
    '/om/user/arjunp/rawAudioFiles/34_KS_Recording4-LR_2465-2765_CT.wav',
    '/om/user/arjunp/rawAudioFiles/34_KS_Recording4-LR_9365-9665_CT.wav',
    '/om/user/arjunp/rawAudioFiles/40_JC_Recording1-LR_2662-2962_CT.wav',
    '/om/user/arjunp/rawAudioFiles/40_JC_Recording1-LR_2962-3262_CT.wav',
    '/om/user/arjunp/rawAudioFiles/40_JC_Recording2-LR_22826-23126_CT.wav',
    '/om/user/arjunp/rawAudioFiles/85_NA_Recording1-LR_2244-2544_CT.wav',
    '/om/user/arjunp/rawAudioFiles/85_NA_Recording1-LR_3144-3444_CT.wav',
    '/om/user/arjunp/rawAudioFiles/85_NA_Recording1-LR_9444-9744_CT.wav',
]

# Ten "first5" recordings.
childes_random_10 = [
    '/om/user/arjunp/rawAudioFiles/Ethan_021005_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Lily_030725_first5.wav',
    '/om/user/arjunp/rawAudioFiles/William_030011_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Naima_020720_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Naima_010801_first5.wav',
    '/om/user/arjunp/rawAudioFiles/William_011115_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Violet_030119_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Ethan_020208_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Alex_010526_first5.wav',
    '/om/user/arjunp/rawAudioFiles/Lily_040002_first5.wav',
]

# Full working set: the ten first5 files followed by the nine box files.
AUDIO_FILE_NAMES = [*childes_random_10, *box_set_9]

In [5]:
# List every .wav file currently in the raw-audio directory (shown as the cell output).
glob.glob('/om/user/arjunp/rawAudioFiles/*.wav')

['/om/user/arjunp/rawAudioFiles/Ethan_021005_first5.wav',
 '/om/user/arjunp/rawAudioFiles/first_min_030327.wav',
 '/om/user/arjunp/rawAudioFiles/85_NA_Recording1-LR_3144-3444_CT.wav',
 '/om/user/arjunp/rawAudioFiles/Lily_030725_first5.wav',
 '/om/user/arjunp/rawAudioFiles/34_KS_Recording4-LR_2465-2765_CT.wav',
 '/om/user/arjunp/rawAudioFiles/William_030011_first5.wav',
 '/om/user/arjunp/rawAudioFiles/40_JC_Recording2-LR_22826-23126_CT.wav',
 '/om/user/arjunp/rawAudioFiles/Naima_020720_first5.wav',
 '/om/user/arjunp/rawAudioFiles/85_NA_Recording1-LR_9444-9744_CT.wav',
 '/om/user/arjunp/rawAudioFiles/Naima_010801_first5.wav',
 '/om/user/arjunp/rawAudioFiles/William_011115_first5.wav',
 '/om/user/arjunp/rawAudioFiles/Violet_030119_first5.wav',
 '/om/user/arjunp/rawAudioFiles/34_KS_Recording4-LR_9365-9665_CT.wav',
 '/om/user/arjunp/rawAudioFiles/afjiv.wav',
 '/om/user/arjunp/rawAudioFiles/Alex_030119_first5.wav',
 '/om/user/arjunp/rawAudioFiles/030327.wav',
 '/om/user/arjunp/rawAudioFiles/

In [3]:
def codebaseCommand(codebase_id, audiopath, specific_csv_path,notes, params):
    """Run one codebase's command-line script on a single audio file.

    Parameters
    ----------
    codebase_id : int
        1 = yinruiqing, 2 = speechbox, 3 = ashraf; any other value runs the
        whisperx script.
    audiopath : str
        Path of the audio file passed to the script.
    specific_csv_path : str
        CSV path passed to the script (presumably where the script writes its
        results -- the scripts themselves are not part of this notebook).
    notes : str
        Not used by this function; kept so the call signature stays stable.
    params : dict or None
        Extra command-line options. Every ``key: value`` pair is appended as
        ``-key value``. Only codebase 1 currently receives these options.
        ``None`` is treated as "no extra options".

    Returns
    -------
    subprocess.CompletedProcess
        For codebases other than 3.
    tuple of two subprocess.CompletedProcess
        For codebase 3: (diarize step, SRT-to-CSV post-processing step).
    """
    # TO DO: parse kwargs and pass them to the command and then update the cb scripts; and then change to argparse inside
    extra_args = []
    for key, value in (params or {}).items():
        # subprocess only accepts str/bytes/path arguments, so numeric values
        # such as a beam size of 5 have to be converted explicitly.
        extra_args.append('-' + str(key))
        extra_args.append(str(value))

    command2 = None
    if codebase_id == 1:
        command = ['python', 'yinruiqing_trial.py', audiopath, specific_csv_path] + extra_args

    elif codebase_id == 2:
        command = ['python', 'speechbox_trial.py', audiopath, specific_csv_path]

    elif codebase_id == 3:
        command = ['python', '/om/user/arjunp/ashraf_repo/whisper-diarization/diarize.py',
                   '-a', audiopath, '--whisper-model', 'small.en']
        # The post-processing step reads an .srt with the audio file's stem
        # (assumes diarize.py writes it next to the audio -- TODO confirm).
        # splitext handles extensions of any length, unlike slicing off 4 chars.
        srtpath = os.path.splitext(audiopath)[0] + ".srt"
        command2 = ['python', '/om/user/arjunp/process_ashraf_output.py', srtpath, specific_csv_path]

    else:
        command = ['python', 'whisperx_demo.py', audiopath, specific_csv_path]

    result = subprocess.run(command, capture_output=True, text=True)

    if codebase_id == 3:
        # Second step for codebase 3 only: convert the .srt into the CSV.
        result2 = subprocess.run(command2, capture_output=True, text=True)
        return result, result2
    else:
        return result

In [8]:
def _slurmSettingsFor(codebase_id):
    """Return the SLURMCluster keyword arguments used for one codebase.

    Ids 1, 2 and 3 have their own settings; any other id gets the whisperx
    settings.
    """
    module_setup = 'source /etc/profile.d/modules.sh'
    anaconda = 'module load openmind8/anaconda/3-2023.09-0'
    gcc = 'module load openmind/gcc/11.1.0'
    ffmpeg_2016 = 'module load openmind/ffmpeg/20160310'
    bashrc = 'source ~/.bashrc'

    # Codebases 1 and 2 share the same worker environment.
    torch_gpu_prologue = [
        module_setup,
        anaconda,
        gcc,
        ffmpeg_2016,
        bashrc,
        'export MKL_THREADING_LAYER=GNU',
        'conda activate torch_gpu',
    ]

    if codebase_id == 1:
        specific = dict(processes=1, memory="24GB", walltime="01:00:00", queue="normal",
                        job_script_prologue=torch_gpu_prologue)
    elif codebase_id == 2:
        specific = dict(processes=2, memory="32GB", walltime="02:00:00", queue="cpl",
                        job_script_prologue=torch_gpu_prologue)
    elif codebase_id == 3:
        specific = dict(processes=2, memory="16GB", walltime="01:00:00", queue="cpl",
                        job_script_prologue=[
                            module_setup,
                            anaconda,
                            gcc,
                            'module load openmind8/ffmpeg/2023-05',
                            bashrc,
                            'conda activate codebase3',
                        ])
    else:
        specific = dict(processes=2, memory="16GB", walltime="01:00:00", queue="normal",
                        job_script_prologue=[
                            module_setup,
                            anaconda,
                            'module load openmind8/cuda/12.1',
                            'module load openmind8/cudnn/8.8.1-cuda12',
                            gcc,
                            ffmpeg_2016,
                            bashrc,
                            'conda activate whisperx',
                        ])

    return dict(cores=8,
                account="cpl",
                job_extra_directives=['--gres=gpu:QUADRORTX6000:1'],
                **specific)


def runCodebase(codebase_id, audiofilepaths, unixtime ,  notes, params):
    """Run one codebase on a list of audio files via a one-job SLURM Dask cluster.

    For every audio file this submits ``codebaseCommand`` to the cluster,
    waits for it to finish, and then writes/extends a ``<audio name>.json``
    metadata file in
    ``/om/user/arjunp/pipelineOutput/<unixtime>/<codebase name>/``.
    Files are processed one at a time: each submission is awaited before the
    next one is sent.

    Parameters
    ----------
    codebase_id : int
        1-based codebase id (1 yinruiqing, 2 speechbox, 3 ashraf, 4 whisperx).
    audiofilepaths : iterable of str
        Audio files to process. If empty, nothing is started and ``[]`` is
        returned.
    unixtime : str
        Name of the run directory; also stored in the metadata.
    notes : str
        Free-text note stored in the metadata.
    params : dict
        Extra options forwarded to ``codebaseCommand`` and stored in the
        metadata (so it must be JSON-serialisable).

    Returns
    -------
    list
        One entry per audio file, in input order: whatever
        ``codebaseCommand`` returned for that file.
    """
    codebasenameoptions = ['yinruiqing', 'speechbox', 'ashraf', 'whisperx']
    codebasename = codebasenameoptions[codebase_id-1]

    result_list = []
    audiofilepaths = list(audiofilepaths)
    if not audiofilepaths:
        # Nothing to do: avoid requesting a GPU job for no work.
        return result_list

    # -------- CREATING FILE STRUCTURE ---------------
    # The directories are the same for every file, so create them once.
    # exist_ok=True avoids a check-then-create race when several processes
    # share the same timestamp directory.
    cb_level = os.path.join('/om/user/arjunp/pipelineOutput', str(unixtime), codebasename)
    os.makedirs(cb_level, exist_ok=True)

    cluster = SLURMCluster(**_slurmSettingsFor(codebase_id))
    client = None
    try:
        cluster.scale(1)
        #cluster.adapt()
        client = Client(cluster)

        for audiopath in audiofilepaths:
            filename = os.path.splitext(os.path.basename(audiopath))[0]
            csv_to_save = filename + ".csv"
            specific_path = os.path.join(cb_level, csv_to_save)

            future = client.submit(codebaseCommand, codebase_id, audiopath, specific_path, notes, params)
            result = future.result()  # blocks until this file is done
            result_list.append(result)
            print("Ran codebase", codebase_id, "on", filename)

            # -------- CREATING METADATA JSON ---------------
            dictionary = {
                "csvname": csv_to_save,
                "codebase": codebasename,
                "codebase_id": codebase_id,
                "audiofile": filename,
                "full_csv_path": specific_path,
                "unix_time": unixtime,
                "parameters": params,
                "notes": notes}

            jsonpath = os.path.join(cb_level, filename + ".json")

            if os.path.exists(jsonpath):
                # Read the existing file and append the new record.
                with open(jsonpath, "r") as f:
                    loaded = json.load(f)
                loaded.append(dictionary)
            else:
                # First record for this audio file.
                loaded = [dictionary]

            # Overwrite/create the file with the updated record list.
            with open(jsonpath, "w") as f:
                json.dump(loaded, f, indent=4)

        print("Closing cluster normally.")
    finally:
        # Always release the client and the cluster, even when a task or the
        # metadata write raises.
        if client is not None:
            client.close()
        cluster.close()

    return result_list
        

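The cells below run the codebases on the audio files through Dask. The first cell runs only codebase 1 (a prompt experiment); the second cell runs all four codebases once per beam size.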

In [9]:
%%time

# Experiment cell: run codebase 1 only, on every file in AUDIO_FILE_NAMES, passing a
# temperature and a prompt as extra command-line options. All outputs of this run share
# one timestamp directory. The names assigned here are notebook globals (poolresult is
# inspected in a later cell).

#beam_sizes = [2,4,8,16,32]  
#patience_factors = [0.5,1,2,4]
# NOTE: beam_sizes is assigned here but not read anywhere else in this cell.
beam_sizes = [1,5] 
#temperatures = [0,0.25,0.5,0.75]

# Re-root every file name under the raw-audio directory.
audiofilepaths = [os.path.join('/om/user/arjunp/rawAudioFiles', os.path.basename(name_of_audio)) for name_of_audio in AUDIO_FILE_NAMES]
#timestamps_from_this_run = []
# Timestamp (whole seconds) used as the name of this run's output directory.
unixtimestamp = str(round(time.time()))
#timestamps_from_this_run = [unixtimestamp+"temperature"+str(beam)  for beam in temperatures]
notes="experimenting with  prompt (not initial_prompt) on  small.en"
#list_of_args = [(1, audiofilepaths,unixtimestamp+"beam"+str(beam),notes,{"beam_size": str(beam), "temperature": "0"}) for beam in beam_sizes]

#list_of_args = [(1, audiofilepaths,unixtimestamp+"temp0.75beam2",notes,{ "temperature": " 0.75", 'beam_size':'2'}) ]
# One argument tuple => starmap makes a single runCodebase call (codebase 1).
# NOTE(review): the temperature value is the string " 0" with a leading space -- confirm
# the receiving script tolerates it.
list_of_args = [(1, audiofilepaths,unixtimestamp,notes,{ "temperature": " 0",  "prompt":"context is child-parent interaction"})  ]
# The pool allows up to 8 processes, but only one task is submitted here.
with Pool(processes=8) as pool: 
    poolresult = pool.starmap(runCodebase, list_of_args)


Ran codebase 1 on Ethan_021005_first5
Ran codebase 1 on Lily_030725_first5
Ran codebase 1 on William_030011_first5
Ran codebase 1 on Naima_020720_first5
Ran codebase 1 on Naima_010801_first5
Ran codebase 1 on William_011115_first5
Ran codebase 1 on Violet_030119_first5
Ran codebase 1 on Ethan_020208_first5
Ran codebase 1 on Alex_010526_first5


In [6]:
%%time

# Sweep cell: for each beam size, run all four codebases (ids 1-4) in parallel processes
# on the box_set_9 files. Each beam size gets its own timestamp directory.

# beam_sizes = [2,4,8,16,32]  

beam_sizes = [5]  

# NOTE: this rebinds the notebook-global AUDIO_FILE_NAMES to box_set_9 only; any cell run
# after this one that reads AUDIO_FILE_NAMES will see the narrower list.
AUDIO_FILE_NAMES = box_set_9
audiofilepaths = [os.path.join('/om/user/arjunp/rawAudioFiles', os.path.basename(name_of_audio)) for name_of_audio in AUDIO_FILE_NAMES]
# Collects the timestamp (= output directory name) used for each beam size.
timestamps_from_this_run = []

for i in beam_sizes:
    unixtimestamp = str(round(time.time()))
    beamvalue = str(i)
    timestamps_from_this_run.append(unixtimestamp)
 
#list_of_args = [(i+1, audiofilepaths,unixtimestamp) for i in range(4)]
    notes=""
    params = {"beam_size": beamvalue, "temperature": "0"}
    # The comprehension's `i` is local to the comprehension (Python 3 scoping), so the
    # loop's `i` (the beam size) is unchanged and is what the print below reports.
    list_of_args = [(i+1, audiofilepaths,unixtimestamp,notes,params) for i in range(4)]
 
    # Four tasks are submitted (one per codebase); the pool allows up to 14 processes.
    with Pool(processes=14) as pool: 
        poolresult = pool.starmap(runCodebase, list_of_args)
    print("finished beam size", i)

set up cluster for cb 1
created folder for yinruiqing
Ran codebase 1 on 34_KS_Recording4-LR_2165-2465_CT
Ran codebase 1 on 34_KS_Recording4-LR_2465-2765_CT
Ran codebase 1 on 34_KS_Recording4-LR_9365-9665_CT
Ran codebase 1 on 40_JC_Recording1-LR_2662-2962_CT
Ran codebase 1 on 40_JC_Recording1-LR_2962-3262_CT
Ran codebase 1 on 40_JC_Recording2-LR_22826-23126_CT
Ran codebase 1 on 85_NA_Recording1-LR_2244-2544_CT
Ran codebase 1 on 85_NA_Recording1-LR_3144-3444_CT
Ran codebase 1 on 85_NA_Recording1-LR_9444-9744_CT
finished beam size 2
set up cluster for cb 1
created folder for yinruiqing


KeyboardInterrupt: 

In [52]:
# Debugging aid: poolresult holds one runCodebase result list per argument tuple, so
# [0][0] is the first tuple's result for its first audio file. Print that subprocess's
# captured stderr. (For codebase 3 each entry is a (result, result2) tuple instead, so
# this indexing would need an extra [0] or [1].)
print(poolresult[0][0].stderr)

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.3.3. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../../../home/arjunp/.cache/torch/pyannote/models--pyannote--segmentation/snapshots/c4c8ceafcbb3a7a280c2d357aee9fbc9b0be7f9b/pytorch_model.bin`
Traceback (most recent call last):
  File "/weka/scratch/weka/cpl/arjunp/yinruiqing_trial.py", line 22, in <module>
    parser.add_argument('audio_file_path', type=str,   required=True)
  File "/home/arjunp/.conda/envs/torch_gpu/lib/python3.9/argparse.py", line 1405, in add_argument
    kwargs = self._get_positional_kwargs(*args, **kwargs)
  File "/home/arjunp/.conda/envs/torch_gpu/lib/python3.9/argparse.py", line 1521, in _get_positional_kwargs
    raise TypeError(msg)
TypeError: 'required' is an invalid argument for positionals



Now it's time to gather all the per-file JSON metadata files that were created and combine them into one big JSON file (`pipelineOutput/metadata.json`).

In [4]:
def updateMyJson(root='pipelineOutput'):
    """Combine every JSON file under ``root`` into ``<root>/metadata.json``.

    The directory tree below ``root`` is searched recursively for ``*.json``
    files. A previously written ``<root>/metadata.json`` is skipped so the
    combined file never ingests itself. Each file's parsed content becomes one
    element of the output list, in sorted path order so repeated runs over the
    same files produce the same output.

    Parameters
    ----------
    root : str
        Directory to search and to write ``metadata.json`` into. The default
        is relative to the current working directory.
    """
    metadata_path = os.path.join(root, 'metadata.json')
    metadata_norm = os.path.normpath(metadata_path)

    # glob returns paths in arbitrary order, so sort them for a stable result.
    jsons_to_combine = sorted(
        filename
        for filename in glob.glob(os.path.join(root, '**', '*.json'), recursive=True)
        if os.path.normpath(filename) != metadata_norm
    )

    # Load every input before opening the output for writing.
    combined_data = []
    for file_path in jsons_to_combine:
        with open(file_path, 'r') as f:
            combined_data.append(json.load(f))

    # Write the combined data to a new JSON file
    with open(metadata_path, 'w') as f:
        json.dump(combined_data, f, indent=4)

    print("All JSON files have been combined into 'metadata.json")

In [9]:
# Rebuild pipelineOutput/metadata.json from the JSON files found under pipelineOutput/
# (the path is relative to the notebook's current working directory).
updateMyJson()

All JSON files have been combined into 'metadata.json
