In [24]:
import os
from tqdm import tqdm
import numpy as np
import tensorflow as tf
from pathlib import Path
import pandas as pd
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50
from sklearn.metrics import mean_squared_error
from tensorflow.keras.backend import clear_session
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, Concatenate
from tensorflow.keras.applications import ResNet50


In [25]:
# Clear any previous session
clear_session()

In [26]:
# Import labels
labels_data = pd.read_csv('data/echonest_norm.csv').values
print(f"Label shape: {labels_data.shape}")

Label shape: (13131, 9)


In [16]:
def attach_label(image_path):
    try:
        image_id = int(image_path.split("/")[-1].split("_")[0])
        label = labels_data[labels_data[:, 0] == image_id, 1:]
        if label.shape[0] == 0:
            print(f"No label found for image {image_id}")
            return None
        return image_path, label.reshape(-1)
    except Exception as e:
        print(f"Error with {image_path}: {e}")
        return None

In [17]:
# Load image paths and labels
data_path = Path("spectrogram")
all_image_paths = sorted(map(str, data_path.glob("**/*.png")))
valid_pairs = list(filter(None, map(attach_label, all_image_paths)))

if len(valid_pairs) == 0:
    raise ValueError("No valid image-label pairs found!")

data_paths, labels = zip(*valid_pairs)

In [18]:
def load_image(image_path, label):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, (int(984 / 3), int(2385 / 3)))
    image = tf.keras.applications.resnet50.preprocess_input(image)
    return image, label

# Create dataset
dataset = tf.data.Dataset.from_tensor_slices((list(data_paths), list(labels)))
dataset = dataset.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

# Shuffle and split dataset
train_size = int(0.8 * len(data_paths))
train_dataset = dataset.take(train_size).batch(4).prefetch(tf.data.AUTOTUNE)
val_dataset = dataset.skip(train_size).batch(4).prefetch(tf.data.AUTOTUNE)

In [19]:
def combined_mse_cosine_loss(y_true, y_pred):
    mse = tf.reduce_mean(tf.square(y_true - y_pred))
    y_true_norm = tf.nn.l2_normalize(y_true, axis=1)
    y_pred_norm = tf.nn.l2_normalize(y_pred, axis=1)
    cosine_loss = 1 - tf.reduce_mean(tf.reduce_sum(y_true_norm * y_pred_norm, axis=1))
    return mse + 0.3 * cosine_loss

In [27]:
# Build model
clear_session()

# Base model
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(328, 795, 3))  # 984/3 = 328, 2385/3 = 795
base_model.trainable = False

# Input
inputs = Input(shape=(328, 795, 3))
x = base_model(inputs, training=False)
x = GlobalAveragePooling2D()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.3)(x)

# Branch 1: acousticness, instrumentalness, liveness, speechiness
content_output = Dense(4, name='content_output')(x)

# Branch 2: danceability, energy, tempo, valence
feel_output = Dense(4, name='feel_output')(x)

# Concatenate final output
final_output = Concatenate(name='combined_output')([content_output, feel_output])

# Build model
model = Model(inputs, final_output)

base_model.trainable = False
model.compile(optimizer='adam', loss=combined_mse_cosine_loss, metrics=['mae'])
model.summary()

try:
    with tf.device('/device:GPU:0'):
        model.fit(train_dataset, validation_data=val_dataset, epochs=5)
except Exception as e:
    print(f"Training failed: {e}")
    exit(1)

Epoch 1/5
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m129s[0m 47ms/step - loss: 0.1561 - mae: 0.2332 - val_loss: 0.1067 - val_mae: 0.1894
Epoch 2/5
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m119s[0m 45ms/step - loss: 0.0791 - mae: 0.1705 - val_loss: 0.1155 - val_mae: 0.1955
Epoch 3/5
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m120s[0m 46ms/step - loss: 0.0762 - mae: 0.1675 - val_loss: 0.1217 - val_mae: 0.1966
Epoch 4/5
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m119s[0m 45ms/step - loss: 0.0754 - mae: 0.1666 - val_loss: 0.1156 - val_mae: 0.1958
Epoch 5/5
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 45ms/step - loss: 0.0746 - mae: 0.1659 - val_loss: 0.1196 - val_mae: 0.1962


In [28]:
base_model.trainable = True
for layer in base_model.layers[:100]:
    layer.trainable = False  # Keep early layers frozen
model.compile(optimizer=tf.keras.optimizers.Adam(1e-5), loss=combined_mse_cosine_loss, metrics=['mae'])

try:
    with tf.device('/device:GPU:0'):
        model.fit(train_dataset, validation_data=val_dataset, epochs=10)
except Exception as e:
    print(f"Training failed: {e}")
    exit(1)

Epoch 1/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m207s[0m 74ms/step - loss: 0.0871 - mae: 0.1814 - val_loss: 0.1232 - val_mae: 0.1963
Epoch 2/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m188s[0m 72ms/step - loss: 0.0697 - mae: 0.1614 - val_loss: 0.1186 - val_mae: 0.1937
Epoch 3/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m190s[0m 72ms/step - loss: 0.0627 - mae: 0.1530 - val_loss: 0.1148 - val_mae: 0.1889
Epoch 4/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m183s[0m 70ms/step - loss: 0.0590 - mae: 0.1483 - val_loss: 0.1129 - val_mae: 0.1868
Epoch 5/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m187s[0m 71ms/step - loss: 0.0542 - mae: 0.1416 - val_loss: 0.1072 - val_mae: 0.1824
Epoch 6/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m187s[0m 71ms/step - loss: 0.0524 - mae: 0.1387 - val_loss: 0.1028 - val_mae: 0.1797
Epoch 7/10
[1m2625/2625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

In [29]:
# Save results
y_test = []
y_pred = []
for images, batch_labels in tqdm(val_dataset):
    y_test.append(batch_labels.numpy())
    y_pred.append(model.predict(images, verbose=0))
y_test = np.concatenate(y_test, axis=0)
y_pred = np.concatenate(y_pred, axis=0)

# Create data frame with interleaved y_test and y_pred columns
data = {}
for i in range(y_test.shape[1]):
    data[f'y_test_{i}'] = y_test[:, i]
    data[f'y_pred_{i}'] = y_pred[:, i]
df = pd.DataFrame(data)
df.to_csv("data/evaluate.csv", index=False)

# Report MSE
mse = mean_squared_error(y_test, y_pred)
print(f"Mean squared error: {mse}")

100%|██████████| 657/657 [01:11<00:00,  9.25it/s] 

Mean squared error: 0.06301725984859978





In [23]:
# Save the model
model.save("models/resnet50.keras")