##### Steps to detect duration and frequency of ball bouncing from wav file
1. Import necessary libraries
2. Open audio File
3. Detect periods of activity
4. For Each period, find the frequency of the ball
5. Output: 
    1. Plot a graph identifying periods of activity 
    2. Plot a sample period sinsoidal wave over activity
    3. Output excel file with columns: start period, end period, duration, frequency, number of bounces

1. Import necessary libraries

In [1]:
# for opening audio files and play back
from pydub import AudioSegment
from pydub.playback import play
from pydub.utils import mediainfo
# for plotting
import matplotlib.pyplot as plt
# for calculating standard deviation
import numpy as np
import pandas as pd
# for estimating frequency
import scipy.optimize
from collections import deque
from math import sqrt
# for file saving the output
import os
import time

In [2]:
all_script_start_time=time.time()

Controlling parameters

In [3]:
# There can be breathing noise in audio, if detected bounces is less than the following
# number, it is not registered in the output file
MINIMUM_BOUNCES_TO_ADD_TO_RESULTS = 5
# if the period is detected to be less than this duration, do not apply sine wave fitting and ignore
MINIMUM_PERIOD_DURATION_IN_SECONDS = 1
# If you want to show figures to understand the algorithm more, make the following True
SHOW_FIGURES = False
# figure size
FIG_SIZE = [18, 8]
# Enable this flag to see more debugging messages
SHOW_DEBUG_MESSAGES = False
# 80 th percentile and window size of 1000 are good estimates
INITIAL_WINDOW_SIZE = 1000
PERCENTILE_LEVEL = 80
# where to find the input audio file
SOUND_FILES_FOLDER = './sound_files'
ORIGINALS_SUB_FOLDER = 'originals'

In the pervious cell, we select the window size for our smoothing operation to be 1000 and the percentile to be 80th percentile
The percentile idea works as follows: 
- Get the samples the are above the 80th percentile. These are used to mark areas of activity. 
- This helps reduce the noise and false positives 

2. Open audio File

In [4]:
FILE_NAME_ONLY_DICT = {'original_work_file': 'sound_of_ball_original.wav',
                       'file_with_breathing': 'pep4_2nd_original.mp3', 'fifth_file': 'AUD-20231221-WA0007_5th.m4a'}
FILE_NAME_ONLY=FILE_NAME_ONLY_DICT['fifth_file']
FILE_NAME = os.path.join(SOUND_FILES_FOLDER, ORIGINALS_SUB_FOLDER, FILE_NAME_ONLY)
# what is the sampling rate in the recorded audio file
info = mediainfo(FILE_NAME)
original_sample_rate=int(info['sample_rate'])
sound = AudioSegment.from_file(FILE_NAME)
# some files are stereo, then select the channel with higher power
if info['channels'] == '2':  # stereo
    sound_split = sound.split_to_mono()
    if sound_split[0].dBFS > sound_split[1].dBFS:
        sound = sound_split[0]
    else:
        sound = sound_split[1]
# Let's get the samples to work with
sound_louder = sound+10

samples = sound.get_array_of_samples()
samples_list = sound_louder.get_array_of_samples().tolist()

The following cell plots the original signal

In [5]:
# make the sound lounder
if SHOW_FIGURES:
    plt.figure(figsize=FIG_SIZE)
    _=plt.plot(samples_list, alpha=.5, color='blue', label='Original Signal')
    _=plt.plot(sound_louder.get_array_of_samples().tolist(), alpha=0.1, color='red', label='Magnified signal')
    plt.legend(loc='best')

3. Detect periods of activity

In [6]:
# 80 th percentile and window size of 1000 are good estimates
# Making everything positive to mark periods of high loud activity
abs_samples_list = [abs(ele) for ele in samples_list]
# what is the signal level we are targing above?
req_percentile = np.percentile(abs_samples_list, PERCENTILE_LEVEL)
# set activity periods to max value for signal
max_val = max(abs_samples_list)
signal_on = [max_val if x > req_percentile else 0 for x in abs_samples_list]
block_signal_on = signal_on.copy()

# Mark the window before and after high signal as max value
WS = INITIAL_WINDOW_SIZE


# start_time=time.time()
# for i in range(WS, len(samples_list)-WS):
#     if max_val in signal_on[i-WS:i+WS]:
#         block_signal_on[i] = max_val
# print('method 1 time:\t', time.time()-start_time)

block_signal_on = signal_on.copy()
max_val_indexes=[]
for anIndex in range(WS, len(samples_list)-WS):
    if signal_on[anIndex]==max_val:
        max_val_indexes.append(anIndex)
if len(max_val_indexes)==0:
    raise Exception('No signal detected, lower the percentile.')
lastIndexSetToMax=0
for anIndex in max_val_indexes:
    for i in range(max(lastIndexSetToMax, anIndex-WS),anIndex+WS):
        block_signal_on[i]=max_val
    lastIndexSetToMax=anIndex+WS

if SHOW_FIGURES:
    plt.figure(figsize=FIG_SIZE, dpi=600)
    plt.plot(samples_list, label='Original signal')
    plt.plot(block_signal_on, 'g', label='Blocks of activity')
    plt.legend(loc='best')
    plt.show()

Identify periods of activity start and end

In [7]:
# a list of tuples marking start and end of activity periods
signal_on_periods=[]
period_start=-1
period_end=-1
for i in range(len(block_signal_on)):
    if block_signal_on[i]!=0:
        if period_start==-1:
            period_start=i
    else:
        if period_start!=-1 and period_end==-1:
            period_end = i
            signal_on_periods.append(tuple([period_start, period_end]))
            period_start=period_end=-1

Detecting and counting energy

The idea here is to fit the signal to a sine wave.
However, we are not going to match the signal directly to a sine wave, this has not been very accurate. Instead, areas of activity exhibit high standard deviation from the norm. It is the change in the standard deviation for a sliding window is what we will be matching to a sine wave.

In [8]:
def sinfunc(t, A, w, p, c):
    return A * np.sin(w*t + p) + c
def fit_sin(tt, yy):
    '''Fit sin to the input time sequence, and return fitting parameters "amp", "omega", "phase", "offset", "freq", "period" and "fitfunc"'''
    tt = np.array(tt)
    yy = np.array(yy)
    ff = np.fft.fftfreq(len(tt), (tt[1]-tt[0]))   # assume uniform spacing
    Fyy = abs(np.fft.fft(yy))
    # excluding the zero frequency "peak", which is related to offset
    guess_freq = abs(ff[np.argmax(Fyy[1:])+1])
    guess_amp = np.std(yy) * 2.**0.5
    guess_offset = np.mean(yy)
    guess = np.array([guess_amp, 2.*np.pi*guess_freq, 0., guess_offset])

    popt, pcov = scipy.optimize.curve_fit(sinfunc, tt, yy, p0=guess)
    A, w, p, c = popt
    f = w/(2.*np.pi)
    def fitfunc(t): return A * np.sin(w*t + p) + c
    return {"amp": A, "omega": w, "phase": p, "offset": c, "freq": f, "period": 1./f, "fitfunc": fitfunc, "maxcov": np.max(pcov), "rawres": (guess, popt,pcov)}

Helper class to achieve speeding up of Rolling Standard Deviation [Calculations](https://stackoverflow.com/questions/71594436/is-there-a-fast-running-rolling-standard-deviation-algorithm)

In [9]:
class RunningStd:
    def __init__(self, n, initial_window_data):
        self.n = n
        self.data = deque(list(initial_window_data), maxlen=n)
        self.mean = np.mean(self.data)
        self.sdev = np.std(self.data)
        self.variance = self.sdev * self.sdev

    def add(self, x):
        n = self.n
        oldmean = self.mean
        goingaway = self.data[0]
        self.mean = newmean = oldmean + (x - goingaway) / n
        self.data.append(x)
        self.variance += (x - goingaway) * (
            (x - newmean) + (goingaway - oldmean)) / (n - 1)
        self.sdev = sqrt(self.variance)

In [10]:
# A dictionary to store all results
all_result_dict = {}
period_index = 0
# For each period of detected activity
for period_start, period_end in signal_on_periods:
    try:
        # is the duration more than the minimum:
        if (period_end-period_start)/original_sample_rate < MINIMUM_PERIOD_DURATION_IN_SECONDS:

            if SHOW_DEBUG_MESSAGES:
                print('Skipping frames between:\t', period_start, '\t and\t:', period_end)

            continue
        # otherwise, match signal to sine wave and get frequency
        # prepare dictionary for results
        one_result_dict = {}
        one_result_dict['start'] = period_start
        one_result_dict['end'] = period_end
        # the period we are working on
        one_sample = samples[period_start:period_end]
        # Calculate the standard deviation of the rolling window for this period
        # initialize the standard deviation object:
        running_std_obj=RunningStd(INITIAL_WINDOW_SIZE, one_sample[:INITIAL_WINDOW_SIZE])

        if SHOW_FIGURES:
            plt.figure(figsize=FIG_SIZE)
            plt.plot(one_sample, alpha=0.5)

        # if you want to listen to the segment, uncomment the lines below:
        # 1000 here because it is in milliseconds
        # seg = sound[period_start/original_sample_rate*1000:period_end/original_sample_rate*1000]
        # play(seg)

        one_sample_series=pd.Series(one_sample)
        # calculate the standard deviation of the rolling window

        # periods with sound of ball hitting wall have high variance in values
        # this means, we can use the changing standard deviation as a proxy for the sound wave
        def append_last_value_to_std_dev(x):
            running_std_obj.add(x[-1])
            return running_std_obj.sdev

        rolling_result = one_sample_series.rolling(
            window=INITIAL_WINDOW_SIZE).apply(append_last_value_to_std_dev, raw=True)

        if SHOW_FIGURES:
            plt.plot(rolling_result, color='green', alpha=1, label='rolling_res')
            plt.legend(loc="upper left")
            plt.show()

        # fit sine wave
        if SHOW_FIGURES:
            fig, ax = plt.subplots(figsize=FIG_SIZE)

        data_to_fit=rolling_result
        # filling na data
        data_to_fit=data_to_fit.fillna(np.mean(data_to_fit))
        # standardizing the data to make it easier for fitting signal to identify it
        dataMean = np.mean(data_to_fit)
        dataStd = np.std(data_to_fit)
        data_to_fit_standardized=(data_to_fit-dataMean)/dataStd

        if SHOW_FIGURES:
            ax.plot(data_to_fit_standardized, color='red',
                    alpha=0.4, label='data_to_fit_standardized')

        # shift the data to be around the x axis
        # therefore, it sine wave fitting will be more accurate
        def func2(x):
            return x[0]-np.mean(x)
        data_to_fit_standardized_shifted = data_to_fit_standardized.rolling(
            window=INITIAL_WINDOW_SIZE).apply(func2, raw=True)

        if SHOW_FIGURES:
            ax.plot(data_to_fit_standardized_shifted, color='blue',
                    alpha=1, label='data_to_fit_standardized_shifted')
        x_vals = np.linspace(0, len(data_to_fit_standardized_shifted)-1,
                            num=len(data_to_fit_standardized_shifted))
        x_vals_test = np.linspace(0, len(data_to_fit_standardized_shifted)-1,
                                num=100 * len(data_to_fit_standardized))
        # replace na values
        ds_mean = data_to_fit_standardized_shifted.mean()
        data_to_fit_standardized_shifted.fillna(value=ds_mean, inplace=True)

        # fit to a sine wave
        res = fit_sin(x_vals, data_to_fit_standardized_shifted)

        if SHOW_DEBUG_MESSAGES:
            print( "Amplitude=%(amp)s, Angular freq.=%(omega)s, phase=%(phase)s, offset=%(offset)s, Max. Cov.=%(maxcov)s" % res )
        if SHOW_FIGURES:
            plt.plot(x_vals_test, res["fitfunc"](x_vals_test), "r-", label="y fit curve", linewidth=2)
            ax.legend(loc='best')
            ax.grid(True, which='both')
            plt.show()

        # should I add this to the result?
        # if the number of bounces is more than MINIMUM_BOUNCES_TO_ADD_TO_RESULTS
        this_period_freq = res['omega']/(2*np.pi)
        this_period_bounce_count = int(
            this_period_freq*(period_end-period_start))
        if this_period_bounce_count > MINIMUM_BOUNCES_TO_ADD_TO_RESULTS:
            one_result_dict['period_index'] = period_index
            one_result_dict['the_freq'] = this_period_freq
            one_result_dict['bounce_count'] = int(
                one_result_dict['the_freq']*(period_end-period_start))
            one_result_dict['period_duration'] = (
                period_end-period_start)/original_sample_rate
            if SHOW_DEBUG_MESSAGES:
                print('The bouncing frequency:', one_result_dict['the_freq'], end='\t')
                print('Period bounce count:', one_result_dict['bounce_count'], end='\t')
                print('Period duration:', one_result_dict['period_duration'])
            all_result_dict[period_index] = one_result_dict.copy()
            period_index += 1

    except Exception as error:
        print("An error has occured: ", error)

Export to Excel file

In [11]:
timestr = time.strftime("%Y%m%d-%H%M%S")
all_df=pd.DataFrame()
for period_index, period_details in all_result_dict.items():
    if all_df.size==0:
        all_df = pd.DataFrame(data=period_details, index=[0])
    else:
        one_df=pd.DataFrame(data=period_details, index=[0])
        all_df=pd.concat([all_df,one_df])

all_df.to_excel('./'+FILE_NAME+'_'+timestr+'_'+'Ball_Bounces.xlsx')


In [12]:
print('The script execution time is:\t', time.time()-all_script_start_time)

The script execution time is:	 15.283734798431396


The end