In [1]:
from google.colab import drive

# Mount Google Drive at /content/gdrive; the data-loading cell reads its
# .tsv files from beneath this mount point.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
import tensorflow as tf
from keras.layers import TextVectorization
import numpy as np
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import f1_score

In [3]:
def read_corpus(corpus_file):
    """Read a labelled corpus file and return its documents and labels.

    Each non-blank line is split on whitespace: the last token is taken as
    the label and the remaining tokens, re-joined with single spaces, form
    the document text. Blank or whitespace-only lines (for example a
    trailing newline at the end of the file) are skipped instead of raising
    an IndexError.

    Args:
        corpus_file: Path of a UTF-8 encoded text file, one instance per line.

    Returns:
        A ``(documents, labels)`` tuple of two equally long lists of strings.
        A line holding a single token yields an empty document string.
    """
    documents = []
    labels = []
    with open(corpus_file, encoding='utf-8') as f:
        for line in f:
            # str.split() without arguments already ignores leading/trailing
            # whitespace, so one split per line is sufficient.
            tokens = line.split()
            if not tokens:
                continue  # nothing to label on an empty line
            documents.append(" ".join(tokens[:-1]))
            labels.append(tokens[-1])
    return documents, labels

In [4]:
# Directory on the mounted Google Drive that holds the three data splits.
# Change this single constant when the data lives somewhere else.
DATA_DIR = '/content/gdrive/MyDrive/University/learning_from_data/assignment_4'

# Each split is loaded as (documents, labels) by read_corpus.
X_train, y_train = read_corpus(f'{DATA_DIR}/train.tsv')
X_val, y_val = read_corpus(f'{DATA_DIR}/dev.tsv')
X_test, y_test = read_corpus(f'{DATA_DIR}/test.tsv')

In [5]:
# Fit the label encoding on the training labels only and reuse that fitted
# mapping for the held-out splits. Calling fit_transform again would refit the
# encoder on validation/test labels, so the mapping could differ between
# splits and encoder.classes_ would describe only the last split fitted.
encoder = LabelBinarizer()
y_train_bin = encoder.fit_transform(y_train)  # Use encoder.classes_ to find mapping back
y_val_bin = encoder.transform(y_val)
y_test_bin = encoder.transform(y_test)

In [6]:
# No text standardisation is applied; every document is padded or truncated
# to 50 token ids.
vectorizer = TextVectorization(standardize=None, output_sequence_length=50)
# The vocabulary is built from the training documents only, so nothing from
# the dev/test splits leaks into it. adapt() is documented to take a *batched*
# tf.data.Dataset, hence the explicit batch() call.
text_ds = tf.data.Dataset.from_tensor_slices(X_train).batch(128)
vectorizer.adapt(text_ds)

In [7]:
# Turn every split into a NumPy array of token ids. Each document is fed to
# the vectorizer as its own single-element row, i.e. an (n_documents, 1)
# array of strings.
X_train_vect, X_val_vect, X_test_vect = (
    vectorizer(np.array(split_docs).reshape(-1, 1)).numpy()
    for split_docs in (X_train, X_val, X_test)
)

In [8]:
# Vocabulary learned by the vectorizer; later cells index it with the integer
# token ids found in the vectorized arrays to get back the token strings.
voc = vectorizer.get_vocabulary()

In [9]:
def compute_word_frequency(X_, vocabulary):
  """Count how often every vocabulary entry occurs in the vectorized data.

  Args:
    X_: Iterable of sentences, each an iterable of integer token ids.
    vocabulary: Sequence mapping a token id to its token string.

  Returns:
    Dict with one key per vocabulary entry (in vocabulary order) holding the
    number of occurrences. Token id 0 is never counted (presumably the
    padding id of the vectorizer - confirm).
  """
  counts = dict.fromkeys(vocabulary, 0)
  for row in X_:
    for token_id in row:
      if token_id == 0:
        continue  # id 0 is excluded from the statistics
      counts[vocabulary[token_id]] += 1
  return counts

In [10]:
word_frequency = compute_word_frequency(X_train_vect, voc)

In [11]:
def compute_word_frequency_in_offensive_instances(X_, y_, vocabulary):
  """Count vocabulary occurrences inside the instances labelled ``[1]``.

  Args:
    X_: Iterable of sentences, each an iterable of integer token ids.
    y_: Iterable of labels aligned with ``X_``; an instance is taken into
      account only when its label compares equal to ``[1]``.
    vocabulary: Sequence mapping a token id to its token string.

  Returns:
    Dict with one key per vocabulary entry holding its number of occurrences
    in the selected instances, ordered by descending count (ties keep their
    vocabulary order). Token id 0 is never counted (presumably the padding
    id of the vectorizer - confirm).
  """
  word_counts = {word: 0 for word in vocabulary}
  for sentence, label in zip(X_, y_):
    if label == [1]:  # only instances labelled as offensive contribute
      for word_index in sentence:
        if word_index != 0:
          word_counts[vocabulary[word_index]] += 1
  # sorted() is stable, so equally frequent words stay in vocabulary order.
  return dict(sorted(word_counts.items(), key=lambda item: item[1], reverse=True))

In [12]:
word_frequency_in_offensive_instances = compute_word_frequency_in_offensive_instances(X_train_vect, y_train_bin, voc)

In [13]:
def compute_normalized_word_frequency_in_offensive_instances(word_frequency_, word_frequency_in_offensive_instances_, vocabulary, support_threshold_):
  """Relate every word's count in offensive instances to its overall count.

  Args:
    word_frequency_: Dict mapping a word to its overall count.
    word_frequency_in_offensive_instances_: Dict mapping a word to its count
      inside offensive instances.
    vocabulary: Iterable of the words to consider.
    support_threshold_: Minimum overall count a word needs to be kept.

  Returns:
    Dict mapping each kept word to ``offensive count / overall count``,
    ordered by descending ratio. Words with an overall count of zero or
    below the threshold are left out.
  """
  ratios = {}
  for token in vocabulary:
    total = word_frequency_[token]
    # The zero check prevents a division by zero when the threshold is 0.
    if total == 0 or total < support_threshold_:
      continue
    ratios[token] = word_frequency_in_offensive_instances_[token] / total
  ranked = sorted(ratios.items(), key=lambda pair: pair[1], reverse=True)
  return dict(ranked)

In [14]:
normalized_word_frequency_in_offensive_instances = compute_normalized_word_frequency_in_offensive_instances(word_frequency, word_frequency_in_offensive_instances, voc, 10)

In [15]:
def compute_offensiveness_metric(X_, y_pred_, vocabulary, support_threshold_=10):
  """Compute, per word, the share of its occurrences found in offensive instances.

  Chains the three helper steps: overall word counts, word counts inside the
  instances labelled offensive, and their ratio.

  Args:
    X_: Iterable of sentences, each an iterable of integer token ids.
    y_pred_: Labels aligned with ``X_``; ``[1]`` marks an offensive instance.
    vocabulary: Sequence mapping a token id to its token string.
    support_threshold_: Minimum overall count a word needs to be included in
      the result. Defaults to 10, the value that used to be hard-coded.

  Returns:
    Dict mapping each sufficiently frequent word to its ratio, ordered by
    descending ratio.
  """
  # Use the vocabulary passed in, not the notebook-level `voc`, so the
  # function also works for a vocabulary other than the global one.
  word_frequency_ = compute_word_frequency(X_, vocabulary)
  word_frequency_in_offensive_instances_ = compute_word_frequency_in_offensive_instances(X_, y_pred_, vocabulary)
  normalized_word_frequency_in_offensive_instances_ = compute_normalized_word_frequency_in_offensive_instances(word_frequency_, word_frequency_in_offensive_instances_, vocabulary, support_threshold_)
  return normalized_word_frequency_in_offensive_instances_

In [17]:
def classify_based_on_word_list(X, word_list):
  """Label a sentence ``[1]`` when it contains any token id from ``word_list``.

  Args:
    X: Iterable of sentences, each an iterable of integer token ids.
    word_list: Iterable of (hashable) token ids that mark a sentence as
      offensive.

  Returns:
    List with one prediction per sentence: ``[1]`` if at least one of its
    token ids occurs in ``word_list``, otherwise ``[0]``.
  """
  # A set gives constant-time membership tests; scanning a list for every
  # token of every sentence is needlessly slow.
  flagged_ids = set(word_list)
  y_pred = []
  for sentence in X:
    # any() stops at the first match instead of scanning the whole sentence.
    is_offensive = any(word_token in flagged_ids for word_token in sentence)
    y_pred.append([1] if is_offensive else [0])
  return y_pred

In [33]:
# Per-word offensiveness ratio computed on the training split.
off_metric = compute_offensiveness_metric(X_train_vect, y_train_bin, voc)
word_list = list(off_metric.items())
# Keep the words whose ratio is strictly greater than 0.5.
word_list_filtered = [entry for entry in word_list if entry[1] > 0.5]
word_list_filtered_keys = [entry[0] for entry in word_list_filtered]
# Map the selected words back to token ids: each word is vectorized on its own
# and the first id of the resulting row is taken.
word_list_indexes = [int(id_row[0]) for id_row in vectorizer(word_list_filtered_keys)]

In [34]:
# Macro-averaged F1 of the word-list classifier on the test split.
y_pred_word_list_test = classify_based_on_word_list(X_test_vect, word_list_indexes)
y_pred_word_list_f1_test = f1_score(
    y_true=y_test_bin,
    y_pred=y_pred_word_list_test,
    average='macro',
)
y_pred_word_list_f1_test

0.63589952780044

In [35]:
# Macro-averaged F1 of the word-list classifier on the validation split.
y_pred_word_list_val = classify_based_on_word_list(X_val_vect, word_list_indexes)
y_pred_word_list_f1_val = f1_score(
    y_true=y_val_bin,
    y_pred=y_pred_word_list_val,
    average='macro',
)
y_pred_word_list_f1_val

0.6422386363276588