## Sentiment Analysis with Logistic Regression:
1. For a given text, extract features for Logistic Regression
2. Implement Logistic Regression
3. Apply Logistic Regression on an NLP task
4. Test and perform error analysis

Learned and implemented as part of the NLP course on Coursera.

In [1]:
# Import necessary packages
import re
import math
import string
import numpy as np
import pandas as pd

import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.corpus import twitter_samples
from nltk.tokenize import TweetTokenizer

In [2]:
# Fetch the NLTK resources this notebook reads: the 'twitter_samples' corpus
# (the labelled positive/negative tweets) and the 'stopwords' word lists
# (used by process_tweet to filter tokens).
nltk.download('twitter_samples')
nltk.download('stopwords')

[nltk_data] Downloading package twitter_samples to /root/nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

### Function to process a given tweet to be used to train with Logistic Regression

In [3]:
# Input: A string (tweet)
# Output: A list of words containing the processed tweet

def process_tweet(tweet):
  """Clean, tokenize and stem a single tweet.

  Args:
    tweet: the raw tweet text (a string).

  Returns:
    A list of stemmed tokens, in their original order, with stock tickers,
    a leading "RT", hyperlinks, '#' signs, English stop words and
    punctuation removed.
  """
  stemmer = PorterStemmer()
  # A set gives constant-time membership tests; the list returned by
  # stopwords.words() would be scanned linearly for every token.
  stopwords_english = set(stopwords.words('english'))

  # Remove stock market tickers like $GE
  tweet = re.sub(r'\$\w*', '', tweet)

  # Remove the retweet marker "RT" (only when it starts the tweet)
  tweet = re.sub(r'^RT[\s]+', '', tweet)

  # Remove hyperlinks
  tweet = re.sub(r'https?://[^\s\n\r]+', '', tweet)

  # Remove only the hash sign, keeping the hashtag word itself
  tweet = re.sub(r'#', '', tweet)

  # Tokenize with the tweet-aware tokenizer (case is not preserved and
  # handles are stripped, per the options below)
  tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)
  tweet_tokens = tokenizer.tokenize(tweet)

  tweets_clean = []
  for word in tweet_tokens:
    if word in stopwords_english:
      continue
    # NOTE: this is a substring test against string.punctuation, so besides
    # single punctuation characters it also drops multi-character tokens that
    # appear contiguously in that string (e.g. '()'). Kept as-is on purpose so
    # the extracted features do not change.
    if word in string.punctuation:
      continue
    tweets_clean.append(stemmer.stem(word))

  return tweets_clean


### Function to build the frequency map

In [4]:
# Input: List of tweets and their corresponding labels (0 or 1)
# Output: A frequency map of the form <(word, sentiment) : count>

def build_freqs(tweets, labels):
  """Count how often each processed word occurs under each sentiment label.

  Args:
    tweets: an iterable of raw tweet strings.
    labels: the label for each tweet, in the same order. May be a list or a
      numpy array of any shape (for example (m, 1)); it is flattened first.

  Returns:
    A dict mapping (word, label) -> count, where `word` is a token produced
    by process_tweet and `label` is the Python scalar obtained from
    `labels` via tolist().
  """
  # Flatten to a plain 1-D list. np.ravel always yields a 1-D array, so a
  # single label still becomes a one-element list (np.squeeze would collapse
  # it to a bare scalar, which zip cannot iterate over).
  labels_list = np.ravel(labels).tolist()

  freqs = {}
  for label, tweet in zip(labels_list, tweets):
    for word in process_tweet(tweet):
      pair = (word, label)
      freqs[pair] = freqs.get(pair, 0) + 1

  return freqs
  

### Function to generate Sigmoid

In [5]:
# Input: z = 𝜃0𝑥0+𝜃1𝑥1+𝜃2𝑥2+...𝜃𝑁𝑥𝑁, may be a scalar or an array
# Output: h = sigmoid of z

def sigmoid(z):
  """Compute the logistic sigmoid 1 / (1 + exp(-z)).

  Args:
    z: a scalar (Python or numpy number) or an array-like of numbers.

  Returns:
    - For a scalar (or 0-d array) input: a Python float.
    - For a 1-D input: a 1-D numpy array of the same length.
    - For an input whose rows each hold a single value, such as a column
      vector of shape (m, 1): a flattened 1-D array of length m. Callers in
      this notebook rely on this flattening.
    - For any other multi-dimensional input: an array of the same shape.
  """
  z_arr = np.asarray(z, dtype=float)

  # Numerically stable evaluation: exp() is only applied to non-positive
  # values, so it cannot overflow for large |z|.
  #   z >= 0:  1 / (1 + exp(-z))
  #   z <  0:  exp(z) / (1 + exp(z))
  exp_neg_abs = np.exp(-np.abs(z_arr))
  h = np.where(z_arr >= 0,
               1.0 / (1.0 + exp_neg_abs),
               exp_neg_abs / (1.0 + exp_neg_abs))

  # Scalar in -> plain Python float out
  if h.ndim == 0:
    return float(h)

  # One value per row (e.g. shape (m, 1)) -> flatten to shape (m,)
  if h.ndim > 1 and h.size == h.shape[0]:
    return h.reshape(-1)

  return h


### Function to implement gradient descent

In [6]:
# Input:
# x: matrix of features which is (m,n+1)
# y: corresponding labels of the input matrix x, dimensions (m,1)
# theta: weight vector of dimension (n+1,1)
# alpha: learning rate
# num_iters: number of iterations you want to train your model for

# Output:
# J: Final cost
# theta: Final weight vector
def gradientDescent(x, y, theta, alpha, num_iters):
  """Train logistic-regression weights with batch gradient descent.

  Args:
    x: feature matrix of shape (m, n+1).
    y: labels for the rows of x; reshaped internally to a column (m, 1).
    theta: initial weights; reshaped internally to a column (n+1, 1).
      The caller's array is not modified.
    alpha: learning rate.
    num_iters: number of gradient-descent iterations to run.

  Returns:
    (J, theta) where
      J is a Python float holding the cost evaluated at the start of the last
        iteration (i.e. before the final weight update). If num_iters is 0,
        it is the cost of the initial weights.
      theta is the final weight vector as an (n+1, 1) numpy array.
  """
  x = np.asarray(x, dtype=float)
  y = np.asarray(y, dtype=float).reshape(-1, 1)
  theta = np.array(theta, dtype=float).reshape(-1, 1)  # np.array -> private copy
  m = len(x)

  # Bounds used only when taking logs, so a prediction that saturates to
  # exactly 0 or 1 yields a large finite cost instead of a math domain error.
  lowest = np.finfo(float).tiny
  highest = 1.0 - np.finfo(float).epsneg

  def _predict(weights):
    # sigmoid is fed a 1-D vector and its result is forced back to a column,
    # so this works regardless of the output shape sigmoid chooses.
    return np.asarray(sigmoid((x @ weights).ravel()), dtype=float).reshape(-1, 1)

  def _cost(h):
    h_safe = np.clip(h, lowest, highest)
    total = y.T @ np.log(h_safe) + (1.0 - y).T @ np.log(1.0 - h_safe)
    return float((-1 / m) * total[0, 0])

  J = None
  for _ in range(num_iters):
    h = _predict(theta)

    # Cost for the current weights
    J = _cost(h)

    # Gradient step; the unclipped predictions are used for the gradient
    theta = theta - (alpha / m) * (x.T @ (h - y))

  if J is None:
    # No iteration ran: report the cost of the initial weights
    J = _cost(_predict(theta))

  return J, theta


### Extract features for Logistic Regression

In [7]:
# Input: 
# tweet: a string containing the raw text of one tweet
# freqs: a dictionary corresponding to the frequencies of each tuple (word, label)

# Output: x = A feature vector of dimension (1,3)

def extract_features(tweet, freqs, process_tweet=process_tweet):
  """Turn one tweet into the 3-element feature row used by the classifier.

  Args:
    tweet: the raw tweet text; it is passed to `process_tweet`.
    freqs: dict mapping (word, label) -> count.
    process_tweet: callable that converts the tweet into a list of words.

  Returns:
    A numpy array of shape (1, 3): [bias, positive count sum, negative count sum],
    where the sums add up freqs[(word, 1)] and freqs[(word, 0)] over every
    word of the processed tweet (missing pairs count as 0).
  """
  features = np.zeros((1, 3))

  # Column 0 is the constant bias term
  features[0, 0] = 1

  for token in process_tweet(tweet):
    # Column 1 accumulates counts seen with the positive label (1)
    features[0, 1] += freqs.get((token, 1), 0)
    # Column 2 accumulates counts seen with the negative label (0)
    features[0, 2] += freqs.get((token, 0), 0)

  assert(features.shape == (1, 3))
  return features


### Function to predict sentiment of a tweet

In [8]:
# Input: 
# tweet: a string
# freqs: a dictionary corresponding to the frequencies of each tuple (word, label)
# theta: (3,1) vector of weights

# Output: y_pred = the predicted probability that the tweet is positive, returned as an np.matrix

def predict_tweet(tweet, freqs, theta):
  """Score a single tweet with the trained logistic-regression weights.

  Args:
    tweet: the raw tweet text.
    freqs: dict mapping (word, label) -> count, used to build the features.
    theta: weight vector that the feature row is multiplied with.

  Returns:
    The sigmoid of (features @ theta), wrapped in an np.matrix.
  """
  features = extract_features(tweet, freqs)

  # Linear score followed by the logistic squashing
  probability = sigmoid(features @ theta)

  return np.matrix(probability)


### Function to test Logistic Regression

In [9]:
# Input: 
# test_x: a list of tweets
# test_y: (m, 1) vector with the corresponding labels for the list of tweets
# freqs: a dictionary with the frequency of each pair (or tuple)
# theta: weight vector of dimension (3, 1)

# Output: accuracy = (# of tweets classified correctly) / (total # of tweets)

def test_logistic_regression(test_x, test_y, freqs, theta, predict_tweet=predict_tweet):
  """Measure classification accuracy on a labelled set of tweets.

  Args:
    test_x: a list of tweets.
    test_y: the corresponding labels, indexable by position.
    freqs: dict mapping (word, label) -> count.
    theta: weight vector handed to `predict_tweet`.
    predict_tweet: callable returning the prediction for one tweet.

  Returns:
    (# of tweets classified correctly) / (total # of tweets), as produced by
    np.float64 applied to a one-element array.
  """
  # Threshold every prediction at 0.5 to obtain a hard 1.0 / 0.0 label
  predicted_labels = []
  for tweet in test_x:
    probability = predict_tweet(tweet, freqs, theta)[0]
    predicted_labels.append(1.0 if probability > 0.5 else 0.0)
  predicted_labels = np.array(predicted_labels)

  # Compare position by position against the true labels
  m = len(test_x)
  correct = 0
  for idx in range(m):
    if predicted_labels[idx] == test_y[idx]:
      correct += 1

  accuracy = np.float64(np.array([correct / m]))
  return accuracy


In [10]:
# Load the labelled tweets shipped with the NLTK twitter_samples corpus
all_positive_tweets = twitter_samples.strings('positive_tweets.json')
all_negative_tweets = twitter_samples.strings('negative_tweets.json')

# The first 4000 tweets of each class are used for training,
# the remainder is held out for testing (validation set)
train_pos, test_pos = all_positive_tweets[:4000], all_positive_tweets[4000:]
train_neg, test_neg = all_negative_tweets[:4000], all_negative_tweets[4000:]

# Positive tweets come first, then negative ones
train_x = train_pos + train_neg
test_x = test_pos + test_neg

# Matching label columns: 1 for each positive tweet, 0 for each negative tweet
train_y = np.concatenate((np.ones((len(train_pos), 1)), np.zeros((len(train_neg), 1))), axis=0)
test_y = np.concatenate((np.ones((len(test_pos), 1)), np.zeros((len(test_neg), 1))), axis=0)

# Report the label shapes
print(f"train_y.shape = {train_y.shape}")
print(f"test_y.shape = {test_y.shape}")

# Build the (word, label) -> count dictionary from the training data only
freqs = build_freqs(train_x, train_y)

# Sanity-check the dictionary
print(f"type(freqs) = {type(freqs)}")
print(f"len(freqs) = {len(freqs)}")


train_y.shape = (8000, 1)
test_y.shape = (2000, 1)
type(freqs) = <class 'dict'>
len(freqs) = 11428


In [11]:
# Build the design matrix: one 3-feature row per training tweet
X = np.zeros((len(train_x), 3))
for i, tweet in enumerate(train_x):
  X[i, :] = extract_features(tweet, freqs)

# Training labels, aligned row-for-row with X
Y = train_y

# Fit the weights, starting from all zeros
J, theta = gradientDescent(X, Y, theta=np.zeros((3, 1)), alpha=1e-9, num_iters=1500)
print(f"The cost after training is {J:.8f}.")
print(f"The resulting vector of weights is {[round(t, 8) for t in np.squeeze(theta)]}")


The cost after training is 0.22521260.
The resulting vector of weights is [6e-08, 0.0005382, -0.0005583]


In [12]:
# Error analysis: list every test tweet whose thresholded prediction
# disagrees with its label
print('Label Predicted Tweet')
for x, y in zip(test_x, test_y):
  y_hat = predict_tweet(x, freqs, theta)

  # Nothing to report when the prediction matches the label
  if not (np.abs(y - (y_hat > 0.5)) > 0):
    continue

  print('THE TWEET IS:', x)
  processed = process_tweet(x)
  print('THE PROCESSED TWEET IS:', processed)
  print('%d\t%0.8f\t%s' % (y, y_hat, ' '.join(processed).encode('ascii', 'ignore')))

# Prediction on a hand-written example
my_tweet = 'This is a ridiculously bright movie. The plot was terrible and I was sad until the ending!'
print(process_tweet(my_tweet))

y_hat = predict_tweet(my_tweet, freqs, theta)
print(y_hat)

print('Positive sentiment' if y_hat > 0.5 else 'Negative sentiment')


Label Predicted Tweet
THE TWEET IS: @MarkBreech Not sure it would be good thing 4 my bottom daring 2 say 2 Miss B but Im gonna be so stubborn on mouth soaping ! #NotHavingit :p
THE PROCESSED TWEET IS: ['sure', 'would', 'good', 'thing', '4', 'bottom', 'dare', '2', 'say', '2', 'miss', 'b', 'im', 'gonna', 'stubborn', 'mouth', 'soap', 'nothavingit', ':p']
1	0.48942982	b'sure would good thing 4 bottom dare 2 say 2 miss b im gonna stubborn mouth soap nothavingit :p'
THE TWEET IS: I'm playing Brain Dots : ) #BrainDots
http://t.co/UGQzOx0huu
THE PROCESSED TWEET IS: ["i'm", 'play', 'brain', 'dot', 'braindot']
1	0.48418982	b"i'm play brain dot braindot"
THE TWEET IS: I'm playing Brain Dots : ) #BrainDots http://t.co/aOKldo3GMj http://t.co/xWCM9qyRG5
THE PROCESSED TWEET IS: ["i'm", 'play', 'brain', 'dot', 'braindot']
1	0.48418982	b"i'm play brain dot braindot"
THE TWEET IS: I'm playing Brain Dots : ) #BrainDots http://t.co/R2JBO8iNww http://t.co/ow5BBwdEMY
THE PROCESSED TWEET IS: ["i'm", 'play', 