<a href="https://colab.research.google.com/github/allakoala/data_science/blob/main/colab_notebooks/HW_Neural_Networks_and_Basic_Natural_Language_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HW - https://docs.google.com/document/d/1RXVGCi56qaWzC2SuNGk8gZXicSnl1eZb8_MqWvW9Igg/edit

##EDA + Preprocessing

###1. [Previous HW on Clustering](https://colab.research.google.com/drive/1udWpcOae_qcEB-Crj2KaXAafaDt27iZ3#scrollTo=StbTKheJkX2S&uniqifier=5)

In [None]:
import pandas as pd
import numpy as np

from google.colab import drive
drive.mount('/content/drive')
# Path of the raw review CSV on the mounted Google Drive (the mount above must succeed first)
url = "/content/drive/MyDrive/Colab Notebooks/LargeMovieReviewDataset.csv"

# Read the comma-separated file into the DataFrame `data`
data = pd.read_csv(url, sep=',')

# Show the first rows as the cell output
data.head()

In [None]:
!pip install word2number
!pip install contractions
!pip install unidecode

In [None]:
from bs4 import BeautifulSoup
from word2number import w2n #!pip install word2number
import contractions #!pip install contractions
import nltk
import re
import string
import unicodedata
from collections import Counter
import unidecode #!pip install unidecode

nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')

from nltk.corpus import stopwords

stop_words = {word for word in stopwords.words('english') if word not in {'no', 'not'}}

def clean_text(documents, sentiments=None):
    """Clean raw review texts and print basic corpus statistics.

    Each document goes through, in order: HTML stripping, transliteration of
    accented characters, contraction expansion, number-word -> digit
    conversion, removal of standalone digit runs, whitespace normalisation,
    punctuation removal, lower-casing, NLTK tokenisation and stop-word
    filtering (using the module-level ``stop_words`` set).

    Args:
        documents: iterable of raw review strings.
        sentiments: optional iterable of labels aligned with ``documents``.
            When given, its value distribution is printed. It is passed in
            explicitly so the function does not depend on a global DataFrame.

    Returns:
        list[str]: one cleaned, space-joined string per input document.
    """
    # Build the punctuation-stripping table once instead of once per document.
    punctuation_table = str.maketrans('', '', string.punctuation)

    def number_word_to_digits(word):
        # Only purely alphabetic tokens can be number words; w2n raises
        # ValueError for anything it does not recognise as a number.
        if not word.isalpha():
            return word
        try:
            return str(w2n.word_to_num(word))
        except ValueError:
            return word

    tokens = []
    for doc in documents:
        # Remove HTML tags
        doc = BeautifulSoup(doc, "html.parser").get_text(separator=" ")
        # Remove accented characters from text, e.g. café
        doc = unidecode.unidecode(doc)
        # Expand contractions
        doc = contractions.fix(doc)
        # Convert number words to numeric form so the next step can drop them
        doc = ' '.join(number_word_to_digits(word) for word in doc.split())
        # Remove standalone numbers
        doc = re.sub(r'\b\d+\b', '', doc)
        # Collapse runs of whitespace and trim both ends
        doc = " ".join(doc.split())
        # Remove punctuation and convert to lowercase
        doc = doc.translate(punctuation_table).lower()
        tokens.append(nltk.word_tokenize(doc))

    # Word count is taken before stop-word removal; the built-in sum keeps it
    # an int even for an empty corpus.
    corpus_size_words = sum(len(doc_tokens) for doc_tokens in tokens)

    # Remove stop words
    filtered_tokens = [[token for token in doc_tokens if token not in stop_words] for doc_tokens in tokens]

    # Print descriptive statistics
    print('Corpus Size (Number of Documents): {}'.format(len(tokens)))
    print('Corpus Size (Number of Words): {}'.format(corpus_size_words))
    if sentiments is not None:
        print('Sentiment Distribution: {}'.format(Counter(sentiments)))

    return [' '.join(filtered_doc_tokens) for filtered_doc_tokens in filtered_tokens]

data['review_cleaned'] = clean_text(data['review'], data['sentiment'])
data['review_cleaned']

In [None]:
from nltk.stem import WordNetLemmatizer, PorterStemmer

# Normalization 1 - lemmatization
lemmatizer = WordNetLemmatizer()

def normalize_text1(text):
    """Tokenize ``text`` with NLTK and return the list of lemmatized tokens.

    Uses the module-level ``lemmatizer`` with its default part of speech.
    """
    return [lemmatizer.lemmatize(token) for token in nltk.word_tokenize(text)]

# Lemmatize every cleaned review; each cell becomes a list of tokens
data['review_normalized1'] = data['review_cleaned'].apply(normalize_text1)

# Flatten the per-review token lists into a single corpus-wide word list
normalized1 = [word for review in data['review_normalized1'] for word in review]

print(normalized1[:7]) # print first 7 words

In [None]:
# Convert sentiment to binary: 1 for 'positive', 0 for anything else.
# Only done while the column is still non-numeric: once encoded, no value
# equals 'positive' any more, so re-running the cell unguarded would turn
# every label into 0.
if not pd.api.types.is_numeric_dtype(data['sentiment']):
    data['sentiment'] = data['sentiment'].apply(lambda x: 1 if x=='positive' else 0)
data[['review_normalized1','sentiment']]

###2. According to [the article](https://towardsdatascience.com/deep-learning-for-natural-language-processing-using-word2vec-keras-d9a240c7bb9d)

In [None]:
!pip install num2words

In [None]:
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import re
import unicodedata
import string
import contractions
import nltk
# import os
# os._exit(00)
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import word_tokenize
from num2words import num2words #pip install num2words

url = "/content/drive/MyDrive/Colab Notebooks/LargeMovieReviewDataset.csv"

class Preprocessor:
    """Loads a review CSV, cleans the text column and encodes the target.

    Typical use: ``read_input_file()`` -> ``apply_preprocessing()`` ->
    ``save_cleaned_data()``; ``load()`` reads a previously saved result.
    """

    # Shared, lazily created resources. Building them once avoids re-reading
    # the stop-word list for every review and re-creating the lemmatizer for
    # every single word.
    _stop_words = None
    _lemmatizer = None

    def __init__(self, filepath):
        self.filepath = filepath
        self.data = None
        self.feature_name = 'review'
        self.target_name = 'sentiment'
        self.target_encoding = None

    def read_input_file(self):
        """
        Reads input file from given filepath into ``self.data`` and returns it
        """
        self.data = pd.read_csv(self.filepath)
        return self.data

    def encode_target(self):
        """
        Encodes the target column as integer codes using pd.factorize

        ``sort=True`` assigns codes in sorted order of the labels (so
        'negative' -> 0, 'positive' -> 1) instead of order of first
        appearance, which would make the meaning of 0/1 depend on which
        label happens to be in the first row. The sorted unique labels are
        stored in ``self.target_encoding``.
        """
        self.data[self.target_name], self.target_encoding = pd.factorize(
            self.data[self.target_name], sort=True
        )

    @staticmethod
    def remove_html_tags(text):
        """
        Replaces HTML tags in the text with a single space

        A space (rather than the empty string) keeps the words on both sides
        of a tag such as ``<br />`` from being glued together.
        """
        return re.sub(r'<.*?>', ' ', text)

    @staticmethod
    def remove_accented_characters(text):
        """
        Removes accents by NFKD-decomposing the text and dropping non-ASCII
        """
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8', 'ignore')
        return text

    @staticmethod
    def expand_contractions(text):
        """
        Expands contractions in the text
        """
        return contractions.fix(text)

    @staticmethod
    def convert_number_words_to_numeric(text):
        """
        Spells out ``text`` in English words if the WHOLE string parses as a
        number; otherwise returns it unchanged.

        NOTE(review): despite its name, this converts digits to words (via
        num2words) and only when the entire input is numeric, so it leaves
        ordinary multi-word reviews untouched. Confirm the intended
        behaviour before relying on it.
        """
        try:
            number = float(text)
            return num2words(number, lang='en')
        except (ValueError, OverflowError):
            # float() also accepts 'nan'/'inf', which cannot be spelled out
            return text

    @staticmethod
    def remove_numbers(text):
        """
        Removes every run of digits from the text
        """
        return re.sub(r'\d+', '', text)

    @staticmethod
    def remove_punctuation(text):
        """
        Removes ASCII punctuation (string.punctuation) from the text
        """
        return text.translate(str.maketrans('', '', string.punctuation))

    @staticmethod
    def lowercase(text):
        """
        Converts text to lowercase
        """
        return text.lower()

    @staticmethod
    def tokenize(text):
        """
        Tokenizes the text with NLTK's word_tokenize
        """
        return word_tokenize(text)

    @staticmethod
    def remove_stopwords(tokens):
        """
        Removes English stopwords from the tokens, keeping 'no' and 'not'
        """
        if Preprocessor._stop_words is None:
            Preprocessor._stop_words = set(stopwords.words('english')) - {'no', 'not'}
        stop_words = Preprocessor._stop_words
        return [word for word in tokens if word not in stop_words]

    @staticmethod
    def lemmatize(word):
        """
        Lemmatizes the word using the POS tag returned by get_wordnet_pos
        """
        if Preprocessor._lemmatizer is None:
            Preprocessor._lemmatizer = WordNetLemmatizer()
        wordnet_pos = Preprocessor.get_wordnet_pos(word)
        return Preprocessor._lemmatizer.lemmatize(word, pos=wordnet_pos)

    @staticmethod
    def get_wordnet_pos(word):
        """
        Maps the first letter of the word's POS tag to a WordNet POS constant

        The word is tagged on its own (without sentence context); tags other
        than J/N/V/R fall back to noun.
        """
        tag = nltk.pos_tag([word])[0][1][0].upper()
        tag_dict = {"J": wordnet.ADJ,
                    "N": wordnet.NOUN,
                    "V": wordnet.VERB,
                    "R": wordnet.ADV}
        return tag_dict.get(tag, wordnet.NOUN)

    def apply_preprocessing(self):
        """
        Applies all preprocessing steps to the data, in place

        Encodes the target, then runs the text column through every cleaning
        step in order and stores the result as space-joined lemmas.
        """
        if self.data is None:
            # Allow calling this directly without an explicit read first
            self.read_input_file()

        self.encode_target()

        # Order matters: string-level cleaning first, then token-level steps
        steps = (
            self.remove_html_tags,
            self.remove_accented_characters,
            self.expand_contractions,
            self.convert_number_words_to_numeric,
            self.remove_numbers,
            self.remove_punctuation,
            self.lowercase,
            self.tokenize,
            self.remove_stopwords,
        )
        reviews = self.data[self.feature_name]
        for step in steps:
            reviews = reviews.apply(step)

        self.data[self.feature_name] = reviews.apply(
            lambda tokens: ' '.join(self.lemmatize(word) for word in tokens)
        )

    def save_cleaned_data(self, filepath="/content/drive/MyDrive/Colab Notebooks/Cleaned_MovieReviewDataset.csv"):
        """
        Saves the cleaned data to a CSV file (without the index column)
        """
        self.data.to_csv(filepath, index=False)

    def load(self, filepath="/content/drive/MyDrive/Colab Notebooks/Cleaned_MovieReviewDataset.csv"):
        """
        Loads the cleaned data from a CSV file into ``self.data`` and returns it
        """
        self.data = pd.read_csv(filepath)
        return self.data

# Create a Preprocessor bound to the raw dataset path in `url`
preprocessor = Preprocessor(url)

# Read the raw CSV into preprocessor.data
preprocessor.read_input_file()

# Encode the target and clean the review column in place
preprocessor.apply_preprocessing()

# Write the cleaned data to the method's default CSV path on Drive
preprocessor.save_cleaned_data()

# Read the cleaned CSV back; as the last expression it is also the cell output
preprocessor.load()

##Train linear model based on TF-IDF as a baseline
https://realpython.com/python-keras-text-classification/

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load the preprocessed dataset
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Cleaned_MovieReviewDataset.csv')

# A review that became empty during cleaning is stored as an empty CSV field
# and is read back as NaN, which TfidfVectorizer rejects as an invalid
# document. Treat such rows as empty strings.
df['review'] = df['review'].fillna('')

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(df['review'], df['sentiment'], test_size=0.2, random_state=42)

# Create a TF-IDF vectorizer
vectorizer = TfidfVectorizer()

# Fit the vectorizer on the training data only, so no test vocabulary leaks in
X_train_tfidf = vectorizer.fit_transform(X_train)

# Train a logistic regression model on the TF-IDF features
model = LogisticRegression(random_state=42)
model.fit(X_train_tfidf, y_train)

# Transform the test data using the same (already fitted) vectorizer
X_test_tfidf = vectorizer.transform(X_test)

# Predict the test data using the trained model
y_pred = model.predict(X_test_tfidf)

# Evaluate the model performance on the test data
accuracy = accuracy_score(y_test, y_pred)
print("The baseline linear model based on TF-IDF Accuracy:", accuracy)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import learning_curve
from wordcloud import WordCloud

# Confusion matrix of the baseline predictions, drawn as an annotated heatmap
confusion_mat = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(confusion_mat, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
plt.show()

# Per-class precision / recall / F1
print('Classification Report:')
print(classification_report(y_test, y_pred))

# Learning curve: 5-fold cross-validated scores at increasing training sizes
train_sizes, train_scores, test_scores = learning_curve(model, X_train_tfidf, y_train, cv=5)
train_mean, train_std = train_scores.mean(axis=1), train_scores.std(axis=1)
test_mean, test_std = test_scores.mean(axis=1), test_scores.std(axis=1)

fig, ax = plt.subplots(figsize=(8, 6))
curves = (
    (train_mean, train_std, 'Training Accuracy'),
    (test_mean, test_std, 'Validation Accuracy'),
)
for mean, std, label in curves:
    # Line for the fold mean, shaded band for +/- one standard deviation
    ax.plot(train_sizes, mean, label=label)
    ax.fill_between(train_sizes, mean - std, mean + std, alpha=0.2)
ax.set_title('Learning Curve')
ax.set_xlabel('Training Examples')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from wordcloud import WordCloud

# Create pipeline (seeded so the learned coefficients are reproducible)
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('clf', SGDClassifier(alpha=0.001, random_state=42))
])

# Fit pipeline on training data
pipeline.fit(X_train, y_train)

# Extract the coefficients of the model from the pipeline
importances = pipeline.named_steps['clf'].coef_.flatten()

# Get feature names from the TF-IDF vectorizer.
# A non-negative coefficient pushes the prediction toward class 1; the
# 'pos'/'neg' tags assume class 1 = positive sentiment, the 0/1 convention
# used by the other plots in this notebook.
feature_names = np.array(pipeline.named_steps['tfidf'].get_feature_names_out())
feature_importance_df = pd.DataFrame({
    'FEATURE': feature_names,
    'IMPORTANCE': importances,
    'SENTIMENT': ['pos' if importance >= 0 else 'neg' for importance in importances]
})

# Select top 10 positive/negative features
top_pos_features = feature_importance_df[feature_importance_df['SENTIMENT'] == 'pos'].nlargest(10, 'IMPORTANCE')
top_neg_features = feature_importance_df[feature_importance_df['SENTIMENT'] == 'neg'].nsmallest(10, 'IMPORTANCE')

# Generate WordCloud for positive features
positive_text = ' '.join(top_pos_features['FEATURE'])
positive_wordcloud = WordCloud(width=800, height=400).generate(positive_text)

# Generate WordCloud for negative features
negative_text = ' '.join(top_neg_features['FEATURE'])
negative_wordcloud = WordCloud(width=800, height=400).generate(negative_text)

# Plot the WordClouds; each title now matches the cloud drawn on its axis
# (previously the two titles were swapped).
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
panels = (
    (positive_wordcloud, 'Positive Features'),
    (negative_wordcloud, 'Negative Features'),
)
for ax, (cloud, title) in zip(axes, panels):
    ax.imshow(cloud, interpolation='bilinear')
    ax.set_title(title)
    ax.axis('off')

plt.tight_layout()
plt.show()

##Build and train RNN models: LSTM, Bidirectional LSTM
https://www.analyticsvidhya.com/blog/2022/01/the-complete-lstm-tutorial-with-implementation/


In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, LSTM, Bidirectional, Dense, Dropout
from tensorflow.keras.models import Sequential

# Fit the tokenizer on the training texts only; the 5000 most frequent words
# are kept and everything else maps to the "<OOV>" token
# https://www.analyticsvidhya.com/blog/2020/03/pretrained-word-embeddings-nlp/
tokenizer = Tokenizer(num_words=5000, oov_token="<OOV>")
tokenizer.fit_on_texts(X_train)
word_index = tokenizer.word_index

# Texts -> lists of integer word ids
X_train_seq = tokenizer.texts_to_sequences(X_train)
X_test_seq = tokenizer.texts_to_sequences(X_test)

# Cut / zero-pad every sequence at the end to a fixed length
max_len = 100
pad_options = dict(maxlen=max_len, truncating='post', padding='post')
X_train_padded = pad_sequences(X_train_seq, **pad_options)
X_test_padded = pad_sequences(X_test_seq, **pad_options)

size_of_vocabulary = len(word_index) + 1 #+1 for padding
print('The number of unique words in the training data:', size_of_vocabulary)

In [None]:
#deep learning library
from keras.models import *
from keras.layers import *
from keras.callbacks import *

def _build_rnn_classifier(bidirectional=False):
    """Build and compile the embedding -> (Bi)LSTM -> max-pool -> dense classifier.

    Args:
        bidirectional: wrap the LSTM layer in ``Bidirectional`` when True.

    Returns:
        A compiled Keras ``Sequential`` model with one sigmoid output.
    """
    recurrent = LSTM(128, return_sequences=True, dropout=0.2)
    if bidirectional:
        recurrent = Bidirectional(recurrent)

    network = Sequential()
    network.add(Embedding(size_of_vocabulary, 300, input_length=max_len, trainable=True))
    network.add(recurrent)
    # Global max pooling over the time axis (needs return_sequences=True above)
    network.add(GlobalMaxPooling1D())
    network.add(Dense(64, activation='relu'))
    network.add(Dense(1, activation='sigmoid'))
    network.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network


def _training_callbacks(checkpoint_file):
    """Return (early stopping, best-model checkpoint) callbacks for one model.

    The checkpoint monitors 'val_accuracy' because the models are compiled
    with metrics=['accuracy']; a monitor named 'val_acc' is never logged, so
    nothing would be saved.
    """
    early_stop = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=3)
    checkpoint = ModelCheckpoint(checkpoint_file, monitor='val_accuracy', mode='max',
                                 save_best_only=True, verbose=1)
    return early_stop, checkpoint


# ---- LSTM ----
model_lstm = _build_rnn_classifier(bidirectional=False)

# Each model gets its own checkpoint file so they do not overwrite each other
es, mc = _training_callbacks('best_model_lstm.h5')

# Train the LSTM model; the callbacks must be passed to fit() to take effect
history_lstm = model_lstm.fit(X_train_padded, y_train, epochs=10, batch_size=64,
                              validation_split=0.1, callbacks=[es, mc])

# Evaluate the LSTM model on the test data
loss, accuracy = model_lstm.evaluate(X_test_padded, y_test)
model_lstm.summary()  # summary() prints by itself and returns None
print("LSTM Model Accuracy:", accuracy)

# ---- Bidirectional LSTM ----
model_bilstm = _build_rnn_classifier(bidirectional=True)

es, mc = _training_callbacks('best_model_bilstm.h5')

# Train the Bidirectional LSTM model
history_bilstm = model_bilstm.fit(X_train_padded, y_train, epochs=10, batch_size=64,
                                  validation_split=0.1, callbacks=[es, mc])

# Evaluate the Bidirectional LSTM model on the test data
loss, accuracy = model_bilstm.evaluate(X_test_padded, y_test)
model_bilstm.summary()
print("Bidirectional LSTM Model Accuracy:", accuracy)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import learning_curve
from wordcloud import WordCloud

# Plot the confusion matrix
def plot_confusion_matrix(y_true, y_pred, title='Confusion Matrix'):
    """Draw the confusion matrix of ``y_true`` vs ``y_pred`` as a heatmap.

    Both axes are labelled 'Negative' / 'Positive', in that order.
    """
    class_names = ['Negative', 'Positive']
    matrix = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(6, 6))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set_title(title)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    plt.show()

# Print the classification report
def print_classification_report(y_true, y_pred):
    """Print scikit-learn's classification report for the given labels."""
    print(classification_report(y_true, y_pred))

# Plot the learning curve
def plot_learning_curve(history):
    """Plot loss and accuracy (training and validation) per epoch on one figure.

    ``history`` is expected to expose a ``history`` dict with the keys
    'loss', 'val_loss', 'accuracy' and 'val_accuracy'.
    """
    logged = history.history
    # Look every series up front so a missing key fails before a figure opens
    curves = [
        (logged['loss'], 'Training Loss'),
        (logged['val_loss'], 'Validation Loss'),
        (logged['accuracy'], 'Training Accuracy'),
        (logged['val_accuracy'], 'Validation Accuracy'),
    ]

    plt.figure(figsize=(8, 6))
    for values, label in curves:
        plt.plot(values, label=label)
    plt.title('Learning Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss/Accuracy')
    plt.legend()
    plt.show()

def _plot_sentiment_wordcloud(text, labels, target, title):
    """Draw a word cloud of the reviews in ``text`` whose label equals ``target``."""
    selected_text = ' '.join([review for review, sentiment in zip(text, labels) if sentiment == target])
    if not selected_text.strip():
        # Nothing to draw for this class; skip instead of building an empty cloud
        print('No reviews with sentiment == {} to plot.'.format(target))
        return
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(selected_text)
    plt.figure(figsize=(10, 6))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(title)
    plt.show()

# Generate WordCloud for positive reviews
def generate_wordcloud_positive(text, y_train):
    """Plot a word cloud built from the reviews labelled 1 (positive)."""
    _plot_sentiment_wordcloud(text, y_train, 1, 'WordCloud - Positive Reviews')

# Generate WordCloud for negative reviews
def generate_wordcloud_negative(text, y_train):
    """Plot a word cloud built from the reviews labelled 0 (negative)."""
    _plot_sentiment_wordcloud(text, y_train, 0, 'WordCloud - Negative Reviews')

# ---- LSTM ----
# Threshold the sigmoid outputs at 0.5 and flatten the (n, 1) column so the
# metrics receive a plain 1-D label vector
y_pred_lstm = model_lstm.predict(X_test_padded)
y_pred_lstm_classes = (y_pred_lstm > 0.5).astype("int32").ravel()
plot_confusion_matrix(y_test, y_pred_lstm_classes, title='Confusion Matrix - LSTM')

# Print classification report for LSTM model
print_classification_report(y_test, y_pred_lstm_classes)

# Plot learning curve for LSTM model
plot_learning_curve(history_lstm)

# ---- Bidirectional LSTM ----
y_pred_bilstm = model_bilstm.predict(X_test_padded)
y_pred_bilstm_classes = (y_pred_bilstm > 0.5).astype("int32").ravel()
plot_confusion_matrix(y_test, y_pred_bilstm_classes, title='Confusion Matrix - Bidirectional LSTM')

# Print classification report for Bidirectional LSTM model
print_classification_report(y_test, y_pred_bilstm_classes)

# Plot learning curve for Bidirectional LSTM model
plot_learning_curve(history_bilstm)

# ---- Training-data word clouds ----
# These depend only on the training texts and labels, not on either model,
# so they are drawn once instead of once per model.
generate_wordcloud_positive(X_train, y_train)
generate_wordcloud_negative(X_train, y_train)

## RNN model — open issue: it failed to identify any instances of negative sentiment (class 0). Why?

This model is built with:

1. Pretrained embedding (word2vec, fastText, GloVe)
2. Early Stopping using val_loss https://www.analyticsvidhya.com/blog/2020/03/pretrained-word-embeddings-nlp/
3. Model Checkpoints (save model)
4. Hyperparameter tuning - https://realpython.com/python-keras-text-classification/


In [None]:
!pip install gensim
from gensim.models import KeyedVectors
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Load the preprocessed dataset
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Cleaned_MovieReviewDataset.csv')

# 60 / 20 / 20 split: carve off 40% first, then halve it into validation and test
train_texts, val_test_texts, train_labels, val_test_labels = train_test_split(
    df['review'], df['sentiment'], test_size=0.4, random_state=42
)
val_texts, test_texts, val_labels, test_labels = train_test_split(
    val_test_texts, val_test_labels, test_size=0.5, random_state=42
)

# Vocabulary comes from the training texts only
tokenizer = Tokenizer()
tokenizer.fit_on_texts(train_texts)

# Texts -> integer id sequences
train_sequences, val_sequences, test_sequences = [
    tokenizer.texts_to_sequences(texts)
    for texts in (train_texts, val_texts, test_texts)
]

# Pad every split to the length of the longest training sequence
max_sequence_length = max(map(len, train_sequences))
train_sequences, val_sequences, test_sequences = [
    pad_sequences(sequences, maxlen=max_sequence_length)
    for sequences in (train_sequences, val_sequences, test_sequences)
]

# Pretrained Embeddings
embedding_type = "word2vec"  # Change this to "fasttext" or "glove"

if embedding_type == "word2vec":
    w2v_model = KeyedVectors.load_word2vec_format("/content/drive/MyDrive/Colab Notebooks/GoogleNews-vectors-negative300.bin", binary=True)
    pretrained_vectors = w2v_model

elif embedding_type == "fasttext":
    # The .vec file is plain-text word2vec format, so KeyedVectors reads it directly
    ft_model = KeyedVectors.load_word2vec_format("/content/drive/MyDrive/Colab Notebooks/wiki-news-300d-1M.vec", binary=False)
    pretrained_vectors = ft_model

elif embedding_type == "glove":
    # GloVe text files have no "<count> <dim>" header line; no_header=True
    # (gensim >= 4.0) reads them without a separate conversion step
    glove_model = KeyedVectors.load_word2vec_format("/content/drive/MyDrive/Colab Notebooks/glove.6B.100d.txt", binary=False, no_header=True)
    pretrained_vectors = glove_model

else:
    raise ValueError(f"Unknown embedding_type: {embedding_type!r}")

# One row per index of the tokenizer fitted in THIS cell (+1 because index 0
# is reserved for padding). Sizing the matrix from a vocabulary size computed
# for a different tokenizer can leave indices out of range.
embedding_matrix = np.zeros((len(tokenizer.word_index) + 1, pretrained_vectors.vector_size))

# Copy the pretrained vector of every known word into its row. Without this
# step the frozen embedding layer is all zeros and the network sees the same
# input for every review.
for word, index in tokenizer.word_index.items():
    if word in pretrained_vectors:
        embedding_matrix[index] = pretrained_vectors[word]


# Build the RNN model
def build_model(lstm_units=128):
    """Build and compile a frozen-embedding -> LSTM -> sigmoid classifier.

    The embedding layer takes its shape and initial weights from the
    module-level ``embedding_matrix`` and is not trainable.
    """
    vocab_rows, vector_dim = embedding_matrix.shape
    frozen_embedding = Embedding(
        vocab_rows,
        vector_dim,
        input_length=max_sequence_length,
        weights=[embedding_matrix],
        trainable=False,
    )

    model = Sequential()
    for layer in (frozen_embedding, LSTM(lstm_units), Dense(1, activation='sigmoid')):
        model.add(layer)

    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# Create a wrapper function for Keras model
def create_model(lstm_units=128):
    """Factory handed to the scikit-learn wrapper; delegates to build_model."""
    return build_model(lstm_units=lstm_units)

# Hyperparameter grid (a single combination at the moment)
param_grid = dict(batch_size=[32], epochs=[5], lstm_units=[64])

# Callbacks: stop when val_loss stalls, keep the weights with the best val_accuracy
early_stopping = EarlyStopping(monitor='val_loss', patience=3)
checkpoint_path = '/content/drive/MyDrive/Colab Notebooks/to/best_model.h5'
model_checkpoint = ModelCheckpoint(checkpoint_path, monitor='val_accuracy', save_best_only=True)

# Expose the Keras model through the scikit-learn estimator interface
keras_model = KerasClassifier(build_fn=create_model)

# 3-fold grid search; the extra keyword arguments are forwarded to the Keras fit call
# https://towardsdatascience.com/random-forest-regression-5f605132d19d
model = GridSearchCV(keras_model, param_grid, cv=3, scoring='accuracy')
fit_options = {
    'validation_data': (val_sequences, val_labels),
    'callbacks': [early_stopping, model_checkpoint],
}
model.fit(train_sequences, train_labels, **fit_options)

# Rebuild the winning architecture and restore the checkpointed weights
best_units = model.best_params_['lstm_units']
best_model = build_model(lstm_units=best_units)
best_model.load_weights(checkpoint_path)

# Evaluate on the test set: round the sigmoid outputs to hard 0/1 labels
predictions = np.round(best_model.predict(test_sequences)).flatten()
accuracy = accuracy_score(test_labels, predictions)
print(f'Test Accuracy: {accuracy}')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Hard 0/1 labels from the model outputs
predicted_labels = np.round(predictions).flatten()

# Confusion matrix of true vs predicted test labels
cm = confusion_matrix(test_labels, predicted_labels)

# Annotated heatmap without a colour bar
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted Labels")
ax.set_ylabel("True Labels")
plt.show()


In [None]:
from sklearn.metrics import classification_report

# Per-class precision / recall / F1 on the test set
report = classification_report(test_labels, predicted_labels)
print(report)

In [None]:
def plot_learning_curve(history):
    """Plot training vs validation accuracy, then loss, on two separate figures.

    ``history`` is expected to expose a ``history`` dict with the keys
    'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    """
    panels = (
        # (train key, validation key, train label, validation label, title, y label)
        ('accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Model Loss', 'Loss'),
    )
    for train_key, val_key, train_label, val_label, title, y_label in panels:
        plt.figure(figsize=(8, 6))
        plt.plot(history.history[train_key], label=train_label)
        plt.plot(history.history[val_key], label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.show()

# Fit best_model with the selected epochs / batch size and keep the Keras history
best_params = model.best_params_
history = best_model.fit(
    train_sequences,
    train_labels,
    validation_data=(val_sequences, val_labels),
    epochs=best_params['epochs'],
    batch_size=best_params['batch_size'],
    callbacks=[early_stopping, model_checkpoint],
)

# Accuracy and loss curves for that run
plot_learning_curve(history)


In [None]:
from wordcloud import WordCloud

# Reviews whose encoded sentiment is 1
is_positive = df['sentiment'] == 1
positive_reviews = df.loc[is_positive, 'review'].values

# One long string containing every selected review
positive_text = ' '.join(positive_reviews)

# Build the cloud from word frequencies in that string
wordcloud = WordCloud(width=800, height=400).generate(positive_text)

# Show it without axis ticks
fig, ax = plt.subplots(figsize=(10, 6))
ax.imshow(wordcloud, interpolation='bilinear')
ax.set_title('WordCloud - Positive Reviews')
ax.axis('off')
plt.show()


In [None]:
# Reviews whose encoded sentiment is 0
is_negative = df['sentiment'] == 0
negative_reviews = df.loc[is_negative, 'review'].values

# One long string containing every selected review
negative_text = ' '.join(negative_reviews)

# Build the cloud from word frequencies in that string
wordcloud = WordCloud(width=800, height=400).generate(negative_text)

# Show it without axis ticks
fig, ax = plt.subplots(figsize=(10, 6))
ax.imshow(wordcloud, interpolation='bilinear')
ax.set_title('WordCloud - Negative Reviews')
ax.axis('off')
plt.show()

## BERT-based approach + GPU: https://towardsdatascience.com/sentiment-analysis-in-10-minutes-with-bert-and-hugging-face-294e8a04b671
gpu_enabled = True  # Set to False if GPU is not available or not desired
bert_enabled = True  # Set to False if BERT is not available or not desired
https://www.analyticsvidhya.com/blog/2019/11/comprehensive-guide-attention-mechanism-deep-learning/
https://medium.com/intel-student-ambassadors/implementing-attention-models-in-pytorch-f947034b3e66
https://learnopencv.com/attention-mechanism-in-transformer-neural-networks/


In [None]:
!pip install transformers

In [None]:
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
import matplotlib.pyplot as plt
from wordcloud import WordCloud

# Load the preprocessed dataset
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Cleaned_MovieReviewDataset.csv')

# 80 / 20 train-test split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    df['review'], df['sentiment'], test_size=0.2, random_state=42
)

# Lower-casing WordPiece tokenizer matching the uncased BERT checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

# Same encoding options for both splits: truncate, pad, return PyTorch tensors
encode_options = dict(truncation=True, padding=True, return_tensors='pt')

# Training split -> input ids, attention mask and label tensor
train_encodings = tokenizer.batch_encode_plus(X_train.tolist(), **encode_options)
train_input_ids, train_attention_mask = train_encodings['input_ids'], train_encodings['attention_mask']
train_labels = torch.tensor(y_train.values, dtype=torch.long)

# Test split -> input ids, attention mask and label tensor
test_encodings = tokenizer.batch_encode_plus(X_test.tolist(), **encode_options)
test_input_ids, test_attention_mask = test_encodings['input_ids'], test_encodings['attention_mask']
test_labels = torch.tensor(y_test.values, dtype=torch.long)

# BERT with a 2-class classification head
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# Run on the GPU when one is available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimizer over all model parameters
optimizer = AdamW(model.parameters(), lr=1e-5)

# Training configuration
epochs = 5
batch_size = 16
train_size = len(X_train)
steps_per_epoch = int(np.ceil(train_size / batch_size))

for epoch in range(epochs):
    model.train()
    train_loss = 0.0

    # Walk the training tensors in consecutive slices of batch_size rows;
    # slicing past the end simply yields a shorter final batch
    for start in range(0, train_size, batch_size):
        batch = slice(start, start + batch_size)

        input_ids = train_input_ids[batch].to(device)
        attention_mask = train_attention_mask[batch].to(device)
        labels = train_labels[batch].to(device)

        # Clear old gradients, run forward with labels to get the loss,
        # back-propagate and update the weights
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Mean loss per batch for this epoch
    train_loss /= steps_per_epoch

    print(f'Epoch {epoch + 1}/{epochs} - Training loss: {train_loss}')

# Evaluation: switch the model to eval mode and score the test tensors batch by batch
model.eval()
eval_loss = 0.0
predictions = []
# No gradients are tracked inside this block
with torch.no_grad():
    for i in range(0, len(X_test), batch_size):
        # Consecutive slice of up to batch_size test rows, moved to the model's device
        input_ids = test_input_ids[i:i + batch_size].to(device)
        attention_mask = test_attention_mask[i:i + batch_size].to(device)
        labels = test_labels[i:i + batch_size].to(device)

        # Passing labels makes the model output carry a loss next to the logits
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits

        eval_loss += loss.item()
        # Predicted class = index of the largest logit in each row
        predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())

# Mean loss per evaluation batch; predictions collected into one NumPy array
eval_loss /= int(np.ceil(len(X_test) / batch_size))
predictions = np.array(predictions)