In [3]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00177-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet.crc
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00062-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet.crc
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00183-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet.crc
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/part-00107-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00087-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet.crc
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/part-00151-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00130-4dd3417b-16db-42f6-8de9-3f12aedebd25-c000.snappy.parquet.crc
/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet/.part-00154-4dd3417b-16db-42f6-8de9-3f12aedebd25-c0

In [4]:
# Cell 1: Imports & Spark setup
from pyspark import SparkContext
from pyspark.sql import SQLContext, Window
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark
sc     = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)




In [5]:
# Cell 2: Load raw data and split chronologically
input_path = "/kaggle/input/df-ecom-part-parquet/df_ecom_part_parquet"
cutoff_day = 23

df = sqlCtx.read.parquet(input_path) \
    .withColumn("date", F.to_date("event_time"))

train_raw = df.filter(F.dayofmonth("date") <= cutoff_day)
test_raw  = df.filter(F.dayofmonth("date")  > cutoff_day)


In [6]:
# Cell 3: Create labels on training data (best 2-hr bin per brand-day)
# 3a) Bucket into 2-hr bins
train_binned = train_raw.withColumn("hour_bin", (F.hour("event_time")/2).cast("int")*2)

# 3b) Compute per-brand-date-bin minimum price
train_min = (
    train_binned
    .groupBy("brand", "date", "hour_bin")
    .agg(F.min("price").alias("bin_min_price"))
)

# 3c) Pick the overall minimum bin per brand-date
win = Window.partitionBy("brand","date") \
            .orderBy(F.col("bin_min_price").asc(), F.col("hour_bin").asc())

train_label = (
    train_min
    .withColumn("rn", F.row_number().over(win))
    .filter("rn = 1")
    .select("brand", "date", F.col("hour_bin").alias("label_bin"))
)


In [7]:
# Cell 4 (updated): Feature engineering on training data
# 4a) Brand → index + one-hot (keep unseen)
brand_indexer = StringIndexer(
    inputCol="brand", outputCol="brand_idx", handleInvalid="keep"
)
brand_encoder = OneHotEncoder(
    inputCol="brand_idx", outputCol="brand_vec", handleInvalid="keep"
)

# 4b) Category → index + one-hot (keep unseen)
cat_indexer = StringIndexer(
    inputCol="category_code", outputCol="cat_idx", handleInvalid="keep"
)
cat_encoder = OneHotEncoder(
    inputCol="cat_idx", outputCol="cat_vec", handleInvalid="keep"
)

# 4c) Day-of-week cyclical
train_features = (
    train_raw
      .select("brand","category_code","date")
      .distinct()
      .withColumn("dow", F.dayofweek("date"))   # 1=Sun…7=Sat
      .withColumn("dow_sin", F.sin(2 * 3.14159 * (F.col("dow")/7)))
      .withColumn("dow_cos", F.cos(2 * 3.14159 * (F.col("dow")/7)))
)


In [10]:
# Cell 5 (updated): Assemble training dataset and train model
from pyspark.ml.feature import VectorAssembler

# 5a) Join features + labels
train_ds = train_features.join(train_label, on=["brand","date"])

# 5b) Vector assembler now includes category vector
assembler = VectorAssembler(
    inputCols=["brand_vec", "cat_vec", "dow_sin", "dow_cos"],
    outputCol="features"
)

# 5c) Multinomial Logistic Regression (same as before)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label_bin",
    family="multinomial",
    maxIter=50
)

# 5d) Pipeline now includes the category indexer & encoder
pipeline = Pipeline(stages=[
    brand_indexer,
    brand_encoder,
    cat_indexer,
    cat_encoder,
    assembler,
    lr
])

# 5e) Fit the model
model = pipeline.fit(train_ds)


                                                                                                    

In [11]:
# Cell 6: Prepare test dataset (same feature logic + labels for eval)
from pyspark.sql import Window

# 6a) Features (including category_code)
test_features = (
    test_raw
      .select("brand", "category_code", "date")    # <- added category_code
      .distinct()
      .withColumn("dow",     F.dayofweek("date"))
      .withColumn("dow_sin", F.sin(2 * 3.14159 * (F.col("dow")/7)))
      .withColumn("dow_cos", F.cos(2 * 3.14159 * (F.col("dow")/7)))
)

# 6b) Labels (unchanged)
test_binned = test_raw.withColumn(
    "hour_bin", (F.hour("event_time")/2).cast("int")*2
)
test_min = (
    test_binned
      .groupBy("brand","date","hour_bin")
      .agg(F.min("price").alias("bin_min_price"))
)
test_label = (
    test_min
      .withColumn("rn", F.row_number().over(
          Window.partitionBy("brand","date")
                .orderBy("bin_min_price","hour_bin")
      ))
      .filter("rn = 1")
      .select("brand","date", F.col("hour_bin").alias("label_bin"))
)

# 6c) Join features + labels — then when you call model.transform(test_ds),
# the pipeline will index & encode both brand and category_code, assemble features, and predict.
test_ds = test_features.join(test_label, on=["brand","date"])


In [12]:
# Cell 7: Predict & evaluate with category_code included
from pyspark.sql import functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 7a) Run the full pipeline (which now encodes both brand & category_code)
pred = model.transform(test_ds)

# 7b) Top-1 accuracy (exact bin match)
acc_eval = MulticlassClassificationEvaluator(
    labelCol="label_bin",
    predictionCol="prediction",
    metricName="accuracy"
)
top1 = acc_eval.evaluate(pred)

# 7c) Top-2 accuracy (allow ±2-hour neighbor)
pred2 = pred.withColumn(
    "top2_correct",
    F.when(
        (F.col("prediction") == F.col("label_bin")) |
        (F.abs(F.col("prediction") - F.col("label_bin")) == 2),
        1
    ).otherwise(0)
)
top2 = pred2.agg(F.avg("top2_correct")).first()[0]

print(f"Top-1 Accuracy: {top1:.3%}")
print(f"Top-2 Accuracy: {top2:.3%}")

# 7d) Confusion matrix of true vs. predicted bins
pred.groupBy("label_bin", "prediction") \
    .count() \
    .orderBy("label_bin", "prediction") \
    .show(24)


                                                                                                    

Top-1 Accuracy: 20.296%
Top-2 Accuracy: 46.273%


                                                                                                    

+---------+----------+-----+
|label_bin|prediction|count|
+---------+----------+-----+
|        0|       0.0| 1285|
|        0|       2.0| 1051|
|        0|       4.0|  393|
|        0|       6.0|  134|
|        0|       8.0|   60|
|        0|      10.0|   45|
|        0|      12.0|   49|
|        0|      14.0|   82|
|        0|      16.0|   41|
|        0|      18.0|    8|
|        0|      20.0|    2|
|        2|       0.0|  721|
|        2|       2.0| 1558|
|        2|       4.0|  660|
|        2|       6.0|  288|
|        2|       8.0|  153|
|        2|      10.0|  106|
|        2|      12.0|  100|
|        2|      14.0|  133|
|        2|      16.0|   76|
|        2|      18.0|   17|
|        2|      20.0|    6|
|        2|      22.0|    1|
|        4|       0.0|  390|
+---------+----------+-----+
only showing top 24 rows



In [13]:
# Cell 8: Inference by product_id → predict 2-hr window including category_code
from pyspark.sql import Row

# 8a) Specify the product_id you want to query
product_id_input = "4501766"

# 8b) Lookup its brand and category_code from the raw DataFrame
meta = (
    df
    .filter(F.col("product_id") == product_id_input)
    .select("brand", "category_code")
    .limit(1)
    .collect()[0]
)
brand_input = meta["brand"]
category_input = meta["category_code"]

# 8c) Find the most recent event date for that product
ref_date = (
    df
    .filter(F.col("product_id") == product_id_input)
    .select(F.to_date("event_time").alias("date"))
    .orderBy(F.col("date").desc())
    .limit(1)
    .collect()[0]["date"]
)

# 8d) Build a single-row DataFrame with brand, category_code & cyclical day-of-week features
infer_df = sqlCtx.createDataFrame(
    [Row(brand=brand_input, category_code=category_input, date=ref_date)]
).withColumn("dow", F.dayofweek("date")) \
 .withColumn("dow_sin", F.sin(2 * 3.14159 * (F.col("dow")/7))) \
 .withColumn("dow_cos", F.cos(2 * 3.14159 * (F.col("dow")/7)))

# 8e) Run it through the trained pipeline to get a prediction
pred = model.transform(infer_df)

# 8f) Extract the predicted bin and convert to a human-readable window
pred_bin = int(pred.select("prediction").collect()[0][0])
start = pred_bin
end   = pred_bin + 2
print(f"Product {product_id_input} (brand={brand_input}, category_code={category_input}):")
print(f"  Predicted lowest-price window: {start:02d}:00–{end:02d}:00")


[Stage 687:>                                                                            (0 + 4) / 4]

Product 4501766 (brand=midea, category_code=appliances.kitchen.hob):
  Predicted lowest-price window: 02:00–04:00


                                                                                                    