# importing libraries

In [None]:
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
import seaborn as sns

# Load dataset

In [None]:
# Read the JSON Lines review dump (one JSON record per line) into a DataFrame.
df = pd.read_json(
    "/Users/alenbaby/Downloads/Amazon_Fashion.jsonl",
    lines=True,
)

# Dataset info

In [None]:
# Quick structural overview of the freshly loaded reviews.
print("Shape of dataset:", df.shape)
print("\nColumn Info:")
# DataFrame.info() writes its report itself and returns None, so wrapping it
# in print() only appended a stray "None" line to the output.
df.info()
print("\nMissing values:")
print(df.isnull().sum())
print("\nSample data:")
print(df.head())


# Data preprocessing

### Remove rows with missing text , remove duplicates , Keep only relevant columns

In [None]:
# Reduce the frame to the two modelling columns, then discard rows that have
# no review text and rows that repeat an earlier (text, rating) pair.
df = (
    df.loc[:, ["text", "rating"]]
      .dropna(subset=["text"])
      .drop_duplicates(subset=["text", "rating"])
)





### Remove conflicting reviews (sentiment contradicts rating) using VADER

In [None]:
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk

# Fetch the VADER lexicon before constructing the analyzer; `sia` is the
# module-level analyzer that conflict_filter_vader() reads.
nltk.download("vader_lexicon")
sia = SentimentIntensityAnalyzer()

def conflict_filter_vader(text, rating):
    """Decide whether a review's wording is consistent with its star rating.

    The review is scored with the module-level VADER analyzer ``sia``; its
    compound score runs from -1 (very negative) to +1 (very positive).

    Args:
        text: Review text to score.
        rating: Star rating attached to the review.

    Returns:
        False when a rating of 4 or more comes with a compound score below
        -0.2, or a rating of 2 or less comes with a compound score above 0.2;
        True in every other case.
    """
    compound = sia.polarity_scores(text)["compound"]

    praised_but_negative = rating >= 4 and compound < -0.2
    panned_but_positive = rating <= 2 and compound > 0.2

    return not (praised_but_negative or panned_but_positive)

# Keep only reviews whose VADER polarity does not contradict the star rating.
# The mask is built by walking the two columns in lockstep, which avoids the
# per-row Series that DataFrame.apply(axis=1) constructs for every review.
# .copy() makes the result an independent frame, so the column assignments
# that follow do not raise SettingWithCopyWarning.
keep_mask = pd.Series(
    [
        conflict_filter_vader(text, rating)
        for text, rating in zip(df["text"], df["rating"])
    ],
    index=df.index,
    dtype=bool,
)
df = df[keep_mask].copy()

### new Column (Review_Length)

In [None]:
# Character count of every review, stored alongside the text.
df["Review_Length"] = df["text"].map(len)


### Remove short and long reviews

In [None]:
# Keep reviews of 5 to 300 characters; between() includes both bounds.
df = df[df["Review_Length"].between(5, 300)]

# Display first few rows of cleaned dataset

In [None]:
# Preview the first five rows of the cleaned dataset.
df.head(n=5)

In [None]:
def to_lower(text):
    """Lower-case ``text`` when it is a string; return anything else untouched.

    Args:
        text: Value to normalise; non-string values (e.g. None) are allowed.

    Returns:
        ``text.lower()`` for strings, otherwise the input object itself.
    """
    if not isinstance(text, str):
        return text
    return text.lower()
# Lower-case the review text in place.
df["text"] = df["text"].map(to_lower)

### Expand contractions

In [None]:
import contractions
import pandas as pd

def expand_contractions(text):
    """Run ``contractions.fix`` on ``text``, passing missing values through.

    Args:
        text: Review text, or a missing value (None / NaN).

    Returns:
        The input unchanged when ``pd.isna`` reports it missing, otherwise
        whatever ``contractions.fix(text)`` returns.
    """
    return text if pd.isna(text) else contractions.fix(text)

### Normalisation and Cleaning (reuse code)

In [None]:
def clean_text(text):
    """Strip URLs, HTML tags and non-letter characters from a review.

    Removed spans are replaced by a space rather than deleted outright, so
    words that were separated only by a tag or punctuation mark (for example
    ``"good<br />bad"`` or ``"nice,but"``) stay separate tokens instead of
    being fused together. Apostrophes are the exception: they are dropped
    without a space so that ``"wife's"`` becomes ``"wifes"``.

    Args:
        text: Review text. Non-string values are returned unchanged, the
            same way ``to_lower`` treats them.

    Returns:
        The cleaned string containing only ASCII letters separated by single
        spaces, with no leading or trailing whitespace.
    """
    if not isinstance(text, str):
        return text

    text = re.sub(r"http\S+|www\S+", " ", text)         # URLs
    text = re.sub(r"<.*?>", " ", text)                  # HTML tags
    text = re.sub("['\u2019]", "", text)                # apostrophes: join, do not split
    text = re.sub(r"[^a-zA-Z\s]", " ", text)            # emojis, digits, punctuation
    return re.sub(r"\s+", " ", text).strip()            # collapse whitespace runs

# Balanced Dataset

In [None]:
SAMPLES_PER_CLASS = 20000  # upper bound on the reviews kept per star rating

# Down-sample every rating group to at most SAMPLES_PER_CLASS rows. Groups
# that are already smaller are kept whole, so classes may still differ in size.
# The groups are iterated explicitly instead of going through
# DataFrameGroupBy.apply: applying a function to the grouping column is
# deprecated in pandas 2.2, and newer releases exclude that column, which
# would remove "rating" from the result.
_sampled_groups = [
    group.sample(n=min(len(group), SAMPLES_PER_CLASS), random_state=42)
    for _, group in df.groupby("rating")
]
# pd.concat() rejects an empty list, so fall back to an (empty) copy of df.
df_balanced = pd.concat(_sampled_groups) if _sampled_groups else df.copy()

print(df_balanced["rating"].value_counts())  # verify the class sizes

In [None]:
# Persist the balanced sample to the working directory, without the index column.
df_balanced.to_csv("balanced_reviews.csv", index=False)

In [None]:
# Display up to 3 full sample reviews per class, drawn from the balanced set.
# The sample size is capped at the class size so that a rating with fewer
# than three reviews does not make DataFrame.sample() raise.
for cls in sorted(df_balanced["rating"].unique()):
    class_rows = df_balanced[df_balanced["rating"] == cls]
    samples = class_rows.sample(n=min(3, len(class_rows)), random_state=42)

    print("\n" + "="*60)
    print(f"Rating: {cls}")
    print("="*60)

    for i, review in enumerate(samples["text"], 1):
        print(f"\nReview {i}:\n{review}\n")

### Rating Distribution - barchart 

In [None]:
# Bar chart: number of reviews per star rating in the balanced sample.
ax = sns.countplot(data=df_balanced, x="rating")
ax.set_title("Rating Distribution")
ax.set_xlabel("Rating")
ax.set_ylabel("Number of Reviews")
plt.show()


### Review Length Distribution - histogram

In [None]:
# Histogram of review lengths (in characters) across the balanced sample.
# The title and x-axis label previously misspelled "Length" as "Lenth".
sns.histplot(x="Review_Length", data=df_balanced)
plt.title("Review Length Distribution")
plt.xlabel("Review length")
plt.ylabel("frequency")
plt.show()


### Review Length per Rating - boxplot

In [None]:
# Box plot: spread of review length within each star rating.
ax = sns.boxplot(data=df_balanced, x="rating", y="Review_Length")
ax.set_title("Review Length per Rating")
ax.set_xlabel("Rating")
ax.set_ylabel("Review Length")
plt.show()


### Violin Plot of Review Length per Rating - Violin plot

In [None]:
# Violin plot: distribution shape of review length within each star rating.
ax = sns.violinplot(data=df_balanced, x="rating", y="Review_Length")
ax.set_title("Violin Plot of Review Length per Rating")
plt.show()


### stratified train test split

In [None]:
from sklearn.model_selection import train_test_split # Split data into training and test sets

# Features are the review text (already lower-cased earlier in the notebook,
# otherwise uncleaned); the target is the star rating.
X, y = df_balanced["text"], df_balanced["rating"]

# 80/20 split, stratified on the rating so both parts keep the class mix.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    shuffle=True,
    stratify=y,
)

In [None]:
# Relative class frequencies: training split first, test split second.
print(
    y_train.value_counts(normalize=True),
    y_test.value_counts(normalize=True),
    sep="\n",
)

### Normalisation and Cleaning

In [None]:
# Run the same three-step normalisation on both splits:
# lower-case -> expand contractions -> strip URLs / HTML / non-letters.
X_train_clean = (
    X_train.apply(to_lower)
           .apply(expand_contractions)
           .apply(clean_text)
)
X_test_clean = (
    X_test.apply(to_lower)
          .apply(expand_contractions)
          .apply(clean_text)
)

###  tokenize + pad function

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Fit the tokenizer on TRAIN data only, so the vocabulary is built without
# looking at the test split; "<OOV>" is registered as the out-of-vocabulary token.
tokenizer = Tokenizer(oov_token="<OOV>")
tokenizer.fit_on_texts(X_train_clean)

#  Reusable tokenize + pad function
def tokenize_and_pad(texts, tokenizer, max_len):
    """Convert texts to integer sequences of a fixed length.

    Args:
        texts: Iterable of strings to encode.
        tokenizer: An already fitted tokenizer exposing ``texts_to_sequences``.
        max_len: Length every returned sequence is padded or cut to.

    Returns:
        The output of ``pad_sequences``: sequences padded and truncated at
        the end ("post") to ``max_len``.
    """
    return pad_sequences(
        tokenizer.texts_to_sequences(texts),
        maxlen=max_len,
        padding="post",
        truncating="post",
    )

# Encode both splits with the train-fitted tokenizer, to a common length.
MAX_LEN = 200

X_train_pad, X_test_pad = (
    tokenize_and_pad(split, tokenizer, MAX_LEN)
    for split in (X_train_clean, X_test_clean)
)

In [None]:
# One slot more than the number of known words, so the largest word index is
# a valid row (presumably index 0 is left for padding; see Keras Tokenizer docs).
VOCAB_SIZE = 1 + len(tokenizer.word_index)
print("Vocabulary size:", VOCAB_SIZE)

### Load GloVe embeddings

In [None]:
import numpy as np

def load_glove_embeddings(glove_path):
    """Parse a GloVe-style text file into a ``{word: vector}`` dictionary.

    Each line is split on whitespace; the first field is the word and the
    remaining fields are its coefficients.

    Args:
        glove_path: Path of the UTF-8 encoded embeddings file.

    Returns:
        dict mapping each word to a 1-D ``float32`` NumPy array.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a coefficient field is not a valid number.
    """
    embeddings_index = {}
    with open(glove_path, encoding="utf8") as f:
        for line in f:
            values = line.split()
            if len(values) < 2:
                # Blank line (or a token with no coefficients): nothing to
                # store. Indexing values[0] here used to raise IndexError.
                continue
            word, *coefficients = values
            embeddings_index[word] = np.asarray(coefficients, dtype="float32")
    return embeddings_index

In [None]:
# Load the 100-dimensional GloVe vectors (file name: glove.6B.100d) from a
# machine-specific absolute path into a {word: vector} dictionary.
glove_path = "/Users/alenbaby/Downloads/glove.6B.100d.txt"
embeddings_index = load_glove_embeddings(glove_path)

### Create embedding matrix

In [None]:
def create_embedding_matrix(tokenizer, embeddings_index, embedding_dim):
    """Build the weight matrix that maps tokenizer indices to word vectors.

    Args:
        tokenizer: Object exposing ``word_index``, a ``{word: int index}`` dict.
        embeddings_index: ``{word: vector}`` lookup of pretrained embeddings.
        embedding_dim: Number of columns in the returned matrix.

    Returns:
        A ``(len(word_index) + 1, embedding_dim)`` array. Row ``i`` holds the
        pretrained vector of the word with index ``i``; rows for words that
        are missing from ``embeddings_index`` (and row 0) stay all-zero.
    """
    row_count = len(tokenizer.word_index) + 1
    matrix = np.zeros((row_count, embedding_dim))

    for token, row in tokenizer.word_index.items():
        if row >= row_count:
            continue  # index falls outside the matrix
        pretrained = embeddings_index.get(token)
        if pretrained is None:
            continue  # no pretrained vector: leave the row at zero
        matrix[row] = pretrained

    return matrix

In [None]:
# Width of one word vector; 100 matches the "100d" GloVe file loaded above.
EMBEDDING_DIM = 100
embedding_matrix = create_embedding_matrix(tokenizer, embeddings_index, EMBEDDING_DIM)

### Use GloVe in the Embedding layer

In [None]:
from tensorflow.keras.layers import Embedding

# Frozen embedding layer seeded with the GloVe matrix. This single layer
# instance is reused by the CNN, BiLSTM and BiGRU baseline builders below.
vocab_rows, vector_width = embedding_matrix.shape

embedding_layer = Embedding(
    input_dim=vocab_rows,
    output_dim=vector_width,
    weights=[embedding_matrix],
    input_length=MAX_LEN,
    trainable=False,  # keep pretrained semantics
)

# `.

## CNN

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D, Dense, Dropout

def build_cnn_model():
    """Assemble and compile the baseline 1-D CNN rating classifier.

    Layer stack: the shared frozen ``embedding_layer``, a Conv1D layer
    (64 filters, kernel size 5, ReLU), global max pooling, 50 % dropout, a
    64-unit ReLU dense layer and a 5-way softmax output.

    Returns:
        The compiled Sequential model (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    model = Sequential()
    for layer in (
        embedding_layer,  # trainable=False
        Conv1D(filters=64, kernel_size=5, activation="relu"),
        GlobalMaxPooling1D(),
        Dropout(0.5),
        Dense(64, activation="relu"),
        Dense(5, activation="softmax"),
    ):
        model.add(layer)

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return model

In [None]:
# Shift labels from 1-5 to 0-4, the class ids expected by the 5-unit softmax
# output with sparse categorical cross-entropy.
y_train_enc, y_test_enc = y_train - 1, y_test - 1

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop training once validation loss has failed to improve for two epochs,
# then roll the model back to its best-scoring weights.
early_stop = EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True
)

cnn_model = build_cnn_model()
cnn_model.summary()

# Validation data is held out from the training set (validation_split)
# rather than taken from the test split. Early stopping picks the final
# weights by val_loss, so validating on the test set would leak it into model
# selection and inflate the test accuracy reported at the end of the notebook.
cnn_model.fit(
    X_train_pad,
    y_train_enc,
    validation_split=0.2,
    epochs=10,
    batch_size=64,
    callbacks=[early_stop]
)

In [None]:
import keras_tuner as kt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense, Dropout
from tensorflow.keras.optimizers import Adam

In [None]:
def build_cnn_tuned_model(hp):
    """Build and compile the GloVe-initialised CNN for one KerasTuner trial.

    Tuned hyperparameters, registered in this order:
      * ``embedding_dim``: how many leading columns of ``embedding_matrix``
        are used as the frozen embedding weights;
      * ``dropout``: dropout rate from 0.3 to 0.6 in steps of 0.1;
      * ``learning_rate``: Adam learning rate, one of 1e-2, 1e-3, 1e-4.

    Args:
        hp: KerasTuner ``HyperParameters`` object for the current trial.

    Returns:
        The compiled Sequential model.
    """
    model = Sequential()

    # Tunable embedding dimension. Only widths the pretrained matrix can
    # supply are offered: slicing a 100-column matrix with [:, :200] still
    # yields 100 columns, which no longer matches output_dim=200 and makes
    # the Embedding layer reject the weights.
    # NOTE(review): taking the first k columns of a wider GloVe vector is not
    # the same as loading a natively k-dimensional GloVe file.
    glove_dim = embedding_matrix.shape[1]
    candidate_dims = [dim for dim in (50, 100, 200) if dim <= glove_dim] or [glove_dim]
    embedding_dim = hp.Choice("embedding_dim", values=candidate_dims)

    model.add(
        Embedding(
            input_dim=embedding_matrix.shape[0],
            output_dim=embedding_dim,
            weights=[embedding_matrix[:, :embedding_dim]],
            input_length=MAX_LEN,
            trainable=False
        )
    )

    # CNN layer
    model.add(Conv1D(filters=64, kernel_size=5, activation="relu"))
    model.add(GlobalMaxPooling1D())

    # Tunable dropout
    dropout_rate = hp.Float("dropout", min_value=0.3, max_value=0.6, step=0.1)
    model.add(Dropout(dropout_rate))

    model.add(Dense(64, activation="relu"))
    model.add(Dense(5, activation="softmax"))

    # Tunable learning rate
    learning_rate = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    return model

In [None]:
# Random search over build_cnn_tuned_model's hyperparameters: ten
# configurations, each trained once, ranked by validation accuracy.
# NOTE(review): results are stored under tuning/cnn_glove_tuning; check
# whether an existing project there is reloaded on re-runs (`overwrite` option).
tuner = kt.RandomSearch(
    hypermodel=build_cnn_tuned_model,
    objective="val_accuracy",
    max_trials=10,
    executions_per_trial=1,
    project_name="cnn_glove_tuning",
    directory="tuning",
)

In [None]:
# Rank the trials on a slice held out from the training data instead of on
# the test split. Selecting hyperparameters by test-set accuracy would make
# every later test score of the chosen model optimistically biased. The 20 %
# split matches the one used by the BiLSTM tuner search further down.
tuner.search(
    X_train_pad,
    y_train_enc,
    validation_split=0.2,
    epochs=5,
    batch_size=64
)

In [None]:
# Take the single best model found by the search and show its architecture.
(best_model,) = tuner.get_best_models(num_models=1)
best_model.summary()

In [None]:
# Continue training the tuner's best CNN for ten epochs; the test split is
# passed as validation data, so its metrics are reported after every epoch.
history = best_model.fit(
    x=X_train_pad,
    y=y_train_enc,
    batch_size=64,
    epochs=10,
    validation_data=(X_test_pad, y_test_enc),
)

##  Bi-Directional LSTM


In [None]:
from tensorflow.keras.layers import LSTM, Bidirectional

def build_bilstm_model():
    """Assemble and compile the baseline bidirectional-LSTM classifier.

    Layer stack: the shared frozen ``embedding_layer``, a bidirectional LSTM
    with 128 units per direction, 50 % dropout, a 64-unit ReLU dense layer and
    a 5-way softmax output.

    Returns:
        The compiled Sequential model (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    model = Sequential()
    for layer in (
        embedding_layer,
        Bidirectional(LSTM(128)),
        Dropout(0.5),
        Dense(64, activation="relu"),
        Dense(5, activation="softmax"),
    ):
        model.add(layer)

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return model

In [None]:
# Build the baseline BiLSTM, print its architecture, then train for ten
# epochs while reporting metrics on the test split after every epoch.
bilstm_model = build_bilstm_model()
bilstm_model.summary()

bilstm_model.fit(
    x=X_train_pad,
    y=y_train_enc,
    batch_size=64,
    epochs=10,
    validation_data=(X_test_pad, y_test_enc),
)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import keras_tuner as kt

In [None]:
# Embedding input size: one row per tokenizer index plus one extra slot.
MAX_VOCAB_SIZE = 1 + len(tokenizer.word_index)
# Repeats the value used when the sequences were padded; the two MUST match.
MAX_LEN = 200

def build_bilstm_model(hp):
    """Build and compile a BiLSTM classifier for one KerasTuner trial.

    Unlike the baseline, the embedding here is not seeded with GloVe weights
    and is left trainable. Tuned hyperparameters, registered in this order:
    ``embedding_dim`` (64/128/256), ``lstm_units`` (64/128), ``dropout``
    (0.2 to 0.5 in steps of 0.1) and ``learning_rate`` (1e-4/3e-4/1e-3).

    NOTE(review): this definition reuses the name of the zero-argument
    ``build_bilstm_model`` defined earlier in the notebook and replaces it
    once this cell has run.

    Args:
        hp: KerasTuner ``HyperParameters`` object for the current trial.

    Returns:
        The compiled Sequential model (compiled inside the function, as the
        tuner expects a ready-to-train model).
    """
    embedding_dim = hp.Choice("embedding_dim", [64, 128, 256])
    lstm_units = hp.Choice("lstm_units", [64, 128])
    dropout_rate = hp.Float("dropout", min_value=0.2, max_value=0.5, step=0.1)
    learning_rate = hp.Choice("learning_rate", [1e-4, 3e-4, 1e-3])

    model = Sequential([
        Embedding(
            input_dim=MAX_VOCAB_SIZE,
            output_dim=embedding_dim,
            input_length=MAX_LEN,
        ),
        Bidirectional(LSTM(units=lstm_units, return_sequences=False)),
        Dropout(dropout_rate),
        Dense(5, activation="softmax"),  # one output unit per rating class
    ])

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=Adam(learning_rate=learning_rate),
        metrics=["accuracy"],
    )

    return model

In [None]:
# Random search over the tunable BiLSTM: ten configurations ranked by
# validation accuracy. This rebinds `tuner`, replacing the CNN tuner.
tuner = kt.RandomSearch(
    hypermodel=build_bilstm_model,
    objective="val_accuracy",
    max_trials=10,
    project_name="pretokenized_input",
    directory="bilstm_tuning",
)

In [None]:
# Run the search; each trial trains for five epochs and is validated on 20 %
# of the training data.
tuner.search(
    x=X_train_pad,
    y=y_train_enc,
    batch_size=128,
    epochs=5,
    validation_split=0.2,
    verbose=1,
)

In [None]:
best_model = tuner.get_best_models(num_models=1)[0]

# Train on the zero-based labels (y_train_enc, 0-4). The raw ratings in
# y_train run 1-5, so class id 5 is out of range for the 5-unit softmax
# output under sparse categorical cross-entropy, and every other class would
# be shifted by one relative to the labels used during the tuner search.
history = best_model.fit(
    X_train_pad,
    y_train_enc,
    validation_split=0.2,
    epochs=10,
    batch_size=128
)

In [None]:
# Evaluate against the zero-based test labels (0-4), the encoding the model
# outputs; the raw 1-5 ratings in y_test are out of range for the 5-unit
# softmax and offset by one from the predicted class ids.
loss, accuracy = best_model.evaluate(X_test_pad, y_test_enc)
print(f"Test Accuracy: {accuracy:.4f}")

##  Bi-Directional GRU


In [None]:
from tensorflow.keras.layers import GRU

def build_bigru_model():
    """Assemble and compile the baseline bidirectional-GRU classifier.

    Layer stack: the shared frozen ``embedding_layer``, a bidirectional GRU
    with 128 units per direction, 50 % dropout, a 64-unit ReLU dense layer and
    a 5-way softmax output.

    Returns:
        The compiled Sequential model (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    model = Sequential()
    for layer in (
        embedding_layer,
        Bidirectional(GRU(128)),
        Dropout(0.5),
        Dense(64, activation="relu"),
        Dense(5, activation="softmax"),
    ):
        model.add(layer)

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return model

In [None]:
# Build the baseline BiGRU, print its architecture, then train for ten
# epochs while reporting metrics on the test split after every epoch.
bigru_model = build_bigru_model()
bigru_model.summary()

bigru_model.fit(
    x=X_train_pad,
    y=y_train_enc,
    batch_size=64,
    epochs=10,
    validation_data=(X_test_pad, y_test_enc),
)

In [None]:
# Score the three baseline models on the held-out test split. evaluate()
# returns [loss, accuracy] for these models, so element 1 is the accuracy.
cnn_acc, bilstm_acc, bigru_acc = (
    model.evaluate(X_test_pad, y_test_enc)[1]
    for model in (cnn_model, bilstm_model, bigru_model)
)

print("CNN Accuracy:", cnn_acc)
print("BiLSTM Accuracy:", bilstm_acc)
print("BiGRU Accuracy:", bigru_acc)