In [1]:
#!pip install opendatasets

In [109]:
import os
#import opendatasets as od
import tensorflow as tf
from tensorflow import keras
import os
import argparse
import numpy as np
import tensorflow_model_optimization as tfmot
import tempfile
import tensorflow.lite as tflite
import zlib
import base64


#Kaggle imports
#import opendatasets as od

In [110]:
#https://www.analyticsvidhya.com/blog/2021/04/how-to-download-kaggle-datasets-using-jupyter-notebook/

In [111]:
#od.download("https://www.kaggle.com/fabianavinci/guitar-chords-v3")

In [112]:
class SignalGenerator:
    def __init__(self, labels, sampling_rate, frame_length, frame_step, 
                 num_mel_bins=None, lower_frequency=None, upper_frequency=None, 
                 num_coefficients=None, mfcc=False):
        self.labels = labels
        self.sampling_rate = sampling_rate
        self.frame_length = frame_length
        self.frame_step = frame_step
        self.num_mel_bins = num_mel_bins
        self.lower_frequency = lower_frequency
        self.upper_frequency = upper_frequency
        self.num_coefficients = num_coefficients
        num_spectrogram_bins = (frame_length) // 2 + 1

        if mfcc is True:
            self.linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
                self.num_mel_bins, num_spectrogram_bins, self.sampling_rate,
                self.lower_frequency, self.upper_frequency)
            self.preprocess = self.preprocess_with_mfcc
        else:
            self.preprocess = self.preprocess_with_stft

    def read(self, file_path):
        #print(f"File Path in read: {file_path}")
        parts = tf.strings.split(file_path, os.path.sep)
        label = parts[-2]
        #print(f"Parts[4]= {parts}")
        
        #print(f"Parts {parts}")
        
        label_id = tf.argmax(label == self.labels) # assigning a number to the label
        #print(f"Label id: {label_id}")
        audio_binary = tf.io.read_file(file_path)
        audio, _ = tf.audio.decode_wav(audio_binary)
        print(audio.shape)
        #print(f"Audio binary= {audio_binary}")
        audio = tf.squeeze(audio, axis=1)
        #print(f"Audio = {audio}")
        return audio, label_id

    def pad(self, audio):
        zero_padding = tf.zeros([self.sampling_rate] - tf.shape(audio), dtype=tf.float32)
        audio = tf.concat([audio, zero_padding], 0)
        audio.set_shape([self.sampling_rate])

        return audio

    def get_spectrogram(self, audio):
        stft = tf.signal.stft(audio, frame_length=self.frame_length,
                              frame_step=self.frame_step, fft_length=self.frame_length)
        spectrogram = tf.abs(stft)
        
        print("Spectogram shape", spectrogram.shape)

        return spectrogram

    def get_mfccs(self, spectrogram):
        mel_spectrogram = tf.tensordot(spectrogram,
                                       self.linear_to_mel_weight_matrix, 1)
        log_mel_spectrogram = tf.math.log(mel_spectrogram + 1.e-6)
        mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrogram)
        mfccs = mfccs[..., :self.num_coefficients]

        return mfccs

    def preprocess_with_stft(self, file_path):
        audio, label = self.read(file_path)
        audio = self.pad(audio)
        spectrogram = self.get_spectrogram(audio)
        spectrogram = tf.expand_dims(spectrogram, -1)
        spectrogram = tf.image.resize(spectrogram, [32, 32])
        print("Spectogram shape STFT", spectrogram.shape)
        print("Label STFT", label)

        return spectrogram, label

    def preprocess_with_mfcc(self, file_path):
        audio, label = self.read(file_path)
        print("Audio", audio)
        print("Label", label)
        audio = self.pad(audio)
        spectrogram = self.get_spectrogram(audio)
        mfccs = self.get_mfccs(spectrogram)
        mfccs = tf.expand_dims(mfccs, -1)

        return mfccs, label

    def make_dataset(self, files, train, batch_size):
        # files are list of file paths and train is a boolean (true for returning also labels)
        ds = tf.data.Dataset.from_tensor_slices(files)
        '''for element in ds:
            print(element)'''
        ds = ds.map(self.preprocess, num_parallel_calls=4)
        ds = ds.batch(batch_size)
        ds = ds.cache()
         
        if train is True:
            ds = ds.shuffle(100, reshuffle_each_iteration=True)

        return ds

In [113]:
# define parameters for the signal generator
sampling_rate = 16000
frame_length = 256
frame_step = 128
freq_mel_high = 4000
freq_mel_low = 20
num_MFCC = 10
flag_MFCC = False
mel_bins = 40
batch_size=32

In [114]:
#def create_data_partition():
#    train_paths = list(np.loadtxt("kws_train_split.txt", dtype = "str"))
#    val_paths = list(np.loadtxt("kws_val_split.txt", dtype = "str"))
#    test_paths = list(np.loadtxt("kws_test_split.txt", dtype = "str"))
#    return train_paths, val_paths, test_paths

In [115]:
path= "guitar-chords-v3"
train_path= f"{path}/Training"
test_path = f"{path}/Test"

In [116]:
#Going through the folder and getting the titles of the audio.wav files in the directory

def audio_to_str(audio):
    with open(audio, 'rb') as fd:
        wav_file = fd.read()
    #convert from bytes to string
    wav_base64 = base64.b64encode(wav_file)
    audio_string = wav_base64.decode('utf-8')
    return audio_string

def readings(path, ext = ".wav"):
    #reads all the files in a given directory with a given extension
    files = []
    for filename in os.listdir(path):
        if filename.endswith(ext):
             files.append(f"{path}/{filename}")
            
        else:
            None
    return files

files_train=readings(train_path) 
#print(files_train)
files_test=readings(test_path)
#print(files_test)

In [117]:
#generate tensors
chords= np.array(['Am', 'Bb', 'Bdim', 'C', 'Dm', 'Em', 'F', 'G' ], dtype=str)

# create the signal generator
SG = SignalGenerator(chords, sampling_rate, frame_length, frame_step, mel_bins, freq_mel_low, freq_mel_high, num_MFCC, flag_MFCC)

# create the various dataset split
tf_train_files = tf.convert_to_tensor(files_train)
#print(tf_train_files)
print("------------------Train files--------------------")

#val_files = tf.convert_to_tensor(val_paths)
tf_test_files = tf.convert_to_tensor(files_test)

train_ds = SG.make_dataset(tf_train_files, True, batch_size)
#val_ds = SG.make_dataset(val_files, False, batch_size)
test_ds = SG.make_dataset(tf_test_files, False, batch_size)

------------------Train files--------------------
(None, None)
Spectogram shape (124, 129)
Spectogram shape STFT (32, 32, 1)
Label STFT Tensor("ArgMax:0", shape=(), dtype=int64)
(None, None)
Spectogram shape (124, 129)
Spectogram shape STFT (32, 32, 1)
Label STFT Tensor("ArgMax:0", shape=(), dtype=int64)


In [118]:
def gen_model(input_shape, output_shape, choose="MLP"):
    inputs = keras.Input(shape=input_shape)
    model = None
    units= 8
    
    if flag_MFCC == False:
        strides = [2, 2]
    else:
        strides = [2, 1]
    '''
    if choose == "CNN_separable":
        x = tf.keras.layers.SeparableConv1D(filters=int(64 * scaling_factor), kernel_size=3, activation="relu")(inputs)
        x = keras.layers.Flatten()(x)
        x = keras.layers.Dense(units=int(64 * scaling_factor), activation="relu")(x)
        x = keras.layers.Dense(units=2 * output_shape[0])(x)
        outputs_cnn = keras.layers.Reshape(output_shape)(x)

        model = keras.Model(inputs=inputs, outputs=outputs_cnn, name="CNN_separable")

    if choose == "CNN_separable_3":
        x = tf.keras.layers.SeparableConv1D(filters=int(64 * scaling_factor), kernel_size=3, activation="relu")(inputs)
        x = keras.layers.Flatten()(x)
        # x = keras.layers.Dense(units = int(64 * scaling_factor), activation = "relu") (x)
        x = keras.layers.Dense(units=2 * output_shape[0])(x)
        outputs_cnn = keras.layers.Reshape(output_shape)(x)

        model = keras.Model(inputs=inputs, outputs=outputs_cnn, name="CNN_separable_3")
    
    if choose == "CNN":
        model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(filters=128, kernel_size=[3,3], strides=strides, use_bias=False),
        tf.keras.layers.BatchNormalization(momentum=0.1),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Conv2D(filters=128, kernel_size=[3,3], strides=[1,1], use_bias=False),
        tf.keras.layers.BatchNormalization(momentum=0.1),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Conv2D(filters=128, kernel_size=[3,3], strides=[1,1], use_bias=False),
        tf.keras.layers.BatchNormalization(momentum=0.1),
        tf.keras.layers.ReLU(),
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(units = 8)
        ])
        
    if choose == "LSTM":
    x = keras.layers.LSTM(units=64)(inputs)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(units=2 * output_shape[0])(x)
    outputs_lstm = keras.layers.Reshape(output_shape)(x)
    #model = keras.Model(inputs=inputs, outputs=outputs_lstm, name="LSTM")

    '''
    if choose == "CNN":
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(filters=64, kernel_size=[3,3],strides=strides, activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(units=64, activation='relu'),
            tf.keras.layers.Dense(units=units)
            ])
        
    if choose == "LSTM":
        model = tf.keras.Sequential([
            tf.keras.layers.LSTM(units=64),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(units=units)
        ])

    return model

In [119]:
def compile_and_train(model, train_ds, learning_rate, n_epochs):
    # then we can compile and print the summary of the model
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
    metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    summary = model.summary()
    print(f"Start training {model.name} model")
    history = model.fit(train_ds, epochs=n_epochs)

    return model, summary, history

In [127]:
# generate the model
inp = (None,32,32,1)

output_shape = 8
learning_rate_training=0.001
n_epochs_train=20

chosen_model= 'CNN'
#chosen_model= 'LSTM'

model = gen_model(inp, output_shape, chosen_model)
model.build(inp) 
#train the model
trained_model, summary, history = compile_and_train(model, train_ds, learning_rate_training, n_epochs_train)
saving_path = saving_model(trained_model)

Model: "sequential_25"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_7 (Conv2D)            (None, 15, 15, 64)        640       
_________________________________________________________________
flatten_25 (Flatten)         (None, 14400)             0         
_________________________________________________________________
dense_48 (Dense)             (None, 64)                921664    
_________________________________________________________________
dense_49 (Dense)             (None, 8)                 520       
Total params: 922,824
Trainable params: 922,824
Non-trainable params: 0
_________________________________________________________________
Start training sequential_25 model
Epoch 1/20


InvalidArgumentError:  Can not squeeze dim[1], expected a dimension of 1, got 2
	 [[node Squeeze (defined at <ipython-input-119-d27ce9a46388>:9) ]]
	 [[IteratorGetNext]] [Op:__inference_train_function_24495]

Function call stack:
train_function
