# /tl/ Clusters

This notebook includes functions that allow us to obtain voicing, rise time, and relative intensity from /tl/ clusters. These functions make use of the [Parselmouth](https://buildmedia.readthedocs.org/media/pdf/parselmouth/latest/parselmouth.pdf) and [audiolabel](https://github.com/rsprouse/audiolabel) packages.

In [1]:
import os  # For some basic file functionality (creating/removing/parsing file paths)
import re  # For regular expessions
import numpy as np  # (advanced numerical calculations, multi-dimensional arrays)

#import audiolabel
from audiolabel import read_label  # For reading TextGrid files
from phonlab.utils import dir2df  # For finding directories and pulling data
import parselmouth  # For incorporating Praat features in this notebook
# To get general Praat functionality where native features don't exist
from parselmouth.praat import call as pcall

import pandas as pd  # For managing dataframes

### Functions

In [2]:
def get_voicing(row, psnd):
    '''
    Calculates the voiced percentage of a /tl/ cluster. It's designed to be called from
    pd.DataFrame.apply (axis=1).

    Parameters
    ----------
    row: DataFrame row
    Identifies the cluster under evaluation. Each row needs a 't1_ph' and a 'next_ph_t2'
    attribute (start and end time of the cluster) and a 'sex' attribute: 'male' selects a
    70 Hz pitch floor, any other value selects 100 Hz.

    psnd: Parselmouth Sound
    The full recording; the part from 't1_ph' to 'next_ph_t2' (plus padding) is analyzed.

    Returns
    -------
    voiced: float
    Percentage (0-100) of voiced frames between 't1_ph' and 'next_ph_t2', computed as
    100 minus the "unvoiced frames" percentage of Praat's voice report. np.nan when that
    figure cannot be found in the report.
    '''

    # Extra context on each side of the cluster so the pitch analysis has room to work.
    pad = 0.5
    s = psnd.extract_part(row.t1_ph-pad, row.next_ph_t2+pad)
    pitch_floor = 70 if row.sex == 'male' else 100
    pitch = pcall(
        s,
        'To Pitch (cc)...',
        0.001,  # time step (s)
        pitch_floor,  # pitch floor (Hz)
        15,  # maximum number of candidates
        0,  # "very accurate" switch (off)
        0.03,  # silence threshold
        0.45,  # voicing threshold
        0.01,  # octave cost
        0.35,  # octave-jump cost
        0.14,  # voiced/unvoiced cost
        250.0,  # pitch ceiling (Hz)
    )

    pulses = pcall([s, pitch], 'To PointProcess (cc)')

    # The report window is expressed on the extract's own time axis, which is assumed to
    # start at 0, so the cluster itself begins `pad` seconds in.
    # NOTE(review): the report uses a 600 Hz ceiling while the Pitch object was computed
    # with a 250 Hz ceiling -- confirm this mismatch is intended.
    voicing = pcall(
        [s, pitch, pulses],
        'Voice report',
        pad,  # window start (s)
        pad+(row.next_ph_t2-row.t1_ph),  # window end (s)
        pitch_floor,  # pitch floor (Hz)
        600.0,  # pitch ceiling (Hz)
        1.3,  # maximum period factor
        1.6,  # maximum amplitude factor
        0.03,  # silence threshold
        0.45,  # voicing threshold
    )

    # Raw string: the pattern is unchanged, but '\d' is no longer an invalid string escape.
    m = re.search(r'unvoiced frames: (?P<percent>\d+\.\d*|\d+)', voicing)
    if m:
        voiced = 100.0-float(m.group('percent'))
    else:
        voiced = np.nan

    return voiced

In [3]:
def get_voicing_l(row, psnd):
    '''
    Calculates the voiced percentage of the /l/ portion of a /tl/ cluster. It's designed to be
    called from pd.DataFrame.apply (axis=1).

    Parameters
    ----------
    row: DataFrame row
    Identifies the cluster under evaluation. Each row needs 't1_ph' (start of /t/), 't2_ph'
    (end of /t/, i.e. start of /l/) and 'next_ph_t2' (end of /l/) attributes, plus a 'sex'
    attribute: 'male' selects a 70 Hz pitch floor, any other value selects 100 Hz.

    psnd: Parselmouth Sound
    The full recording. Pitch is tracked over the whole cluster (plus padding), but voicing is
    only reported for the span from 't2_ph' to 'next_ph_t2'.

    Returns
    -------
    voiced: float
    Percentage (0-100) of voiced frames between 't2_ph' and 'next_ph_t2' (i.e. /l/), computed
    as 100 minus the "unvoiced frames" percentage of Praat's voice report. np.nan when that
    figure cannot be found in the report.
    '''

    # Extra context on each side of the cluster so the pitch analysis has room to work.
    pad = 0.5
    s = psnd.extract_part(row.t1_ph-pad, row.next_ph_t2+pad)
    pitch_floor = 70 if row.sex == 'male' else 100
    pitch = pcall(
        s,
        'To Pitch (cc)...',
        0.001,  # time step (s)
        pitch_floor,  # pitch floor (Hz)
        15,  # maximum number of candidates
        0,  # "very accurate" switch (off)
        0.03,  # silence threshold
        0.45,  # voicing threshold
        0.01,  # octave cost
        0.35,  # octave-jump cost
        0.14,  # voiced/unvoiced cost
        250.0,  # pitch ceiling (Hz)
    )

    pulses = pcall([s, pitch], 'To PointProcess (cc)')

    # The extract's time axis is assumed to start at 0 with the cluster beginning `pad`
    # seconds in, so /l/ starts a further (t2_ph - t1_ph) seconds later. The window used to
    # start at `pad`, which covered the whole cluster and made this function return exactly
    # what get_voicing returns.
    l_start = pad+(row.t2_ph-row.t1_ph)
    l_end = pad+(row.next_ph_t2-row.t1_ph)
    voicing = pcall(
        [s, pitch, pulses],
        'Voice report',
        l_start,  # window start (s)
        l_end,  # window end (s)
        pitch_floor,  # pitch floor (Hz)
        600.0,  # pitch ceiling (Hz)
        1.3,  # maximum period factor
        1.6,  # maximum amplitude factor
        0.03,  # silence threshold
        0.45,  # voicing threshold
    )

    # Raw string: the pattern is unchanged, but '\d' is no longer an invalid string escape.
    m = re.search(r'unvoiced frames: (?P<percent>\d+\.\d*|\d+)', voicing)
    if m:
        voiced = 100.0-float(m.group('percent'))
    else:
        voiced = np.nan

    return voiced

In [4]:
def get_relative_intensity(row, psnd):
    '''
    Intensity of the first segment of a cluster minus the intensity of the second.

    Parameters
    ----------
    row: DataFrame row
    Must expose 't1_ph', 't2_ph' and 'next_ph_t2': the first segment runs from 't1_ph' to
    't2_ph', the second from 't2_ph' to 'next_ph_t2'.

    psnd: Parselmouth Sound
    The recording both segments are cut from.

    Returns
    -------
    rel_intensity: float
    get_intensity() of the first segment minus get_intensity() of the second
    (presumably in dB -- confirm against the Parselmouth documentation).
    '''
    first_start, boundary, second_end = row.t1_ph, row.t2_ph, row.next_ph_t2

    # Cut both segments first, then measure each one.
    first_segment = psnd.extract_part(first_start, boundary)
    second_segment = psnd.extract_part(boundary, second_end)

    first_level = first_segment.get_intensity()
    second_level = second_segment.get_intensity()
    return first_level - second_level

In [5]:
def get_rise_time(row, psnd):
    '''
    Time at which the whole cluster reaches its maximum, in milliseconds.

    Parameters
    ----------
    row: DataFrame row
    Must expose 't1_ph' and 'next_ph_t2', the start and end of the cluster.

    psnd: Parselmouth Sound
    The recording the cluster is cut from.

    Returns
    -------
    rise_time: float
    Result of Praat's 'Get time of maximum...' query (parabolic interpolation) on the
    extracted cluster, multiplied by 1000. The time is presumably measured from the start of
    the extract -- confirm against extract_part's time handling.
    '''
    cluster = psnd.extract_part(row.t1_ph, row.next_ph_t2)
    # 0.0, 0.0 is the time range; presumably Praat's shorthand for "the whole sound".
    peak_seconds = pcall(cluster, 'Get time of maximum...', 0.0, 0.0, "Parabolic")
    return peak_seconds * 1000

In [6]:
def get_rise_time_l(row, psnd):
    '''
    Time at which the second segment of the cluster (/l/) reaches its maximum, in milliseconds.

    Parameters
    ----------
    row: DataFrame row
    Must expose 't2_ph' and 'next_ph_t2', the start and end of the second segment.

    psnd: Parselmouth Sound
    The recording the segment is cut from.

    Returns
    -------
    rise_time: float
    Result of Praat's 'Get time of maximum...' query (parabolic interpolation) on the
    extracted segment, multiplied by 1000. The time is presumably measured from the start of
    the extract -- confirm against extract_part's time handling.
    '''
    lateral = psnd.extract_part(row.t2_ph, row.next_ph_t2)
    # 0.0, 0.0 is the time range; presumably Praat's shorthand for "the whole sound".
    peak_seconds = pcall(lateral, 'Get time of maximum...', 0.0, 0.0, "Parabolic")
    return peak_seconds * 1000

In [7]:
def findMiddle(input_list):
    '''
    Return the positional middle of a sequence.

    Parameters
    ----------
    input_list: sequence of numbers
        Values in the order they should be considered. The sequence is NOT sorted here, so
        the result is the statistical median only if the caller passes sorted values.
        NOTE(review): get_formants passes values in time order while its comment speaks of
        a median -- confirm which of the two is intended.

    Returns
    -------
    The central element for an odd number of items, or the mean (np.mean) of the two central
    elements for an even number of items.

    Raises
    ------
    IndexError
        If input_list is empty.
    '''
    n = len(input_list)
    half = n // 2
    # Parity must be tested on the length itself. Testing len/2 (as before) sent lengths
    # 2, 6, 10, ... down the odd branch and returned a single off-centre element.
    if n % 2 == 1:
        return input_list[half]
    return np.mean([input_list[half], input_list[half - 1]])

In [8]:
def get_formants(row, psnd):
    '''
    Measure F1-F4 of the second segment of a cluster (/l/) at its glottal pulses and store
    summary values on the row. It's designed to be called from pd.DataFrame.apply (axis=1).

    Parameters
    ----------
    row: DataFrame row
    Needs 't2_ph' and 'next_ph_t2' (start and end of the segment) and 'sex': 'male' selects
    a 70 Hz pitch floor and a 5000 Hz formant ceiling, any other value 100 Hz and 5500 Hz.

    psnd: Parselmouth Sound
    The recording the segment is cut from.

    Returns
    -------
    row: DataFrame row
    The same row with 'f1_mean'..'f4_mean' (np.mean over pulses) and 'f1_mid'..'f4_mid'
    (findMiddle over pulses, in time order) added. A formant with no defined value at any
    pulse gets the string 'empty' in both of its columns.

    NOTE(review): segments shorter than 0.45 s are widened before analysis, and the widened
    't2_ph'/'next_ph_t2' are written back to the row, so the returned row no longer carries
    the original boundaries -- confirm that this is intended.
    '''
    formant_numbers = (1, 2, 3, 4)

    # Widen short segments symmetrically; the shorter the segment, the larger the margin.
    # Only the first matching duration band applies.
    duration = row.next_ph_t2 - row.t2_ph
    for upper_bound, margin in ((.15, 0.03), (.25, 0.015), (.35, 0.01), (.45, 0.005)):
        if duration < upper_bound:
            row.t2_ph = row.t2_ph - margin
            row.next_ph_t2 = row.next_ph_t2 + margin
            break

    s = psnd.extract_part(row.t2_ph, row.next_ph_t2) # read the sound
    is_male = row.sex == 'male'
    pitch_floor = 70 if is_male else 100
    pitch = pcall(
        s,
        'To Pitch (cc)...',
        0.001,  # time step (s)
        pitch_floor,  # pitch floor (Hz)
        15,  # maximum number of candidates
        0,  # "very accurate" switch (off)
        0.03,  # silence threshold
        0.45,  # voicing threshold
        0.01,  # octave cost
        0.35,  # octave-jump cost
        0.14,  # voiced/unvoiced cost
        250.0,  # pitch ceiling (Hz)
    )
    pulses = pcall([s, pitch], 'To PointProcess (cc)')

    max_fq = 5000 if is_male else 5500
    # Arguments: time step (s), max number of formants, formant ceiling (Hz),
    # window length (s), pre-emphasis from (Hz).
    formants = pcall(s, "To Formant (burg)", 0.001, 5, max_fq, 0.025, 50)
    numPoints = pcall(pulses, "Get number of points")

    # Measure formants only at glottal pulses (Praat indices are 1-based), keeping only
    # the values that are defined.
    tracks = {number: [] for number in formant_numbers}
    for point in range(1, numPoints + 1):
        t = pcall(pulses, "Get time from index", point)
        for number in formant_numbers:
            value = pcall(formants, "Get value at time", number, t, 'Hertz', 'Linear')
            if not np.isnan(value):
                tracks[number].append(value)

    # Mean across pulses. Each formant is guarded by its OWN list: F2 used to be guarded by
    # the F1 list, which let an empty F2 list through to np.mean / findMiddle.
    for number in formant_numbers:
        values = tracks[number]
        row['f%d_mean' % number] = np.mean(values) if values else 'empty'

    # Middle value across pulses; this is what is used in all subsequent calculations.
    # (Written in a second pass so the output column order stays mean..., then mid....)
    for number in formant_numbers:
        values = tracks[number]
        row['f%d_mid' % number] = findMiddle(values) if values else 'empty'

    return row

In [9]:
def roll_1d_with_constant(a, shift, val):
    '''
    Shift a 1d sequence like np.roll() does, except that the positions vacated by the shift
    are filled with a constant instead of the elements that wrapped around.

    Parameters
    ----------

    a: 1d array-like
        Input array.

    shift: int
        Number of places to shift by; positive moves elements towards the end, negative
        towards the start. (See np.roll() shift param.)

    val: any value
        The fill constant. Must be compatible with the input array's dtype.

    Returns
    -------
    res: ndarray
        Output array of same length as a.

    Examples
    --------

    > roll_1d_with_constant(['a', 'b', 'c'], 1, '')
    array(['', 'a', 'b'], dtype='<U1')

    > roll_1d_with_constant(pd.Series(['a', 'b', 'c']), -2, '')
    array(['c', '', ''], dtype='<U1')

    > roll_1d_with_constant([3, 4, 5, 6], -2, 0)
    array([5, 6, 0, 0])

    > roll_1d_with_constant([3, 4, 5, 6], 2, 0)
    array([0, 0, 3, 4])
    '''
    length = len(a)
    width = abs(shift)

    # Pad both ends with `width` fill values, then keep a window of the original length:
    # the leading window for a forward shift, the trailing window for a backward shift.
    padded = np.pad(a, width, 'constant', constant_values=val)
    start = 0 if shift >= 0 else 2 * width
    return padded[start:start + length].copy()

### Start creating dataframe

In [10]:
# Root directory holding one sub-directory per speaker (S01, S02, ...).
datadir = './Data'

# Directory-name pattern; the named group is what shows up as the 'subject' column.
# Raw strings keep '\d' and '\.' as regex escapes rather than (invalid) string escapes.
# TODO: an earlier pattern, 'S(?P<subject_number>04)', showed subjects as 01/02 -- the
# question of how to remove that is still open.
dirpat = r'(?P<subject>S\d+)'

# One row per .wav file found under datadir.
fdf = dir2df(datadir, dirpat=dirpat, fnpat=r'\.wav$', addcols=['barename','dirname'])

# Attach each speaker's sex (needed later to choose pitch/formant settings).
# A left merge keeps every recording even if a speaker is missing from the CSV.
speaker_sex_path = './speaker_sex.csv'
speaker_sex = pd.read_csv(speaker_sex_path, encoding = 'utf-8')
fdf = fdf.merge(speaker_sex, on='subject', how='left')

# File-name suffixes appended to the speaker directory name to locate the audio and its
# aligned TextGrid.
wav_suffix = '_words.wav'
tg_suffix = '_words_Spanish_aligned.TextGrid'

In [11]:
def func_executor(participant):
    '''
    Build the table of /tl/ cluster measurements for one speaker. It's designed to be called
    from pd.DataFrame.apply (axis=1) on the file table.

    Parameters
    ----------
    participant: DataFrame row
    Needs a 'relpath' attribute (the speaker's directory name, also used as the file-name
    prefix) and a 'sex' attribute. Reads the module-level datadir, wav_suffix and tg_suffix.

    Returns
    -------
    phwddf: DataFrame
    One row per /t/ that is immediately followed by /l/, joined to its word and to the
    matching './word_info.csv' entry, with voicing, intensity, rise-time and formant columns.
    '''
    
    # Progress output: shows which speaker is being processed.
    print(participant)
    wav_file = os.path.join(datadir,participant.relpath,participant.relpath+wav_suffix)
    tg_file = os.path.join(datadir,participant.relpath,participant.relpath+tg_suffix)

    psnd = parselmouth.Sound(wav_file) 

    # Two label tables from the TextGrid; presumably the phone tier and the word tier, in
    # that order -- verify against the TextGrid's tier layout.
    [phdf,wddf] = read_label(tg_file, 'praat')

    word_info = './word_info.csv'  # Location of the per-word metadata
    widf = pd.read_csv(word_info, encoding = 'utf-8')  # Read it as UTF-8
    widf.Token = widf.Token.str.upper()  # Uppercase so tokens can match the labels in wddf (presumably uppercase)

    # Left merge keeps every word label even without metadata; sorted by start time
    # because merge_asof below needs its key sorted.
    merged_wddf = wddf.merge(widf, left_on='label', right_on='Token', how='left').sort_values(by='t1')

    phdf = phdf.assign(  # Give every phone a look-ahead at the phone that follows it
        next_ph=roll_1d_with_constant(phdf.label, shift=-1, val=''),  # Label of the following phone
        next_ph_t2=roll_1d_with_constant(phdf.t2, shift=-1, val=np.nan))  # End time of the following phone

    # The last phone has no successor: the roll above already fills it with '' / NaN rather
    # than wrapping around to the first row, so no manual reset is needed.

    # Keep only /t/ immediately followed by /l/.
    phdf_tl = phdf[(phdf.label=='t')&(phdf.next_ph=='l')]

    # Attach to each cluster the word whose start time is the latest one at or before the
    # start of /t/. Columns present in both tables get the '_ph' / '_wd' suffixes.
    phwddf = pd.merge_asof(
        phdf_tl.rename(
            columns={'t1':'t1_ph'}), 
        merged_wddf.rename(
            columns={'t1':'t1_wd'}), 
        left_on='t1_ph', 
        right_on='t1_wd', 
        suffixes=['_ph', '_wd']
    )

    # The measurement functions read the speaker's sex from each row.
    phwddf = phwddf.assign(
        sex=participant.sex
    )

    # Per-cluster acoustic measures. /t/ spans t1_ph..t2_ph, /l/ spans t2_ph..next_ph_t2.
    phwddf = phwddf.assign(
        voicing=phwddf.apply(
            get_voicing,
            args=([psnd]), 
            axis=1
        ),
        t_intensity=phwddf.apply(
            lambda x: psnd.extract_part(x.t1_ph, x.t2_ph).get_intensity(), 
            axis=1
        ),
        l_intensity=phwddf.apply(
            lambda x: psnd.extract_part(x.t2_ph, x.next_ph_t2).get_intensity(), 
            axis=1
        ),
        tl_intensity=phwddf.apply(
            lambda x: psnd.extract_part(x.t1_ph, x.next_ph_t2).get_intensity(), 
            axis=1
        ),
        rise_time=phwddf.apply(
            get_rise_time, 
            args=([psnd]), 
            axis=1
        ),
        rise_time_l=phwddf.apply(
            get_rise_time_l,
            args=([psnd]),
            axis=1
        )
    )
    
    # get_formants returns the whole row with the formant columns added, so its result
    # replaces the frame instead of being assigned to a single column.
    phwddf = phwddf.apply(get_formants, args=([psnd]), axis=1)
    
    return phwddf

In [12]:
## 'drop' allows you to remove a column (e.g. for duplicated subject)

In [13]:
# Run the per-speaker pipeline on every row of fdf and stack the resulting frames into one table.
new_df = pd.concat(fdf.apply(func_executor, axis=1).tolist())

dirname            ./Data
relpath               S01
fname       S01_words.wav
barename        S01_words
subject               S01
sex                female
Name: 0, dtype: object
dirname            ./Data
relpath               S02
fname       S02_words.wav
barename        S02_words
subject               S02
sex                  male
Name: 1, dtype: object
dirname            ./Data
relpath               S03
fname       S03_words.wav
barename        S03_words
subject               S03
sex                  male
Name: 2, dtype: object
dirname            ./Data
relpath               S04
fname       S04_words.wav
barename        S04_words
subject               S04
sex                  male
Name: 3, dtype: object
dirname            ./Data
relpath               S05
fname       S05_words.wav
barename        S05_words
subject               S05
sex                  male
Name: 4, dtype: object
dirname            ./Data
relpath               S06
fname       S06_words.wav
barename        S06_words
s

In [14]:
# Show every row that has at least one missing (NaN) cell.
new_df[new_df.isna().any(axis=1)]

Unnamed: 0,t1_ph,t2_ph,label_ph,fname_ph,next_ph,next_ph_t2,t1_wd,t2_wd,label_wd,fname_wd,...,rise_time,rise_time_l,f1_mean,f2_mean,f3_mean,f4_mean,f1_mid,f2_mid,f3_mid,f4_mid


In [15]:
# Create the output directory if needed so the export does not fail on a fresh checkout.
os.makedirs('Results', exist_ok=True)
# index=False (a real bool rather than None) leaves the row index out of the file.
# to_csv returns None when given a path; the export_csv name is kept only for compatibility.
export_csv = new_df.to_csv('Results/tl.csv', index=False, header=True)