In [2]:
import os
import IPython
import numpy as np
import pandas as pd
from pydub import AudioSegment
from pydub.utils import make_chunks
from scipy.io import wavfile
from data_utils import Datainfo, Seed

In [3]:
posDir = Datainfo["positives"]
negDir = Datainfo["negatives"]
templates = Datainfo["backgrounds"]

In [27]:
# Unit test

sampleDir = os.path.join(posDir, os.listdir(posDir)[0])
audioSample = AudioSegment.from_wav(sampleDir)

negSampleDir = os.path.join(negDir, os.listdir(negDir)[0])
negSample = AudioSegment.from_wav(negSampleDir)

templateDir = os.path.join(templates, os.listdir(templates)[0])
template = AudioSegment.from_wav(templateDir)

test = template.overlay(audioSample, position=2000)
test = test.overlay(negSample, position = 3500)

test.export(os.path.join(Datainfo["dir"], "tests", "overlayTest.wav"), format="wav")

<_io.BufferedRandom name='e:\\Prototype\\WWmodule\\Data\\tests\\overlayTest.wav'>

In [28]:
# normalize test

def normalize(filepath, dBFS=-14):
    audio = AudioSegment.from_wav(filepath)
    diff = dBFS-audio.dBFS
    return audio.apply_gain(diff)
negSample.export(os.path.join(Datainfo["dir"], "tests", "before_norm.wav"), format="wav")
test = normalize(negSampleDir)
test.export(os.path.join(Datainfo["dir"], "tests", "after_norm.wav"), format="wav")

<_io.BufferedRandom name='e:\\Prototype\\WWmodule\\Data\\tests\\after_norm.wav'>

In [29]:
count = 1
targetDir = os.path.join(Datainfo["dir"], "posNorms")
if not os.path.isdir(targetDir):
    os.mkdir(targetDir)
for file in os.listdir(Datainfo["positives"]):
    filepath = os.path.join(Datainfo["positives"], file)
    targetpath = os.path.join(targetDir, f"audio{count}.wav")
    audio = normalize(filepath)
    audio.export(targetpath, format="wav")
    count = count+1


In [30]:
count = 1
targetDir = os.path.join(Datainfo["dir"], "negNorms")
if not os.path.isdir(targetDir):
    os.mkdir(targetDir)
for file in os.listdir(Datainfo["negatives"]):
    filepath = os.path.join(Datainfo["negatives"], file)
    targetpath = os.path.join(targetDir, f"audio{count}.wav")
    audio = normalize(filepath)
    audio.export(targetpath, format="wav")
    count = count+1

In [4]:
def audioInfo(filepath):
    rate, data = wavfile.read(filepath)
    duration = len(data)/rate
    return rate, data, duration

In [5]:
def audioDF(dir):
    files = os.listdir(dir)
    filepaths = [os.path.join(dir, x) for x in files]
    if("positives" in dir):
        y_labels = [1 for x in files]
    elif("negatives" in dir):
        y_labels = [0 for x in files]
    df = pd.DataFrame({
        'file': files,
        'path': filepaths,
        'y_label': y_labels,
    })
    return df

In [6]:
posData = audioDF(posDir)
posData.head()

Unnamed: 0,file,path,y_label
0,audio1.wav,e:\Prototype\WWmodule\Data\positives\audio1.wav,1
1,audio10.wav,e:\Prototype\WWmodule\Data\positives\audio10.wav,1
2,audio11.wav,e:\Prototype\WWmodule\Data\positives\audio11.wav,1
3,audio12.wav,e:\Prototype\WWmodule\Data\positives\audio12.wav,1
4,audio13.wav,e:\Prototype\WWmodule\Data\positives\audio13.wav,1


In [7]:
negData = audioDF(negDir)
negData.head()

Unnamed: 0,file,path,y_label
0,audio1.wav,e:\Prototype\WWmodule\Data\negatives\audio1.wav,0
1,audio10.wav,e:\Prototype\WWmodule\Data\negatives\audio10.wav,0
2,audio11.wav,e:\Prototype\WWmodule\Data\negatives\audio11.wav,0
3,audio12.wav,e:\Prototype\WWmodule\Data\negatives\audio12.wav,0
4,audio13.wav,e:\Prototype\WWmodule\Data\negatives\audio13.wav,0


In [8]:
data = pd.concat([posData, negData])
# data = data.iloc[np.random.permutation(len(data))]
# data = data.reset_index(drop=True)
data.head()

Unnamed: 0,file,path,y_label
0,audio1.wav,e:\Prototype\WWmodule\Data\positives\audio1.wav,1
1,audio10.wav,e:\Prototype\WWmodule\Data\positives\audio10.wav,1
2,audio11.wav,e:\Prototype\WWmodule\Data\positives\audio11.wav,1
3,audio12.wav,e:\Prototype\WWmodule\Data\positives\audio12.wav,1
4,audio13.wav,e:\Prototype\WWmodule\Data\positives\audio13.wav,1


In [36]:
data.to_csv(os.path.join(os.getcwd(), "Temp", "data.csv"), index=False)

In [37]:
data = pd.read_csv(os.path.join(os.getcwd(), "Temp", "data.csv"))
data.head()

Unnamed: 0,file,path,y_label
0,audio1.wav,e:\Prototype\WWmodule\Data\positives\audio1.wav,1
1,audio10.wav,e:\Prototype\WWmodule\Data\positives\audio10.wav,1
2,audio11.wav,e:\Prototype\WWmodule\Data\positives\audio11.wav,1
3,audio12.wav,e:\Prototype\WWmodule\Data\positives\audio12.wav,1
4,audio13.wav,e:\Prototype\WWmodule\Data\positives\audio13.wav,1


In [24]:
def createDir(dirname):
    path = os.path.join(Datainfo["dir"], dirname)
    if not os.path.exists(path):
        try:
            os.mkdir(path)
        except:
            print(f"error creating {dirname} dir")
    return path


In [50]:
datalen = len(data)
templates = Datainfo["backgrounds"]
sampleDir = createDir("samples")
chunksDir = createDir("inputs")
threshold = 125
df = pd.DataFrame()
sampleCount = 1
chunkCount = 1
for audiopath in os.listdir(templates):
    tempPath = os.path.join(templates, audiopath)
    background = AudioSegment.from_wav(tempPath)
    segments = []
    nextsegment = 100
    tempSample = background
    while(True):
        randData = data.iloc[np.random.randint(datalen)]
        path = randData["path"]
        y_label = randData["y_label"]
        sample = AudioSegment.from_wav(path)
        duration = sample.duration_seconds
        if nextsegment + duration < 10000:
            try:
                tempSample = tempSample.overlay(sample, position=nextsegment)
                if y_label==1:
                    segments.append((nextsegment+threshold, nextsegment+(sample.duration_seconds*1000)-1-threshold))
                nextsegment = nextsegment + (sample.duration_seconds*1000) + 199
            except:
                pass
        else:
            break
    tempSample.export(os.path.join(sampleDir, f"sample{sampleCount}.wav"), format="wav")
    chunks = make_chunks(tempSample, chunk_length=500)
    chunks = [x for i, x in enumerate(chunks)]
    chunk_range = []
    start = 0
    for i in range(len(chunks)-1):
        sample = chunks[i] + chunks[i+1]
        chunk_range.append((start, start+999))
        curr_chunk_path = os.path.join(chunksDir, f"input{chunkCount}.wav")
        sample.export(curr_chunk_path, format="wav")
        y_label = 0
        for segment in segments:
            if(start<=segment[0] and start+999>=segment[1]):
                y_label = y_label or 1
            else:
                y_label = y_label or 0
        row = pd.Series({
            "path": curr_chunk_path,
            "y_label": y_label,
        })
        df = df.append(row, ignore_index=True)
        start = start+500
        chunkCount = chunkCount+1
    sampleCount = sampleCount+1
    df.to_csv("input.csv", index=False)


  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append(row, ignore_index=True)
  df = df.append

In [48]:
df = pd.read_csv("input.csv")
df.head()

Unnamed: 0,path,y_label
0,e:\Prototype\WWmodule\Data\chunks\sample1.wav,1
1,e:\Prototype\WWmodule\Data\chunks\sample2.wav,0
2,e:\Prototype\WWmodule\Data\chunks\sample3.wav,0
3,e:\Prototype\WWmodule\Data\chunks\sample4.wav,0
4,e:\Prototype\WWmodule\Data\chunks\sample5.wav,0
