# Generate audio training set w/without noise
Mono signals to simulate popping, echo, and static noise issues in real radio broadcasting
-Langchen Fan 7/6/22

In [1]:
import numpy as np
import pandas as pd
import scipy.io.wavfile as wavfile
import os
import numpy.random as rng
from scipy import signal
import matplotlib.pyplot as plt
import pickle

In [2]:
fpath = 'unprocessed_audio/Mono/' # file path, adjust accordingly
flist = os.listdir(fpath)
wavlist = []
station_names = []
# read wav files
for i in range(len(flist)):
    if flist[i][-3:]=='wav':
        fs, wav = wavfile.read(fpath+flist[i]) #set mono=False for stereo signals
        wavlist.append(wav)
        station_names.append(flist[i][:4])
# read noise files
# fs,static = wavfile.read('simulated_static.wav')
n=fs*30 # audio clip length

  fs, wav = wavfile.read(fpath+flist[i]) #set mono=False for stereo signals


In [3]:
# calculate audio rms
def rms(sig):
    return np.sqrt(np.mean(sig**2))

In [4]:
# convert power to dB
def db(PSD):
    return 10*np.log10(PSD)

In [5]:
# generate echo pieces
target_rms=0.2
atten = [0.5,0.3] # attenuation factor for echo sound
# use different piece was used across SNRs and noise types to avoid model learns the clean signal
for i in range(0,9): # station
    for j in range(0,50):
        wav = np.array(wavlist[i][j*n:(j+1)*n])
        wav = wav/rms(wav)*target_rms
        if j%3!=2:
            echo_len = fs*round(rng.default_rng().uniform(low=15,high=20)) # echo piece ranged from 15 to 21 sec
            echo_start = rng.default_rng().integers(15*fs-2)
            delay = echo_start+fs*round(rng.default_rng().uniform(low=1,high=3))
            echo = wav[echo_start:echo_start+echo_len]
            wav[delay:np.min([delay+echo_len,len(wav)])] += atten[j%3]*echo[:np.min([echo_len,len(wav)-delay])]
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_echo_atten'+str(atten[j%3])
        else:
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_inf'
        wavfile.write(fname+'.wav',rate=fs,data=wav) #write wav files
        f,t,Sxx = signal.spectrogram(wav,fs,window='hann',nperseg=1024,nfft=2048,noverlap=512) #calculate spectrogram
        # fig=plt.figure()
        # ax=plt.pcolormesh(t,f,db(Sxx)) # plot to check
        # plt.ylabel('Frequency (Hz)')
        # plt.xlabel('Time (sec)')
        # plt.colorbar()
        # plt.show()
        pickle.dump([f,t,db(Sxx)],open(fname+'.pkl','wb'))

  return 10*np.log10(PSD)


IndexError: list index out of range

In [6]:
# generate popping pieces
target_rms=0.2
# use different piece was used across SNRs and noise types to avoid model learns the clean signal
for i in range(0,9): # station
    for j in range(50,100):
        wav = np.array(wavlist[i][j*n:(j+1)*n])
        wav = wav/rms(wav)*target_rms
        if j%2==0:
            p_len = fs*round(rng.default_rng().uniform(low=15,high=20)) # echo piece ranged from 15 to 21 sec
            pstart = rng.default_rng().integers(10*fs-2)
            end = p_len+pstart
            vec = rng.default_rng().poisson(20,size=(70,)) #lambda =200ms
            vec2 = rng.default_rng().poisson(15,size=(70,)) #lambda =150ms
            for k in range(0,70):
                pp = round(vec[k]*fs/100)
                if (pstart>end) or (pstart+pp>=len(wav)):
                    break
                wav[pstart:pstart+pp]=0
                pstart+=pp+round(vec2[k]*fs/100)
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_popping'
        else:
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_inf'
        wavfile.write(fname+'.wav',rate=fs,data=wav) #write wav files
        f,t,Sxx = signal.spectrogram(wav,fs,window='hann',nperseg=1024,nfft=2048,noverlap=512) #calculate spectrogram
        # fig=plt.figure()
        # ax=plt.pcolormesh(t,f,db(Sxx)) # plot to check
        # plt.ylabel('Frequency (Hz)')
        # plt.xlabel('Time (sec)')
        # plt.colorbar()
        # plt.show()
        pickle.dump([f,t,db(Sxx)],open(fname+'.pkl','wb'))

  return 10*np.log10(PSD)


In [8]:
# generate noisy pieces
target_rms=0.2
atten = [0.3,0.1] # attenuation factor for echo sound
# use different piece was used across SNRs and noise types to avoid model learns the clean signal
for i in range(0,9): # station
    for j in range(100,150):
        wav = np.array(wavlist[i][j*n:(j+1)*n])
        wav = wav/rms(wav)*target_rms
        if j%3!=2:
            noise_len = fs*round(rng.default_rng().uniform(low=15,high=20)) # echo piece ranged from 15 to 21 sec
            noise_start = rng.default_rng().integers(10*fs-2)
            noise = rng.default_rng().uniform(low=-1,high=1,size=noise_len)
            weight = np.abs(signal.hilbert(wav[noise_start:noise_start+noise_len]))
            wav[noise_start:noise_start+noise_len] += np.multiply(noise*atten[j%3],weight)
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_noise_atten'+str(atten[j%3])
        else:
            fname = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/new_test_files/'+station_names[i]+'_'+str(j+1)+'_inf'
        wavfile.write(fname+'.wav',rate=fs,data=wav) #write wav files
        f,t,Sxx = signal.spectrogram(wav,fs,window='hann',nperseg=1024,nfft=2048,noverlap=512) #calculate spectrogram
        # fig=plt.figure()
        # ax=plt.pcolormesh(t,f,db(Sxx)) # plot to check
        # plt.ylabel('Frequency (Hz)')
        # plt.xlabel('Time (sec)')
        # plt.colorbar()
        # plt.show()
        pickle.dump([f,t,db(Sxx)],open(fname+'.pkl','wb'))

  return 10*np.log10(PSD)


In [2]:
import scipy.signal as signal
import scipy.io.wavfile as wavfile
import os
# import time
import pandas as pd
fpath2 = '/Users/langchenfan/Library/CloudStorage/OneDrive-iHeartMediaInc/real_radio_event/'
wavlist = os.listdir(fpath2)
output=[]
tf = []
fnames=[]
hop = 0.2 # do analysis every 50 ms
for i in range(len(wavlist)):
    if wavlist[i][-4:]!='.wav':
        continue
    print('Predicting '+wavlist[i]+' ......')
    # start = time.time()
    fs, wav = wavfile.read(fpath2+wavlist[i]) #set mono=False for stereo signals
    wav = wav[:,0]
    if fs!=44100:
        wav = signal.resample_poly(wav,up=441,down=fs/100)
        # wavfile.write(fpath2+'test'+str(i)+'.wav',44100,y)
    j = 0
    n=44100*30 # note: cannot use fs*30, because 
    while j+n<len(wav):
        testwav = wav[j:j+n]
        f,t,Sxx = signal.spectrogram(testwav,fs,window='hann',nperseg=1024,nfft=2048,noverlap=512) #calculate spectrogram
        # predict with model here
        output.append(1) # change 1 to model output
        tf.append(j/fs)
        fnames.append(wavlist[i])
        j+=round(hop*fs)
    # end = time.time()
    # print('It takes '+str(end-start))
pred = pd.DataFrame({'File':fnames,'TimeFrame':tf,'Prediction':output})

Predicting noise 540pm.wav ......


  fs, wav = wavfile.read(fpath2+wavlist[i]) #set mono=False for stereo signals


Predicting intermittent-dualtrack WBZY.wav ......


KeyboardInterrupt: 

In [5]:
fs, wav = wavfile.read(fpath2+wavlist[i])

  fs, wav = wavfile.read(fpath2+wavlist[i])
