<a href="https://colab.research.google.com/github/AriadnaOR97/TFG-Automatic-Assessment/blob/master/FE_DTW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# INTRODUCTION

---

This first part imports and installs all necessary libraries.

NOTE: The proces od DTW takes 4419 cells/s, so it comparing 81*69 -> it takes 81*69/4419 s ~= 1,85 s

In [0]:
# Basic tools
import numpy as np
import time
import matplotlib.pyplot as plt
import sys
from numpy import argmax, mean, diff, log
from matplotlib.mlab import find

# Feature extraction
import librosa 
!pip install soundfile
import soundfile as sf

# Data adquisition
import pandas as pd

# Learning
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix  

print("\nsoundfile installed. Necessary libraries imported.")

In [0]:
#Load the Drive helper and mount
from google.colab import drive

#This will prompt the authentification
drive.mount('/content/drive')

# To acces to the directory
!ls '/content/drive/My Drive/TFG - Ariadna'
myDrive = '/content/drive/My Drive/TFG - Ariadna'

# DTW

---

Here is initialized de Dynamic Time Warping Distance by 1-D and N-D. For mor detailed information click [here](https://en.wikipedia.org/wiki/Dynamic_time_warping)


In [0]:
def dtwDistance1D (array_1, array_2, detail_value):

    # initialize all cost to infinity in DTW matrix NxMx1
    DTW = np.zeros([len(array_1), len(array_2)]) + 1000

    # set first element to 0.
    DTW[0][0] = 0

    for i in np.arange(1, len(array_1)):
        for j in np.arange(1, len(array_2)):
            cost = np.abs(array_1[i] - array_2[j])  # AQUÍ COMPTAR PESOS A 1
            DTW[i][j] = cost + np.amin([DTW[i - 1][j], DTW[i][j - 1], DTW[i - 1][j - 1]])
    if detail_value:  # if detail value is true, return all matrix
        return DTW
    else:
        return DTW[len(array_1) - 1][len(array_2) - 1]

      
def dtwDistanceND(matrix_1, matrix_2, num_features, info):
    start = time.process_time()
    if info: print("Executing DTW...")
    # initialize all cost to infinity in DTW matrix NxMxF
    DTW = np.zeros([num_features, len(matrix_1[0]), len(matrix_2[0])]) + 1000

    # initialize cost array, considered as Indpendent
    feature_cost = np.zeros(num_features)

    for f in np.arange(0, num_features):
        cost = dtwDistance1D(matrix_1[f], matrix_2[f], False)

        # save all the costs
        feature_cost[f] = float(cost)
        #feature_cost[f] = DTW[f][len(matrix_1[0]) - 1][len(matrix_2[0]) - 1]

    elapsed = time.process_time() - start
    if info: print("Execution time: " + str(elapsed) + " s\n")
    return feature_cost

In [0]:
"""print('---------------------- TEST 1 ----------------------')
print('Testing multi-dimensional DTW Independent')
m1 = [[1, 2, 3, 4, 5], [1, 2, 3, 5, 5]]
m2 = [[1, 9, 8, 3, 4, 4, 5], [1, 2, 2, 3, 4, 5, 5]]
num_features = 2

m1_m2 = dtwDistanceND(m1, m2, num_features, info=True)
print('Distance between m1 and m2: ' + str(m1_m2))"""

# Median Filter
---
Median filter is used to filter the noise and also reduce the data in feature extraction process.

In [0]:
def median(x):
    x = np.abs(x)
    x.sort()
    median = x[int(np.floor(len(x)/2)-1)]
    
    return median

  
def medianFilter(x, hop_size):
    if hop_size == 1:
        return x
      
    new_x = []
    i = 0
    while i < len(x):
        idx_end = i+hop_size
        if idx_end < len(x):
            new_x.append(median(x[i:idx_end -1]))
        elif idx_end > len(x):
            new_x.append(median(x[i:idx_end -1]))
        i = i+hop_size
        
    return new_x
  
print("Median filter is initialized")

In [0]:
"""print('---------------------- TEST 2 ----------------------')
x = [1,2,3,2,3,1,2,3,4,5,6,7,6,7,5,6,7]
plt.plot(x)
plt.plot(medianFilter(x,3))
print(medianFilter(x,3))"""

# Pitch Detecion
---
First feature extracted, based on zero crossing. This function is from [github](https://gist.github.com/endolith/255291)

In [0]:
def freq_from_crossings(sig, fs):
    # Find all indices right before a rising-edge zero crossing
    indices = find((sig[1:] >= 0) & (sig[:-1] < 0))

    # Naive (Measures 1000.185 Hz for 1000 Hz, for instance)
    # crossings = indices

    # More accurate, using linear interpolation to find intersample
    # zero-crossings (Measures 1000.000129 Hz for 1000 Hz, for instance)
    crossings = [i - sig[i] / (sig[i+1] - sig[i]) for i in indices]

    # Some other interpolation based on neighboring points might be better.
    # Spline, cubic, whatever

    return fs / mean(diff(crossings))
      
def pitch_detection(y, sr, n_fft, hop_length):
    voiceVector = []

    window_position = 0
    while window_position < len(y):
      
        idx_end = window_position+n_fft
        if idx_end < len(y):
            voiceVector.append(freq_from_crossings(y[window_position:idx_end -1],sr))
            
        elif idx_end > len(y):
            voiceVector.append(freq_from_crossings(y[window_position:idx_end-1],sr))
            
        window_position = window_position+n_fft-hop_length
    return voiceVector


In [0]:
"""print('---------------------- TEST 3 ----------------------')
y, sr = sf.read(myDrive + 'A3-A2.wav', dtype='float32')

pitch = pitch_detection(y, sr, n_fft=1*1024, hop_length=512)
print(len(pitch))
centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
flatness = librosa.feature.spectral_flatness(y=y)
rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, n_fft=1*1024, hop_length=512)
zero_crossing = librosa.feature.spectral.zero_crossing_rate(y=y, frame_length = 1*1024, hop_length=512)
mel_spectrum = librosa.feature.mfcc(y=y, sr=sr, n_fft=1*1024, n_mfcc=15) #Get 20 melspectrum

      
"""

# Feature Extraction

---

This is the first block of the all method. It extracts 20 features including pitch, spectral centroid, spectral flatness, spectral rollof, zero-crossing rate and MFCC. 

At the end of the process, it saves the data extracted into csv files.

In [0]:
def featureExtraction(filenames, hop_size = 1):
    # Get the file path to the included audio example
    audio_features = []
    start = time.process_time()
    for filename in filenames:
        # Load the audio as a waveform `y`
        # Store the sampling rate as `sr`
        # librosa needs to use float type for data
        y, sr = sf.read(myDrive + '/SoundFiles/Exercise 14 - Segmented/' + filename, dtype='float32')

        # Declare FFT properties
        n_fft = 1024
        hop_length = 512
        
        # Get features
        pitch = pitch_detection(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length)
        centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
        flatness = librosa.feature.spectral_flatness(y=y)
        rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, n_fft = n_fft, hop_length=hop_length)
        zero_crossing = librosa.feature.spectral.zero_crossing_rate(y=y, frame_length = n_fft, hop_length=hop_length)
        mel_spectrum = librosa.feature.mfcc(y=y, sr=sr, n_fft=n_fft, n_mfcc=15) #Get 15 melspectrum
        
        # Apply medianfilter
        pitch = medianFilter(pitch, hop_size)
        centroid = medianFilter(centroid[0], hop_size)
        flatness = medianFilter(flatness[0], hop_size)
        rolloff = medianFilter(rolloff[0], hop_size)
        zero_crossing = medianFilter(zero_crossing[0], hop_size)
        MFCC_1 = medianFilter(mel_spectrum[0], hop_size)
        MFCC_2 = medianFilter(mel_spectrum[1], hop_size)
        MFCC_3 = medianFilter(mel_spectrum[2], hop_size)
        MFCC_4 = medianFilter(mel_spectrum[3], hop_size)
        MFCC_5 = medianFilter(mel_spectrum[4], hop_size)
        MFCC_6 = medianFilter(mel_spectrum[5], hop_size)
        MFCC_7 = medianFilter(mel_spectrum[6], hop_size)
        MFCC_8 = medianFilter(mel_spectrum[7], hop_size)
        MFCC_9 = medianFilter(mel_spectrum[8], hop_size)
        MFCC_10 = medianFilter(mel_spectrum[9], hop_size)
        MFCC_11 = medianFilter(mel_spectrum[10], hop_size)
        MFCC_12 = medianFilter(mel_spectrum[11], hop_size)
        MFCC_13 = medianFilter(mel_spectrum[12], hop_size)
        MFCC_14 = medianFilter(mel_spectrum[13], hop_size)
        MFCC_15 = medianFilter(mel_spectrum[14], hop_size)
        
        # Build matrix
        M = pitch, centroid, flatness, rolloff, zero_crossing, MFCC_1, MFCC_2, MFCC_3,MFCC_4,MFCC_5,MFCC_6, MFCC_7, MFCC_8, MFCC_9, MFCC_10, MFCC_11, MFCC_12, MFCC_13, MFCC_14, MFCC_15

        audio_features.append(M)
        
        
    elapsed = time.process_time() - start
    print("Features extracted from all data")
    print("Total number of audio analyzed: " + str(len(audio_features)))
    print("Total number of features extracted: " + str(np.size(audio_features[0], axis=0)))
    print("Execution time: " + str(elapsed) + " s\n")

    return audio_features

In [0]:
print("Starting train")
# filenames ordered by level

""" -------------------------- FULL AUDIO ----------------------------------- """
# Exercise 9 Pizzicato --------------------> DONE!!
# filenames = "0031.wav", "0051.wav", "0001.wav", "0011.wav",  "0061.wav", "0071.wav"

# Exercise 14 Kreutzer 4 --------------------> DONE!!
# filenames = "0032.wav", "0052.wav", "0002.wav", "0012.wav",  "0062.wav", "0072.wav"

# Exercise 18 Schradieck 1,1 --------------------> DONE!!
# filenames = "0033.wav", "0053.wav", "0003.wav", "0013.wav",  "0063.wav", "0073.wav"

# Exercise 20 Sevcik 16 --------------------> ERROR!!
# filenames = "0034.wav", "0054.wav", "0004.wav", "0014.wav",  "0064.wav", "0074.wav"

# Exercise 21 Shift 1st position --------------------> ERROR!!
# filenames = "0035.wav", "0055.wav", "0005.wav", "0015.wav",  "0065.wav", "0075.wav"

# Exercise 23 Shift 5th position --------------------> ERROR!!
# filenames = "0036.wav", "0056.wav", "0006.wav", "0016.wav",  "0066.wav", "0076.wav"

# Exercise 28 Flesch Arpeggio Excerpt --------------------> DONE!!
# filenames = "0037.wav", "0057.wav", "0007.wav", "0017.wav",  "0067.wav", "0077.wav"

# Exercise 30 G Major scale --------------------> DONE!!
# filenames = "0038.wav", "0058.wav", "0008.wav", "0018.wav",  "0068.wav", "0078.wav"

# Exercise 35 D Major scale --------------------> DONE!!
# filenames = "0039.wav", "0059.wav", "0009.wav", "0019.wav",  "0069.wav", "0079.wav"

# Exercise 38 Chromatic Scale --------------------> DONE!!
# filenames = "0040.wav", "0060.wav", "0010.wav", "0020.wav",  "0070.wav", "0080.wav"

""" ------------------------ SEGMENTED AUDIO -------------------------------- """
# Exercise 14 - P1
# filenames = "0032_p1.wav", "0052_p1.wav", "0002_p1.wav", "0012_p1.wav",  "0062_p1.wav", "0072_p1.wav"

# Exercise 14 - P2
# filenames = "0032_p2.wav", "0052_p2.wav", "0002_p2.wav", "0012_p2.wav",  "0062_p2.wav", "0072_p2.wav"

# Exercise 14 - P3
# filenames = "0032_p3.wav", "0052_p3.wav", "0002_p3.wav", "0012_p3.wav",  "0062_p3.wav", "0072_p3.wav"

# Exercise 14 - P4
filenames = "0032_p4.wav", "0052_p4.wav", "0002_p4.wav", "0012_p4.wav",  "0062_p4.wav", "0072_p4.wav"

labels = "E", "E", "M", "M", "L", "L"

print("Extracting features...")
audio_features = featureExtraction(filenames, hop_size = 10)

for i in np.arange(0, len(audio_features)):
    print("   Length of features from " + filenames[i] + ": " + str(len(audio_features[i][1])))

In [0]:
# Export features of each audio

import csv

for j in np.arange(0,len(filenames)):
    
    csv_filename = myDrive + '/Features extracted/' + filenames[j] + '.csv'
    features_extracted = ["pitch","centroid", "flatness", "rolloff", "zero-crossing", "mel_01", "mel_02", "mel_03", "mel_04", "mel_05", "mel_06", "mel_07", "mel_08", "mel_09", "mel_10", "mel_11", "mel_12", "mel_13", "mel_14", "mel_15"]
    
    with open(csv_filename, 'w') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        #spamwriter.writerow(features_extracted)
        for i in np.arange(0,len(features_extracted)):
            spamwriter.writerow(audio_features[j][i])

    print("Features are saved in " + csv_filename)
    

In [0]:
# to know the total time of execution of DTW

aver = 4419 #cells per second
indi_time_exec = []

matrix = len(audio_features[0][1])*len(audio_features[1][1])/aver
indi_time_exec.append(matrix)

for E in [0,1]:
    for i in np.arange(2,len(filenames)):
        #print(str(len(audio_features[E][1])), ' and ', str(len(audio_features[i][1])))
        matrix = len(audio_features[E][1])*len(audio_features[i][1])/aver
        indi_time_exec.append(matrix)

print(indi_time_exec)
# if minutes
if (np.sum(indi_time_exec)/60 > 60):
    print('Aprox total time of execution: ',np.sum(indi_time_exec)/60/60, 'hour(s)')
elif (np.sum(indi_time_exec) > 60):
    print('Aprox total time of execution: ',np.sum(indi_time_exec)/60, 'minute(s)')
else: 
    print('Aprox total time of execution: ',np.sum(indi_time_exec), 'second(s)')
        
        

[3.333785924417289, 3.9112921475446933, 3.386286490156144, 2.8612808327675943, 5.433808553971486, 4.282190540846345, 3.707399864222675, 3.1326091875990043, 5.94908350305499]
Aprox total time of execution:  35.99773704458022 second(s)


# Create DataBase Model
---

In this section, **DTW** is computed using the feature extracted from the audios and then, it save the distances into a csv in order to get the data in the next step as many times as it's required. 

This block is the one which requires most computation so be patient, it will take the time mentioned at the end of the last section.

In [0]:
features = np.size(audio_features[0], axis=0)
print('Number of features: ', features)

dB = []
label = []
distance = dtwDistanceND(audio_features[0], audio_features[1], features, info = True)
dB.append(distance)
label.append("E")
print("Distance between " + labels[0] + " and " + labels[1] + ": " + str(distance) + "")

for E in [0,1]:
    for i in np.arange(2,len(filenames)):
        distance = dtwDistanceND(audio_features[E], audio_features[i], features, info = True)
        print("Distance between " + labels[E] + " and " + labels[i] + ": " + str(distance) + "")
        dB.append(distance)
        label.append(labels[i])

#print(dB)
#print(label)
print("DTW Done!")


In [0]:
"""print('---------------------- TEST 3 ----------------------')
m1_m2 = dtwDistanceND(audio_features[0], audio_features[1], 4, info = True)
m1_m3 = dtwDistanceND(audio_features[0], audio_features[2], 4, info = True)
m2_m3 = dtwDistanceND(audio_features[1], audio_features[2], 4, info = True)

print('Distance between m1 and m2: ' + str(m1_m2))
print('Distance between m1 and m3: ' + str(m1_m3))
print('Distance between m2 and m3: ' + str(m2_m3))"""

# Exporting Train Dataset
---

Save the DTW distance of all audios in a csv file.

In [0]:
import csv
EXERCISE = 14
PART = 4

csv_filename = myDrive + "/Data_test/ex" + str(EXERCISE) + "-noreb-p" + str(PART)+".csv"
features_extracted = ["pitch","centroid", "flatness", "rolloff", "zero-crossing", "mel_01", "mel_02", "mel_03", "mel_04", "mel_05", "mel_06", "mel_07", "mel_08", "mel_09", "mel_10", "mel_11", "mel_12", "mel_13", "mel_14", "mel_15", "label"]

with open(csv_filename, 'w') as csvfile:
    spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
    for i in np.arange(0,len(dB)):
        spamwriter.writerow([dB[i][0],dB[i][1],dB[i][2],dB[i][3],dB[i][4],dB[i][5],dB[i][6],dB[i][7],dB[i][8],dB[i][9],dB[i][10],dB[i][11],dB[i][12],dB[i][13],dB[i][14],dB[i][15],dB[i][16],dB[i][17],dB[i][18],dB[i][19], label[i]])
        
print("Model saved in " + csv_filename)

""" ------------------------- SAVE WITH DE FEATURES'S NAME--------------------------------"""
csv_filename = myDrive + "/Data_test/ex" + str(EXERCISE) + "-noreb-name-p" + str(PART)+".csv"
with open(csv_filename, 'w') as csvfile:
    spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
    spamwriter.writerow(features_extracted)
    for i in np.arange(0,len(dB)):
        spamwriter.writerow([dB[i][0],dB[i][1],dB[i][2],dB[i][3],dB[i][4],dB[i][5],dB[i][6],dB[i][7],dB[i][8],dB[i][9],dB[i][10],dB[i][11],dB[i][12],dB[i][13],dB[i][14],dB[i][15],dB[i][16],dB[i][17],dB[i][18],dB[i][19], label[i]])
        
print("Model saved with names in " + csv_filename)
    

# Exporting Test Dataset
---
On this step, it's done the whole process for the test dataset.

At the end it's saved in a csv.

In [0]:
print("Starting Test")
# Get Test dataset

""" -------------------------- FULL AUDIO ----------------------------------- """
# Exercise 9 Pitzzicato --------------------> DONE!!
# test_filenames = "0031.wav", "0051.wav", "0021.wav", "0041.wav"

# Exercise 14 Kreutzer 4 --------------------> DONE!!
# test_filenames = "0032.wav", "0052.wav", "0022.wav", "0042.wav"

# Exercise 18 Schradieck 1,1 --------------------> DONE!!
# test_filenames = "0033.wav", "0053.wav", "0023.wav", "0043.wav"

# Exercise 20 Sevcik 16 --------------------> ERROR!!
# test_filenames = "0034.wav", "0054.wav", "0024.wav", "0044.wav"

# Exercise 21 Shift 1st position --------------------> ERROR!!
# test_filenames = "0035.wav", "0055.wav", "0025.wav", "0045.wav"

# Exercise 23 Shift 5th position --------------------> ERROR!!
# test_filenames = "0036.wav", "0056.wav", "0026.wav", "0046.wav"

# Exercise 28 Flesch Arpeggio Excerpt --------------------> DONE!!
# test_filenames = "0037.wav", "0057.wav", "0027_M.wav", "0047.wav"

# Exercise 30 G Major scale --------------------> DONE!!
# test_filenames = "0038.wav", "0058.wav", "0028.wav", "0048.wav"

# Exercise 35 D Major scale --------------------> DONE!!
# test_filenames = "0039.wav", "0059.wav", "0029.wav", "0049.wav"

# Exercise 38 Chromatic scale --------------------> DONE!!
# test_filenames = "0040.wav", "0060.wav", "0030.wav", "0050.wav"

""" ------------------------ SEGMENTED AUDIO -------------------------------- """
# Exercise 14 - P1
# test_filenames = "0032_p1.wav", "0052_p1.wav", "0022_p1.wav", "0042_p1.wav"

# Exercise 14 - P2
# test_filenames = "0032_p2.wav", "0052_p2.wav", "0022_p2.wav", "0042_p2.wav"

# Exercise 14 - P3
# test_filenames = "0032_p3.wav", "0052_p3.wav", "0022_p3.wav", "0042_p3.wav"

# Exercise 14 - P4
test_filenames = "0032_p4.wav", "0052_p4.wav", "0022_p4.wav", "0042_p4.wav"

test_labels = "E", "E", "M", "E"

print("Extracting features...")
audio_features = featureExtraction(test_filenames, hop_size = 10)

for i in np.arange(0, len(audio_features)):
    print("   Length of features from " + test_filenames[i] + ": " + str(len(audio_features[i][1])))

In [0]:
# Export features of each audio (the lasts)

import csv

for j in np.arange(2,len(test_filenames)):
    
    csv_filename = myDrive + '/Features extracted/' + test_filenames[j] + '.csv'
    features_extracted = ["pitch","centroid", "flatness", "rolloff", "zero-crossing", "mel_01", "mel_02", "mel_03", "mel_04", "mel_05", "mel_06", "mel_07", "mel_08", "mel_09", "mel_10", "mel_11", "mel_12", "mel_13", "mel_14", "mel_15"]
    
    with open(csv_filename, 'w') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
        #spamwriter.writerow(features_extracted)
        for i in np.arange(0,len(features_extracted)):
            spamwriter.writerow(audio_features[j][i])

    print("Features are saved in " + csv_filename)
    

In [0]:
X_test = []
y_test = "M", "E", "M", "E"
features = 20

for E in [0,1]:
    for i in (2,len(audio_features)-1):
        distance = dtwDistanceND(audio_features[E], audio_features[i], features, info = True)
        print("Distance between " + test_labels[E] + " and " + test_labels[i] + ": " + str(distance) + "")
        X_test.append(distance)


In [0]:
import csv

csv_filename = myDrive + "/Data_test/ex" + str(EXERCISE) + "-noreb-test-p" + str(PART)+".csv"
features_extracted = ["pitch","centroid", "flatness", "rolloff", "zero-crossing", "mel_01", "mel_02", "mel_03", "mel_04", "mel_05", "mel_06", "mel_07", "mel_08", "mel_09", "mel_10", "mel_11", "mel_12", "mel_13", "mel_14", "mel_15", "label"]

with open(csv_filename, 'w') as csvfile:
    spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
    for i in np.arange(0,len(X_test)):
        spamwriter.writerow([X_test[i][0],X_test[i][1],X_test[i][2],X_test[i][3],X_test[i][4],X_test[i][5],X_test[i][6],X_test[i][7],X_test[i][8],X_test[i][9],X_test[i][10],X_test[i][11],X_test[i][12],X_test[i][13],X_test[i][14],X_test[i][15],X_test[i][16],X_test[i][17],X_test[i][18],X_test[i][19], y_test[i]])
        
print("Model saved in " + csv_filename)


""" ------------------------- SAVE WITH DE FEATURES'S NAME--------------------------------"""

csv_filename = myDrive + "/Data_test/ex" + str(EXERCISE) + "-noreb-test-name-p" + str(PART)+".csv"
with open(csv_filename, 'w') as csvfile:
    spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_NONE)
    spamwriter.writerow(features_extracted)
    for i in np.arange(0,len(X_test)):
        spamwriter.writerow([X_test[i][0],X_test[i][1],X_test[i][2],X_test[i][3],X_test[i][4],X_test[i][5],X_test[i][6],X_test[i][7],X_test[i][8],X_test[i][9],X_test[i][10],X_test[i][11],X_test[i][12],X_test[i][13],X_test[i][14],X_test[i][15],X_test[i][16],X_test[i][17],X_test[i][18],X_test[i][19], y_test[i]])
        
print("Model saved with name in " + csv_filename)