In [1]:
import csv
import math
import os
import random
import shutil
from pathlib import Path

import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
import tensorflow as tf

class AudioProcession():
    """Static helpers that load, normalise, window and save audio clips.

    Throughout this class an "audio" value is a ``(signal, samplingRate)``
    tuple. ``signal`` is handled as a 2-D tensor whose first dimension is the
    channel and whose second dimension is the sample index.
    """

    # ----------------------------
    # Load the full length audio files into python
    # ----------------------------
    @staticmethod
    def open(fullPath):
        """Load the audio file at ``fullPath`` with ``torchaudio.load``.

        Returns:
            tuple: ``(signal, samplingRate)`` as returned by torchaudio.
        """
        signal, samplingRate = torchaudio.load(fullPath)
        return (signal, samplingRate)

    # ----------------------------
    # Convert the full audio to stereo
    # ----------------------------
    @staticmethod
    def rechannel(audio):
        """Return ``audio`` with exactly two channels.

        Two-channel input is returned unchanged. For any other channel count
        the first channel is duplicated, so the result is always stereo rather
        than a doubled copy of every input channel.

        Args:
            audio: ``(signal, samplingRate)`` tuple.

        Returns:
            tuple: ``(stereoSignal, samplingRate)``.
        """
        signal, samplingRate = audio

        if signal.shape[0] == 2:
            # Nothing to do
            return audio

        # Slice (rather than index) so the channel dimension is kept.
        firstChannel = signal[:1, :]
        return (torch.cat([firstChannel, firstChannel]), samplingRate)

    # ----------------------------
    # Resample every channel to the requested sampling rate
    # ----------------------------
    @staticmethod
    def resample(audio, newSamplingRate):
        """Resample ``audio`` to ``newSamplingRate``.

        Args:
            audio: ``(signal, samplingRate)`` tuple.
            newSamplingRate: target sampling rate.

        Returns:
            tuple: ``(resampledSignal, newSamplingRate)``; the input tuple is
            returned untouched when the rates already match.
        """
        signal, samplingRate = audio

        if samplingRate == newSamplingRate:
            # Nothing to do
            return audio

        # The transform works along the last (time) dimension, so a single
        # instance handles all channels in one call.
        resampler = torchaudio.transforms.Resample(samplingRate, newSamplingRate)
        return (resampler(signal), newSamplingRate)

    # ----------------------------
    # Updates CSV file
    # ----------------------------
    @staticmethod
    def updateCSV(classSerial, className, sampleName):
        """Append the row ``[classSerial, className, sampleName]`` to ``metadata.csv``.

        The file path is relative, so it is resolved against the current
        working directory.
        """
        # newline='' lets the csv module control line endings (avoids blank
        # rows on platforms that translate "\n").
        with open('metadata.csv', mode='a', newline='') as metadata:
            csvWriter = csv.writer(metadata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
            csvWriter.writerow([classSerial, className, sampleName])

    # ----------------------------
    # Saves file
    # ----------------------------
    @staticmethod
    def saveAudio(audio, samplingRate, i, sampleName, className, classSerial):
        """Write one window to ``processedSamples/<className>/`` and log it.

        Args:
            audio: signal tensor of the window to save.
            samplingRate: sampling rate passed to ``torchaudio.save``.
            i: window index, appended to the output file name.
            sampleName: file name of the clip the window was cut from.
            className: class folder name under ``processedSamples``.
            classSerial: class number recorded in ``metadata.csv``.
        """
        # Strip only a trailing ".wav"; a ".wav" elsewhere in the name is kept.
        baseName = sampleName[:-len(".wav")] if sampleName.lower().endswith(".wav") else sampleName
        windowName = baseName + "_" + str(i) + ".wav"

        outputDir = Path.cwd() / "processedSamples" / className
        # Saving fails if the class folder is missing, so create it on demand.
        outputDir.mkdir(parents=True, exist_ok=True)

        torchaudio.save(str(outputDir / windowName), audio, samplingRate, format="wav")
        AudioProcession.updateCSV(classSerial, className, windowName)

    # ----------------------------
    # Resizing the full length audio files into the designated length of 4s
    # Resize happens by moving window of length 4s by 2s each time until the end of audio file
    # Remainder is zero-padded to 4s, with the padding split randomly between start and end
    # ----------------------------
    @staticmethod
    def audioBreakdown(audio, sampleName, className, classSerial):
        """Cut ``audio`` into 4 s windows with a 2 s hop and save each one.

        Every window that fits entirely inside the signal is saved as-is. If
        samples remain that no complete window covers (including the case of a
        clip shorter than one window), one more window is saved that starts at
        the next hop position and is zero-padded to full length; the padding is
        split at random between the start and the end. An empty signal
        produces no output.

        Args:
            audio: ``(signal, samplingRate)`` tuple.
            sampleName: source file name, forwarded to ``saveAudio``.
            className: class folder name, forwarded to ``saveAudio``.
            classSerial: class number, forwarded to ``saveAudio``.
        """
        signal, samplingRate = audio
        numRows, signalLength = signal.shape

        # Window and hop sizes in samples (seconds * samples/second).
        winLength = 4 * samplingRate
        increment = 2 * samplingRate

        if signalLength == 0:
            return

        # Start offsets of all windows lying completely inside the signal;
        # the range is empty when the clip is shorter than one window.
        fullStarts = range(0, signalLength - winLength + 1, increment)
        for i, offset in enumerate(fullStarts):
            AudioProcession.saveAudio(signal[:, offset:offset + winLength], samplingRate, i, sampleName, className, classSerial)

        numFull = len(fullStarts)
        # One past the last sample already contained in a saved window.
        coveredEnd = (numFull - 1) * increment + winLength if numFull else 0
        if coveredEnd >= signalLength:
            return

        # Remaining samples from the next hop position to the end of the clip.
        tail = signal[:, numFull * increment:]
        paddingLength = winLength - tail.shape[1]
        # Randomly distribute the padding across start and end.
        numBefore = random.randint(0, paddingLength)
        numAfter = paddingLength - numBefore
        # new_zeros keeps the padding on the signal's dtype and device.
        paddingBefore = signal.new_zeros((numRows, numBefore))
        paddingAfter = signal.new_zeros((numRows, numAfter))
        lastWindow = torch.cat((paddingBefore, tail, paddingAfter), 1)
        AudioProcession.saveAudio(lastWindow, samplingRate, numFull, sampleName, className, classSerial)

In [2]:
# ----------------------------
# Process the audio files in a given folder (type)
# ----------------------------
def splitAudioInType(inputFolder, className, classSerial, samplingRate):
    """Run the processing pipeline on every audio file of one class folder.

    Each regular, non-hidden file in ``<inputFolder>/<className>`` is loaded,
    converted to stereo, resampled to ``samplingRate`` and broken down into
    windows via ``AudioProcession``. Files are handled in sorted name order so
    repeated runs produce the same sequence.

    Args:
        inputFolder: folder that contains one sub-folder per class.
        className: name of the class sub-folder to process.
        classSerial: class number forwarded to ``AudioProcession.audioBreakdown``.
        samplingRate: target sampling rate for resampling.
    """
    typeFolder = os.path.join(str(inputFolder), className)
    for sampleName in sorted(os.listdir(typeFolder)):
        fullPath = os.path.join(typeFolder, sampleName)
        # Hidden entries (e.g. ".DS_Store") and sub-directories are not audio
        # clips; handing them to the loader would abort the whole run.
        if sampleName.startswith(".") or not os.path.isfile(fullPath):
            continue
        audio = AudioProcession.open(fullPath)
        stereoed = AudioProcession.rechannel(audio)
        resampled = AudioProcession.resample(stereoed, samplingRate)
        AudioProcession.audioBreakdown(resampled, sampleName, className, classSerial)

In [3]:
# ----------------------------
# Moves the rawSamples into archivedSamples so that the next round of Processing will not generate duplicates
# ----------------------------
import shutil
def archive():
    """Move every file from ``rawSamples/<class>/`` to ``archivedSamples/<class>/``.

    Both folders are resolved against the current working directory. A
    destination class folder is created when it does not exist yet, and
    entries of ``rawSamples`` that are not directories are left in place.
    """
    inputFolder = Path.cwd() / 'rawSamples'
    outputFolder = Path.cwd() / 'archivedSamples'
    for currentClass in sorted(os.listdir(inputFolder)):
        srcFolder = inputFolder / currentClass
        # A stray file at class level has no contents to list; skip it.
        if not srcFolder.is_dir():
            continue
        destFolder = outputFolder / currentClass
        # The move fails if the target folder is missing, so create it first.
        destFolder.mkdir(parents=True, exist_ok=True)
        for file in sorted(os.listdir(srcFolder)):
            shutil.move(str(srcFolder / file), str(destFolder / file))

In [4]:
import pandas as pd
from pathlib import Path
import os
import numpy as np

# Sampling rate every clip is resampled to before it is cut into windows.
samplingRate = 44100
inputFolder = Path.cwd()/'rawSamples'

# Serial number recorded for each known class folder; any other folder name
# falls back to DEFAULT_CLASS_SERIAL.
CLASS_SERIALS = {"test1": 1, "test2": 2}
DEFAULT_CLASS_SERIAL = 3

# Only sub-directories are classes; stray files (e.g. ".DS_Store") would
# otherwise be listed as a class folder and abort the run. Sorting keeps the
# processing order the same between runs.
classes = sorted(
    entry for entry in os.listdir(inputFolder)
    if (inputFolder / entry).is_dir()
)

for currentClass in classes:
    classSerial = CLASS_SERIALS.get(str(currentClass), DEFAULT_CLASS_SERIAL)
    splitAudioInType(inputFolder, currentClass, classSerial, samplingRate)

# Move the processed raw files away so a later run does not process them again.
archive()