# 3. Twitter Sentiment Analysis with PosTag Lemmatization and Neural Net classification

Ќе се обидеме да искористиме невронска мрежа наместо едноставен класификатор. Претпроцесирањето на твитовите ќе го оставиме како во вториот обид, со користење на WordNet лематизација од пакетот nltk.

In [1]:
import nltk
from nltk.stem.wordnet import WordNetLemmatizer
from typing import List

# lemmatization with wordnet and lexicon creation
#
# The three lists below are consulted by prune_tokens_based_on_pos to decide
# what happens to a token, given the part-of-speech tag that nltk.pos_tag
# assigned to it (presumably Penn Treebank tags -- confirm against the tagger
# model in use). Tags that appear in none of the lists leave the token unchanged.

# Tokens carrying one of these tags are dropped from the tweet.
POS_TAGS_TO_IGNORE = ['CC', 'DT', 'EX', 'LS', 'PDT', 'POS', 'RP', 'UH', 'WDT', 'WP', 'WP$', 'WRB', 'MD']
# Tokens carrying one of these tags are kept exactly as they are.
POS_TAGS_TO_LEAVE_AS_IS = ['CD', 'FW', 'IN', 'PRP', 'PRP$']
# Tokens carrying one of these tags are passed through the WordNet lemmatizer.
POS_TAGS_TO_LEMMATIZE = ['JJ', 'JJR', 'JJS', 'NN', 'NNS', 'NNPS', 'RB', 'RBR', 'RBS', 'VB', 'VBD', 'VBG', 'VBN', 'VBP',
                         'VBZ']


def get_wordnet_pos(pos_tag: str):
    """Translate a POS tag into the matching WordNet part-of-speech constant.

    Only the first character of ``pos_tag`` is inspected: ``J`` maps to ADJ,
    ``V`` to VERB, ``N`` to NOUN and ``R`` to ADV (all taken from
    ``nltk.corpus.reader``). Any other tag, including an empty one, yields
    ``None``.
    """
    constant_name_by_initial = {'J': 'ADJ', 'V': 'VERB', 'N': 'NOUN', 'R': 'ADV'}
    constant_name = constant_name_by_initial.get(pos_tag[:1])
    if constant_name is None:
        return None
    # Resolve the constant only when it is actually needed.
    return getattr(nltk.corpus.reader, constant_name)


def lemmatize_with_wordnet(word: str, pos_tag: str, lemmatizer: WordNetLemmatizer):
    """Lemmatize ``word`` using the WordNet category derived from ``pos_tag``.

    When ``get_wordnet_pos`` has no WordNet category for the tag, the word is
    returned untouched.
    """
    wordnet_category = get_wordnet_pos(pos_tag)
    if wordnet_category is None:
        return word
    return lemmatizer.lemmatize(word, wordnet_category)


def prune_tokens_based_on_pos(word: str, pos_tag: str, lemmatizer: WordNetLemmatizer):
    """Decide the fate of a single token from its POS tag.

    Returns ``None`` for tags listed in ``POS_TAGS_TO_IGNORE``, the lemmatized
    word for tags listed in ``POS_TAGS_TO_LEMMATIZE`` (unless the tag is also
    in ``POS_TAGS_TO_LEAVE_AS_IS``, which is checked first), and the word
    itself in every other case.
    """
    if pos_tag in POS_TAGS_TO_IGNORE:
        return None

    needs_lemmatizing = (pos_tag not in POS_TAGS_TO_LEAVE_AS_IS
                         and pos_tag in POS_TAGS_TO_LEMMATIZE)
    if needs_lemmatizing:
        return lemmatize_with_wordnet(word, pos_tag, lemmatizer)

    return word


def prune_token_list(tokens: List[str], lemmatizer: WordNetLemmatizer) -> List[str]:
    """POS-tag the tokens of one tweet and prune/lemmatize them.

    Every token is tagged with ``nltk.pos_tag`` and handed to
    ``prune_tokens_based_on_pos``; tokens for which that function returns
    ``None`` are removed.

    Returns:
        A flat list of the surviving (possibly lemmatized) tokens, in their
        original order.
    """
    pruned = (prune_tokens_based_on_pos(word, tag, lemmatizer)
              for word, tag in nltk.pos_tag(tokens))
    return [token for token in pruned if token is not None]


def tokenize_and_lemmatize_tweets(tweets: List[str]) -> List[List[str]]:
    """Tokenize, prune and lemmatize every tweet.

    Each tweet is lower-cased, split with ``nltk.word_tokenize`` and passed to
    ``prune_token_list``. A progress line is printed after every 1000 tweets.

    Returns:
        One token list per input tweet, in input order.
    """
    shared_lemmatizer = WordNetLemmatizer()
    processed_tweets = []

    for processed_so_far, raw_tweet in enumerate(tweets, start=1):
        tokens = nltk.word_tokenize(raw_tweet.lower())
        processed_tweets.append(prune_token_list(tokens, shared_lemmatizer))
        if processed_so_far % 1000 == 0:
            print("Processed tweets: ", processed_so_far)

    return processed_tweets

# end of lexicon creation and lemmatization

# Smoke test of the pre-processing pipeline on three hand-written tweets; each
# call prints a list holding the single processed token list.
print("Example usage:")
print(tokenize_and_lemmatize_tweets(["i like to eat pie"]))
print(tokenize_and_lemmatize_tweets(["eating pie can be extremely difficult"]))
print(tokenize_and_lemmatize_tweets(["How are you my friend? -Fine, thanks."]))


Example usage:
[['i', 'like', 'to', 'eat', 'pie']]
[['eat', 'pie', 'be', 'extremely', 'difficult']]
[['be', 'you', 'my', 'friend', '?', '-fine', ',', 'thanks', '.']]


In [2]:
import re

def remove_pattern_from_string(given_string, re_compiled_pattern):
    """Return ``given_string`` with every match of the pattern deleted.

    ``re_compiled_pattern`` must be a compiled regular expression; each
    of its matches is replaced by the empty string.
    """
    stripped = re_compiled_pattern.sub("", given_string)
    return stripped

def remove_links_from_tweets(tweets_list: List[str]) -> List[str]:
    """Strip every http:// or https:// link from each tweet.

    A link is the scheme (matched case-insensitively) followed by all
    characters up to the next whitespace. Whitespace around a removed link is
    left in place.

    Returns:
        A new list with one cleaned string per input tweet, in input order.
    """
    # Raw string: "\S" in a normal string literal is an invalid escape sequence.
    link_pattern = re.compile(r"https?://\S+", re.IGNORECASE)
    return [link_pattern.sub("", tweet) for tweet in tweets_list]

In [3]:
from nltk.corpus import wordnet
from nltk import WordNetLemmatizer

# Matches every character that is not an ASCII letter.
letters_only_regex = re.compile("[^a-zA-Z]")

def keep_only_wordnet_tokens(tweet_tokens:List[str]) -> List[str]:
    """Reduce tokens to their ASCII letters and keep those WordNet knows.

    Every token first loses all non-letter characters; the stripped token is
    kept only when ``wordnet.synsets`` returns a non-empty result for it.
    """
    known_tokens = []
    for raw_token in tweet_tokens:
        letters_only = letters_only_regex.sub("", raw_token)
        if wordnet.synsets(letters_only):
            known_tokens.append(letters_only)
    return known_tokens
    
# Demonstration of keep_only_wordnet_tokens on a small hand-written token list;
# prints the input tokens next to the filtered result.
example_tokens = ['i', 'like', 'pie', '#fire', 'bableh','3rd']
pruned_tokens = keep_only_wordnet_tokens(example_tokens)
print(example_tokens, ' -> ', pruned_tokens)

def keep_only_wordnet_tokens_in_list_of_tweets(tweets: List[List[str]]) -> List[List[str]]:
    """Apply ``keep_only_wordnet_tokens`` to every tokenized tweet.

    Returns a new list with one filtered token list per tweet, in input order.
    """
    filtered_tweets = []
    for tweet_tokens in tweets:
        filtered_tweets.append(keep_only_wordnet_tokens(tweet_tokens))
    return filtered_tweets

['i', 'like', 'pie', '#fire', 'bableh', '3rd']  ->  ['i', 'like', 'pie', 'fire']


In [4]:
def clean_tweets_tralala(tweet_list:List[str]) -> List[List[str]]:
    """Run the full pre-processing pipeline over raw tweet texts.

    Steps, in order: remove links, tokenize and lemmatize, then keep only
    tokens known to WordNet.

    Returns:
        One list of cleaned tokens per input tweet, in input order.
    """
    modified_tweets = remove_links_from_tweets(tweet_list)
    modified_tweets = tokenize_and_lemmatize_tweets(modified_tweets)
    modified_tweets = keep_only_wordnet_tokens_in_list_of_tweets(modified_tweets)
    return modified_tweets

In [5]:
def get_tuple_from_input_file(lines_with_tweet_and_class, delimiter):
    """Split ``tweet<delimiter>class`` lines into two parallel lists.

    The first field of each line is taken as the tweet and the second as its
    class; the fields are not stripped, so a trailing newline stays on the
    class. A line without the delimiter raises ``IndexError``.

    Returns:
        A ``(tweets, classes)`` tuple of lists.
    """
    fields_per_line = [line.split(delimiter) for line in lines_with_tweet_and_class]
    tweets = [fields[0] for fields in fields_per_line]
    classes = [fields[1] for fields in fields_per_line]
    return tweets, classes


def get_tuple_from_test_input_file(tweets_with_number, delimiter):
    """Split ``index<delimiter>text`` lines into two parallel lists.

    The second field of each line is collected as the text and the first as
    its index; the fields are not stripped. A line without the delimiter
    raises ``IndexError``.

    Returns:
        A ``(texts, indexes)`` tuple of lists.
    """
    rows = [line.split(delimiter) for line in tweets_with_number]
    tweets = [row[1] for row in rows]
    index_numbers = [row[0] for row in rows]
    return tweets, index_numbers



## Модел на невронската мрежа

За да можеме да креираме невронска мрежа и да ја тренираме, мораме влезот да го претставиме векторски. За таа цел го користиме принципот bag-of-centroids, кој го опишавме во првиот документ. За креирање и тренирање на невронската мрежа ќе го користиме TensorFlow. Невронската мрежа ќе има 3 скриени слоеви, секој со по 500 неврони. Влезот ќе претставува вектор со толку елементи колку што има центроиди (кластери) изградени од word2vec моделот.

In [50]:
from typing import List, Tuple

import numpy as np
import tensorflow as tf

# Width (number of units) of each of the three hidden layers built by
# neural_network_model.
n_nodes_hl1 = 500
n_nodes_hl2 = 500
n_nodes_hl3 = 500

# Size of the output layer: one unit per sentiment class.
n_classes = 3
# Number of training examples fed to the optimizer per step.
batch_size = 100


def convert_question_to_vector(word_centroid_map, tweet: List[str]) -> np.ndarray:
    """Build the bag-of-centroids vector for one tokenized tweet.

    Args:
        word_centroid_map: dict mapping a word to the index of its cluster.
        tweet: tokens of a single tweet.

    Returns:
        A float32 vector with one entry per cluster; entry ``i`` counts the
        tokens of ``tweet`` whose cluster index is ``i``. Tokens missing from
        the map are ignored. An empty map yields a zero-length vector.
    """
    # The number of clusters is the highest cluster index in the map plus one;
    # default=-1 makes an empty map produce zero clusters instead of raising.
    num_centroids = max(word_centroid_map.values(), default=-1) + 1

    # Pre-allocate the bag of centroids vector (for speed).
    bag_of_centroids = np.zeros(num_centroids, dtype="float32")

    # Count, per cluster, how many tokens of the tweet belong to it.
    for word in tweet:
        index = word_centroid_map.get(word)
        if index is not None:
            bag_of_centroids[index] += 1

    return bag_of_centroids


In [24]:
def convert_tweets_to_feature_vectors(centroid_map, clean_tweets: List[List[str]]) -> List[np.ndarray]:
    """Convert every tokenized tweet into its bag-of-centroids vector.

    Calls ``convert_question_to_vector`` once per tweet and returns the
    resulting vectors as a list, in input order.
    """
    feature_vectors = []
    for tweet_tokens in clean_tweets:
        feature_vectors.append(convert_question_to_vector(centroid_map, tweet_tokens))
    return feature_vectors


In [51]:

def neural_network_model(data, num_input:int):
    """Build the feed-forward network graph and return its output tensor.

    The network has three ReLU hidden layers (widths ``n_nodes_hl1``,
    ``n_nodes_hl2``, ``n_nodes_hl3``) followed by a linear output layer with
    ``n_classes`` units. All weights and biases are ``tf.Variable``s
    initialised from ``tf.random_normal``.

    Args:
        data: input tensor that is matrix-multiplied with a
            ``[num_input, n_nodes_hl1]`` weight matrix.
        num_input: number of features per example.

    Returns:
        The un-activated output of the last layer.
    """
    print("creating network model")
    layer_widths = [num_input, n_nodes_hl1, n_nodes_hl2, n_nodes_hl3, n_classes]

    # Create all variables first (weights before biases, layer by layer) so
    # the graph is populated in the same order as a hand-written version.
    layers = []
    for fan_in, fan_out in zip(layer_widths, layer_widths[1:]):
        weights = tf.Variable(tf.random_normal([fan_in, fan_out]))
        biases = tf.Variable(tf.random_normal([fan_out]))
        layers.append((weights, biases))

    *hidden_layers, (output_weights, output_biases) = layers

    activation = data
    for weights, biases in hidden_layers:
        activation = tf.nn.relu(tf.add(tf.matmul(activation, weights), biases))

    return tf.matmul(activation, output_weights) + output_biases

In [26]:
import pickle

def convert_class(class_of_tweet:str) -> List[int]:
    """One-hot encode a sentiment label.

    Surrounding whitespace is ignored. ``'positive'`` becomes ``[1, 0, 0]``,
    ``'negative'`` becomes ``[0, 0, 1]`` and every other label becomes
    ``[0, 1, 0]``. A fresh list is returned on every call.
    """
    label = class_of_tweet.strip()
    # Built per call so callers never share (and cannot mutate) a cached list.
    one_hot_by_label = {'positive': [1, 0, 0], 'negative': [0, 0, 1]}
    return one_hot_by_label.get(label, [0, 1, 0])

def convert_classes(classes: List[str]) -> List[List[int]]:
    """One-hot encode every label with ``convert_class``, keeping input order."""
    encoded_labels = []
    for label in classes:
        encoded_labels.append(convert_class(label))
    return encoded_labels

# Placeholder value; replaced below by the lines read from the training file.
input_lines = []

# training data set, each line = (tweet, class)
train_file_name = "train_and_dev_data/tweet_input/train_input.tsv"

# test data set, each line = (index/ line no., tweet)
test_file_name = "train_and_dev_data/tweet_input/test_input.tsv"

# solutions file, each line = (index, correct class)
solutions_file_name = "train_and_dev_data/tweet_output/test_solutions.tsv"

print("Reading input from: ", train_file_name)
# NOTE(review): the file is opened with the platform default encoding --
# confirm that this matches how the .tsv files were written.
with open(train_file_name) as f:
    input_lines = f.readlines()

# convert from (tweet, class)[] to (tweet[], class[])
tweets_and_class_tuple = get_tuple_from_input_file(input_lines, "\t")
# Pre-process the raw tweet texts into lists of cleaned tokens.
clean_tweets = clean_tweets_tralala(tweets_and_class_tuple[0])

# One-hot encode the class labels and show the first ten as a sanity check.
converted_classes = convert_classes(tweets_and_class_tuple[1])
print("Example converted classes: ", converted_classes[0:10])


Reading input from:  train_and_dev_data/tweet_input/train_input.tsv
Processed tweets:  1000
Processed tweets:  2000
Processed tweets:  3000
Processed tweets:  4000
Example converted classes:  [[0, 1, 0], [1, 0, 0], [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 1, 0], [0, 1, 0], [1, 0, 0], [1, 0, 0], [0, 1, 0]]


За да не мора да правиме кластерирање на центроидите на секое пуштање, ги имаме серијализирано резултатите со пакетот pickle. 

In [27]:
# Load the serialized clustering results and close each file right after reading.
# NOTE: pickle can execute arbitrary code while loading; only load files that
# were produced by this project.
with open("train_and_dev_data/neural_model/idx", "rb") as f:
    idx = pickle.load(f)
with open("train_and_dev_data/neural_model/index2word", "rb") as f:
    index2word = pickle.load(f)

# Pair every entry of index2word with the entry of idx at the same position
# (presumably word -> cluster index; verify against the code that wrote the files).
word_centroid_map = dict(zip(index2word, idx))

Ги конвертираме твитовите од тренинг множеството во вектори и ги спремаме за тренирање на мрежата

In [28]:
# Debug aid: uncomment to inspect the first few cleaned training tweets.
#print(clean_tweets[0:3])
# Turn every cleaned training tweet into its bag-of-centroids feature vector.
train_tweet_features = convert_tweets_to_feature_vectors(word_centroid_map, clean_tweets)

Истото го правиме и на тест множеството

In [29]:

print("Reading test input from: ", test_file_name)
with open(test_file_name) as f:
    test_input_lines = f.readlines()

# convert from (index, tweet)[] to (tweet[], index[])
# NOTE: despite its name, this tuple holds (tweets, indexes), not classes.
test_tweets_and_class_tuple = get_tuple_from_test_input_file(test_input_lines, "\t")
clean_test_tweets = clean_tweets_tralala(test_tweets_and_class_tuple[0])

# Vectorize the test tweets with the same word -> centroid map as the training set.
test_tweet_features = convert_tweets_to_feature_vectors(word_centroid_map, clean_test_tweets)

# Placeholder value; replaced below by the lines read from the solutions file.
solutions = []
with open(solutions_file_name) as f:
    solutions = f.readlines()

# The solutions file has the same (index, value) layout as the test file, so the
# same splitter is reused; element 0 of its result holds the class labels here.
solution_and_index = get_tuple_from_test_input_file(solutions, "\t")
solutions = solution_and_index[0]

# One-hot encode the expected classes and print the first one as a sanity check.
converted_solutions = convert_classes(solutions)
print(converted_solutions[0])

Reading test input from:  train_and_dev_data/tweet_input/test_input.tsv
Processed tweets:  1000
Processed tweets:  2000
Processed tweets:  3000
Processed tweets:  4000
Processed tweets:  5000
Processed tweets:  6000
Processed tweets:  7000
Processed tweets:  8000
Processed tweets:  9000
Processed tweets:  10000
Processed tweets:  11000
Processed tweets:  12000
Processed tweets:  13000
Processed tweets:  14000
Processed tweets:  15000
Processed tweets:  16000
Processed tweets:  17000
Processed tweets:  18000
Processed tweets:  19000
Processed tweets:  20000
Processed tweets:  21000
Processed tweets:  22000
Processed tweets:  23000
Processed tweets:  24000
Processed tweets:  25000
Processed tweets:  26000
Processed tweets:  27000
Processed tweets:  28000
Processed tweets:  29000
Processed tweets:  30000
Processed tweets:  31000
Processed tweets:  32000
[1, 0, 0]


In [54]:

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def train_neural_network(x, train_data: Tuple[List, List], test_data: Tuple[List, List], hm_epochs: int = 20):
    """Train the network on ``train_data`` and print metrics for ``test_data``.

    The model from ``neural_network_model`` is trained with Adam on the mean
    softmax cross-entropy, in mini-batches of ``batch_size`` examples. Every
    training example is used in each epoch, including a final batch that is
    smaller than ``batch_size``. After training, accuracy plus precision,
    recall and F1 for the positive and the negative class are printed.

    The label placeholder is read from the module-level name ``y``; it must be
    defined before this function is called.

    Args:
        x: placeholder for the input feature vectors.
        train_data: ``(features, one_hot_labels)`` used for training.
        test_data: ``(features, one_hot_labels)`` used for evaluation.
        hm_epochs: number of passes over the training data.
    """
    print("training neural network")
    train_features, train_labels = train_data[0], train_data[1]
    num_input = len(train_features[0])
    prediction = neural_network_model(x, num_input)
    cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=prediction, labels=y))
    optimizer = tf.train.AdamOptimizer().minimize(cost)

    pos_class = np.array([1, 0, 0])
    neg_class = np.array([0, 0, 1])

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for epoch in range(hm_epochs):
            print("starting epoch", epoch)
            epoch_loss = 0
            # Stepping by batch_size also covers the last, shorter batch.
            for start in range(0, len(train_features), batch_size):
                epoch_x = train_features[start:start + batch_size]
                epoch_y = train_labels[start:start + batch_size]
                _, c = sess.run([optimizer, cost], feed_dict={x: epoch_x, y: epoch_y})
                epoch_loss += c

            print('Epoch', epoch, ' completed out of ', hm_epochs, '; loss:', epoch_loss)

        predicted_label = tf.argmax(prediction, 1)
        actual_label = tf.argmax(y, 1)
        pos_index = tf.argmax(pos_class, 0)
        neg_index = tf.argmax(neg_class, 0)

        correct = tf.equal(predicted_label, actual_label)
        incorrect = tf.not_equal(predicted_label, actual_label)
        predicted_pos = tf.equal(predicted_label, pos_index)
        predicted_neg = tf.equal(predicted_label, neg_index)
        actual_pos = tf.equal(actual_label, pos_index)
        actual_neg = tf.equal(actual_label, neg_index)

        # Naming kept from the printed report: TP/FP are correct/incorrect
        # "positive" predictions, TN/FN are correct/incorrect "negative" ones.
        tp = tf.logical_and(correct, predicted_pos)
        fp = tf.logical_and(incorrect, predicted_pos)
        tn = tf.logical_and(correct, predicted_neg)
        fn = tf.logical_and(incorrect, predicted_neg)

        def fraction(mask):
            # Share of test examples for which the boolean mask is true.
            return tf.reduce_mean(tf.cast(mask, 'float'))

        # Evaluate all metrics in one run: a single forward pass over the test set.
        metric_tensors = [fraction(correct), fraction(tp), fraction(tn), fraction(fp), fraction(fn),
                          fraction(actual_pos), fraction(actual_neg)]
        accuracy, ktp, ktn, kfp, kfn, kpos, kneg = sess.run(
            metric_tensors, feed_dict={x: test_data[0], y: test_data[1]})

        print('Accuracy:', accuracy)
        print('TP:', ktp)
        print('TN:', ktn)
        print('FP:', kfp)
        print('FN:', kfn)

        # Precision divides by everything predicted as the class; recall divides
        # by everything that truly belongs to the class (all three classes
        # considered, so neutral tweets are accounted for correctly).
        prec_p = _safe_ratio(ktp, ktp + kfp)
        rek_p = _safe_ratio(ktp, kpos)
        print('Precision P: ', prec_p)
        print('Recall P: ', rek_p)
        prec_n = _safe_ratio(ktn, ktn + kfn)
        rek_n = _safe_ratio(ktn, kneg)
        print('Precision N: ', prec_n)
        print('Recall N: ', rek_n)
        f1_p = _safe_ratio(2 * prec_p * rek_p, prec_p + rek_p)
        f1_n = _safe_ratio(2 * prec_n * rek_n, prec_n + rek_n)
        print('F1pn:', (f1_p + f1_n) / 2)

In [55]:
# Sanity check: features and labels must have the same length in each data set.
print(len(train_tweet_features))
print(len(converted_classes))

print(len(test_tweet_features))
print(len(converted_solutions))

# Input placeholder: any number of rows, one column per feature-vector entry.
_x = tf.placeholder('float', [None, len(train_tweet_features[0])])
# Label placeholder; train_neural_network reads it as the global name `y`.
y = tf.placeholder('float')

# Train on the training set and print the evaluation metrics for the test set.
train_neural_network(_x, (train_tweet_features, converted_classes), (test_tweet_features, converted_solutions))


4954
4954
32009
32009
training neural network
creating network model
starting epoch 0
Epoch 0  completed out of  20 ; loss: 233350.146973
starting epoch 1
Epoch 1  completed out of  20 ; loss: 129961.278809
starting epoch 2
Epoch 2  completed out of  20 ; loss: 82637.1984253
starting epoch 3
Epoch 3  completed out of  20 ; loss: 56088.772522
starting epoch 4
Epoch 4  completed out of  20 ; loss: 42728.5026245
starting epoch 5
Epoch 5  completed out of  20 ; loss: 33485.5773926
starting epoch 6
Epoch 6  completed out of  20 ; loss: 27504.4920349
starting epoch 7
Epoch 7  completed out of  20 ; loss: 24557.4794846
starting epoch 8
Epoch 8  completed out of  20 ; loss: 26855.1344891
starting epoch 9
Epoch 9  completed out of  20 ; loss: 9973.94610405
starting epoch 10
Epoch 10  completed out of  20 ; loss: 4743.44536918
starting epoch 11
Epoch 11  completed out of  20 ; loss: 3684.25131381
starting epoch 12
Epoch 12  completed out of  20 ; loss: 3300.7180016
starting epoch 13
Epoch 13  co