In [None]:
import os
import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.metrics import recall_score
from sklearn.utils import class_weight
from util import util

In [None]:
# Run configuration: fullName tags every artifact path written by this notebook.
fullName = 'beats-dense-fullset'
# Training length and the mini-batch size shared by every dataset built below.
epochs = 2_000
batchSize = 2 ** 5

In [None]:
# Load the pickled training frame from the project data directory and display it.
# NOTE(review): pickle executes code on load -- only read files from a trusted source.
trainingPklPath = os.path.join(util.dataPath, 'training.pkl')
df = pd.read_pickle(trainingPklPath)
df

In [None]:
# Balanced class weights for the five disease categories (labels 1..5).
# `classes` must be an ndarray: scikit-learn documents it as such and newer
# releases with parameter validation reject a plain Python list.
weight = class_weight.compute_class_weight('balanced',
                                           classes=np.array([1, 2, 3, 4, 5]),
                                           y=df['Disease category'].tolist())
# Keras expects class_weight keyed by class index, so position i (0..4) maps to
# label i + 1. Presumably the one-hot targets use the same label - 1 ordering;
# verify against util.getDsFromDf.
weightDict = dict(enumerate(weight))
weightDict

In [None]:
# Hold out 10% of every disease category for validation (fixed seed for repeatability).
valIndex = (
    df.groupby(['Disease category'])
    .sample(frac=0.1, random_state=5397)
    .index
)
isVal = df.index.isin(valIndex)

# NOTE(review): this is floor(n / batchSize) + 1 over the training rows; whether it
# matches the number of batches util.fillBatch produces when n divides evenly by
# batchSize depends on that helper -- confirm.
steps_per_epoch = (len(df) - len(valIndex)) // batchSize + 1

# Dataset over the complete frame; only displayed at the end of this cell.
ds = util.getDsFromDf(df, batchSize=batchSize)

# Train/validation datasets are built from frames passed through util.fillBatch.
trainFrame = util.fillBatch(df[~isVal], batchSize)
trainDs = util.getDsFromDf(trainFrame, batchSize=batchSize)
valFrame = util.fillBatch(df[isVal], batchSize)
valDs = util.getDsFromDf(valFrame, batchSize=batchSize)
# Validation split again, this time without fillBatch and requested via returnNp.
valX, valY = util.getDsFromDf(df[isVal], returnNp=True)

# First element of the first training batch; it is indexed by 'audio' and
# 'structured' when the model inputs are sized.
firstTrainBatch = next(trainDs.take(1).as_numpy_iterator())
batchShape = firstTrainBatch[0]
ds

In [None]:
# Public test set: read the pickled frame, then build model inputs in test mode.
publicTestPklPath = os.path.join(util.publicTestPath, 'publitTest.pkl')
publicTestDf = pd.read_pickle(publicTestPklPath)
publicTestX = util.getDsFromDf(publicTestDf, testMode=True)
publicTestX

In [None]:
# Private test set: read the pickled frame, then build model inputs in test mode.
privateTestPklPath = os.path.join(util.privateTestPath, 'privateTest.pkl')
privateTestDf = pd.read_pickle(privateTestPklPath)
privateTestX = util.getDsFromDf(privateTestDf, testMode=True)
privateTestX

In [None]:
def macro_recall(labels, predictions):
    """
    Batch-wise macro recall computed with TensorFlow ops (usable as a Keras metric).

    Predictions are hardened to their arg-max class before counting, so the value
    reflects predicted labels even when the model emits softmax probabilities.
    Classes with no positive sample in the batch are left out of the average
    instead of being counted as recall 0.

    Arguments:
    labels -- true labels, one-hot, a tensor of shape (batch_size, num_classes)
    predictions -- class scores/probabilities or one-hot predictions,
                   a tensor of shape (batch_size, num_classes)

    Returns:
    recall -- scalar float32 tensor; 0.0 when the batch has no positive labels
    """

    # Work in float32 so integer label tensors can be multiplied and divided safely.
    labels = tf.cast(labels, tf.float32)
    predictions = tf.cast(predictions, tf.float32)

    # Turn scores into hard one-hot predictions (already one-hot rows are unchanged).
    num_classes = tf.shape(predictions)[-1]
    hard_predictions = tf.one_hot(tf.argmax(predictions, axis=-1),
                                  depth=num_classes,
                                  dtype=tf.float32)

    # Per-class true positives and per-class number of positive samples
    # (true positives + false negatives equals the number of positives).
    true_positives = tf.reduce_sum(labels * hard_predictions, axis=0)
    num_positives = tf.reduce_sum(labels, axis=0)

    # Recall per class; divide_no_nan yields 0 for classes missing from the batch.
    class_recall = tf.math.divide_no_nan(true_positives, num_positives)

    # Average only over classes that actually occur in this batch.
    present = tf.cast(num_positives > 0, tf.float32)
    return tf.math.divide_no_nan(tf.reduce_sum(class_recall * present),
                                 tf.reduce_sum(present))

In [None]:
# Two-branch dense classifier. Layers are created in the same order as before so
# auto-generated layer names stay stable.
kl = tf.keras.layers

# Audio branch: GlobalAvgPool1D collapses the steps axis, followed by a 32-unit ReLU layer.
audioShape = batchShape['audio'].shape[1:]
audioInput = kl.Input(shape=audioShape, batch_size=batchSize)
audioX = kl.GlobalAvgPool1D()(audioInput)
audioX = kl.Dense(units=32, activation='relu')(audioX)

# Structured branch: a single 32-unit ReLU layer.
structuredShape = batchShape['structured'].shape[1:]
structuredInput = kl.Input(shape=structuredShape, batch_size=batchSize)
structuredX = kl.Dense(units=32, activation='relu')(structuredInput)

# Head: concatenate both branches, apply dropout, then a 5-way softmax.
mergedX = kl.Concatenate()([audioX, structuredX])
mergedX = kl.Dropout(rate=0.5)(mergedX)
mergedX = kl.Dense(units=5, activation='softmax')(mergedX)

# Inputs are keyed by the same names used to index the dataset batches.
modelInputs = {'audio': audioInput, 'structured': structuredInput}
model = tf.keras.models.Model(inputs=modelInputs, outputs=mergedX)

trainMetrics = ['accuracy', macro_recall]
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0),
    metrics=trainMetrics,
)
model.summary()
tf.keras.utils.plot_model(model, show_shapes=True, rankdir="LR")

In [None]:
# Create the artifact directories before training: otherwise a missing
# 'history/' (or 'weight/') folder would only surface after the whole run and
# the training history would be lost.
for artifactDir in ('weight', 'history'):
    os.makedirs(artifactDir, exist_ok=True)

# Train with per-class weights; the checkpoint callback keeps only the weights
# of the epoch with the highest validation accuracy.
# NOTE(review): max_queue_size/workers are generator/Sequence options and are
# likely ignored for tf.data inputs -- confirm before relying on them.
hist = model.fit(
    x=trainDs,
    validation_data=valDs,
    steps_per_epoch=steps_per_epoch,
    class_weight=weightDict,
    epochs=epochs,
    max_queue_size=250,
    workers=4,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('weight/{}-best.h5'.format(fullName), monitor='val_accuracy', verbose=1, mode='max', save_best_only=True, save_weights_only=True),
    ])
# Persist the final-epoch weights and the per-epoch metric history.
model.save_weights('weight/{}.h5'.format(fullName))
pd.DataFrame.from_dict(hist.history).to_pickle('history/{}.pkl'.format(fullName))

In [None]:
# Reload the saved training history from disk and hand it to the project plotting helper.
# Note that `hist` now holds a DataFrame rather than the Keras History object.
historyPklPath = 'history/{}.pkl'.format(fullName)
hist = pd.read_pickle(historyPklPath)
util.showHist(hist)

In [None]:
# Restore the weights written by the best-only checkpoint before evaluating.
bestWeightsPath = 'weight/{}-best.h5'.format(fullName)
model.load_weights(bestWeightsPath)

In [None]:
# Per-class recall on the validation arrays, followed by its unweighted mean.
valTrueIdx = np.argmax(valY, axis=1)
valPredIdx = np.argmax(model.predict(valX), axis=1)
recall = recall_score(valTrueIdx, valPredIdx, average=None)
recall, recall.mean()

In [None]:
# Score both test sets with the same logic: keep the raw class probabilities and
# derive the predicted label as arg-max index + 1 (labels are 1-based).
for testFrame, testInputs in ((publicTestDf, publicTestX), (privateTestDf, privateTestX)):
    testProbs = model.predict(testInputs)
    testFrame['pred'] = testProbs.tolist()
    # One vectorised arg-max over the class axis instead of a per-row apply.
    testFrame['predLabel'] = np.argmax(testProbs, axis=-1) + 1

# Make sure the export directory exists so the writes below cannot fail on a
# fresh checkout.
os.makedirs('output', exist_ok=True)

# Public rows first, then private rows; export the full predictions as a pickle
# and the ID/label pairs as a header-less CSV.
testDf = pd.concat([publicTestDf, privateTestDf])
testDf[['ID', 'pred', 'predLabel']].to_pickle('output/{}-pred.pkl'.format(fullName))
testDf[['ID', 'predLabel']].to_csv('output/{}.csv'.format(fullName), header=False, index=False)
testDf