Κάνουμε import τα πακέτα που χρειαζόμαστε

In [None]:
from google.colab import drive
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers.legacy import SGD
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import time

Κάνουμε mount το drive

In [None]:
# Mount Google Drive at /content/drive so the dataset files become readable.
drive.mount('/content/drive')
# Root folder of the dataset; load_data() joins the split/feature sub-paths onto it.
path = '/content/drive/My Drive/music_genre_data_di/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Ερώτημα 1

**Βήμα 1**: Κάνουμε mount το drive και φορτώνουμε τα δεδομένα. Αφού μετατρέψουμε τις ετικέτες σε ακέραιους, χρησιμοποιούμε την from_tensor_slices της κλάσης tf.data.Dataset ώστε το dataset να έχει τη μορφή (δείγμα, ετικέτα) σε batches των 16 ζευγών και κάνουμε shuffle τα δεδομένα, ώστε να υπάρχει ψευδοτυχαιότητα και το μοντέλο μας να μη μάθει κάποιο μοτίβο από την ήδη δοσμένη σειρά των δεδομένων.

In [None]:
def load_data(features='mfccs', batch_size=16):
    """Load the train/val/test splits and wrap them as batched tf.data datasets.

    Reads ``<path>/<split>/<features>/X.npy`` and ``labels.npy`` for each of the
    ``train``, ``val`` and ``test`` splits, maps the genre names to integer class
    ids and builds one ``tf.data.Dataset`` of ``(sample, label)`` batches per split.

    Args:
        features: Name of the feature sub-folder to read (default ``'mfccs'``).
        batch_size: Number of ``(sample, label)`` pairs per batch (default 16).

    Returns:
        Tuple ``(train_dataset, val_dataset, test_dataset)``.

    Raises:
        KeyError: If a label file contains a genre name missing from the mapping.
    """
    label_class = {'classical': 0, 'hiphop': 1, 'rock_metal_hardrock': 2, 'blues': 3}

    def _load_split(split):
        # Load one split and convert its genre names to integer class ids.
        samples = np.load(os.path.join(path, split, features, 'X.npy'))
        names = np.load(os.path.join(path, split, features, 'labels.npy'))
        labels = np.array([label_class[name] for name in names])
        return samples, labels

    X_train, labels_train = _load_split('train')
    X_val, labels_val = _load_split('val')
    X_test, labels_test = _load_split('test')

    # Shuffle individual samples BEFORE batching, with a buffer covering the whole
    # training set. The previous `.batch(16).shuffle(True)` used a buffer of size
    # True == 1 on already-formed batches, which leaves the order untouched.
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((X_train, labels_train))
        .shuffle(buffer_size=max(len(X_train), 1))
        .batch(batch_size)
    )
    # Validation and test data are only evaluated, so their order is kept as is.
    val_dataset = tf.data.Dataset.from_tensor_slices((X_val, labels_val)).batch(batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, labels_test)).batch(batch_size)

    return train_dataset, val_dataset, test_dataset

**Βήμα 2**: Ορίζουμε το νευρωνικό μας δίκτυο, που αποτελείται από τρία πλήρως συνδεδεμένα επίπεδα (128, 32 και 4 νευρώνες).

In [None]:
def neural_network_definition(input_shape):
    """Build and compile the fully connected classifier.

    The network is an input layer followed by Dense layers of 128, 32 and 4
    units. No activation functions are specified, so the last layer emits raw
    logits (the loss is configured with ``from_logits=True``).

    Args:
        input_shape: Shape of a single input sample, without the batch axis.

    Returns:
        A compiled ``tf.keras.Sequential`` model (SGD with learning rate 0.002,
        sparse categorical cross-entropy, accuracy metric).
    """
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape=input_shape))
    for units in (128, 32, 4):
        model.add(tf.keras.layers.Dense(units))

    model.compile(
        tf.keras.optimizers.SGD(learning_rate=0.002),
        tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    return model

**Βήμα 3**: Εκπαιδεύουμε το μοντέλο μας για 30 epochs, με batches των 16 δειγμάτων· σε κάθε epoch η model.fit() εκτυπώνει το train loss.

In [None]:
def train_model(model, train_dataset, epochs=30):
    """Fit ``model`` on ``train_dataset`` and return it.

    Args:
        model: A compiled model exposing ``fit(dataset, epochs=...)``.
        train_dataset: Training data passed straight to ``model.fit``.
        epochs: Number of passes over the training data (default 30, the value
            that used to be hard-coded).

    Returns:
        The same ``model`` object, trained in place.
    """
    model.fit(train_dataset, epochs=epochs)
    return model

**Βήμα 4**: Αξιολόγηση μοντέλου.

In [None]:
def evaluate_model(model, test_dataset, loss_fn=None):
    """Compute loss, macro F1, accuracy and the confusion matrix on a dataset.

    Args:
        model: Trained model whose outputs are per-class scores (one column
            per class); the predicted class is the arg-max over axis 1.
        test_dataset: Iterable of ``(x_batch, y_batch)`` pairs with integer labels.
        loss_fn: Optional loss callable ``loss_fn(y_true, y_pred)``. When given,
            the reported loss is computed with it over all collected outputs;
            otherwise the loss of the compiled model (``model.evaluate``) is
            used. The parameter also makes the three-argument calls found
            elsewhere in this notebook valid.

    Returns:
        Tuple ``(loss_value, f1_macro, accuracy, c_matrix)``.

    Raises:
        ValueError: If ``test_dataset`` yields no batches.
    """
    label_batches = []
    output_batches = []

    # Labels and predictions are taken from the same batch so they stay aligned
    # even if the dataset changes its order between iterations.
    for x_batch, y_batch in test_dataset:
        # predict_on_batch avoids the per-call setup cost of model.predict,
        # which is meant for whole datasets rather than calls inside a loop.
        output_batches.append(model.predict_on_batch(x_batch))
        label_batches.append(y_batch.numpy())

    if not label_batches:
        raise ValueError('test_dataset is empty')

    y_test = np.concatenate(label_batches)
    outputs = np.concatenate(output_batches)
    y_pred = np.argmax(outputs, axis=1)  # index of the highest-scoring class

    if loss_fn is None:
        # return_dict avoids depending on how many metrics the model was compiled with.
        loss_value = model.evaluate(test_dataset, return_dict=True)['loss']
    else:
        loss_value = float(loss_fn(y_test, outputs))

    f1_macro = f1_score(y_test, y_pred, average='macro')
    accuracy = accuracy_score(y_test, y_pred)
    c_matrix = confusion_matrix(y_test, y_pred)

    return loss_value, f1_macro, accuracy, c_matrix

**Βήμα 5**: Εκπαίδευση και αξιολόγηση μοντέλου με τις προδιαγραφές που δίνονται.

In [None]:
def train_and_evaluate_model(model, train_dataset, test_dataset, optimizer, loss_fn, epochs=30):
    """Train ``model`` with the given settings, evaluate it and print the metrics.

    Args:
        model: Keras model to train.
        train_dataset: Training data passed to ``model.fit``.
        test_dataset: Dataset of ``(x_batch, y_batch)`` pairs used for evaluation.
        optimizer: Optimizer to compile the model with.
        loss_fn: Loss to compile the model with.
        epochs: Number of training epochs (default 30).

    Returns:
        Tuple ``(test_loss, test_f1, test_accuracy, test_confusion_matrix)``.
    """
    # Apply the requested optimizer/loss; previously both arguments were ignored.
    # If either is None the model keeps whatever configuration it already has.
    if optimizer is not None and loss_fn is not None:
        model.compile(optimizer, loss_fn, metrics=['accuracy'])

    # Train for the requested number of epochs (the argument used to be ignored).
    model.fit(train_dataset, epochs=epochs)

    # Evaluate on the held-out data. evaluate_model is called with two
    # arguments; the former third argument raised a TypeError.
    test_loss, test_f1, test_accuracy, test_confusion_matrix = evaluate_model(model, test_dataset)

    print("Test Loss:", test_loss)
    print("Test F1 Score:", test_f1)
    print("Test Accuracy:", test_accuracy)
    print("Test Confusion Matrix:\n", test_confusion_matrix)

    return test_loss, test_f1, test_accuracy, test_confusion_matrix


**Βήμα 7**: Επιλογή καλύτερου μοντέλου: εκπαιδεύουμε το μοντέλο στο training set και μετά από κάθε epoch το αξιολογούμε στο validation set, κρατώντας το μοντέλο με το μεγαλύτερο val_accuracy.

In [None]:
def find_best_model(model, train_dataset, val_dataset, epochs, optimizer, loss_fn, best_model_path):
    """Train ``model`` and restore the weights of its best validation epoch.

    Args:
        model: Keras model to train; it is (re)compiled here.
        train_dataset: Training data.
        val_dataset: Validation data evaluated after every epoch.
        epochs: Number of training epochs.
        optimizer: Optimizer used to compile the model.
        loss_fn: Loss used to compile the model.
        best_model_path: File the best model is saved to.

    Returns:
        ``model`` with the weights of the epoch that reached the highest
        ``val_accuracy``.
    """
    model.compile(optimizer, loss_fn, metrics=['accuracy'])
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=best_model_path,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True,
    )
    # The checkpoint callback must be handed to fit(); without it nothing is
    # written to best_model_path and load_weights below reads a stale or
    # missing file.
    model.fit(
        train_dataset,
        epochs=epochs,
        validation_data=val_dataset,
        callbacks=[model_checkpoint],
    )
    model.load_weights(best_model_path)
    return model

**Βήμα 5'**: Εκπαίδευση με CPU

In [None]:
# Load the data and set up the training configuration.
train_dataset, val_dataset, test_dataset = load_data()
input_shape = (26,)
optimizer = SGD(learning_rate=0.002)
loss_fn = SparseCategoricalCrossentropy(from_logits=True)
best_model_path = 'best_model.h5'
epochs = 30

# Pin model creation, training and evaluation to the CPU. Without an explicit
# device scope TensorFlow uses a GPU whenever one is visible, so the "CPU"
# timing below would not measure the CPU.
with tf.device('/CPU:0'):
    model = neural_network_definition(input_shape)

    start_time = time.time()
    model = find_best_model(model, train_dataset, val_dataset, epochs, optimizer, loss_fn, best_model_path)
    cpu_time = time.time() - start_time
    print(f'CPU training time: {cpu_time:.2f} seconds')

    # Reload the checkpointed model from disk and score it on the test set.
    best_model = tf.keras.models.load_model(best_model_path)
    test_loss, test_f1_macro, test_accuracy, test_conf_matrix = evaluate_model(best_model, test_dataset)

print(f'Loss with CPU: {test_loss}')
print(f'F1 Macro with CPU: {test_f1_macro}')
print(f'Accuracy with CPU: {test_accuracy}')
print(f'Confusion Matrix with CPU:\n{test_conf_matrix}')

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
CPU training time: 16.12 seconds




Loss with CPU: 5.496510982513428
F1 Macro with CPU: 0.09529411764705882
Accuracy with CPU: 0.23546511627906977
Confusion Matrix with CPU:
[[  0   0   0 297]
 [  0   0   0 356]
 [  0   0   0 399]
 [  0   0   0 324]]


**Βήμα 6**: Εκπαίδευση με GPU

In [None]:
# Same experiment as the CPU run, with ops placed on the first GPU.
with tf.device('/device:GPU:0'):
    model = neural_network_definition(input_shape)
    # Use a fresh optimizer so this run does not share an optimizer instance
    # (and whatever internal state it carries) with the previous model.
    optimizer = SGD(learning_rate=0.002)

    start_time = time.time()
    model = find_best_model(model, train_dataset, val_dataset, epochs, optimizer, loss_fn, best_model_path)
    gpu_time = time.time() - start_time
    print(f'GPU training time: {gpu_time:.2f} seconds')

    # Reload the checkpointed model and score it on the test set.
    # evaluate_model is called with (model, dataset), matching the CPU cell;
    # the extra loss_fn argument did not match its two-parameter signature.
    best_model = tf.keras.models.load_model(best_model_path)
    test_loss, test_f1_macro, test_accuracy, test_conf_matrix = evaluate_model(best_model, test_dataset)

    print(f'Loss with GPU: {test_loss}')
    print(f'F1 Macro with GPU: {test_f1_macro}')
    print(f'Accuracy with GPU: {test_accuracy}')
    print(f'Confusion Matrix with GPU:\n{test_conf_matrix}')

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
GPU training time: 17.20 seconds
Loss with GPU: 5.496510982513428
F1 Macro with GPU: 0.09529411764705882
Accuracy with GPU: 0.23546511627906977
Confusion Matrix with GPU:
[[  0   0   0 297]
 [  0   0   0 356]
 [  0   0   0 399]
 [  0   0   0 324]]


# Ερώτημα 2

**Βήμα 1**: Φόρτωση δεδομένων ακριβώς όπως και στο προηγούμενο ερώτημα με διαφορετικό directory για τα dataset.

In [None]:
def load_data(features='melgrams', batch_size=16):
    """Load the train/val/test splits and wrap them as batched tf.data datasets.

    Reads ``<path>/<split>/<features>/X.npy`` and ``labels.npy`` for each of the
    ``train``, ``val`` and ``test`` splits, maps the genre names to integer class
    ids and builds one ``tf.data.Dataset`` of ``(sample, label)`` batches per split.
    The samples are returned with the shape stored on disk; no channel axis is added.

    Args:
        features: Name of the feature sub-folder to read (default ``'melgrams'``).
        batch_size: Number of ``(sample, label)`` pairs per batch (default 16).

    Returns:
        Tuple ``(train_dataset, val_dataset, test_dataset)``.

    Raises:
        KeyError: If a label file contains a genre name missing from the mapping.
    """
    label_class = {'classical': 0, 'hiphop': 1, 'rock_metal_hardrock': 2, 'blues': 3}

    def _load_split(split):
        # Load one split and convert its genre names to integer class ids.
        samples = np.load(os.path.join(path, split, features, 'X.npy'))
        names = np.load(os.path.join(path, split, features, 'labels.npy'))
        labels = np.array([label_class[name] for name in names])
        return samples, labels

    X_train, labels_train = _load_split('train')
    X_val, labels_val = _load_split('val')
    X_test, labels_test = _load_split('test')

    # Shuffle individual samples BEFORE batching, with a buffer covering the whole
    # training set. The previous `.batch(16).shuffle(True)` used a buffer of size
    # True == 1 on already-formed batches, which leaves the order untouched.
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((X_train, labels_train))
        .shuffle(buffer_size=max(len(X_train), 1))
        .batch(batch_size)
    )
    # Validation and test data are only evaluated, so their order is kept as is.
    val_dataset = tf.data.Dataset.from_tensor_slices((X_val, labels_val)).batch(batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, labels_test)).batch(batch_size)

    return train_dataset, val_dataset, test_dataset

**Βήμα 2**: Δημιουργία του νευρωνικού μας δικτύου.

In [None]:
def cnn_definition(input_shape, out_dim):
    """Build and compile the baseline convolutional classifier.

    Four 5x5 'same'-padded convolutions (16, 32, 64, 128 filters) are followed
    by a Flatten layer and Dense layers of 1024, 256, 32 and ``out_dim`` units.
    No activation functions or pooling layers are used.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv2D layer.
        out_dim: Number of units of the final Dense layer (one per class).

    Returns:
        A ``tf.keras.Sequential`` model compiled with the 'SGD' optimizer,
        sparse categorical cross-entropy on logits and the accuracy metric.
    """
    # Convolutional part: only the first layer carries the input shape.
    conv_layers = [
        tf.keras.layers.Conv2D(16, kernel_size=5, padding='same', input_shape=input_shape)
    ]
    conv_layers += [
        tf.keras.layers.Conv2D(filters, kernel_size=5, padding='same')
        for filters in (32, 64, 128)
    ]

    # Fully connected part, ending in the output layer.
    dense_layers = [tf.keras.layers.Dense(units) for units in (1024, 256, 32, out_dim)]

    # Flatten bridges the convolutional feature maps and the dense layers.
    model = tf.keras.Sequential(conv_layers + [tf.keras.layers.Flatten()] + dense_layers)

    model.compile(
        optimizer='SGD',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    return model


**Βήμα 3**: Εκπαίδευση και αξιολόγηση μοντέλου.

In [None]:
def train_and_evaluate_model(model, train_dataset, val_dataset, test_dataset, epochs, checkpoint_path):
    """Train a compiled model, restore its best weights and score it on the test set.

    Args:
        model: Compiled Keras model.
        train_dataset: Training data.
        val_dataset: Validation data evaluated after every epoch.
        test_dataset: Dataset of ``(x_batch, y_batch)`` pairs used for the final metrics.
        epochs: Number of training epochs.
        checkpoint_path: Where the best weights are stored during training.

    Returns:
        Tuple ``(test_loss, f1_macro, accuracy, c_matrix)`` computed on ``test_dataset``.
    """
    # Keep only the weights of the epoch with the highest validation accuracy
    # (the monitored quantity is val_accuracy, not an F1 score).
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True,
        save_weights_only=True
    )

    # Train the model.
    model.fit(train_dataset, validation_data=val_dataset, epochs=epochs, callbacks=[checkpoint_callback])

    # Restore the best weights before evaluating.
    model.load_weights(checkpoint_path)

    # The test-set metrics are the ones evaluate_model already computes, so
    # delegate instead of repeating the prediction loop here.
    return evaluate_model(model, test_dataset)

**Βήμα 4**: Τροποποίηση του Βήματος 2 ώστε να εφαρμόζεται padding 2 στοιχείων στα συνελικτικά επίπεδα ακολουθούμενα από max pooling με kernel size 2.

In [None]:
def cnn_definition_with_pooling_padding(input_shape, out_dim):
    """Build and compile the CNN variant with max pooling after each convolution.

    Each of the four 5x5 'same'-padded convolutions (16, 32, 64, 128 filters)
    is followed by 2x2 max pooling. The result is flattened and fed through
    Dense layers of 1024, 256, 32 and ``out_dim`` units. No activation
    functions are used.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv2D layer.
        out_dim: Number of units of the final Dense layer (one per class).

    Returns:
        A ``tf.keras.Sequential`` model compiled with the 'SGD' optimizer,
        sparse categorical cross-entropy on logits and the accuracy metric.
    """
    layers = []
    for position, filters in enumerate((16, 32, 64, 128)):
        # Only the first convolution declares the input shape.
        extra = {'input_shape': input_shape} if position == 0 else {}
        layers.append(tf.keras.layers.Conv2D(filters, kernel_size=5, padding='same', **extra))
        layers.append(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

    layers.append(tf.keras.layers.Flatten())

    for units in (1024, 256, 32, out_dim):
        layers.append(tf.keras.layers.Dense(units))

    model = tf.keras.Sequential(layers)
    model.compile(
        optimizer='SGD',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    return model


**Βήμα 5**: Εφαρμογή ReLU μετά την πράξη της συνέλιξης και πριν το pooling.

In [None]:
def cnn_definition_with_activation_function(input_shape, out_dim):
    """Build and compile the CNN variant with ReLU activations.

    Each of the four 5x5 'same'-padded convolutions (16, 32, 64, 128 filters)
    is followed by ReLU and then 2x2 max pooling. After flattening, Dense
    layers of 1024, 256 and 32 units are each followed by ReLU; the final
    Dense layer of ``out_dim`` units has no activation and emits logits.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv2D layer.
        out_dim: Number of units of the final Dense layer (one per class).

    Returns:
        A ``tf.keras.Sequential`` model compiled with the 'SGD' optimizer,
        sparse categorical cross-entropy on logits and the accuracy metric.
    """
    layers = []

    # Convolution -> ReLU -> pooling, repeated once per filter count.
    for position, filters in enumerate((16, 32, 64, 128)):
        extra = {'input_shape': input_shape} if position == 0 else {}
        layers.append(tf.keras.layers.Conv2D(filters, kernel_size=5, padding='same', **extra))
        layers.append(tf.keras.layers.ReLU())
        layers.append(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

    layers.append(tf.keras.layers.Flatten())

    # Hidden dense layers, each followed by ReLU.
    for units in (1024, 256, 32):
        layers.append(tf.keras.layers.Dense(units))
        layers.append(tf.keras.layers.ReLU())

    # Output layer: raw logits, no activation.
    layers.append(tf.keras.layers.Dense(out_dim))

    model = tf.keras.Sequential(layers)
    model.compile(
        optimizer='SGD',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    return model


Εκπαίδευση με CPU

In [None]:
train_dataset, val_dataset, test_dataset = load_data()


def _add_channel_axis(features, labels):
    # Append a trailing channel axis: (batch, h, w) -> (batch, h, w, 1).
    return tf.expand_dims(features, axis=-1), labels


# Conv2D needs 4-D batches (batch, height, width, channels). The recorded run
# of this cell failed because the batches were 3-D, shape (None, 21, 128), so
# add the channel axis when it is missing.
if len(train_dataset.element_spec[0].shape) == 3:
    train_dataset = train_dataset.map(_add_channel_axis)
    val_dataset = val_dataset.map(_add_channel_axis)
    test_dataset = test_dataset.map(_add_channel_axis)

# Take the per-sample shape from the data instead of hard-coding it; the former
# (64, 64, 1) did not match the shape reported in the error.
input_shape = tuple(train_dataset.element_spec[0].shape[1:])
optimizer = SGD(learning_rate=0.002)
loss_fn = SparseCategoricalCrossentropy(from_logits=True)
best_model_path = 'best_model_with_relu.ckpt'
epochs = 30
out_dim = 4

# Pin the run to the CPU so the timing below really measures CPU training.
with tf.device('/CPU:0'):
    model = cnn_definition(input_shape, out_dim)

    start_time = time.time()
    # train_and_evaluate_model returns the test metrics, not the model, so
    # unpack them instead of overwriting `model`.
    test_loss, test_f1_macro, test_accuracy, test_conf_matrix = train_and_evaluate_model(
        model, train_dataset, val_dataset, test_dataset, epochs, best_model_path)
    cpu_time = time.time() - start_time
    print(f'CPU training time: {cpu_time:.2f} seconds')

print(f'Loss with CPU: {test_loss}')
print(f'F1 Macro with CPU: {test_f1_macro}')
print(f'Accuracy with CPU: {test_accuracy}')
print(f'Confusion Matrix with CPU:\n{test_conf_matrix}')

ValueError: Input 0 of layer "conv2d_12" is incompatible with the layer: expected min_ndim=4, found ndim=3. Full shape received: (None, 21, 128)

Εκπαίδευση με GPU

In [None]:
def _add_channel_axis(features, labels):
    # Append a trailing channel axis: (batch, h, w) -> (batch, h, w, 1).
    return tf.expand_dims(features, axis=-1), labels


# Conv2D needs 4-D batches; add the channel axis only if the datasets are
# still 3-D, so running this after an already-converted dataset is harmless.
if len(train_dataset.element_spec[0].shape) == 3:
    train_dataset = train_dataset.map(_add_channel_axis)
    val_dataset = val_dataset.map(_add_channel_axis)
    test_dataset = test_dataset.map(_add_channel_axis)

# Per-sample input shape taken from the data itself.
input_shape = tuple(train_dataset.element_spec[0].shape[1:])

with tf.device('/device:GPU:0'):
    # cnn_definition requires the number of output classes as second argument.
    model = cnn_definition(input_shape, out_dim)

    start_time = time.time()
    # Arguments follow train_and_evaluate_model's signature
    # (model, train, val, test, epochs, checkpoint_path); it returns the test
    # metrics rather than the model.
    test_loss, test_f1_macro, test_accuracy, test_conf_matrix = train_and_evaluate_model(
        model, train_dataset, val_dataset, test_dataset, epochs, best_model_path)
    gpu_time = time.time() - start_time
    print(f'GPU training time: {gpu_time:.2f} seconds')

    print(f'Loss with GPU: {test_loss}')
    print(f'F1 Macro with GPU: {test_f1_macro}')
    print(f'Accuracy with GPU: {test_accuracy}')
    print(f'Confusion Matrix with GPU:\n{test_conf_matrix}')

# Ερώτημα 3

**Βήμα 2**: Δοκιμή διάφορων αλγορίθμων

In [None]:
# NOTE(review): by this point load_data() has been redefined (Question 2) to
# read the melgram features, while this cell builds a network for
# 26-dimensional inputs. Confirm the MFCC loader is the one in effect.
train_dataset, val_dataset, test_dataset = load_data()
input_shape = (26,)
loss_fn = SparseCategoricalCrossentropy(from_logits=True)
best_model_path = 'best_model.h5'
epochs = 30

# Train and evaluate once per optimizer. Each run starts from a freshly
# initialised model; retraining the same instance would let later optimizers
# continue from the weights learned by the earlier ones.
for optimizer in (SGD(learning_rate=0.002), 'adam', 'adadelta'):
    optimizer_name = optimizer if isinstance(optimizer, str) else type(optimizer).__name__
    print(f'Optimizer: {optimizer_name}')

    model = neural_network_definition(input_shape)

    start_time = time.time()
    model = find_best_model(model, train_dataset, val_dataset, epochs, optimizer, loss_fn, best_model_path)
    cpu_time = time.time() - start_time
    print(f'CPU training time: {cpu_time:.2f} seconds')

    # Reload the checkpointed model and score it on the test set.
    best_model = tf.keras.models.load_model(best_model_path)
    test_loss, test_f1_macro, test_accuracy, test_conf_matrix = evaluate_model(best_model, test_dataset)

    print(f'Loss with CPU: {test_loss}')
    print(f'F1 Macro with CPU: {test_f1_macro}')
    print(f'Accuracy with CPU: {test_accuracy}')
    print(f'Confusion Matrix with CPU:\n{test_conf_matrix}')

**Βήμα 3**: Προσθήκη batch normalization (επίπεδο BatchNormalization του Keras, το αντίστοιχο του BatchNorm2d) πριν από κάθε συνάρτηση ενεργοποίησης.

In [None]:
def cnn_definition_with_batchnorm(input_shape, out_dim):
    """Build the CNN variant with batch normalization before every activation.

    Three 2x2 'same'-padded convolutions (16, 32, 64 filters) are each followed
    by BatchNormalization, ReLU and 2x2 max pooling. After flattening, Dense
    layers of 1024, 256 and 32 units are each followed by BatchNormalization
    and ReLU; the final Dense layer of ``out_dim`` units has no activation.

    Args:
        input_shape: Shape of one input sample, passed to the first Conv2D layer.
        out_dim: Number of units of the final Dense layer (one per class).

    Returns:
        An uncompiled ``tf.keras.Sequential`` model; this function does not
        call ``compile``.
    """
    layers = []

    # Convolution -> batch norm -> ReLU -> pooling, once per filter count.
    for position, filters in enumerate((16, 32, 64)):
        extra = {'input_shape': input_shape} if position == 0 else {}
        layers.append(tf.keras.layers.Conv2D(filters, (2, 2), padding='same', **extra))
        layers.append(tf.keras.layers.BatchNormalization())
        layers.append(tf.keras.layers.Activation('relu'))
        layers.append(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

    layers.append(tf.keras.layers.Flatten())

    # Dense -> batch norm -> ReLU for the hidden fully connected layers.
    for units in (1024, 256, 32):
        layers.append(tf.keras.layers.Dense(units))
        layers.append(tf.keras.layers.BatchNormalization())
        layers.append(tf.keras.layers.Activation('relu'))

    # Output layer without normalization or activation.
    layers.append(tf.keras.layers.Dense(out_dim))

    return tf.keras.Sequential(layers)

**Βήμα 4**: Δοκιμή με weight_decay και dropout.

In [None]:
# NOTE(review): by this point load_data() has been redefined (Question 2) to
# read the melgram features, while this cell builds a network for
# 26-dimensional inputs. Confirm the MFCC loader is the one in effect.
train_dataset, val_dataset, test_dataset = load_data()
input_shape = (26,)
model = neural_network_definition(input_shape)
# Use the current (non-legacy) SGD with `weight_decay`. The legacy optimizer's
# `decay` argument is a learning-rate decay schedule, not weight decay, so it
# did not perform the experiment this step describes.
# NOTE(review): the step also mentions dropout, but the network built by
# neural_network_definition contains no Dropout layers.
optimizer = tf.keras.optimizers.SGD(learning_rate=0.002, weight_decay=1e-4)
loss_fn = SparseCategoricalCrossentropy(from_logits=True)
best_model_path = 'best_model.h5'
epochs = 60

start_time = time.time()
model = find_best_model(model, train_dataset, val_dataset, epochs, optimizer, loss_fn, best_model_path)
cpu_time = time.time() - start_time
print(f'CPU training time: {cpu_time:.2f} seconds')

# Ερώτημα 4

**Βήμα 1**: Συνάρτηση που δέχεται ως όρισμα ένα dataloader και ένα μοντέλο και επιστρέφει τις εκτιμόμενες εξόδους.

In [None]:
def inference(dataloader, model):
    """Run ``model`` over every batch of ``dataloader`` and collect its outputs.

    Args:
        dataloader: Iterable of batches. A batch is either the model input
            itself or a tuple whose first element is the input, such as the
            ``(features, labels)`` pairs produced by the datasets in this
            notebook.
        model: Callable returning an object with a ``numpy()`` method.

    Returns:
        A flat list with one entry per sample, in iteration order.
    """
    predictions = []
    for batch in dataloader:
        # Labelled batches arrive as (features, labels); only the features go
        # to the model. Previously the whole tuple was passed through.
        features = batch[0] if isinstance(batch, tuple) else batch
        predictions.extend(model(features).numpy())
    return predictions