# Cleaning and wrangling the flattened CSV/Parquet files

The following code can be put into a loop over all chunks, or run manually when only specific chunks are needed. This portion of the report is largely the same as my Assignment 2.

Here I process only a single chunk. To process many chunks, I wrapped the entire code in a for loop, which is not shown here.
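The loop itself is not shown in this report. The sketch below is one hypothetical way to organise it, assuming the file-name patterns used in this notebook (`clean_{chunk}.csv`, `clean_{chunk}.parquet`, `chunk_{chunk}.parquet`); `chunk_paths`, `process_chunks`, and the `process_chunk` callback are illustrative names, not the code I actually ran.

```python
import os

def chunk_paths(chunk, base="reportfiles"):
    """Paths used for one chunk, following this notebook's naming pattern."""
    return {
        "csv": os.path.join(base, "clean_{}.csv".format(chunk)),
        "parquet": os.path.join(base, "clean_{}.parquet".format(chunk)),
        "output": os.path.join(base, "chunk_{}.parquet".format(chunk)),
    }

def process_chunks(chunks, process_chunk):
    """Call a hypothetical per-chunk pipeline for each chunk id.

    process_chunk(paths) stands in for Parts 1-6 plus the export. Exceptions
    are collected per chunk so that one bad chunk does not stop the loop.
    """
    failed = {}
    for chunk in chunks:
        try:
            process_chunk(chunk_paths(chunk))
        except Exception as exc:
            failed[chunk] = exc
    return failed
```

For example, `process_chunks(range(n_chunks), my_pipeline)` returns a dict mapping any failed chunk ids to their exceptions; `n_chunks` and `my_pipeline` are placeholders.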

In [23]:
import pyspark.sql.functions as F
import os, atexit, sys, findspark, sparkhpc, pyspark, re
import math
import ast
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import concat, col, lit, array
pd.set_option('display.max_columns', 550)

In [95]:
# kill previous sparkcluster jobs just in case
try: sc.stop()
except: pass
try: sj.stop()
except: pass
! scancel -u `whoami` -n sparkcluster

INFO:sparkhpc.sparkjob:


In [3]:
findspark.init()

# specify your partition (unless you're OK with default)
#os.environ['SBATCH_PARTITION']='cpu2019' #run this on the arc cluster

sj = sparkhpc.sparkjob.sparkjob(
    ncores = 24,                       # total number of cores
    cores_per_executor = 8,            # parallelism of a single executor
    memory_per_core = 10240,           # memory per core in MB 
    walltime = "4:0"                   # hh:mm format
)

sj.wait_to_start()
sc = sj.start_spark()
scq = SQLContext(sc)

def exitHandler(sj,sc):
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except:
        pass
atexit.register(exitHandler,sj,sc);
#display(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 8036

INFO:sparkhpc.sparkjob:Submitted cluster 0


## Part 1 - load the CSV file into a Spark DataFrame
I don't define the schema manually because there are over 500 columns; instead I rely on `inferSchema=True`, which works fine for this data. Note that schema inference costs one extra pass over the CSV data.

In [4]:
#specify which chunk of data you want here
chunk = 3

In [5]:
%%time
df = scq.read.load('reportfiles/clean_{}.csv'.format(chunk), format='csv', header=True, multiLine = True, inferSchema=True)
df = df.drop("_c0")

CPU times: user 6.5 ms, sys: 1.1 ms, total: 7.6 ms
Wall time: 17.4 s


## Part 2 - save the DataFrame to a local Parquet file

In [6]:
%%time
WRITEPATH = "reportfiles/clean_{}.parquet".format(chunk)
overwrite = False
if os.path.exists(WRITEPATH):
    print("File exists, will overwrite if overwrite is set to True")
    if overwrite:
        print("overwriting")
        df.write.parquet(WRITEPATH, mode="overwrite")
    else:
        print("overwrite set to False, moving on")
else:
    print("writing file")
    df.write.parquet(WRITEPATH, mode="overwrite")

File exists, will overwrite if overwrite is set to True
overwrite set to False, moving on
CPU times: user 1.33 ms, sys: 72 µs, total: 1.4 ms
Wall time: 6.45 ms
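The branching in the cell above reduces to a single condition: write when nothing exists at the path yet, or when it exists and `overwrite` is True. A minimal sketch of that decision as a testable helper; `should_write` and its injectable `exists` parameter are hypothetical, not part of the original notebook.

```python
import os

def should_write(path, overwrite=False, exists=os.path.exists):
    """Decide whether to (re)write the Parquet output at `path`.

    Write if nothing exists at `path` yet, or if it exists and `overwrite`
    is True. `exists` can be swapped out for testing without touching disk.
    """
    return overwrite or not exists(path)
```

In the notebook this would read `if should_write(WRITEPATH, overwrite): df.write.parquet(WRITEPATH, mode="overwrite")`. Since Spark writes Parquet output as a directory of part files, `os.path.exists` is True for an existing output directory.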


## Part 3 - load the DataFrame from Parquet file

In [76]:
%%time
# Parquet files carry their own schema, so header/inferSchema options are not needed
df = scq.read.load(WRITEPATH, format='parquet')
df = df.cache()

CPU times: user 3.65 ms, sys: 297 µs, total: 3.95 ms
Wall time: 294 ms


## Part 4 - perform cleaning/wrangling steps
In the following steps I convert the columns that hold string representations of lists into Spark array columns so that I can work with them later. I define UDFs that perform this conversion given the column names. I then combine related columns into 2D matrices, which I will use for deep learning later in the project.

**Note that I cannot use `VectorAssembler` on these columns: it only accepts numeric, boolean, and vector input columns, not arrays.**

Before converting anything, I drop every row that contains an NA. I won't impute or otherwise fill in missing values: I have plenty of data to begin with, and imputing these features properly may require domain knowledge of spectral data.

### Dropping columns that I don't need
Some data chunks contain extra columns because their spectral data was extracted with an older version of Essentia (the spectral feature extraction library), so I need to remove any extra columns.

The columns of `clean_0.csv`, stored one per line in `reportfiles/default_columns.txt`, serve as the base (default) set. If a chunk has columns beyond these, the extra columns are removed.

In [77]:
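default_columns = []
with open('reportfiles/default_columns.txt', 'r') as filehandle:
    for line in filehandle:
        # rstrip instead of line[:-1], which would cut off a real character
        # if the last line has no trailing newline
        default_columns.append(line.rstrip('\n'))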

In [78]:
list(set(df.columns) - set(default_columns))

['_c0']

In [79]:
df = df.drop("_c0")
default_set = set(default_columns)
extracols = [c for c in df.columns if c not in default_set]
if extracols:
    print("there is an issue: extra columns", extracols)
    # keep only the rows that have no data in ANY of the extra columns;
    # my reasoning is that rows extracted with the older spectral extractor
    # may also have flaws even in the columns that I want to keep
    for ext in extracols:
        df = df.filter(F.col("`" + ext + "`").isNull())
    df = df.drop(*extracols)
print("are columns the same? ", default_columns == df.columns)

are columns the same?  True
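The check `default_columns == df.columns` compares order as well as names, and a one-way set difference only reports extra columns, never missing ones. A pure-Python sketch that reports both, on made-up column names (`reconcile_columns` and `old_feature.mean` are hypothetical):

```python
def reconcile_columns(actual, default):
    """Compare a chunk's columns against the default (clean_0) columns.

    Returns (extra, missing, same_order):
      extra      - columns in the chunk but not in the default list
      missing    - default columns the chunk lacks
      same_order - True only when both lists match exactly, order included
    List comprehensions keep the original order, so the result is deterministic.
    """
    default_set = set(default)
    actual_set = set(actual)
    extra = [c for c in actual if c not in default_set]
    missing = [c for c in default if c not in actual_set]
    return extra, missing, list(actual) == list(default)
```

If `missing` comes back empty but the order differs, ``df.select(*["`" + c + "`" for c in default_columns])`` would put the columns into the default order.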


#### Dropping rows with NAs
Since I have plenty of data, I simply drop every row that contains even a single NA.

In [80]:
NA_drop_columns = ["`"+i+"`" for i in df.columns]
df = df.na.drop(how='any', subset=NA_drop_columns)
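To make the `how` argument concrete, the sketch below mimics it on plain Python rows. It is only an illustration of the semantics, not Spark code: `is_missing` and `drop_na_rows` are hypothetical helpers, and NaN is treated as missing because Spark's `na.drop` also counts NaN in float/double columns.

```python
import math

def is_missing(value):
    """Treat None and float NaN as missing values."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def drop_na_rows(rows, how="any"):
    """Filter dict rows roughly the way df.na.drop(how=...) filters a DataFrame.

    how="any": drop a row if any value is missing (what the cell above uses)
    how="all": drop a row only if every value is missing
    """
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    return [row for row in rows if not check(is_missing(v) for v in row.values())]
```

With `how="any"` a single missing feature is enough to discard a track, which is the aggressive choice made above.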

#### Converting string lists to arrays

In [81]:
# Convert a string representation of a list, e.g. "[1.0, 2.5]", into a list of floats
@F.udf(returnType=ArrayType(FloatType()))
def to_alist(val):
    if not isinstance(val, str):
        return val  # nulls pass through unchanged
    temp = ast.literal_eval(val)
    return [float(i) for i in temp]

# The columns gfcc.cov, gfcc.icov, mfcc.cov, and mfcc.icov are matrices, so
# I have to convert them into a list of lists
@F.udf(returnType=ArrayType(ArrayType(FloatType())))
def lsofls(val):
    if not isinstance(val, str):
        return val  # nulls pass through unchanged
    temp = ast.literal_eval(val)
    return [list(map(float, i)) for i in temp]
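The parsing logic of the two UDFs can be checked without a cluster by keeping it in plain functions. A minimal sketch; `parse_list` and `parse_matrix` are hypothetical names, and wrapping them with `F.udf(parse_list, ArrayType(FloatType()))` would give an equivalent UDF.

```python
import ast

def parse_list(val):
    """Plain-Python body of to_alist: "[1, 2.5]" -> [1.0, 2.5]."""
    if not isinstance(val, str):
        return val  # None passes through unchanged
    return [float(x) for x in ast.literal_eval(val)]

def parse_matrix(val):
    """Plain-Python body of lsofls: "[[1, 2], [3, 4]]" -> [[1.0, 2.0], [3.0, 4.0]]."""
    if not isinstance(val, str):
        return val
    return [[float(x) for x in row] for row in ast.literal_eval(val)]
```

One caveat: `ast.literal_eval` raises `ValueError` on a string such as `"[nan, 1.0]"`, because `nan` is not a Python literal. Also, `FloatType` is single precision; `DoubleType` keeps more digits if they matter.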

Now let's get all the relevant columns that contain lists, using regular expressions to match them by name. First, I convert the string representations of flat lists into arrays; the matrices are handled in the next step.

In [82]:
#These are the columns that are in a string list format
cols = ['thpcp', 'chords_histogram', 'gfcc.mean', 'mfcc.mean']

#These are columns containing the following name that are lists but I have to find all of them
#for example there are columns named barkbands.dmean, erbbands.var, etc and I can't list them all by hand
colgroup = ['barkbands', 'erbbands', 'melbands', 'spectral_contrast_coeffs', 
             'spectral_contrast_valleys', 'beats_loudness_band_ratio', 'hpcp']

Now let's find all the matching columns.

In [83]:
col_list = []
for i in colgroup:
    # raw string avoids the invalid "\." escape warning; re.escape keeps the group name literal
    pattern = re.compile(re.escape(i) + r"\.")
    col_list += [c for c in df.columns if pattern.search(c)]

#merge other columns
col_list += cols
#let's see if I got all the columns that I need to convert (shown as an array)
np.array(col_list)

array(['barkbands.dmean', 'barkbands.dmean2', 'barkbands.dvar',
       'barkbands.dvar2', 'barkbands.max', 'barkbands.mean',
       'barkbands.median', 'barkbands.min', 'barkbands.var',
       'erbbands.dmean', 'erbbands.dmean2', 'erbbands.dvar',
       'erbbands.dvar2', 'erbbands.max', 'erbbands.mean',
       'erbbands.median', 'erbbands.min', 'erbbands.var',
       'melbands.dmean', 'melbands.dmean2', 'melbands.dvar',
       'melbands.dvar2', 'melbands.max', 'melbands.mean',
       'melbands.median', 'melbands.min', 'melbands.var',
       'spectral_contrast_coeffs.dmean',
       'spectral_contrast_coeffs.dmean2', 'spectral_contrast_coeffs.dvar',
       'spectral_contrast_coeffs.dvar2', 'spectral_contrast_coeffs.max',
       'spectral_contrast_coeffs.mean', 'spectral_contrast_coeffs.median',
       'spectral_contrast_coeffs.min', 'spectral_contrast_coeffs.var',
       'spectral_contrast_valleys.dmean',
       'spectral_contrast_valleys.dmean2',
       'spectral_contrast_valleys.dvar',
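A small, testable version of the column-matching step, run on made-up column names; `find_group_columns` is a hypothetical helper. The trailing `\.` in the pattern is what keeps `barkbands` from matching `barkbands_crest.dmean`.

```python
import re

def find_group_columns(columns, groups):
    """Return columns containing '<group>.' for any group, grouped in `groups` order."""
    found = []
    for group in groups:
        # re.escape keeps the group name literal; r"\." matches a literal dot
        pattern = re.compile(re.escape(group) + r"\.")
        found.extend(c for c in columns if pattern.search(c))
    return found
```

Because `search` matches anywhere in the name, a group such as `hpcp` would also pick up a hypothetical `thpcp.mean`; anchoring the pattern with `^` avoids that if such columns ever appear.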

Now let's convert each of these columns from a string representation of a list into an actual array by iterating through them.

In [84]:
for c in col_list:
    df = df.withColumn(c, to_alist("`" + c + "`"))

In [85]:
#now I have converted the relevant columns into list representations

#uncomment the line below to see
#df.printSchema()

#An Example:
# take() fetches only the first rows instead of collecting the whole column to the driver
print(df.select('`chords_histogram`').take(11)[10][0])

[33.90714645385742, 6.624934673309326, 9.73743724822998, 17.249174118041992, 0.2956007719039917, 1.5127804279327393, 0.0, 10.763345718383789, 1.147626519203186, 7.9986090660095215, 0.3129890561103821, 0.834637463092804, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.772909164428711, 3.8428099155426025]


#### Converting string lists of lists to 2D matrices

In [86]:
#These are 2D matrices that need to be converted
cols2 = ['gfcc.cov', 'gfcc.icov', 'mfcc.cov', 'mfcc.icov']

In [87]:
for c in cols2:
    df = df.withColumn(c, lsofls("`" + c + "`"))

In [88]:
#now I have converted the relevant columns into 2D matrix representations

#uncomment the line below to see
#df.printSchema()

#An Example:
# take() fetches only the first rows instead of collecting the whole column to the driver
print(df.select('`mfcc.cov`').take(3)[2][0])

[[9792.9248046875, 47.185794830322266, 683.4351196289062, 126.49763488769531, 282.6947937011719, 352.2110290527344, 297.0572814941406, 261.89593505859375, 155.08909606933594, 76.77680206298828, -16.304019927978516, 7.2960710525512695, -86.40018463134766], [47.185794830322266, 1505.6640625, -164.31114196777344, 74.41741943359375, 68.31846618652344, 43.0838737487793, 38.4344482421875, -95.55068969726562, -67.0043716430664, -80.81790161132812, -57.170387268066406, -22.215173721313477, 14.488946914672852], [683.4351196289062, -164.31114196777344, 505.932861328125, 133.6712646484375, 110.04557037353516, 136.54856872558594, 107.27102661132812, 117.1864242553711, 72.07858276367188, 29.474576950073242, 22.955862045288086, 16.355819702148438, 0.5319907069206238], [126.49763488769531, 74.41741943359375, 133.6712646484375, 279.4884338378906, 100.76358795166016, 62.555877685546875, 22.998754501342773, 31.22076416015625, 12.424187660217285, 1.4747402667999268, -17.57172966003418, -3.837735414505005
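Covariance matrices should be square and symmetric, and the `mfcc.cov` value printed above has 13 entries in its first row, with matching off-diagonal values (e.g. 47.185794830322266 at both (0, 1) and (1, 0)). A cheap sanity check on a few collected values could look like this; `is_square_symmetric` is a hypothetical helper.

```python
import math

def is_square_symmetric(matrix, rel_tol=1e-6, abs_tol=1e-9):
    """Return True if `matrix` (a list of lists) is square and symmetric within tolerance."""
    n = len(matrix)
    if any(len(row) != n for row in matrix):
        return False
    return all(
        math.isclose(matrix[i][j], matrix[j][i], rel_tol=rel_tol, abs_tol=abs_tol)
        for i in range(n)
        for j in range(i + 1, n)
    )
```

For example, ``all(is_square_symmetric(r[0]) for r in df.select('`mfcc.cov`').take(5))`` checks the first five rows without collecting the whole column.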

## Part 5 - Joining with the Target Data

We merge this feature set with the targets that we created in Final Report part 1.

In [89]:
genre = scq.read.load('reportfiles/target_genre.parquet', format='parquet')

In [90]:
df = df.join(genre, df.index==genre.gid, 'inner')
df = df.drop(df.__index_level_0__).drop(df.index)
df.printSchema()

root
 |-- average_loudness: double (nullable = true)
 |-- dynamic_complexity: double (nullable = true)
 |-- barkbands.dmean: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.dmean2: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.dvar: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.dvar2: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.max: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.mean: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.median: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.min: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands.var: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- barkbands_crest.dmean: double (nullable = true)
 |-- barkband
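An inner join keeps only the tracks whose `index` has a matching `gid` in the target table, and repeats a track once per matching target row. A pure-Python sketch of those semantics on toy data (the rows and genre labels are made up, and `inner_join` is a hypothetical helper):

```python
def inner_join(left, right, left_key, right_key):
    """Pure-Python inner join of dict rows where left[left_key] == right[right_key].

    Left rows without a match are dropped, and a left row that matches k right
    rows appears k times. On clashing keys the right row's value wins here,
    whereas Spark would keep both columns.
    """
    right_by_key = {}
    for r in right:
        right_by_key.setdefault(r[right_key], []).append(r)
    joined = []
    for row in left:
        for match in right_by_key.get(row[left_key], []):
            joined.append({**row, **match})
    return joined
```

If the target table can contain more than one row per `gid`, deduplicating it first (e.g. `genre.dropDuplicates(["gid"])`) would avoid repeated feature rows; whether that is desirable depends on how the targets were built in part 1.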

## Part 6 - Merging similar columns into 2D matrices
Column groups such as "barkbands", "erbbands", and "melbands" are each merged into a single 2D matrix column, with one row per statistic (e.g. `dmean`, `var`). This layout is much easier to feed into a deep learning model than dozens of separate 1D columns.
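Per row, each merged column is a list of per-statistic vectors. The NumPy sketch below shows the resulting shape for one hypothetical row; the statistic names are the nine `barkbands.*` suffixes listed in the column output above (from `dmean` to `var`), and 27 bands matches the length of each inner list in the `barkbands` example printed below. `STATS` and `stack_group` are illustrative names.

```python
import numpy as np

# Statistic suffixes in the order they appear in the column list above
STATS = ["dmean", "dmean2", "dvar", "dvar2", "max", "mean", "median", "min", "var"]

def stack_group(row, group, stats=STATS):
    """Stack the per-statistic vectors of one column group into a 2D array.

    row   - mapping from column name (e.g. "barkbands.mean") to a list of floats
    group - column-group prefix such as "barkbands"
    Returns a float32 array of shape (len(stats), n_bands), one row per statistic.
    np.stack raises ValueError if the vectors have different lengths.
    """
    return np.stack(
        [np.asarray(row["{}.{}".format(group, s)], dtype=np.float32) for s in stats]
    )
```

For `barkbands` this gives a 9 x 27 matrix per track, which is the kind of fixed-shape input a convolutional or dense network expects.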

In [91]:
#I already specified colgroup earlier
dropcol = []
for k in colgroup:
    pattern = re.compile(re.escape(k) + r"\.")
    mercol = [c for c in default_columns if pattern.search(c)]
    litcol = ["`" + c + "`" for c in mercol]
    # stack the group's 1D array columns into one array<array<float>> column named k
    df = df.withColumn(k, array(*litcol))
    dropcol += mercol
# drop the original 1D columns now that they live inside the 2D matrices
df = df.drop(*dropcol)

In [93]:
#now I have converted the relevant columns into 2D matrix representations and dropped off the 1D columns

#uncomment the line below to see
#df.printSchema()

#An Example:
# take() fetches only the first rows instead of collecting the whole column to the driver
print(df.select('`barkbands`').take(3)[2][0])

[[0.020926741883158684, 0.025013452395796776, 0.004412636626511812, 0.0034730236511677504, 0.004636225290596485, 0.0009782052366062999, 0.0007385953795164824, 0.000714368827175349, 0.0005219386075623333, 0.0002858470834325999, 0.00020057670189999044, 0.00021363867563195527, 0.00022517704928759485, 0.0001795208518160507, 0.000140201227623038, 0.00010960613144561648, 0.00014570560597348958, 0.00013620434037875384, 0.00012751136091537774, 0.00010248825128655881, 0.00010937304614344612, 5.655180211761035e-05, 2.9631730285473168e-05, 2.1515659682336263e-05, 0.00039906371966935694, 3.9579386793775484e-05, 9.190034325001761e-05], [0.0343354195356369, 0.04112480580806732, 0.007527674548327923, 0.005851578898727894, 0.007603638805449009, 0.0015732392203062773, 0.001135632861405611, 0.001154429162852466, 0.0007844818173907697, 0.0004330945375841111, 0.0002948124019894749, 0.0003362208080943674, 0.00032927613938227296, 0.0002682297199498862, 0.0002117143594659865, 0.00016051370766945183, 0.000217

### Export
Below I write out the final dataset, which I will feed into my deep learning model.

In [94]:
%%time
df.write.parquet("reportfiles/chunk_{}.parquet".format(chunk), mode="overwrite")

CPU times: user 27.9 ms, sys: 7.57 ms, total: 35.5 ms
Wall time: 1min 26s
