In [None]:
import nltk
# Fetches every NLTK data package. The cells below only use stopwords.words,
# WordNetLemmatizer and nltk.pos_tag, so a narrower download may be enough — TODO confirm.
nltk.download('all')

In [None]:
import os
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Base folder joined with every file this notebook reads or writes: the raw and
# cleaned CSVs, vocab.txt, the GloVe vectors, and the saved models/history files.
# NOTE(review): looks like a mounted Google Drive path — confirm it exists before running.
drive_dir = "/content/drive/MyDrive/Internship Work/Sentiment Analysis (IMDB)"

# Reading data

Dataset can be obtained from https://www.kaggle.com/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews

In [None]:
# Load the raw dataset; later cells read its 'review' and 'sentiment' columns.
imdb_data = pd.read_csv(os.path.join(drive_dir, 'IMDB Dataset.csv'))
# Preview the first 10 rows.
imdb_data.head(10)

In [None]:
# (rows, columns) of the raw frame.
imdb_data.shape

In [None]:
# Summary statistics of the raw frame's columns.
imdb_data.describe()

In [None]:
# Number of reviews per sentiment label.
imdb_data['sentiment'].value_counts()

# Preparing data

In [None]:
import re
import string
from bs4 import BeautifulSoup

import nltk
from nltk.corpus import wordnet
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.tokenize.toktok import ToktokTokenizer
from nltk.stem.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer

In [None]:
# Shared NLP helpers used by the cleaning functions defined further down.
stopword_list = stopwords.words('english')
lemmatizer = WordNetLemmatizer()
ps = PorterStemmer()
tokenizer = ToktokTokenizer()

In [None]:
# The same English stopword source as `stopword_list`, held as a set and printed for inspection.
stop = set(stopwords.words('english'))
print(stop)

In [None]:
# Removing the html strips
def strip_html(text):
  """Return the text content of an HTML fragment, lower-cased."""
  parsed = BeautifulSoup(text, "html.parser")
  plain = parsed.get_text()
  return plain.lower()
    

# removing punctuation and non-alphabetic tokens
def clean_text(text):
  """Strip HTML, lower-case and tokenize `text`, keeping alphabetic tokens only.

  Returns a lazy generator of tokens with punctuation characters removed; it
  can be consumed once.
  """
  plain = strip_html(text)
  # dots become plain spaces so sentence boundaries do not glue words together
  plain = " ".join(plain.split('.'))
  punctuation_table = str.maketrans('', '', string.punctuation)
  # drop punctuation characters from every token produced by the tokenizer
  depunctuated = (tok.translate(punctuation_table) for tok in tokenizer.tokenize(plain))
  # anything that is not purely alphabetic after stripping is discarded
  return (tok for tok in depunctuated if tok.isalpha())


def get_wordnet_pos(text):
  """Return the WordNet POS constant for a single word, falling back to NOUN."""
  _, penn_tag = nltk.pos_tag([text])[0]
  initial = penn_tag[0].upper()
  # only the first letter of the tag decides the WordNet category
  if initial == "J":
    return wordnet.ADJ
  if initial == "V":
    return wordnet.VERB
  if initial == "R":
    return wordnet.ADV
  return wordnet.NOUN


def lemmatize_with_pos(tokens):
  """Lemmatize each token with its own POS tag and return them space-joined."""
  lemmas = []
  for token in tokens:
    lemmas.append(lemmatizer.lemmatize(token, get_wordnet_pos(token)))
  return " ".join(lemmas)


def simple_stemmer(tokens):
  """Porter-stem every token and return the stems as one space-joined string."""
  return " ".join(map(ps.stem, tokens))


#removing the stopwords
def remove_stopwords(tokens):
  """Return the tokens whose lower-cased form is not an English stopword.

  Args:
    tokens: iterable of string tokens (consumed once).

  Returns:
    list of the surviving tokens, in their original order and casing.
  """
  # membership tests against a set are O(1); scanning the list for every token is not
  stopword_set = set(stopword_list)
  return [token for token in tokens if token.lower() not in stopword_set]


def prepare_data(text):
  """Clean one raw review and return it as a space-joined string without stopwords.

  clean_text already strips HTML and lower-cases its input, so the raw text is
  handed to it directly; running strip_html here as well would parse every
  review twice and decode HTML entities a second time.

  Args:
    text: raw review string, possibly containing HTML markup.

  Returns:
    str of the remaining tokens separated by single spaces.
  """
  tokens = clean_text(text)
  filtered_tokens = remove_stopwords(tokens)
  return " ".join(filtered_tokens)


def prepare_data_using_stemming(text):
  """Clean one raw review, drop stopwords, and Porter-stem what is left.

  clean_text already strips HTML and lower-cases its input, so the raw text is
  handed to it directly; running strip_html here as well would parse every
  review twice and decode HTML entities a second time.

  Args:
    text: raw review string, possibly containing HTML markup.

  Returns:
    str of stemmed tokens separated by single spaces.
  """
  tokens = clean_text(text)
  filtered_tokens = remove_stopwords(tokens)
  stemmed_text = simple_stemmer(filtered_tokens)
  return stemmed_text


def prepare_data_using_lemmatization(text):
  """Clean one raw review, drop stopwords, and lemmatize what is left.

  clean_text already strips HTML and lower-cases its input, so the raw text is
  handed to it directly; running strip_html here as well would parse every
  review twice and decode HTML entities a second time.

  Args:
    text: raw review string, possibly containing HTML markup.

  Returns:
    str of lemmatized tokens separated by single spaces.
  """
  tokens = clean_text(text)
  filtered_tokens = remove_stopwords(tokens)
  lemmatized_text = lemmatize_with_pos(filtered_tokens)
  return lemmatized_text


# Apply function on review column

# Wall-clock timer for the load-or-clean step below.
start = time.time()
# Reuse the cached cleaned CSV when one is already on disk.
if os.path.exists(os.path.join(drive_dir, "IMDB Dataset Clean.csv")):
  print("Dataset exists")
  imdb_data_clean = pd.read_csv(os.path.join(drive_dir, "IMDB Dataset Clean.csv"))
else:
  # No cache: run prepare_data over every review on a copy, leaving imdb_data untouched.
  # This branch does not write the CSV itself.
  print("Starting cleaning")
  imdb_data_clean = imdb_data.copy()
  imdb_data_clean['review'] = imdb_data_clean['review'].apply(prepare_data)
print("Finished in", time.time() - start)

# start = time.time()
# if os.path.exists(os.path.join(drive_dir, "IMDB Dataset Clean (Stemmed).csv")):
#   print("Stemmed dataset exists")
#   imdb_data_stem = pd.read_csv(os.path.join(drive_dir, "IMDB Dataset Clean (Stemmed).csv"))
# else:
#   print("Starting cleaning with stemming")
#   imdb_data_stem = imdb_data.copy()
#   imdb_data_stem['review'] = imdb_data_stem['review'].apply(prepare_data_using_stemming)
# print("Finished in", time.time() - start)


# start = time.time()
# if os.path.exists(os.path.join(drive_dir, "IMDB Dataset Clean (Lemmatized).csv")):
#   print("Lemmatized dataset exists")
#   imdb_data_lemma = pd.read_csv(os.path.join(drive_dir, "IMDB Dataset Clean (Lemmatized).csv"))
# else:
#   print("Starting cleaning with lemmatization")
#   imdb_data_lemma = imdb_data.copy()
#   imdb_data_lemma['review'] = imdb_data_lemma['review'].apply(prepare_data_using_lemmatization)
# print("Finished in", time.time() - start)

In [None]:
# imdb_data_clean.to_csv(os.path.join(drive_dir, "IMDB Dataset Clean.csv"), index=False)
# imdb_data_stem.to_csv(os.path.join(drive_dir, "IMDB Dataset Clean (Stemmed).csv"), index=False)
# imdb_data_lemma.to_csv(os.path.join(drive_dir, "IMDB Dataset Clean (Lemmatized).csv"), index=False)

# Machine Learning approach

In [None]:
# Choose which cleaned variant feeds the classical ML models (plain cleaning is active).
imdb_data_final = imdb_data_clean.copy()
# imdb_data_final = imdb_data_stem.copy()
# imdb_data_final = imdb_data_lemma.copy()
imdb_data_final['review'].head(10)

#### Normalized train reviews

In [None]:
# Training split: the first 40,000 rows by position (no shuffling is done here).
norm_train_reviews=imdb_data_final.review[:40000]
# Show the review whose index label is 0.
norm_train_reviews[0]

#### Normalized test reviews

In [None]:
# Test split: every row from position 40,000 onwards.
norm_test_reviews=imdb_data_final.review[40000:]
# Show the review whose index label is 45005.
norm_test_reviews[45005]

### Bag of Words model

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

In [None]:
#Count vectorizer for bag of words
# max_df is given as the float 1.0 (a proportion: no upper limit on document
# frequency). The integer 1 is an absolute count and would discard every term
# that occurs in more than one document. min_df=1 keeps any term seen at least once.
cv=CountVectorizer(min_df=1,max_df=1.0,binary=False,ngram_range=(1,3))
#transformed train reviews
# the vocabulary is learned from the training reviews only
cv_train_reviews=cv.fit_transform(norm_train_reviews)
#transformed test reviews
cv_test_reviews=cv.transform(norm_test_reviews)

print('BOW_cv_train:',cv_train_reviews.shape)
print('BOW_cv_test:',cv_test_reviews.shape)

#### Term Frequency-Inverse Document Frequency (TF-IDF) model

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
#Tfidf vectorizer
# max_df is given as the float 1.0 (a proportion: no upper limit on document
# frequency). The integer 1 is an absolute count and would discard every term
# that occurs in more than one document. min_df=1 keeps any term seen at least once.
tv=TfidfVectorizer(min_df=1,max_df=1.0,use_idf=True,ngram_range=(1,3))
#transformed train reviews
# vocabulary and IDF weights are learned from the training reviews only
tv_train_reviews=tv.fit_transform(norm_train_reviews)
#transformed test reviews
tv_test_reviews=tv.transform(norm_test_reviews)
print('Tfidf_train:',tv_train_reviews.shape)
print('Tfidf_test:',tv_test_reviews.shape)

#### Labeling the sentiment text


In [None]:
from sklearn.preprocessing import LabelBinarizer

In [None]:
#labeling the sentiment data
# Encoder used in the next cell to turn the 'sentiment' strings into numeric labels.
lb=LabelBinarizer()

In [None]:
#transformed sentiment data
# NOTE(review): LabelBinarizer sorts the class labels, so 'negative' should map to 0
# and 'positive' to 1 — confirm with lb.classes_.
sentiment_data=lb.fit_transform(imdb_data_final['sentiment'])
print(sentiment_data.shape)



#### Split the sentiment data


In [None]:
#Splitting the sentiment data at the same 40,000-row boundary used for the reviews
train_sentiments=sentiment_data[:40000]
test_sentiments=sentiment_data[40000:]
print(train_sentiments)
print(test_sentiments)

## Building models

### Logistic Regression

In [None]:
from sklearn.linear_model import LogisticRegression

Logistic regression model for bag of words features

In [None]:
#training the model
# L2-penalised logistic regression with a fixed random_state.
lr = LogisticRegression(penalty='l2',
                        max_iter=500,
                        C=1,
                        random_state=42)
#Fitting the model for Bag of words
# np.ravel flattens the label array to 1-D before fitting.
lr_bow = lr.fit(cv_train_reviews, np.ravel(train_sentiments))
print(lr_bow)

Logistic regression model for TF-IDF features

In [None]:
#training the model
# A new estimator object is created here, so the model held in lr_bow is left as it was.
# From this cell onwards the name `lr` refers to the estimator fitted on TF-IDF features.
lr = LogisticRegression(penalty='l2',
                        max_iter=500,
                        C=1,
                        random_state=42)
#Fitting the model for tfidf features
# np.ravel flattens the label array to 1-D before fitting.
lr_tfidf = lr.fit(tv_train_reviews, np.ravel(train_sentiments))
print(lr_tfidf)



Logistic regression model performance on test dataset


In [None]:
#Predicting the model for bag of words
# Each feature set is scored by the estimator that was fitted on it. The bare
# name `lr` was rebound when the TF-IDF model was trained, so using it for the
# bag-of-words features would score them with the wrong model.
lr_bow_predict=lr_bow.predict(cv_test_reviews)
print("lr_bow_pred", lr_bow_predict)

#Predicting the model for tfidf features
lr_tfidf_predict=lr_tfidf.predict(tv_test_reviews)
print("lr_tfidf_pred", lr_tfidf_predict)



Accuracy of the model

In [None]:
from sklearn.metrics import classification_report,confusion_matrix,accuracy_score

In [None]:
#Accuracy score for bag of words
# Fraction of test reviews whose predicted label equals the true label.
lr_bow_score=accuracy_score(test_sentiments,
                            lr_bow_predict)
print("lr_bow_score :",lr_bow_score)

#Accuracy score for tfidf features
lr_tfidf_score=accuracy_score(test_sentiments,
                              lr_tfidf_predict)
print("lr_tfidf_score :",lr_tfidf_score)



Print the classification report


In [None]:
#Classification report for bag of words
# target_names are matched to labels by position. Without an explicit `labels`
# the report uses sorted labels [0, 1], i.e. negative first, so the names
# 'Positive','Negative' would be attached to the wrong rows. Passing
# labels=[1,0] pins 1 -> 'Positive' and 0 -> 'Negative' (the same encoding the
# deep-learning section uses) and matches the confusion-matrix ordering.
lr_bow_report=classification_report(test_sentiments,
                                    lr_bow_predict,
                                    labels=[1,0],
                                    target_names=['Positive','Negative'])
print(lr_bow_report)

#Classification report for tfidf features
lr_tfidf_report=classification_report(test_sentiments,
                                      lr_tfidf_predict,
                                      labels=[1,0],
                                      target_names=['Positive','Negative'])
print(lr_tfidf_report)



Confusion matrix


In [None]:
#confusion matrix for bag of words
# labels=[1,0] orders rows (true) and columns (predicted) with label 1 first.
cm_bow=confusion_matrix(test_sentiments,lr_bow_predict,labels=[1,0])
print(cm_bow)

#confusion matrix for tfidf features
cm_tfidf=confusion_matrix(test_sentiments,lr_tfidf_predict,labels=[1,0])
print(cm_tfidf)

# Deep Learning approach

### Initiate data

In [None]:
# Work on a copy of the cleaned reviews for the deep-learning section.
dataset = imdb_data_clean.copy()
dataset.head(10)

In [None]:
# Training documents: the first 40,000 rows by position.
train_docs=dataset.review[:40000]
train_docs[0]

In [None]:
# Test documents: every row from position 40,000 onwards.
test_docs=dataset.review[40000:]
test_docs[45005]

### Define a Vocabulary

In [None]:
from collections import Counter

In [None]:
def add_doc_to_vocab(text, vocab):
  """Add the whitespace-separated tokens of `text` to `vocab` in place."""
  vocab.update(text.split())


def process_docs(dataset, vocab):
  """Feed every document of `dataset` into `vocab` through add_doc_to_vocab."""
  for document in dataset:
    add_doc_to_vocab(document, vocab)

In [None]:
# define vocab
# Maps token -> occurrence count; filled in place by process_docs.
vocab = Counter()

In [None]:
# Count tokens from the training documents only.
# NOTE: vocab is updated in place, so re-running this cell adds the counts again.
process_docs(train_docs, vocab)

In [None]:
# Number of distinct tokens, then the 50 most frequent ones with their counts.
print(len(vocab))
print(vocab.most_common(50))

In [None]:
# keep tokens with a min occurrence
# Tokens counted fewer than min_occurance times in the training documents are dropped.
min_occurance = 2
tokens = [k for k,c in vocab.items() if c >= min_occurance]
print(len(tokens))

In [None]:
# save list to file
def save_list(lines, filename):
  """Write `lines` to `filename`, one item per line.

  Args:
    lines: iterable of strings; joined with newlines (no trailing newline).
    filename: path of the file to create or overwrite.
  """
  # convert lines to a single blob of text
  data = '\n'.join(lines)
  # the context manager closes the handle even if the write raises
  with open(filename, 'w') as file:
    file.write(data)

# save tokens to a vocabulary file
# Writes the kept tokens, one per line, to vocab.txt under drive_dir.
save_list(tokens, os.path.join(drive_dir, 'vocab.txt'))

### Train Embedding Layer

In [None]:
# load doc into memory
def load_doc(filename):
  """Read a text file and return its entire contents as one string.

  Args:
    filename: path of the file to read.

  Returns:
    str with the full text of the file.
  """
  # the context manager closes the handle even if the read raises
  with open(filename, 'r') as file:
    return file.read()

# load the vocabulary
vocab_filename = os.path.join(drive_dir, 'vocab.txt')
vocab = load_doc(vocab_filename)
# Split on whitespace and de-duplicate. Note that `vocab` is rebound here:
# it was a Counter in the earlier cells and is a set of tokens from now on.
vocab = vocab.split()
vocab = set(vocab)

Encoding each document

In [None]:
from keras.preprocessing.text import Tokenizer

In [None]:
# create the tokenizer
# NOTE: this rebinds the global name `tokenizer`, which clean_text() also reads
# (there it is the ToktokTokenizer). Calling clean_text() after this cell has run
# would therefore use this Keras object instead.
# num_words=5000 limits how many words are kept when encoding — TODO confirm exact cut-off.
tokenizer = Tokenizer(num_words=5000)

In [None]:
# fit the tokenizer on the documents
# Only the training documents are used, so the word index is not built from the test split.
tokenizer.fit_on_texts(train_docs)

In [None]:
# sequence encode
# Each training document becomes a list of integer word indices.
encoded_docs = tokenizer.texts_to_sequences(train_docs)

Padding documents

In [None]:
from keras.preprocessing.sequence import pad_sequences

In [None]:
# pad sequences
# max_length = max([len(s.split()) for s in train_docs])
# Fixed sequence length of 100; padding='post' puts the padding after the tokens.
# NOTE(review): documents longer than max_length get truncated by pad_sequences —
# confirm which end is dropped with the default `truncating` setting.
max_length = 100
Xtrain = pad_sequences(encoded_docs, maxlen=max_length, padding='post')

In [None]:
# define training labels
# Same 40,000-row boundary as train_docs; "positive" -> 1, "negative" -> 0.
ytrain = dataset.sentiment[:40000].map({"positive": 1, "negative": 0}).values

In [None]:
# define testing data
# sequence encode
# Reuses (and overwrites) the name encoded_docs; the tokenizer fitted on the training documents is applied as-is.
encoded_docs = tokenizer.texts_to_sequences(test_docs)
# pad sequences
Xtest = pad_sequences(encoded_docs, maxlen=max_length, padding='post')
# define test labels
# "positive" -> 1, "negative" -> 0, matching ytrain.
ytest = dataset.sentiment[40000:].map({"positive": 1, "negative": 0}).values

### Building models

#### Initiate

In [None]:
import keras
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Embedding
from keras.layers import LSTM
from keras.layers import Dropout
from keras.layers import GlobalMaxPooling1D
from keras.layers import Bidirectional
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D

In [None]:
# define vocabulary size (largest integer value)
# NOTE(review): word_index presumably lists every word seen by fit_on_texts, not only
# the num_words=5000 kept for encoding, so the embedding table is sized for the full
# vocabulary; the +1 presumably leaves room for index 0 (padding) — confirm both.
vocab_size = len(tokenizer.word_index) + 1

In [None]:
# Parse the GloVe text file into {word: float32 vector}; on each line the first
# whitespace-separated field is the word and the remaining fields are its vector.
embeddings_dictionary = dict()
# `with` closes the file even if a line fails to parse part-way through
with open(os.path.join(drive_dir, 'glove.6B.100d.txt'), encoding="utf8") as glove_file:
  for line in glove_file:
    records = line.split()
    word = records[0]
    vector_dimensions = np.asarray(records[1:], dtype='float32')
    embeddings_dictionary[word] = vector_dimensions

In [None]:
# Row i holds the GloVe vector of the word whose tokenizer index is i;
# words that are missing from embeddings_dictionary keep an all-zero row.
embedding_matrix = np.zeros((vocab_size, 100))
for word, index in tokenizer.word_index.items():
    embedding_vector = embeddings_dictionary.get(word)
    if embedding_vector is not None:
        embedding_matrix[index] = embedding_vector

#### Simple neural network

In [None]:
# Set to True to load the saved model_simple.h5 (when present) instead of retraining.
use_simple_model = False

In [None]:
# Load the saved model and its training history only when the file exists AND
# use_simple_model is True; otherwise build and train a new model.
if os.path.exists(os.path.join(drive_dir, "model_simple.h5")) and use_simple_model is True:
  print("Model exists")
  model_simple = load_model(os.path.join(drive_dir, "model_simple.h5"))
  print(model_simple.summary())
  # The history CSV has one column per metric; to_dict('list') gives {metric: [values]}.
  history_simple_dict = pd.read_csv(os.path.join(drive_dir, "history_simple.csv")).to_dict('list')
else:
  # define model
  print("Training model")
  model_simple = Sequential()
  # Embedding weights come from embedding_matrix and are kept frozen (trainable=False).
  model_simple.add(Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=max_length , trainable=False))
  model_simple.add(Flatten())
  # Single sigmoid unit for the binary sentiment label.
  model_simple.add(Dense(1, activation='sigmoid'))
  # compile model
  model_simple.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])
  print(model_simple.summary())
  # fit model
  # validation_split=0.2 is the source of the 'val_acc' / 'val_loss' history keys plotted later.
  history_simple = model_simple.fit(Xtrain, ytrain, batch_size=128, epochs=6, verbose=1, validation_split=0.2)
  history_simple_dict = history_simple.history

In [None]:
# Evaluate on the test split; the next cell reads index 0 as the loss and index 1 as the accuracy.
score_simple = model_simple.evaluate(Xtest, ytest, verbose=1)

In [None]:
# "Test Score" is the loss value (binary cross-entropy when the model is compiled in this notebook).
print("Test Score:", score_simple[0])
print("Test Accuracy:", score_simple[1])

In [None]:
# Accuracy per epoch. The 'val_*' series come from the validation_split hold-out
# taken out of the training data, not from the test set, so the legend says 'validation'.
plt.plot(history_simple_dict['acc'])
plt.plot(history_simple_dict['val_acc'])

plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

# Loss per epoch, same two series.
plt.plot(history_simple_dict['loss'])
plt.plot(history_simple_dict['val_loss'])

plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

In [None]:
# save model
# Overwrites any existing model_simple.h5 under drive_dir.
model_simple.save(os.path.join(drive_dir, "model_simple.h5"))
# save history data
# One CSV column per metric, in the layout the load branch reads back with to_dict('list').
pd.DataFrame.from_dict(history_simple_dict).to_csv(os.path.join(drive_dir, 'history_simple.csv'), index=False)

#### Convolutional neural network

In [None]:
# Set to True to load the saved model_cnn.h5 (when present) instead of retraining.
use_cnn_model = False

In [None]:
# define model
# Load the saved model and its training history only when the file exists AND
# use_cnn_model is True; otherwise build and train a new model.
if os.path.exists(os.path.join(drive_dir, "model_cnn.h5")) and use_cnn_model is True:
  print("Model exists")
  model_cnn = load_model(os.path.join(drive_dir, "model_cnn.h5"))
  print(model_cnn.summary())
  # The history CSV has one column per metric; to_dict('list') gives {metric: [values]}.
  history_cnn_dict = pd.read_csv(os.path.join(drive_dir, "history_cnn.csv")).to_dict('list')
else:
  model_cnn = Sequential()
  # Embedding weights come from embedding_matrix and are kept frozen (trainable=False).
  embedding_layer = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=max_length, trainable=False)
  model_cnn.add(embedding_layer)
  # 128 filters with kernel size 5 over the embedded sequence.
  model_cnn.add(Conv1D(128, 5, activation='relu'))
  model_cnn.add(Dropout(0.5))
  model_cnn.add(GlobalMaxPooling1D())
  # Single sigmoid unit for the binary sentiment label.
  model_cnn.add(Dense(1, activation='sigmoid'))
  # compile model
  model_cnn.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])
  print(model_cnn.summary())
  # fit model
  # es_callback = keras.callbacks.EarlyStopping(monitor='val_loss', patience=7, verbose=1)
  # validation_split=0.2 is the source of the 'val_acc' / 'val_loss' history keys plotted later.
  history_cnn = model_cnn.fit(Xtrain, 
                              ytrain, 
                              batch_size=128, 
                              epochs=6, 
                              verbose=1, 
                              validation_split=0.2, 
                              # callbacks=[es_callback]
                              )
  history_cnn_dict = history_cnn.history

In [None]:
# Evaluate on the test split; the next cell reads index 0 as the loss and index 1 as the accuracy.
score_cnn = model_cnn.evaluate(Xtest, ytest, verbose=1)

In [None]:
# "Test Score" is the loss value (binary cross-entropy when the model is compiled in this notebook).
print("Test Score:", score_cnn[0])
print("Test Accuracy:", score_cnn[1])

In [None]:
# Accuracy per epoch. The 'val_*' series come from the validation_split hold-out
# taken out of the training data, not from the test set, so the legend says 'validation'.
plt.plot(history_cnn_dict['acc'])
plt.plot(history_cnn_dict['val_acc'])

plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

# Loss per epoch, same two series.
plt.plot(history_cnn_dict['loss'])
plt.plot(history_cnn_dict['val_loss'])

plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

In [None]:
# save model
# Overwrites any existing model_cnn.h5 under drive_dir.
model_cnn.save(os.path.join(drive_dir, "model_cnn.h5"))
# save history data
# One CSV column per metric, in the layout the load branch reads back with to_dict('list').
pd.DataFrame.from_dict(history_cnn_dict).to_csv(os.path.join(drive_dir, 'history_cnn.csv'), index=False)

#### Recurrent neural network

In [None]:
# Set to True to load the saved model_rnn.h5 (when present) instead of retraining.
use_rnn_model = False

In [None]:
# define model
# Load the saved model and its training history only when the file exists AND
# use_rnn_model is True; otherwise build and train a new model.
if os.path.exists(os.path.join(drive_dir, "model_rnn.h5")) and use_rnn_model is True:
  print("Model exists")
  model_rnn = load_model(os.path.join(drive_dir, "model_rnn.h5"))
  print(model_rnn.summary())
  # The history CSV has one column per metric; to_dict('list') gives {metric: [values]}.
  history_rnn_dict = pd.read_csv(os.path.join(drive_dir, "history_rnn.csv")).to_dict('list')
else:
  model_rnn = Sequential()
  # Embedding weights come from embedding_matrix and are kept frozen (trainable=False).
  embedding_layer = Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=max_length, trainable=False)
  model_rnn.add(embedding_layer)
  # model_rnn.add(LSTM(128))
  # model_rnn.add(Bidirectional(LSTM(128, return_sequences=True)))
  # Active variant: one bidirectional LSTM layer with 128 units per direction.
  model_rnn.add(Bidirectional(LSTM(128)))
  model_rnn.add(Dropout(0.5))
  # Single sigmoid unit for the binary sentiment label.
  model_rnn.add(Dense(1, activation='sigmoid'))
  # compile model
  model_rnn.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])
  print(model_rnn.summary())
  # fit model
  # validation_split=0.2 is the source of the 'val_acc' / 'val_loss' history keys plotted later.
  history_rnn = model_rnn.fit(Xtrain, 
                              ytrain,
                              batch_size=128,
                              epochs=6,
                              verbose=1,
                              validation_split=0.2)
  history_rnn_dict = history_rnn.history

In [None]:
# Evaluate on the test split; the next cell reads index 0 as the loss and index 1 as the accuracy.
score_rnn = model_rnn.evaluate(Xtest, ytest, verbose=1)

In [None]:
# "Test Score" is the loss value (binary cross-entropy when the model is compiled in this notebook).
print("Test Score:", score_rnn[0])
print("Test Accuracy:", score_rnn[1])

In [None]:
# Accuracy per epoch. The 'val_*' series come from the validation_split hold-out
# taken out of the training data, not from the test set, so the legend says 'validation'.
plt.plot(history_rnn_dict['acc'])
plt.plot(history_rnn_dict['val_acc'])

plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

# Loss per epoch, same two series.
plt.plot(history_rnn_dict['loss'])
plt.plot(history_rnn_dict['val_loss'])

plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train','validation'], loc='upper left')
plt.show()

In [None]:
# save model
# Overwrites any existing model_rnn.h5 under drive_dir.
model_rnn.save(os.path.join(drive_dir, "model_rnn.h5"))
# save history data
# One CSV column per metric, in the layout the load branch reads back with to_dict('list').
pd.DataFrame.from_dict(history_rnn_dict).to_csv(os.path.join(drive_dir, 'history_rnn.csv'), index=False)