<a href="https://colab.research.google.com/github/Avisha-7/Twitter-Sentiment-Analysis/blob/main/Twitter_Sentiment_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Loading the dataset from Google drive

In [None]:
# Import PyDrive and associated libraries
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate the Colab user and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
# Hand the application-default Google credentials to PyDrive.
gauth.credentials = GoogleCredentials.get_application_default()
# `drive` is the client the next cell uses to fetch the dataset file.
drive = GoogleDrive(gauth)

In [None]:
# Google Drive ID of the dataset file. It has to be a string literal: left
# unquoted, Python evaluates it as an expression and raises NameError.
file_id = "laggVyWshwcyP6kEI-y_W3P8D26sz"
downloaded = drive.CreateFile({'id': file_id})

# Save file in Colab memory
downloaded.GetContentFile('tweet_data.csv')

# Preprocessing Data

In [None]:
import pandas as pd
import numpy as np

In [None]:
df = pd.read_csv("tweet_data.csv")

In [None]:
# checking data entry
#df.sample(10) 

In [None]:
# Verifying the total number of tweets
# print("Number of tweets: {}".format(len(df)))

# Dataset Visualization

In [None]:
import matplotlib.pyplot as plt

In [None]:
sentiment_count = df["sentiment"].value_counts()
plt.pie(sentiment_count, labels=sentiment_count.index,
        autopct='%1.1f%%', shadow=True, startangle=140)
plt.show()

In [None]:
# Summing the boolean mask counts the rows of each class directly. The previous
# `.count()[0]` relied on deprecated positional indexing of a label-indexed
# Series and counted non-null cells of the first column only.
print("Number of + tweets: {}".format((df["sentiment"] == "positive").sum()))
print("Number of - tweets: {}".format((df["sentiment"] == "negative").sum()))

In [None]:
from wordcloud import WordCloud

In [None]:
# to visualise the most recurrent words in the text corpus with positive sentiment

pos_tweets = df[df["sentiment"]=="positive"]
txt = " ".join(tweet.lower() for tweet in pos_tweets["tweet_text"])
wordcloud = WordCloud().generate(txt)
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()

In [None]:
# to visualise the most recurrent words in the text corpus with negative sentiment

neg_tweets = df[df["sentiment"]=="negative"]
txt = " ".join(tweet.lower() for tweet in neg_tweets["tweet_text"])
wordcloud = WordCloud().generate(txt)
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()

# Text Normalization

In [None]:
import re

In [None]:
def replace_retweet(tweet, default_replace=""):
  """Remove the retweet marker ``RT`` and the whitespace that follows it.

  Args:
    tweet: Raw tweet text.
    default_replace: Text substituted for each marker (empty string by default).

  Returns:
    The tweet with every standalone ``RT`` marker replaced.
  """
  # Raw string: "\s" in a normal literal is an invalid escape sequence.
  # The leading \b stops words that merely end in "RT" (e.g. "START now")
  # from being mangled.
  return re.sub(r'\bRT\s+', default_replace, tweet)

In [None]:
# Sample tweet containing a retweet marker, a user tag, an emoji, a URL and
# hashtags. It must be defined here: no earlier cell assigns `tweet`.
tweet = "RT @AIOutsider I love this! 👍 https://AIOutsider.com #NLP #Fun"
print("Processed tweet: {}".format(replace_retweet(tweet)))

In [None]:
def replace_user(tweet, default_replace="twitteruser"):
  """Replace every ``@username`` tag in a tweet with a placeholder.

  Args:
    tweet: Tweet text.
    default_replace: Text substituted for each user tag.

  Returns:
    The tweet with user tags replaced.
  """
  # Raw string avoids the invalid "\B" / "\w" escapes of a normal literal.
  # \B requires a non-word boundary before "@", so e-mail-like text such as
  # "a@b.com" is left untouched.
  return re.sub(r'\B@\w+', default_replace, tweet)

In [None]:
print("Processed tweet: {}".format(replace_user(tweet)))

In [None]:
pip install emoji --upgrade

In [None]:
import emoji

In [None]:
def demojize(tweet):
  """Return ``tweet`` with its emojis converted to text by ``emoji.demojize``."""
  return emoji.demojize(tweet)

In [None]:
print("Processed tweet: {}".format(demojize(tweet)))

In [None]:
def replace_url(tweet, default_replace=""):
  """Replace every ``http://`` or ``https://`` URL in a tweet.

  Args:
    tweet: Tweet text.
    default_replace: Text substituted for each URL (empty string by default).

  Returns:
    The tweet with URLs replaced.
  """
  # Raw string avoids the invalid "\/" and "\S" escapes of a normal literal;
  # "https?" matches the same two schemes as the former "(http|https)".
  return re.sub(r'https?://\S+', default_replace, tweet)

In [None]:
print("Processed tweet: {}".format(replace_url(tweet)))

In [None]:
def replace_hashtag(tweet, default_replace=""):
  """Replace each run of ``#`` characters in a tweet, keeping the tag text.

  Args:
    tweet: Tweet text.
    default_replace: Text substituted for each run of ``#`` (empty by default).

  Returns:
    The tweet with the ``#`` symbols replaced; the words after them remain.
  """
  return re.sub('#+', default_replace, tweet)

In [None]:
print("Processed tweet: {}".format(replace_hashtag(tweet)))

In [None]:
tweet = "LOOOOOOOOK at this ... I'd like it so much!"

In [None]:
def to_lowercase(tweet):
  """Return the tweet converted to lowercase."""
  return tweet.lower()

In [None]:
print("Processed tweet: {}".format(to_lowercase(tweet)))

In [None]:
def word_repetition(tweet):
  """Shorten every run of a repeated character to exactly two occurrences.

  For example ``"LOOOOK"`` becomes ``"LOOK"``; runs of two are left as they are.
  """
  repeated_char = re.compile(r'(.)\1+')
  return repeated_char.sub(r'\1\1', tweet)

In [None]:
print("Processed tweet: {}".format(word_repetition(tweet)))

In [None]:
def punct_repetition(tweet, default_replace=""):
  """Collapse runs of ``?``, ``.`` and ``!`` down to their final character.

  Args:
    tweet: Tweet text.
    default_replace: Text substituted for the leading part of each run
      (empty string by default, which simply drops it).

  Returns:
    The tweet with repeated punctuation reduced.
  """
  # The lookahead leaves the last punctuation mark of a run unmatched, so only
  # the characters before it are replaced.
  leading_punct = re.compile(r'[\?\.\!]+(?=[\?\.\!])')
  return leading_punct.sub(default_replace, tweet)

In [None]:
print("Processed tweet: {}".format(punct_repetition(tweet)))

In [None]:
pip install contractions

In [None]:
import contractions

In [None]:
print(contractions.contractions_dict)

In [None]:
def _fix_contractions(tweet):
  """Expand contractions with plain string replacement.

  Every key of ``contractions.contractions_dict`` found in the tweet is
  replaced by its value using ``str.replace``. This is substring replacement,
  so a key is also replaced when it occurs inside a longer word.
  """
  mapping = contractions.contractions_dict
  for short_form in mapping:
    tweet = tweet.replace(short_form, mapping[short_form])
  return tweet

In [None]:
print("Processed tweet: {}".format(_fix_contractions(tweet)))

* Create a `fix_contractions` function that replaces contractions with their extended forms by using the `fix` function of the `contractions` package

In [None]:
def fix_contractions(tweet):
  """Return the tweet with contractions expanded by ``contractions.fix``."""
  return contractions.fix(tweet)

In [None]:
print("Processed tweet: {}".format(fix_contractions(tweet)))

# Tokenization

In [None]:
pip install nltk

In [None]:
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')

In [None]:
def tokenize(tweet):
  """Split the tweet into tokens with NLTK's ``word_tokenize``."""
  return word_tokenize(tweet)

In [None]:
print(type(tokenize(tweet)))
print("Tweet tokens: {}".format(tokenize(tweet)))

In [None]:
#custom tokenization
import string

In [None]:
print(string.punctuation)

In [None]:
from nltk.corpus import stopwords
nltk.download('stopwords')

In [None]:
stop_words = set(stopwords.words('english'))
print(stop_words)

In [None]:
stop_words.discard('not')
print(stop_words)

In [None]:
def _english_stop_words():
  """Return the English stop-word set without "not", built once and reused."""
  cached = getattr(_english_stop_words, "_cache", None)
  if cached is None:
    words = set(stopwords.words('english'))
    words.discard('not')  # keep the negation: it carries sentiment
    cached = frozenset(words)
    _english_stop_words._cache = cached
  return cached


def custom_tokenize(tweet,
                    keep_punct = False,
                    keep_alnum = False,
                    keep_stop = False):
  """Tokenize a tweet with NLTK and optionally filter the tokens.

  Args:
    tweet: Text to tokenize.
    keep_punct: When False, drop tokens made up only of
      ``string.punctuation`` characters.
    keep_alnum: When False, keep only purely alphabetic tokens
      (``str.isalpha``).
    keep_stop: When False, drop English stop words, except "not".

  Returns:
    The list of remaining tokens, in their original order.
  """
  token_list = word_tokenize(tweet)

  if not keep_punct:
    # Check character by character: `token not in string.punctuation` is a
    # substring test, so multi-character tokens such as "..." slipped through.
    punctuation = set(string.punctuation)
    token_list = [token for token in token_list
                  if not all(char in punctuation for char in token)]

  if not keep_alnum:
    token_list = [token for token in token_list if token.isalpha()]

  if not keep_stop:
    # The stop-word list is cached so the corpus is not re-read on every call.
    stop_words = _english_stop_words()
    token_list = [token for token in token_list if token not in stop_words]

  return token_list

In [None]:
print("Tweet tokens: {}".format(custom_tokenize(tweet, 
                                                keep_punct=True, 
                                                keep_alnum=True, 
                                                keep_stop=True)))
print("Tweet tokens: {}".format(custom_tokenize(tweet, keep_stop=True)))
print("Tweet tokens: {}".format(custom_tokenize(tweet, keep_alnum=True)))

# Stemming

In [None]:
from nltk.stem import PorterStemmer
from nltk.stem import LancasterStemmer
from nltk.stem.snowball import SnowballStemmer

In [None]:
porter_stemmer = PorterStemmer()
lancaster_stemmer = LancasterStemmer()
snoball_stemmer = SnowballStemmer('english')

In [None]:
def stem_tokens(tokens, stemmer):
  """Stem every token with the given stemmer.

  Args:
    tokens: Iterable of tokens.
    stemmer: Object exposing a ``stem(token)`` method.

  Returns:
    A list with the stemmed form of each token, in the same order.
  """
  return [stemmer.stem(word) for word in tokens]

In [None]:
# Sample tokens for comparing the three stemmers; no earlier cell defines `tokens`.
tokens = ["manager", "management", "managing"]
print("Porter stems: {}".format(stem_tokens(tokens, porter_stemmer)))
print("Lancaster stems: {}".format(stem_tokens(tokens, lancaster_stemmer)))
print("Snowball stems: {}".format(stem_tokens(tokens, snoball_stemmer)))

In [None]:
# trying the stemmers on a different set of tokens
tokens = ["international", "companies", "had", "interns"]
print("Porter stems: {}".format(stem_tokens(tokens, porter_stemmer)))
print("Lancaster stems: {}".format(stem_tokens(tokens, lancaster_stemmer)))
print("Snowball stems: {}".format(stem_tokens(tokens, snoball_stemmer)))

# Lemmatization

In [None]:
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
nltk.download('wordnet')

In [None]:
lemmatizer = WordNetLemmatizer()

In [None]:
def lemmatize_tokens(tokens, word_type, lemmatizer):
  """Lemmatize every token using its part-of-speech tag.

  Args:
    tokens: Iterable of tokens.
    word_type: Mapping indexed by token that gives the tag passed to the
      lemmatizer; every token must be present in it.
    lemmatizer: Object exposing ``lemmatize(token, pos)``.

  Returns:
    A list with the lemma of each token, in the same order.
  """
  return [lemmatizer.lemmatize(word, word_type[word]) for word in tokens]

In [None]:
# Sample tokens and their WordNet part-of-speech tags for the lemmatizer demo;
# neither `tokens` nor `word_type` is defined by an earlier cell.
tokens = ["international", "companies", "had", "interns"]
word_type = {"international": wordnet.ADJ,
             "companies": wordnet.NOUN,
             "had": wordnet.VERB,
             "interns": wordnet.NOUN}
print("Tweet lemma: {}".format(
    lemmatize_tokens(tokens, word_type, lemmatizer)))

In [None]:
def process_tweet(tweet, verbose=False):
  """Run the full normalization pipeline on one tweet.

  The tweet goes through, in order: Twitter-specific cleanup (retweet marker,
  user tags, URLs, hashtags), word-level cleanup (lowercasing, contractions,
  repeated punctuation, repeated characters, emojis), then tokenization and
  Snowball stemming.

  Args:
    tweet: Raw tweet text.
    verbose: When True, print the tweet before and after the first two stages.

  Returns:
    The list of stemmed tokens.
  """
  if verbose:
    print("Initial tweet: {}".format(tweet))

  # Stage 1: Twitter-specific features. User tags are removed outright.
  twitter_steps = (
      replace_retweet,
      lambda text: replace_user(text, ""),
      replace_url,
      replace_hashtag,
  )
  for step in twitter_steps:
    tweet = step(tweet)
  if verbose:
    print("Post Twitter processing tweet: {}".format(tweet))

  # Stage 2: word-level features, applied in this exact order.
  word_steps = (
      to_lowercase,
      fix_contractions,
      punct_repetition,
      word_repetition,
      demojize,
  )
  for step in word_steps:
    tweet = step(tweet)
  if verbose:
    print("Post Word processing tweet: {}".format(tweet))

  # Stage 3: tokenize (alphabetic, non-stop-word tokens only) and stem.
  tokens = custom_tokenize(tweet, keep_alnum=False, keep_stop=False)
  return stem_tokens(tokens, SnowballStemmer("english"))

In [None]:
# Example tweet mixing every feature the pipeline handles; no earlier cell
# defines `complex_tweet`.
complex_tweet = ("RT @AIOutsider : he looooook, THis is a big and complex TWeet!!! 👍 ... "
                 "We'd be glad if you couldn't normalize it! "
                 "Check https://t.co/7777 and LET ME KNOW!!! #NLP #Fun")
print(process_tweet(complex_tweet, verbose=False))

In [None]:
import random

In [None]:
# Process five randomly chosen tweets and show the intermediate steps.
for i in range(5):
  # randrange excludes len(df). randint(0, len(df)) includes it, which is one
  # past the last valid position and makes iloc raise IndexError.
  tweet_id = random.randrange(len(df))
  tweet = df.iloc[tweet_id]["tweet_text"]
  print(process_tweet(tweet, verbose=True))
  print("\n")

# Text Representation

Processing Tweets


In [None]:
pip install -U scikit-learn

In [None]:
df["tokens"] = df["tweet_text"].apply(process_tweet)
df["tweet_sentiment"] = df["sentiment"].apply(lambda i: 1
                                              if i == "positive" else 0)
df.head(10)

In [None]:
# converting DataFrame to two lists: one for the tweet tokens (X) and one for the tweet sentiment (y)
X = df["tokens"].tolist()
y = df["tweet_sentiment"].tolist()

In [None]:
# Preview only the first few entries; printing the full lists floods the output.
print(X[:5])
print(y[:5])

In [None]:
def build_freqs(tweet_list, sentiment_list):
  """Count how often each word occurs under each sentiment label.

  Args:
    tweet_list: Iterable of tokenized tweets (each an iterable of words).
    sentiment_list: Iterable of labels, paired with ``tweet_list`` by position.

  Returns:
    A dict mapping ``(word, sentiment)`` to its number of occurrences.
  """
  counts = {}
  for words, label in zip(tweet_list, sentiment_list):
    for token in words:
      key = (token, label)
      counts[key] = counts.get(key, 0) + 1
  return counts

In [None]:
# Small hand-made corpus of tokenized tweets with matching sentiment labels
# (1 = positive, 0 = negative); no earlier cell defines `corpus` or `sentiment`.
corpus = [["love", "nlp"],
          ["miss", "you"],
          ["hate", "hate", "hate", "love"],
          ["happi", "love", "hate"],
          ["happi", "happi", "happi"]]
sentiment = [1, 0, 0, 0, 1]
freqs = build_freqs(corpus, sentiment)

In [None]:
print(freqs)

In [None]:
freqs_all = build_freqs(X, y) #entire dataset

In [None]:
print("Frequency of word 'love' in + tweets: {}".format(freqs_all[("love", 1)]))
print("Frequency of word 'love' in - tweets: {}".format(freqs_all[("love", 0)]))

In [None]:
def tweet_to_freq(tweet, freqs):
  """Turn a tokenized tweet into a two-element frequency feature vector.

  Args:
    tweet: Iterable of words.
    freqs: Dict mapping ``(word, label)`` to a count, with labels 1 and 0.

  Returns:
    A NumPy array of shape (2,): index 0 is the summed label-1 counts of the
    tweet's words, index 1 the summed label-0 counts. Unknown words add 0.
  """
  features = np.zeros((2,))
  for token in tweet:
    features[0] += freqs.get((token, 1), 0)
    features[1] += freqs.get((token, 0), 0)
  return features

In [None]:
fig, ax = plt.subplots(figsize = (8, 8))

word1 = "happi"
word2 = "sad"

def word_features(word, freqs):
  """Return the log-scaled label-1 and label-0 counts of a single word.

  Args:
    word: The word to look up.
    freqs: Dict mapping ``(word, label)`` to a count, with labels 1 and 0.

  Returns:
    A NumPy array of shape (2,) holding ``log(count + 1)`` for label 1 at
    index 0 and for label 0 at index 1; a missing pair leaves 0.
  """
  features = np.zeros((2,))
  for slot, label in enumerate((1, 0)):
    if (word, label) in freqs:
      features[slot] = np.log(freqs[(word, label)] + 1)
  return features

x_axis = [word_features(word, freqs_all)[0] for word in [word1, word2]]
y_axis = [word_features(word, freqs_all)[1] for word in [word1, word2]]

ax.scatter(x_axis, y_axis)  

plt.xlabel("Log Positive count")
plt.ylabel("Log Negative count")

ax.plot([0, 9], [0, 9], color = 'red')
plt.text(x_axis[0], y_axis[0], word1)
plt.text(x_axis[1], y_axis[1], word2)
plt.show()

Bag of Words

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

In [None]:
def fit_cv(tweet_corpus):
  """Fit a Bag-of-Words ``CountVectorizer`` on already-tokenized tweets.

  Args:
    tweet_corpus: Iterable of tokenized tweets (each a list of tokens).

  Returns:
    The fitted ``CountVectorizer``.
  """
  # The tweets are already token lists, so the tokenizer and preprocessor are
  # identity functions. token_pattern=None states that the default pattern is
  # unused and silences the warning scikit-learn emits otherwise.
  cv_vect = CountVectorizer(tokenizer=lambda x: x,
                            preprocessor=lambda x: x,
                            token_pattern=None)
  cv_vect.fit(tweet_corpus)
  return cv_vect

In [None]:
cv_vect = fit_cv(corpus)

In [None]:
# get_feature_names() was removed in scikit-learn 1.2; get_feature_names_out()
# returns an array, converted here to a plain list of strings.
ft = cv_vect.get_feature_names_out().tolist()

In [None]:
print("There are {} features in this corpus".format(len(ft)))
print(ft)

In [None]:
cv_mtx = cv_vect.transform(corpus)

In [None]:
print("Matrix shape is: {}".format(cv_mtx.shape))

In [None]:
cv_mtx.toarray()

Term Frequency – Inverse Document Frequency (TF-IDF)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
def fit_tfidf(tweet_corpus):
  """Fit a ``TfidfVectorizer`` on already-tokenized tweets.

  Args:
    tweet_corpus: Iterable of tokenized tweets (each a list of tokens).

  Returns:
    The fitted ``TfidfVectorizer``.
  """
  # The tweets are already token lists, so the tokenizer and preprocessor are
  # identity functions. token_pattern=None states that the default pattern is
  # unused and silences the warning scikit-learn emits otherwise.
  tf_vect = TfidfVectorizer(preprocessor=lambda x: x,
                            tokenizer=lambda x: x,
                            token_pattern=None)
  tf_vect.fit(tweet_corpus)
  return tf_vect

In [None]:
tf_vect = fit_tfidf(corpus)
tf_mtx = tf_vect.transform(corpus)

In [None]:
# get_feature_names() was removed in scikit-learn 1.2; get_feature_names_out()
# returns an array, converted here to a plain list of strings.
ft = tf_vect.get_feature_names_out().tolist()

In [None]:
print("There are {} features in this corpus".format(len(ft)))
print(ft)

In [None]:
print(tf_mtx.shape)

In [None]:
tf_mtx.toarray()


# Sentiment Model

In [None]:
import seaborn as sn

def plot_confusion(cm):
  """Draw a confusion matrix as an annotated heatmap on a new 5x5 figure.

  Args:
    cm: Confusion matrix values, passed straight to ``seaborn.heatmap``.

  Returns:
    The seaborn module object ``sn``, exactly as the function has always done.
    NOTE(review): the callers in this notebook do not use the return value;
    returning the heatmap axes would likely be more useful - confirm before
    changing.
  """
  plt.figure(figsize = (5,5))
  heatmap_ax = sn.heatmap(cm, annot=True, cmap="Blues", fmt='.0f')
  heatmap_ax.set_xlabel("Prediction")
  heatmap_ax.set_ylabel("True value")
  heatmap_ax.set_title("Confusion Matrix")
  return sn

Train/Test Split

In [None]:
# Preview only the first few entries; printing the full lists floods the output.
print(X[:5])
print(y[:5])

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    random_state=0,
                                                    train_size=0.80)

In [None]:
print("Size of X_train: {}".format(len(X_train)))
print("Size of y_train: {}".format(len(y_train)))
print("\n")
print("Size of X_test: {}".format(len(X_test)))
print("Size of y_test: {}".format(len(y_test)))
print("\n")
print("Train proportion: {:.0%}".format(len(X_train)/
                                        (len(X_train)+len(X_test))))

In [None]:
# Show one random training example. randrange excludes len(X_train), whereas
# randint(0, len(X_train)) could return an out-of-range index. The variable is
# no longer called `id`, which shadowed the builtin.
train_idx = random.randrange(len(X_train))
print("Train tweet: {}".format(X_train[train_idx]))
print("Sentiment: {}".format(y_train[train_idx]))

# Logistic Regression


Model

In [None]:
from sklearn.linear_model import LogisticRegression

In [None]:
def fit_lr(X_train, y_train):
  """Fit a ``LogisticRegression`` with default settings.

  Args:
    X_train: Training feature matrix.
    y_train: Training labels.

  Returns:
    The fitted model.
  """
  classifier = LogisticRegression()
  classifier.fit(X_train, y_train)
  return classifier

Frequency


In [None]:
freqs = build_freqs(X_train, y_train)
X_train_pn = [tweet_to_freq(tweet, freqs) for tweet in X_train]
X_test_pn = [tweet_to_freq(tweet, freqs) for tweet in X_test]

In [None]:
model_lr_pn = fit_lr(X_train_pn, y_train)
print(model_lr_pn.coef_, model_lr_pn.intercept_)

Count Vector


In [None]:
cv = fit_cv(X_train)
X_train_cv = cv.transform(X_train)
X_test_cv = cv.transform(X_test)

In [None]:
model_lr_cv = fit_lr(X_train_cv, y_train)

TF-IDF


In [None]:
tf = fit_tfidf(X_train)
X_train_tf = tf.transform(X_train)
X_test_tf = tf.transform(X_test)

In [None]:
model_lr_tf = fit_lr(X_train_tf, y_train)

# Performance Metrics

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

Positive/Negative frequencies

In [None]:
y_pred_lr_pn = model_lr_pn.predict(X_test_pn)

In [None]:
print("LR Model Accuracy: {:.2%}".format(accuracy_score(y_test, y_pred_lr_pn)))

In [None]:
plot_confusion(confusion_matrix(y_test, y_pred_lr_pn))

In [None]:
y_pred_lr_cv = model_lr_cv.predict(X_test_cv)

In [None]:
print("LR Model Accuracy: {:.2%}".format(accuracy_score(y_test, y_pred_lr_cv)))

In [None]:
plot_confusion(confusion_matrix(y_test, y_pred_lr_cv))

In [None]:
y_pred_lr_tf = model_lr_tf.predict(X_test_tf)

In [None]:
print("LR Model Accuracy: {:.2%}".format(accuracy_score(y_test, y_pred_lr_tf)))

In [None]:
plot_confusion(confusion_matrix(y_test, y_pred_lr_tf))