**1. Ingest Bright Amazon Datasets**

In [0]:
# Load data 
products_df = spark.read.table("bright_data_amazon_best_seller_products_reviews_products_dataset.datasets.amazon_best_seller_products")
reviews_df = spark.read.table("bright_data_amazon_best_seller_products_reviews_products_dataset.datasets.amazon_reviews")
inventory_df = spark.read.table("bright_data_amazon_best_seller_products_reviews_products_dataset.datasets.amazon_products")

**2. Data Exploration & Profiling**

In [0]:
# Display basic stats and Quick Profile
display(products_df)
dbutils.data.summarize(products_df)

**3. Comparative Brand Analysis**

In [0]:
# Aggregate brand performance
from pyspark.sql.functions import avg, count, col
from pyspark.sql.functions import expr

brand_summary = products_df.groupBy("brand").agg(
    avg(col("final_price")).alias("avg_price"),
    count(col("title")).alias("num_products"),
    avg(
        expr("try_cast(root_bs_rank as double)")
    ).alias("avg_ranking")
)

display(brand_summary.orderBy("avg_ranking"))

In [0]:
# Drop rows with any null values in products_df
products_df_clean = products_df.na.drop()
display(products_df_clean)

In [0]:
# Split 'categories' column into multiple columns: category_1, category_2, etc.
from pyspark.sql.functions import split, expr

max_categories = 5

split_cols = [
    expr(f"try_element_at(split(categories, ','), {i+1})").alias(f"category_{i+1}")
    for i in range(max_categories)
]

products_with_categories = products_df.select("*", *split_cols)

display(products_with_categories)

In [0]:
# Drop null values from Category 5 column
products_with_categories_no_null = products_with_categories.na.drop(subset=["category_5"])
display(products_with_categories_no_null)

In [0]:
%sql
-- Compare top brands in a category using category_1 to category_5 columns
SELECT
  brand,
  COUNT(title) AS products,
  AVG(final_price) AS avg_price,
  AVG(TRY_CAST(root_bs_rank AS DOUBLE)) AS avg_ranking
FROM (
  SELECT *,
    TRY_ELEMENT_AT(SPLIT(categories, ','), 1) AS category_1,
    TRY_ELEMENT_AT(SPLIT(categories, ','), 2) AS category_2,
    TRY_ELEMENT_AT(SPLIT(categories, ','), 3) AS category_3,
    TRY_ELEMENT_AT(SPLIT(categories, ','), 4) AS category_4,
    TRY_ELEMENT_AT(SPLIT(categories, ','), 5) AS category_5
  FROM bright_data_amazon_best_seller_products_reviews_products_dataset.datasets.amazon_best_seller_products
)
WHERE category_1 = 'Electronics'
   OR category_2 = 'Electronics'
   OR category_3 = 'Electronics'
   OR category_4 = 'Electronics'
   OR category_5 = 'Electronics'
GROUP BY brand
ORDER BY avg_ranking ASC
LIMIT 10

In [0]:
#Drop null from Average ranking column
brand_summary_no_null = brand_summary.na.drop(subset=["avg_ranking"])
display(brand_summary_no_null.orderBy("avg_ranking"))

In [0]:
# Extract stock count from Availability column
from pyspark.sql.functions import when, regexp_extract, col

availability_count = (
    when(col("availability") == "In Stock", 1)
    .when(
        col("availability").rlike(r"Only \d+ left"),
        regexp_extract(col("availability"), r"Only (\d+) left", 1).cast("int")
    )
    .otherwise(0)
)

inventory_df = inventory_df.withColumn(
    "availability_count",
    availability_count
)

display(inventory_df.select("availability", "availability_count"))

**4. Price & Inventory Dynamics Over Time**

In [0]:
# Track price/inventory changes over time
inv_trends = (
    inventory_df.withColumn(
        "date_first_available",
        to_date("timestamp")
    )
    .groupBy(
        "brand",
        "date_first_available"
    )
    .agg(
        avg("final_price").alias("avg_price"),
        avg("availability_count").alias("avg_stock")  # Replace 'stock' with your actual inventory column name
    )
)
display(inv_trends)


**5. Review Sentiment Analytics**

In [0]:
# Simple sentiment based on review rating
from pyspark.sql.functions import avg, count

sentiment_summary = reviews_df.groupBy("brand") \
    .agg(
        avg("rating").alias("avg_rating"),
        count("*").alias("num_reviews")
    )
display(sentiment_summary)

**6. Visualize Competitive Positioning**

In [0]:
# Plot competitive scatter of avg price vs avg review rating
# Aggregate average price and product count per brand
brand_summary = (
    products_df.groupBy("brand")
    .agg(
        avg("final_price").alias("avg_price"),
        count("*").alias("num_products")
    )
)

# Join with sentiment_summary and display as scatter plot
brand_report = brand_summary.join(sentiment_summary, "brand")

import seaborn as sns
import matplotlib.pyplot as plt

brand_pd = brand_report.select(
    "brand",
    "avg_price",
    "avg_rating",
    "num_products"
).toPandas()

plt.figure(figsize=(10, 6))
scatter = sns.scatterplot(
    data=brand_pd,
    x="avg_price",
    y="avg_rating",
    size="num_products",
    hue="num_products",
    palette="viridis",
    legend="brief",
    sizes=(20, 200)
)
plt.xlabel("Average Price")
plt.ylabel("Average Review Rating")
plt.title("Brand Competitive Positioning: Avg Price vs Avg Review Rating")
plt.legend(title="Num Products", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
display(plt.gcf())
plt.close()