# Brexit Polarity Tweets - Word Embeddings

In [1]:
import os
import re
import nltk
import tensorflow as tf

from gensim import utils
from gensim.models import Word2Vec
from gensim.test.utils import datapath
from gensim.models.fasttext import FastText

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TextVectorization, Dense, Embedding
from tensorflow.keras.layers import GlobalAveragePooling1D
from sklearn.preprocessing import LabelEncoder

In [2]:
# ---- embedding hyper-parameters ----
EMBEDDING_DIMS = 300   # number of dimensions of every embedding vector
SEQ_LENGTH = 200       # token-sequence length fed to the neural-network embedding
WINDOW_SIZE = 15       # context window for the gensim models
EPOCHS = 15            # training epochs for all models
MIN_COUNT = 5          # minimum token frequency for the gensim models

# ---- data locations ----
PATH_TRAIN = "./data/preprocessed/train/"
PATH_EMBEDDINGS = "./data/embeddings/"

# tokenizer instance shared by the helper functions
tokenizer = nltk.TweetTokenizer()

In [3]:
# define helper functions
def read_tweet(filepath):
    """Read a text file into a pandas Series with one element per line.

    Parameters
    ----------
    filepath : str
        Path of the text file to read. The file is decoded as UTF-8.

    Returns
    -------
    pandas.Series
        One string per line, in file order, with the newline character
        removed. Blank lines are kept as empty strings, so positions in the
        Series match line positions in the file.
    """
    import pandas as pd

    # Decode explicitly as UTF-8 rather than relying on the platform default
    # encoding, which varies between operating systems and locales.
    with open(filepath, "r", encoding = "utf-8") as f:
        tweets = [tweet.replace("\n", "") for tweet in f]
    return pd.Series(tweets)

def train_embedding(model_type, **kwargs):
    """Build a gensim embedding model and return its vocabulary and vectors.

    Parameters
    ----------
    model_type : str
        Either "word2vec" or "fasttext" (case-insensitive).
    **kwargs
        Forwarded unchanged to the `Word2Vec` or `FastText` constructor.

    Returns
    -------
    tuple
        `(model.wv.index_to_key, model.wv.vectors)` of the constructed model.

    Raises
    ------
    ValueError
        If `model_type` is neither "word2vec" nor "fasttext".
    """
    kind = model_type.lower()

    if kind == "word2vec":
        model = Word2Vec(**kwargs)
    elif kind == "fasttext":
        model = FastText(**kwargs)
    else:
        # ValueError is still an Exception, so existing `except Exception`
        # handlers keep working.
        raise ValueError("`model_type` must be either 'word2vec' or 'fasttext'.")

    return model.wv.index_to_key, model.wv.vectors

def train_vectorizer(tweets, vocab_size, seq_length):
    """Create a `TextVectorization` layer over the most frequent tweet tokens.

    Tokens are counted across all `tweets` using the module-level
    `tokenizer`; the `vocab_size` most common ones become the layer's
    vocabulary. The vocabulary is passed to the constructor directly, so no
    `adapt` call is made here.

    Returns a `TextVectorization` layer producing integer sequences of
    length `seq_length`.
    """
    # NOTE(review): the layer is created without `standardize`/`split`
    # arguments, so it may tokenise text differently from `tokenizer` — confirm.

    # count every token over the whole collection of tweets
    token_counts = nltk.FreqDist(
        token
        for tweet in tweets
        for token in tokenizer.tokenize(tweet)
    )

    # keep only the tokens themselves, most frequent first
    most_frequent = [pair[0] for pair in token_counts.most_common(vocab_size)]

    return TextVectorization(
        output_sequence_length = seq_length,
        output_mode = 'int',
        vocabulary = most_frequent,
    )

    
def train_embedding_nn(tweets, targets, vocab_size, seq_length, embedding_dims, epochs = 5):
    """Learn word embeddings as a by-product of a small Keras classifier.

    The network is Embedding -> GlobalAveragePooling1D -> Dense(1, sigmoid),
    fitted on the vectorized `tweets` against label-encoded `targets`.

    Returns
    -------
    tuple
        `(vocab, vectors)`: the vectorizer vocabulary without its first two
        entries, and the matching rows of the trained embedding matrix
        converted to strings.
    """
    text_vectorizer = train_vectorizer(tweets, vocab_size, seq_length)

    # the first two vocabulary slots are skipped, here and in the weights below
    # (Keras documents them as the padding and out-of-vocabulary tokens in 'int' mode)
    n_reserved = 2
    learned_vocab = text_vectorizer.get_vocabulary()[n_reserved:]

    token_ids = text_vectorizer(tweets)
    encoded_targets = LabelEncoder().fit_transform(targets)

    embedding_layer = Embedding(len(learned_vocab) + n_reserved,
                                embedding_dims,
                                input_shape = (seq_length,))

    classifier = Sequential([
        embedding_layer,
        GlobalAveragePooling1D(),
        Dense(1, activation = "sigmoid"),
    ])
    classifier.compile(optimizer = 'adam', loss = "BinaryCrossentropy")
    classifier.fit(token_ids, encoded_targets, epochs = epochs, verbose = 0)

    # pull the weight matrix out before the Keras session is cleared
    embedding_matrix = embedding_layer.get_weights()[0]
    learned_vectors = embedding_matrix[n_reserved:].astype(str)

    tf.keras.backend.clear_session()

    return learned_vocab, learned_vectors

In [4]:
def get_all_embeddings(tweets, targets):
    """Train all four embedding variants on one set of tweets.

    Parameters
    ----------
    tweets : pandas.Series
        Raw tweet strings (`.apply` is called on it).
    targets
        Labels passed on to `train_embedding_nn`.

    Returns
    -------
    dict
        Keys "bw" (Word2Vec with sg=0), "sg" (Word2Vec with sg=1),
        "ft" (FastText) and "nn" (Keras embedding layer). Each value is a
        dict with "vocab" and "vectors" entries.
    """
    # Tokenise once with a single tokenizer and share the result between the
    # gensim models, instead of re-tokenising the corpus for every model.
    tweet_tokenizer = nltk.TweetTokenizer()
    sentences = tweets.apply(tweet_tokenizer.tokenize)

    # Hyper-parameters common to every gensim model.
    gensim_kwargs = dict(
        sentences = sentences,
        vector_size = EMBEDDING_DIMS,
        window = WINDOW_SIZE,
        min_count = MIN_COUNT,
        epochs = EPOCHS,
        workers = 10,
    )

    embeddings = {"bw": {}, "sg": {}, "ft": {}, "nn": {}}

    embeddings["bw"]["vocab"], embeddings["bw"]["vectors"] = train_embedding(
        "Word2Vec",
        sg = 0, # bag of words
        **gensim_kwargs
    )

    embeddings["sg"]["vocab"], embeddings["sg"]["vectors"] = train_embedding(
        "Word2Vec",
        sg = 1, # skipgram
        **gensim_kwargs
    )

    # Must be FastText: training "ft" with Word2Vec would only duplicate the
    # "bw" model under a different key.
    embeddings["ft"]["vocab"], embeddings["ft"]["vectors"] = train_embedding(
        "FastText",
        **gensim_kwargs
    )

    embeddings["nn"]["vocab"], embeddings["nn"]["vectors"] = train_embedding_nn(
        tweets = tweets,
        targets = targets,
        seq_length = SEQ_LENGTH,
        embedding_dims = EMBEDDING_DIMS,
        vocab_size = 25000,
        epochs = EPOCHS
    )

    return embeddings

def save_embeddings(vocab, vectors, filepath):
    """Write embeddings to a UTF-8 text file, one word per line.

    Each line has the form `word v1 v2 ... vn`, separated by single spaces.

    Parameters
    ----------
    vocab : sequence of str
        The words, in the same order as `vectors`.
    vectors : sequence of sequences
        One vector per word. Components may be strings or numbers; each is
        passed through `str()` before writing.
    filepath : str
        Destination file; it is overwritten if it already exists.

    Raises
    ------
    AssertionError
        If `vocab` and `vectors` differ in length.
    """
    assert len(vocab) == len(vectors), "`vocab` and `vectors` must have the same length."

    # Encode explicitly as UTF-8 so tokens with non-ASCII characters are
    # written the same way on every platform.
    with open(filepath, "w", encoding = "utf-8") as f:
        for word, vector in zip(vocab, vectors):
            # map(str, ...) accepts numeric vectors as well as pre-stringified ones
            f.write(word + " " + " ".join(map(str, vector)) + "\n")

In [5]:
# Train and save every embedding type for each cleaned variant of the tweets.
# Sorted so the processing order does not depend on the filesystem's listing order.
tweet_files = sorted(f for f in os.listdir(PATH_TRAIN) if re.match(".*clean.*", f))

# the same targets are reused for every tweet file
targets = read_tweet(os.path.join(PATH_TRAIN, "0-targets.txt"))

# make sure the output directory exists before any model is trained
os.makedirs(PATH_EMBEDDINGS, exist_ok = True)

for tweet_file in tweet_files:
    print(tweet_file)

    tweets = read_tweet(os.path.join(PATH_TRAIN, tweet_file))

    # fail early on misaligned data rather than after the gensim models have trained
    if len(tweets) != len(targets):
        raise ValueError(
            f"{tweet_file} has {len(tweets)} rows but there are {len(targets)} targets."
        )

    embeddings = get_all_embeddings(tweets, targets)

    for key in embeddings.keys():
        save_embeddings(embeddings[key]["vocab"],
                        embeddings[key]["vectors"].astype(str),
                        os.path.join(PATH_EMBEDDINGS, key + "-" + tweet_file))

0-clean.txt
1-clean-lemma.txt
1-clean-negat.txt
1-clean-nostw.txt
2-clean-lemma-negat.txt
2-clean-nostw-lemma.txt
2-clean-nostw-negat.txt
3-clean-nostw-lemma-negat.txt
