In [51]:
import sys # Python system library needed to load custom functions
import math # module with access to mathematical functions
import os # for changing the directory

import numpy as np  # for performing calculations on numerical arrays
import pandas as pd  # home of the DataFrame construct, _the_ most important object for Data Science

from IPython.display import Audio # for listening to our insects
import IPython
from scipy.fft import fft # function to calculate Fast Fourier Transform

import matplotlib.pyplot as plt  # allows creation of insightful plots
import seaborn as sns # another library to make even more beautiful plots

import torch
import torchaudio

# Append the source directory to sys.path (the interpreter's module search path)
# so that the project's local functions and modules can be imported.
sys.path.append('../../src')
# enable rendering plots under the code cell that created it
%matplotlib inline

from eda_utils import show_sampling, signal_generator, plot_random_spec, plot_spec, plot_waveform # functions to create plots for and from audio data
from gdsc_utils import download_directory, PROJECT_DIR # function to download GDSC data from S3 bucket and our root directory
from config import DEFAULT_BUCKET  # S3 bucket with the GDSC data

# Make the project root the working directory, so that relative paths used
# later in the notebook are resolved from there.
os.chdir(PROJECT_DIR)

In [58]:
def calculate_pause_candidates_via_amplitude(path: str,
                                             max_amplitude,
                                             verbose = 0,
                                             window_length = 500,
                                             scan_param = 500,
                                             plot_param = False):
    '''
    Finds quiet stretches in an audio file that could serve as split points.

    The first channel of the recording is compared against ``max_amplitude``
    and then scanned in steps of ``scan_param`` samples. A scan position
    becomes a candidate when none of the ``window_length`` samples starting
    there exceeds the threshold in absolute value. Every complete window is
    inspected, including the one that ends exactly at the end of the file.

    Parameters
    ----------
    path : str
        Audio file to analyse; it is read with ``torchaudio.load``.
    max_amplitude :
        Threshold on the absolute sample value; samples above it count as loud.
    verbose : default 0
        If truthy, prints the position and time of every candidate found.
    window_length : int, default 500
        Number of consecutive samples that must all be quiet.
    scan_param : int, default 500
        Distance in samples between two inspected window starts.
    plot_param : default False
        If truthy, calls ``plot_waveform(path, samplerate)`` before returning.

    Returns
    -------
    dict
        ``'path'`` and ``'samplerate'`` of the file, ``'candidates'`` (window
        start positions in samples) and ``'seconds'`` (the same positions
        divided by the sample rate).

    Raises
    ------
    ValueError
        If ``window_length`` or ``scan_param`` is smaller than 1.
    '''
    if window_length < 1 or scan_param < 1:
        raise ValueError('window_length and scan_param must both be at least 1')

    waveform, samplerate = torchaudio.load(path)
    # Boolean cut-off vector: True wherever the first channel is louder than the threshold.
    is_loud = np.abs(waveform[0].numpy()) > max_amplitude
    return_dict = {'path': path,
                   'samplerate': samplerate,
                   'candidates': [],
                   'seconds': []}

    # Last position at which a complete window still fits into the recording.
    # It is negative for recordings shorter than one window, which leaves the
    # range below empty.
    last_start = len(is_loud) - window_length

    for start in range(0, last_start + 1, scan_param):
        loud_samples = int(np.sum(is_loud[start:start + window_length]))
        if loud_samples == 0:
            if verbose:
                print('---------')
                print('sum equals', loud_samples)
                print('waveform_position:', start)
                print('corresponding second:', start / samplerate)
            return_dict['candidates'].append(start)
            return_dict['seconds'].append(start / samplerate)

    if plot_param:
        # NOTE(review): plot_waveform receives the file path rather than the
        # loaded tensor -- confirm this matches its signature in eda_utils.
        plot_waveform(path, samplerate)
    return return_dict
    

In [59]:
def calculate_pause_splits(input_dict : dict, min_length_s = 15,verbose = 0):
    '''
    Thins out a list of pause candidates so that splits keep a minimum distance.

    Walks through ``input_dict['candidates']`` in the given order and keeps a
    candidate only if it lies more than ``min_length_s`` seconds after the
    previously kept one (or after the start of the recording for the first).

    Parameters
    ----------
    input_dict : dict
        Dictionary with the keys ``'path'``, ``'samplerate'`` and
        ``'candidates'`` (positions in samples), as returned by the
        ``calculate_pause_candidates_via_*`` functions. It is not modified.
    min_length_s : default 15
        Minimum length of a segment in seconds.
    verbose : default 0
        If truthy, prints the position and time of every split that is kept.

    Returns
    -------
    dict
        Same layout as ``input_dict``: ``'path'`` and ``'samplerate'`` are
        copied over, ``'candidates'`` and ``'seconds'`` hold the kept splits.
    '''
    samplerate = input_dict['samplerate']
    return_dict = {'path': input_dict['path'],
                   'samplerate': samplerate,
                   'candidates': [],
                   'seconds': []}

    # Minimum segment length expressed in samples.
    min_length_wav = samplerate * min_length_s
    # Earliest position the next split has to exceed; initialised for the first split.
    min_split = min_length_wav

    for candidate in input_dict['candidates']:
        if candidate > min_split:
            if verbose:
                print('---------')
                print('waveform_position:', candidate)
                print('corresponding second:', candidate / samplerate)
            return_dict['candidates'].append(candidate)
            return_dict['seconds'].append(candidate / samplerate)
            min_split = min_length_wav + candidate

    return return_dict
    

In [60]:
# Example recording (path relative to the working directory) that serves as
# input for the pause-detection calls in the following cells.
testing_path = 'data/train/Atrapsaltacorticina_GBIF1946322682_IN18591645_27630.wav'

In [77]:
# Amplitude-based pause search on the example recording, with the plot enabled.
calculate_pause_candidates_via_amplitude(
    path=testing_path,
    max_amplitude=0.02,
    plot_param=True,
)

In [70]:
def plot_pitch(waveform, sample_rate, pitch):
    '''
    Plots a pitch track on top of the waveform it was computed from.

    The first row of ``waveform`` is drawn in light gray on the left y-axis,
    the first row of ``pitch`` in green on a second y-axis on the right. Both
    curves are stretched over the same time span, which is the number of
    waveform samples divided by ``sample_rate``.

    Parameters
    ----------
    waveform :
        2-D tensor; only row 0 is plotted and the second dimension is taken as
        the sample axis.
    sample_rate :
        Number of waveform samples per unit of time; used to scale the time axis.
    pitch :
        2-D tensor of pitch values; only row 0 is plotted.

    Returns
    -------
    None
        The figure is created through ``plt.subplots`` and not returned.
    '''
    figure, axis = plt.subplots(1, 1)
    axis.set_title("Pitch Feature")
    axis.grid(True)
    axis.set_xlabel("Time")
    axis.set_ylabel("Amplitude")

    num_samples = waveform.shape[1]
    end_time = num_samples / sample_rate
    waveform_time = torch.linspace(0, end_time, num_samples)
    axis.plot(waveform_time, waveform[0], linewidth=1, color='gray', alpha=0.3)

    # The pitch track has fewer points than the waveform and a different value
    # range, so it gets its own time grid and its own y-axis.
    axis2 = axis.twinx()
    axis2.set_ylabel("Pitch")
    pitch_time = torch.linspace(0, end_time, pitch.shape[1])
    axis2.plot(pitch_time, pitch[0], linewidth=2, label='Pitch', color='green')
    # Without this call the label given above is never shown.
    axis2.legend(loc=0)

In [71]:
def calculate_pause_candidates_via_pitch(path: str, window_length = 50, verbose = 0, plot_param = False):
    '''
    Finds candidate split points in an audio file from its pitch track.

    The pitch track is computed with
    ``torchaudio.functional.detect_pitch_frequency`` and only its first row is
    used. A window of ``window_length`` consecutive pitch frames is flagged
    when the sum of its values reaches ``max_pitch * (window_length - 1)``,
    where ``max_pitch`` is the highest value in the track. In other words,
    nearly every frame of the window has to sit at the maximum pitch.
    Every complete window is inspected, including the last one.

    Frame positions are mapped back to the waveform by scaling with
    ``number of samples / number of pitch frames``.

    Parameters
    ----------
    path : str
        Audio file to analyse; it is read with ``torchaudio.load``.
    window_length : int, default 50
        Number of consecutive pitch frames per window.
    verbose : default 0
        If truthy, prints the position and time of every candidate found.
    plot_param : default False
        If truthy, calls ``plot_pitch(waveform, samplerate, pitch)`` before
        returning.

    Returns
    -------
    dict
        ``'path'`` and ``'samplerate'`` of the file, ``'candidates'`` (window
        start positions in waveform samples, as ``int``) and ``'seconds'``
        (the corresponding times).

    Raises
    ------
    ValueError
        If ``window_length`` is smaller than 1.
    '''
    if window_length < 1:
        raise ValueError('window_length must be at least 1')

    waveform, samplerate = torchaudio.load(path)
    pitch = torchaudio.functional.detect_pitch_frequency(waveform, samplerate)
    pitch_track = pitch[0].numpy()
    n_frames = pitch_track.shape[0]
    n_samples = waveform.shape[1]
    end_time = n_samples / samplerate  # end_time in seconds

    return_dict = {'path': path,
                   'samplerate': samplerate,
                   'candidates': [],
                   'seconds': []}

    # An empty pitch track has no maximum; the scan range below is empty then anyway.
    max_pitch = pitch_track.max() if n_frames else 0.0
    threshold = max_pitch * (window_length - 1)

    for x in range(n_frames - window_length + 1):
        if np.sum(pitch_track[x:x + window_length]) >= threshold:
            # Position of the pitch frame expressed in waveform samples.
            position = x / n_frames * n_samples
            if verbose:
                print('---------')
                print('waveform_position:', position)
                print('corresponding second:', x / n_frames * end_time)
            # Stored as a plain int so the value can be used as a sample index.
            return_dict['candidates'].append(int(np.floor(position)))
            return_dict['seconds'].append(position / samplerate)

    if plot_param:
        plot_pitch(waveform, samplerate, pitch)
    return return_dict
    

In [72]:
# Pitch-based pause search on the example recording, with the plot enabled.
calculate_pause_candidates_via_pitch(
    path=testing_path,
    plot_param=True,
)

In [73]:
# Pitch-based pause search on a second recording, with the plot enabled.
calculate_pause_candidates_via_pitch(
    path='data/train/Barbitistesyersini_XC752461-dat047-009_edit4.wav',
    plot_param=True,
)

In [75]:
# Amplitude-based pause search on the second recording, with the plot enabled.
calculate_pause_candidates_via_amplitude(
    path='data/train/Barbitistesyersini_XC752461-dat047-009_edit4.wav',
    max_amplitude=0.01,
    plot_param=True,
)