In [None]:
# Install necessary libraries
!pip install transformers torch scikit-learn datasets matplotlib seaborn wordcloud sentence-transformers

# Import libraries
import pandas as pd
import numpy as np
import string
import random
import torch
import matplotlib.pyplot as plt
import seaborn as sns

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    DistilBertTokenizerFast,
    DistilBertModel
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    silhouette_score
)
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

# Set random seeds for reproducibility
def set_seed(seed: int = 42) -> None:
    """
    Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed (int): Value handed to every generator. Defaults to 42.
    """
    # The three CPU-side seeding functions all take the seed as their only argument.
    for seed_generator in (random.seed, np.random.seed, torch.manual_seed):
        seed_generator(seed)

    # CUDA generators are only seeded when torch reports a CUDA device.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Seed every generator with the default seed (42) before any split or model is created
set_seed()

# Location of the reviews CSV that is read below
data_path = '/notebooks/1429_1.csv'  # Update this path as necessary

# ======================
# A) Sentiment Classification
# ======================

# Load the dataset using pandas; both parts (A and B) of the script work from this frame
df = pd.read_csv(data_path)

# Function to assign sentiment based on the reviews.rating
def assign_sentiment(rating: int) -> str:
    """
    Map a star rating to a sentiment label.

    Args:
        rating (int): The review rating.

    Returns:
        str: 'positive' for ratings of 4 or more, 'neutral' for exactly 3,
        and 'negative' for every other value (this includes values for
        which both comparisons are false, such as NaN).
    """
    if rating >= 4:
        return 'positive'
    return 'neutral' if rating == 3 else 'negative'

# Label each review from its star rating. Rows whose rating is missing are
# left unlabelled (NaN): assign_sentiment's fall-through branch would otherwise
# tag them 'negative', because every comparison against NaN is False.
has_rating = df['reviews.rating'].notna()
df['sentiment'] = df['reviews.rating'].apply(assign_sentiment).where(has_rating)

# Display the first few rows to verify the sentiment column
print("Sentiment Classification - First Five Rows:")
print(df[['reviews.rating', 'sentiment']].head())

# Train and evaluate only on rows that have both a rating and a review text.
# Without this filter, astype(str) would turn a missing text into the literal
# string 'nan' and feed it to the model as a real review.
# df itself is not filtered here, so the later stages still see every row.
sentiment_df = df[has_rating & df['reviews.text'].notna()]

# Split the data into training and testing sets (80/20, fixed seed)
train_texts, test_texts, train_labels, test_labels = train_test_split(
    sentiment_df['reviews.text'].astype(str).tolist(),
    sentiment_df['sentiment'].tolist(),
    test_size=0.2,
    random_state=42
)

# Load the tokenizer and model for sentiment classification
sentiment_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=3  # one output per key of label_mapping below
)

# Encode the dataset for training and testing.
# padding=True pads each call's texts to the longest sequence in that call,
# after truncating to at most 512 tokens.
train_encodings = sentiment_tokenizer(
    train_texts,
    truncation=True,
    padding=True,
    max_length=512
)
test_encodings = sentiment_tokenizer(
    test_texts,
    truncation=True,
    padding=True,
    max_length=512
)

# Convert labels to integers for training
label_mapping = {'positive': 0, 'neutral': 1, 'negative': 2}
train_labels_int = [label_mapping[label] for label in train_labels]
test_labels_int = [label_mapping[label] for label in test_labels]

# Create a custom dataset class
class ReviewDataset(torch.utils.data.Dataset):
    """
    Torch dataset pairing tokenizer encodings with integer labels.

    Args:
        encodings (dict): Mapping from field name to a per-example sequence
            of values.
        labels (list): One label per example.
    """

    def __init__(self, encodings: dict, labels: list):
        self.encodings = encodings
        self.labels = labels

    def __len__(self) -> int:
        # The dataset size is defined by the number of labels.
        return len(self.labels)

    def __getitem__(self, idx: int) -> dict:
        # Build one example: every encoding field at position idx as a tensor,
        # plus the matching label under the 'labels' key.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap the tokenized texts and their integer labels for the Trainer
train_dataset = ReviewDataset(encodings=train_encodings, labels=train_labels_int)
test_dataset = ReviewDataset(encodings=test_encodings, labels=test_labels_int)

# Define compute metrics function
def compute_metrics(eval_pred) -> dict:
    """
    Score predictions with accuracy and weighted precision, recall and F1.

    Args:
        eval_pred: A pair that unpacks into (logits, labels).

    Returns:
        dict: Scores keyed by 'accuracy', 'precision', 'recall' and 'f1'.
    """
    logits, labels = eval_pred
    # The predicted class is the index of the largest logit along the last axis.
    predicted = np.argmax(logits, axis=-1)

    # Settings shared by the three averaged metrics.
    averaging = {'average': 'weighted', 'zero_division': 0}

    return {
        'accuracy': accuracy_score(labels, predicted),
        'precision': precision_score(labels, predicted, **averaging),
        'recall': recall_score(labels, predicted, **averaging),
        'f1': f1_score(labels, predicted, **averaging),
    }

# Training arguments for sentiment classification.
# Evaluation and checkpointing both happen once per epoch; the two strategies
# are kept identical because load_best_model_at_end=True is set.
sentiment_training_kwargs = dict(
    output_dir='./sentiment_results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    learning_rate=5e-5,
    logging_dir='./sentiment_logs',
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="none",
    logging_steps=10
)

# The per-epoch evaluation argument is spelled `eval_strategy` in newer
# transformers releases and `evaluation_strategy` in older ones. The pip
# install at the top of the script is unpinned, so try the new spelling first
# and fall back only when that exact keyword is rejected.
try:
    sentiment_training_args = TrainingArguments(
        eval_strategy="epoch",
        **sentiment_training_kwargs
    )
except TypeError as err:
    if 'eval_strategy' not in str(err):
        raise
    sentiment_training_args = TrainingArguments(
        evaluation_strategy="epoch",
        **sentiment_training_kwargs
    )

# Initialize the Trainer for sentiment classification
sentiment_trainer = Trainer(
    model=sentiment_model,
    args=sentiment_training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)

# Train the sentiment classification model
print("\nRunning sentiment classification training with learning rate: 5e-5")
sentiment_trainer.train()

# Evaluate on the test set (evaluate() without arguments uses eval_dataset,
# which is test_dataset here)
eval_results_test = sentiment_trainer.evaluate()
print("\nSentiment Classification - Test Set Evaluation:")
print(f"Test Accuracy: {eval_results_test['eval_accuracy']:.4f}")
print(f"Test Precision: {eval_results_test['eval_precision']:.4f}")
print(f"Test Recall: {eval_results_test['eval_recall']:.4f}")
print(f"Test F1 Score: {eval_results_test['eval_f1']:.4f}")
print(f"Test Loss: {eval_results_test['eval_loss']:.4f}")

# Evaluate on the training set, for comparison against the test-set scores
eval_results_train = sentiment_trainer.evaluate(train_dataset)
print("\nSentiment Classification - Training Set Evaluation:")
print(f"Train Accuracy: {eval_results_train['eval_accuracy']:.4f}")
print(f"Train Precision: {eval_results_train['eval_precision']:.4f}")
print(f"Train Recall: {eval_results_train['eval_recall']:.4f}")
print(f"Train F1 Score: {eval_results_train['eval_f1']:.4f}")
print(f"Train Loss: {eval_results_train['eval_loss']:.4f}")

# Function to get samples of reviews classified into positive, neutral, and negative
def get_review_samples(
    dataframe: pd.DataFrame,
    sentiment_column: str = 'sentiment',
    text_column: str = 'reviews.text',
    samples_per_category: int = 4
) -> None:
    """
    Extract and print sample reviews for each sentiment category.

    Categories are printed in the order positive, neutral, negative. A
    category holding fewer than ``samples_per_category`` rows contributes all
    of its rows instead of raising.

    Args:
        dataframe (pd.DataFrame): The DataFrame containing the reviews.
        sentiment_column (str): The column name for sentiment labels.
        text_column (str): The column name for review texts.
        samples_per_category (int): Maximum number of samples to print per category.
    """
    for category in ('positive', 'neutral', 'negative'):
        category_texts = dataframe.loc[
            dataframe[sentiment_column] == category,
            text_column
        ]
        # Series.sample raises ValueError when asked for more rows than exist
        # (sampling without replacement), so cap the request at what is available.
        samples = category_texts.sample(
            n=min(samples_per_category, len(category_texts)),
            random_state=42
        )
        print(f"\n{category.capitalize()} Reviews Samples:")
        for i, review in enumerate(samples, 1):
            print(f"{i}. {review}\n")

# Print sample reviews per sentiment category, using the function's default
# column names and sample count
get_review_samples(df)

# ======================
# B) Product Categorization
# ======================

# Drop rows whose 'name' is missing (NaN/None). Empty strings are not missing
# values and are kept by dropna. The surviving rows keep their original index
# labels, so the index is no longer a contiguous 0..n-1 range after this line.
df = df.dropna(subset=['name'])

# Enhanced Text Preprocessing Function
def preprocess_text_enhanced(text: str, lower: bool = False) -> str:
    """
    Preprocess text by optionally lowercasing, removing stop words and punctuation.

    Stop words are matched case-insensitively, so they are removed whether or
    not ``lower`` is set; ``lower`` only controls the case of the words that
    are kept.

    Args:
        text (str): The text to preprocess.
        lower (bool): Whether to convert text to lowercase.

    Returns:
        str: The preprocessed text, or an empty string when ``text`` is not a str.
    """
    if not isinstance(text, str):
        return ''

    if lower:
        text = text.lower()

    # Remove stop words. The comparison uses the lowercased token because the
    # stop-word list is lowercase; comparing the raw token would let
    # capitalised stop words such as "The" through when lower=False.
    kept_words = [
        word for word in text.split()
        if word.lower() not in ENGLISH_STOP_WORDS
    ]
    text = ' '.join(kept_words)

    # Remove punctuation. This runs after the stop-word pass, so a token with
    # punctuation attached (e.g. "the,") is compared as written above.
    text = text.translate(str.maketrans('', '', string.punctuation))

    # Collapse the whitespace left behind by tokens that were pure punctuation.
    text = ' '.join(text.split())

    return text

# Clean every product name (lowercased, stop words and punctuation removed)
# into a new 'clean_name' column
df['clean_name'] = df['name'].apply(
    lambda product_name: preprocess_text_enhanced(product_name, lower=True)
)

# Generating Embeddings with DistilBERT
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer_bert = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
model_bert = DistilBertModel.from_pretrained('distilbert-base-uncased')
model_bert.to(device)
# Inference only: switch the model to evaluation mode
model_bert.eval()

def get_embedding(text: str) -> np.ndarray:
    """
    Generate a DistilBERT embedding for the given text.

    The text is tokenized (truncated to at most 128 tokens), run through
    ``model_bert`` without gradient tracking, and the last hidden state at
    the first token position is returned.

    Args:
        text (str): The text to embed.

    Returns:
        np.ndarray: The embedding as a flattened 1-D array.
    """
    encoded = tokenizer_bert(
        text,
        return_tensors='pt',
        truncation=True,
        padding=True,
        max_length=128
    )
    # Inputs must live on the same device as the model.
    encoded = encoded.to(device)

    with torch.no_grad():
        model_output = model_bert(**encoded)

    # Keep only position 0 of the sequence axis for each input.
    first_token_state = model_output.last_hidden_state[:, 0, :]
    return first_token_state.cpu().numpy().flatten()

# Generate embeddings for all product names
print("\nGenerating embeddings for product names...")
# get_embedding runs one model forward pass per call, so embed each distinct
# cleaned name once and reuse that vector for every row carrying the same name.
embedding_by_name = {
    name: get_embedding(name) for name in df['clean_name'].unique()
}
embeddings = np.array([embedding_by_name[name] for name in df['clean_name']])

# Feature Scaling
scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(embeddings)

# Apply PCA to reduce the dimensionality of the embeddings before K-Means
pca = PCA(n_components=50, random_state=42)
embeddings_pca = pca.fit_transform(embeddings_scaled)

# Clustering with K-Means (K=5)
k = 5  # Number of clusters
kmeans = KMeans(n_clusters=k, n_init=20, random_state=42)
kmeans.fit(embeddings_pca)
sil_score = silhouette_score(embeddings_pca, kmeans.labels_)

print(f"\nFinal Results for K={k}:")
print(f"Inertia: {kmeans.inertia_:.4f}, Silhouette Score: {sil_score:.4f}")

# Assign the cluster labels to the DataFrame (positional: one label per row,
# in the same row order the embeddings were built in)
df['cluster'] = kmeans.labels_

# Cluster Mapping
# NOTE(review): K-Means cluster ids carry no inherent meaning. This mapping
# presumably came from inspecting the clusters of one particular run; confirm
# it still matches whenever the data, preprocessing or library versions change.
cluster_to_category = {
    0: 'Tablets & E-readers',               # Example: Kindle and Fire tablets
    1: 'Smart Speakers',                    # Example: Echo devices
    2: 'Kids Edition Tablets',              # Example: Kids Edition Tablets
    3: 'Entertainment Devices',             # Example: Fire TV
    4: 'Bluetooth & Charging Accessories'    # Example: chargers and Bluetooth devices
}

# Apply the mapping
df['assigned_category'] = df['cluster'].map(cluster_to_category)

# Reorder columns to place 'assigned_category' right after 'name'
columns = list(df.columns)
name_idx = columns.index('name')
columns.insert(name_idx + 1, columns.pop(columns.index('assigned_category')))

# Save the final dataset with reordered columns
output_file = '/notebooks/clustered_product_full_v5.csv'  # Update this path as necessary
df[columns].to_csv(output_file, index=False)

print(f"\nFinal categorized dataset saved to: {output_file}")

# Model Evaluation
print(f"\nSilhouette Score: {sil_score:.4f}")
print(f"Inertia: {kmeans.inertia_:.4f}")

# t-SNE Visualization
# Project the scaled embeddings to two dimensions for plotting
tsne = TSNE(n_components=2, random_state=42)
tsne_results = tsne.fit_transform(embeddings_scaled)

# Create a DataFrame for the t-SNE results
tsne_df = pd.DataFrame(tsne_results, columns=['tsne_x', 'tsne_y'])
# Copy the cluster ids positionally. tsne_df has a fresh 0..n-1 index while df
# still carries the index labels left over from the earlier dropna, so
# assigning the Series directly would align on index labels and pair points
# with the wrong cluster (or NaN).
tsne_df['cluster'] = df['cluster'].to_numpy()

# Enhanced Plotting with Seaborn
plt.figure(figsize=(14, 10))
palette = sns.color_palette("husl", k)  # Generate a palette with k distinct colors
sns.scatterplot(
    x='tsne_x',
    y='tsne_y',
    hue='cluster',
    palette=palette,
    data=tsne_df,
    alpha=0.9,
    s=100,          # Marker size
    edgecolor='w'   # Marker edge color
)

# Optionally, add annotations: write each point's cluster id at its position
for x_pos, y_pos, cluster_id in zip(
    tsne_df['tsne_x'], tsne_df['tsne_y'], tsne_df['cluster']
):
    plt.text(
        x_pos,
        y_pos,
        cluster_id,
        fontsize=9,
        ha='center'
    )

plt.title('t-SNE Visualization of K-Means Clusters', fontsize=18)
plt.xlabel('t-SNE Dimension 1', fontsize=14)
plt.ylabel('t-SNE Dimension 2', fontsize=14)
# Place the legend outside the axes, to the right of the plot
plt.legend(title='Cluster', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.tight_layout()
plt.show()

# Sample Products from Each Category for Inspection
samples_per_category = 5  # Number of samples per category

# Collect up to samples_per_category rows from every cluster, in cluster order
cluster_samples = []
for category in range(k):
    members = df[df['cluster'] == category]
    cluster_samples.append(
        members.sample(
            # A cluster smaller than the requested size contributes all its rows
            n=min(samples_per_category, len(members)),
            random_state=42
        )
    )

# Concatenate once instead of growing a frame inside the loop: this avoids
# re-copying the accumulated rows on every iteration and avoids concatenating
# onto an empty placeholder frame.
category_samples = pd.concat(cluster_samples)

# Save the samples to a CSV file for inspection
sample_output_file = '/notebooks/category_samples.csv'  # Update this path as necessary
category_samples[['name', 'assigned_category']].to_csv(sample_output_file, index=False)

print(f"\nSamples from each category saved to: {sample_output_file}")
