In [28]:
import tensorflow as tf
import numpy as np
import os

# Reload imported modules automatically before each cell executes, so edits
# to project .py files are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# Mount Google Drive so the project folder is reachable from the Colab VM.
from google.colab import drive
drive.mount('/content/drive')
# Project folder, relative to the root of "My Drive".
FOLDERNAME = 'DL project/fma'
import sys
# Make Python modules stored in the project folder importable.
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))
# Change into the project folder; relative paths used by later cells
# (e.g. the dataset directory) are resolved against it.
%cd /content/drive/My\ Drive/$FOLDERNAME/

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/.shortcut-targets-by-id/1Gzbs2MQ8ttHuTOpxDaZoGkbdHOw1ukQO/DL project/fma


In [29]:
import math
from tensorflow import keras
from tensorflow.keras import layers

# Number of output classes; passed as `num_classes` when the model is built below.
NUM_CLASSES = 8

# Kaiming/He-style normal initializer (variance scale 2.0 computed over fan_out),
# shared by every Conv1D layer created in this cell.
kaiming_normal = keras.initializers.VarianceScaling(scale=2.0, mode='fan_out', distribution='untruncated_normal')

def conv3x3(x, out_planes, stride=1, name=None):
    """Apply a bias-free 1-D convolution of kernel size 3 after explicit zero padding.

    Args:
        x: Input Keras tensor.
        out_planes: Number of convolution filters.
        stride: Stride of the convolution.
        name: Name given to the convolution layer; the padding layer is
            named ``f'{name}_pad'``.

    Returns:
        The output tensor of the convolution.
    """
    # One element of zero padding on each side of the sequence axis.
    padded = layers.ZeroPadding1D(padding=1, name=f'{name}_pad')(x)
    conv = layers.Conv1D(
        filters=out_planes,
        kernel_size=3,
        strides=stride,
        use_bias=False,
        kernel_initializer=kaiming_normal,
        name=name,
    )
    return conv(padded)

def basic_block(x, planes, stride=1, downsample=None, name=None):
    """Build one residual block: conv-BN-ReLU, conv-BN, add shortcut, ReLU.

    Args:
        x: Input Keras tensor.
        planes: Number of filters used by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional iterable of layers applied, in order, to the
            input to produce the shortcut; ``None`` uses the input as is.
        name: Prefix for the names of all layers created here.

    Returns:
        The output tensor of the block.
    """
    bn_kwargs = dict(momentum=0.9, epsilon=1e-5)
    shortcut = x

    # Main branch.
    branch = conv3x3(x, planes, stride=stride, name=f'{name}.conv1')
    branch = layers.BatchNormalization(name=f'{name}.bn1', **bn_kwargs)(branch)
    branch = layers.ReLU(name=f'{name}.relu1')(branch)
    branch = conv3x3(branch, planes, name=f'{name}.conv2')
    branch = layers.BatchNormalization(name=f'{name}.bn2', **bn_kwargs)(branch)

    # Shortcut branch: run the input through the projection layers, if any.
    if downsample is not None:
        for projection in downsample:
            shortcut = projection(shortcut)

    merged = layers.Add(name=f'{name}.add')([shortcut, branch])
    return layers.ReLU(name=f'{name}.relu2')(merged)

def make_layer(x, planes, blocks, stride=1, name=None):
    """Stack ``blocks`` residual blocks to form one stage of the network.

    Only the first block receives ``stride``; it also gets a 1x1-conv +
    batch-norm projection on its shortcut whenever the stride is not 1 or
    the channel count changes.

    Args:
        x: Input Keras tensor; its channel count is read from ``x.shape[2]``.
        planes: Number of filters for every block in the stage.
        blocks: Number of residual blocks in the stage.
        stride: Stride of the first block.
        name: Prefix for layer names; block ``i`` is named ``f'{name}.{i}'``.

    Returns:
        The output tensor of the last block.
    """
    inplanes = x.shape[2]
    needs_projection = stride != 1 or inplanes != planes

    downsample = None
    if needs_projection:
        projection_conv = layers.Conv1D(
            filters=planes,
            kernel_size=1,
            strides=stride,
            use_bias=False,
            kernel_initializer=kaiming_normal,
            name=f'{name}.0.downsample.0',
        )
        projection_bn = layers.BatchNormalization(
            momentum=0.9,
            epsilon=1e-5,
            name=f'{name}.0.downsample.1',
        )
        downsample = [projection_conv, projection_bn]

    # First block carries the stride and the optional projection.
    x = basic_block(x, planes, stride=stride, downsample=downsample, name=f'{name}.0')
    # Remaining blocks keep resolution and width unchanged.
    for block_index in range(1, blocks):
        x = basic_block(x, planes, name=f'{name}.{block_index}')

    return x

def resnet(x, blocks_per_layer, rnn_n_layers, rnn_type, bidirectional, num_classes=1000):
    """Build a 1-D ResNet trunk followed by a recurrent head and a softmax classifier.

    Args:
        x: Input Keras tensor; each example is reshaped to
            ``(x.shape[-1], 1)``, i.e. a single-channel sequence.
        blocks_per_layer: Sequence of four ints, the number of residual
            blocks in stages ``layer1`` .. ``layer4``.
        rnn_n_layers: Number of recurrent layers stacked after the trunk.
            ``0`` skips the recurrent head entirely.
        rnn_type: ``'gru'``, ``'lstm'`` or ``'simple'``.
        bidirectional: If true, every recurrent layer is wrapped in
            ``layers.Bidirectional``.
        num_classes: Number of units of the final dense layer.

    Returns:
        The softmax output tensor.

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """
    rnn_classes = {
        'gru': layers.GRU,
        'lstm': layers.LSTM,
        'simple': layers.SimpleRNN,
    }
    # Fail before building any layer rather than halfway through the graph.
    if rnn_type not in rnn_classes:
        raise ValueError("rnn_type must be 'gru', 'lstm', or 'simple'")

    # Stem.
    x = layers.Reshape((x.shape[-1], 1))(x)
    x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='bn0')(x)
    x = layers.ZeroPadding1D(padding=3, name='conv1_pad')(x)
    x = layers.Conv1D(filters=64, kernel_size=7, strides=2, use_bias=False, kernel_initializer=kaiming_normal, name='conv1')(x)
    x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='bn1')(x)
    x = layers.ReLU(name='relu1')(x)
    x = layers.ZeroPadding1D(padding=1, name='maxpool_pad')(x)
    x = layers.MaxPool1D(pool_size=3, strides=2, name='maxpool')(x)

    # Residual stages; stages 2-4 shorten the sequence with stride 3.
    x = make_layer(x, 64, blocks_per_layer[0], name='layer1')
    x = make_layer(x, 128, blocks_per_layer[1], stride=3, name='layer2')
    x = make_layer(x, 256, blocks_per_layer[2], stride=3, name='layer3')
    x = make_layer(x, 512, blocks_per_layer[3], stride=3, name='layer4')

    # 1x1 convolution reducing the channel count to 16 before the RNN.
    x = layers.Conv1D(filters=16, kernel_size=1, strides=1, use_bias=False, kernel_initializer=kaiming_normal, name='convdown')(x)

    # Shape of the sequence handed to the recurrent head.
    print(x.shape)

    # Each iteration builds AND applies one recurrent layer. Previously the
    # loop only constructed layers and a single one was applied afterwards,
    # so rnn_n_layers > 1 had no effect and rnn_n_layers == 0 raised NameError.
    for _ in range(rnn_n_layers):
        rnn_layer = rnn_classes[rnn_type](16, return_sequences=True)
        if bidirectional:
            rnn_layer = layers.Bidirectional(rnn_layer)
        x = rnn_layer(x)

    x = layers.GlobalAveragePooling1D(name='avgpool')(x)
    # NOTE(review): the uniform bound is derived from 512, while the features
    # reaching `fc` come from the 16-channel head - confirm this is intended.
    initializer = keras.initializers.RandomUniform(-1.0 / math.sqrt(512), 1.0 / math.sqrt(512))
    x = layers.Dense(units=num_classes, kernel_initializer=initializer, bias_initializer=initializer, name='fc')(x)
    x = layers.Activation('softmax')(x)

    return x

def resnet18(x, **kwargs):
    """Build the [2, 2, 2, 2]-block network with one unidirectional SimpleRNN layer.

    Args:
        x: Input Keras tensor.
        **kwargs: Forwarded to ``resnet`` (e.g. ``num_classes``).

    Returns:
        The output tensor produced by ``resnet``.
    """
    return resnet(
        x,
        [2, 2, 2, 2],
        rnn_n_layers=1,
        rnn_type='simple',
        bidirectional=False,
        **kwargs,
    )

# Build the functional model on flat inputs of 10000 values per example.
inputs = keras.Input(shape=(10000,))
# Alternative input length (disabled):
# inputs = keras.Input(shape=(59953,))
outputs = resnet18(inputs, num_classes=NUM_CLASSES)
model = keras.Model(inputs, outputs)



(None, 93, 16)


In [37]:
import math
from tensorflow import keras
from tensorflow.keras import layers

# Number of output classes; passed as `num_classes` when the model is built below.
NUM_CLASSES = 8

# Kaiming/He-style normal initializer (variance scale 2.0 computed over fan_out),
# shared by every Conv1D layer created in this cell.
kaiming_normal = keras.initializers.VarianceScaling(scale=2.0, mode='fan_out', distribution='untruncated_normal')

def conv3x3(x, out_planes, stride=1, name=None):
    """Apply a bias-free 1-D convolution of kernel size 3 after explicit zero padding.

    Args:
        x: Input Keras tensor.
        out_planes: Number of convolution filters.
        stride: Stride of the convolution.
        name: Name given to the convolution layer; the padding layer is
            named ``f'{name}_pad'``.

    Returns:
        The output tensor of the convolution.
    """
    # One element of zero padding on each side of the sequence axis.
    padded = layers.ZeroPadding1D(padding=1, name=f'{name}_pad')(x)
    conv = layers.Conv1D(
        filters=out_planes,
        kernel_size=3,
        strides=stride,
        use_bias=False,
        kernel_initializer=kaiming_normal,
        name=name,
    )
    return conv(padded)

def basic_block(x, planes, stride=1, downsample=None, name=None):
    """Build one residual block: conv-BN-ReLU, conv-BN, add shortcut, ReLU.

    Args:
        x: Input Keras tensor.
        planes: Number of filters used by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional iterable of layers applied, in order, to the
            input to produce the shortcut; ``None`` uses the input as is.
        name: Prefix for the names of all layers created here.

    Returns:
        The output tensor of the block.
    """
    bn_kwargs = dict(momentum=0.9, epsilon=1e-5)
    shortcut = x

    # Main branch.
    branch = conv3x3(x, planes, stride=stride, name=f'{name}.conv1')
    branch = layers.BatchNormalization(name=f'{name}.bn1', **bn_kwargs)(branch)
    branch = layers.ReLU(name=f'{name}.relu1')(branch)
    branch = conv3x3(branch, planes, name=f'{name}.conv2')
    branch = layers.BatchNormalization(name=f'{name}.bn2', **bn_kwargs)(branch)

    # Shortcut branch: run the input through the projection layers, if any.
    if downsample is not None:
        for projection in downsample:
            shortcut = projection(shortcut)

    merged = layers.Add(name=f'{name}.add')([shortcut, branch])
    return layers.ReLU(name=f'{name}.relu2')(merged)

def make_layer(x, planes, blocks, stride=1, name=None):
    """Stack ``blocks`` residual blocks to form one stage of the network.

    Only the first block receives ``stride``; it also gets a 1x1-conv +
    batch-norm projection on its shortcut whenever the stride is not 1 or
    the channel count changes.

    Args:
        x: Input Keras tensor; its channel count is read from ``x.shape[2]``.
        planes: Number of filters for every block in the stage.
        blocks: Number of residual blocks in the stage.
        stride: Stride of the first block.
        name: Prefix for layer names; block ``i`` is named ``f'{name}.{i}'``.

    Returns:
        The output tensor of the last block.
    """
    inplanes = x.shape[2]
    needs_projection = stride != 1 or inplanes != planes

    downsample = None
    if needs_projection:
        projection_conv = layers.Conv1D(
            filters=planes,
            kernel_size=1,
            strides=stride,
            use_bias=False,
            kernel_initializer=kaiming_normal,
            name=f'{name}.0.downsample.0',
        )
        projection_bn = layers.BatchNormalization(
            momentum=0.9,
            epsilon=1e-5,
            name=f'{name}.0.downsample.1',
        )
        downsample = [projection_conv, projection_bn]

    # First block carries the stride and the optional projection.
    x = basic_block(x, planes, stride=stride, downsample=downsample, name=f'{name}.0')
    # Remaining blocks keep resolution and width unchanged.
    for block_index in range(1, blocks):
        x = basic_block(x, planes, name=f'{name}.{block_index}')

    return x

def resnet(x, blocks_per_layer, num_classes=1000):
    """Build a 1-D ResNet trunk with a single SimpleRNN head and softmax classifier.

    This redefines the ``resnet`` from the earlier cell: the stem applies no
    explicit zero padding, stages 2-4 use stride 2, and the recurrent head is
    fixed to one 16-unit ``SimpleRNN``.

    Args:
        x: Input Keras tensor; each example is reshaped to
            ``(x.shape[-1], 1)``, i.e. a single-channel sequence.
        blocks_per_layer: Sequence of four ints, the number of residual
            blocks in stages ``layer1`` .. ``layer4``.
        num_classes: Number of units of the final dense layer.

    Returns:
        The softmax output tensor.
    """
    bn_kwargs = dict(momentum=0.9, epsilon=1e-5)
    conv_kwargs = dict(use_bias=False, kernel_initializer=kaiming_normal)

    # Stem (no ZeroPadding1D layers before the convolution or the pooling).
    x = layers.Reshape((x.shape[-1], 1))(x)
    x = layers.BatchNormalization(name='bn0', **bn_kwargs)(x)
    x = layers.Conv1D(filters=64, kernel_size=7, strides=2, name='conv1', **conv_kwargs)(x)
    x = layers.BatchNormalization(name='bn1', **bn_kwargs)(x)
    x = layers.ReLU(name='relu1')(x)
    x = layers.MaxPool1D(pool_size=3, strides=2, name='maxpool')(x)

    # Residual stages as (name, filters, stride of the first block).
    stage_specs = [
        ('layer1', 64, 1),
        ('layer2', 128, 2),
        ('layer3', 256, 2),
        ('layer4', 512, 2),
    ]
    for stage_index, (stage_name, planes, stride) in enumerate(stage_specs):
        x = make_layer(x, planes, blocks_per_layer[stage_index], stride=stride, name=stage_name)

    # 1x1 convolution reducing the channel count to 16 before the RNN.
    x = layers.Conv1D(filters=16, kernel_size=1, strides=1, name='convdown', **conv_kwargs)(x)

    # Recurrent head, then average over the sequence axis.
    x = layers.SimpleRNN(16, return_sequences=True)(x)
    x = layers.GlobalAveragePooling1D(name='avgpool')(x)

    # Classifier with symmetric uniform initialization of kernel and bias.
    bound = 1.0 / math.sqrt(512)
    initializer = keras.initializers.RandomUniform(-bound, bound)
    x = layers.Dense(units=num_classes, kernel_initializer=initializer, bias_initializer=initializer, name='fc')(x)
    return layers.Activation('softmax')(x)

def resnet18(x, **kwargs):
    """Build the network with two residual blocks in each of the four stages.

    Args:
        x: Input Keras tensor.
        **kwargs: Forwarded to ``resnet`` (e.g. ``num_classes``).

    Returns:
        The output tensor produced by ``resnet``.
    """
    blocks_per_layer = [2, 2, 2, 2]
    return resnet(x, blocks_per_layer, **kwargs)

# Build the functional model on flat inputs of 10000 values per example.
# This rebinds `model`, replacing the one built in the earlier cell.
inputs = keras.Input(shape=(10000,))
# Alternative input length (disabled):
# inputs = keras.Input(shape=(59953,))
outputs = resnet18(inputs, num_classes=NUM_CLASSES)
model = keras.Model(inputs, outputs)



In [38]:
import tensorflow as tf
import numpy as np
import os


# Root folder of the dataset, relative to the working directory; the
# "train" and "val" split names are appended to it further down.
dataset_dir = "data/raw_small/"

def load_npy_file(file_path, label):
    """Load one ``.npy`` file as a float32 tensor and pair it with its label.

    Intended to be used with ``Dataset.map`` on ``(file_path, label)`` pairs.

    Args:
        file_path: Scalar string tensor holding the path of the ``.npy`` file.
        label: Label tensor; returned unchanged.

    Returns:
        A ``(array, label)`` tuple where ``array`` is a float32 tensor whose
        static shape is declared as ``[10000]``.
    """
    def _read_as_float32(path):
        # Runs as plain Python/NumPy outside the TensorFlow graph.
        return np.load(path).astype(np.float32)

    samples = tf.numpy_function(_read_as_float32, [file_path], tf.float32)
    # Declare the static shape explicitly for the downstream model; update
    # the length here if the stored arrays change size.
    samples.set_shape([10000,])

    return samples, label


def create_dataset(dataset_dir, batch_size=128, shuffle=True):
    """Build a batched ``tf.data.Dataset`` from a one-folder-per-class directory tree.

    Every sub-directory of ``dataset_dir`` is treated as one class. The
    folder name is split into its characters and each one is converted with
    ``int``, so the label is a *vector* of digits, not a single integer
    (e.g. a folder named ``"0100"`` yields the label ``[0, 1, 0, 0]``).
    All ``.npy`` files directly inside a class folder receive that label.

    Args:
        dataset_dir: Directory containing the class folders.
        batch_size: Number of examples per batch.
        shuffle: Whether to shuffle the examples (buffer covers the whole
            file list).

    Returns:
        A dataset yielding ``(arrays, labels)`` batches, with files loaded
        through ``load_npy_file`` and prefetching enabled.

    Raises:
        ValueError: If no ``.npy`` file is found under ``dataset_dir``, or
            (from ``int``) if a class folder name contains a non-digit.
    """
    all_files = []
    all_labels = []
    for class_name in sorted(os.listdir(dataset_dir)):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue
        # One label entry per character of the folder name.
        label = [int(digit) for digit in class_name]
        # Sorted so the pre-shuffle order does not depend on the filesystem.
        files = [
            os.path.join(class_path, file_name)
            for file_name in sorted(os.listdir(class_path))
            if file_name.endswith(".npy")
        ]
        all_files.extend(files)
        all_labels.extend([label] * len(files))

    print(len(all_labels))
    print(len(all_files))

    # An empty file list would otherwise surface later as an opaque
    # shuffle-buffer or dtype error inside tf.data.
    if not all_files:
        raise ValueError("no .npy files found under {!r}".format(dataset_dir))

    file_paths = tf.constant(all_files)
    labels = tf.constant(all_labels, dtype=tf.int32)

    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))
    if shuffle:
        dataset = dataset.shuffle(len(all_files))
    # Shuffle paths first (cheap), then load the arrays in parallel.
    dataset = dataset.map(load_npy_file, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


# Build the training and validation datasets from the "train" and "val"
# sub-folders of dataset_dir.
train_dataset = create_dataset(dataset_dir+"train")
val_dataset = create_dataset(dataset_dir+"val")


# Pull a single batch to sanity-check the tensor shapes. `num_classes` is
# taken from the width of the label vectors; it is a separate variable from
# the NUM_CLASSES constant used when building the model.
for spectrogram, label in train_dataset.take(1):
    print("Spectrogram shape:", spectrogram.shape)
    print("Label:", label.shape)
    num_classes = label.shape[1]


6386
6386
800
800
Spectrogram shape: (128, 10000)
Label: (128, 8)


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

# Compile the model with Adam and categorical cross-entropy; the labels
# produced by the dataset are per-class vectors, matching this loss.
new_optimizer = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(
    optimizer=new_optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Keep only the best model seen so far on the validation set.
# val_accuracy is a higher-is-better metric, so the comparison mode must be
# 'max'; with 'min' the checkpoint would keep the epoch with the LOWEST
# validation accuracy.
checkpoint = ModelCheckpoint(
    filepath='best_small_crnn_model.keras',  # Where the model is saved
    monitor='val_accuracy',   # Metric that decides what "best" means
    save_best_only=True,      # Overwrite only when the metric improves
    save_weights_only=False,  # Save the entire model (not just weights)
    mode='max',               # Improvement = the monitored metric increases
    verbose=1                 # Print a message when saving
)

history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=40,
    verbose=1,
    callbacks=[checkpoint]
)
print('///////////////////////////////////')
print(history)

Epoch 1/40
