In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.1'
#spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [40.1 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:10 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:11 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:12 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:13 https://developer.download.nv

In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-11-05 17:57:16--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-11-05 17:57:18 (1.05 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this ETL job; the PostgreSQL JDBC jar is
# added to the driver classpath so that JDBC reads and writes can load the driver.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Import dependencies for computing MFCCs with librosa and locating files added via SparkFiles
import librosa
import numpy as np
import math
from pyspark import SparkFiles


In [None]:
# AWS PostgreSQL (RDS) connection settings
import os
from getpass import getpass

# JDBC endpoint of the RDS instance
jdbc_url="jdbc:postgresql://urbansounddb.cwkue3lwi5mx.us-east-1.rds.amazonaws.com:5432/postgres"

# Credentials are supplied at run time instead of being stored in the notebook:
# set RDS_USER / RDS_PASSWORD in the environment, or type the password when prompted.
# NOTE(review): the password that was previously hard-coded in this cell has been
# shared together with the notebook and should be rotated.
config = {"user": os.environ.get("RDS_USER", "root"),
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

Preprocess the audio data and build MFCC feature vectors

In [None]:
# Audio constants
# SAMPLE_RATE is the rate every file is resampled to when it is loaded with librosa.
SAMPLE_RATE = 22050
DURATION = 4 # clip length, measured in seconds
# Number of samples in one DURATION-second clip at SAMPLE_RATE
SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION

# Parameters passed to librosa.feature.mfcc by build_MFCC_features.
# The number of coefficients is not fixed here: it is the n_mfcc argument of
# build_MFCC_features (default 13).
n_fft = 2048 # default number
hop_length = 512 # default number

# build_MFCC_features(fold_name, n_mfcc):
#   Builds one averaged-MFCC feature row per audio file of the given fold.
def build_MFCC_features(fold_name, n_mfcc=13):
  """Build averaged MFCC feature rows for every audio file listed for a fold.

  The file list is read from the `{fold_name}_metadata` table over JDBC. Each listed
  file is fetched from the fold's S3 folder, loaded with librosa at SAMPLE_RATE, and
  turned into MFCCs; every coefficient is then averaged over all frames of the file.

  Relies on the notebook globals `spark`, `jdbc_url`, `config`, `SAMPLE_RATE`,
  `n_fft` and `hop_length`.

  Args:
    fold_name: string, fold name such as 'fold2'; used both as the metadata table
      prefix and as the S3 folder name.
    n_mfcc: number of MFCCs to compute per file.

  Returns:
    A list with one entry per metadata row. Each entry is a list
    [file id, class id, mean of MFCC 0, ..., mean of MFCC n_mfcc-1]
    (length 2 + n_mfcc).
  """
  # Read metadata for the folder
  metadata_df = spark.read.jdbc(url=jdbc_url, table=f'{fold_name}_metadata', properties=config)

  # S3 bucket folder that holds the audio files of this fold
  fold_url_path=f"https://ejbigdatasets.s3.amazonaws.com/UrbanSound8K/audio/{fold_name}"

  mfcc_rows = []
  # The metadata table is collected to the driver; files are processed one by one there.
  for frow in metadata_df.collect():
    fn = frow.slice_file_name

    # Download the audio file through Spark so SparkFiles.get can resolve its local path
    spark.sparkContext.addFile(fold_url_path + "/" + fn)

    # Load the signal samples, resampled to SAMPLE_RATE
    signal, sr = librosa.load(SparkFiles.get(fn), sr=SAMPLE_RATE)

    # The audio buffer is passed by keyword: newer librosa releases reject it as a
    # positional argument, while older releases accept the keyword form as well.
    mfcc = librosa.feature.mfcc(y=signal,
                                sr=sr,
                                n_mfcc=n_mfcc,
                                n_fft=n_fft,
                                hop_length=hop_length)

    # Average each coefficient across frames -> one value per coefficient
    mfcc_processed = np.mean(mfcc.T, axis=0)
    mfcc_rows.append([int(frow.id), int(frow.classid)] + mfcc_processed.tolist())

  return mfcc_rows


In [None]:
NUMFOLDS = 10
n_mfcc = 40

# Column names of the MFCC tables: the two id columns followed by one column per coefficient
columns = ['fileID', 'classID'] + ['mfcc_avg_'+str(j) for j in range(n_mfcc)]
print(columns)

# Folds 2..NUMFOLDS are handled one at a time: build the features, then append them to RDS
for i in range(2,NUMFOLDS+1):
  fold_name = f'fold{i}'

  print("Building MFCCs for ", fold_name)
  mfcc_vcs = build_MFCC_features(fold_name, n_mfcc=n_mfcc)

  # Turn the list of feature rows into a Spark dataframe with the column names above
  mfcc_df = spark.createDataFrame(spark.sparkContext.parallelize(mfcc_vcs), columns)

  # Append the rows to this fold's table on AWS RDS
  table_name = f'fold{i}_mfcc_avg_n_{n_mfcc}'
  print("Writing MFCCs for ", table_name)
  mfcc_df.write.jdbc(url=jdbc_url, table=table_name, mode='append', properties=config)

['fileID', 'classID', 'mfcc_avg_0', 'mfcc_avg_1', 'mfcc_avg_2', 'mfcc_avg_3', 'mfcc_avg_4', 'mfcc_avg_5', 'mfcc_avg_6', 'mfcc_avg_7', 'mfcc_avg_8', 'mfcc_avg_9', 'mfcc_avg_10', 'mfcc_avg_11', 'mfcc_avg_12', 'mfcc_avg_13', 'mfcc_avg_14', 'mfcc_avg_15', 'mfcc_avg_16', 'mfcc_avg_17', 'mfcc_avg_18', 'mfcc_avg_19', 'mfcc_avg_20', 'mfcc_avg_21', 'mfcc_avg_22', 'mfcc_avg_23', 'mfcc_avg_24', 'mfcc_avg_25', 'mfcc_avg_26', 'mfcc_avg_27', 'mfcc_avg_28', 'mfcc_avg_29', 'mfcc_avg_30', 'mfcc_avg_31', 'mfcc_avg_32', 'mfcc_avg_33', 'mfcc_avg_34', 'mfcc_avg_35', 'mfcc_avg_36', 'mfcc_avg_37', 'mfcc_avg_38', 'mfcc_avg_39']
Building MFCCs for  fold2
Writing MFCCs for  fold2_mfcc_avg_n_40
Building MFCCs for  fold3
Writing MFCCs for  fold3_mfcc_avg_n_40
Building MFCCs for  fold4
Writing MFCCs for  fold4_mfcc_avg_n_40
Building MFCCs for  fold5
Writing MFCCs for  fold5_mfcc_avg_n_40
Building MFCCs for  fold6
Writing MFCCs for  fold6_mfcc_avg_n_40
Building MFCCs for  fold7
Writing MFCCs for  fold7_mfcc_avg_n_4