# Import Libraries

In [None]:
from librosa.core import load as ld_wav
from librosa.feature import delta
import librosa.feature as ft_extraction
import scipy.io.wavfile as wav
import numpy as np
import os
from google.colab import drive
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix,make_scorer,f1_score
from sklearn.model_selection import train_test_split,StratifiedKFold,cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn import preprocessing
from sklearn.metrics.pairwise import euclidean_distances
from sklearn import metrics
from sklearn.pipeline import Pipeline
from sklearn.utils.validation import check_is_fitted
!pip install scikit-optimize
from skopt import BayesSearchCV
from skopt.space import Real, Categorical, Integer
import seaborn as sns
import pandas as pd
import librosa
from operator import itemgetter
import pickle
%tensorflow_version 2.x
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.metrics import AUC,Precision,Recall,BinaryCrossentropy,Accuracy
drive.mount('/content/drive')

## Install pyspark and systemml

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
 
# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')
!pip install elephas

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()
import time


# Data Extraction

In [None]:
!ls "/content/drive/My Drive/IRMAS-training"

In [None]:
# directory where we your .wav files are
directoryName = "/content/drive/My Drive/IRMAS-training" # put your own directory here
#instruments to evaluate
instruments = ["pia","vio"]
# directory to put our results in, you can change the name if you like
resultsDirectory = directoryName + "/MFCCresults"


In [None]:
def countTrainTracks(input_path, labels):
		""" Counts the number of tracks in the folders of the trainset
		"""
		total = 0
		for l, label in enumerate(labels):
			instrument_dir = os.path.join(input_path, label)
			total += len(os.listdir(instrument_dir))

		return total

In [None]:
if not os.path.exists(resultsDirectory):
    os.makedirs(resultsDirectory)

In [None]:
total_tracks = countTrainTracks(directoryName,instruments)
print("Total tracks: ",total_tracks)

In [None]:
data_dict = dict()
data_dict["rolloff"] = list()
data_dict["bandwidth"] = list()
data_dict["centroids"] = list()
data_dict["zero_crossing_rate"] = list()
data_dict["rms"] = list()
data_dict["slope"] = list()
data_dict["kurtosis"] = list()
data_dict["skewness"] = list()
for i in range(0,13):
    data_dict["mfcc"+str(i)] = list()
data_dict["instrument"] = list()

In [None]:
#Essa funcao recebe cada coluna mfcc[j], que é um vetor onde cada elemento vetor[i]
#contem todos os K coeficientes mfcc[j] de um audio i. O objetivo é descobrir como os MFCC[j] mudam ao longo do tempo.
#Obs: 
#j (quantidade de coeficientes MFCC) = {0,13}, 
#k (quantidade de uma mesma caracteristica extraída do audio completo - depende do tamanho de janela e do tempo de duração) = {0,130}, 
#i (numero de arquivos do dataset) = 1301 
def getDeltaFeat(column):
    #Quantos vetores serão lidos, ou seja, o tamanho do dataset 
    original_len = len(column)
    deltas = list()
    for i in range(original_len):
      deltas.append(delta(column[i], order=1))
    return np.array(deltas)

In [None]:
def includeDeltaFeat(df):
    for i in range(0,13):
        df["delta_mfcc"+str(i)]=getDeltaFeat(df["mfcc"+str(i)])
    return df

In [None]:
def FeatureSpectralCentroid(X, f_s):

    isSpectrum = X.ndim == 1

    # X = X**2 removed for consistency with book

    norm = X.sum(axis=0, keepdims=True)
    norm[norm == 0] = 1

    vsc = np.dot(np.arange(0, X.shape[0]), X) / norm

    # convert from index to Hz
    vsc = vsc / (X.shape[0] - 1) * f_s / 2

    # if input is a spectrum, output scaler else if spectrogram, output 1d array
    vsc = np.squeeze(vsc) if isSpectrum else np.squeeze(vsc, axis=0)

    return vsc

In [None]:
def FeatureSpectralSpread(X, f_s):

    isSpectrum = X.ndim == 1
    if isSpectrum:
        X = np.expand_dims(X, axis=1)

    # get spectral centroid as index
    vsc = FeatureSpectralCentroid(X, f_s) * 2 / f_s * (X.shape[0] - 1)

    # X = X**2 removed for consistency with book

    norm = X.sum(axis=0)
    norm[norm == 0] = 1

    # compute spread
    vss = np.zeros(X.shape[1])
    indices = np.arange(0, X.shape[0])
    for n in range(0, X.shape[1]):
        vss[n] = np.dot((indices - vsc[n])**2, X[:, n]) / norm[n]

    vss = np.sqrt(vss)

    # convert from index to Hz
    vss = vss / (X.shape[0] - 1) * f_s / 2

    return np.squeeze(vss) if isSpectrum else vss

In [None]:
def FeatureSpectralKurtosis(X, f_s, UseBookDefinition=False):

    isSpectrum = X.ndim == 1
    if isSpectrum:
        X = np.expand_dims(X, axis=1)

    if UseBookDefinition:  # not recommended
        # compute mean and standard deviation
        mu_x = np.mean(X, axis=0, keepdims=True)
        std_x = np.std(X, axis=0)

        # remove mean
        X = X - mu_x

        # compute kurtosis
        vsk = np.sum(X**4, axis=0) / (std_x**4 * X.shape[0])
    else:
        f = np.arange(0, X.shape[0]) / (X.shape[0] - 1) * f_s / 2
        # get spectral centroid and spread (mean and std of dist)
        vsc = FeatureSpectralCentroid(X, f_s)  # *2/f_s * (X.shape[0]-1)
        vss = FeatureSpectralSpread(X, f_s)    # *2/f_s * (X.shape[0]-1)

        norm = X.sum(axis=0)
        norm[norm == 0] = 1
        vss[vss == 0] = 1

        # compute kurtosis
        vsk = np.zeros(X.shape[1])
        for n in range(0, X.shape[1]):
            vsk[n] = np.dot((f - vsc[n])**4, X[:, n]) / (vss[n]**4 * norm[n] * X.shape[0])

    return np.squeeze(vsk - 3) if isSpectrum else (vsk - 3)

In [None]:
def FeatureSpectralSkewness(X, f_s, UseBookDefinition=False):

    isSpectrum = X.ndim == 1
    if isSpectrum:
        X = np.expand_dims(X, axis=1)

    if UseBookDefinition:  # not recommended
        # compute mean and standard deviation
        mu_x = np.mean(X, axis=0, keepdims=True)
        std_x = np.std(X, axis=0)

        # remove mean
        X = X - mu_x

        # compute kurtosis
        vssk = np.sum(X**3, axis=0) / (std_x**3 * X.shape[0])
    else:
        f = np.arange(0, X.shape[0]) / (X.shape[0] - 1) * f_s / 2
        # get spectral centroid and spread (mean and std of dist)
        vsc = FeatureSpectralCentroid(X, f_s) 
        vss = FeatureSpectralSpread(X, f_s)   

        norm = X.sum(axis=0)
        norm[norm == 0] = 1
        vss[vss == 0] = 1

        # compute spread
        vssk = np.zeros(X.shape[1])
        for n in range(0, X.shape[1]):
            vssk[n] = np.dot((f - vsc[n])**3, X[:, n]) / (vss[n]**3 * norm[n] * X.shape[0])

    return np.squeeze(vssk) if isSpectrum else vssk

In [None]:
def FeatureSpectralSlope(X, f_s):

    # compute mean
    mu_x = X.mean(axis=0, keepdims=True)

    # compute index vector
    kmu = np.arange(0, X.shape[0]) - X.shape[0] / 2

    # compute slope
    X = X - mu_x
    vssl = np.dot(kmu, X) / np.dot(kmu, kmu)

    return vssl

In [None]:
(sig,rate) = ld_wav(directoryName +"/"+'pia'+"/" +'001__[pia][nod][cla]1389__1.wav')
spectogram = np.abs(librosa.stft(sig,n_fft=512))
feat = FeatureSpectralSlope(spectogram,rate)
print(np.array(feat).reshape(1,-1).shape)

In [None]:
mfcc_feat = ft_extraction.mfcc(S=spectogram,sr=rate,n_mfcc=13)
mfcc_feat.shape

In [None]:
ft_extraction.zero_crossing_rate(y=sig,frame_length=512,hop_length=512//4).shape

In [None]:
len(sig)

In [None]:
def zeroPadding(sig,wav_len = 66150):
  left_zeros = np.zeros(wav_len - sig.shape[0])
  return np.hstack([left_zeros,sig])

In [None]:
def getData(directoryName,instruments,data_dict):
    instrument_index = 0
    files_read = 0
    for instrument in instruments:
        for filename in os.listdir(directoryName+"/"+instrument):
            if filename.endswith('.wav'): # only get MFCCs from .wavs
                # read in our file
                (sig,rate) = ld_wav(directoryName +"/"+instrument+"/" +filename)
                sig = zeroPadding(sig)
                # get mfcc
                spectogram = np.abs(librosa.stft(sig,n_fft=512))
                mfcc_feat = ft_extraction.mfcc(S=spectogram,sr=rate,n_mfcc=13)
                rolloff_feat = ft_extraction.spectral_rolloff(S=spectogram,sr=rate)
                bandwidth_feat = ft_extraction.spectral_bandwidth(S=spectogram,sr=rate,)
                centroid_feat = ft_extraction.spectral_centroid(S=spectogram,sr=rate)
                zero_crossing_rate_feat = ft_extraction.zero_crossing_rate(y=sig,frame_length=512,hop_length=512//4)
                rms_feat = ft_extraction.rms(S=spectogram)
                slope_feat = FeatureSpectralSlope(spectogram,rate)
                slope_feat = np.array(slope_feat).reshape(1,-1)
                kurtosis_feat = FeatureSpectralKurtosis(spectogram,rate)
                kurtosis_feat = np.array(kurtosis_feat).reshape(1,-1)
                skewness_feat = FeatureSpectralSkewness(spectogram,rate)
                skewness_feat = np.array(skewness_feat).reshape(1,-1)
                data_dict["instrument"].append([instrument_index])
                for i in range(0,13):
                  data_dict["mfcc"+str(i)].append(mfcc_feat[i])
                data_dict["rolloff"].append(rolloff_feat[0])
                data_dict["bandwidth"].append(bandwidth_feat[0])
                data_dict["centroids"].append(centroid_feat[0])
                data_dict["zero_crossing_rate"].append(zero_crossing_rate_feat[0])
                data_dict["rms"].append(rms_feat[0])
                data_dict["slope"].append(slope_feat[0])
                data_dict["kurtosis"].append(kurtosis_feat[0])
                data_dict["skewness"].append(skewness_feat[0])
                # create a file to save our results in
                files_read += 1
        instrument_index += 1
    data_dict = includeDeltaFeat(data_dict)
    return data_dict                                                                                           

In [None]:
"""data_dict = getData(directoryName,instruments,data_dict)
with open(directoryName+"/data_dict_procVoz.pickle","wb") as f:
  pickle.dump(data_dict,f)"""

In [None]:
with open(directoryName+"/data_dict_procVoz.pickle","rb") as f:
  data_dict = pickle.load(f)

In [None]:
len(data_dict["bandwidth"][0])

In [None]:
np.count_nonzero(~np.isnan(data_dict["mfcc0"]))

In [None]:
for key in data_dict.keys():
    print(key,len(data_dict[key]))
    #convert list to np
    data_dict[key] = np.array(data_dict[key])

In [None]:
data_dict["rolloff"].shape[0]

In [None]:
#Para o datafram não aceita matriz como input. Como cada arquivo de audio do data se obtem uma matriz de 
#tamanho (tam,n_features) multiplicaremos cada elemento do vetor de saída por esse tamanho. Assim, para um dado audio e para cada linha
#da matriz de dados teremos um valor de saída corresponde. A representação com matriz será util quando utilizarmos redes neurais
#O metodo flatten() transforma 2d arrays em 1d
data_for_df = dict()
data_for_df['instrument'] = list()
for key in data_dict.keys():
  if key != 'instrument':
    data_for_df[key] = data_dict[key].flatten()
    print("Feat {} has len {}".format(key,len(data_for_df[key])))
  else:
    for i in range(len(data_dict['instrument'])):
      data_for_df['instrument'].extend([data_dict['instrument'][i][0]] * len(data_dict['bandwidth'][i]))
df = pd.DataFrame.from_dict(data_for_df)

In [None]:
len(data_dict[key].flatten())

In [None]:
517*1301

(1301x130)x31

In [None]:
input_variables = list(df.columns)
input_variables.remove("instrument")
output_variable = "instrument"
print(input_variables)

# Preprocessing

In [None]:
corrmat = df.corr()
f, ax = plt.subplots(figsize=(12, 9))
sns.heatmap(corrmat,cmap=sns.color_palette("RdBu_r", 1000), vmin=-1,vmax=1, square=True)
plt.savefig('CorrelationMatrix.png')
plt.show()

In [None]:
np.unique(df['instrument'])

In [None]:
df.shape

In [None]:
le = preprocessing.LabelEncoder()
le.fit(instruments)
print(le.inverse_transform([0,1]))

In [None]:
from pyspark.ml.feature import PCA,MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
X_train, X_test, y_train, y_test = train_test_split(df[input_variables], df[output_variable], test_size=0.33, random_state=42)
#Transform to Spark DataFrame
df_train = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), np.column_stack((y_train,X_train)))
df_train = spark.createDataFrame(df_train,schema=["label", "features"])
df_test = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), np.column_stack((y_test,X_test)))
df_test = spark.createDataFrame(df_test,schema=["label", "features"])

In [None]:
#Apply Min-Max Scaler
mmScaler = MinMaxScaler(inputCol="features", outputCol="scaled")
#Apply PCA
pca = PCA(k=10, inputCol=mmScaler.getOutputCol(), outputCol="pca_features")
#Set Classifier
rf = RandomForestClassifier(labelCol="label", featuresCol=pca.getOutputCol(), numTrees=10)
pipeline = Pipeline(stages=[mmScaler, pca, rf])

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random Forest

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",metricName="areaUnderPR")
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [8, 16, 32]) \
    .addGrid(rf.maxDepth, [2, 4, 8]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5) 

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df_train)

In [None]:
prediction = cvModel.transform(df_test)
evaluator.evaluate(prediction)

# Neural Networks 

In [None]:
#Convert DICT to np.array by concatenating columns
X = np.array([data_dict[key] for key in input_variables])
Y = data_dict['instrument'].flatten()
# We have a sequence of feature vectors of size n_features 
# Agora, para uma determina sequencia de caracteristicas extraidas efetuaremos a classificação. Logo teremos agora uma matrix:
# (tam_seq,n_features). Consideraremos cada linha dessa matriz sendo o tempo
#X.shape == (31,1301,130)
#Mas queremos que X.shape = (1301,130,31)
X = np.array([A.flatten() for A in np.transpose(X,(1,2,0))])
X_train, X_test, y_train, y_test = train_test_split(X,Y,
                                                    test_size=0.33,
                                                    random_state=42)

In [None]:
df_train = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), np.column_stack((y_train,X_train)))
df_train = spark.createDataFrame(df_train,schema=["label", "features"])
df_test = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), np.column_stack((y_test,X_test)))
df_test = spark.createDataFrame(df_test,schema=["label", "features"])

In [None]:
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers

In [None]:
#rdd = to_simple_rdd(sc, X_train, y_train)
opt = optimizers.Adam(lr=0.01)

In [None]:
def get_CNN_Model(input_shape):
    model = Sequential()
    model.add(layers.Reshape((517,33,1), input_shape=(input_shape,) ) )
    assert model.output_shape == (None, 517, 33, 1)
    model.add(layers.Conv2D(32,(3,3),activation='relu',padding="same",strides=(1,1)))
    model.add(layers.MaxPooling2D((2,2),padding='same'))
    model.add(layers.Conv2D(64,(3,3),activation='relu',padding="same",strides=(1,1)))
    model.add(layers.MaxPooling2D((2,2),padding='same'))
    model.add(layers.Conv2D(64,(3,3),activation='relu',padding="same",strides=(1,1)))
    model.add(layers.MaxPooling2D((2,2),padding='same'))
    model.add(layers.Flatten())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(64,activation='relu'))
    model.add(layers.Dense(2,activation='softmax'))
    #model.summary()
    #model.compile(optimizer=opt,loss='binary_crossentropy')
    return model

In [None]:
X_train.shape[1]

In [None]:
inp = X_train.shape[1]
#inp.append(1)
model = get_CNN_Model(inp)
model.compile(optimizer=opt,loss='categorical_crossentropy')
opt_conf = optimizers.serialize(opt)
# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")             # These two come directly from pyspark,
estimator.setLabelCol("label")                 # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_yaml())       # Provide serialized Keras model
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)
estimator.set_num_workers(1)  # We just use one worker here. Feel free to adapt it.
estimator.set_epochs(20) 
estimator.set_batch_size(32)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

In [None]:
fitted = estimator.fit(df_train)

In [None]:
prediction = fitted.transform(df_test) # Evaluate on train data.
pnl = prediction.select("label", "prediction")
pnl.show(10)

In [None]:
prediction.show()

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
evaluator.evaluate(prediction,{evaluator.metricName: "areaUnderPR"})