In [None]:
import math
from functools import reduce

import librosa
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [None]:
# Analysis window and stride (in samples) passed to the frame-based feature extractors.
FRAME_LENGTH = 2 ** 10           # 1024 samples per frame
HOP_LENGTH = FRAME_LENGTH // 2   # 512-sample hop, i.e. consecutive frames overlap by half

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named 'preprocess'.
spark = (
    SparkSession.builder
    .appName('preprocess')
    .getOrCreate()
)
spark

In [None]:
# CLEANING/PREPROCESSING STEP 1: Trim the audio to its first `duration` seconds
def sliceAudio(rdd, duration, sampleRate):
  """Keep only the first `duration` seconds of an audio RDD.

  Args:
    rdd: RDD of audio samples; each sample's index is taken from
      `zipWithIndex`, i.e. from the RDD's existing element order.
    duration: length of the clip to keep, in seconds.
    sampleRate: samples per second of the audio in `rdd`.

  Returns:
    An RDD of (sample, index) pairs restricted to indices strictly below
    duration * sampleRate.
  """
  # Multiply directly rather than dividing by the reciprocal sample period
  # (duration / (1 / sampleRate)): that rounds twice and can nudge the cutoff
  # just past an integer, admitting or dropping one boundary sample.
  sampleLimit = duration * sampleRate
  indexedRdd = rdd.zipWithIndex()  # (signal[i], i)
  return indexedRdd.filter(lambda pair: pair[1] < sampleLimit)

In [None]:
#CLEANING/PREPROCESSING STEP 2: Retrieve amplitude envelope
def amplitudeEnvelope(rdd, frameLength, hopLength):
  """Compute the amplitude envelope: the largest sample value in each frame.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    frameLength: number of samples in each frame.
    hopLength: number of samples between the starts of consecutive frames.

  Returns:
    An RDD with one value per frame start (0, hopLength, 2*hopLength, ...),
    each being max(samples[start:start + frameLength]). Frames near the end
    may hold fewer than frameLength samples.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] into (samples, indices) and keep the samples.
  samples = np.array(list(zip(*orderedPairs))[0])
  frameStarts = range(0, len(samples), hopLength)
  # Signed maximum of the raw samples (no abs), so negative peaks do not count.
  envelope = [max(samples[start:start + frameLength]) for start in frameStarts]
  return spark.sparkContext.parallelize(envelope)

In [None]:
#CLEANING/PREPROCESSING STEP 3: Retrieve Root Mean Square Energy
def rms(rdd, frameLength, hopLength):
  """Compute per-frame root-mean-square energy with librosa.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    frameLength: frame size in samples, passed to librosa as frame_length.
    hopLength: hop size in samples, passed to librosa as hop_length.

  Returns:
    An RDD holding the RMS value of each frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  energyPerFrame = librosa.feature.rms(y=samples, frame_length=frameLength, hop_length=hopLength)
  # librosa returns a 2-D array with a single row of per-frame values; ship that row.
  return spark.sparkContext.parallelize(energyPerFrame[0])

In [None]:
#CLEANING/PREPROCESSING STEP 4: Retrieve Zero Crossing Rate
def zcr(rdd, frameLength, hopLength):
  """Compute the per-frame zero-crossing rate with librosa.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    frameLength: frame size in samples, passed to librosa as frame_length.
    hopLength: hop size in samples, passed to librosa as hop_length.

  Returns:
    An RDD holding the zero-crossing rate of each frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  crossingRates = librosa.feature.zero_crossing_rate(samples, frame_length=frameLength, hop_length=hopLength)
  # librosa returns a 2-D array with a single row of per-frame values; ship that row.
  return spark.sparkContext.parallelize(crossingRates[0])

In [None]:
#CLEANING/PREPROCESSING STEP 5: Retrieve Mel-Frequency Cepstral Coefficients
def mfcc20(rdd, sampleRate):
  """Compute 20 MFCCs per frame, returning one RDD per coefficient.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    sampleRate: sampling rate of the audio, passed to librosa as sr.

  Returns:
    A list of RDDs, one per row of librosa's MFCC matrix (20 rows for
    n_mfcc=20); each RDD holds that coefficient for every frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  # Unlike the other extractors, this uses librosa's default framing
  # (no n_fft / hop_length override).
  coefficients = librosa.feature.mfcc(y=samples, sr=sampleRate, n_mfcc=20)
  return [spark.sparkContext.parallelize(row) for row in coefficients]

In [None]:
#CLEANING/PREPROCESSING STEP 6: Retrieve Spectral Centroid
def spectralCentroid(rdd, sampleRate, frameLength, hopLength):
  """Compute the per-frame spectral centroid with librosa.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    sampleRate: sampling rate of the audio, passed to librosa as sr.
    frameLength: FFT window size in samples, passed to librosa as n_fft.
    hopLength: hop size in samples, passed to librosa as hop_length.

  Returns:
    An RDD holding the spectral centroid of each frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  centroids = librosa.feature.spectral_centroid(y=samples, sr=sampleRate, n_fft=frameLength, hop_length=hopLength)
  # librosa returns a 2-D array with a single row of per-frame values; ship that row.
  return spark.sparkContext.parallelize(centroids[0])

In [None]:
#CLEANING/PREPROCESSING STEP 7: Retrieve Spectral Bandwidth
def spectralBandwidth(rdd, sampleRate, frameLength, hopLength):
  """Compute the per-frame spectral bandwidth with librosa.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    sampleRate: sampling rate of the audio, passed to librosa as sr.
    frameLength: FFT window size in samples, passed to librosa as n_fft.
    hopLength: hop size in samples, passed to librosa as hop_length.

  Returns:
    An RDD holding the spectral bandwidth of each frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  bandwidths = librosa.feature.spectral_bandwidth(y=samples, sr=sampleRate, n_fft=frameLength, hop_length=hopLength)
  # librosa returns a 2-D array with a single row of per-frame values; ship that row.
  return spark.sparkContext.parallelize(bandwidths[0])

In [None]:
#CLEANING/PREPROCESSING STEP 8: Retrieve Spectral Rolloff
def spectralRolloff(rdd, sampleRate, frameLength, hopLength):
  """Compute the per-frame spectral rolloff frequency with librosa.

  Args:
    rdd: RDD of (sample, index) pairs, as produced by sliceAudio.
    sampleRate: sampling rate of the audio, passed to librosa as sr.
    frameLength: FFT window size in samples, passed to librosa as n_fft.
    hopLength: hop size in samples, passed to librosa as hop_length.

  Returns:
    An RDD holding the spectral rolloff of each frame.
  """
  orderedPairs = rdd.sortBy(lambda pair: pair[1]).collect()
  # Transpose [(sample, index), ...] and keep the sample column.
  samples = np.array(list(zip(*orderedPairs))[0])
  rolloffs = librosa.feature.spectral_rolloff(y=samples, sr=sampleRate, n_fft=frameLength, hop_length=hopLength)
  # librosa returns a 2-D array with a single row of per-frame values; ship that row.
  return spark.sparkContext.parallelize(rolloffs[0])

In [None]:
#CLEANING/PREPROCESSING STEP 9: Normalize RDDs
def normalize(rdd):
  """Min-max scale an RDD of numbers into the range [0, 1].

  Args:
    rdd: RDD of numeric values (one feature track).

  Returns:
    A new RDD with (x - min) / (max - min) for each value, in collection
    order. A constant track (max == min) maps to all zeros instead of NaN,
    and an empty RDD yields an empty RDD.
  """
  values = np.array(rdd.collect())
  if values.size == 0:
    # np.min / np.max raise on empty input; there is nothing to scale.
    return spark.sparkContext.parallelize([])
  shifted = values - values.min()
  valueRange = values.max() - values.min()
  if valueRange == 0:
    # Constant track: dividing would give 0/0 = NaN in every row. Every value
    # sits at the minimum, so the shifted (all-zero) array is the answer.
    return spark.sparkContext.parallelize(shifted)
  return spark.sparkContext.parallelize(shifted / valueRange)

In [None]:
def combineRdd(amplitudeRdd, rmsRdd, zcrRdd, scRdd, sbRdd, srRdd, mfccRdds):
  """Zip per-frame feature RDDs into one row of Python floats per frame.

  Args:
    amplitudeRdd, rmsRdd, zcrRdd, scRdd, sbRdd, srRdd: RDDs with one value
      per frame for amplitude envelope, RMS energy, zero-crossing rate,
      spectral centroid, spectral bandwidth and spectral rolloff.
    mfccRdds: list of RDDs, one per MFCC coefficient, each with one value
      per frame.

  Returns:
    A list of tuples (amplitude, rms, zcr, sc, sb, sr, mfcc_1, ..., mfcc_k),
    one per frame, truncated to the shortest feature track.
  """
  featureRdds = (amplitudeRdd, rmsRdd, zcrRdd, scRdd, sbRdd, srRdd, *mfccRdds)
  tracks = [featureRdd.collect() for featureRdd in featureRdds]
  # Tracks need not have identical lengths: the range-stepped amplitude
  # envelope and librosa's framing can disagree by a frame. Indexing every
  # track by one track's length could run past the end of a shorter one, so
  # zip stops at the shortest track instead.
  return [tuple(float(value) for value in frame) for frame in zip(*tracks)]

In [None]:
def wavToDataFrame(input, intelligence, clipSeconds=10):
    """Turn the opening seconds of an audio file into a labelled feature DataFrame.

    The clip is framed with FRAME_LENGTH / HOP_LENGTH. Each feature track is
    min-max normalized on its own, and the tracks are zipped into one row
    per frame.

    Args:
        input: path of the audio file, loaded with ``librosa.load`` using
            librosa's default sample rate.
        intelligence: label written to the ``intelligence`` column of every
            row (this notebook passes 0 for ``*-human.wav`` files and 1 for
            ``*-ai.wav`` files).
        clipSeconds: how many seconds from the start of the file to keep.
            Defaults to 10.

    Returns:
        A Spark DataFrame with FloatType columns amplitudeEnvelope, RMSE, ZCR,
        spectralCentroid, spectralBandwidth, spectralRolloff and one
        ``MFCC<k>`` column per coefficient returned by mfcc20, plus the
        ``intelligence`` label column.
    """
    audioArray, sampleRate = librosa.load(input)

    # CLEANING/PREPROCESSING STEP 1: keep the first clipSeconds as (sample, index) pairs.
    # Every extractor below runs its own Spark jobs over this RDD, so cache it
    # instead of recomputing parallelize + zipWithIndex + filter each time.
    clippedRdd = sliceAudio(spark.sparkContext.parallelize(audioArray), clipSeconds, sampleRate).cache()

    # STEPS 2-8: per-frame features.
    amplitudeRdd = amplitudeEnvelope(clippedRdd, FRAME_LENGTH, HOP_LENGTH)
    rmsRdd = rms(clippedRdd, FRAME_LENGTH, HOP_LENGTH)
    zcrRdd = zcr(clippedRdd, FRAME_LENGTH, HOP_LENGTH)
    mfccRdds = mfcc20(clippedRdd, sampleRate)
    scRdd = spectralCentroid(clippedRdd, sampleRate, FRAME_LENGTH, HOP_LENGTH)
    sbRdd = spectralBandwidth(clippedRdd, sampleRate, FRAME_LENGTH, HOP_LENGTH)
    srRdd = spectralRolloff(clippedRdd, sampleRate, FRAME_LENGTH, HOP_LENGTH)
    # Each extractor collected the clip to the driver, so the cached copy can go.
    clippedRdd.unpersist()

    # STEP 9: scale every track to [0, 1] independently.
    amplitudeRdd = normalize(amplitudeRdd)
    rmsRdd = normalize(rmsRdd)
    zcrRdd = normalize(zcrRdd)
    mfccRdds = [normalize(mfccRdd) for mfccRdd in mfccRdds]
    scRdd = normalize(scRdd)
    sbRdd = normalize(sbRdd)
    srRdd = normalize(srRdd)

    # CLEANING/PREPROCESSING STEP 10: combine the tracks into rows and build the DataFrame.
    rddRows = combineRdd(amplitudeRdd, rmsRdd, zcrRdd, scRdd, sbRdd, srRdd, mfccRdds)
    # Column order must match the tuple order produced by combineRdd.
    columnNames = [
        "amplitudeEnvelope", "RMSE", "ZCR",
        "spectralCentroid", "spectralBandwidth", "spectralRolloff",
    ] + [f"MFCC{k}" for k in range(1, len(mfccRdds) + 1)]
    schema = StructType([StructField(name, FloatType()) for name in columnNames])

    df = spark.createDataFrame(rddRows, schema=schema)
    # `lit` (pyspark.sql.functions) broadcasts the scalar label to every row.
    return df.withColumn("intelligence", lit(intelligence))

In [None]:
# Label convention: 0 = human recording (*-human.wav), 1 = AI-generated (*-ai.wav).
bidenDF = wavToDataFrame("AudioData/biden-human.wav", 0)
bidenToObamaAiDF = wavToDataFrame("AudioData/biden-to-obama-ai.wav", 1)
linusDF = wavToDataFrame("AudioData/linus-human.wav", 0)
linusToRyanAiDF = wavToDataFrame("AudioData/linus-to-ryan-ai.wav", 1)
linusToBidenAiDF = wavToDataFrame("AudioData/linus-to-biden-ai.wav", 1)
obamaDF = wavToDataFrame("AudioData/obama-human.wav", 0)
obamaToLinusAiDF = wavToDataFrame("AudioData/obama-to-linus-ai.wav", 1)
trumpDF = wavToDataFrame("AudioData/trump-human.wav", 0)
trumpToTaylorAiDF = wavToDataFrame("AudioData/trump-to-taylor-ai.wav", 1)
margotDF = wavToDataFrame("AudioData/margot-human.wav", 0)
margotToTrumpAiDF = wavToDataFrame("AudioData/margot-to-trump-ai.wav", 1)
taylorDF = wavToDataFrame("AudioData/taylor-human.wav", 0)
taylorToMargotAiDF = wavToDataFrame("AudioData/taylor-to-margot-ai.wav", 1)

# Stack every clip into one frame. DataFrame.union matches columns by
# position, which is safe because wavToDataFrame always builds the same schema.
combinedDf = reduce(
    lambda left, right: left.union(right),
    [
        bidenDF, bidenToObamaAiDF,
        linusDF, linusToRyanAiDF, linusToBidenAiDF,
        obamaDF, obamaToLinusAiDF,
        trumpDF, trumpToTaylorAiDF,
        margotDF, margotToTrumpAiDF,
        taylorDF, taylorToMargotAiDF,
    ],
)