<a href="https://colab.research.google.com/github/Pragnya08/Sentiment-Classification-for-Social-Media/blob/main/sentiment_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Import necessary packages


In [None]:
# Import necessary packages
import re
import os
from os.path import join
import numpy as np
import pandas as pd
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression
import nltk
import torch
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import pickle


In [None]:
training_dataset = '/content/semeval-tweets/twitter-training-data.txt'
dev_dataset = '/content/semeval-tweets/twitter-dev-data.txt'
glove_path = '/content/glove.6B.100d.txt'
# Define test sets
testsets = ['/content/semeval-tweets/twitter-test1.txt', '/content/semeval-tweets/twitter-test2.txt', '/content/semeval-tweets/twitter-test3.txt']

In [None]:
# Skeleton: Evaluation code for the test sets
def read_test(testset):
    '''
    read in the testset and return a dictionary of gold labels

    Each non-blank line is split on tab characters; the first field is used
    as the tweet id and the second as its gold sentiment. Any further fields
    (e.g. the tweet text) are ignored. If a tweet id occurs more than once,
    the last occurrence wins.

    :param testset: str, the file name of the testset to compare
    :return: dict formatted as {<tweetid>:<sentiment>, ... }
    :raises IndexError: if a non-blank line has no tab-separated second field
    '''
    id_gts = {}
    with open(testset, 'r', encoding='utf8') as fh:
        for line in fh:
            # Drop the line terminator so a two-column line does not leave a
            # newline glued to the label.
            line = line.rstrip('\r\n')
            # Skip blank lines (e.g. a stray empty line at the end of the
            # file) instead of failing on the missing label field.
            if not line.strip():
                continue

            fields = line.split('\t')
            tweetid = fields[0]
            gt = fields[1]

            id_gts[tweetid] = gt

    return id_gts


def confusion(id_preds, testset, classifier):
    '''
    print the confusion matrix of {'positive', 'negative', 'neutral'} between preds and testset

    Rows are predicted labels and columns are gold labels. Each row is
    normalised by the number of tweets that received that prediction; a row
    with no predictions is printed as zeros. Tweets missing from id_preds are
    counted as predicted 'neutral'. A predicted or gold label outside the
    three known sentiments raises KeyError.

    :param id_preds: a dictionary of predictions formated as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :classifier: str, the name of the classifier (not used by this function)
    '''
    id_gts = read_test(testset)

    # Fixed label order so the printed matrix always has the same layout.
    gts = ['positive', 'negative', 'neutral']

    # conf[predicted][gold] -> count
    conf = {c1: {c2: 0 for c2 in gts} for c1 in gts}

    for tweetid, gt in id_gts.items():
        pred = id_preds.get(tweetid, 'neutral')
        conf[pred][gt] += 1

    print(''.ljust(12) + '  '.join(gts))

    for c1 in gts:
        print(c1.ljust(12), end='')
        # The row total is the same for every cell of the row; compute it once.
        row_total = sum(conf[c1].values())
        for c2 in gts:
            if row_total > 0:
                print('%.3f     ' % (conf[c1][c2] / float(row_total)), end='')
            else:
                print('0.000     ', end='')
        print('')

    print('')


def evaluate(id_preds, testset, classifier):
    '''
    print the macro-F1 score of {'positive', 'negative'} between preds and testset

    The printed score is the mean of the F1 of the 'positive' class and the
    F1 of the 'negative' class. 'neutral' predictions still matter: they
    count as false negatives / false positives for the other two classes.
    Tweets missing from id_preds are treated as predicted 'neutral'. A
    predicted or gold label outside the three known sentiments raises
    KeyError.

    :param id_preds: a dictionary of predictions formated as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :classifier: str, the name of the classifier
    '''
    id_gts = read_test(testset)

    acc_by_class = {}
    for label in ['positive', 'negative', 'neutral']:
        acc_by_class[label] = {'tp': 0, 'fp': 0, 'fn': 0}

    for tweetid, gt in id_gts.items():
        pred = id_preds.get(tweetid, 'neutral')

        if gt == pred:
            acc_by_class[gt]['tp'] += 1
        else:
            acc_by_class[gt]['fn'] += 1
            acc_by_class[pred]['fp'] += 1

    # Only the positive and negative classes enter the reported score. The
    # micro-averaged figures that used to be computed here were never
    # reported and divided by zero when no prediction was correct, so they
    # are gone.
    f1_sum = 0
    for cat in ['positive', 'negative']:
        acc = acc_by_class[cat]

        p = 0
        if (acc['tp'] + acc['fp']) > 0:
            p = float(acc['tp']) / (acc['tp'] + acc['fp'])

        r = 0
        if (acc['tp'] + acc['fn']) > 0:
            r = float(acc['tp']) / (acc['tp'] + acc['fn'])

        f1 = 0
        if (p + r) > 0:
            f1 = 2 * p * r / (p + r)

        f1_sum += f1

    semevalmacrof1 = f1_sum / 2

    print(testset + ' (' + classifier + '): %.3f' % semevalmacrof1)

### Preprocessing

In [None]:

nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

# Built once, after the downloads above, instead of once per tweet: reading
# the stop-word list and constructing a lemmatizer for every tweet dominated
# the preprocessing time without changing the result.
_STOP_WORDS = frozenset(stopwords.words('english'))
_LEMMATIZER = WordNetLemmatizer()


# Preprocessing function
def preprocess_tweet(tweet):
    """Normalise a raw tweet into a space-joined string of lemmatised tokens.

    Steps, in order: remove URLs, remove @mentions and '#' characters, remove
    everything that is neither a word character nor whitespace, remove digits,
    lower-case, tokenise, drop English stop words, lemmatise each token.

    :param tweet: str, the raw tweet text
    :return: str, the remaining lemmatised tokens joined by single spaces
    """
    tweet = re.sub(r"http\S+|www\S+|https\S+", '', tweet, flags=re.MULTILINE)
    tweet = re.sub(r'\@\w+|\#', '', tweet)
    # Punctuation is stripped before tokenising, so contractions reach the
    # stop-word filter without their apostrophe.
    tweet = re.sub(r'[^\w\s]', '', tweet)
    tweet = re.sub(r'\d+', '', tweet)  # Remove digits
    tweet = tweet.lower()
    word_tokens = word_tokenize(tweet)
    return ' '.join(
        _LEMMATIZER.lemmatize(w) for w in word_tokens if w not in _STOP_WORDS
    )




[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


#### Load training set, dev set and testing set


In [None]:
# Load the training set and the test sets (the dev set is not read here).
# All four dicts are keyed by the dataset's file name (last path component):
#   data     -> list of (tweet_id, sentiment, preprocessed_text) tuples
#   tweets   -> list of preprocessed tweet texts
#   tweetids -> list of tweet ids
#   tweetgts -> list of gold sentiment labels
data = {}
tweetids = {}
tweetgts = {}
tweets = {}

for dataset in [training_dataset] + testsets:
    dataset_key = dataset.split('/')[-1]
    # The handle has its own name: it used to be called `sentiment`, which the
    # per-line unpacking below rebound to a label string on every iteration.
    with open(dataset, 'r', encoding='utf8') as dataset_file:
        if dataset_key not in data:
            data[dataset_key] = []
            tweets[dataset_key] = []
            tweetids[dataset_key] = []
            tweetgts[dataset_key] = []
        for line in dataset_file:
            # maxsplit=2 keeps any tab inside the tweet text in the third
            # field instead of failing the three-way unpacking.
            tweet_id, sentiment, tweet_text = line.strip().split('\t', 2)
            preprocessed_text = preprocess_tweet(tweet_text)
            data[dataset_key].append((tweet_id, sentiment, preprocessed_text))
            tweets[dataset_key].append(preprocessed_text)
            tweetids[dataset_key].append(tweet_id)
            tweetgts[dataset_key].append(sentiment)



## Feature Extraction

In [None]:
# GloVe embeddings loader
def load_glove_embeddings(path):
    """Read a whitespace-separated embeddings file into a dictionary.

    The first token of each line is used as the word and the remaining tokens
    are converted to a float32 NumPy vector. If a word appears on several
    lines, the last line wins.

    :param path: str, path of the embeddings text file
    :return: dict mapping word (str) to its float32 vector
    """
    word_vectors = {}
    with open(path, 'r', encoding="utf-8") as glove_file:
        for row in glove_file:
            tokens = row.split()
            word_vectors[tokens[0]] = np.asarray(tokens[1:], "float32")
    return word_vectors

def tweet_to_embedding(tweet, embeddings_dict, embedding_dim=100):
    """Average the embeddings of the known words in a tweet.

    The tweet is split on whitespace; words absent from embeddings_dict are
    ignored. When no word is known, a zero vector is returned.

    :param tweet: str, whitespace-separated tokens
    :param embeddings_dict: dict mapping word to its embedding vector
    :param embedding_dim: int, length of the returned vector
    :return: NumPy vector of length embedding_dim
    """
    total = np.zeros(embedding_dim)
    known = 0
    for token in tweet.split():
        if token not in embeddings_dict:
            continue
        total += embeddings_dict[token]
        known += 1

    if known > 0:
        return total / known
    return total

# GloVe: one averaged 100-dimensional vector per tweet, for every loaded dataset.
glove_embeddings = load_glove_embeddings(glove_path)
glove_data = {
    dataset_key: np.array(
        [tweet_to_embedding(tweet, glove_embeddings, 100) for tweet in dataset_tweets]
    )
    for dataset_key, dataset_tweets in tweets.items()
}

# Bag-of-words: the vocabulary is learned from the training tweets only.
vectorizer = CountVectorizer()
X_bow_training = vectorizer.fit_transform(tweets['twitter-training-data.txt'])

# Training labels and GloVe features, aligned with the training tweets.
y_train = np.array(tweetgts['twitter-training-data.txt'])
X_train_glove = glove_data['twitter-training-data.txt']


# Function to preprocess text and extract features for a given dataset
def preprocess_and_extract_features(dataset_path, vectorizer, glove_embeddings):
    """Read a dataset file and build BoW and GloVe features for its tweets.

    Each line is expected to hold ``<tweet id>``, ``<sentiment>`` and
    ``<tweet text>`` separated by tabs; the sentiment column is ignored here.

    :param dataset_path: str, path of the tab-separated dataset file
    :param vectorizer: an already fitted vectorizer exposing ``transform``
    :param glove_embeddings: dict mapping word to its embedding vector
    :return: tuple ``(tweet_ids, X_bow, X_glove)`` where tweet_ids is a list of
        ids in file order, X_bow is the dense bag-of-words array and X_glove is
        the array of averaged embeddings, one row per tweet
    :raises ValueError: if a line has fewer than three tab-separated fields
    """
    # Local names differ from the module-level `tweets` dict on purpose.
    texts = []
    tweet_ids = []

    with open(dataset_path, 'r', encoding='utf8') as file:
        for line in file:
            # maxsplit=2 keeps any tab inside the tweet text in the third
            # field instead of failing the three-way unpacking.
            tweet_id, _sentiment, tweet_text = line.strip().split('\t', 2)
            texts.append(preprocess_tweet(tweet_text))
            tweet_ids.append(tweet_id)

    # Extract BoW features (converted to a dense array, as callers expect)
    X_bow = vectorizer.transform(texts).toarray()

    # Extract GloVe features
    X_glove = np.array([tweet_to_embedding(text, glove_embeddings) for text in texts])

    return tweet_ids, X_bow, X_glove



In [None]:
class LSTMModel(nn.Module):
    """LSTM layer followed by a linear classification layer.

    :param input_dim: int, size of each input feature vector
    :param hidden_dim: int, size of the LSTM hidden state
    :param output_dim: int, number of output classes
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return class scores for ``x``.

        :param x: either a 2-D tensor ``(batch, input_dim)`` holding one
            feature vector per sample, or a 3-D tensor
            ``(batch, seq_len, input_dim)``
        :return: ``(batch, output_dim)`` for 2-D input, or
            ``(batch, seq_len, output_dim)`` for 3-D input
        """
        if x.dim() == 2:
            # nn.LSTM would read a 2-D tensor as ONE unbatched sequence whose
            # time steps are the rows, so every sample's output would depend
            # on the samples before it in the batch. Give each sample its own
            # length-1 sequence instead, then drop that axis again.
            # NOTE: models saved before this change were trained under the old
            # behaviour and should be retrained.
            lstm_out, _ = self.lstm(x.unsqueeze(1))
            return self.fc(lstm_out.squeeze(1))

        lstm_out, _ = self.lstm(x)
        return self.fc(lstm_out)
# Run on the GPU when PyTorch reports one, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cpu


#### Build sentiment classifiers


In [None]:

def _load_pickled_model(model_filename):
    """Return the model pickled at model_filename, or None if the file is absent."""
    if not os.path.exists(model_filename):
        return None
    # NOTE(security): unpickling can execute arbitrary code. Only load model
    # files written by this notebook, never ones from an untrusted source.
    with open(model_filename, 'rb') as f:
        model = pickle.load(f)
    print(f"Model loaded .....'{model_filename}'")
    return model


def _save_pickled_model(model, model_filename):
    """Pickle model to model_filename and report where it was written."""
    with open(model_filename, 'wb') as f:
        pickle.dump(model, f)
    print(f'Model saved as {model_filename}')


# Train (or load from disk) every classifier/feature combination, then report
# its score on each test set. LSTM is only run with GloVe features.
for classifier in ['svm', 'logistic_regression', 'LSTM']:
    for features in ['bow', 'glove']:

        if features == 'bow':
            X_train = X_bow_training
        elif features == 'glove':
            X_train = X_train_glove

        model_filename = classifier + '_' + features + '_model.pkl'

        # Skeleton: Creation and training of the classifiers
        if classifier in ('svm', 'logistic_regression'):
            model = _load_pickled_model(model_filename)
            if model is None:
                print('Training ' + classifier)
                if classifier == 'svm':
                    model = SVC(kernel='linear', C=1.0)
                else:
                    model = LogisticRegression(max_iter=1000)
                model.fit(X_train, y_train)
                _save_pickled_model(model, model_filename)
        elif classifier == 'LSTM':
            if features == 'bow':
                continue

            # Fitted once; needed both for training and for mapping the
            # numeric predictions back to label strings at evaluation time.
            label_encoder = LabelEncoder()
            y_train_encoded = label_encoder.fit_transform(np.array(tweetgts['twitter-training-data.txt']))

            model = _load_pickled_model(model_filename)
            if model is None:
                print('Training ' + classifier)

                # Convert labels to tensor
                y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)

                # Convert GloVe features to tensor
                X_train_glove_tensor = torch.tensor(X_train, dtype=torch.float32)

                # Create dataset and dataloader for GloVe
                train_glove_dataset = TensorDataset(X_train_glove_tensor, y_train_tensor)
                train_glove_loader = DataLoader(train_glove_dataset, batch_size=64, shuffle=True, drop_last=True)

                # Define LSTM model; the input size follows the feature matrix
                input_dim = X_train_glove_tensor.shape[1]
                hidden_dim = 256
                output_dim = len(label_encoder.classes_)
                model = LSTMModel(input_dim, hidden_dim, output_dim).to(device)

                # Loss and optimizer
                loss_function = nn.CrossEntropyLoss()
                optimizer = torch.optim.Adam(model.parameters())

                # Train LSTM
                epochs = 5
                for epoch in range(epochs):
                    model.train()
                    for inputs, labels in train_glove_loader:
                        optimizer.zero_grad()
                        inputs, labels = inputs.to(device), labels.to(device)

                        outputs = model(inputs)

                        loss = loss_function(outputs, labels)
                        loss.backward()
                        optimizer.step()
                    # Reports the loss of the last batch of the epoch only.
                    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")
                _save_pickled_model(model, model_filename)

            # A model loaded from disk is not necessarily on `device`, while
            # the test tensors below always are.
            model = model.to(device)
        else:
            print('Unknown classifier name: ' + classifier)
            continue

        # Prediction performance of the classifiers
        for testset in testsets:
            tweet_ids, X_bow_test, X_glove_test = preprocess_and_extract_features(testset, vectorizer, glove_embeddings)

            X_test = X_bow_test if features == 'bow' else X_glove_test

            if classifier == 'LSTM':
                X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
                model.eval()
                with torch.no_grad():
                    outputs = model(X_test_tensor)
                    _, predictions_numeric = torch.max(outputs, 1)
                    # Convert numeric predictions back to original labels
                    predictions_labels = label_encoder.inverse_transform(predictions_numeric.cpu().numpy())
                # Map tweet IDs to their predicted labels for evaluation
                id_preds = dict(zip(tweet_ids, predictions_labels))
            else:
                predictions = model.predict(X_test)
                id_preds = dict(zip(tweet_ids, predictions))

            # Evaluate against the very file the features were read from.
            # The old join('semeval-tweets', testset) returned `testset`
            # unchanged for these absolute paths and would have pointed at a
            # different file for relative ones.
            evaluate(id_preds, testset, features + '-' + classifier)

Model loaded .....'svm_bow_model.pkl'
/content/semeval-tweets/twitter-test1.txt (bow-svm): 0.557
/content/semeval-tweets/twitter-test2.txt (bow-svm): 0.573
/content/semeval-tweets/twitter-test3.txt (bow-svm): 0.526
Model loaded .....'svm_glove_model.pkl'
/content/semeval-tweets/twitter-test1.txt (glove-svm): 0.390
/content/semeval-tweets/twitter-test2.txt (glove-svm): 0.427
/content/semeval-tweets/twitter-test3.txt (glove-svm): 0.411
Model loaded .....'logistic_regression_bow_model.pkl'
/content/semeval-tweets/twitter-test1.txt (bow-logistic_regression): 0.552
/content/semeval-tweets/twitter-test2.txt (bow-logistic_regression): 0.576
/content/semeval-tweets/twitter-test3.txt (bow-logistic_regression): 0.536
Model loaded .....'logistic_regression_glove_model.pkl'
/content/semeval-tweets/twitter-test1.txt (glove-logistic_regression): 0.435
/content/semeval-tweets/twitter-test2.txt (glove-logistic_regression): 0.444
/content/semeval-tweets/twitter-test3.txt (glove-logistic_regression): 0.