In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.regularizers import l1, l2
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from keras_tuner import HyperModel, Hyperband


In [2]:
# Load the caption data set and recode the textual funniness labels as integers.
LABEL_CODES = {'not_funny': 0, 'somewhat_funny': 1, 'funny': 2}

df = pd.read_csv('data/all_data_850-870_embedding.csv')

# Recode whenever ANY textual label is present. The previous check looked only
# for 'funny', so a file without that class was silently left unrecoded.
# Values that are not one of the textual labels pass through unchanged.
if df['label'].isin(list(LABEL_CODES)).any():
    df['label'] = df['label'].map(lambda value: LABEL_CODES.get(value, value))
    print('Recode label successfully!')

# Make the numeric dtype explicit rather than relying on implicit downcasting
# of an object column. Raises ValueError if an unknown textual label is left,
# which surfaces the problem here instead of later during training.
df['label'] = pd.to_numeric(df['label'])

# Captions as plain strings (missing values become the string 'nan')
df['caption'] = df['caption'].astype(str)
captions = df['caption'].values.tolist()

Recode label successfully!


In [3]:
# Count how often each whitespace-separated token occurs across all captions;
# the number of distinct tokens is the vocabulary size.
word_index = {}
for text in captions:
    for token in text.split():
        word_index[token] = word_index.get(token, 0) + 1
print("vocabulary size: ", len(word_index))

# Length of the longest caption, in whitespace-separated tokens (0 if there are none)
max_length = max((len(text.split()) for text in captions), default=0)
print("max_length: ", max_length)

vocabulary size:  8431
max_length:  73


In [4]:
# Preprocessing / model constants
num_classes = 3       # number of funniness categories
embedding_dim = 300   # size of each word-embedding vector
vocab_size = 6_000    # passed to the tokenizer as num_words
max_length = 15       # sequence length used when padding the captions;
                      # replaces the max_length measured from the data above

In [5]:
# Fit a tokenizer on the captions and turn each caption into a list of word ids.
caption_texts = df['caption']
tokenizer = Tokenizer(oov_token="<OOV>", num_words=vocab_size)
tokenizer.fit_on_texts(caption_texts)
sequences = tokenizer.texts_to_sequences(caption_texts)

# Bring every sequence to exactly max_length ids, padding at the end.
# NOTE(review): no `truncating` argument is given, so the library default
# decides which end of an over-long caption is cut — confirm that is intended.
text_data = pad_sequences(
    sequences,
    maxlen=max_length,
    padding='post',
)

# Class labels, in the same row order as text_data
labels = df['label']

In [6]:
# Hold out 20% of the samples as the test set; the seed is fixed so the split is repeatable.
split_parts = train_test_split(text_data, labels, random_state=42, test_size=0.2)
X_training, X_test, y_training, y_test = split_parts

In [7]:
# Put the training features and labels side by side in one frame
# (feature columns first, label as the last column).
train_data = pd.DataFrame(np.column_stack((X_training, y_training)))

### Hyperparameter tuning

In [8]:
class MyHyperModel(HyperModel):
    """Search space for an averaged-embedding caption classifier.

    Every candidate model is
    Embedding -> GlobalAveragePooling1D -> Dense -> BatchNormalization ->
    Dropout -> softmax Dense. The tuned hyperparameters are the hidden layer's
    width ('unit') and activation ('activation'), the L1/L2 penalties on its
    kernel ('l1', 'l2'), the dropout rate ('dropout') and the Adam learning
    rate ('learning_rate').

    Args:
        num_classes: Number of units in the softmax output layer.
        vocab_size: Input dimension of the embedding layer.
        max_length: Sequence length given to the embedding layer as
            ``input_length``.
        embedding_dim: Output dimension of the embedding layer.

    The three keyword defaults equal the values that used to be hard-coded
    here, so ``MyHyperModel(num_classes=3)`` builds the same models as before.
    """

    def __init__(self, num_classes, vocab_size=6000, max_length=15, embedding_dim=300):
        # Let the keras_tuner base class initialise its own state
        # (this call was previously missing).
        super().__init__()
        self.num_classes = num_classes
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.embedding_dim = embedding_dim

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``.

        Args:
            hp: keras_tuner hyperparameter container used to declare / read
                the tunable values.

        Returns:
            A compiled ``Sequential`` model using sparse categorical
            cross-entropy loss and the accuracy metric.
        """
        # Declare the hyperparameters in a fixed order so the search space is
        # registered exactly as before: unit, activation, l1, l2, dropout, learning_rate.
        units = hp.Int('unit', min_value=64, max_value=256, step=32)
        activation = hp.Choice('activation', values=['relu', 'tanh'])
        l1_penalty = hp.Choice('l1', values=[0.01, 0.001, 0.0001, 0.0])
        l2_penalty = hp.Choice('l2', values=[0.01, 0.001, 0.0001, 0.0])
        dropout_rate = hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)
        learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

        model = Sequential()
        # NOTE(review): `input_length` may be deprecated in newer Keras
        # releases — confirm against the installed version.
        model.add(tf.keras.layers.Embedding(self.vocab_size, self.embedding_dim, input_length=self.max_length))
        # Average the word vectors of a caption into a single vector
        model.add(tf.keras.layers.GlobalAveragePooling1D())
        model.add(Dense(units=units,
                        activation=activation,
                        kernel_regularizer=tf.keras.regularizers.l1_l2(l1=l1_penalty, l2=l2_penalty)))
        model.add(BatchNormalization())
        model.add(Dropout(dropout_rate))

        # One probability per class
        model.add(Dense(self.num_classes, activation='softmax'))

        # Sparse loss: labels are integer class ids, not one-hot vectors
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate),
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])

        return model

In [9]:
# Features are every column except the last; the last column holds the labels.
X = train_data.iloc[:, :-1]
y = train_data.iloc[:, -1]

# Shuffled 5-fold cross-validation with a fixed seed
num_folds = 5
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# One entry per fold: the three best hyperparameter sets found on that fold
best_hyperparams_per_fold = []

for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    print(f"Running tuning for fold {fold + 1}")

    # Rows of this fold used for fitting and for validation
    X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
    X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]

    # A fresh hypermodel and tuner per fold; each fold writes to its own
    # results directory, and overwrite=True is passed to the tuner.
    hypermodel = MyHyperModel(num_classes=3)
    tuner = Hyperband(
        hypermodel,
        objective='val_accuracy',
        max_epochs=20,
        directory=f'result/my_dir_{fold}',
        project_name='hyperparameter_tuning',
        overwrite=True,
    )

    # Run the search; every class is given the same weight of 1
    tuner.search(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        class_weight={0: 1, 1: 1, 2: 1},
    )

    # Keep this fold's top three hyperparameter sets
    best_hyperparams_per_fold.append(tuner.get_best_hyperparameters(num_trials=3))

Trial 30 Complete [00h 00m 24s]
val_accuracy: 0.383584588766098

Best val_accuracy So Far: 0.40033501386642456
Total elapsed time: 00h 04m 02s


In [10]:
# Keep only the first entry of each fold's list of best hyperparameter sets
best_hyperparams = [fold_top[0] for fold_top in best_hyperparams_per_fold]

# Print the selected values, one fold per line
print("Best hyperparameters:")
for fold_best in best_hyperparams:
    print(fold_best.values)

Best hyperparameters:
{'unit': 256, 'activation': 'relu', 'l1': 0.0, 'l2': 0.0001, 'dropout': 0.0, 'learning_rate': 0.0001, 'tuner/epochs': 20, 'tuner/initial_epoch': 0, 'tuner/bracket': 0, 'tuner/round': 0}
{'unit': 192, 'activation': 'relu', 'l1': 0.01, 'l2': 0.0, 'dropout': 0.0, 'learning_rate': 0.0001, 'tuner/epochs': 20, 'tuner/initial_epoch': 7, 'tuner/bracket': 2, 'tuner/round': 2, 'tuner/trial_id': '0015'}
{'unit': 96, 'activation': 'tanh', 'l1': 0.01, 'l2': 0.0, 'dropout': 0.4, 'learning_rate': 0.0001, 'tuner/epochs': 3, 'tuner/initial_epoch': 0, 'tuner/bracket': 2, 'tuner/round': 0}
{'unit': 160, 'activation': 'relu', 'l1': 0.001, 'l2': 0.001, 'dropout': 0.30000000000000004, 'learning_rate': 0.0001, 'tuner/epochs': 20, 'tuner/initial_epoch': 0, 'tuner/bracket': 0, 'tuner/round': 0}
{'unit': 256, 'activation': 'tanh', 'l1': 0.0001, 'l2': 0.001, 'dropout': 0.0, 'learning_rate': 0.01, 'tuner/epochs': 20, 'tuner/initial_epoch': 0, 'tuner/bracket': 0, 'tuner/round': 0}


### Test: train the final model and evaluate it on the held-out test set

In [27]:
# Stop once val_loss has gone 10 epochs without improving and keep the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
)

# Build the model from the hyperparameters selected for the fold at index 4.
# NOTE(review): the index is hard-coded and `hypermodel` is whatever instance
# the tuning loop left behind — confirm this is the intended configuration.
chosen_hyperparams = best_hyperparams[4]
model = hypermodel.build(chosen_hyperparams)

# Fit on the training split, with 20% of it held out for validation
model.fit(
    X_training,
    y_training,
    epochs=20,
    validation_split=0.2,
    callbacks=[early_stopping],
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20


<keras.src.callbacks.History at 0x23b31d54bb0>

In [28]:
# Per-class scores for every test sample
class_scores = model.predict(X_test)

# Predicted label = index of the highest score in each row
predictions = np.argmax(class_scores, axis=1)



In [29]:
# Pair every true test label with the model's predicted label
df_result = pd.DataFrame({'label': y_test, 'prediction': predictions})

# 1 where the prediction matches the label, 0 otherwise
is_hit = df_result['label'] == df_result['prediction']
df_result['correct'] = is_hit.astype(int)

# Overall accuracy: correct predictions divided by the number of test samples
accuracy = df_result['correct'].sum() / len(df_result)

# Per-class accuracy: correct predictions divided by sample count, grouped by true label
hits_by_label = df_result.groupby('label')['correct']
accuracy_class = hits_by_label.sum() / hits_by_label.count()


In [30]:
# Overall accuracy first, then one line per class (label code in the second field)
print('Accuracy average: ', accuracy)
per_class_lines = (
    ('Accuracy funny: ', 2),
    ('Accuracy somewhat funny: ', 1),
    ('Accuracy not funny: ', 0),
)
for line_prefix, label_code in per_class_lines:
    print(line_prefix, accuracy_class[label_code])

Accuracy average:  0.3119143239625167
Accuracy funny:  0.03076923076923077
Accuracy somewhat funny:  0.9106382978723404
Accuracy not funny:  0.04365079365079365
