In [0]:
from pyspark.sql.functions import when, min, count, sum, mean
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Шлях до Delta таблиці
delta_path = "/Volumes/workspace/default/retail_rocket_delta_table"

# 1. ЗАВАНТАЖЕННЯ ДАНИХ
print("1. ЗАВАНТАЖЕННЯ ДАНИХ...")
rr_events = spark.read.format("delta").load(delta_path)

1. ЗАВАНТАЖЕННЯ ДАНИХ...


In [0]:
print("Початковий DataFrame:")
rr_events.show(5, truncate=False)
rr_events.printSchema()

Початковий DataFrame:
+-------------+---------+-----+------+-------------+-------------------+----+-----+---+-------+----------+-------------+
|timestamp    |visitorid|event|itemid|transactionid|event_time         |year|month|day|weekday|weekofyear|event_indexed|
+-------------+---------+-----+------+-------------+-------------------+----+-----+---+-------+----------+-------------+
|1433193207640|628886   |view |105274|0            |2015-06-01 21:13:27|2015|6    |1  |2      |23        |0            |
|1433200504091|982569   |view |182473|0            |2015-06-01 23:15:04|2015|6    |1  |2      |23        |0            |
|1433208091217|1373098  |view |243353|0            |2015-06-02 01:21:31|2015|6    |2  |3      |23        |0            |
|1433143168495|78096    |view |365014|0            |2015-06-01 07:19:28|2015|6    |1  |2      |23        |0            |
|1433203256537|172329   |view |18423 |0            |2015-06-02 00:00:56|2015|6    |2  |3      |23        |0            |
+---------

In [0]:
print("\n2. ПЕРВИННИЙ АНАЛІЗ ЗМІННИХ...")
categorical_cols = ["event", "itemid"]
numeric_cols = ["event_indexed"] 
time_cols = ["event_time", "year", "month", "day", "weekday", "weekofyear"]

print("Categorical:", categorical_cols)
print("Numeric:", numeric_cols)
print("Time:", time_cols)


2. ПЕРВИННИЙ АНАЛІЗ ЗМІННИХ...
Categorical: ['event', 'itemid']
Numeric: ['event_indexed']
Time: ['event_time', 'year', 'month', 'day', 'weekday', 'weekofyear']


In [0]:
print("\n3. ОБРОБКА КАТЕГОРІАЛЬНИХ ОЗНАК...")

# Топ-10 найчастіших значень
top_events = [r["event"] for r in rr_events.groupBy("event").count().orderBy("count", ascending=False).limit(10).collect()]
top_items = [r["itemid"] for r in rr_events.groupBy("itemid").count().orderBy("count", ascending=False).limit(10).collect()]

# Зменшення розмірності категоріальних стовпців
rr_events_reduced = (
    rr_events
    .withColumn("event_reduced", when(rr_events["event"].isin(top_events), rr_events["event"]).otherwise("other"))
    .withColumn("itemid_reduced", when(rr_events["itemid"].isin(top_items), rr_events["itemid"]).otherwise(-1))
)

# Вибірка (5%) для тренування індексера
sample_df = rr_events_reduced.sample(fraction=0.05, seed=42)

# Створення та застосування індексера
indexers = [
    StringIndexer(inputCol="event_reduced", outputCol="event_index", handleInvalid="keep"),
    StringIndexer(inputCol="itemid_reduced", outputCol="itemid_index", handleInvalid="keep")
]

pipeline_index = Pipeline(stages=indexers)
model_index = pipeline_index.fit(sample_df)

# *** rr_events_indexed - проміжний DF з індексами ***
rr_events_indexed = model_index.transform(rr_events_reduced)

print("Індексовані стовпці (перевірка):")
rr_events_indexed.select(
    "event", "event_reduced", "event_index",
    "itemid", "itemid_reduced", "itemid_index"
).show(5)



3. ОБРОБКА КАТЕГОРІАЛЬНИХ ОЗНАК...
Індексовані стовпці (перевірка):
+-----+-------------+-----------+------+--------------+------------+
|event|event_reduced|event_index|itemid|itemid_reduced|itemid_index|
+-----+-------------+-----------+------+--------------+------------+
| view|         view|        0.0|105274|            -1|         0.0|
| view|         view|        0.0|182473|            -1|         0.0|
| view|         view|        0.0|243353|            -1|         0.0|
| view|         view|        0.0|365014|            -1|         0.0|
| view|         view|        0.0| 18423|            -1|         0.0|
+-----+-------------+-----------+------+--------------+------------+
only showing top 5 rows


In [0]:
# 4. ОБРОБКА ЧАСОВИХ ОЗНАК 
print("\n4. ОБРОБКА ЧАСОВИХ ОЗНАК...")

# 1. Знаходимо час першої покупки, використовуючи унікальну назву стовпця (first_purchase_time)
purchase_time_df = rr_events.filter(rr_events.event=="transaction") \
                            .groupBy("transactionid") \
                            .agg(min("event_time").alias("first_purchase_time"))

# 2. Об'єднуємо з індексованим DataFrame
# *** Створюємо rr_events_all_features, який буде містити ВСІ ознаки ***
rr_events_all_features = rr_events_indexed.join(purchase_time_df, on="transactionid", how="left")

# 3. Розрахунок часу до покупки (від часу події до часу транзакції) в годинах
rr_events_all_features = rr_events_all_features.withColumn("time_to_purchase", 
    (rr_events_all_features["first_purchase_time"].cast("long") - rr_events_all_features["event_time"].cast("long"))/3600
)

# Перевірка результату
print("Часові ознаки:")
rr_events_all_features.select("transactionid", "event_time", "event", "time_to_purchase").show(5)



4. ОБРОБКА ЧАСОВИХ ОЗНАК...
Часові ознаки:
+-------------+-------------------+-----+------------------+
|transactionid|         event_time|event|  time_to_purchase|
+-------------+-------------------+-----+------------------+
|            0|2015-06-01 21:13:27| view| 336.3861111111111|
|            0|2015-06-01 23:15:04| view| 334.3591666666667|
|            0|2015-06-02 01:21:31| view|332.25166666666667|
|            0|2015-06-01 07:19:28| view|350.28583333333336|
|            0|2015-06-02 00:00:56| view|333.59472222222223|
+-------------+-------------------+-----+------------------+
only showing top 5 rows


In [0]:
# 5. АГРЕГУВАННЯ ТА СТВОРЕННЯ ПОХІДНИХ ОЗНАК
print("\n5. АГРЕГУВАННЯ ТА СТВОРЕННЯ ПОХІДНИХ ОЗНАК...")

# Агрегація ознак на рівні сесії (transactionid) - використовуємо rr_events_all_features
session_agg = rr_events_all_features.groupBy("transactionid").agg(
    count("event").alias("events_per_session"),
    sum((rr_events_all_features.event == "addtocart").cast("int")).alias("add_to_cart_count")
)

# Створення похідної ознаки: Співвідношення додавання до кошика
session_agg = session_agg.withColumn("add_to_cart_ratio", 
    session_agg["add_to_cart_count"]/session_agg["events_per_session"]
)

print("Агрегація сесій:")
session_agg.show(5)

# Фінальне об'єднання агрегованих ознак із основним DF для фінальної агрегації
final_rr = rr_events_all_features.join(session_agg.select("transactionid", "events_per_session", "add_to_cart_ratio"), 
                                on="transactionid", 
                                how="left")




5. АГРЕГУВАННЯ ТА СТВОРЕННЯ ПОХІДНИХ ОЗНАК...
Агрегація сесій:
+-------------+------------------+-----------------+-----------------+
|transactionid|events_per_session|add_to_cart_count|add_to_cart_ratio|
+-------------+------------------+-----------------+-----------------+
|        17149|                 1|                0|              0.0|
|        12914|                 1|                0|              0.0|
|         4366|                 1|                0|              0.0|
|         1021|                 2|                0|              0.0|
|         2737|                 1|                0|              0.0|
+-------------+------------------+-----------------+-----------------+
only showing top 5 rows


In [0]:
# Фінальна агрегація числових колонок по transactionid

agg_rr = final_rr.groupBy("transactionid").agg(
    # Індексовані та числові ознаки
    mean("event_indexed").alias("event_indexed_mean"),
    mean("event_index").alias("event_index_mean"),
    mean("itemid_index").alias("itemid_index_mean"),
    
    # Часові ознаки 
    mean("time_to_purchase").alias("time_to_purchase_mean"),
    mean("year").alias("year_mean"),
    mean("month").alias("month_mean"),
    mean("day").alias("day_mean"),
    mean("weekday").alias("weekday_mean"),
    mean("weekofyear").alias("weekofyear_mean"),
    
    # Ознаки рівня сесії
    mean("events_per_session").alias("events_per_session_mean"),
    mean("add_to_cart_ratio").alias("add_to_cart_ratio_mean")
)

print("Фінальний агрегований DataFrame (agg_rr):")
agg_rr.show(5)


Фінальний агрегований DataFrame (agg_rr):
+-------------+------------------+----------------+-----------------+---------------------+---------+----------+--------+------------+---------------+-----------------------+----------------------+
|transactionid|event_indexed_mean|event_index_mean|itemid_index_mean|time_to_purchase_mean|year_mean|month_mean|day_mean|weekday_mean|weekofyear_mean|events_per_session_mean|add_to_cart_ratio_mean|
+-------------+------------------+----------------+-----------------+---------------------+---------+----------+--------+------------+---------------+-----------------------+----------------------+
|        17149|               2.0|             2.0|              0.0|                  0.0|   2015.0|       6.0|    15.0|         2.0|           25.0|                    1.0|                   0.0|
|        12914|               2.0|             2.0|              0.0|                  0.0|   2015.0|       6.0|    17.0|         4.0|           25.0|                

In [0]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

numeric_cols_to_check = [
    "event_indexed_mean", "event_index_mean", "itemid_index_mean",
    "time_to_purchase_mean", "year_mean", "month_mean", "day_mean",
    "weekday_mean", "weekofyear_mean", "events_per_session_mean", "add_to_cart_ratio_mean"
]

assembler = VectorAssembler(inputCols=numeric_cols_to_check, outputCol="features_vector")
agg_rr_vector = assembler.transform(agg_rr)

cor_matrix = Correlation.corr(agg_rr_vector, "features_vector").head()[0]
print("Correlation matrix:\n", cor_matrix)


Correlation matrix:
 DenseMatrix([[ 1.00000000e+00,  1.00000000e+00, -2.90949652e-05,
               9.99960467e-01,             nan, -2.82059100e-06,
               1.04910317e-04, -2.42650397e-04,  7.23222167e-05,
              -9.99999999e-01, -1.00000000e+00],
             [ 1.00000000e+00,  1.00000000e+00, -2.90949652e-05,
               9.99960467e-01,             nan, -2.82059100e-06,
               1.04910317e-04, -2.42650397e-04,  7.23222167e-05,
              -9.99999999e-01, -1.00000000e+00],
             [-2.90949652e-05, -2.90949652e-05,  1.00000000e+00,
              -1.56989298e-05,             nan,  2.83934847e-02,
              -1.40083539e-02,  8.54803225e-03,  2.59292552e-02,
               2.86303615e-05,  2.90949652e-05],
             [ 9.99960467e-01,  9.99960467e-01, -1.56989298e-05,
               1.00000000e+00,             nan,  3.96425556e-05,
               1.28134728e-04, -2.36421902e-04,  1.21074321e-04,
              -9.99960503e-01, -9.99960467e-01],
   

In [0]:
selected_cols = [
    "transactionid",
    "event_indexed_mean",
    "itemid_index_mean",
    "time_to_purchase_mean",
    "events_per_session_mean",
    "add_to_cart_ratio_mean"
]

agg_rr_selected = agg_rr.select(selected_cols)

# Перевірка результату
print("Selected columns after removing duplicates and constants:")
agg_rr_selected.show(5)


Selected columns after removing duplicates and constants:
+-------------+------------------+-----------------+---------------------+-----------------------+----------------------+
|transactionid|event_indexed_mean|itemid_index_mean|time_to_purchase_mean|events_per_session_mean|add_to_cart_ratio_mean|
+-------------+------------------+-----------------+---------------------+-----------------------+----------------------+
|        17149|               2.0|              0.0|                  0.0|                    1.0|                   0.0|
|        12914|               2.0|              0.0|                  0.0|                    1.0|                   0.0|
|         4366|               2.0|              0.0|                  0.0|                    1.0|                   0.0|
|         1021|               2.0|              0.0|                  0.0|                    2.0|                   0.0|
|         2737|               2.0|              0.0|                  0.0|              

In [0]:
output_path = "/Volumes/workspace/default/retail_delta_2"

agg_rr.write.format("delta").mode("overwrite").save(output_path)