In [303]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import json
import random as random_
import re
from glob import iglob

from typing import List, Dict, Tuple, Set, Any,  Union, Optional, Callable
from typing_extensions import TypedDict, Literal
from math import floor

In [304]:
# Report the TensorFlow version and the GPUs it can see, one item per line.
print(tf.__version__, tf.config.list_physical_devices('GPU'), sep="\n")

2.6.2
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [305]:
# Dataset locations and training constants.
DATASET_BASE = "../input/ludwig-music-dataset-moods-and-subgenres"
MFCCS  = f"{DATASET_BASE}/mfccs"            # searched recursively for *.npy by load_mfccs
LABELS = f"{DATASET_BASE}/labels.json"
SUBGENERES = f"{DATASET_BASE}/subgeneres.json"
CHECKPOINT = "./checkpoint/saved_model.h5"  # target of the ModelCheckpoint callback
BATCH_SIZE = 25

# Fixed seed so that a Restart & Run All reproduces the same shuffles and
# splits. The previous randomly drawn seed was never recorded, which made
# runs impossible to repeat.
SEED = 42
random = random_.Random(SEED)   # seeded RNG used for shuffling the track list
np.random.seed(SEED)            # TrackDataGen shuffles with np.random.permutation
tf.random.set_seed(SEED)        # TensorFlow-side randomness (e.g. weight init)



## JSON Declaration and other Dicts 

In [306]:


class N(TypedDict): # A number
    """Dict wrapping a single numeric value under the key ``N``."""
    N: Union[float, int]

class S(TypedDict): # A string
    """Dict wrapping a single string value under the key ``S``."""
    S: str

class L(TypedDict): # A list of strings
    """Dict wrapping a list of ``S`` string wrappers under the key ``L``."""
    L: List[S]

class Track(TypedDict):
    """One track entry of the labels JSON.

    Every value is wrapped in a single-key dict (``S`` for strings, ``N`` for
    numbers, ``L`` for lists), so a field is read as e.g. ``track["genre"]["S"]``.
    NOTE(review): the wrapper keys look like a database attribute-value export;
    confirm against the dataset documentation.
    """
    # IDs
    PK: S
    mbid: S

    # Genres: 
    genre: S                     # parent genre name
    subgenres: L                 # list of wrapped subgenre names
    otherSubgenres: Optional[L]

    # Moods
    # Each is a wrapped number; mood_target falls back to 0.5 when one is missing.
    aggressive: Optional[N]
    happy: Optional[N]
    party: Optional[N]
    acoustic: Optional[N] 
    electronic: Optional[N]
    sad: Optional[N]
    relaxed: Optional[N]

    # Metadata
    preview: S
    name: Optional[S]
    artist: Optional[S]
    popularity: Optional[N]
    album: Optional[S]
    

class LabelsJson(TypedDict):
    """Top-level structure of the labels JSON file."""
    # Track records; TrackDataGen indexes this mapping by track id.
    tracks: Dict[str, Track]

class Mfcc(TypedDict):
    """MFCC data loaded from one ``.npy`` file, as produced by ``load_mfccs``."""
    mfccs: np.ndarray   # array loaded from the file; iterated split by split
    track_id: str       # id taken from the file name
    splits: int         # len(mfccs), i.e. the number of splits in the file


In [307]:
def load_json(path:str) -> Dict[str, Any]:
    """Load a JSON file.

    The file is always decoded as UTF-8 so that non-ASCII text (artist or
    track names, for instance) is read the same way on every platform
    instead of depending on the locale's default encoding.

    Args:
        path (str): json file path

    Returns:
        Dict: The parsed JSON document, a dictionary indexed by strings
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

In [308]:
def get_subgenres(parent_genre: str, subgenres: List[str]):
    """Select the subgenres that belong to a parent genre.

    A subgenre name has the form ``"<genre>---<subgenre>"``. An entry is kept
    when ``parent_genre`` occurs (as a substring) in the part before the
    first ``"---"``.

    Args:
        parent_genre (str): Genre to look for
        subgenres (List[str]): Candidate subgenre names

    Returns:
        List[str]: The matching subgenre names, in their original order
    """
    selected = []
    for name in subgenres:
        genre_part = name.split("---")[0]
        if parent_genre in genre_part:
            selected.append(name)
    return selected

In [309]:
def load_mfccs(subgenres_list: List[str], subgenres: Dict[str, List[str]], search_path: str = MFCCS):
    """Load the MFCC arrays of every track belonging to the given subgenres.

    Args:
        subgenres_list (List[str]): Subgenre names whose tracks should be loaded
        subgenres (Dict[str, List[str]]): Maps a subgenre name to its track ids
        search_path (str): Directory searched recursively for ``*.npy`` files

    Returns:
        List[Mfcc]: One entry per loaded file, holding the array, the track id
        and the number of splits (``len`` of the array). Files that cannot be
        read are reported and skipped.
    """
    # Track ids among all the requested subgenres.
    track_set: Set[str] = set()
    for g in subgenres_list:
        track_set.update(subgenres[g])

    track_splits: List[Mfcc] = []

    # The id is the alphanumeric run right before the ".npy" extension. The
    # pattern is anchored at the end of the path and the dot is escaped, so a
    # directory name such as "mfccs_npy" can no longer be mistaken for the id.
    id_pattern = re.compile(r"([a-zA-Z0-9]+)\.npy$")

    for npy in iglob(search_path + '/**/*.npy', recursive=True):
        match = id_pattern.search(npy)
        if not match:
            continue

        track_id = match.group(1)
        if track_id not in track_set:
            continue

        try:
            mfccs_splits = np.load(npy)
        except (IOError, ValueError) as err:
            # The file exists (the glob just found it) but could not be read.
            print(f"Could not load {npy}: {err}")
            continue

        track_splits.append({"mfccs": mfccs_splits, "track_id": track_id, "splits": len(mfccs_splits)})

    return track_splits
        

In [310]:
def train_test_val(ds: List[Mfcc], test=0.01, val=0.2):
    """Split ``ds`` sequentially into train, test and validation parts.

    The first ``1 - val - test`` fraction of ``ds`` becomes the training set.
    The remainder is divided between validation and test in proportion to
    ``val`` and ``test``; validation takes the leading part of the remainder
    and test the trailing part. No shuffling is done here.

    Args:
        ds (List[Mfcc]): Items to split
        test (float): Fraction of ``ds`` reserved for testing
        val (float): Fraction of ``ds`` reserved for validation

    Returns:
        Tuple (train, test, val) -- note that test comes before val.
    """
    # TRAIN
    train_end = floor(len(ds) * (1 - val - test))
    train_part = ds[:train_end]

    holdout = ds[train_end:]
    holdout_fraction = test + val
    # With test == val == 0 there is nothing to divide; avoid dividing by zero.
    val_share = 1 - (test / holdout_fraction) if holdout_fraction else 0
    val_end = floor(len(holdout) * val_share)

    # VAL
    val_part = holdout[:val_end]

    # TEST
    test_part = holdout[val_end:]

    return train_part, test_part, val_part

## Tensorflow 

### Callbacks

In [311]:
class StopCallback(keras.callbacks.Callback):
    """Stop training as soon as a monitored metric exceeds a threshold.

    Args:
        monitor (str): Key to read from the epoch ``logs``. It must be the
            name of a metric the model was compiled with, otherwise the
            callback never triggers.
        threshold (float): Training stops once the metric is above this value.
    """

    def __init__(self, monitor="accuracy", threshold=0.99):
        super().__init__()
        self.monitor = monitor
        self.threshold = threshold

    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None rather than a shared mutable dict.
        logs = logs or {}
        if (logs.get(self.monitor) or 0) > self.threshold:
            self.model.stop_training = True

# The model trained in this notebook is compiled with the
# "categorical_accuracy" metric, so the keys in `logs` are
# "categorical_accuracy" / "val_categorical_accuracy". Monitoring "accuracy" /
# "val_accuracy" would never match: training would never stop early and no
# best-model checkpoint would be written. Adjust both names if the model is
# compiled with a different metric.
stop_callback = StopCallback(monitor="categorical_accuracy")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    CHECKPOINT,
    monitor='val_categorical_accuracy',
    save_best_only = True,
)

In [312]:
class TrackDataGen(keras.utils.Sequence):
    """Keras ``Sequence`` serving (MFCC split, target) batches.

    Every split of every track becomes one sample; all splits of a track share
    the target computed for that track. Samples are shuffled once, at
    construction time.

    Args:
        data: Loaded MFCC entries (see ``load_mfccs``)
        target_f: Maps a track record to its training target
        labels: Labels JSON; ``labels["tracks"]`` is indexed by track id
        batch_size: Number of samples per batch
    """

    def __init__(self, data: List[Mfcc],
                 target_f: Callable[[Track], Any],
                 labels: LabelsJson,
                 batch_size = BATCH_SIZE):

        self.batch_size = batch_size
        X: List[np.ndarray] = []
        Y: List[Any] = []  # whatever target_f returns (an int or a list of numbers)

        for mfcc_ in data:
            track = labels["tracks"][mfcc_["track_id"]]
            target = target_f(track)

            # One sample per split, all carrying the track's target.
            for split in mfcc_["mfccs"]:
                X.append(split)
                Y.append(target)

        X_np = np.array(X)
        # Append a channel axis so that samples can feed a Conv2D layer.
        X_np = np.expand_dims(X_np, axis=3)
        Y_np = np.array(Y)

        assert len(X_np) == len(Y_np)
        self.X, self.Y = self.unison_shuffled_copies(X_np, Y_np)

    @staticmethod
    def unison_shuffled_copies(a, b):
        """Return copies of ``a`` and ``b`` shuffled with the same permutation."""
        assert len(a) == len(b), f"len(a) = {len(a)} != len(b) = {len(b)}"
        p = np.random.permutation(len(a))
        return a[p], b[p]

    def __getitem__(self, idx):
        """Return batch number ``idx`` as an ``(inputs, targets)`` pair."""
        start = idx * self.batch_size
        stop = start + self.batch_size
        batch_x = self.X[start:stop]
        batch_y = self.Y[start:stop]

        return batch_x, np.array(batch_y)

    def __len__(self):
        """Number of batches per epoch, counting a trailing partial batch."""
        # Ceiling division: floor division silently dropped the last
        # len(X) % batch_size samples (and yielded zero batches for small sets).
        return -(-len(self.X) // self.batch_size)

In [313]:
def build_network(type: Literal["multiclass", "multilabel", "mood"], labels: int, shape: Optional[Tuple[int, ...]] = None):
    """Build and compile a classifier on top of a frozen EfficientNetB0.

    A 3-filter convolution first maps the input to three channels, which are
    then fed to the ImageNet-pretrained EfficientNetB0 base. The base is
    followed by global average pooling, a 32-unit dense layer and the output
    layer.

    Args:
        type: Kind of problem, which selects output activation, loss and metric:
            "multiclass" -- softmax over mutually exclusive integer labels;
            "multilabel" -- independent sigmoid outputs for 0/1 target vectors;
            "mood"       -- independent sigmoid outputs for targets in [0, 1].
        labels (int): Number of output units
        shape: Shape of a single input sample, passed to the first layer

    Returns:
        The compiled Keras model.

    Raises:
        ValueError: If ``type`` is not one of the supported kinds.
    """
    if type == "multilabel":
        activation, loss, metrics = "sigmoid", "binary_crossentropy", ["categorical_accuracy",]
    elif type == "mood":
        # Mood targets are vectors of floats, not integer class indices, so the
        # sparse categorical loss cannot be applied to them; binary
        # cross-entropy accepts soft targets in [0, 1] per output unit.
        # NOTE(review): "categorical_accuracy" is of little use for such
        # targets; consider an error metric such as "mae".
        activation, loss, metrics = "sigmoid", "binary_crossentropy", ["categorical_accuracy"]
    elif type == "multiclass":
        activation, loss, metrics = "softmax", "sparse_categorical_crossentropy", ["accuracy"]
    else:
        raise ValueError("Invalid Type")

    conv_base = keras.applications.EfficientNetB0(
                    include_top = False,
                    weights = "imagenet",
                    drop_connect_rate=0.8)
    # Only the layers around the pretrained base are trained.
    conv_base.trainable = False
    model = keras.models.Sequential([keras.layers.Conv2D(3,(3,3),padding='same', input_shape=shape),
                                    conv_base,
                                    keras.layers.GlobalAveragePooling2D(),
                                    keras.layers.Dense(32, activation="relu"),
                                    keras.layers.Dense(labels, activation = activation)])

    model.compile(optimizer = "adam", loss = loss, metrics = metrics)
    return model
                                 

## Target Functions

In [314]:
def genre_target(t: Track, genres2labels: Dict[str, int]) -> int:
    """Return the integer label assigned to the track's parent genre."""
    genre_name = t["genre"]["S"]
    return genres2labels[genre_name]


def mood_target(t: Track) -> List[float]:
    """Return the track's mood values as a fixed-order vector.

    The order is acoustic, aggressive, electronic, happy, party, relaxed, sad.
    A mood that is absent (or falsy) on the track counts as 0.5.
    """
    fallback: N = {"N": 0.5}
    mood_keys = ("acoustic", "aggressive", "electronic", "happy", "party", "relaxed", "sad")

    wrapped = [t.get(key) or fallback for key in mood_keys]
    return [value["N"] for value in wrapped]

def subgenre_target(t: Track, subgenres: List[str]) -> List[Literal[1, 0]]:
    """Encode the track's subgenres as a 0/1 vector aligned with ``subgenres``.

    Position ``i`` is 1 when ``subgenres[i]`` is one of the track's subgenres
    and 0 otherwise.
    """
    present = {entry["S"] for entry in t["subgenres"]["L"]}

    encoded = []
    for name in subgenres:
        encoded.append(1 if name in present else 0)
    return encoded

## Main Function

#### Prepare the Data

In [315]:
labels: LabelsJson = load_json(LABELS) # type: ignore

# Track Genres:
# Sorted so that the genre <-> label index mapping is identical on every run.
# Iterating a plain set of strings gives an order that can change between
# kernel restarts, which would silently change what each label index means.
genres = sorted({t["genre"]["S"] for t in labels["tracks"].values()})
genres2labels = {g: i for i,g in enumerate(genres)}
labels2genres = {i: g for i,g in enumerate(genres)}

# Track Subgenres: 
subgenres: Dict[str, List[str]] = load_json(SUBGENERES)
subgenre_list = list(subgenres.keys())

# Keep only the subgenres of the parent genre under study:
subgenre_list = get_subgenres("pop", subgenre_list)
print(f"Subgenres: {subgenre_list}")


#subgen2labels = {g: i for i,g in enumerate(subgenre_list)}
#labels2subgen = {i: g for i,g in enumerate(subgenre_list)}

def target_function(t):
    """Multilabel target: 0/1 vector over the selected subgenres."""
    return subgenre_target(t, subgenre_list)
    

Subgenres: ['pop---indie pop', 'pop---europop', 'pop---ballad']


In [316]:
# Show the label index assigned to a couple of genres.
for genre_name in ("rock", "pop"):
    print(f"{genre_name}: {genres2labels[genre_name]}")

rock: 10
pop: 7


#### Load the Data

#### Train the Model

In [317]:
# Load the MFCCs of the selected subgenres and shuffle the tracks with the
# seeded RNG before splitting them.
mfccs = load_mfccs(subgenre_list, subgenres)
random.shuffle(mfccs)

train, test, val = train_test_val(mfccs)
print(f"train {len(train)} + test {len(test)} + val {len(val)} = {len(train) + len(test) + len(val)}" )

# One batch generator per split, built in the order train, test, validation.
train_generator, test_generator, validation_generator = [
    TrackDataGen(split_tracks, target_function, labels)
    for split_tracks in (train, test, val)
]

# Shape of a single sample: first sample of the inputs of the first batch.
first_batch_inputs = train_generator[0][0]
shape = first_batch_inputs[0].shape

print(f"shape: {shape}")


train 735 + test 10 + val 186 = 931
shape: (32, 130, 1)


In [318]:
# Kind of problem to train for: "multilabel", "mood" or "multiclass".
# Named `network_type` so that the builtin `type` is not shadowed.
network_type = "multilabel"
# Kept apart from `labels` (the loaded labels JSON): overwriting that name with
# an int made the generator cell fail when it was re-run after this one.
num_labels = len(subgenre_list)

if network_type == "multilabel":
    activation, loss, metrics = "sigmoid", "binary_crossentropy", ["categorical_accuracy",]
elif network_type == "mood":
    # Mood targets are float vectors, not class indices, so the sparse
    # categorical loss does not apply; binary cross-entropy accepts soft
    # targets in [0, 1].
    activation, loss, metrics = "sigmoid", "binary_crossentropy", ["categorical_accuracy"]
elif network_type == "multiclass":
    activation, loss, metrics = "softmax", "sparse_categorical_crossentropy", ["accuracy"]
else:
    raise ValueError("Invalid Type")

conv_base = keras.applications.EfficientNetB0(
                include_top = False, 
                weights = "imagenet",
                drop_connect_rate=0.8)
# Unlike build_network, the pretrained base is fine-tuned here, hence the
# small learning rate passed to Adam below.
conv_base.trainable = True
model = keras.models.Sequential([keras.layers.Conv2D(3,(3,3),padding='same', input_shape=shape), 
                                conv_base, 
                                keras.layers.GlobalAveragePooling2D(),
                                #keras.layers.Dense(64, activation="relu"),

                                #keras.layers.Dense(32, activation="relu"),
                                keras.layers.Dense(num_labels, activation = activation)])

model.compile(optimizer = keras.optimizers.Adam(1e-5), loss = loss, metrics = metrics)

In [319]:
# Disabled on purpose (`if False`): manual switch to unfreeze the pretrained
# base and recompile with a small learning rate. The model cell above already
# sets conv_base.trainable = True, so this step is currently not needed.
if False:
    conv_base.trainable = True
    model.compile(optimizer=keras.optimizers.Adam(1e-5), loss = loss, metrics = metrics)

In [None]:

# Build and train the net 
#model = build_network("multilabel", labels=len(subgenre_list), shape=shape )
# `batch_size` is deliberately not passed: the generators are keras Sequence
# objects that already yield batches of BATCH_SIZE samples, and Keras
# documents that batch_size must not be specified for such inputs.
history = model.fit(train_generator,
                    validation_data = validation_generator, 
                    epochs=200,
                    callbacks = [stop_callback, checkpoint_callback],
                )


Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

In [None]:
# Save the model as it stands at the end of training to "final.h5"
# (separate from the checkpoint file written by checkpoint_callback).
model.save("final.h5")

#### Analyze the Results 

In [None]:
from matplotlib import pyplot as plt

def plot_history(history, metric='categorical_accuracy'):
    """Plot the training and validation curves recorded by ``model.fit``.

    Draws the chosen metric and the loss, each for the training and the
    validation data, on one set of axes with a labelled legend.

    Args:
        history: Object returned by ``model.fit``; its ``history`` attribute
            maps a curve name to the list of per-epoch values.
        metric (str): Name of the metric the model was compiled with. The
            validation curve is read from ``"val_" + metric``.
    """
    curves = history.history
    labelled_keys = [
        (metric, f"train {metric}"),
        (f"val_{metric}", f"validation {metric}"),
        ("loss", "train loss"),
        ("val_loss", "validation loss"),
    ]

    for key, label in labelled_keys:
        # Skip curves that were not recorded instead of failing with KeyError
        # (e.g. when the model was compiled with a different metric).
        if key in curves:
            plt.plot(curves[key], label=label)

    plt.xlabel("epoch")
    plt.title("Training history")
    plt.legend()
    plt.show()
plot_history(history)