In [77]:
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer
import numpy as np
from scipy.special import softmax
import csv
import urllib.request
import math
import torch 

In [78]:
# Preprocess text (username and link placeholders)
def preprocess(text):
    """Mask user mentions and links in ``text`` with fixed placeholder tokens.

    The text is split on single spaces. A token that starts with ``@`` and is
    longer than one character becomes ``@user``; a token that starts with
    ``http`` becomes ``http``. All tokens are re-joined with single spaces.
    """
    def _mask(token):
        if len(token) > 1 and token.startswith('@'):
            return '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))

In [79]:
# Tasks:
# emoji, emotion, hate, irony, offensive, sentiment
# stance/abortion, stance/atheism, stance/climate, stance/feminist, stance/hillary

# Subset of the tasks listed above that this notebook runs.
tasks = ['emotion', 'hate', 'irony', 'offensive', 'sentiment']

# One model identifier per task, in the same order as `tasks`.
model_paths = ["cardiffnlp/twitter-roberta-base-" + task_name for task_name in tasks]

# Lookup from task name to its model identifier.
model_mapping = dict(zip(tasks, model_paths))

In [80]:
# Show the model identifiers, one per entry in `tasks`.
print(model_paths)

['cardiffnlp/twitter-roberta-base-emotion', 'cardiffnlp/twitter-roberta-base-hate', 'cardiffnlp/twitter-roberta-base-irony', 'cardiffnlp/twitter-roberta-base-offensive', 'cardiffnlp/twitter-roberta-base-sentiment']


# Loading data

In [81]:
import pandas as pd
import os

In [82]:
# Load the raw dataset and keep only the columns used downstream
# ('emoi' is the column name exactly as this cell reads it from the CSV).
df = pd.read_csv('dataset_.csv')
df = df[['Text', 'label', 'emoi', 'hashtags', 'Media URLs']]
# Work on a 50% subsample. The fixed seed makes the subsample (and therefore
# every downstream prediction file) reproducible across kernel restarts.
df = df.sample(frac=0.5, random_state=42)
df.shape

(3683, 5)

In [83]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for testing. The fixed seed keeps the split stable
# between runs, so the train/test prediction files always describe the same rows.
y = df['label']
df_train, df_test, y_train, y_test = train_test_split(df, y, test_size=0.2, random_state=42)
# Reset to a 0..n-1 index so per-model prediction frames (which use a fresh
# RangeIndex) line up row by row with these frames.
df_train = df_train.reset_index(drop=True)
df_test = df_test.reset_index(drop=True)
X_train = df_train['Text']
X_test = df_test['Text']

In [84]:
# Preview the first rows of the training split.
df_train.head()

Unnamed: 0,Text,label,emoi,hashtags,Media URLs
0,RT @urstrulyMahesh : Congratulations team #HER...,happy,"['👏', '👏', '👏']",['HERO'],
1,RT @SVPhillimore : By several ‘teams’. Because...,angry,[],[],
2,@ReportsDaNews Why is everyone so angry?,disappointed,[],[],
3,@DanielJMath1 @thecarolemalone Having read you...,disappointed,[],[],
4,(( I’m actually so cold I’m angry? Sort of? He...,disappointed,[],[],


In [85]:
import ast
from tensorflow import keras
def _clean_list_column(column):
    """Strip surrounding brackets, drop single quotes and lower-case a string column."""
    return column.str.strip('[]').str.replace("'", '').str.lower()


# Hashtag strings of the training split, e.g. "['HERO']" -> "hero".
hashtags = _clean_list_column(df_train['hashtags'])

# Emoji strings of the training split, cleaned the same way.
emojis = _clean_list_column(df_train['emoi'])
# NOTE(review): this model is loaded here but not referenced again in the visible cells.
emoticons_model = keras.models.load_model('emoticons.h5')

# Media URLs of the training split (may be missing for rows without media).
images = df_train['Media URLs']

In [86]:
# Sanity check: the hashtag series and the training texts should have the same length.
print(hashtags.shape, '==', X_train.shape)

(2946,) == (2946,)


In [87]:
def predict(model, tokenizer, preprocess, X, emb_max_size=512):
  """Preprocess, tokenize and run ``model`` on a batch of texts.

  Args:
    model: callable that accepts the tokenizer output as keyword arguments.
    tokenizer: callable taking a list of strings plus the keyword arguments
      ``return_tensors``, ``padding``, ``truncation`` and ``max_length``.
    preprocess: function applied to every text before tokenization.
    X: pandas Series of raw texts.
    emb_max_size: maximum number of tokens kept per text.

  Returns:
    Whatever ``model`` returns for the encoded batch. The forward pass runs
    without gradient tracking, so returned tensors do not require grad.
  """
  X = X.apply(preprocess)
  # Truncate inside the tokenizer instead of slicing the tensors afterwards:
  # this shortens every returned tensor consistently and keeps the closing
  # special token on over-long inputs.
  encoded_input = tokenizer(
      X.to_list(),
      return_tensors='pt',
      padding=True,
      truncation=True,
      max_length=emb_max_size,
  )

  # Inference only: skip building the autograd graph to save memory.
  with torch.no_grad():
    output = model(**encoded_input)
  return output

In [88]:
class TextModel:
    """Scores texts with several pretrained sequence-classification models.

    For every requested task the matching tokenizer and model are loaded, the
    texts are run through the model in batches, and the softmaxed class scores
    are stored in one DataFrame column per class label.
    """

    def __init__(self, model_path_mapping, batch_size=10):
        """
        Args:
            model_path_mapping: dict mapping a task name to the path/identifier
                passed to ``from_pretrained``.
            batch_size: number of texts sent through the model per forward pass.
        """
        self.batch_size = batch_size
        self.model_path_mapping = model_path_mapping

    def predict(self, X, tasks_list, prefix='', verbose=1):
        """Return per-class probabilities for every task in ``tasks_list``.

        Args:
            X: pandas Series of texts.
            tasks_list: task names; each must be a key of ``model_path_mapping``.
            prefix: string prepended to every output column name.
            verbose: 0 is silent, 1 prints per-task progress, >1 also prints
                per-batch progress.

        Returns:
            DataFrame with one row per text (fresh 0..n-1 index) and one column
            per class label of every task, named ``prefix + label``.
        """
        df = pd.DataFrame()

        for step, task in enumerate(tasks_list):
            if verbose:
                print(f'Step {step}/{len(tasks_list)}, Task: {task}')
            model_path = self.model_path_mapping[task]

            tokenizer = AutoTokenizer.from_pretrained(model_path)
            model = AutoModelForSequenceClassification.from_pretrained(model_path)
            # Writes a copy of tokenizer and model into a local directory named
            # like the model identifier (presumably so later loads hit the disk
            # copy -- TODO confirm this is still wanted).
            tokenizer.save_pretrained(model_path)
            model.save_pretrained(model_path)

            # Class labels for the task: second tab-separated field of each mapping line.
            mapping_link = f"https://raw.githubusercontent.com/cardiffnlp/tweeteval/main/datasets/{task}/mapping.txt"
            with urllib.request.urlopen(mapping_link) as f:
                html = f.read().decode('utf-8').split("\n")
            labels = [row[1] for row in csv.reader(html, delimiter='\t') if len(row) > 1]

            batch_logits = []
            n_batches = math.ceil(X.shape[0] / self.batch_size)
            for batch_idx in range(n_batches):
                if verbose > 1:
                    print(batch_idx, '/', n_batches)
                # Positional slicing so the batching does not depend on X's index labels.
                x = X.iloc[batch_idx * self.batch_size: (batch_idx + 1) * self.batch_size]

                # Inference only: no autograd graph is needed.
                with torch.no_grad():
                    out = predict(model, tokenizer, preprocess, x)
                # Keep just the logits so the rest of each batch output can be freed.
                batch_logits.append(out['logits'].cpu().detach())

            if batch_logits:
                scores = softmax(torch.cat(batch_logits, dim=0).numpy(), axis=1)
            else:
                # Empty input: torch.cat would fail on an empty list, so emit
                # zero rows with one column per label instead.
                scores = np.zeros((0, len(labels)))
            if verbose:
                print('Output shape:', scores.shape)

            for class_idx in range(scores.shape[1]):
                label = labels[class_idx]
                df[prefix + label] = scores[:, class_idx]
        return df

class HashtagModel(TextModel):
    """TextModel variant for hashtag strings: rows without any hashtag get all-zero scores."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def predict(self, X, *args, **kwargs):
        """Score hashtag strings like ``TextModel.predict`` and zero out empty rows.

        Args:
            X: pandas Series of hashtag strings; an empty string means "no hashtags".
            *args, **kwargs: forwarded unchanged to ``TextModel.predict``.

        Returns:
            The DataFrame produced by ``TextModel.predict`` with every column set
            to 0.0 on rows whose input string was empty.
        """
        # Positional boolean array: the frame returned by the parent has a fresh
        # 0..n-1 index, so a Series mask would be aligned on X's index labels
        # and fail (or mis-assign) whenever X is not indexed 0..n-1.
        empty_rows = (X.str.len() == 0).to_numpy()
        df = super().predict(X, *args, **kwargs)
        df.loc[empty_rows, :] = 0.0
        return df

In [89]:
# Score the training texts with every task model.
# NOTE(review): the recorded run of this cell ends in KeyboardInterrupt, so
# `text_preds` was not produced by that run; re-run to completion before using it.
text_model = TextModel(model_mapping)
text_preds = text_model.predict(X_train, tasks)

Step 0/5, Task: emotion
Output shape: (2946, 4)
Step 1/5, Task: hate
Output shape: (2946, 2)
Step 2/5, Task: irony
Output shape: (2946, 2)
Step 3/5, Task: offensive


KeyboardInterrupt: 

In [None]:
# Score the cleaned training hashtag strings with the same task models;
# output columns carry the 'hm_' prefix.
hashtag_model = HashtagModel(model_mapping)
hashtag_preds = hashtag_model.predict(hashtags, tasks, prefix='hm_')

In [None]:
from tensorflow import keras
from sklearn.preprocessing import OneHotEncoder
import pickle
import os
import ast
from joblib import dump, load

class EmojiModel():
    """Dense Keras classifier over a multi-hot encoding of the emojis in a text.

    The emoji vocabulary is the key set of a pickled mapping. Each input row is
    a string of emojis separated by ", " and is encoded as a 0/1 vector with
    one position per vocabulary entry.
    """

    def __init__(self, emoji_data_path, optimizer='adam', loss='bce', metrics=None):
        """
        Args:
            emoji_data_path: path to a pickled mapping whose keys form the vocabulary.
            optimizer, loss: forwarded to ``model.compile``.
            metrics: list of Keras metrics; defaults to ``['acc']``.
        """
        # NOTE(security): pickle.load can execute arbitrary code -- only point
        # this at files from a trusted source.
        with open(emoji_data_path, 'rb') as f:
            self.emoji_data = pickle.load(f)
        self.keys = np.array(list(self.emoji_data.keys()))

        if metrics is None:
            metrics = ['acc']

        self.model = self.build_model(optimizer, loss, metrics)
        self.encoder = None
        self.categories = None
        self.classes = None  # filled by encode_y on first use

    def build_model(self, optimizer, loss, metrics):
        """Build and compile the network: input -> dropout -> 256 -> dropout -> 256 -> 4-way softmax."""
        model = keras.models.Sequential()
        model.add(keras.layers.Input((len(self.keys),)))
        model.add(keras.layers.Dropout(0.5))
        model.add(keras.layers.Dense(256, activation='relu'))
        model.add(keras.layers.Dropout(0.5))
        model.add(keras.layers.Dense(256, activation='relu'))
        # NOTE(review): the output width is fixed at 4, so the label column
        # must have exactly four distinct classes.
        model.add(keras.layers.Dense(4, activation='softmax'))

        model.compile(optimizer=optimizer, metrics=metrics, loss=loss)
        return model

    @staticmethod
    def _emoji_mask(X):
        """Return a boolean numpy array that is True for rows holding at least one emoji.

        A row counts as empty when splitting on ", " yields a single token that
        is '' or ' ', or when the value is missing (not a string).
        """
        def has_emoji(tokens):
            if not isinstance(tokens, list):
                return False  # NaN / missing value
            if len(tokens) == 1:
                return tokens[0] not in ('', ' ')
            # Two or more tokens. The previous `len > 2` test silently
            # dropped rows with exactly two emojis.
            return len(tokens) > 1

        return X.str.split(', ').map(has_emoji).to_numpy(dtype=bool)

    def preprocess(self, X):
        """Multi-hot encode a Series of ", "-separated emoji strings.

        Returns:
            Float array of shape (len(X), vocabulary size) with 1 at the
            position of every known emoji in the row and 0 elsewhere.
        """
        tokens_per_row = X.str.split(', ')
        mask = self._emoji_mask(X)
        encoded = np.zeros((len(X), self.keys.shape[0]))

        # Iterate positionally so the result does not depend on X's index labels.
        for row, (tokens, keep) in enumerate(zip(tokens_per_row, mask)):
            if keep:
                idx = self._find_indices(tokens)
                if len(idx) > 0:
                    encoded[row, idx] = 1

        return encoded

    def fit(self, X, y, *args, validation_data=None, **kwargs):
        """Train on the rows of ``X`` that contain emojis.

        Args:
            X: Series of emoji strings.
            y: labels aligned with ``X``.
            validation_data: optional ``(X_val, y_val)`` pair in the same raw
                format. It is encoded but NOT filtered to emoji rows.
            *args, **kwargs: forwarded to ``keras`` ``model.fit``.

        Returns:
            The value returned by ``model.fit``.
        """
        mask = self._emoji_mask(X)
        X = self.preprocess(X)[mask]
        y = self.encode_y(y)[mask]

        if validation_data is not None:
            validation_data = (self.preprocess(validation_data[0]), self.encode_y(validation_data[1]))

        return self.model.fit(X, y, *args, validation_data=validation_data, **kwargs)

    def predict(self, X):
        """Return class probabilities per row; rows without emojis are all zeros."""
        mask = self._emoji_mask(X)
        preds = self.model.predict(self.preprocess(X))
        preds[~mask] = 0
        return preds

    def encode_y(self, y):
        """One-hot encode labels, fitting the encoder on the first call."""
        y_frame = pd.DataFrame(y)
        if self.encoder is None:
            self.classes = np.unique(y)
            print(self.classes)
            self.encoder = OneHotEncoder(handle_unknown='ignore')
            self.encoder.fit(y_frame)

        return self.encoder.transform(y_frame).toarray()

    def evaluate(self, X, y):
        """Return a dict with 'MSE', 'MAE' and 'accuracy' of the predictions against ``y``."""
        y = self.encode_y(y)
        preds = self.predict(X)

        d = {
            'MSE': ((y - preds) ** 2).mean(),
            'MAE': np.abs((y - preds)).mean(),
            'accuracy': (np.argmax(preds, axis=1) == np.argmax(y, axis=1)).mean()
        }

        return d

    def _find_indices(self, values):
        """Return the vocabulary positions of ``values`` as an int array; unknown values are skipped."""
        columns = self.keys.flatten()
        hits = [np.flatnonzero(columns == searchval) for searchval in values]
        if not hits:
            return np.array([], dtype=int)
        # Always an integer array, so it is a valid index even when nothing matched.
        return np.concatenate(hits).astype(int)

    def save_model(self, model_path, encoder_path):
        """Persist the fitted label encoder (joblib) and the Keras model."""
        dump(self.encoder, encoder_path)
        self.model.save(model_path)

    def load_model(self, model_path, encoder_path):
        """Replace encoder and model with the ones stored at the given paths."""
        self.encoder = load(encoder_path)
        self.model = keras.models.load_model(model_path)
        

In [None]:
# True for training rows that contain at least one emoji. `len(d) > 1` (not
# `> 2`) so rows with exactly two emojis are included as well.
mask = emojis.str.split(', ').map(lambda d: len(d) > 1 or (len(d) == 1 and d[0] != '' and d[0] != ' '))
#emojis[mask]

In [None]:
# Clean the test-split emoji column: strip brackets, drop quotes, lower-case.
emojis_test = df_test['emoi'].str.strip('[]').str.replace("'", '').str.lower()
# Untrained model built on the UNICODE_EMOJI vocabulary; used here only for its encoder.
emoji_model = EmojiModel('emoji_data/UNICODE_EMOJI.pkl')
merged = emoji_model.preprocess(emojis_test)
# Total number of set entries in the encoding, and its shape.
print(merged.sum().sum(), merged.shape)

In [None]:
# Number of rows in the test-split emoji series.
emojis_test.shape

In [None]:
# Combined row count of the train and test emoji series.
pd.concat([emojis, emojis_test]).shape

In [None]:
# Train the emoji classifier on the training split (vocabulary: UNICODE_EMOJI_ALIAS).
# NOTE(review): the test split is used both as validation_data here and for
# the evaluate() call below, so the reported score is not from unseen data.
emoji_model = EmojiModel('emoji_data/UNICODE_EMOJI_ALIAS.pkl')
emojis_test = df_test['emoi'].str.strip('[]').str.replace("'", '').str.lower()
emoji_model.fit(emojis, y=df_train['label'], batch_size=16, epochs=20, validation_data=(emojis_test, df_test['label']))

# Class probabilities for the training rows (used later as features).
emoji_preds = emoji_model.predict(emojis)

# MSE / MAE / accuracy on the test split.
score = emoji_model.evaluate(emojis_test, df_test['label'])

In [None]:
# Persist the trained Keras model and the fitted label encoder.
emoji_model.save_model('emoji_model.h5', 'encoder.joblib')

In [None]:
# Round-trip check: rebuild the model object, load the saved weights and
# encoder, and evaluate on the test split again.
emoji_model = EmojiModel('emoji_data/UNICODE_EMOJI_ALIAS.pkl')
emoji_model.load_model('emoji_model.h5', 'encoder.joblib')
emoji_model.evaluate(emojis_test, df_test['label'])

In [None]:
# Metrics dict returned by emoji_model.evaluate right after training.
score

In [None]:
import cv2
import fer
import requests

class ImageModel():
    """Facial-emotion scores per image URL, averaged over all faces the detector finds."""

    def __init__(self, prefix='im_'):
        """
        Args:
            prefix: string prepended to every output column name.
        """
        # All-zero score dict; returned for rows without a usable image.
        self.template = {'angry': 0.0, 'disgust': 0.0, 'fear': 0.0, 'happy': 0.0, 'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0}
        self.detector = fer.FER()
        self.prefix = prefix

    def predict(self, X, verbose=True, timeout=10):
        """Return one row of emotion scores per entry of ``X``.

        Args:
            X: iterable of image URLs; missing values (NaN/None) are allowed.
            verbose: print every URL, every non-empty result and every failure.
            timeout: seconds to wait for each HTTP request.

        Returns:
            DataFrame with one row per entry of ``X`` and one column per key of
            the score template, named ``prefix + emotion``. Rows with no URL,
            a failed download, an undecodable image or no detected face are
            all zeros.
        """
        preds = []
        for i, url in enumerate(X):
            if verbose:
                print(f'{i}: {url}')
            if pd.isna(url):
                preds.append(self.template.copy())
                continue

            faces = self._detect_faces(url, timeout, verbose)
            if len(faces) > 0:
                final_pred = self._average_emotions(faces)
                if verbose:
                    print(final_pred)
                preds.append(final_pred)
            else:
                preds.append(self.template.copy())

        # Explicit columns so an empty X still yields the full set of columns.
        df = pd.DataFrame(preds, columns=list(self.template))
        df.columns = [self.prefix + col for col in df.columns]
        return df

    def _detect_faces(self, url, timeout, verbose):
        """Download ``url``, decode it in memory and return the detector output ([] on any failure)."""
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            # Decode straight from the response bytes; no temp file, so no
            # dependency on a pre-existing output directory.
            buffer = np.frombuffer(response.content, dtype=np.uint8)
            image = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
            if image is None:
                return []
            return self.detector.detect_emotions(image)
        except Exception as exc:  # best effort: one bad URL must not abort the whole batch
            if verbose:
                print(f'image skipped ({type(exc).__name__}): {exc}')
            return []

    def _average_emotions(self, faces):
        """Average the per-face 'emotions' dicts into one dict with the template's keys."""
        totals = self.template.copy()
        for face in faces:
            for emotion, value in face['emotions'].items():
                totals[emotion] += value
        return {emotion: total / len(faces) for emotion, total in totals.items()}
                
                
            
            

In [None]:
# Score the media URLs of the training split; output columns carry the default 'im_' prefix.
image_model = ImageModel()
image_preds = image_model.predict(images)

In [None]:
# Spot check: text of training row 38.
df_train.loc[38, 'Text']

In [None]:
# Spot check: text of training row 60.
df_train.loc[60, 'Text']

In [None]:
# Preview the first rows of the training split again.
df_train.head()

In [None]:
# Shapes of the four per-model prediction sets before they are concatenated column-wise.
print(text_preds.shape, hashtag_preds.shape, emoji_preds.shape, image_preds.shape)

In [None]:
# Container types of the four prediction sets.
print(type(text_preds), type(hashtag_preds), type(emoji_preds), type(image_preds))

In [None]:
# Join the per-model scores column-wise, attach the gold label, and export.
df_output_train = pd.concat(
    [text_preds, hashtag_preds, pd.DataFrame(emoji_preds), image_preds],
    axis=1,
).assign(label=df_train['label'])
# The Excel file keeps the row index; the CSV is written without it.
df_output_train.to_excel('dataset_preds_all_models_train.xlsx')
df_output_train.to_csv('dataset_preds_all_models_train.csv', index=False)

In [None]:
# Repeat every model's prediction step on the test split.
text_preds_test = text_model.predict(X_test, tasks)

# Same cleaning as for the training hashtags: strip brackets, drop quotes, lower-case.
hashtags_test = df_test['hashtags'].str.strip('[]').str.replace("'", '').str.lower()
hashtag_preds_test = hashtag_model.predict(hashtags_test, tasks, prefix='hm_')
emoji_preds_test = pd.DataFrame(emoji_model.predict(emojis_test))
image_preds_test = image_model.predict(df_test['Media URLs'])
# Column-wise join of all per-model scores.
df_output_test = pd.concat([text_preds_test, hashtag_preds_test, emoji_preds_test, image_preds_test], axis=1)

In [None]:
# Attach the gold label to the test-split scores and export them.
df_output_test.loc[:, 'label'] = df_test['label']
# The Excel file keeps the row index; the CSV is written without it.
df_output_test.to_excel('dataset_preds_all_models_test.xlsx')
df_output_test.to_csv('dataset_preds_all_models_test.csv', index=False)

In [None]:
# Final size of the training split.
df_train.shape