In [30]:
from google.colab import drive
from keras.models import Sequential
from keras.layers.core import Dense
from keras.layers import Embedding, LSTM
from keras.initializers import Constant
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.layers import TextVectorization
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import *
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from keras.layers import Dropout
import numpy as np
import tensorflow as tf
import random
import csv





In [31]:
# Make Google Drive visible to the notebook; all data files live under it.
drive.mount('/content/gdrive')

# Seed every random number generator the notebook relies on so that weight
# initialisation and batch shuffling are repeatable from run to run (as far as
# the backend allows; some GPU kernels remain non-deterministic).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# paths
# Single place to change if the data folder moves.
DATA_DIR = '/content/gdrive/MyDrive/Data'

# Pretrained word vectors.
embeddings_file = f'{DATA_DIR}/embeddings/glove.6B.100d.txt'
train_file_OD = f'{DATA_DIR}/trainOD.csv'

train_file_ID = f'{DATA_DIR}/trainID.csv'
dev_file_ID = f'{DATA_DIR}/devID.csv'
test_file_ID = f'{DATA_DIR}/testID.csv'

# Topic-specific test sets.
test_trump = f'{DATA_DIR}/testtrump.csv'
test_police = f'{DATA_DIR}/testpolice.csv'
test_education = f'{DATA_DIR}/testeducation.csv'
test_immigration = f'{DATA_DIR}/testimmigration.csv'
test_economy = f'{DATA_DIR}/testeconomy.csv'


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [32]:
def read_embeddings(embeddings_file):
    '''Read word embeddings from a text file into a dict of numpy vectors.

    Every non-empty line is expected to hold a word followed by its
    whitespace-separated vector components.

    Args:
        embeddings_file: Path to the embeddings text file (read as UTF-8).

    Returns:
        dict mapping each word (str) to a 1-D float32 numpy array.

    Raises:
        ValueError: If a vector component cannot be parsed as a number.
    '''
    embeddingsdict = dict()
    # Pin the encoding so the result does not depend on the platform default.
    with open(embeddings_file, 'r', encoding='utf-8') as embeddings:
        for line in embeddings:
            fields = line.split()
            if not fields:
                # Blank (e.g. trailing) line: nothing to parse.
                continue
            # Parse the components as numbers; keeping them as strings would
            # yield a string array that only works downstream by implicit casts.
            embeddingsdict[fields[0]] = np.asarray(fields[1:], dtype='float32')
    return embeddingsdict

In [33]:
def read_corpus(corpus_file):
    '''Read a CSV data set and return its documents and labels.

    The first column of each row is used as the document text and the last
    column as its label. A fixed list of boilerplate words and mis-encoded
    character sequences is stripped from the text. Empty rows are skipped.

    Args:
        corpus_file: Path to a comma-separated, UTF-8 encoded file.

    Returns:
        Tuple (documents, labels) of two equally long lists of str.
    '''
    # Removed in this order. 'â€¢' has to come before its own prefix 'â€',
    # otherwise the prefix is stripped first and a stray '¢' is left behind.
    noise = ('timescontent.com', 'MATP', 'Reprint', 'â€¢', 'â€', 'Â', '™️', 'Herald')
    documents = []
    labels = []
    # newline='' is what the csv module asks for so that newlines embedded in
    # quoted fields are handled by the reader itself.
    with open(corpus_file, encoding='utf-8', newline='') as f:
        for row in csv.reader(f, delimiter=','):
            if not row:
                # csv.reader yields [] for blank lines; there is no text/label.
                continue
            text = row[0]
            for junk in noise:
                text = text.replace(junk, '')
            documents.append(text)
            labels.append(row[-1])
    return documents, labels

In [34]:
def get_emb_matrix(voc, emb):
    '''Build the embedding matrix for a vocabulary from pretrained vectors.

    Row i of the result holds the vector of voc[i]; words without a pretrained
    vector keep an all-zero row.

    Args:
        voc: Sequence of vocabulary words; the position is the token index.
        emb: dict mapping words to their embedding vectors.

    Returns:
        Float numpy array of shape (len(voc) + 2, embedding_dim).

    Raises:
        ValueError: If emb is empty, so no embedding dimension can be derived.
    '''
    if not emb:
        raise ValueError('emb must contain at least one embedding vector')
    # Take the dimension from an arbitrary vector instead of relying on one
    # particular word being present in the embeddings.
    embedding_dim = len(next(iter(emb.values())))
    # Two more rows than vocabulary entries; since only indices below len(voc)
    # are filled, the last two rows always stay zero.
    num_tokens = len(voc) + 2
    # Prepare embedding matrix to the correct size
    embedding_matrix = np.zeros((num_tokens, embedding_dim))
    for i, word in enumerate(voc):
        embedding_vector = emb.get(word)
        if embedding_vector is not None:
            # Explicit cast so vectors stored as numeric strings work as well.
            embedding_matrix[i] = np.asarray(embedding_vector, dtype=float)
    # Final matrix with pretrained embeddings that we can feed to embedding layer
    return embedding_matrix

In [35]:
def create_model(Y_train, emb_matrix):
    '''Create and compile the Keras model to use.

    The network is: a frozen embedding layer initialised from emb_matrix,
    one LSTM layer with 64 units, and a single sigmoid output unit. It is
    compiled with binary cross-entropy, the "adam" optimizer and accuracy as
    metric.

    Args:
        Y_train: Not used by the current architecture; kept so existing
            callers keep working.
        emb_matrix: 2-D array of pretrained embeddings, one row per token.

    Returns:
        The compiled Sequential model.
    '''
    loss_function = 'binary_crossentropy'
    # Take embedding dim and size from emb_matrix
    num_tokens = len(emb_matrix)
    embedding_dim = len(emb_matrix[0])
    # Now build the model
    model = Sequential()
    # trainable=False keeps the pretrained vectors fixed during training.
    model.add(Embedding(num_tokens, embedding_dim, embeddings_initializer=Constant(emb_matrix),trainable=False))
    model.add(LSTM(units=64))
    # One sigmoid unit: the output is a single score between 0 and 1.
    model.add(Dense(1, activation='sigmoid'))
    # Compile model using our settings, check for accuracy
    model.compile(loss=loss_function, optimizer="adam", metrics=['accuracy'])
    return model

In [36]:
def train_model(model, X_train, Y_train, X_dev, Y_dev):
    '''Train the model on the training set, validating on the dev set.

    Args:
        model: Compiled Keras model to fit.
        X_train: Training inputs.
        Y_train: Training targets.
        X_dev: Validation inputs.
        Y_dev: Validation targets.

    Returns:
        The same model object after fitting.
    '''
    # Potentially change these to cmd line args again
    verbose = 1
    batch_size = 16
    epochs = 50
    # Early stopping: stop training once the validation loss has not improved
    # for 5 consecutive epochs.
    # It's also possible to monitor the training loss with monitor="loss"
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
    # Finally fit the model to our data. The callback has to be handed to fit();
    # merely creating it has no effect on training.
    model.fit(X_train, Y_train, verbose=verbose, epochs=epochs, batch_size=batch_size,
              validation_data=(X_dev, Y_dev), callbacks=[callback])
    return model

In [37]:
def test_set_predict(model, X_test, Y_test, ident, labels):
    '''Predict on a test set and return a formatted evaluation summary.

    Model outputs above 0.5 count as the positive class. The summary holds
    the classification report followed by accuracy and macro-averaged
    precision, recall and F-score, all rounded to three digits.

    Args:
        model: Trained model exposing predict().
        X_test: Test inputs.
        Y_test: Gold labels for X_test.
        ident: Not used inside this function.
        labels: Class names handed to the classification report.

    Returns:
        The summary as a multi-line string.
    '''
    digits = 3
    # Threshold the predicted scores into boolean class decisions
    class_one = model.predict(X_test) > 0.5

    # Keyword arguments shared by the three macro-averaged scores
    macro = dict(average='macro', zero_division=0)
    report = classification_report(Y_test, class_one, digits=digits, zero_division=0, target_names=labels)
    ac = accuracy_score(Y_test, class_one)
    pr = precision_score(Y_test, class_one, **macro)
    re = recall_score(Y_test, class_one, **macro)
    f1 = f1_score(Y_test, class_one, **macro)

    return f'''
    {report}
    Accuracy:   {round(ac, digits)}
    Precision:  {round(pr, digits)}
    Recall:     {round(re, digits)}
    F-score:    {round(f1, digits)}
    '''

In [None]:
# Load the pretrained word vectors as a {word: vector} dict.
embeddings = read_embeddings(embeddings_file)

# Load texts and labels. Training uses the OD file while dev and test use the
# ID files (presumably out-of-domain vs. in-domain; confirm with the data owner).
# To evaluate on another test set, swap test_file_ID for the desired path,
# e.g. read_corpus(test_immigration) for immigration.
X_train, Y_train = read_corpus(train_file_OD)
X_dev, Y_dev = read_corpus(dev_file_ID)
X_test, Y_test = read_corpus(test_file_ID)


# Turn raw text into fixed-length sequences of token ids: lower-case, strip
# punctuation, and pad/truncate every document to 1000 tokens.
vectorizer = TextVectorization(standardize="lower_and_strip_punctuation", output_sequence_length=1000)
# The vocabulary is learned from the train, dev and test texts combined.
text_ds = tf.data.Dataset.from_tensor_slices(X_train + X_dev + X_test)
vectorizer.adapt(text_ds)
# Vocabulary list; a word's position in it is its token id.
voc = vectorizer.get_vocabulary()




In [None]:
# Pretrained vector for every vocabulary entry (zeros where none exists).
emb_matrix = get_emb_matrix(voc, embeddings)

# Fit the label encoding on the training labels only, then reuse that same
# mapping for the dev labels. Refitting on dev would let the dev set redefine
# which label maps to which integer.
encoder = LabelBinarizer()
Y_train_bin = encoder.fit_transform(Y_train)
Y_dev_bin = encoder.transform(Y_dev)

# Convert the texts to integer token-id sequences with the adapted vectorizer.
X_train_vect = vectorizer(np.array([[s] for s in X_train])).numpy()
X_dev_vect = vectorizer(np.array([[s] for s in X_dev])).numpy()


In [None]:
# Build the network. Pass the binarized labels rather than the raw label
# strings: create_model's first argument is meant to be the encoded targets
# (len() of a raw label string would be its character count).
model = create_model(Y_train_bin, emb_matrix)
# Fit on the training set, validating on the dev set after every epoch.
model = train_model(model, X_train_vect, Y_train_bin, X_dev_vect, Y_dev_bin)


In [None]:
# Encode the test labels with the encoder as it was fitted before training.
# Refitting it here would let the test set redefine the label-to-integer
# mapping and overwrite encoder.classes_, which is used for the report below.
Y_test_bin = encoder.transform(Y_test)
X_test_vect = vectorizer(np.array([[s] for s in X_test])).numpy()

# evaluate() returns [loss, accuracy] for the metrics the model was compiled with.
scores = model.evaluate(X_test_vect, Y_test_bin, verbose=0)
print("Accuracy: %.2f%%" % (scores[1]*100))

# Full report: per-class scores plus accuracy and macro precision/recall/F-score.
results = test_set_predict(model, X_test_vect, Y_test_bin, "test", labels=encoder.classes_)
print(results)