# Toxic Text Segmentation

Start by importing all dependencies

In [5]:
!pip install chars2vec
import csv
import chars2vec 
import re
import numpy as np
from tensorflow.keras import datasets, layers, models, losses, callbacks, Model
from sklearn.model_selection import train_test_split
import statistics



You should consider upgrading via the 'c:\users\james\appdata\local\programs\python\python39\python.exe -m pip install --upgrade pip' command.


In [1]:
def f1(predictions, gold):
    """
    Computes the F1 score (a.k.a. DICE) between two collections of offsets.

    Duplicates inside either collection are ignored, because both are
    reduced to sets before being compared.

    Args:
        predictions: a list of predicted offsets
        gold: a list of offsets serving as the ground truth

    Returns:
        a score between 0 and 1; when either input is empty the result is
        the integer 1 (both empty) or 0 (only one of them empty)
    """
    # Empty inputs are settled up front so the division below is never 0/0.
    if len(gold) == 0:
        if len(predictions) == 0:
            return 1
        return 0
    if len(predictions) == 0:
        return 0

    predicted = set(predictions)
    expected = set(gold)
    overlap = len(predicted & expected)
    total = len(predicted) + len(expected)
    return float(2 * overlap) / float(total)

Define a function to extract text

In [6]:
def read_text_data(filename):
    """
    Reads a csv file to extract text.

    The file must have a header row containing a 'text' column.

    Args:
        filename: a string specifying the filename / path

    Returns:
        A list of text sentences, one per data row, in file order

    Raises:
        KeyError: if the file has no 'text' column
    """
    # newline='' hands line-ending handling to the csv module, so a "\r\n"
    # inside a quoted field is kept verbatim instead of being collapsed to
    # "\n" (which would shift the character offsets of everything after it).
    # NOTE(review): no explicit encoding is passed, so the platform default
    # is used -- confirm it matches the data files.
    with open(filename, newline='') as csvfile:
        return [row['text'] for row in csv.DictReader(csvfile)]

Define a function to extract spans

In [7]:
def read_data_span(filename):
    """
    Reads a csv file to extract the toxic spans.

    The file must have a header row containing a 'span' column.

    Args:
        filename: a string specifying the filename / path

    Returns:
        data: a list of strings of the toxic chars, one per data row,
        will look like '[1,2,3]' so it'll have to be split

    Raises:
        KeyError: if the file has no 'span' column
    """
    # newline='' hands line-ending handling to the csv module, so rows whose
    # quoted fields contain line breaks are still parsed as single records
    # with their content left untouched.
    # NOTE(review): no explicit encoding is passed, so the platform default
    # is used -- confirm it matches the data files.
    with open(filename, newline='') as csvfile:
        return [row['span'] for row in csv.DictReader(csvfile)]

Read the training data

In [10]:
# Gather comments and their span strings from the train file first, then
# the trial file, keeping the two lists index-aligned.
texts = []
spans = []
for csv_path in ('../data/tsd_train_readable.csv', '../data/tsd_trial_readable.csv'):
    texts.extend(read_text_data(csv_path))
    spans.extend(read_data_span(csv_path))

# Filled by the preprocessing step that follows.
processed_texts, processed_spans = [], []

# Sanity check: every comment must have exactly one span entry.
print(f"Lengths equal: {len(texts)==len(spans)}" + "\n")

Lengths equal: True



Preprocess the training data. After analysis, the maximum sentence size is 1000 characters. Also remove empty strings and split the spans into actual lists.

In [11]:
# Character-embedding model; used further down to vectorise each character.
c2v_model = chars2vec.load_model('eng_50')
# Comments longer than this many characters are dropped, and every label
# sequence is padded to exactly this length.
word_limit = 1000

# zip() visits every (text, span) pair, including the final one, which an
# index loop bounded by len(texts)-1 would skip.
for text, span_str in zip(texts, spans):
    # Skip empty comments and comments that exceed the length limit.
    if text == "" or len(text) > word_limit:
        continue

    # The span column holds a stringified list such as '[1, 2, 3]'.
    # Filtering blank tokens makes an empty list '[]' yield no offsets
    # instead of failing on int('').
    new_spans = [int(tok) for tok in span_str[1:-1].split(",") if tok.strip()]

    # Drop samples whose annotation points outside the text.
    if new_spans and (min(new_spans) < 0 or max(new_spans) > len(text) - 1):
        continue

    # Per-character one-hot labels:
    #   [1,0,0] toxic character
    #   [0,1,0] non-toxic character inside the text
    #   [0,0,1] padding after the end of the text
    # Every character of the text (including the last one) starts out as
    # non-toxic; a comment without toxic offsets therefore stays that way.
    full_span = [[0, 1, 0] for _ in range(len(text))]
    full_span.extend([0, 0, 1] for _ in range(word_limit - len(text)))
    for char_offset in new_spans:
        full_span[char_offset] = [1, 0, 0]

    processed_texts.append(text)
    processed_spans.append(full_span)

AttributeError: module 'keras' has no attribute 'layers'

Get the maximum comment size (in number of characters)

In [None]:

# Length, in characters, of the longest kept comment (0 when none were
# kept). max() inspects every entry, including the last one, which a loop
# bounded by len(processed_texts)-1 would miss.
# NOTE(review): the label rows built during preprocessing are padded to
# word_limit entries, so arrays sized with max_size only line up with them
# when max_size equals word_limit -- confirm against the data.
max_size = max((len(text) for text in processed_texts), default=0)

Define the training and testing datasets as zero-filled NumPy arrays; this lets us pad the end
of the toxic span with zeros, as it is a fully convolutional network.

In [None]:
# Targets: one 3-way one-hot row per character position of every sample.
train_Y = np.zeros(
    (len(processed_spans), max_size, 3)
)
# Inputs: one 50-value vector per character position (presumably the width
# of the 'eng_50' embedding -- confirm). Unfilled positions stay zero.
train_X = np.zeros(
    (len(processed_texts), max_size, 50)
)

This is not Python best practice, but you may want to manually free up some memory, as this is going to be a very large computation.

In [None]:
# The raw lists are no longer needed once the processed copies exist;
# dropping the names lets their memory be reclaimed.
del texts, spans

Build Train_X

In [None]:
# Fill the input tensor one character at a time: row = sample index,
# col = character position inside that sample.
for row, text in enumerate(processed_texts):
    for col, symbol in enumerate(text):
        # vectorize_words is given a one-element list, so the first entry
        # of its result is the vector for this character.
        train_X[row, col] = c2v_model.vectorize_words([symbol])[0]

Build train_Y

In [None]:
# Copy every label sequence into the pre-allocated target array.
# The label sequences are padded to a fixed length during preprocessing,
# which can be longer than train_Y's second dimension; writing them entry by
# entry would then run past the end of the array. Clipping to the array
# length avoids that.
# NOTE(review): the clipped tail is assumed to be padding only, which holds
# as long as no comment is longer than train_Y.shape[1] -- confirm.
seq_len = train_Y.shape[1]
for x, label in enumerate(processed_spans):
    clipped = label[:seq_len]
    if clipped:
        # One slice assignment replaces the per-position inner loop.
        train_Y[x, :len(clipped)] = clipped

Split train_X and train_Y into training and validation datasets.

In [None]:
# Hold out 10% of the samples for validation; the fixed seed makes the
# split reproducible between runs.
train_X, val_X, train_Y, val_Y = train_test_split(
    train_X,
    train_Y,
    test_size=0.1,
    random_state=42,
)

Confirm the shape of the train and val datasets, should be ([sample_size], 1000, 50) and ([sample_size], 1000, 3) respectively

In [None]:
# Show the input and target shapes of the training split, one per line.
print(train_X.shape, train_Y.shape, sep="\n")

Since a plain global variable cannot simply be reassigned from inside the callback class, manually create a HighScore class to hold the best score.

In [None]:
class HighScore:
    """Mutable holder for the best score recorded so far (starts at 0)."""

    def __init__(self):
        self.high_score = 0

    def get_high_score(self):
        """Return the currently stored best score."""
        return self.high_score

    def set_high_score(self, new_score):
        """Overwrite the stored best score with `new_score`."""
        self.high_score = new_score


# Single shared instance used to track the best score across epochs.
high_score = HighScore()

Free up more memory

In [None]:
# The processed lists have already been copied into the NumPy arrays, so
# their names can be dropped to let the memory be reclaimed.
del processed_texts, processed_spans

A prediction callback that acts as a validation step. As the output tensor has a different shape from what the SemEval F1 score expects, we must convert it into its proper form (lists of character offsets) before computing the F1 score.

In [None]:
class PredictionCallback(callbacks.Callback):
    """
    Keras callback that scores the model on the validation split after
    every epoch and checkpoints it whenever the mean F1 improves.

    Reads the module-level `val_X` / `val_Y` arrays, the `f1` function, the
    shared `high_score` holder and `root_path`.
    NOTE(review): `root_path` is not defined in the visible part of the
    notebook -- it must exist before training starts.
    """

    def on_epoch_end(self, epoch, logs=None):
        """
        Evaluate on the validation data and save the model on a new best.

        Args:
            epoch: index of the epoch that just finished (unused)
            logs: metrics dict supplied by Keras (unused)
        """
        # The held-out arrays produced by the train/validation split are
        # called val_X / val_Y.
        y_pred = self.model.predict(val_X)
        scores = []
        for pred, truth in zip(y_pred, val_Y):
            # Class 0 marks a toxic character; turn the per-position class
            # scores into the list of toxic offsets that f1() expects.
            predicted_offsets = np.flatnonzero(np.argmax(pred, axis=-1) == 0).tolist()
            gold_offsets = np.flatnonzero(np.argmax(truth, axis=-1) == 0).tolist()
            scores.append(f1(predicted_offsets, gold_offsets))
        score = statistics.mean(scores)
        if score > high_score.get_high_score():
            high_score.set_high_score(score)
            # Save the model this callback is attached to rather than
            # whatever the global name `model` happens to point at.
            self.model.save(f"{root_path}model_autoencoder_LSTM_checkpoint")
        print(f"F1 score: {score}")

In [2]:
# TODO write a "get_model" function
# def get_model(type):
#     return model

Define the model

In [None]:
# Sequence-to-sequence classifier: convolutional encoder, recurrent middle,
# transposed-convolution decoder, 3-way softmax for every character position.
model = models.Sequential()
model.add(layers.Input(shape = train_X.shape[1:]))

# Encoder: three conv stages with growing filter counts, each followed by
# normalisation, ELU, dropout and a stride-2 max-pool.
for n_filters in (32, 64, 128):
    model.add(layers.Conv1D(filters=n_filters, kernel_size=9, strides=1, padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.ELU())
    model.add(layers.Dropout(0.3))
    model.add(layers.MaxPooling1D(strides=2))

# Last encoder convolution (no dropout / pooling after it).
model.add(layers.Conv1D(filters=256, kernel_size=9, strides=1, padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.ELU())

# Two stacked bidirectional GRU layers that keep the full sequence.
for _ in range(2):
    model.add(layers.Bidirectional(layers.GRU(units=128, return_sequences=True)))
    model.add(layers.BatchNormalization())
    model.add(layers.ELU())

# First decoder convolution.
model.add(layers.Conv1D(filters=256, kernel_size=9, strides=1, padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.ELU())

# Decoder: three transposed-conv stages with shrinking filter counts, each
# followed by normalisation, ELU, dropout and an upsampling step.
for n_filters in (128, 64, 32):
    model.add(layers.Conv1DTranspose(filters=n_filters, kernel_size=9, strides=1, padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.ELU())
    model.add(layers.Dropout(0.3))
    model.add(layers.UpSampling1D())

# Output head: one softmax over the three label classes per position.
model.add(layers.Conv1D(filters=3, kernel_size=9, strides=1, padding='same', activation='softmax'))

model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()

Train the model

In [None]:
# Train for 300 epochs in batches of 32; the callback computes the F1 score
# after every epoch.
history = model.fit(
    x=train_X,
    y=train_Y,
    batch_size=32,
    epochs=300,
    callbacks=[PredictionCallback()],
)

Save model and check final validation score

In [None]:
# Persist the final model.
# NOTE(review): `root_path` is not defined in the visible part of the
# notebook -- it must exist before this cell runs.
model.save(f"{root_path}model_autoencoder_LSTM")

# Score the final model on the held-out validation split.
pred_Y = model.predict(val_X)

scores = []
for pred, truth in zip(pred_Y, val_Y):
    # Class 0 marks a toxic character; collect those positions as the offset
    # lists f1() expects.
    y_pred_f1_compatible = [j for j, i in enumerate(pred) if np.argmax(i) == 0]
    y_true_f1_compatible = [j for j, i in enumerate(truth) if np.argmax(i) == 0]
    # No special case is needed for samples without toxic characters:
    # y_true_f1_compatible is then simply empty and f1() handles it. (A
    # NumPy row can never be meaningfully compared to `[]`.)
    scores.append(f1(y_pred_f1_compatible, y_true_f1_compatible))

print('avg F1 %g' % statistics.mean(scores))


Manually view some predictions to check validity

In [None]:
# Print the first 101 validation predictions next to their ground truth.
for sample_no, pred in enumerate(pred_Y):
    # Winning class per character position (class 0 = toxic).
    predicted_classes = [np.argmax(i) for i in pred]
    predicted_span = [pos for pos, cls in enumerate(predicted_classes) if cls == 0]
    truth_span = [pos for pos, row in enumerate(val_Y[sample_no]) if np.argmax(row) == 0]

    score = f1(predicted_span, truth_span)
    print(f"F1 score: {score}")
    print(f"Predicted span one_hot: {predicted_classes}")
    print(f"Predicted span: {predicted_span}")
    print(f"Ground truth span: {truth_span}" + "\n")

    # Stop after the sample with index 100 has been shown.
    if sample_no == 100:
        break
    