In [1]:
import tgt
import pandas as pd
import numpy as np
import os
import librosa
from tqdm.notebook import tqdm

# Create Spectrograms

In [2]:
def save_spectrogram(filepath, start, stop, y, sr):
    """
    Compute the mel spectrogram of one window of audio and save it as .npy.

    # Arguments
        filepath: wav audio filepath. Only used to derive the output
            location: the array is written next to the wav file.
        start: start time in seconds.
        stop: stop time in seconds.
        y: audio time series.
        sr: sample rate.

    # Outputs
        saves a numpy file of the mel spectrogram array with
        dimensions (n_mels, t) to
        '<wav dir>/<wav name>_<start>to<stop>_spectro.npy'.
    """
    # Slice bounds must be integers; the cast keeps fractional times or a
    # non-integer sample rate from raising a TypeError.
    first_sample = int(sr * start)
    last_sample = int(sr * stop)
    S = librosa.feature.melspectrogram(y=y[first_sample:last_sample],
                                       sr=sr, n_mels=64, fmax=sr / 2)
    # os.path.join keeps a bare filename (empty dirname) relative to the
    # working directory instead of turning it into '/<name>'.
    stem = os.path.basename(filepath).split('.')[0]
    out_name = stem + '_' + str(start) + 'to' + str(stop) + '_spectro'
    np.save(os.path.join(os.path.dirname(filepath), out_name), S)

In [3]:
# Root directory that is walked recursively for .wav recordings and, later,
# for the generated spectrogram .npy files.
audio_path = 'audio/'

In [4]:
# Path of every .wav file found anywhere below audio_path.
wavs = [
    os.path.join(folder, filename)
    for folder, _subfolders, filenames in os.walk(audio_path)
    for filename in filenames
    if filename.endswith(".wav")
]

In [None]:
# Cut every recording into windows of window_size seconds, plus a second set
# of windows shifted by `slide` seconds, and store one spectrogram per window.
window_size = 6
slide = 3
for filepath in wavs:
    y, sr = librosa.load(filepath)
    length = int(len(y) / sr)
    remainder = length % window_size
    # Only the part of the recording that holds whole windows is used.
    usable_length = length - remainder
    window_starts = range(0, usable_length, window_size)
    for i in tqdm(window_starts, desc='save_spectro', leave=False):
        save_spectrogram(filepath, i, i + window_size, y, sr)
        # Shifted window; skipped once it would reach the end of the
        # usable part of the recording.
        j = i + slide
        if j + window_size >= usable_length:
            continue
        save_spectrogram(filepath, j, j + window_size, y, sr)

# Convert TextGrid files to CSVs

In [5]:
def convert_tg_file_to_csv(file, annotation_path):
    """
    Export the laughter tiers of a TextGrid file as a csv table.

    # Arguments
        file: TextGrid filepath.
        annotation_path: directory of annotations. Not used by this
            function; kept so existing calls keep working.

    # Outputs
        saves a filtered TextGrid file that contains only tiers that contain
        'laugh' as csv file. The table is written next to the TextGrid as
        '<name>_Laugh.txt', encoded as UTF-8.
    """
    tg = tgt.io.read_textgrid(file, include_empty_intervals=True)
    # Drop every tier whose name does not mention 'laugh'.
    for tier in [name for name in tg.get_tier_names() if 'laugh' not in name]:
        tg.delete_tier(tier)
    csv = tgt.io.export_to_table(tg, separator=',')
    save_name = os.path.basename(file).split('.')[0] + '_Laugh.txt'
    # os.path.join keeps a bare filename relative to the working directory
    # (plain concatenation would produce '/<name>' for an empty dirname).
    save_file = os.path.join(os.path.dirname(file), save_name)
    # Explicit UTF-8: the platform default encoding is not UTF-8 everywhere,
    # and the table is read back later with pandas, which defaults to UTF-8.
    with open(save_file, 'w', encoding='utf-8') as output:
        output.write(csv)

In [6]:
def convert_tg_file_to_part_csv(file, annotation_path):
    """
    Export the 'Part' tiers of a TextGrid file as a csv table.

    # Arguments
        file: TextGrid filepath.
        annotation_path: directory of annotations. Not used by this
            function; kept so existing calls keep working.

    # Outputs
        saves a filtered TextGrid file that contains only tiers that contain
        'Part' as csv file. To be used to determine when roleplay
        starts. The table is written next to the TextGrid as
        '<name>_Parts.txt', encoded as UTF-8.
    """
    tg = tgt.io.read_textgrid(file, include_empty_intervals=True)
    # Drop every tier whose name does not mention 'Part'.
    for tier in [name for name in tg.get_tier_names() if 'Part' not in name]:
        tg.delete_tier(tier)
    csv = tgt.io.export_to_table(tg, separator=',')
    save_name = os.path.basename(file).split('.')[0] + '_Parts.txt'
    # os.path.join keeps a bare filename relative to the working directory
    # (plain concatenation would produce '/<name>' for an empty dirname).
    save_file = os.path.join(os.path.dirname(file), save_name)
    # Explicit UTF-8: the platform default encoding is not UTF-8 everywhere,
    # and the table is read back later with pandas, which defaults to UTF-8.
    with open(save_file, 'w', encoding='utf-8') as output:
        output.write(csv)

In [7]:
# Root directory that is walked recursively for .TextGrid annotation files;
# it is also the prefix used to locate each recording's label files.
annotation_path = 'transcriptions_annotations/'

In [8]:
# Path of every .TextGrid file found anywhere below annotation_path.
TextGrid_files = [
    os.path.join(folder, filename)
    for folder, _subfolders, filenames in os.walk(annotation_path)
    for filename in filenames
    if filename.endswith(".TextGrid")
]

In [None]:
# For every TextGrid, export first the laughter tiers and then the 'Part' tiers.
converters = (convert_tg_file_to_csv, convert_tg_file_to_part_csv)
for file in tqdm(TextGrid_files, desc='tg to csv'):
    for convert in converters:
        convert(file, annotation_path)

# Create Dataset (combine: id, spectrogram, label)

In [9]:
def find_label_start_end(spectro_file, annotation_path):
    """
    Locate the annotation files and time window that belong to a spectrogram.

    # Arguments
        spectro_file: spectrogram filepath, named
            '<recording>_<start>to<end>_spectro.npy'.
        annotation_path: directory of annotations (with or without a
            trailing separator).

    # Returns
        label_path: label filepath relating to the spectrogram.
        start_time: start time relating to the spectrogram.
        end_time: end time relating to the spectrogram.
        roleplay_path: roleplay parts information filepath relating to the spectrogram.

    # Raises
        FileNotFoundError: if the annotation folder holds no '*Laugh.txt'
            or no '*Parts.txt' file.
    """
    base_file = os.path.basename(spectro_file)
    # Split from the right so that a recording name containing '_' does not
    # shift the '<start>to<end>' field.
    bounds = base_file.rsplit('_', 2)[-2].split('to')
    start_time = int(bounds[0])
    end_time = int(bounds[1])

    # The annotation folder carries the same name as the folder that holds
    # the spectrogram.
    recording_dir = os.path.basename(os.path.dirname(spectro_file))
    label_dir = os.path.join(annotation_path, recording_dir)
    # Sorted so the chosen file does not depend on directory listing order.
    entries = sorted(os.listdir(label_dir))
    label_files = [f for f in entries if f.endswith("Laugh.txt")]
    roleplay_files = [f for f in entries if f.endswith("Parts.txt")]
    if not label_files:
        raise FileNotFoundError("no '*Laugh.txt' file in " + label_dir)
    if not roleplay_files:
        raise FileNotFoundError("no '*Parts.txt' file in " + label_dir)

    label_path = os.path.join(label_dir, label_files[0])
    roleplay_path = os.path.join(label_dir, roleplay_files[0])
    return label_path, start_time, end_time, roleplay_path

In [10]:
def filter_csv(start_time, end_time, label_path):
    """
    Select the laughter annotations that overlap a time window.

    # Arguments
        start_time: start time relating to spectrogram.
        end_time: end time relating to spectrogram.
        label_path: filepath of label.

    # Returns
        dataframe filtered to contain 'laugh' in the text
        and filtered for specified start_time and end_time.
        When start_time in the csv is before specified start_time,
        this record will be included but start_time in the csv will be set
        to specified start_time. Same for end_time.

        For example:

        start_time    end_time     text
        905.765658    909.731864   <laughter> jaha läuft </laughter>

        if start_time was 907 and end_time was 909, filter_csv would set this row to:

        start_time    end_time     text
        907.0         909.0        <laughter> jaha läuft </laughter>
    """
    df = pd.read_csv(label_path)
    # na=False counts rows without any text as "no laughter".
    is_laugh = df['text'].str.contains('laugh', na=False)
    overlaps_window = (df['start_time'] <= end_time) & (df['end_time'] >= start_time)
    selected = df[is_laugh & overlaps_window]
    # assign() builds a new frame, so the clipping never writes into a slice
    # of df (which would raise SettingWithCopyWarning).
    return selected.assign(
        start_time=selected['start_time'].clip(lower=start_time),
        end_time=selected['end_time'].clip(upper=end_time),
    )

In [6]:
def create_id(spectro_file):
    """
    Build the sample id from a spectrogram filepath.

    # Arguments
        spectro_file: filepath for spectrogram.

    # Returns
        id for file.
        For example input of spectro_file of 'audio/r7/r7_270to276_spectro.npy'
        would return 'r7_270to276'.
    """
    base_name = os.path.basename(spectro_file)
    suffix = '_spectro.npy'
    if base_name.endswith(suffix):
        # Strip the suffix instead of counting '_' fields so that recording
        # names that contain '_' keep their time range in the id.
        return base_name[:-len(suffix)]
    # Names without the standard suffix: first two '_'-separated fields.
    parts = base_name.split('_')
    return parts[0] + '_' + parts[1]

In [12]:
def start_end_in_timesteps(df, start_time, timesteps_per_second):
    """
    Convert annotation times from seconds to spectrogram timesteps.

    # Arguments
        df: dataframe in format from output of function filter_csv.
        start_time: start time relating to spectrogram.
        timesteps_per_second: timesteps_per_second = timesteps / window_size.

    # Returns
        dataframe after:
        Removing tier_name, tier_type and text columns.
        Reformatting times to be relative to start_time.
        Converting seconds to timesteps (start rounded down, end rounded up).
        The input dataframe is left unchanged.
    """
    # `columns=` instead of a positional axis: pandas 2.x no longer accepts
    # drop(labels, 1).
    df = df.drop(columns=['tier_name', 'tier_type', 'text'])
    df['start_time'] = np.floor((df['start_time'] - start_time) * timesteps_per_second)
    df['end_time'] = np.ceil((df['end_time'] - start_time) * timesteps_per_second)
    return df

In [13]:
def create_label_matrix(df, n_timesteps=None):
    """
    Turn laughter intervals into a per-timestep 0/1 label vector.

    # Arguments
        df: dataframe in format from output of start_end_in_timesteps.
        n_timesteps: length of the label vector. When omitted, the
            notebook-level variable `timesteps` is used, as before.

    # Returns
        vector of length (timesteps) which has values of 0 or 1.
        1 representing laughter, 0 representing no laughter.

        For example:
        [1, 0, 0, 1, 0, 0 ....] represents laughter in timesteps 0 and 3
    """
    if n_timesteps is None:
        # Legacy behaviour: fall back to the notebook global.
        n_timesteps = timesteps
    label = np.zeros(n_timesteps)
    for start, end in zip(df['start_time'], df['end_time']):
        # Half-open slice: timesteps start .. end-1 are marked as laughter.
        label[int(start):int(end)] = 1
    return label

In [14]:
def create_id_spectro_label(file_id, spectro_path, label):
    """
    Bundle a sample's id, spectrogram and label into one object array.

    # Arguments
        file_id: file id created from function create_id.
        spectro_path: filepath for spectrogram.
        label: label created from function create_label_matrix.

    # Returns
        numpy object array containing 3 elements:
        id
        related spectrogram
        related label
    """
    np_spectro_file = np.load(spectro_path)
    # The three parts have different shapes. np.asarray on such a ragged list
    # raises ValueError on NumPy >= 1.24, so the object array is allocated
    # explicitly and filled slot by slot.
    np_combined = np.empty(3, dtype=object)
    np_combined[0] = file_id
    np_combined[1] = np_spectro_file
    np_combined[2] = label
    return np_combined

In [15]:
def roleplay_flag(roleplay_path, window_start=None, window_end=None):
    """
    Check whether a time window overlaps any annotated roleplay part.

    # Arguments
        roleplay_path: filepath for Parts file which has times for annotated roleplays.
        window_start: start time of the spectrogram window. When omitted,
            the notebook-level variable `start_time` is used, as before.
        window_end: end time of the spectrogram window. When omitted,
            the notebook-level variable `end_time` is used, as before.

    # Returns
        True if the window overlaps at least one annotated roleplay
        interval (touching boundaries count). Else returns False.
    """
    # Legacy behaviour: fall back to the notebook globals set by the
    # dataset-building loop.
    if window_start is None:
        window_start = start_time
    if window_end is None:
        window_end = end_time
    df = pd.read_csv(roleplay_path)
    overlaps = (df['start_time'] <= window_end) & (df['end_time'] >= window_start)
    return bool(overlaps.any())

In [4]:
# Path of every saved spectrogram ('...spectro.npy') found below audio_path.
spectros = [
    os.path.join(folder, filename)
    for folder, _subfolders, filenames in os.walk(audio_path)
    for filename in filenames
    if filename.endswith("spectro.npy")
]

In [None]:
# Build one [id, spectrogram, label] record per spectrogram window.
# `timesteps`, `start_time` and `end_time` are kept as notebook-level names
# because create_label_matrix and roleplay_flag read them from there.
dataset = []
window_size = 6
timesteps = 259
timesteps_per_second = timesteps / window_size

for spectro_path in tqdm(spectros, desc='create dataset'):
    label_path, start_time, end_time, roleplay_path = find_label_start_end(
        spectro_path, annotation_path)
    # Windows that do not overlap an annotated roleplay part are skipped.
    if not roleplay_flag(roleplay_path):
        continue
    laughs = filter_csv(start_time, end_time, label_path)
    laughs_in_steps = start_end_in_timesteps(laughs, start_time, timesteps_per_second)
    label_vector = create_label_matrix(laughs_in_steps)
    sample_id = create_id(spectro_path)
    dataset.append(create_id_spectro_label(sample_id, spectro_path, label_vector))
dataset = np.asarray(dataset)

# Create Train, Validation, Test Datasets

In [None]:
# Shuffle the records in place with a fixed seed so that the train/val/test
# split below is the same on every run. A private RandomState is used so the
# global NumPy random state is left untouched.
np.random.RandomState(42).shuffle(dataset)

In [None]:
# 75 % / 15 % / 10 % of the records for train / validation / test.
dataset_size = len(dataset)
train_size, val_size, test_size = (
    round(dataset_size * fraction) for fraction in (0.75, 0.15, 0.10)
)

In [None]:
# Column 1 of each record is the spectrogram, column 2 the label vector.
x, y = dataset[:, 1], dataset[:, 2]

In [None]:
# x and y are overwritten here, so this cell must run exactly once after the
# cell that extracts them from `dataset`.

# reformat x to (n, timesteps, mel bands, 1)
x_stacked = np.stack(x)
x_time_major = np.moveaxis(x_stacked, 1, -1)
x = np.expand_dims(x_time_major, axis=3)

# reformat y to (n, timesteps, 1)
y_stacked = np.stack(y)
y_time_major = np.moveaxis(y_stacked, 1, -1)
y = np.expand_dims(y_time_major, axis=2)

In [None]:
# Consecutive slices of the shuffled data: train first, then validation,
# and everything that is left becomes the test set.
train_slice = slice(0, train_size)
val_slice = slice(train_size, train_size + val_size)
test_slice = slice(train_size + val_size, dataset_size)

x_train, y_train = x[train_slice], y[train_slice]
x_val, y_val = x[val_slice], y[val_slice]
x_test, y_test = x[test_slice], y[test_slice]

In [None]:
# Write the full dataset and every split to .npy files in the working directory.
np.save('de_laughter_ds', dataset)
for split_name, split_array in (
    ('x_train', x_train),
    ('x_val', x_val),
    ('x_test', x_test),
    ('y_train', y_train),
    ('y_val', y_val),
    ('y_test', y_test),
):
    np.save(split_name, split_array)