### Folders for storing files
* Using relative path
* optimized is already in raw folder

In [16]:
RAW = '.'
LANDED = '../landed'

### Imports

In [17]:
import os
import librosa
import numpy as np
from pydub import AudioSegment
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, StringType

#### Build and store Spark session

In [18]:
from sagemaker_pyspark import classpath_jars
classpath = ":".join(classpath_jars())

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("MUSIC SPARK")
builder.config(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
# Unecessary setting to the default value
#builder.config("spark.speculation", "false")
builder.config("spark.sql.parquet.compression.codec", "gzip")
# TODO: What means debut.maxToStringFields?
builder.config("spark.debug.maxToStringFields", "100")
builder.config("spark.driver.extraClassPath", classpath)
# Unecessary setting the default value 
#builder.config("spark.driver.memory", "1g")
#builder.config("spark.driver.cores", "1")
builder.config("spark.executor-memory", "20g")
builder.config("spark.executor.cores", "4")


builder.master("local[*]")

spark = builder.getOrCreate()
spark

## Transform
* MP3 to WAV format
* Save in current folder (RAW file)

In [19]:
def audio_to_wav(file):
    dst = file.replace(LANDED,RAW)
    dst = dst.replace('.mp3',".wav")
    sound = AudioSegment.from_mp3(file)
    sound.export(dst, format="wav")
    return dst

## Extract (Features)

In [20]:
def extract_important_feature_music(file):
    
    songname = file.split('/')[0::-1][0]f
    y, sr = librosa.load(file, mono=True, duration=30)
    chroma_stft = np.mean(librosa.feature.chroma_stft(y=y, sr=sr))
    spec_cent = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    spec_bw = np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))
    rolloff = np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr))
    zcr = np.mean(librosa.feature.zero_crossing_rate(y))
    
    chroma_stft = np.array2string(chroma_stft, precision=4, separator=',',suppress_small=True)
    spec_cent = np.array2string(spec_cent, precision=4, separator=',',suppress_small=True)
    spec_bw = np.array2string(spec_bw, precision=4, separator=',',suppress_small=True)
    rolloff = np.array2string(rolloff, precision=4, separator=',',suppress_small=True)
    zcr = np.array2string(zcr, precision=4, separator=',',suppress_small=True)
    
    
    to_append = f'{songname};{chroma_stft};{spec_cent};{spec_bw};{rolloff};{zcr}'    
    return to_append

### Get all data from landed

In [21]:
all_music = [
    f'{LANDED}/{file}' for file in os.listdir(LANDED) if '.mp3' in file
]

In [22]:
all_music

['../landed/NLE Choppa - Shotta Flow.mp3',
 '../landed/nymano - jazz and rain.mp3',
 '../landed/NDBeatz - Often-Waves (Sickick Chill).mp3',
 '../landed/invention_ - ｍｏｒｆｏｓｉｓ.mp3',
 '../landed/ZZ - ICY (feat. Thorii).mp3',
 '../landed/Lil Tjay - Lil Tjay - Brothers (Prod by JDONTHATRACK and Protegebeatz).mp3',
 '../landed/HIGH ON MUSIC - Danrell x Småland - Hostage.mp3',
 '../landed/Flipp Dinero - Leave Me Alone (Prod. by Young Forever x Cast Beats).mp3',
 '../landed/Young Nero - Beyond (Prod. Scott Storch).mp3',
 '../landed/southernwade - U Feel Like (Prod. by Wade and Drty).mp3',
 '../landed/Cardi B - Money.mp3',
 '../landed/SimpkinsTwins - LAmbO DrEams.mp3',
 '../landed/90sFlav - Ｃａｌｌ ｍｅ.mp3',
 '../landed/Stryv - ONEDUO - Illusion (Stryv Remix).mp3',
 '../landed/SyrebralVibes - The Eden Project - Circles.mp3',
 '../landed/Megan Thee Stallion - Cash Shit feat. DaBaby.mp3',
 '../landed/Kodak Black - ZEZE (feat. Travis Scott and Offset).mp3',
 '../landed/Stryv - Ed Sheeran - Thinking Ou

### RDD CSV parse of important feature

In [27]:
pipe_rdd_csv = spark\
        .sparkContext\
        .parallelize(all_music)\
        .map(audio_to_wav)\
        .map(extract_important_feature_music)

In [24]:
pipe_rdd_csv

PythonRDD[3] at RDD at PythonRDD.scala:52

### Create Schema for music features

In [25]:
schema = StructType([StructField('file_name', StringType(), True),
                     StructField('chroma', StringType(), True),
                     StructField('spec_cent', StringType(), True),
                     StructField('spec_bw', StringType(), True),
                     StructField('rolloff', StringType(), True),
                     StructField('zcr', StringType(), True)])

In [26]:
pipe_rdd_csv = pipe_rdd_csv.map(lambda x : x.split(";"))
rdd = spark.createDataFrame(pipe_rdd_csv,schema)
rdd.show(2)

+---------+------+---------+---------+---------+------+
|file_name|chroma|spec_cent|  spec_bw|  rolloff|   zcr|
+---------+------+---------+---------+---------+------+
|        .|0.3913|2232.2017|2086.5985|4074.3025|0.1139|
|        .|0.3538|1545.9148|1932.9944|3007.3651|0.0632|
+---------+------+---------+---------+---------+------+
only showing top 2 rows



In [14]:
rdd.write.csv(f'{RAW}/data',sep=';',mode='overwrite')