## Necessay pip packages

In [0]:
!pip install praw
!sudo apt install tesseract-ocr
!pip install pytesseract
!pip install transformers
!pip install tensorflow==2.1.0-rc0
!pip install pandas 
!pip install numpy
!pip install nltk

## Import Libraries

In [0]:
import os
import requests
import praw
import concurrent.futures
import pandas as pd
import getpass
import tensorflow as tf
from tensorflow.keras.layers import Dense, Concatenate, Input, Dropout, Flatten
from tensorflow.keras.applications import VGG19
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from transformers import BertTokenizer, TFBertModel
from tensorflow.keras.utils import plot_model
from tensorflow.keras.preprocessing import image as keras_image
import numpy as np

import nltk
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from collections import defaultdict
import regex as re
from keras.preprocessing import image as keras_image
import pickle
import pytesseract
import shutil
import random
try:
 from PIL import Image
except ImportError:
 import Image

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')

## Image downloader

In [0]:
class redditImageScraper:
    def __init__(self):
        self.client_id = str(getpass.getpass(
            "For Client ID and Client secret go to\n https://www.reddit.com/prefs/apps \n and create your token\nEnter Your Client ID:\n"))

        self.client_secret = str(getpass.getpass("\nEnter your client Secret:\n"))
        # print(client_id,client_secret)
        self.reddit = praw.Reddit(client_id=self.client_id,
                                  client_secret=self.client_secret,
                                  user_agent='my user agent')
        



    def download(self, image):
        r = requests.get(image['url'])
        with open(image['fname'], 'wb') as f:
            f.write(r.content)
        print("File Downloaded: {} File Path: {} ".format(image['url'], image['fname']))
        self.images['filepath'].append(image['fname'])

    def start(self,path,page):
        self.subreddit = self.reddit.subreddit(page)
        if page == 'Dark_memes':
          self.subreddit.quaran.opt_in()
        self.imageGen = self.subreddit.top(limit=50)
        self.images = {'filepath': []}
        image = []
        print('Started')
        try:

            for submission in self.imageGen:

                if submission.url.endswith(('jpg', 'jpeg', 'png')):

                    fname = path + re.search('(?s:.*)\w/(.*)', submission.url).group(1)
                    if not os.path.isfile(fname):
                        image.append({'url': submission.url, 'fname': fname})

            if len(image):
                if not os.path.exists(path):
                    os.makedirs(path)
                with concurrent.futures.ThreadPoolExecutor() as ptolemy:
                    ptolemy.map(self.download, image)
        except Exception as e:
            if e=='received 403 HTTP response' :
              print("Page is quarantined or you have'nt joined it yet")


In [0]:
downloader = redditImageScraper()
downloader.start(path='./dataset/negative/',page  = 'Dark_memes')
downloader.start(path='./dataset/positive/', page= 'wholesomememes')

downloader.start(path='./dataset/neutral/', page = 'antimeme')

# Text Extractor and preprocessing pipeline

In [0]:
def preprocess_image(filepath):
  image = keras_image.load_img(
                  filepath,
                  target_size=(224,224),
                  interpolation ='bicubic'))
  img = image.copy()
  img = keras_image.img_to_array(img)
  img = img//255.0
  return img

def preprocess_txt(text):
  tag_map = defaultdict(lambda: wn.NOUN)
  tag_map['J'] = wn.ADJ
  tag_map['V'] = wn.VERB
  tag_map['R'] = wn.ADV
  word_Lemmatized = WordNetLemmatizer()
  text = text.lower()
  text = re.sub(r"\n"," ",text)
  text = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+)|(http?://[^\s]+))', '', text)
  text = re.sub(r'http\S+', '', text)
  stop = stopwords.words('english')
  pat = r'\b(?:{})\b'.format('|'.join(stop))
  text = text.replace(pat, '')
  text = text.replace(r'\s+', ' ')
  text = re.sub(r'[^a-zA-Z0-9 -]', '', text) 
  text = re.sub('@[^\s]+','',text)
  text = word_tokenize(text)
  Final_words = []
  for word, tag in pos_tag(text):
      word_Final = word_Lemmatized.lemmatize(word, tag_map[tag[0]])
      Final_words.append(word_Final)
  text = " ".join(Final_words)

  return text

In [0]:
data = {"image":[],"filepath":[],"text":[],"label":[]}
for dirname,_,filenames in os.walk('./dataset/'):
  for filename in filenames:
    try:
      if dirname == './dataset/positive':
        data['label'].append(int(1))
      if dirname == './dataset/neutral':
        data['label'].append(int(2))
      if dirname == './dataset/negative':
        data['label'].append(int(0))
      
      data['image'].append(
          preprocess_image(
              os.path.join(dirname,filename)))
      
      data['filepath'].append(
          os.path.join(dirname,filename))
      
      data['text'].append(
          preprocess_txt(
              pytesseract.image_to_string(
                  Image.open(
                      os.path.join(dirname,filename)))))

    except Exception as e:
      print(e)
      continue
    

    print('\r images: {} texts: {} labels : {}'.format(len(data['image']),len(data['text']),len(data['label'])),end='')  

In [0]:

with open("data.pkl","wb") as pickle_out:
  pickle.dump(data, pickle_out)
pickle_out.close()


# Multimodal Architecture


In [0]:
class Classifier:

    def __init__(self, epochs=32, batch_size=64,metrics = False, plot_model_diagram=False, summary=False):
        self.epochs = epochs
        self.metrics = metrics
        self.batch_size = batch_size
        self.plot_model_diagram = plot_model_diagram
        self.summary = summary
        self.seq_len = 42
        self.bert_layer = TFBertModel.from_pretrained('bert-base-uncased')
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.vgg = VGG19(weights='imagenet', include_top=False)
        self.bert_layer.trainable = False
        self.vgg.trainable = False

    def encode(self, texts):
        input_id = []
        token_type_id = []
        attention_mask = []
        for text in texts:
            dictIn = self.tokenizer.encode_plus(text, max_length=self.seq_len, pad_to_max_length=True)
            input_id.append(dictIn['input_ids'])
            token_type_id.append(dictIn['token_type_ids'])
            attention_mask.append(dictIn['attention_mask'])
        return np.array(input_id), np.array(token_type_id), np.array(attention_mask)

    def labelencoder(self, labels):
        new_label = np.zeros((len(labels), 3))
        for i, label in enumerate(labels):
            if label == 0:
                new_label[i] = [0, 0, 1]
            elif label == 1:
                new_label[i] = [0, 1, 0]
            elif label == 2:
                new_label[i] = [1, 0, 0]

        return new_label

    def build(self):
        input_id = Input(shape=(self.seq_len,), dtype=tf.int64)
        mask_id = Input(shape=(self.seq_len,), dtype=tf.int64)
        seg_id = Input(shape=(self.seq_len,), dtype=tf.int64)

        _, bert_out = self.bert_layer([input_id, mask_id, seg_id])
        dense = Dense(768, activation='relu')(bert_out)
        dense = Dense(256, activation='relu')(dense)
        txt_repr = Dropout(0.4)(dense)
        ################################################
        img_in = Input(shape=(224, 224, 3))
        img_out = self.vgg(img_in)
        flat = Flatten()(img_out)
        dense = Dense(2742, activation='relu')(flat)
        dense = Dense(256, activation='relu')(dense)
        img_repr = Dropout(0.4)(dense)
        concat = Concatenate(axis=1)([img_repr, txt_repr])
        dense = Dense(64, activation='relu')(concat)
        out = Dense(3, activation='softmax')(dense)
        model = Model(inputs=[input_id, mask_id, seg_id, img_in], outputs=out)

        model.compile(loss='categorical_crossentropy', optimizer=Adam(2e-5),
                      metrics=['accuracy', precision, recall, f1]) if self.metrics else model.compile(
            loss='categorical_crossentropy', optimizer=Adam(2e-5), metrics=['accuracy'])

        plot_model(model) if self.plot_model_diagram else None
        model.summary() if self.summary else None

        return model

    def train(self, data, validation_split=0.2):

        model = self.build()
        input_id, token_type_id, attention_mask = self.encode(data['text'])
        image_data = np.asarray(data['image'])
        labels = self.labelencoder(data['label'])

        self.history = model.fit([input_id, token_type_id, attention_mask, image_data],
                                 labels,
                                 validation_split=validation_split,
                                 batch_size=self.batch_size,
                                 epochs=self.epochs)

        model.save_weights('./model/MemSem')

    def evaluate(self, data):
        model = self.build()
        model.load_weights('./model/MemSem')
        input_id, token_type_id, attention_mask = self.encode(data['text'].apply(preprocess_txt))
        image_data = data['image'].apply(preprocess_image)
        eval_data = [input_id, token_type_id, attention_mask,image_data]
        labels = self.labelencoder(data['label'])
        evaluation = model.evaluate(eval_data, labels)
        return evaluation


    def predict(self, image_path='./dataset/test/text.jpg', text=""):

        try:
            model = self.build()
            model.load_weights('./model/MemSem')
            input_id, token_type_id, attention_mask = self.encode([preprocess_txt(text)])
            image_data = preprocess_image(
                keras_image.load_img(image_path,
                                     target_size=(224, 224),
                                     interpolation='bicubic'))
            image_data = np.expand_dims(image_data, axis=0)
            value = model.predict([input_id, token_type_id, attention_mask, image_data])

            prediction = np.argmax(value)
            if prediction == 2:  # negative = [0,0,1]
                print("Its a bad meme")
            elif prediction == 1:  # postive = [0,1,0]
                print("Its not bad XD")
            elif prediction == 0:  # neutral = [1,0,0]
                print("Its meaningless")

        except Exception as e:
            print(e)


## Model object and training

In [0]:
cls = Classifier(epochs =6,batch_size=50)

In [0]:
with open("data.pkl","rb") as pickle_in:
  final = pickle.load(pickle_in)
pickle_in.close()

cls.train(final)

In [0]:
pd.DataFrame(cls.history.history)