In [69]:
import re
import string 
from nltk.corpus import stopwords 
from nltk.stem import PorterStemmer 
from nltk.tokenize import TweetTokenizer 
import numpy as np 

In [70]:
# Preprocessing tweets
def process_tweet(tweet):

    # Removing old style retweet text 'RT'
    tweet2 = re.sub(r'^RT[/s]','',tweet)

    # Removing URL's 
    tweet2 = re.sub(r'https?:\/\/.*[\r\n]*','',tweet2)

    # Removing '#'
    tweet2 = re.sub(r'#' ,'',tweet2)

    # Instantiate tokenizer class
    tokenizer = TweetTokenizer(preserve_case=False ,strip_handles=True ,reduce_len=True)

    # Tokenize tweets
    tweet_tokens = tokenizer.tokenize(tweet2)

    # Import stopwords
    stopwords_english = stopwords.words("english")

    # List of words without stopwords 
    tweets_clean = []
    for word in tweet_tokens:
        if word not in stopwords_english and word not in string.punctuation:
            tweets_clean.append(word)

    # Instantiate stemming class 
    stemmer = PorterStemmer()

    # List of stems in tweets 
    tweet_stems = []
    for word in tweets_clean:
        stem_word = stemmer.stem(word)
        tweet_stems.append(stem_word)

    return tweet_stems

In [71]:
# Frequency generating function
def build_freqs(tweets, ys):
    yslist = np.squeeze(ys).tolist() # squeezes ys into a 1-D array and then converts it into list.

    freqs = {}
    for y,tweet in zip(yslist ,tweets):  # zip creates pairs of elements.
        for word in process_tweet(tweet):
            pair = (word, y)
            freqs[pair] = freqs.get(pair, 0) + 1  # if pair is not present in the freqs dictionary 
                                                  # then initialize it to 0 and then update frequency
                                                  # count by 1. 
    return freqs


In [72]:
def sigmoid(z):
    h = 1/(1+np.exp(-z))  
    return h # Hypothesis

# h(x^(i), 0) = 1/(1+e^-(0^T*x^(i))) , this is basically y_pred

In [73]:
def gradientDescent(x, y, theta, learning_rate, num_iters):

    m = len(x)       

    for i in range(0, num_iters):

        z = np.dot(x, theta)
        h = sigmoid(z)

        J = (-1/m)*(np.dot(y.T ,np.log(h)) + (np.dot((1-y.T) ,np.log(1-h))))  # Cost Function.

        theta -= learning_rate*(1/m)*np.dot(x.T ,h-y)

    J = round(float(J.item()), 4)
    return J,theta 

# x = feature matrix = (m,n+1)
# y = target variable = (m,1)
# theta = weight matrix = (n+1,1)


In [74]:
def extract_features(tweet ,freqs):

    # pre-process tweet
    word_l = process_tweet(tweet)

    # 3 elements in the form of a 1x3 matrix
    x = np.zeros((1,3))

    # bias term is set to 1
    x[0,0] = 1

    # looping through each word
    for word in word_l:

        # increment the word count for positive label 1
        x[0,1] += freqs.get((word,1),0)   # returns 0 as default value if positive label is not present.

        # increment the word count for negative label 0
        x[0,2] += freqs.get((word,0),0)
    
    assert(x.shape == (1,3)) # Raises error if condition is not true

    return x

In [75]:
import nltk

nltk.download('twitter_samples')

from nltk.corpus import twitter_samples
all_positive_tweets=twitter_samples.strings('positive_tweets.json')
all_negative_tweets=twitter_samples.strings('negative_tweets.json')

[nltk_data] Downloading package twitter_samples to
[nltk_data]     C:\Users\ayush\AppData\Roaming\nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!


In [76]:
# Splitting the data into testing data and training data

test_pos = all_positive_tweets[4000:]
train_pos = all_positive_tweets[:4000]
test_neg = all_negative_tweets[4000:]
train_neg = all_negative_tweets[:4000]

train_x = train_pos + train_neg 
test_x = test_pos + test_neg 

# Combining the data for positive and negative labels
train_y = np.append(np.ones((len(train_pos) ,1)) , np.zeros((len(train_neg) ,1)) ,axis=0)
test_y = np.append(np.ones((len(test_pos) ,1)) , np.zeros((len(test_neg) ,1)) ,axis=0)

In [77]:
# Build frequency dictionary from training data
freqs = build_freqs(train_x, train_y)

In [78]:
# Build training feature matrix
X_train = np.zeros((len(train_x), 3))
for i, tweet in enumerate(train_x):
    X_train[i, :] = extract_features(tweet, freqs)

# Build test feature matrix
X_test = np.zeros((len(test_x), 3))
for i, tweet in enumerate(test_x):
    X_test[i, :] = extract_features(tweet, freqs)


In [79]:
# Initialize weights
theta = np.zeros((3, 1))

# Run gradient descent
J, theta = gradientDescent(X_train, train_y, theta, learning_rate=1e-9, num_iters=1500)
print("Training loss: ", J)


Training loss:  0.2422


In [80]:
# Predict using sigmoid
def predict(X, theta):
    return sigmoid(np.dot(X, theta)) >= 0.5

y_pred = predict(X_test, theta)
accuracy = np.mean(y_pred == test_y) * 100
print(f"Test Accuracy: {accuracy:.2f}%")


Test Accuracy: 99.50%
