<a href="https://colab.research.google.com/github/Satwikram/Computer-Vision-Implementations/blob/main/Multimodal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Author: Satwik Ram K

### Setup

In [None]:
# Hugging Face library providing the tokenizer and text model imported below.
!pip install transformers

# Python bindings for Tesseract; the OCR engine itself comes from the apt
# package `tesseract-ocr` installed a few lines down.
!pip install pytesseract
# !pip install pdf2image
# NOTE(review): poppler-utils is presumably only needed by the commented-out
# pdf2image dependency — confirm before keeping it.
!apt-get install poppler-utils 
!apt install tesseract-ocr
# !pip install keras_nlp
# Provides `attention.Attention`, which is imported later in this notebook.
!pip install attention

### Importing Dependencies

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import string
import re

import os
from pathlib import Path

import matplotlib.pyplot as plt
import cv2
import tensorflow as tf

from tensorflow.keras.layers import Dense, Input, Flatten, Conv2D, MaxPool2D, GlobalAveragePooling1D, BatchNormalization, Embedding, Bidirectional, LSTM
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# import keras_nlp

from sklearn.model_selection import train_test_split

import spacy
from unicodedata import normalize

from tqdm import tqdm

import pytesseract
from PIL import Image

from transformers import AutoTokenizer, TFAutoModelForSequenceClassification

import plotly.express as px

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


Moving 0 files to the new cache system


0it [00:00, ?it/s]

### Gathering Dataset

In [None]:
# Extract the dataset archive into the current working directory.
# Assumes Google Drive is already mounted at /content/drive.
!unzip /content/drive/MyDrive/Deceptive-Research/dataset.zip

### Global Variables

In [None]:
# Spatial size every image is resized to before it reaches the model.
img_shape = 224, 224

# Dataset folders inside the Colab runtime.
deceptive_path = Path("/content", "deceptive")
normal_path = Path("/content", "Ads")

# Class names; positions 0 and 1 line up with the numeric targets passed to
# `extract_image`.
uniq_labels = ["Non Deceptive", "Deceptive"]

### Cleaning the Data

In [None]:
# Shared spaCy pipeline used by `clean_text`; the parser and NER components
# are disabled at load time.
nlp = spacy.load('en_core_web_sm', disable=['parser', 'ner'])

In [None]:
def clean_text(text):
    """Normalise raw OCR text into a compact, lemmatised string.

    Steps, in order: Unicode NFKD normalisation, removal of every character
    that is neither a word character nor whitespace, lemmatisation with the
    module-level spaCy pipeline ``nlp`` while dropping stop words, and
    whitespace collapsing/trimming.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text; an empty string when nothing survives the filters.
    """
    text = normalize("NFKD", text)

    # Strip punctuation (anything that is not a word character or whitespace).
    text = re.sub(r"[^\w\s]", "", text)

    # Keep the lemma of every token that is not a stop word.
    text = " ".join(token.lemma_ for token in nlp(text) if not token.is_stop)

    # Raw string on purpose: "\s" in a plain literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    text = re.sub(r"\s+", " ", text)

    return text.strip()

In [None]:
def clean_img(fname):
    """Load an image from disk, resize it to ``img_shape`` and scale it by 1/255.

    Args:
        fname: Path of the image file to read.

    Returns:
        The resized image array divided by 255.0.

    Raises:
        ValueError: If OpenCV cannot read ``fname``.
    """
    img = cv2.imread(fname)

    # cv2.imread reports an unreadable or corrupt file by returning None rather
    # than raising; without this guard the failure only shows up later as an
    # opaque "!ssize.empty()" assertion inside cv2.resize.
    if img is None:
        raise ValueError(f"Could not read image: {fname}")

    img = cv2.resize(img, img_shape)

    # Normalization
    return img / 255.0

### OCR - Image to Text

In [None]:
def get_data(image):
    """Run Tesseract OCR (English) on an image and tidy up its line breaks.

    Args:
        image: Image location, in any form accepted by ``Image.open``.

    Returns:
        The recognised text. Runs of two or more newlines become a single
        newline; remaining single newlines are removed.
    """
    # Open through a context manager so the underlying file handle is closed
    # as soon as OCR has finished instead of lingering until garbage collection.
    with Image.open(image) as picture:
        txt = pytesseract.image_to_string(picture, lang="eng")

    # Protect paragraph breaks with a "\t\t" placeholder, drop the remaining
    # line breaks, then turn the placeholder back into a single newline.
    txt = re.sub("[\n]{2,}", "\t\t", txt)
    # NOTE(review): single newlines are removed without inserting a space, so
    # the last word of a line is glued to the first word of the next one —
    # confirm this is intended.
    txt = re.sub("\n", "", txt)
    txt = re.sub("\t\t", "\n", txt)

    return txt

### Image Extraction

In [None]:
def extract_image(path, target):
    """Collect image arrays, cleaned OCR texts and labels for one folder.

    Every ``.jpg`` / ``.jpeg`` / ``.png`` file (extension matched
    case-insensitively) directly inside ``path`` is OCR'd with ``get_data``,
    cleaned with ``clean_text`` and loaded with ``clean_img``. Files that fail
    at any of these steps are reported on stdout and skipped.

    Args:
        path: Directory to scan (not recursive).
        target: Label attached to every sample found in ``path``.

    Returns:
        A tuple ``(images, texts, labels)`` of three equally long lists.
    """
    images, texts, labels = [], [], []

    # Sorted so the sample order does not depend on the filesystem's
    # arbitrary listing order.
    for entry in sorted(os.listdir(path)):

        # Lower-case the extension so files such as "AD.PNG" are not skipped.
        if os.path.splitext(entry)[1].lower() not in (".jpg", ".jpeg", ".png"):
            continue

        fname = f"{path}/{entry}"

        try:
            # OCR - Image to Text, then cleaning the text
            text = clean_text(get_data(fname))

            if not text:
                # Keep the sample with a placeholder text and report the file.
                text = "No Information"
                print(fname)

            # Cleaning the Image
            image = clean_img(fname)

        except Exception as e:
            # Best effort: one unreadable file must not abort the whole scan.
            # The file name is included so the failing sample can be found.
            print(f"Exception: {fname}: {e}")
            continue

        # Append only after every step succeeded so the lists stay aligned.
        images.append(image)
        texts.append(text)
        labels.append(target)

    return images, texts, labels

In [None]:
# Deceptive ads are read directly from `deceptive_path` and labelled 1.
X1, X2, y = extract_image(deceptive_path, 1)

In [None]:
# Non-deceptive ads are grouped in sub-folders of `normal_path`; label 0.
for i in sorted(os.listdir(normal_path)):
    folder = f"{normal_path}/{i}"

    # Skip stray files (e.g. archive metadata): extract_image lists its
    # argument as a directory and would raise on anything else.
    if not os.path.isdir(folder):
        continue

    X1_t, X2_t, y_t = extract_image(folder, 0)
    X1.extend(X1_t)
    X2.extend(X2_t)
    y.extend(y_t)

/content/Ads/18/15.png
Exception: OpenCV(4.6.0) /io/opencv/modules/imgproc/src/resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'resize'

/content/Ads/18/13.png
/content/Ads/18/12.png
/content/Ads/18/14.png
/content/Ads/8/14.png
/content/Ads/9/15.png
Exception: OpenCV(4.6.0) /io/opencv/modules/imgproc/src/resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'resize'

/content/Ads/12/12.png
Exception: OpenCV(4.6.0) /io/opencv/modules/imgproc/src/resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'resize'

/content/Ads/5/13.png
/content/Ads/6/11.png
/content/Ads/6/13.png
/content/Ads/6/12.png
/content/Ads/1/13.png
Exception: OpenCV(4.6.0) /io/opencv/modules/imgproc/src/resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'resize'

/content/Ads/2/14.png
/content/Ads/15/13.png
/content/Ads/3/11.png


In [None]:
# Stack the per-image arrays into a single array. X2 is left as a Python list
# of strings (it is passed to `tokenize` further down).
X1 = np.array(X1)
# X2 = np.array(X2)

# Labels as float32.
y = np.array(y, dtype="float32")

In [None]:
# Sanity check: images and labels should share the same first dimension.
X1.shape, y.shape

((392, 224, 224, 3), (392,))

In [None]:
# Number of whitespace-separated words in each cleaned OCR text.
len_X2 = [len(x.split()) for x in X2]

In [None]:
# Box plot of the text lengths (presumably used to pick `sequence_length` —
# confirm).
px.box(len_X2)

### Tokenization

In [None]:
# Hugging Face checkpoint name shared by the tokenizer and the text model.
checkpoint = "bert-base-uncased"
# checkpoint = "gpt2"
# Every text is padded or truncated to this many tokens.
sequence_length = 100

def tokenize(samples):
    """Encode texts with the tokenizer belonging to the global ``checkpoint``.

    Every sample is padded or truncated to the global ``sequence_length``.

    Args:
        samples: Texts to encode (the notebook passes a list of strings).

    Returns:
        A dict with the keys ``"input_ids"`` and ``"attention_mask"``, each
        holding one list of integers per sample.
    """
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    # Some tokenizers (GPT-2, for instance) ship without a padding token.
    # Reuse the end-of-sequence token instead of registering a new "[PAD]":
    # a newly added token receives an id beyond the pretrained embedding
    # matrix, which is only valid if the model's embeddings are resized too.
    # Padded positions are masked out by the attention mask either way.
    if tokenizer.pad_token is None:
        if tokenizer.eos_token is not None:
            tokenizer.pad_token = tokenizer.eos_token
        else:
            # No EOS token to borrow: fall back to adding a dedicated token.
            tokenizer.add_special_tokens({'pad_token': '[PAD]'})

    tokens = tokenizer(
        samples,
        max_length=sequence_length,
        truncation=True,
        padding="max_length",
        add_special_tokens=True,
        return_tensors="np",
    )

    # Plain nested lists so the result can be stored in DataFrame cells.
    return {
        "input_ids": tokens["input_ids"].tolist(),
        "attention_mask": tokens["attention_mask"].tolist(),
    }

In [None]:
# One row per sample; each cell holds that sample's complete id / mask list.
X_tokenized = pd.DataFrame(tokenize(X2), columns=["input_ids", "attention_mask"])

### Splitting Data into Train/Test

In [None]:
# Stratified 80/20 split applied jointly to the token rows, the images and the
# labels so the three stay aligned sample by sample.
X_tokenized_train, X_tokenized_test, X_img_train, X_img_test, y_train, y_test = train_test_split(X_tokenized, X1, y, test_size=0.2, random_state=42, shuffle=True, stratify=y)

In [None]:
# Preview the tokenized training rows.
X_tokenized_train

Unnamed: 0,input_ids,attention_mask
217,"[101, 4770, 2795, 5096, 7479, 10760, 16869, 10...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
308,"[101, 2489, 7829, 7479, 13278, 9692, 9006, 248...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, ..."
103,"[101, 4497, 8913, 1053, 2669, 2080, 1052, 1034...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, ..."
15,"[101, 2053, 2592, 102, 0, 0, 0, 0, 0, 0, 0, 0,...","[1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
230,"[101, 2184, 12849, 10264, 2102, 2192, 5906, 37...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, ..."
...,...,...
100,"[101, 9686, 17443, 1023, 2243, 2290, 24707, 12...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
273,"[101, 3046, 12882, 2382, 2154, 2489, 3711, 231...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
276,"[101, 3338, 2919, 3143, 2186, 11387, 16068, 23...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
277,"[101, 10036, 22477, 14154, 9938, 2860, 2860, 2...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."


### Unzipping the IDs and Masks 

In [None]:
def unzip_x(x):
    """Stack the per-sample id and mask lists of ``x`` into two arrays.

    Returns ``[input_ids, attention_mask]``, each built with ``np.vstack``
    from the corresponding column of ``x``.
    """
    return [np.vstack(x[column]) for column in ("input_ids", "attention_mask")]

# Replace each tokenized DataFrame with its [input_ids, attention_mask] pair
# of stacked arrays.
X_tokenized_train, X_tokenized_test = unzip_x(X_tokenized_train), unzip_x(X_tokenized_test)

In [None]:
# Model inputs: the image array followed by the [input_ids, attention_mask]
# pair.
# NOTE(review): the token arrays sit one level deeper than the flat
# three-input list the model is built with — confirm Keras flattens this
# nesting as intended.
X_train = [X_img_train, X_tokenized_train]
X_test = [X_img_test, X_tokenized_test]

In [None]:
from attention import Attention

### Building the model

In [None]:
def build_model(img_shape, targets, checkpoint, sequence_length):
  """Build and compile the two-branch (image + text) Keras classifier.

  The image branch is an ImageNet-initialised ResNet50 without its top,
  followed by global average pooling. The text branch is the transformer
  loaded from ``checkpoint``. Both feature vectors are concatenated, reshaped
  into a sequence of width-1 steps, passed through two LSTM layers and an
  Attention layer, flattened, and fed to a Dense output layer.

  Side effects: prints the flattened feature shape and the output shape, and
  writes the architecture diagram to ``model.png``.

  Args:
    img_shape: Input shape handed to ResNet50 (the notebook passes
      ``(224, 224, 3)``).
    targets: Label array; only used to count the distinct classes.
    checkpoint: Hugging Face checkpoint name. ``"gpt2"`` selects the
      ``.transformer`` sub-model, anything else the ``.bert`` sub-model.
    sequence_length: Length of the ``input_ids`` / ``attention_mask`` inputs.

  Returns:
    The compiled model with inputs ``[image, input_ids, attention_mask]``.
  """

  # Image backbone: pre-trained ResNet50 without the classification top
  base_model_img = tf.keras.applications.ResNet50(input_shape=img_shape, include_top=False, weights="imagenet")
  
  # Text backbone loaded from `checkpoint` (BERT by default)
  base_model_txt = TFAutoModelForSequenceClassification.from_pretrained(checkpoint)

  # Freeze the image backbone only; base_model_txt is left as loaded
  base_model_img.trainable = False

  input_ids = Input(shape=(sequence_length,), name="input_ids", dtype="int32")
  attenion_mask = Input(shape=(sequence_length,), name="attention_mask", dtype="int32")

  # Output index 0 is taken for GPT-2 and index 1 for BERT.
  # NOTE(review): index 1 of the BERT outputs is presumably the pooled
  # output — confirm against the transformers version in use.
  if checkpoint == "gpt2": x1 = base_model_txt.transformer(input_ids, attention_mask=attenion_mask)[0]
  else: x1 = base_model_txt.bert(input_ids, attention_mask=attenion_mask)[1]

  # Collapse the spatial dimensions of the ResNet feature map
  global_average_layer = tf.keras.layers.GlobalAveragePooling2D()(base_model_img.output)

  x1 = Flatten()(x1)

  # Concat the text and image feature vectors

  concat_vec = tf.keras.layers.concatenate([x1, global_average_layer], name="Concat")

  
  # x = tf.concat([x1, global_average_layer], axis=-1, name="Concat")

  # x = tf.keras.layers.Reshape(target_shape=((1, x.shape) + (1,)), name="Reshaping")(x)
  # Add a trailing axis of size 1 so each feature becomes one LSTM time step
  expanded_concat = tf.keras.layers.Reshape((concat_vec.shape[1], 1))(concat_vec)

  # print(expanded_concat)
  
  # # Encoder Layer
  # NOTE(review): with return_state=True each LSTM call returns the sequence
  # plus its final states, so `x` is a list rather than a single tensor. That
  # whole list is fed to the second LSTM and then to Attention — confirm this
  # is intended.
  x = tf.keras.layers.LSTM(128, return_sequences=True, return_state=True)(expanded_concat)
  x = tf.keras.layers.LSTM(128, return_sequences=True, return_state=True)(x)

  atten = tf.keras.layers.Attention()(x)

  dense = tf.keras.layers.Reshape(target_shape=(-1, 1))(atten)

  dense = tf.keras.layers.Flatten()(dense)
  
  print(dense.shape)

  # Output layer: sized from the number of distinct labels
  units = len(np.unique(targets))

  if units > 2:
      activation = "softmax"
      loss = "sparse_categorical_crossentropy"
  
  else:
      # Binary case: a single sigmoid unit.
      # NOTE(review): if `targets` holds only one distinct label, units
      # becomes 0 here — confirm callers always pass both classes.
      activation = "sigmoid"
      loss = "binary_crossentropy"
      units = units - 1
  
  outputs = Dense(units=units, activation=activation)(dense)

  print(outputs.shape)

  model = Model(inputs=[base_model_img.input, input_ids, attenion_mask], outputs=outputs)

  # Compiling the model
  model.compile(optimizer="adam", loss=loss, metrics=["accuracy"])

  # Model Architecture Export (written to model.png)
  tf.keras.utils.plot_model(model, to_file='model.png', show_shapes=True, 
                            show_dtype=True, show_layer_names=True, rankdir='TB',
                            expand_nested=True, dpi=300, layer_range=None, 
                            show_layer_activations=True)

  
  return model

In [None]:
# Build the model; `y` is only used to count the distinct classes.
model = build_model((224, 224, 3), y, checkpoint, sequence_length)

All model checkpoint layers were used when initializing TFBertForSequenceClassification.

Some layers of TFBertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


(None, 360448)
(None, 1)
dot: graph is too large for cairo-renderer bitmaps. Scaling by 0.422163 to fit



### Callbacks

In [None]:
def callbacks(run_name="run 1", save_path=Path("/models")) -> list:
    """Create the Keras callbacks used during training.

    Args:
        run_name: Sub-directory name for this run's TensorBoard logs.
        save_path: Location the best model is saved to; TensorBoard logs go
            to ``save_path/logs/run_name``. A string is accepted as well.

    Returns:
        ``[ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard]``,
        in that order. The first three monitor ``val_loss``.
    """
    save_path = Path(save_path)
    log_root = save_path / "logs"
    log_root.mkdir(parents=True, exist_ok=True)

    # Named so it does not shadow the module-level `checkpoint` string.
    best_model_saver = ModelCheckpoint(
        save_path, monitor="val_loss", save_best_only=True, verbose=1
    )

    earlystopping = EarlyStopping(
        monitor="val_loss", verbose=1, restore_best_weights=True, patience=5
    )

    logger = TensorBoard(
        log_root / run_name, histogram_freq=2, write_graph=True, write_images=True
    )

    # Cut the learning rate tenfold after 2 epochs without improvement.
    lr = ReduceLROnPlateau(
        monitor="val_loss", factor=0.1, patience=2, verbose=1,
        min_delta=0.0001, cooldown=0, min_lr=0,
    )

    return [best_model_saver, earlystopping, lr, logger]

In [None]:
#  !sudo rm -rf /content/drive/MyDrive/Deceptive-Research/models

### Model Training

In [None]:
# NOTE(review): the held-out test split doubles as validation data here, so
# every val_loss-driven callback (checkpointing, early stopping, learning-rate
# reduction) is tuned on it — confirm this is intended.
history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=8, callbacks=callbacks())

Epoch 1/5
Epoch 1: val_loss improved from inf to 0.75011, saving model to /models


