<a href="https://colab.research.google.com/github/PankajKumarBeniwal/Memotion-Analysis/blob/main/task_b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Loaing Necessary Libraries

In [None]:
import re
import string
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import tensorflow as tf
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D
from tensorflow.keras.layers import Dense, Flatten, BatchNormalization, Activation, Dropout
from tensorflow.keras.layers import Conv1D, Embedding, GlobalAveragePooling1D 
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.preprocessing import image

from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

### Reading Image Info from CSV and Cleaning

In [None]:
df = pd.read_csv('../input/memotion-dataset-7k/memotion_dataset_7k/labels.csv')
df.drop(df.columns[df.columns.str.contains('unnamed',case = False)],axis = 1, inplace = True)
df = df.drop(columns = ['text_ocr', 'overall_sentiment'])
df.head()

In [None]:
cleaned = df.copy()
cleaned.dropna(inplace=True)
cleaned.isnull().any()

# Image Modelling

### Loading Images

In [None]:
width = 100
height = 100
X = []
for i in tqdm(range(cleaned.shape[0])):
    if i in [119, 4799, 6781, 6784, 6786]:
        pass
    else:
        path = '../input/memotion-dataset-7k/memotion_dataset_7k/images/'+cleaned['image_name'][i]
        img = image.load_img(path,target_size=(width,height,3))
        img = image.img_to_array(img)
        img = img/255.0
        X.append(img)
        
X = np.array(X)

In [None]:
X.shape

### Dropping few rows to make shape consistent

In [None]:
rows_to_drop = ['image_120.jpg',
              'image_4800.jpg',
              'image_6782.jpg',
              'image_6785.jpg',
              'image_6787.jpg',
              'image_6988.jpg',
              'image_6989.jpg',
              'image_6990.png',
              'image_6991.jpg',
              'image_6992.jpg']

In [None]:
for images in rows_to_drop:
    cleaned.drop(cleaned[cleaned['image_name'] == images].index, inplace=True)

In [None]:
cleaned = cleaned.replace({'humour': {'not_funny': 0, 'funny': 1, 'very_funny': 1, 'hilarious':1},
                        'sarcasm': {'not_sarcastic': 0, 'general': 1, 'twisted_meaning': 1, 'very_twisted': 1},
                        'offensive': {'not_offensive': 0, 'slight': 1, 'very_offensive': 1, 'hateful_offensive': 1},
                        'motivational': {'not_motivational': 0, 'motivational': 1}})

In [None]:
target = cleaned.iloc[:,2:]
target.head()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, target, test_size = 0.2, stratify=target)

### Image Preprocessing

In [None]:
data_augmentation = tf.keras.Sequential([
  tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal'),
  tf.keras.layers.experimental.preprocessing.RandomContrast([.5,2]),
  tf.keras.layers.experimental.preprocessing.RandomRotation(0.2),
  tf.keras.layers.experimental.preprocessing.RandomZoom(0.1)
])

preprocess_input = tf.keras.applications.resnet_v2.preprocess_input

rescale = tf.keras.layers.experimental.preprocessing.Rescaling(1./127.5, offset= -1)

In [None]:
plt.figure(figsize=(10, 10))
for i in range(9):
  augmented_image = data_augmentation(X)
  ax = plt.subplot(3, 3, i + 1)
  plt.imshow(augmented_image[0])
  plt.axis("off")

### Base Model

In [None]:
base_model_1 = tf.keras.applications.ResNet50(input_shape=X[0].shape,
                                               include_top=False,
                                               weights='imagenet')
base_model_2 = tf.keras.applications.VGG16(input_shape=X[0].shape,
                                               include_top=False,
                                               weights='imagenet')

In [None]:
base_model_1.trainable = False
base_model_2.trainable = False

### Model for Image

In [None]:
def image_model():
    image_input = tf.keras.Input(shape=(150, 150, 3), name = 'image_input')
    image_layers = data_augmentation(image_input)
    image_layers = preprocess_input(image_layers)
    layer_bm_1 = base_model_1(image_input, training=False)
    dropout_layer = Dropout(0.2)(layer_bm_1)
    layer_bm_1 = Conv2D(2048, kernel_size=2,padding='valid')(layer_bm_1)
    dropout_layer = Dropout(0.2)(layer_bm_1)
    layer_bm_1 = Dense(512)(dropout_layer)
    dropout_layer = Dropout(0.2)(layer_bm_1)
    #layer_bm_2 = base_model_2(image_input, training=False)
    #dropout_layer = Dropout(0.2)(layer_bm_2)
    #layer_bm_2 = Dense(512)(layer_bm_2)
    #dropout_layer = Dropout(0.2)(layer_bm_2)
    #layers = tf.keras.layers.concatenate([layer_bm_1, layer_bm_2])
    image_layers = GlobalAveragePooling2D()(layer_bm_1)
    image_layers = Dropout(0.2, name = 'dropout_layer')(image_layers)
    return image_input, image_layers

In [None]:
image_input, image_layers = image_model()

# Text Modelling

### Standardization and Cleaning

In [None]:
def standardization(data):
    data = data.apply(lambda x: x.lower())
    data = data.apply(lambda x: re.sub(r'\d+', '', x))
    data = data.apply(lambda x: re.sub(r'\w*.com\w*', '', x, flags=re.MULTILINE))
    data = data.apply(lambda x: x.translate(str.maketrans('', '', string.punctuation)))
    return data

cleaned['text_corrected'] = standardization(cleaned.text_corrected)

### Vectorizing Layers

In [None]:
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
vocab_size = 10000
sequence_length = 50

vectorize_layer = TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length)

text_ds = np.asarray(cleaned['text_corrected'])
vectorize_layer.adapt(tf.convert_to_tensor(text_ds))

In [None]:
X_text_train, X_text_test, y_text_train, y_text_test = train_test_split(cleaned.text_corrected, target, test_size = 0.2, stratify=target)

In [None]:
embedding_dim=16

def text_model():
    text_input = tf.keras.Input(shape=(None,), dtype=tf.string, name='text')
    text_layers = vectorize_layer(text_input)
    text_layers = tf.keras.layers.Embedding(vocab_size, embedding_dim, name="embedding")(text_layers)
    dropout_layer = Dropout(0.2)(text_layers)
    
    text_layers = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(512, activation='relu', return_sequences=True))(text_layers)
    dropout_layer = Dropout(0.2)(text_layers)
    text_layers = tf.keras.layers.BatchNormalization()(text_layers)

    text_layers = tf.keras.layers.Conv1D(128, 7, padding="valid", activation="relu", strides=3)(text_layers)
    dropout_layer = Dropout(0.2)(text_layers)
    text_layers = tf.keras.layers.GlobalMaxPooling1D()(text_layers)
    dropout_layer = Dropout(0.2)(text_layers)
    
    text_layers = tf.keras.layers.Dense(2048, activation="relu")(text_layers)
    text_layers = tf.keras.layers.Dropout(0.5)(text_layers)
    return text_input, text_layers

text_input, text_layers = text_model()

# Combining and Evaluating

### Task A: Overall Sentiment

In [None]:
def model(layer_1, layer_2, image_input, text_input):
    concatenate = tf.keras.layers.concatenate([layer_1, layer_2], axis=1)
    semi_final_layer = tf.keras.layers.Dense(2048, activation='softmax')(concatenate)

    prediction_layer = tf.keras.layers.Dense(4, activation='softmax', name = 'task_B_out')
    

    output = prediction_layer(semi_final_layer)
    

    model = tf.keras.Model(inputs = [image_input, text_input] , 
                           outputs = output)
    return model

In [None]:
model = model(image_layers, text_layers, image_input, text_input)

In [None]:
import os
# Define the checkpoint directory to store the checkpoints
checkpoint_dir = './training_checkpoints'

# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

In [None]:
# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 5:
    return 1.0
  elif epoch >= 5 and epoch < 15:
    return 0.5
  else:
    return 0.1

In [None]:
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                      model.optimizer.lr.numpy()))

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    tf.keras.callbacks.EarlyStopping(monitor = 'accuracy', patience=5),
    PrintLR()
]

In [None]:
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss = 'categorical_crossentropy',
                  metrics=['categorical_accuracy'])
    
history = model.fit(x = {"image_input": X_train, "text_input": X_text_train},
                        y= y_train,
                        batch_size=32,
                        epochs=25,
                        validation_data=({"image_input": X_test, "text_input": X_text_test}, y_test ),
                        callbacks=callbacks
                       )

In [None]:
prediction = model.predict(x = {"image_input": X_test, "text_input": X_text_test})
prediction = np.array(prediction)
prediction = np.squeeze(prediction).T
prediction = 1/(1+np.exp(-np.array(prediction)))
prediction = np.where(prediction > 0.5, 1, 0)
y_true = y_test.values


micro_f1_score = f1_score(y_true[:4,1], prediction[:4,1], average='micro')
macro_f1_score = f1_score(y_true[:4,1], prediction[:4,1], average='macro')

print("Micro F1 score for Task B is ", micro_f1_score)
print("Macro F1 score for Task B is ", macro_f1_score)

In [None]:
pd.DataFrame(history.history)

In [None]:
plt.imshow(X[1,:,:,:])
target.iloc[1,:]

In [None]:
prediction = model.predict(x = {"image_input": X_test, "text_input": X_text_test})
prediction = np.array(prediction)

In [None]:
plt.bar(['humuor', 'sarcasm', 'offensive', 'motivational'], np.where(prediction[:,1,0] > 0.5, 1, 0))

In [None]:
df = pd.DataFrame(history.history)

fig, axes = plt.subplots(1,3, figsize=(12, 4))

axes[0].plot(df.loss)
axes[0].plot(df.humuor_loss)
axes[0].plot(df.sarcasm_loss)
axes[0].plot(df.offensive_loss)
axes[0].plot(df.motivational_loss)
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Losses')
axes[0].set_title('Losses Per Epoch')
axes[0].legend(['Humuor loss', 'Sarcasm loss','Offensive loss','Motivational Loss'], loc='upper right')


axes[1].plot(df.humuor_accuracy)
axes[1].plot(df.sarcasm_accuracy)
axes[1].plot(df.offensive_accuracy)
axes[1].plot(df.motivational_accuracy)
axes[1].set_xlabel('Epochs')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Accuracy Per Epoch')
axes[1].legend(['Humuor Acc', 'Sarcasm Acc','Offensive Acc','Motivational Acc'], loc='lower right')



axes[2].plot(df.loss)
axes[2].set_xlabel('Epochs')
axes[2].set_ylabel('Losses')
axes[2].set_title('Losses Per Epoch')

In [None]:
test_images = X_test.shape[0]

random_index = np.random.choice(test_images, 5)
random_test_images = X_test[random_index, ...]
random_test_labels = (y_test.humour[random_index, ...],
                      y_test.sarcasm[random_index, ...],
                      y_test.offensive[random_index, ...],
                      y_test.motivational[random_index, ...])

predictions = model.predict(random_test_images)

fig, axes = plt.subplots(5, 2, figsize=(16, 12))
fig.subplots_adjust(hspace=0.4, wspace=-0.2)

for i, (prediction, image, label) in enumerate(zip(predictions, random_test_images, random_test_labels)):
    axes[i, 0].imshow(np.squeeze(image))
    axes[i, 0].get_xaxis().set_visible(False)
    axes[i, 0].get_yaxis().set_visible(False)
    axes[i, 0].text(10., -1.5, f'Digit {label}')
    axes[i, 1].bar(np.arange(1,11), prediction)
    axes[i, 1].set_xticks(np.arange(1,11))
    axes[i, 1].set_title("Categorical distribution. Model prediction")
    
plt.show()