#  Sentiment Classification

#### Import necessary packages
You may import more packages here.

In [1]:
# Import necessary packages
import re
from os.path import join
import numpy as np
import torch
from transformers import BertTokenizer,BertModel
from sklearn import svm
import nltk
from nltk.corpus import stopwords
from sklearn import linear_model
import os 
import torchtext
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from sklearn.preprocessing import LabelEncoder
import torch.nn.functional as F
from sklearn.feature_extraction.text import CountVectorizer
from torchtext.data import get_tokenizer
from tqdm import tqdm
from sklearn.metrics import accuracy_score
import gc
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader,TensorDataset
from torchtext.data.functional import to_map_style_dataset
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.optim import Adam
from torchtext.data.functional import to_map_style_dataset
from sklearn.metrics import f1_score

# nltk.download('stopwords')

  torch.utils._pytree._register_pytree_node(


In [2]:
# Define test sets
testsets = ['twitter-test1.txt','twitter-test2.txt', 'twitter-test3.txt']


In [3]:
# Skeleton: Evaluation code for the test sets
def read_test(testset):
    '''
    readin the testset and return a dictionary
    :param testset: str, the file name of the testset to compare
    '''
    id_gts = {}
    with open(testset, 'r', encoding='utf8') as fh:
        for line in fh:
            fields = line.split('\t')
            tweetid = fields[0]
            gt = fields[1]

            id_gts[tweetid] = gt

    return id_gts


def confusion(id_preds,testset, classifier):
    '''
    print the confusion matrix of {'positive', 'netative'} between preds and testset
    :param id_preds: a dictionary of predictions formated as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :classifier: str, the name of the classifier
    '''
    id_gts = read_test(testset)

    gts = []
    for m, c1 in id_gts.items():
        if c1 not in gts:
            gts.append(c1)

    gts = ['positive', 'negative', 'neutral']

    conf = {}
    for c1 in gts:
        conf[c1] = {}
        for c2 in gts:
            conf[c1][c2] = 0

    for tweetid, gt in id_gts.items():
        if tweetid in id_preds:
            pred = id_preds[tweetid]
        else:
            pred = 'neutral'
        conf[pred][gt] += 1

    print(''.ljust(12) + '  '.join(gts))

    for c1 in gts:
        print(c1.ljust(12), end='')
        for c2 in gts:
            if sum(conf[c1].values()) > 0:
                print('%.3f     ' % (conf[c1][c2] / float(sum(conf[c1].values()))), end='')
            else:
                print('0.000     ', end='')
        print('')

    print('')


def evaluate(id_preds, testset, classifier):
    '''
    print the macro-F1 score of {'positive', 'netative'} between preds and testset
    :param id_preds: a dictionary of predictions formated as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :classifier: str, the name of the classifier
    '''
    id_gts = read_test(testset)

    acc_by_class = {}
    for gt in ['positive', 'negative', 'neutral']:
        acc_by_class[gt] = {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0}

    catf1s = {}

    ok = 0
    for tweetid, gt in id_gts.items():
        if tweetid in id_preds:
            pred = id_preds[tweetid]
        else:
            pred = 'neutral'

        if gt == pred:
            ok += 1
            acc_by_class[gt]['tp'] += 1
        else:
            acc_by_class[gt]['fn'] += 1
            acc_by_class[pred]['fp'] += 1

    catcount = 0
    itemcount = 0
    macro = {'p': 0, 'r': 0, 'f1': 0}
    micro = {'p': 0, 'r': 0, 'f1': 0}
    semevalmacro = {'p': 0, 'r': 0, 'f1': 0}

    microtp = 0
    microfp = 0
    microtn = 0
    microfn = 0
    for cat, acc in acc_by_class.items():
        catcount += 1

        microtp += acc['tp']
        microfp += acc['fp']
        microtn += acc['tn']
        microfn += acc['fn']

        p = 0
        if (acc['tp'] + acc['fp']) > 0:
            p = float(acc['tp']) / (acc['tp'] + acc['fp'])

        r = 0
        if (acc['tp'] + acc['fn']) > 0:
            r = float(acc['tp']) / (acc['tp'] + acc['fn'])

        f1 = 0
        if (p + r) > 0:
            f1 = 2 * p * r / (p + r)

        catf1s[cat] = f1

        n = acc['tp'] + acc['fn']

        macro['p'] += p
        macro['r'] += r
        macro['f1'] += f1

        if cat in ['positive', 'negative']:
            semevalmacro['p'] += p
            semevalmacro['r'] += r
            semevalmacro['f1'] += f1

        itemcount += n

    micro['p'] = float(microtp) / float(microtp + microfp)
    micro['r'] = float(microtp) / float(microtp + microfn)
    micro['f1'] = 2 * float(micro['p']) * micro['r'] / float(micro['p'] + micro['r'])

    semevalmacrof1 = semevalmacro['f1'] / 2

    print(testset + ' (' + classifier + '): %.3f' % semevalmacrof1)

#### Load training set, dev set and testing set
Here, you need to load the training set, the development set and the test set. For better classification results, you may need to preprocess tweets before sending them to the classifiers.

In [4]:
def preprocess_tweets(tweets):

    emoji = re.compile('[\U00010000-\U0010ffff]', flags=re.UNICODE)
    non_alpha_numeric = re.compile('[^a-zA-Z0-9 -]')
    url = re.compile('https?:[^\s]+')
    punctuation = re.compile('[^\w\s]')
    username = re.compile('@[\w]+')
    tags = re.compile('#[\w]+')
    numbers = re.compile(r'\b\d+\b')
    single_char = re.compile(r'\b\w\b')

    for twt in tweets:
        twt[0] = username.sub(r'', twt[0])
        twt[0] = tags.sub(r'', twt[0])
        twt[0] = url.sub(r'', twt[0])
        twt[0] = emoji.sub(r'', twt[0])
        twt[0] = punctuation.sub(r'', twt[0])
        twt[0] = non_alpha_numeric.sub(r'', twt[0])
        twt[0] = numbers.sub(r'', twt[0])
        twt[0] = single_char.sub(r'', twt[0])

        remove_stop_words = ' '.join(word for word in twt[0].split() if word not in stopwords.words('english'))
        twt[0] = remove_stop_words
    
    return tweets

In [5]:
def encode_labels(labels):

    # label_encoder = LabelEncoder()
    # encoded_labels = label_encoder.fit_transform(labels)
    label_map = {"positive": 0, "negative": 1, "neutral": 2}
    encoded_labels = [label_map[label] for label in labels]

    return encoded_labels

In [6]:
# Load training set, dev set and testing set
data = {}
tweetids = {}
tweetgts = {}
tweets = {}

X_testsets = []
y_testsets = []
test_tweet_id = []

for dataset in ['twitter-dev-data.txt'] + testsets:
    data[dataset] = []
    tweets[dataset] = []
    tweetids[dataset] = []
    tweetgts[dataset] = []

    count = 0
    
    # write code to read in the datasets here
    with open(dataset, 'r', encoding="utf-8") as file:
        for line in file:
            line = str.lower(line)
            tweetids[dataset].append(re.findall('^[^\d]*(\d+)',line))
            tweetgts[dataset].append(re.search(r'(neutral)|(positive)|(negative)',line).group())
            tweets[dataset].append(re.findall(r'(?:[neutral|positive|negative])\b\s*(.+)',line))
            data[dataset].append(line)

    # Preprocessing
    # Format data for tokenizing - removal of redundant data
    tweets[dataset]=(preprocess_tweets(tweets[dataset]))

    # Convert labels to numerical values for easier processing
    tweetgts[dataset] = encode_labels(tweetgts[dataset])

    if dataset in testsets:
        X_testsets.append(tweets[dataset])
        y_testsets.append(tweetgts[dataset])
        test_tweet_id.append(tweetids[dataset])

X_train = tweets['twitter-dev-data.txt']
y_train = tweetgts['twitter-dev-data.txt']


In [7]:
print(X_train[:10])
print(y_train[:10])

[['hey im gonna dublin february know im saying'], ['literally excited im going sam smith concert october'], ['option buy 2gb ram model moto 3rd gen instead 1gb model'], ['little ms philippines'], ['know tpp expanded wars drone strikes mass surveillance'], ['using moto 2nd gen month absolute delight stock android good design best'], ['juan heard green days time life 1st time since leaving florida burst tears miss everyone kellogg'], ['fidel castro died dont worry george soros willing fill shoes wicked man world'], ['cried every episode dream high starting episode tt tomorrow shall watch last final episode'], ['show updates ipad apple tv products today']]
[2, 0, 2, 2, 1, 0, 1, 1, 0, 2]


#### Feature Functions
The following section outlines some functions to tokenize and extract features that are used as part of the next classification section.

In [8]:
def tokenize_tweets_with_bert(tweets):

    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = BertModel.from_pretrained('bert-base-uncased')

    encoding = [tokenizer.encode(twt, add_special_tokens=True) for twt in tweets]

    encoding = torch.tensor(encoding)

    with torch.no_grad():
        outputs = model(encoding)
    
    embeddings = outputs.last_hidden_state
    
    return embeddings

In [9]:
def build_vocab(tweets):
    tokenizer = get_tokenizer("basic_english")
    for text in tweets[0]:
        yield tokenizer(text)
        
def bag_of_words(tweets):
    tweets = [' '.join(sublist) for sublist in tweets if sublist]
    vocab = build_vocab_from_iterator(build_vocab([tweets]), specials=["<UNK>"])
    vocab.set_default_index(vocab["<UNK>"])
    tokenizer = get_tokenizer("basic_english")
    vectorizer = CountVectorizer(max_features=5000,tokenizer=tokenizer)
    # print(vocab.get_stoi())
    return vectorizer.fit_transform(tweets).todense()


In [10]:
# a) Build an embedding matrix that will be loaded into an Embedding layer later. It must be a matrix
#of shape (max_words, embedding_dim), where each entry i contains the embedding_dim-
#dimensional vector for the word of index i in our reference word index (built during
#tokenization).

def gloVe_word_embeddings(tokenized_tweets):
    #Set embedding_dim and max_words

    max_words = 5000
    embedding_dim = 100

    glove_dir = 'glove\glove.6B.100d.txt'
    embed_id = {}
    with open(glove_dir, encoding="utf-8") as f:
        for line in f:
            values = line.split()
            word = values[0]
            coefs = np.asarray(values[1:], dtype='float32')
            embed_id[word] = coefs
            
    m_index = 0

    embedding_matrix = np.zeros((max_words, embedding_dim))
    for i, twts in enumerate(tokenized_tweets):
        for word in twts:
            if  i < max_words:
                embedding_vector = embed_id.get(word)
                if embedding_vector is not None:
                    embedding_matrix[i] = embedding_vector

    # print(embedding_matrix.shape)
    return embedding_matrix,max_words,embedding_dim,embed_id

#### CNN Classifer for part 3

The following section outlines some functions to tokenize and extract features that are used as part of the next classification section.

In [11]:
def CalcValLossAndAccuracy(model, loss_fn, val_loader):
    # -- Disable the gradient --
    with torch.no_grad():
        Y_shuffled, Y_preds, losses = [],[],[]
        for X, Y in val_loader:
            preds = model(X)
            loss  = loss_fn(preds, Y)
            losses.append(loss.item())

            Y_shuffled.append(Y)
            Y_preds.append(preds.argmax(dim=-1))

        Y_shuffled = torch.cat(Y_shuffled)
        Y_preds    = torch.cat(Y_preds)

        print("Valid Loss : {:.3f}".format(torch.tensor(losses).mean()))
        print("Valid Acc  : {:.3f}".format(accuracy_score(Y_shuffled.detach().numpy(), Y_preds.detach().numpy())))

    return Y_preds

In [12]:
def TrainingLoop(model, loss_fn, optimizer, train_loader, val_loader, epochs=1):
    for i in range(1, epochs+1):
        losses = []
        # Cycle over the training examples (using minibatches)
        # X are the examples, Y are the associated labels
        for X, Y in tqdm(train_loader):
            # Make the prediction
            Y_preds = model(X)
            # Compute the loss
            loss = loss_fn(Y_preds, Y)
            losses.append(loss.item())

            # Reset the gradient
            optimizer.zero_grad()
            
            # Compute the gradient
            loss.backward()
            
            # Update the weights
            optimizer.step()

        print("Train Loss : {:.3f}".format(torch.tensor(losses).mean()))
        CalcValLossAndAccuracy(model, loss_fn, val_loader)
        break


In [13]:
# Evaluate on the Test Set - Compute the statistics for the Confusion Matrix
def MakePredictions(model, loader):
    Y_shuffled, Y_preds = [], []
    for X, Y in loader:
        preds = model(X)
        Y_preds.append(preds)
        Y_shuffled.append(Y)
    gc.collect()
    Y_preds, Y_shuffled = torch.cat(Y_preds), torch.cat(Y_shuffled)

    return Y_shuffled.detach().numpy(), F.softmax(Y_preds, dim=-1).argmax(dim=-1).detach().numpy()


In [14]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

embedding_matrix,max_words,embedding_dim,embedding = gloVe_word_embeddings(X_train)

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True

class TextClassifier(nn.Module):
    def __init__(self,max_words,embedding_dim):
        super(TextClassifier, self).__init__()

        self.embedding = nn.Embedding(max_words, embedding_dim)
        self.embedding.weight.requires_grad = False
        self.lstm = nn.LSTM(embedding_dim,
                            64,
                            num_layers = 2,
                            bidirectional = True,
                            dropout = 0.2,
                            batch_first = True
                           )
        self.fc = nn.Linear(64 * 2,3)
        self.dropout = nn.Dropout(0.2)

    def forward(self, X_batch):
        out = self.embedding(X_batch) 
        out =  self.dropout(out)
        out, (hidden_state,cell_state) = self.lstm(out)
        out = torch.cat((hidden_state[-2,:,:], hidden_state[-1,:,:]), dim = 1)
        out =  self.dropout(out)
        out = self.fc(out)
        return out
    
def create_LSTM_model(X_train,y_train,X_testsets,y_testsets):
    # Set the device to perform the computation
    id_preds = []
    y_preds = {}

    #set seed for reporducibility 
    setup_seed(42)

    text_classifier = TextClassifier(max_words,embedding_dim).to(DEVICE)
    # X_train = [twt for twts in X_train for twt in twts]
    X_train_bow = bag_of_words(X_train)
    X_train_tensor = torch.tensor(X_train_bow[:(len(y_train))])
    y_train_tensor = torch.tensor(y_train)
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(dataset=train_dataset,batch_size=30,shuffle=True, num_workers=10)
    
    epochs = 2
    learning_rate = 1e-4

    #Loss Func
    loss_fn = nn.CrossEntropyLoss()
    # Optimizer
    optimizer = Adam(text_classifier.parameters(), lr=learning_rate)

    ### Training Loop ###
    for i in range(len(X_testsets)):
        
        X_test = X_testsets[i]
        y_test = y_testsets[i]
    
        # X_test = [twt for twts in X_test for twt in twts]

        X_test_bow = bag_of_words(X_test)
        X_test_tensor = torch.tensor(X_test_bow[:(len(y_test))])
        y_test_tensor = torch.tensor(y_test)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        test_loader = DataLoader(dataset=test_dataset,batch_size=50, num_workers=10)
    
        print('training')
        TrainingLoop(text_classifier.to(DEVICE), loss_fn, optimizer, train_loader, test_loader, epochs)

        torch.save(text_classifier.state_dict(), 'text_classifier.pth')

        # Y_actual, Y_preds = MakePredictions(text_classifier, test_loader)
        # print(Y_preds)
        
        # for key, value in zip(test_tweet_id[i], Y_preds):
        #     print(value)
        #     if value == 0:
        #         y_preds[key[0]] = 'positive'
        #     elif value == 1:
        #         y_preds[key[0]] = 'negative'
        #     elif value == 2:
        #         y_preds[key[0]] = 'neutral'
        # id_preds.append(y_preds)
        # break
    return id_preds

In [15]:
# create_LSTM_model(X_train,y_train,X_testsets,y_testsets)

In [16]:
def get_id_preds(model,X_testsets,features):
    id_preds = []
    y_preds = {}
    for i in range(len(X_testsets)):
        X_test = X_testsets[i]
        if features == 'bow':
            X_test_feature = bag_of_words(X_test)
        elif features == 'bert':
            X_test_feature = tokenize_tweets_with_bert(X_test)
            X_test_feature =  X_test_feature.reshape(X_test_feature.shape[0], -1)
        Y_preds = model.predict(np.asarray(X_test_feature))
        for key, value in zip(test_tweet_id[i], Y_preds):
            if value == 0:
                y_preds[key[0]] = 'positive'
            elif value == 1:
                y_preds[key[0]] = 'negative'
            elif value == 2:
                y_preds[key[0]] = 'neutral'
        id_preds.append(y_preds)
    return id_preds

#### Build sentiment classifiers
You need to create your own classifiers (at least 3 classifiers). For each classifier, you can choose between the bag-of-word features and the word-embedding-based features. Each classifier has to be evaluated over 3 test sets. Make sure your classifier produce consistent performance across the test sets. Marking will be based on the performance over all 5 test sets (2 of them are not provided to you).

In [17]:
# Buid traditional sentiment classifiers. An example classifier name 'svm' is given
# in the code below. You should replace the other two classifier names
# with your own choices. For features used for classifier training, 
# the 'bow' feature is given in the code. But you could also explore the 
# use of other features.['svm', 'maxent', 'LSTM']
for classifier in ['maxent','svm']:
    for features in ['bow','bert']:
        # Skeleton: Creation and training of the classifiers
        if classifier == 'svm':
            print('Training ' + classifier)
            #Select feature and format training data to match
            if features == 'bow':
                X_train_svm = np.asarray(bag_of_words(X_train))
            elif features == 'bert':
                X_train_svm = tokenize_tweets_with_bert(X_train)
                X_train_svm = X_train_svm.reshape(X_train_svm.shape[0], -1)
            svm_classifier = svm.SVC(kernel='linear')
            svm_classifier.fit(X_train_svm, y_train) 
            id_preds = get_id_preds(svm_classifier,X_testsets,features)
        elif classifier == 'maxent':
            print('Training ' + classifier)
            if features == 'bow':
                X_train_maxent = np.asarray(bag_of_words(X_train))
            elif features == 'bert':
                X_train_maxent = tokenize_tweets_with_bert(X_train)
                X_train_maxent = X_train_maxent.reshape(X_train_maxent.shape[0], -1)
            maxent = linear_model.LogisticRegression()
            maxent.fit(np.asarray(X_train_maxent),y_train)
            id_preds = get_id_preds(maxent,X_testsets,features)
        elif classifier == 'LSTM':
            print('Training ' + classifier)
            # write the LSTM classifier here
            # a) Build an embedding matrix 
            # b) Build and train a neural model built on LSTM.
            if features == 'bow':
                id_preds = create_LSTM_model(X_train,y_train,X_testsets,y_testsets)
            if features == 'bert':
                pass
        else:
            print('Unknown classifier name' + classifier)
            continue

        all_preds = []
         #Predition performance of the classifiers
        for i,testset in enumerate(testsets):
            id_preds_test = id_preds[i]
            all_preds.append(id_preds_test)
            testset_name = testset
            testset_path = join('semeval-tweets', testset_name)
            evaluate(id_preds, testset_path, features + '-' + classifier)
            confusion(id_preds,testset, classifier)

            # macro_f1 = f1_score(y_testsets_f1, all_preds_f1, average='macro')
            # print("Macro-averaged F1 score:", macro_f1)

Training maxent




semeval-tweets\twitter-test1.txt (bow-maxent): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.416     0.158     0.426     

semeval-tweets\twitter-test2.txt (bow-maxent): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.530     0.109     0.361     

semeval-tweets\twitter-test3.txt (bow-maxent): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.434     0.153     0.413     

Training maxent
semeval-tweets\twitter-test1.txt (bert-maxent): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.416     0.158     0.426     

semeval-tweets\twitter-test2.txt (bert-maxent): 0.000
            positive  negative  neutr



semeval-tweets\twitter-test1.txt (bow-svm): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.416     0.158     0.426     

semeval-tweets\twitter-test2.txt (bow-svm): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.530     0.109     0.361     

semeval-tweets\twitter-test3.txt (bow-svm): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.434     0.153     0.413     

Training svm
semeval-tweets\twitter-test1.txt (bert-svm): 0.000
            positive  negative  neutral
positive    0.000     0.000     0.000     
negative    0.000     0.000     0.000     
neutral     0.416     0.158     0.426     

semeval-tweets\twitter-test2.txt (bert-svm): 0.000
            positive  negative  neutral
positive    0.0