# In [None]:
# %pip install numpy pandas matplotlib scikit-learn

# imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
# blanket filter: hides every warning category for the rest of the session
warnings.filterwarnings("ignore")

# load data set
# 'products.csv' is resolved relative to the current working directory;
# low_memory=False is passed so pandas does not parse the file in chunks
# (see the pandas read_csv documentation for the dtype-inference rationale)
products = pd.read_csv('products.csv', low_memory=False)

# exploratory analysis

# one histogram per numeric column: (column, figure title, x-axis label)
histogram_specs = [
    ('price_usd', 'Distribution of Prices', 'Price (USD)'),
    ('pct_discount', 'Distribution of Discounts', 'Discount Percentage'),
    ('qty_sold', 'Distribution of Quantity Sold', 'Quantity Sold'),
]

for column, plot_title, x_label in histogram_specs:
    # ensure the column is numeric; entries that cannot be parsed become NaN
    products[column] = pd.to_numeric(products[column], errors='coerce')

    # plot the distribution of the non-missing values
    plt.figure(figsize=(5, 3))
    products[column].dropna().hist(bins=50)
    plt.title(plot_title)
    plt.xlabel(x_label)
    plt.ylabel('Frequency')
    plt.show()

# category exploration

# distinct values of the category column
category_column = products['category_name']
unique_categories = category_column.unique()

# list every category, one per line
print("Product Categories:")
for category_label in unique_categories:
    print("-", category_label)

print("\nNumber of Categories:", len(unique_categories))

# best seller extraction

# some datasets store bestseller tags in "rank" column:
# a row counts as a bestseller when its rank text contains 'Best Sellers'
rank_text = products['rank'].astype(str)
has_bestseller_tag = rank_text.str.contains('Best Sellers', na=False)
bestsellers = products[has_bestseller_tag]

# number of bestsellers per category
bestseller_counts = bestsellers['category_name'].value_counts()

# bar chart of those counts
plt.figure(figsize=(10, 6))
bestseller_counts.plot.bar()
plt.title('Count of Best Sellers in Each Category')
plt.xlabel('Category')
plt.ylabel('Count')
plt.show()

# preview the bestsellers dataframe (display() is provided by IPython/Jupyter)
display(bestsellers.head())

print("\nNumber of Bestsellers:", len(bestsellers))

# #1 best sellers analysis

# keep only rows whose rank is exactly '#1 Best Sellers'
# NOTE(review): this is an exact string match — ranks carrying extra text
# (e.g. a category suffix) would not be selected; confirm against the data
is_number_one = bestsellers['rank'] == '#1 Best Sellers'
number_one_bestsellers = bestsellers.loc[is_number_one]

# number of #1 bestsellers per category
number_one_bestseller_counts = number_one_bestsellers['category_name'].value_counts()

# bar chart of those counts
plt.figure(figsize=(10, 6))
number_one_bestseller_counts.plot.bar()
plt.title('Count of #1 Best Sellers in Each Category')
plt.xlabel('Category')
plt.ylabel('Count')
plt.show()

# report how many #1 best sellers were found
print(f"Number of #1 Best Sellers: {len(number_one_bestsellers)}")

# avg price and discount for best sellers

# group once, then derive both per-category averages from the same grouping
bestsellers_by_category = bestsellers.groupby('category_name')

# mean price per category, highest first
average_prices = bestsellers_by_category['price_usd'].mean().sort_values(ascending=False)

# mean discount percentage per category, highest first
average_discounts = bestsellers_by_category['pct_discount'].mean().sort_values(ascending=False)

# one bar chart per averaged series: discounts first, then prices
average_plots = (
    (
        average_discounts,
        'Average Discount Percentages of Best Sellers in Each Category',
        'Average Discount Percentage',
    ),
    (
        average_prices,
        'Average Prices of Best Sellers in Each Category',
        'Average Price (USD)',
    ),
)

for averaged_series, plot_title, y_label in average_plots:
    plt.figure(figsize=(10, 6))
    averaged_series.plot(kind='bar')
    plt.title(plot_title)
    plt.xlabel('Category')
    plt.ylabel(y_label)
    plt.show()



# data cleaning

# columns that are not used by the analysis or the models below
columns_to_drop = [
    'Unnamed: 0',
    'color_count',
    'img_source_url',
    'black_friday_off_usd'
]

# errors='ignore' keeps this safe when some of the columns are absent
products.drop(columns=columns_to_drop, errors='ignore', inplace=True)

# ensure unified data types

# numeric columns: anything that cannot be parsed becomes NaN
for numeric_column in ('price_usd', 'pct_discount', 'qty_sold'):
    products[numeric_column] = pd.to_numeric(products[numeric_column], errors='coerce')

# text columns: stringify real values but keep missing entries missing.
# A plain astype(str) would turn NaN into the literal text 'nan', which
# hides missing data from pd.isna() checks and from the categorical
# imputer used later, and shows up as a bogus 'nan' category.
for text_column in ('rank', 'rank_category', 'product_title', 'category_name'):
    was_missing = products[text_column].isna()
    products[text_column] = products[text_column].astype(str).mask(was_missing)

# display updated dataframe (display() is provided by IPython/Jupyter)
display(products.head())


# train / validation / test split function

from typing import List, Tuple

from sklearn.model_selection import train_test_split

def get_train_vld_test(
    df: pd.DataFrame,
    test_size: float = 0.2,
    vld_size: float = 0.1,
    random_state: int = 42
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Carve a dataframe into training, validation, and test partitions.

    The test partition is split off first; the validation partition is then
    taken from what remains, so ``vld_size`` is applied to the non-test
    remainder rather than to the full dataframe. Both splits use the same
    ``random_state``.

    Returns:
        A ``(train, validation, test)`` tuple.
    """
    # first cut: hold out the test partition
    remainder, test_part = train_test_split(
        df,
        test_size=test_size,
        random_state=random_state,
    )

    # second cut: take the validation partition out of the remainder
    train_part, vld_part = train_test_split(
        remainder,
        test_size=vld_size,
        random_state=random_state,
    )

    return train_part, vld_part, test_part
# imports: preprocessing, models, evaluation

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import learning_curve

# bestseller target function
def is_bestseller(rank_value):
    """Map a 'rank' value to a binary label: 1 if it mentions 'Best Sellers', else 0.

    Missing values (as judged by ``pd.isna``) yield 0. Any other value is
    converted with ``str`` and searched for the case-sensitive substring.
    """
    # a missing rank is never a bestseller
    if pd.isna(rank_value):
        return 0
    return int('Best Sellers' in str(rank_value))

# train / validation / test split
train_df, vld_df, test_df = get_train_vld_test(df=products)

# define feature sets
numeric_features = [
    'price_usd',
    'pct_discount',
]
categorical_features = [
    'category_name',
]


# preprocessing function
def preprocess_data(
    train_df: pd.DataFrame,
    vld_df: pd.DataFrame,
    test_df: pd.DataFrame,
    numeric_features: List[str],
    categorical_features: List[str]
) -> Tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series, pd.DataFrame, pd.Series, ColumnTransformer]:
    """Build model inputs, targets, and an unfitted preprocessor for the three splits.

    Side effect: a binary ``'target'`` column (derived from ``'rank'`` via
    ``is_bestseller``) is added in place to ``train_df``, ``vld_df`` and
    ``test_df``.

    Args:
        train_df: Training split; must contain ``'rank'`` and all feature columns.
        vld_df: Validation split with the same columns.
        test_df: Test split with the same columns.
        numeric_features: Names of numeric feature columns.
        categorical_features: Names of categorical feature columns.

    Returns:
        ``(X_train, y_train, X_vld, y_vld, X_test, y_test, preprocessor)``.
        The ``X_*`` values are DataFrame copies restricted to the feature
        columns, the ``y_*`` values are the target Series, and
        ``preprocessor`` is a ColumnTransformer that has NOT been fitted.
    """

    # accept any sequence of names (list, tuple, ...) for the feature arguments
    numeric_features = list(numeric_features)
    categorical_features = list(categorical_features)

    # ----- create target variable -----
    for df in (train_df, vld_df, test_df):
        df['target'] = df['rank'].apply(is_bestseller)

    # feature names
    FEATURES = numeric_features + categorical_features
    TARGET = 'target'

    # input / output splits (copies, so later edits do not touch the source frames)
    X_train = train_df[FEATURES].copy()
    y_train = train_df[TARGET].copy()

    X_vld = vld_df[FEATURES].copy()
    y_vld = vld_df[TARGET].copy()

    X_test = test_df[FEATURES].copy()
    y_test = test_df[TARGET].copy()

    # ----- pipelines -----
    # numeric: median imputation, then standardisation
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # categorical: most-frequent imputation, then one-hot encoding;
    # categories unseen during fit are ignored at transform time
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # combine transformers
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    # fill missing numeric values in validation/test using *training medians*.
    # X_train is deliberately left untouched (the pipeline imputer handles it);
    # the returned validation/test frames are NaN-free in their numeric columns
    # as long as the corresponding training median is itself not NaN.
    for col in numeric_features:
        median_val = X_train[col].median()
        X_vld[col] = X_vld[col].fillna(median_val)
        X_test[col] = X_test[col].fillna(median_val)

    return X_train, y_train, X_vld, y_vld, X_test, y_test, preprocessor

# learning curve plot function

def plot_learning_curve(model, X, y, title):
    """Plot training and cross-validation accuracy against training-set size.

    Runs ``learning_curve`` with 5-fold cross-validation, accuracy scoring,
    all available cores (``n_jobs=-1``) and ten evenly spaced training
    fractions from 0.1 to 1.0. Each curve is drawn as the per-size mean
    across folds with a shaded band of one standard deviation either side.
    """
    sizes, fit_scores, cv_scores = learning_curve(
        model,
        X,
        y,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        train_sizes=np.linspace(0.1, 1.0, 10),
    )

    plt.figure(figsize=(8, 6))
    plt.title(title)
    plt.xlabel("Training examples")
    plt.ylabel("Accuracy Score")
    plt.grid(True)

    # (per-fold scores, colour, legend label) — training curve first,
    # then the cross-validation curve
    curves = (
        (fit_scores, "r", "Training score"),
        (cv_scores, "g", "Cross-validation score"),
    )
    for fold_scores, colour, legend_label in curves:
        centre = np.mean(fold_scores, axis=1)
        spread = np.std(fold_scores, axis=1)

        # shaded +/- one standard deviation band
        plt.fill_between(
            sizes,
            centre - spread,
            centre + spread,
            alpha=0.1,
            color=colour,
        )
        # mean score line
        plt.plot(sizes, centre, 'o-', color=colour, label=legend_label)

    plt.legend(loc="best")
    plt.show()


# preprocess data
(
    X_train, y_train,
    X_vld, y_vld,
    X_test, y_test,
    preprocessor,
) = preprocess_data(
    train_df,
    vld_df,
    test_df,
    numeric_features,
    categorical_features,
)


# logistic regression model
lr_classifier = LogisticRegression(max_iter=128, random_state=42)
lr_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', lr_classifier),
])

# fit, then report accuracy on the data it was fitted on
lr_pipeline.fit(X_train, y_train)
train_accuracy = lr_pipeline.score(X_train, y_train)
print(f"Logistic Regression\nTraining Accuracy: {train_accuracy:.4f}")

# plot learning curve
plot_learning_curve(lr_pipeline, X_train, y_train, "Learning Curve (Logistic Regression)")


# random forest model
# note: this pipeline is given the same preprocessor object as lr_pipeline
rf_classifier = RandomForestClassifier(n_estimators=128, random_state=42)
rf_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', rf_classifier),
])

# fit, then report accuracy on the data it was fitted on
rf_pipeline.fit(X_train, y_train)
train_accuracy_rf = rf_pipeline.score(X_train, y_train)
print(f"\nRandom Forest\nTraining Accuracy: {train_accuracy_rf:.4f}")

# plot learning curve
plot_learning_curve(rf_pipeline, X_train, y_train, "Learning Curve (Random Forest)")

# decision boundary plot eval

def plot_decision_boundary(X, y, pipeline, title, numeric_features=None,
                           grid_resolution=200, default_category=None):
    """
    Plot a classifier's decision regions over two numeric features, in original scale.

    A regular grid spanning the observed range of the two features is pushed
    through the fitted preprocessor and classifier; the predicted classes are
    drawn as filled contours with the points of ``X`` scattered on top,
    coloured by ``y``.

    Args:
        X: DataFrame containing at least the two ``numeric_features`` columns.
        y: Labels aligned with ``X``, used to colour the scatter points.
        pipeline: Fitted Pipeline exposing the named steps 'preprocessor'
            and 'classifier'.
        title: Figure title.
        numeric_features: The two numeric column names to plot; defaults to
            ``['price_usd', 'pct_discount']``.
        grid_resolution: Number of grid points along each axis. A fixed
            resolution keeps the grid at ``grid_resolution ** 2`` points
            whatever the data range is; a fixed step size would instead
            grow the grid with the range (tens of millions of points for
            prices spanning hundreds of dollars at a 0.05 step).
        default_category: Value written into the 'category_name' column of
            every grid point. When omitted, the most frequent
            'category_name' of the module-level ``X_train`` is used.

    Raises:
        ValueError: if ``numeric_features`` does not name exactly two
            columns, ``grid_resolution`` is below 2, ``X`` is empty, or the
            plotted features contain NaN or infinite values.
    """
    if numeric_features is None:
        numeric_features = ['price_usd', 'pct_discount']
    numeric_features = list(numeric_features)

    if len(numeric_features) != 2:
        raise ValueError("Exactly two numeric features are required to plot a decision boundary.")
    if grid_resolution < 2:
        raise ValueError("grid_resolution must be at least 2.")

    # check for NaNs in X
    if X[numeric_features].isna().any().any():
        raise ValueError("NaNs detected in the numeric features. Please impute or remove NaNs before plotting.")

    # convert X, y to numpy arrays (only numeric features for plotting)
    X_vals = np.array(X[numeric_features])
    y_vals = np.array(y)

    if X_vals.shape[0] == 0:
        raise ValueError("Cannot plot a decision boundary for an empty feature set.")

    # plotting range = observed range of each feature
    x_min, x_max = X_vals[:, 0].min(), X_vals[:, 0].max()
    y_min, y_max = X_vals[:, 1].min(), X_vals[:, 1].max()

    # catch bad ranges
    if any(map(np.isnan, [x_min, x_max, y_min, y_max])):
        raise ValueError("Invalid range due to NaNs.")
    if any(map(np.isinf, [x_min, x_max, y_min, y_max])):
        raise ValueError("Invalid range due to infinite values.")

    # a feature with a single distinct value has zero width; widen it so the
    # grid (and therefore the contour plot) still has a real extent
    if x_min == x_max:
        x_min, x_max = x_min - 0.5, x_max + 0.5
    if y_min == y_max:
        y_min, y_max = y_min - 0.5, y_max + 0.5

    # fixed-size grid covering the full range, endpoints included
    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, grid_resolution),
        np.linspace(y_min, y_max, grid_resolution)
    )

    # extract transformer + classifier
    preprocessor = pipeline.named_steps['preprocessor']
    classifier = pipeline.named_steps['classifier']

    # build grid dataframe in original scale
    grid_original = np.c_[xx.ravel(), yy.ravel()]
    grid_df = pd.DataFrame(grid_original, columns=numeric_features)

    # every grid point needs a category for the preprocessor; hold it fixed
    if default_category is None:
        # relies on the module-level X_train defined by the calling script
        default_category = X_train['category_name'].mode()[0]
    grid_df['category_name'] = default_category

    # transform grid
    grid_transformed = preprocessor.transform(grid_df)

    # predict one class per grid point and restore the grid shape
    Z = classifier.predict(grid_transformed)
    Z = Z.reshape(xx.shape)

    # plot
    plt.figure(figsize=(10, 6))
    plt.contourf(xx, yy, Z, alpha=0.8, cmap=plt.cm.coolwarm)
    plt.scatter(X_vals[:, 0], X_vals[:, 1], c=y_vals, edgecolors='k',
                s=20, cmap=plt.cm.coolwarm)
    plt.title(title)
    plt.xlabel(numeric_features[0])
    plt.ylabel(numeric_features[1])
    plt.show()


# validation / test evaluation for both models

# each entry: (fitted pipeline, features, labels, accuracy header, report header)
# order: logistic regression validation, logistic regression test,
#        random forest validation, random forest test
evaluation_plan = [
    (
        lr_pipeline, X_vld, y_vld,
        "Logistic Regression Validation Accuracy:",
        "\nValidation Set Classification Report (Logistic Regression):",
    ),
    (
        lr_pipeline, X_test, y_test,
        "Logistic Regression Testing Accuracy:",
        "\nTesting Set Classification Report (Logistic Regression):",
    ),
    (
        rf_pipeline, X_vld, y_vld,
        "Random Forest Validation Accuracy:",
        "\nValidation Set Classification Report (Random Forest):",
    ),
    (
        rf_pipeline, X_test, y_test,
        "Random Forest Testing Accuracy:",
        "\nTesting Set Classification Report (Random Forest):",
    ),
]

collected_predictions = []
for fitted_pipeline, features, labels, accuracy_header, report_header in evaluation_plan:
    predicted = fitted_pipeline.predict(features)
    collected_predictions.append(predicted)

    print(accuracy_header,
          accuracy_score(labels, predicted))

    print(report_header)
    print(classification_report(labels, predicted, zero_division=0))

# keep each prediction array available under its own module-level name
y_vld_pred_lr, y_test_pred_lr, y_vld_pred_rf, y_test_pred_rf = collected_predictions


# plot decision boundaries

# each entry: (features, labels, fitted pipeline, figure title)
# logistic regression first (validation, test), then random forest
boundary_plots = (
    (X_vld, y_vld, lr_pipeline,
     "Logistic Regression Decision Boundary - Validation Set"),
    (X_test, y_test, lr_pipeline,
     "Logistic Regression Decision Boundary - Test Set"),
    (X_vld, y_vld, rf_pipeline,
     "Random Forest Decision Boundary - Validation Set"),
    (X_test, y_test, rf_pipeline,
     "Random Forest Decision Boundary - Test Set"),
)

for features, labels, fitted_pipeline, plot_title in boundary_plots:
    plot_decision_boundary(features, labels, fitted_pipeline, plot_title)


print("All models evaluated. Visualizations displayed. End of pipeline.")

