In [1]:
# %reload_ext autoreload
# %autoreload 2

In [2]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, MaxPooling1D, LSTM, Dense, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from keras_tuner import BayesianOptimization
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.layers import (
    Embedding, Conv1D, MaxPooling1D, LSTM, Dense, Dropout, Bidirectional, GRU
)
import plotly.graph_objects as go
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from text_classification.paths import DATA_DIR, MODELS_DIR

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, MaxPooling1D, LSTM, Dense, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from keras_tuner import BayesianOptimization

In [3]:
import pandas as pd
from text_classification.paths import DATA_DIR, MODELS_DIR
from pathlib import Path
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
import numpy as np
import matplotlib.pyplot as plt

import text_classification.config as cfg
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.preprocessing import image_dataset_from_directory

import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from textblob import TextBlob
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd

# Download stopwords and lemmatizer data if necessary
# nltk.download('stopwords')
# nltk.download('wordnet')

%matplotlib inline

In [4]:
import tensorflow as tf
from keras import backend as K

In [5]:
def f1_score(y_true, y_pred):
    """Return the F1 score of rounded predictions, averaged with ``K.mean``.

    Predictions are rounded with ``K.round`` before being compared to
    ``y_true``. True positives, false positives and false negatives are summed
    along axis 0, and ``K.epsilon()`` is added to every denominator so that a
    zero count cannot cause a division by zero.
    """
    eps = K.epsilon()
    hard_pred = K.round(y_pred)

    def _sum_over_axis0(product):
        # Cast the element-wise product to float, then sum along axis 0
        return K.sum(K.cast(product, 'float'), axis=0)

    true_pos = _sum_over_axis0(y_true * hard_pred)
    false_pos = _sum_over_axis0((1 - y_true) * hard_pred)
    false_neg = _sum_over_axis0(y_true * (1 - hard_pred))

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)

    return K.mean(2 * precision * recall / (precision + recall + eps))

In [None]:
# Report whether TensorFlow lists any 'GPU' physical device
# (the printed messages refer to it as the MPS backend)
gpu_devices = tf.config.list_physical_devices('GPU')
if not gpu_devices:
    print("MPS backend is not available. Using CPU instead.")
else:
    print("MPS backend is available and will be used.")

In [None]:
# Read the historical dataset from the project data directory
historical_csv = DATA_DIR / 'historical_data.csv'
data = pd.read_csv(historical_csv)

# Preview the first rows
data.head()

In [None]:
# Summary of the frame as printed by DataFrame.info()
data.info()

# Relative frequency of each value of the target column
target_column = data['Recommended.IND']
target_column.value_counts(normalize=True)

In [9]:
# Give the target and text columns short snake_case names
column_renames = {'Recommended.IND': 'recommended', 'Review.Text': 'review'}
data = data.rename(columns=column_renames)

In [10]:
# Read the negation words, one stripped entry per line of negations.txt
negations_path = Path(DATA_DIR / 'negations.txt')
with open(negations_path, 'r') as handle:
    negations = list(map(str.strip, handle))

In [11]:
# English stop words as a set, for fast membership tests
stop_words = {*stopwords.words('english')}

# WordNet lemmatizer used by clean_text
lemmatizer = WordNetLemmatizer()

# VADER sentiment analyzer instance
analyzer = SentimentIntensityAnalyzer()

# Function to detect and transform negated phrases
def handle_negations(text, negation_words=None):
    """Merge each negation word with the word that follows it.

    A negation followed by another word is replaced, together with that word,
    by the single token ``not_<next word>``; e.g. ``"not good"`` becomes
    ``"not_good"``. A negation that is the last word of the text has nothing to
    attach to and is kept unchanged.

    Args:
        text: Whitespace-separated text to transform.
        negation_words: Collection of words treated as negations. When omitted,
            the notebook-level ``negations`` list is used.

    Returns:
        The transformed text, with tokens joined by single spaces.
    """
    if negation_words is None:
        # Fall back to the notebook-level list loaded from negations.txt
        negation_words = negations

    words = text.split()  # Tokenize the text
    transformed_words = []

    # An explicit index is required: advancing the loop variable of a
    # `for ... in enumerate(...)` loop does not skip the next item, which would
    # emit the negated word a second time.
    i = 0
    while i < len(words):
        word = words[i]
        if word in negation_words and i + 1 < len(words):
            transformed_words.append(f"not_{words[i + 1]}")
            i += 2  # Consume the negation and the word merged with it
        else:
            transformed_words.append(word)
            i += 1
    return ' '.join(transformed_words)

# Function to handle repeated characters (like "soooo good" -> "soo good")
def handle_repeated_characters(word):
    """Shorten every run of two or more identical characters to exactly two."""
    repeated_run = re.compile(r'(.)\1+')
    return repeated_run.sub(r'\1\1', word)

# Preprocessing function for sentiment analysis
def clean_text(text):
    """Normalise one review into a space-separated string of cleaned tokens.

    Steps, in order: lowercase; drop every character that is not an ASCII
    letter, digit or whitespace; remove stop words (words listed in
    ``negations`` are kept); merge negations with the following word via
    ``handle_negations``; lemmatize; shorten repeated characters via
    ``handle_repeated_characters``.

    Args:
        text: The raw review. Any non-string value (for example the NaN pandas
            uses for a missing cell) is treated as an empty review.

    Returns:
        The cleaned text, or ``''`` when ``text`` is not a string.
    """
    # Missing reviews are not strings; calling .lower() on them would raise
    if not isinstance(text, str):
        return ''

    # Convert text to lowercase
    text = text.lower()

    # Keep only ASCII letters, digits and whitespace
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)

    # Split the text into words
    words = text.split()

    # Remove stop words, except those that are also negations
    words = [word for word in words if word not in stop_words or word in negations]

    # Handle negations by concatenating them with the following word
    neg_handled_text = handle_negations(' '.join(words))

    # Tokenize the processed text after handling negations
    words_processed = neg_handled_text.split()

    # Lemmatize words
    lemmatized_words = [lemmatizer.lemmatize(word) for word in words_processed]

    # Handle repeated characters
    lemmatized_words = [handle_repeated_characters(word) for word in lemmatized_words]

    # Join the cleaned words back into a string
    return ' '.join(lemmatized_words)



# Clean every review and store the result in a new column
data['cleaned_review'] = data['review'].map(clean_text)


In [None]:
# Character length of the raw and of the cleaned review, one new column each
for source_col, count_col in (
    ('review', 'review_char_count'),
    ('cleaned_review', 'cleaned_review_char_count'),
):
    data[count_col] = data[source_col].str.len()

# Preview the frame with the new columns
data.head()

In [13]:
# Target column
y = data['recommended']

# Features: everything except the target, the raw review and the length columns
non_feature_columns = ['recommended', 'review', 'review_char_count', 'cleaned_review_char_count']
X = data.drop(columns=non_feature_columns)

In [None]:
# Shapes of the feature frame and of the target
(X.shape, y.shape)

In [15]:
# Stratified 80/20 train/validation split with a fixed random state
split_options = dict(test_size=0.20, stratify=y, random_state=123)
X_train, X_val, y_train, y_val = train_test_split(X, y, **split_options)

In [16]:
# Tokenization parameters
max_words = 2500   # passed to the Tokenizer as num_words
max_length = 60    # target length passed to pad_sequences

# Cleaned review text of each split as plain Python lists of strings
X_train_text = X_train['cleaned_review'].tolist()
X_val_text = X_val['cleaned_review'].tolist()

# The tokenizer is fitted on the training texts only
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(X_train_text)

# Convert both splits to sequences of integers
X_train_seq, X_val_seq = (
    tokenizer.texts_to_sequences(texts) for texts in (X_train_text, X_val_text)
)

# Pad at the end of each sequence so every row has the same length
pad_options = dict(maxlen=max_length, padding='post')
X_train_pad = pad_sequences(X_train_seq, **pad_options)
X_val_pad = pad_sequences(X_val_seq, **pad_options)

In [None]:
# Row counts of the padded inputs and of the labels for both splits
tuple(len(part) for part in (X_train_pad, X_val_pad, y_train, y_val))

In [18]:
# 8. Location of the GloVe 100-dimensional vectors used to build the embedding matrix
glove_file = Path(DATA_DIR, 'glove.6B.100d.txt')

In [None]:
embedding_dim = 100  # For GloVe 100D
max_words = 2500  # Should match the tokenizer num_words

# Fail early if the constant above drifts away from the fitted tokenizer
assert tokenizer.num_words == max_words, "max_words must match the tokenizer's num_words"

# Seeded generator so the random vectors for words missing from GloVe are
# identical on every run
oov_rng = np.random.default_rng(123)

# Initialize dictionary to store GloVe embeddings
embeddings_index = {}

# Read the GloVe file and load into embeddings_index
with open(glove_file, 'r', encoding='utf8') as f:
    for line in f:
        values = line.split()
        word = values[0]  # First element is the word
        coefs = np.asarray(values[1:], dtype='float32')  # The rest are the embedding coefficients
        embeddings_index[word] = coefs

# Get the word index from the tokenizer
word_index = tokenizer.word_index

# Number of rows: capped at max_words, or smaller when the vocabulary is smaller
vocab_size = min(max_words, len(word_index) + 1)

# Initialize the embedding matrix with zeros (shape = (vocab_size, embedding_dim))
embedding_matrix = np.zeros((vocab_size, embedding_dim))

# Fill one row per word index below vocab_size: the GloVe vector when the word
# is known, otherwise a small random vector. Rows that no word maps to stay zero.
for word, i in word_index.items():
    if i < vocab_size:
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector  # Set GloVe embedding for this word
        else:
            # Random small values for words not found in GloVe
            embedding_matrix[i] = oov_rng.uniform(-0.05, 0.05, embedding_dim)

# Print the shape of the final embedding matrix
print(f"Final embedding matrix shape: {embedding_matrix.shape}")


In [20]:
# Balanced class weights computed from the training labels
classes = np.unique(y_train)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weight_dict = {label: weight for label, weight in zip(classes, class_weights)}

# Define additional class weights: the balanced weights first, then three manual settings
manual_class_weights = [
    {0: 3.0, 1: 0.5},
    {0: 4.0, 1: 1.0},
    {0: 5.00, 1: 2.00},
]
class_weight_options = [class_weight_dict, *manual_class_weights]

In [21]:
def build_model(hp):
    """Build and compile the Embedding -> Conv1D -> LSTM -> bidirectional GRU classifier.

    Args:
        hp: KerasTuner hyperparameter container. Every tunable value used below
            is registered on it by name.

    Returns:
        A compiled ``Sequential`` model with a single sigmoid output unit, using
        binary cross-entropy loss and reporting accuracy.
    """
    # Size the embedding layer from the matrix it is initialised with, so the
    # layer shape and the supplied weights cannot disagree.
    vocab_size, embedding_dim = embedding_matrix.shape

    model = Sequential()

    # Embedding layer initialised from the combined matrix (GloVe + random vectors)
    model.add(Embedding(input_dim=vocab_size,
                        output_dim=embedding_dim,
                        weights=[embedding_matrix],
                        input_length=60,  # Length of the padded input sequences
                        trainable=True))  # Embeddings are fine-tuned during training

    # Hyperparameters are registered in a fixed order; values reused by several
    # layers (initializer, L2 factor) are registered once and shared.
    weight_initializer = hp.Choice('weight_initializer', values=['he_normal', 'glorot_uniform', 'lecun_normal'])
    conv_filters = hp.Int('conv_filters', min_value=16, max_value=264, step=16)
    conv_kernel_size = hp.Choice('conv_kernel_size', values=[3, 5])
    l2_lambda = hp.Float('l2_lambda', min_value=1e-5, max_value=1e-2, sampling='LOG')

    # 1D Convolutional Layer with L2 regularization and padding
    model.add(Conv1D(filters=conv_filters,
                     kernel_size=conv_kernel_size,
                     activation='relu',
                     padding='same',
                     kernel_regularizer=l2(l2_lambda),
                     kernel_initializer=weight_initializer))
    model.add(MaxPooling1D(pool_size=2))

    # LSTM layer with L2 regularization; returns the full sequence for the GRU
    model.add(LSTM(units=hp.Int('lstm_units', min_value=16, max_value=264, step=16),
                   return_sequences=True,
                   kernel_regularizer=l2(l2_lambda),
                   kernel_initializer=weight_initializer))

    # Bidirectional GRU layer returning only its final output
    model.add(Bidirectional(GRU(units=hp.Int('gru_units', min_value=16, max_value=264, step=16),
                                return_sequences=False)))

    # Dense layer followed by dropout
    model.add(Dense(units=hp.Int('dense_units', min_value=16, max_value=264, step=16),
                    activation='relu',
                    kernel_regularizer=l2(l2_lambda),
                    kernel_initializer=weight_initializer))
    model.add(Dropout(rate=hp.Float('dropout_rate', min_value=0.2, max_value=0.7, step=0.1)))

    # Output layer for binary classification
    model.add(Dense(1, activation='sigmoid', kernel_initializer=weight_initializer))

    # Choose optimizer; all three share the same learning-rate hyperparameter
    optimizer_name = hp.Choice('optimizer', values=['adam', 'rmsprop', 'sgd'])
    learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
    optimizer_classes = {'adam': Adam, 'rmsprop': RMSprop, 'sgd': SGD}
    optimizer_instance = optimizer_classes[optimizer_name](learning_rate=learning_rate)

    # Register the class-weight choice, offering only indices that exist in
    # class_weight_options (an out-of-range index would raise IndexError).
    # NOTE: this function only returns a compiled model, so the chosen weights
    # are not applied to training here; they take effect only if the caller
    # passes them to fit().
    hp.Choice('class_weight_index', list(range(len(class_weight_options))))

    # Compile the model
    model.compile(optimizer=optimizer_instance,
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    return model

In [None]:
# Directory in which the tuner stores its trial results
tuner_directory = Path(MODELS_DIR / 'tuner_random')

# Bayesian-optimisation tuner: 10 trials, one execution each, maximising validation accuracy
tuner_options = dict(
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory=tuner_directory,
    project_name='bayes_opt',
)
tuner = BayesianOptimization(build_model, **tuner_options)

# Print a summary of the search space
tuner.search_space_summary()

In [None]:
# Search the hyperparameter space.
# build_model only returns a compiled model and cannot change how it is fitted,
# so class weights have to be passed here: keyword arguments given to
# tuner.search are forwarded to model.fit. The balanced weights are used, the
# same ones the final training run uses.
tuner.search(
    X_train_pad, np.asarray(y_train),
    epochs=15,
    validation_data=(X_val_pad, np.asarray(y_val)),
    class_weight=class_weight_dict,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)],
)

In [None]:
# Hyperparameters of the best trial found by the tuner
top_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)
best_hps = top_hyperparameters[0]

# Value of the 'class_weight_index' hyperparameter in that trial,
# and the class-weight dictionary stored at that position
best_class_weight_index = best_hps.get('class_weight_index')
best_class_weight = class_weight_options[best_class_weight_index]

print(f"The best class weight is: {best_class_weight}")

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Labels as NumPy arrays for fitting
y_train = np.array(y_train)
y_val = np.array(y_val)

# Build the model using the best hyperparameters.
# build_model is the only model builder defined in this notebook.
model_2 = build_model(best_hps)

# Define early stopping callback to avoid overfitting
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model with the best hyperparameters and the balanced class weights
history = model_2.fit(
    X_train_pad, y_train,
    epochs=50,
    validation_data=(X_val_pad, y_val),
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# Display the model summary
model_2.summary()

In [28]:
# Persist the trained model in the models directory
saved_model_path = MODELS_DIR / 'lstm_emb.keras'
model_2.save(saved_model_path)

In [29]:
# Reload the model from disk
reloaded_model_path = MODELS_DIR / 'lstm_emb.keras'
tc2_model = tf.keras.models.load_model(reloaded_model_path)

In [None]:
# Predicted probabilities for the validation inputs
y_pred_prob = tc2_model.predict(X_val_pad)

# Class labels: 1 where the probability exceeds 0.5, otherwise 0
above_threshold = y_pred_prob > 0.5
y_pred = above_threshold.astype(int)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

# Confusion matrix of validation labels against predicted labels
conf_matrix = confusion_matrix(y_true=y_val, y_pred=y_pred)

# Per-class precision, recall and F1
report_text = classification_report(y_val, y_pred)
print(report_text)

In [None]:
# Occurrences of each label value in the validation set
class_counts = np.bincount(y_val)
print(
    f"Number of 0s: {class_counts[0]}",
    f"Number of 1s: {class_counts[1]}",
    sep="\n",
)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

# Confusion matrix of the true validation labels (y_val) against the predicted labels (y_pred)
conf_matrix = confusion_matrix(y_val, y_pred)

# Draw the matrix as an annotated heatmap on an explicit Axes
heatmap_fig, heatmap_ax = plt.subplots(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='viridis', ax=heatmap_ax)
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

In [None]:
import plotly.graph_objects as go

# One trace per recorded loss series, plotted against 1-based epoch numbers
fig = go.Figure()
for history_key, trace_name in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    series = history.history[history_key]
    fig.add_trace(go.Scatter(
        x=list(range(1, len(series) + 1)),
        y=series,
        mode='lines+markers',
        name=trace_name
    ))

# Titles, axis labels, legend position and hover behaviour
loss_layout = dict(
    title='Training and Validation Loss',
    xaxis_title='Epochs',
    yaxis_title='Loss',
    legend=dict(x=0, y=1),
    hovermode='x unified',
)
fig.update_layout(**loss_layout)

# Show the plot
fig.show()

In [None]:
# One trace per recorded accuracy series, plotted against 1-based epoch numbers
fig_accuracy = go.Figure()
for history_key, trace_name in (('accuracy', 'Training Accuracy'), ('val_accuracy', 'Validation Accuracy')):
    series = history.history[history_key]
    fig_accuracy.add_trace(go.Scatter(
        x=list(range(1, len(series) + 1)),
        y=series,
        mode='lines+markers',
        name=trace_name
    ))

# Titles, axis labels, legend position and hover behaviour
accuracy_layout = dict(
    title='Training and Validation Accuracy',
    xaxis_title='Epochs',
    yaxis_title='Accuracy',
    legend=dict(x=0, y=1),
    hovermode='x unified',
)
fig_accuracy.update_layout(**accuracy_layout)

# Show the accuracy plot
fig_accuracy.show()

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Side-by-side panels: loss on the left, accuracy on the right
fig = make_subplots(rows=1, cols=2, subplot_titles=('Loss Evolution', 'Accuracy Evolution'))

# (history key, legend name, subplot column)
trace_specs = (
    ('loss', 'Train Loss', 1),
    ('val_loss', 'Val Loss', 1),
    ('accuracy', 'Train Acc', 2),
    ('val_accuracy', 'Val Acc', 2),
)
for history_key, trace_name, panel_col in trace_specs:
    series = history.history[history_key]
    fig.add_trace(go.Scatter(
        x=list(range(1, len(series) + 1)),
        y=series,
        mode='lines+markers',
        name=trace_name
    ), row=1, col=panel_col)

# Figure-wide layout: size, legend on the right, transparent backgrounds, black text
fig.update_layout(
    width=1000,
    height=400,
    showlegend=True,
    legend=dict(x=1.05, y=1, orientation='v'),
    hovermode='x unified',
    plot_bgcolor='rgba(0,0,0,0)',
    paper_bgcolor='rgba(0,0,0,0)',
    font=dict(color='black')
)

# Axis titles per panel, with black title and tick fonts
for panel_col, y_axis_title in ((1, 'Loss'), (2, 'Accuracy')):
    fig.update_xaxes(title_text='Epochs', row=1, col=panel_col,
                     title_font=dict(color='black'), tickfont=dict(color='black'))
    fig.update_yaxes(title_text=y_axis_title, row=1, col=panel_col,
                     title_font=dict(color='black'), tickfont=dict(color='black'))

# Show the plot
fig.show()

In [None]:


# ROC curve points and the area under the curve
fpr, tpr, thresholds = roc_curve(y_val, y_pred_prob)
roc_auc = auc(fpr, tpr)

# The model's ROC curve
roc_trace = go.Scatter(
    x=fpr,
    y=tpr,
    mode='lines',
    name=f'ROC curve (area = {roc_auc:.2f})',
    line=dict(color='red', width=2)
)

# Diagonal reference line representing a random classifier
chance_trace = go.Scatter(
    x=[0, 1],
    y=[0, 1],
    mode='lines',
    name='Random Classifier',
    line=dict(color='gray', width=2, dash='dash')
)

fig_roc = go.Figure()
fig_roc.add_trace(roc_trace)
fig_roc.add_trace(chance_trace)

# Titles, size, legend on the right, transparent backgrounds, black text
roc_layout = dict(
    title='Receiver Operating Characteristic (ROC) Curve',
    xaxis_title='False Positive Rate',
    yaxis_title='True Positive Rate',
    width=1000,
    height=400,
    showlegend=True,
    legend=dict(x=1.05, y=1, orientation='v'),
    hovermode='x unified',
    plot_bgcolor='rgba(0,0,0,0)',
    paper_bgcolor='rgba(0,0,0,0)',
    font=dict(color='black'),
)
fig_roc.update_layout(**roc_layout)

# Show the ROC curve plot
fig_roc.show()

In [None]:
# Generate predictions on the validation set with the reloaded model.
# No variable named `model` is defined in this notebook; tc2_model is the
# model loaded back from disk.
y_val_pred = (tc2_model.predict(X_val_pad) > 0.5).astype("int32")

# Print confusion matrix and classification report
conf_matrix = confusion_matrix(y_val, y_val_pred)
class_report = classification_report(y_val, y_val_pred)

print("Confusion Matrix:")
print(conf_matrix)
print("\nClassification Report:")
print(class_report)

In [None]:
# Display the most recently computed confusion matrix as the cell output
conf_matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Tick labels for the two classes, defined here so the cell runs on a fresh
# kernel. confusion_matrix orders rows and columns by sorted label value (0, then 1).
# NOTE(review): assumes 0 = not recommended and 1 = recommended; confirm against the data.
x_labels = ['Not recommended', 'Recommended']
y_labels = ['Not recommended', 'Recommended']

# Create a heatmap
plt.figure(figsize=(4, 2))

# Create the heatmap with no color bar and smaller annotation size
ax = sns.heatmap(conf_matrix, annot=True,
                 fmt='d',
                 cmap='viridis',
                 xticklabels=x_labels,
                 yticklabels=y_labels,
                 cbar=False,
                 annot_kws={"size": 8})

# Customize the font size of the tick labels
ax.tick_params(axis='x', labelsize=8)
ax.tick_params(axis='y', labelsize=8)

# Add labels
plt.xlabel('Predicted', size = 10, labelpad=10)
plt.ylabel('Actual', size=8, labelpad=8)

# Move the x-axis label and ticks to the top
plt.gca().xaxis.set_label_position('top')
plt.gca().xaxis.tick_top()

# Show the plot
plt.show()

In [28]:
# Clear the TensorFlow Keras session as the final step of the notebook
tf.keras.backend.clear_session()