In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
# NOTE(review): later cells open their data files by relative path; presumably
# the files are copied in or the working directory is changed elsewhere - confirm.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

In [None]:
# Label name -> integer id for a four-class labelling scheme.
# NOTE(review): none of the three maps below is referenced by another cell
# visible in this notebook; confirm whether they are still needed.
label_map_fountana_org = {'abusive': 0, 'normal': 1, 'hateful': 2, 'spam': 3}
# Binary id -> display name, and the inverse lookup.
hate_map = {0: "NON HATE", 1: "HATE"}
reverse_hate_map = {"NON HATE": 0, "HATE": 1}

In [None]:
def hashtag(text):
    """Expand a matched hashtag into a ``<hashtag>`` marker followed by its words.

    ``text`` is a regex match object whose matched string starts with the
    ``#`` character. An all-uppercase body is kept whole and tagged with
    ``<allcaps>``; any other body is split in front of every capital letter
    and the pieces are joined with single spaces.
    """
    body = text.group()[1:]
    if not body.isupper():
        pieces = re.split(r"(?=[A-Z])", body, flags=FLAGS)
        return " ".join(["<hashtag>"] + pieces)
    return u"<hashtag> {} <allcaps>".format(body)


def allcaps(text):
    """Lower-case the matched string and append an ``<allcaps>`` tag.

    ``text`` is a regex match object; the function is meant to be used as the
    replacement callback of ``re.sub``.
    """
    matched = text.group()
    return "{} <allcaps>".format(matched.lower())

def tokenize(text):
    """Normalise a raw tweet into a lower-cased, punctuation-free string.

    URLs, user mentions, emoticons, hearts, numbers, repeated punctuation,
    elongated words and runs of capitals are first rewritten to placeholder
    tags. Afterwards every character that is neither a word character nor
    whitespace is removed (which also strips the angle brackets of the
    placeholders), and the result is stripped and lower-cased.
    """
    # Building blocks shared by the emoticon patterns.
    eyes = r"[8:=;]"
    nose = r"['`\-]?"

    # Ordered (pattern, replacement) rules; each one runs on the output of the
    # previous one, so the order matters. Hashtag expansion via ``hashtag`` is
    # not part of the rule list. The last rule only tags runs of two or more
    # capital letters instead of the broader pattern of the original Ruby script.
    rules = [
        (r"https?:\/\/\S+\b|www\.(\w+\.)+\S*", "<url>"),
        (r"/", " / "),
        (r"@\w+", "<user>"),
        (r"{}{}[)dD]+|[)dD]+{}{}".format(eyes, nose, nose, eyes), "<smile>"),
        (r"{}{}p+".format(eyes, nose), "<lolface>"),
        (r"{}{}\(+|\)+{}{}".format(eyes, nose, nose, eyes), "<sadface>"),
        (r"{}{}[\/|l*]".format(eyes, nose), "<neutralface>"),
        (r"<3", "<heart>"),
        (r"[-+]?[.\d]*[\d]+[:,.\d]*", "<number>"),
        (r"([!?.]){2,}", r"\1 <repeat>"),
        (r"\b(\S*?)(.)\2{2,}\b", r"\1\2 <elong>"),
        (r"([A-Z]){2,}", allcaps),
    ]
    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text, flags=FLAGS)

    # Remove everything except word characters and whitespace.
    cleaned = re.sub(r'[^\w\s]','',text)
    return cleaned.strip().lower()

In [None]:
def load_data(file="fountana_norm_HAclean.json"):
    """Load the tweet dataset and return tokenized texts with their labels.

    Parameters
    ----------
    file : str, optional
        Path of the JSON dataset. Defaults to the file name the notebook
        has always used, so existing ``load_data()`` calls are unaffected.

    Returns
    -------
    (list, list)
        ``texts`` holds ``tokenize(record['text'])`` for every record and
        ``labels`` holds the matching ``record['label']`` values, in the
        iteration order of the loaded JSON object.

    Notes
    -----
    The JSON top level is expected to be an object whose values are records
    with ``'text'`` and ``'label'`` keys.
    """
    # JSON text is UTF-8; be explicit so the result does not depend on the
    # locale of the machine running the notebook.
    with open(file, 'r', encoding="utf-8") as f:
        ft_data = json.load(f)

    texts = []
    labels = []
    for record in ft_data.values():
        texts.append(tokenize(record['text']))
        labels.append(record['label'])
    return texts, labels

In [None]:
import matplotlib.pyplot as plt
import json
import pickle

import sys
import re

# Regex flags passed to the substitutions in tokenize() and to the split in
# hashtag(); this cell must run before either function is called.
FLAGS = re.MULTILINE | re.DOTALL

# !pip install tweet-preprocessor

In [None]:
# Load the tokenized tweets with their labels and sanity-check the pairing.
samples, labels = load_data()
assert len(samples) == len(labels), "samples and labels must have the same length"
# Print the dataset size explicitly: a bare expression in the middle of a cell
# is evaluated and thrown away, it is never displayed.
print(len(labels))
# Histogram of the label values.
plt.hist(labels)
plt.show()

In [None]:
# Shuffle the data in place. Re-creating the generator with the same seed
# before each shuffle applies the same permutation to both equal-length
# lists, so every sample stays aligned with its label.
seed = 1337
rng = np.random.RandomState(seed)
rng.shuffle(samples)
rng = np.random.RandomState(seed)
rng.shuffle(labels)

# Extract a training & validation split.
# Slice with a non-negative index: with a negative offset, a validation size
# of 0 would turn `[:-0]` into an empty training set and `[-0:]` into a
# validation set containing everything.
validation_split = 0.2
num_validation_samples = int(validation_split * len(samples))
split_at = len(samples) - num_validation_samples
train_samples = samples[:split_at]
val_samples = samples[split_at:]
train_labels = labels[:split_at]
val_labels = labels[split_at:]

### REF SOURCE: https://keras.io/examples/nlp/pretrained_word_embeddings/

In [None]:
from tensorflow.keras.layers import TextVectorization

# Text -> integer-sequence layer: vocabulary capped at 20000 tokens, every
# output sequence has length 200.
vectorizer = TextVectorization(max_tokens=20000, output_sequence_length=200)
# The vocabulary is learned from the training split only, fed in batches of 128.
text_ds = tf.data.Dataset.from_tensor_slices(train_samples).batch(128)
vectorizer.adapt(text_ds)

In [None]:
# Show the first five entries of the learned vocabulary.
vectorizer.get_vocabulary()[:5]

In [None]:
# Map every vocabulary token to its position in the vectorizer's vocabulary.
voc = vectorizer.get_vocabulary()
word_index = {token: position for position, token in enumerate(voc)}

In [None]:
# Fetch and unpack the GloVe Twitter vectors.
# -nc (no-clobber) skips the download when the archive is already present and
# -n makes unzip skip existing files instead of prompting, so re-running this
# cell neither repeats the download nor stalls on an overwrite question.
!wget -nc https://nlp.stanford.edu/data/glove.twitter.27B.zip
!unzip -n -q glove.twitter.27B.zip

In [None]:
# Dimensionality of the GloVe vectors; it also selects which file is read.
embedding_dim = 200

path_to_glove_file = "glove.twitter.27B."+str(embedding_dim)+"d.txt"

# Parse the GloVe text file into {word: float32 vector}. Each line holds a
# token followed by its space-separated coefficients.
# The file is opened as UTF-8 explicitly so that reading it does not depend
# on the locale's default encoding.
embeddings_index = {}
with open(path_to_glove_file, encoding="utf-8") as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print("Found %s word vectors." % len(embeddings_index))

# The matrix has two more rows than the vocabulary has entries.
num_tokens = len(voc) + 2
hits = 0
misses = 0

# Row i holds the GloVe vector of the vocabulary token with index i; tokens
# without a pre-trained vector keep an all-zero row.
embedding_matrix = np.zeros((num_tokens, embedding_dim))
for word, i in word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector
        hits += 1
    else:
        misses += 1
print("Converted %d words (%d misses)" % (hits, misses))

In [None]:
from tensorflow.keras.layers import Embedding

# Embedding layer whose weights start from the GloVe-based embedding_matrix;
# trainable=True means those weights are updated during training.
embedding_layer = Embedding(
    num_tokens,
    embedding_dim,
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=True,
)
from tensorflow.keras import layers

# Functional model:
#   integer token ids -> embeddings -> SimpleRNN returning one output per step
#   -> Attention fed the RNN output as both of its inputs -> average over the
#   sequence axis -> Dense(128, relu) -> Dropout(0.5) -> one sigmoid unit.
int_sequences_input = keras.Input(shape=(None,), dtype="int64")
embedded_sequences = embedding_layer(int_sequences_input)
X = layers.SimpleRNN(units=128,recurrent_dropout=0.5, return_sequences=True)(embedded_sequences)
X = layers.Attention(use_scale=True)([X,X])
X = layers.GlobalAveragePooling1D()(X)
X = layers.Dense(128, activation="relu")(X)
X = layers.Dropout(0.5)(X)
preds = layers.Dense(1, activation="sigmoid")(X)
model = keras.Model(int_sequences_input, preds)
model.summary()

In [None]:
# Vectorize both splits. Each sample is wrapped in its own one-element list
# before being passed to the vectorizer, and the result is converted to NumPy.
x_train = vectorizer(np.array([[s] for s in train_samples])).numpy()
x_val = vectorizer(np.array([[s] for s in val_samples])).numpy()

# Labels as NumPy arrays, in the same order as the samples above.
y_train = np.array(train_labels)
y_val = np.array(val_labels)

In [None]:
# Binary classification setup: binary cross-entropy loss, Adam with a 1e-3
# learning rate, and accuracy / precision / recall as reported metrics.
model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=["acc", tf.keras.metrics.Precision(), tf.keras.metrics.Recall()],
)
# validation_data is passed as a tuple (x_val, y_val), the form documented
# for Model.fit; a list is not handled uniformly across Keras versions.
model.fit(x_train, y_train, batch_size=64, epochs=5, validation_data=(x_val, y_val))

In [None]:
# Report the loss and the compiled metrics on the validation split.
model.evaluate(x_val,y_val)

In [None]:
def ext_eval_probs(yg,yp):
    """Mean of ``ground - pred`` over the pairs where both scores are >= 0.5.

    Parameters
    ----------
    yg, yp : sequences of array-like rows
        Model outputs for the ground texts and the predicted texts. Rows are
        paired positionally; the first element of each row is the score that
        is compared against the threshold.

    Returns
    -------
    float
        ``np.mean`` of the row-wise differences ``g - p`` of the selected
        pairs, or NaN when no pair qualifies.

    NOTE(review): an earlier inline comment stated that 0 is the hate label
    and that scores below 0.5 are selected. That contradicted the condition
    actually used here (both scores >= 0.5); confirm the intended direction.
    """
    diff = [g - p for g, p in zip(yg, yp) if g[0] >= 0.5 and p[0] >= 0.5]
    if not diff:
        # np.mean([]) would also give NaN, but with a RuntimeWarning.
        return np.nan
    return np.mean(diff)

def return_feature_set(test_samples):
    """Run raw text samples through the fitted vectorizer and return a NumPy array.

    Every sample is wrapped in its own one-element list before the call.
    """
    wrapped = np.array([[sample] for sample in test_samples])
    vectorized = vectorizer(wrapped)
    return vectorized.numpy()

def _load_ext_eval(model_name):
    """Unpickle and return the content of ``<model_name>_for_ext_eval.pkl``."""
    file = model_name + "_for_ext_eval.pkl"
    # SECURITY: pickle.load can execute arbitrary code while loading. Only use
    # this on evaluation files produced by a trusted source.
    with open(file, "rb") as f:
        return pickle.load(f)

def _print_ext_eval(pair):
    """Score one ``{"ground": ..., "pred": ...}`` pair and print the result.

    Both text collections are vectorized, passed through ``model.predict``
    and compared with ``ext_eval_probs``.
    """
    xg = pair["ground"]
    xp = pair["pred"]
    xg = return_feature_set(xg)
    xp = return_feature_set(xp)
    yg = model.predict(xg)
    yp = model.predict(xp)
    print(ext_eval_probs(yg,yp))

def run_for_test(model_name):
    """Evaluate a pickle that holds a single ground/pred pair.

    Prints ``model_name`` followed by the ``ext_eval_probs`` value for the
    ``"ground"`` and ``"pred"`` entries of ``<model_name>_for_ext_eval.pkl``.
    """
    print(model_name)
    _print_ext_eval(_load_ext_eval(model_name))

def run_for_test_dict(model_name):
    """Evaluate a pickle that maps keys to ground/pred pairs.

    Prints ``model_name``, then for every key of the loaded mapping a
    separator line with the key followed by the ``ext_eval_probs`` value of
    that key's ``"ground"`` and ``"pred"`` entries.
    """
    print(model_name)
    data = _load_ext_eval(model_name)
    for k in data:
        print("------k-----",k)
        _print_ext_eval(data[k])

def execute_():
    """Run the external evaluation for every saved prediction set.

    The order is fixed: three single-pair files, two keyed files, then one
    more single-pair file.
    """
    for model_name in ("neutral", "drgpreds", "ntpcares"):
        run_for_test(model_name)
    for model_name in ("fgst", "style"):
        run_for_test_dict(model_name)
    run_for_test("nacl")

In [None]:
# Run all external evaluations with the trained model and print their scores.
execute_()