##### `This notebook does automatic section detection of each video and generates a raw jsonl file per video for manual annotation. To run this file, make sure the following folders exist:`
- sounds: this folder includes signals/buzzes that we use to delimit sections
- audio: add all audio files (wav) that need to be processed here.
- timeSeg: add all csv files (generated from diarization) with the speaker-turn info for the videos to be processed here.
- json: the folder to store the output files.

-------------------------------------------------------
- Intermediate file: `temp.wav`
- Potential output files if `write_result_to_csv()` is run: `segmentation_in_min.csv`, `segmentation_in_sec.csv`

Last updated on August 3rd, 2022

In [37]:
import os
from pydub import AudioSegment
from pydub.silence import split_on_silence
import numpy as np
import librosa
import scipy as sp
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
import moviepy.editor as mp
import copy
import pandas as pd
import json

In [None]:
#sound0.wav is the end-of-episode buzzer
#sound1-4.wav are the result-revealing beeps (called "beap" in the code below)
#sound5.wav is the speaker indicator sound in the quick truth-lie section

def find_offset(within_file,find_file):
    '''
    Cross-correlate a short template sound against a longer recording.

    parameters:
    within_file: str. the file in which the target audio segment is to be found
    find_file: str. the target (template) file

    return:
    c: np.array of unnormalized cross-correlation values, one per candidate
       alignment of the template inside within_file. With mode='valid' its
       length is len(within) - len(find) + 1 samples (when within_file is the
       longer signal); np.argmax(c) / 16000 is the best-match offset in seconds.

    NOTE: both files are resampled to 16 kHz on load, so indices into c are
    sample offsets at 16000 samples per second.

    reference:
    https://dev.to/hiisi13/find-an-audio-within-another-audio-in-10-lines-of-python-1866
    '''
    # Import the submodule explicitly: a plain `import scipy` is not guaranteed
    # to expose `scipy.signal` unless some other package imported it first.
    from scipy import signal

    y_within, sr_within = librosa.load(within_file, sr=16000)
    # Load the template at the same rate so that sample offsets are comparable.
    y_find, _ = librosa.load(find_file, sr=sr_within)

    return signal.correlate(y_within, y_find, mode='valid', method='fft')

In [None]:
def _pick_peaks(corr, count, halfWindow):
    '''
    Greedily pick up to `count` strongest peaks of a correlation trace.

    parameters:
    corr: np.array (float), correlation values. It is modified in place: the
          neighbourhood of every picked peak is masked out.
    count: int, the maximum number of peaks to pick
    halfWindow: int, number of samples masked on each side of a picked peak

    return:
    list of int, peak indices in picking order (strongest first). Fewer than
    `count` indices are returned when the whole trace has been masked out.
    '''
    peaks = []
    for _ in range(count):
        peak = int(np.argmax(corr))
        if corr[peak] == -np.inf:
            # Everything is already masked; another pick would only be index 0.
            break
        # Clamp the lower bound: a negative start index would wrap around to the
        # end of the array, leave the peak unmasked and return it again.
        corr[max(0, peak - halfWindow):peak + halfWindow] = -np.inf
        peaks.append(peak)
    return peaks


def generateChunkTimes(within_file,beapNum,signalNum):
    '''
    Detect the section boundaries of one episode.

    parameters:
    within_file: str. a wav file in which the target audio segment is to be found
    beapNum: int, the number of total answer-revealing sounds to detect
             (at least 3: the third one marks where the quick round search starts)
    signalNum: int, the number of speaker-designation-sounds to detect

    return:
    timeSegSecond: np.array, an array of time segments in seconds
    timeSegMinute: np.array, an array of time segments in the form of "dd:dd"

    NOTE: the matching templates are stored in the "sounds" folder.
    NOTE: "temp.wav" in the working directory is overwritten several times.
    NOTE: the end buzzer is searched in 1650 s - 1725 s and the first silence
    in 75 s - 150 s of the recording (hard-coded windows).
    '''
    sampleRate = 16000  # must match the rate find_offset resamples to

    # find the beap sounds; detections are kept at least 120 s apart
    beapSeg = find_offset(within_file, "../sounds/sound1.wav")
    # every boundary is placed 5 s after the detected position
    # (presumably to skip past the sound itself -- TODO confirm)
    indexList = sorted(peak + 5 * sampleRate
                       for peak in _pick_peaks(beapSeg, beapNum, 120 * sampleRate))

    # find the end-of-episode buzzer inside the 1650 s - 1725 s window
    ffmpeg_extract_subclip(within_file, 1650, 1725, targetname="temp.wav")
    endSeg = find_offset("temp.wav", "../sounds/sound0.wav")
    # convert the offset inside the clip into an absolute sample index
    endTime = int(np.argmax(endSeg)) + 1650 * sampleRate

    # find the first silence: the episode start is the end of the first
    # non-silent chunk inside the 75 s - 150 s window
    ffmpeg_extract_subclip(within_file, 75, 150, targetname="temp.wav")
    sound_file = AudioSegment.from_wav("temp.wav")
    audio_chunks = split_on_silence(sound_file, min_silence_len=850, silence_thresh=-40, keep_silence=100)
    if audio_chunks:
        # chunk length is in milliseconds
        startTime = round((len(audio_chunks[0]) / 1000 + 75) * sampleRate)
    else:
        startTime = 0

    # find the speaker sounds of the quick truth and lies round, searching from
    # the third beap up to the end buzzer. endTime is already absolute, so no
    # further 1650 s offset must be added here.
    quickRoundStart = indexList[2]
    ffmpeg_extract_subclip(within_file, quickRoundStart / sampleRate, endTime / sampleRate, targetname="temp.wav")
    quickRoundSeg = find_offset("temp.wav", "../sounds/sound5.wav")
    # shift the clip-relative offsets back to absolute sample indices
    indexList2 = sorted(peak + quickRoundStart
                        for peak in _pick_peaks(quickRoundSeg, signalNum, 75 * sampleRate))

    return clean_repeititions(indexList, indexList2, startTime, endTime)

In [None]:
def clean_repeititions(beapList,signalList,startTime,endTime):
    '''
    Merge the detected sounds into one sorted list of section boundaries.

    parameters:
    beapList: list of beaps (answer-revealing sounds)
    signalList: list of signals (speaker-designation-sounds) in the quick truth-lie section
    startTime: int of the start time
    endTime: int of the end-of-episode buzzer (absolute position in the recording)
    NOTE: all time in multiples of 16000 (sample_rate)

    return:
    timeSegSecond: np.array, an array of time segments in seconds
    timeSegMinute: np.array, an array of time segments in the form of "dd:dd"
    NOTE: for each video the length of the output array varies
    '''
    sampleRate = 16000

    # drop signals less than 120 s before the end buzzer. endTime is an
    # absolute sample index, so it is compared directly (no extra offset).
    keptSignals = [signal for signal in signalList
                   if endTime - signal >= 120 * sampleRate]

    # drop a beap when any detected signal lies within 60 s of it: both sounds
    # then mark the same boundary and the signal is the one that is kept
    keptBeaps = [beap for beap in beapList
                 if not any(abs(beap - signal) < 60 * sampleRate for signal in signalList)]

    # finalize the timelist
    timeSegSecond = np.sort(np.array([startTime] + keptBeaps + keptSignals)) / sampleRate

    # round to whole seconds BEFORE splitting into minutes and seconds;
    # rounding the seconds part alone turns e.g. 119.7 s into "01:60"
    totalSeconds = np.round(timeSegSecond)
    timeSegMinute = np.array(["{:02d}:{:02d}".format(int(total // 60), int(total % 60))
                              for total in totalSeconds])
    return timeSegSecond, timeSegMinute

In [None]:
def write_result_to_csv(outputFileNameSec="segmentation_in_sec.csv",
                        outputFileNameMin="segmentation_in_min.csv",
                        subFolderName="wav",beapNum=5,signalNum=3):
    '''
    Run the section detection on every file of a folder and save the results.

    parameter:
    outputFileNameSec: str. the csv file that receives the boundaries in seconds
    outputFileNameMin: str. the csv file that receives the boundaries as "dd:dd"
    subFolderName: str. the folder holding the audios (in wav) to be processed
    beapNum: int, the number of beaps (answer-revealing sounds) to detect
    signalNum: int, the number of signals (speaker-designation-sounds) to detect

    output:
    two csv files with one row per audio file; the first data column is the
    file name, followed by that file's boundaries

    NOTE: a row may hold fewer boundaries than beapNum + signalNum because
    detections that indicate the same interval are not both recorded
    '''
    # every directory entry except the macOS metadata file is processed
    file_names = [name for name in os.listdir(subFolderName) if name != ".DS_Store"]

    detections = [generateChunkTimes(subFolderName + "/" + name, beapNum, signalNum)
                  for name in file_names]
    dataSec = [inSeconds for inSeconds, _ in detections]
    dataMin = [inMinutes for _, inMinutes in detections]

    # rows may differ in length; pandas pads the shorter ones
    tables = [pd.DataFrame(dataSec), pd.DataFrame(dataMin)]
    for table in tables:
        table.insert(0, "filename", file_names)

    secondsTable, minutesTable = tables
    secondsTable.to_csv(outputFileNameSec)
    minutesTable.to_csv(outputFileNameMin)

In [None]:
def _speaker_spans(sectionStarts, turnEnds):
    '''
    Build one "SPEAKER" span per section boundary.

    parameters:
    sectionStarts: ascending sequence of section boundaries in seconds
    turnEnds: sequence of the "endAt" values of the speaker turns, in file order

    return:
    list of dict with the keys "start", "end" and "label". For each boundary
    the first turn (searching forward from the previous match) that ends after
    the boundary is located; the span starts at that turn's end and finishes
    at the end of the turn two rows later (clamped to the last row).
    Boundaries after which no turn ends produce no span.
    '''
    spans = []
    lastRow = len(turnEnds) - 1
    cursor = 0
    for sec in sectionStarts:
        for i in range(cursor, len(turnEnds)):
            if sec < turnEnds[i]:
                spans.append({
                    # plain floats keep json.dump independent of the numpy dtype
                    "start": float(turnEnds[i]),
                    "end": float(turnEnds[min(i + 2, lastRow)]),
                    "label": "SPEAKER",
                })
                # boundaries are ascending: resume the next search at this row
                cursor = i
                break
    return spans


def generate_json_file_per_video(subFolderName="../audio",beapNum=5,signalNum=3):
    '''
    Write one raw jsonl annotation file per audio file.

    parameter:
    subFolderName: str. the subdirectory where all audios (in wav) are stored and will be processed
    beapNum: int, the number of beaps (answer-revealing sounds) to detect
    signalNum: int, the number of signals (speaker-designation-sounds) to detect

    For every audio file "<name>.<ext>" this function
    - detects the section boundaries with generateChunkTimes,
    - reads the speaker turns from "timeSeg/<name>time.csv" (the 2nd and 3rd
      csv columns; a column called "endAt" is required),
    - writes "json/<name>.jsonl" holding one "SPEAKER" span per boundary and
      one "SEGMENT_TRUTH" span per pair of consecutive boundaries.
    '''
    # every directory entry except the macOS metadata file is processed
    file_names = [name for name in os.listdir(subFolderName) if name != ".DS_Store"]

    for file in file_names:
        print(file)
        # strip the extension whatever its length
        stem = os.path.splitext(file)[0]
        second, _ = generateChunkTimes(subFolderName + "/" + file, beapNum, signalNum)
        df = pd.read_csv("timeSeg/" + stem + "time.csv", usecols=[1, 2])

        # adds SPEAKER time segmentation
        audio_spans = _speaker_spans(second, df["endAt"].to_numpy())

        # add SEGMENT_TRUTH time segmentation: one span between each pair of
        # consecutive boundaries
        audio_spans.extend(
            {
                "start": float(sectionStart),
                "end": float(sectionEnd),
                "label": "SEGMENT_TRUTH",
            }
            for sectionStart, sectionEnd in zip(second[:-1], second[1:])
        )

        # make the big dictionary and write into json file
        videoPath = "videoAnnotate\\" + stem + ".mp4"
        jsonDic = {
            "video": videoPath,
            "path": videoPath,
            "audio_spans": audio_spans,
        }

        with open("json/" + stem + ".jsonl", "w") as outfile:
            json.dump(jsonDic, outfile)
            outfile.write("\n")
    

In [38]:
# Optional: also dump the detected section times of every audio file to csv (disabled).
# write_result_to_csv(outputFileNameSec="timeSegSec01.csv",outputFileNameMin="timeSegMin01.csv", beapNum=6)
# write_result_to_csv()
# Generate one jsonl file per audio file of the default "../audio" folder,
# detecting 6 beaps per episode instead of the default 5.
generate_json_file_per_video(beapNum=6)