In [None]:
import gc
import os
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf

from shared.utils import *
from shared.kaggle_path import *

from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

In [1]:
# Create the distribution strategy whose scope() is entered around the preprocessing cells below.
strategy = tf.distribute.MirroredStrategy() 

# Data Preprocessing
## Audio Data

In [None]:
# Build the audio index: one row per file found under AUDIO_PATH/<class>/<file>,
# labelled with the name of the directory that contains it.
# os.listdir() returns entries in arbitrary order, so both levels are sorted to
# keep the row order stable from one run (or machine) to the next.
audio_records = [
    (os.path.join(AUDIO_PATH, class_name, file_name), class_name)
    for class_name in sorted(os.listdir(AUDIO_PATH))
    for file_name in sorted(os.listdir(os.path.join(AUDIO_PATH, class_name)))
]

audio = pd.DataFrame(audio_records, columns=['filepath', 'label'])

# The intermediate list is not needed once the frame exists.
del audio_records

audio.head()

In [None]:
# Map the audio class names to integer ids and record how many distinct classes exist.
le = LabelEncoder()
audio['label_encoded'] = le.fit_transform(audio['label'])
num_classes = len(le.classes_)

In [None]:
with strategy.scope():
    # Run the project's preprocess_audio helper on every file path.
    audio['data'] = audio.filepath.apply(preprocess_audio)
    # Shuffle the rows; the fixed seed keeps the shuffled order identical between runs.
    audio = audio.sample(frac=1, random_state=100).reset_index(drop=True)

In [None]:
# Stack the entries of the 'data' column into one array and pull out the integer labels.
x_audio = np.stack(audio['data'].to_numpy())
y_audio = audio['label_encoded'].to_numpy()
print(x_audio.shape)

# The DataFrame is not needed once the arrays exist.
del audio

In [None]:
# First split: 70 % of the audio samples for training, 30 % held out.
x_audio_train, x_temp, y_audio_train, y_temp = train_test_split(
    x_audio,
    y_audio,
    random_state=100,
    test_size=0.3,
)

# Second split: halve the held-out part into validation and test sets.
x_audio_val, x_audio_test, y_audio_val, y_audio_test = train_test_split(
    x_temp,
    y_temp,
    random_state=222,
    test_size=0.5,
)

del x_audio, y_audio

print(x_audio_train.shape)
print(len(y_audio_train) == x_audio_train.shape[0])

In [None]:
# Sanity check: prints True when the training inputs and training labels have the same length.
print(x_audio_train.shape[0] == len(y_audio_train))

## Image Data

In [None]:
# Build the training-image index: one row per file found under
# TRAIN_IMAGE_PATH/<class>/<file>, labelled with its directory name.
# os.listdir() returns entries in arbitrary order, so both levels are sorted to
# keep the row order (and the seeded split derived from it) stable between runs.
train_records = [
    (os.path.join(TRAIN_IMAGE_PATH, class_name, file_name), class_name)
    for class_name in sorted(os.listdir(TRAIN_IMAGE_PATH))
    for file_name in sorted(os.listdir(os.path.join(TRAIN_IMAGE_PATH, class_name)))
]

train_df = pd.DataFrame(train_records, columns=['filepath', 'label'])

# The intermediate list is not needed once the frame exists.
del train_records

print(train_df.shape)
print(train_df['label'].unique())
train_df.head()

In [None]:
# Build the test-image index: one row per file found under
# TEST_IMAGE_PATH/<class>/<file>, labelled with its directory name.
# os.listdir() returns entries in arbitrary order, so both levels are sorted to
# keep the row order stable between runs.
test_records = [
    (os.path.join(TEST_IMAGE_PATH, class_name, file_name), class_name)
    for class_name in sorted(os.listdir(TEST_IMAGE_PATH))
    for file_name in sorted(os.listdir(os.path.join(TEST_IMAGE_PATH, class_name)))
]

img_test_df = pd.DataFrame(test_records, columns=['filepath', 'label'])

# The intermediate list is not needed once the frame exists.
del test_records

print(img_test_df.shape)
print(img_test_df['label'].unique())
img_test_df.head()

In [None]:
# Fit the encoder on the training labels only, then apply the same mapping to the test labels.
le = LabelEncoder()
train_df['label_encoded'] = le.fit_transform(train_df['label'])
img_test_df['label_encoded'] = le.transform(img_test_df['label'])

In [None]:
def _load_image(path):
    """Run the project's preprocess_image helper on one file path with the (100, 100) size argument."""
    return preprocess_image(path, (100, 100))


with strategy.scope():
    train_df['data'] = train_df['filepath'].apply(_load_image)
    img_test_df['data'] = img_test_df['filepath'].apply(_load_image)

train_df.shape

In [None]:
# Stack the entries of the 'data' column into arrays for the training and test image sets.
x_img = np.stack(train_df['data'].to_numpy())
y_img = train_df['label_encoded'].to_numpy()

x_img_test = np.stack(img_test_df['data'].to_numpy())
y_img_test = img_test_df['label_encoded'].to_numpy()

del train_df, img_test_df

# Stratified 70/30 train/validation split of the training images.
x_img_train, x_img_val, y_img_train, y_img_val = train_test_split(
    x_img,
    y_img,
    test_size=0.3,
    random_state=100,
    shuffle=True,
    stratify=y_img,
)
del x_img, y_img

# Modeling
## Creating Model

In [None]:
def create_base_model(instance_name, shape):
    """Build a frozen VGG19 feature extractor wrapped in its own Keras model.

    Args:
        instance_name: Suffix used in the returned model's name
            (``vgg_19_<instance_name>``).
        shape: Input shape passed to ``tf.keras.Input`` (batch dimension excluded).

    Returns:
        A ``tf.keras.Model`` that maps an input of ``shape`` to the globally
        average-pooled output of the VGG19 convolutional base.
    """
    backbone = tf.keras.applications.VGG19(weights='imagenet', include_top=False)

    # Freeze every backbone layer so the ImageNet weights are not updated during training.
    for vgg_layer in backbone.layers:
        vgg_layer.trainable = False

    model_input = tf.keras.Input(shape=shape)
    feature_maps = backbone(model_input)
    pooled = tf.keras.layers.GlobalAveragePooling2D()(feature_maps)
    return tf.keras.Model(model_input, pooled, name=f"vgg_19_{instance_name}")

In [None]:
# Two named inputs: one for images, one for the audio representation.
input_image = tf.keras.Input(shape=(100, 100, 3), name='input_image')
input_audio = tf.keras.Input(shape=(128, 110, 3), name='input_audio')

# The symbolic shapes include the batch dimension first; it is sliced off below.
input_img_shape = list(input_image.shape)
input_audio_shape = list(input_audio.shape)

# Each modality gets its own frozen VGG19 branch.
image_branch = create_base_model('image', input_img_shape[1:])
image_features = image_branch(input_image)
audio_branch = create_base_model('audio', input_audio_shape[1:])
audio_features = audio_branch(input_audio)

# Fuse both feature vectors and pass them through a shared dense layer.
feature = tf.keras.layers.Concatenate()([image_features, audio_features])
fc1 = tf.keras.layers.Dense(512, activation='relu')(feature)

# One softmax head per task, both fed by the shared layer.
# NOTE(review): the head sizes 7 and 8 are hard-coded — confirm they match the number of image / audio classes.
out1 = tf.keras.layers.Dense(7, activation='softmax', name='image_class')(fc1)
out2 = tf.keras.layers.Dense(8, activation='softmax', name='audio_class')(fc1)

model = tf.keras.Model(inputs=[input_image, input_audio], outputs=[out1, out2])

In [None]:
# Render a diagram of the two-input / two-output model, including tensor shapes.
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
# Both heads end in a softmax and are trained against integer class ids
# (the LabelEncoder output), so each one uses sparse categorical cross-entropy.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-2),
    loss={
        'image_class': 'sparse_categorical_crossentropy',
        'audio_class': 'sparse_categorical_crossentropy',
    },
    metrics={
        'image_class': 'accuracy',
        'audio_class': 'accuracy',
    },
)

In [None]:
# Truncate the four training arrays to the shortest one so every image sample
# is paired with exactly one audio sample.
min_samples = min(map(len, (x_img_train, x_audio_train, y_img_train, y_audio_train)))

x_img_train, y_img_train = x_img_train[:min_samples], y_img_train[:min_samples]
x_audio_train, y_audio_train = x_audio_train[:min_samples], y_audio_train[:min_samples]

# Report the shapes first, then the dtypes, in the same order.
for caption, array in (
    ("Input Image Shape:", x_img_train),
    ("Input Audio Shape:", x_audio_train),
    ("Image Labels Shape:", y_img_train),
    ("Audio Labels Shape:", y_audio_train),
):
    print(caption, array.shape)

for caption, array in (
    ("Input Image Type:", x_img_train),
    ("Input Audio Type:", x_audio_train),
    ("Image Labels Type:", y_img_train),
    ("Audio Labels Type:", y_audio_train),
):
    print(caption, array.dtype)

In [None]:
# Truncate the four validation arrays to the shortest one so image and audio
# samples can be paired one-to-one.
min_samples = min(map(len, (x_img_val, x_audio_val, y_img_val, y_audio_val)))

x_img_val, y_img_val = x_img_val[:min_samples], y_img_val[:min_samples]
x_audio_val, y_audio_val = x_audio_val[:min_samples], y_audio_val[:min_samples]


In [None]:
# Reset Keras' global state and run a garbage-collection pass before training.
# NOTE(review): `model` was built before this call — confirm clearing the session at this point is intended.
tf.keras.backend.clear_session()
gc.collect()

In [None]:
# Stop once val_loss has gone 5 epochs without improving, and roll back to the best weights.
early = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=5,
    restore_best_weights=True,
)

# Keep only the best model (lowest val_loss) on disk.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(MODEL_CHECKPOINT_PATH, 'best_model.keras'),
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=2,
)

# Multiply the learning rate by 0.2 after 3 stagnant epochs, never going below 1e-6.
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    mode='min',
    factor=0.2,
    patience=3,
    min_lr=1e-6,
)

## Train

In [None]:
# Train both heads jointly on the paired image/audio arrays.
history = model.fit(
    x=[x_img_train, x_audio_train],
    y=[y_img_train, y_audio_train],
    validation_data=(
        [x_img_val, x_audio_val],
        [y_img_val, y_audio_val],
    ),
    # `early` is included so the 300-epoch budget is an upper bound: training
    # stops once val_loss stalls and the best weights are restored.
    callbacks=[lr_scheduler, checkpoint, early],
    epochs=300,
    batch_size=BATCH_SIZE,
    steps_per_epoch=len(x_img_train) // BATCH_SIZE,
)


## Plot the training result

In [None]:
def plot_and_save(metric_name, history, ylabel, filename):
    """Plot one training metric over the epochs and write the figure to disk.

    Args:
        metric_name: Key into ``history.history`` for the training curve; the
            validation curve is looked up under ``val_<metric_name>`` and drawn
            only when that key exists.
        history: Object exposing a ``history`` mapping of metric name to
            per-epoch values (as returned by ``model.fit``).
        ylabel: Human-readable metric name used for the y-axis, title and legend.
        filename: Path the figure is saved to.
    """
    fig, ax = plt.subplots()

    curves = history.history
    ax.plot(curves[metric_name], label=f'Train {ylabel}')

    val_key = f'val_{metric_name}'
    if val_key in curves:
        ax.plot(curves[val_key], label=f'Val {ylabel}')

    ax.set_title(f'{ylabel} over Epochs')
    ax.set_xlabel('Epochs')
    ax.set_ylabel(ylabel)
    ax.legend()
    ax.grid(True)

    fig.savefig(filename)
    # Close the figure so it is not rendered inline and its memory is released.
    plt.close(fig)

In [None]:
# (history key, axis label, output file) for every curve that gets saved.
for metric_key, axis_label, out_file in (
    ('loss', 'Loss', 'loss_plot.png'),
    ('image_class_loss', 'Image Classification Loss', 'image_class_loss_plot.png'),
    ('audio_class_loss', 'Audio Classification Loss', 'audio_class_loss_plot.png'),
    ('image_class_accuracy', 'Image Classification Accuracy', 'image_class_accuracy_plot.png'),
    ('audio_class_accuracy', 'Audio Classification Accuracy', 'audio_class_accuracy_plot.png'),
):
    plot_and_save(metric_key, history, axis_label, out_file)

# Evaluation
## Using model.evaluate

In [None]:
# Score the model on the validation arrays; the returned metrics are shown as the cell output.
model.evaluate(
    x=[x_img_val, x_audio_val],
    y=[y_img_val, y_audio_val],
    batch_size=BATCH_SIZE,
    steps=len(x_img_val) // BATCH_SIZE,
    verbose=2,
)

In [None]:
# The image and audio test arrays come from different sources and were never
# aligned. Truncate all four to the shortest one, as is done for the training
# and validation arrays, so both model inputs have the same number of samples.
min_samples = min(len(x_img_test), len(x_audio_test), len(y_img_test), len(y_audio_test))

x_img_test = x_img_test[:min_samples]
x_audio_test = x_audio_test[:min_samples]
y_img_test = y_img_test[:min_samples]
y_audio_test = y_audio_test[:min_samples]

# Score the model on the aligned test arrays; the returned metrics are shown as the cell output.
model.evaluate([x_img_test, x_audio_test], [y_img_test, y_audio_test], batch_size=BATCH_SIZE, verbose=2, steps=len(x_img_test) // BATCH_SIZE)

In [None]:
# One forward pass over the test inputs; the model yields one array per output head.
predictions = model.predict(x=[x_img_test, x_audio_test], batch_size=BATCH_SIZE)

# Outputs follow the order used when the model was built: image head first, audio head second.
image_preds, audio_preds = predictions

## Using confusion matrix

In [None]:
# Pick the highest-scoring class per sample for each head.
image_class = predictions[0].argmax(axis=1)
audio_class = predictions[1].argmax(axis=1)

# Rows are the true labels, columns the predicted labels.
image_cm = confusion_matrix(y_true=y_img_test, y_pred=image_class)
audio_cm = confusion_matrix(y_true=y_audio_test, y_pred=audio_class)

### Plot the Confusion Matrix

In [None]:
# Heatmap of the image-head confusion matrix.
plt.figure(figsize=(8, 6))
sns.heatmap(image_cm, annot=True, fmt='d', cmap='Blues')
plt.title('Image Classification Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
# Save before show(): once the figure has been shown it may already be closed,
# and a later savefig() would then write an empty image.
plt.savefig('image_confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.show()
plt.close()


In [None]:
# Heatmap of the audio-head confusion matrix.
plt.figure(figsize=(8, 6))
sns.heatmap(audio_cm, annot=True, fmt='d', cmap='Greens')
plt.title('Audio Classification Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
# Save before show(): once the figure has been shown it may already be closed,
# and a later savefig() would then write an empty image.
plt.savefig('audio_confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.show()
plt.close()


## Using Classification Report

In [None]:
# List the class directory names for both modalities.
# os.listdir() returns names in arbitrary order; sorting gives a stable order
# that lines up with the sorted classes_ of a LabelEncoder fitted on the same
# names (assuming every class directory contributed at least one file — TODO confirm).
aud_classes = sorted(os.listdir(AUDIO_PATH))
img_classes = sorted(os.listdir(TRAIN_IMAGE_PATH))

print(aud_classes, img_classes, sep='\n')

In [None]:
# Classification report for the image head, comparing true and predicted class ids on the test set.
print(classification_report(y_img_test, image_class))

In [None]:
# Classification report for the audio head, comparing true and predicted class ids on the test set.
print(classification_report(y_audio_test, audio_class))