<h2>Импорты и инициализация Spark</h2>
<p>Загружаем необходимые библиотеки, настраиваем SparkSession с выделением памяти под драйвер и executor. 
Устанавливаем уровень логов для удобного чтения.</p>
<hr>

In [None]:
import glob
import findspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, sum as spark_sum, when, lit, unix_timestamp, countDistinct, avg, stddev, exp, explode, datediff, current_date, min as spark_min
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import math
import os
from catboost import CatBoostRanker, Pool
from sklearn.metrics import ndcg_score
import numpy as np
findspark.init("/opt/spark")

In [None]:
spark = SparkSession.builder \
    .appName("OzonApparelAnalysis") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
# spark.sparkContext.setLogLevel("INFO") # расскомментировать чтобы видеть все логи

<h2>Загрузка исходных данных</h2>
<p>Собираем пути к parquet-файлам и читаем их в DataFrame: заказы, трекер действий пользователей, 
товары, дерево категорий и тестовый датасет.</p>
<hr>

In [None]:
items_files = glob.glob('/srv/data/ml_ozon_recsys_train_final_apparel_items_data/*.parquet')
categories_tree = glob.glob('/srv/data/ml_ozon_recsys_train_final_categories_tree/*.parquet')
participants_files = glob.glob('/srv/data/ml_ozon_recsys_test_for_participants/*.parquet')
orders_files = glob.glob('/srv/data/preprocessed/orders_preprocessed/*.parquet')
tracker_files = glob.glob('/srv/data/preprocessed/tracker_preprocessed/*.parquet')
test_files = glob.glob('/srv/data/ml_ozon_recsys_test/*.parquet')

In [None]:
orders = spark.read.parquet(*orders_files)
tracker = spark.read.parquet(*tracker_files)
items = spark.read.parquet(*items_files)
categories = spark.read.parquet(*categories_tree)
test = spark.read.parquet(*test_files)

<h2>Формирование train_df</h2>
<p>Создаём базовый тренировочный датасет с пользователями, товарами, весом заказа и количеством взаимодействий. 
Формируем бинарный target (1 — заказ доставлен, 0 — иначе).</p>
<hr>

In [None]:
train_df = (
    orders
    .join(items.select("item_id", "catalogid"), on="item_id", how="left")
    .join(tracker.groupBy("user_id", "item_id").count().withColumnRenamed("count", "user_item_interactions"),
          on=["user_id", "item_id"], how="left")
    .fillna(0, subset=["user_item_interactions"])
    .withColumn("target", F.when(F.col("last_status") == "delivered_orders", 1).otherwise(0))
).select("user_id", "item_id", "order_weight", "user_item_interactions")

<h2>Определение топ-500 популярных товаров</h2>
<p>Считаем количество взаимодействий для каждого товара. 
Делаем две версии: только id товаров (для crossJoin) и с популярностью (для сортировки).</p>
<hr>

In [None]:
top500_items = (
    tracker.groupBy("item_id")
           .agg(F.count("action_type").alias("amount_of_interactions"))
           .orderBy(F.desc("amount_of_interactions"))
           .limit(500)
           .select("item_id")
           .cache()
)

top500_pop = (
    tracker.groupBy("item_id")
           .agg(F.count("action_type").alias("amount_of_interactions"))
           .orderBy(F.desc("amount_of_interactions"))
           .limit(500)
           .cache()
)


<h2>Выделение новых пользователей и генерация кандидатов</h2>
<p>Находим пользователей из test, которых нет в train. 
Им присваиваем top-500 популярных товаров с усреднёнными значениями фич.</p>
<hr>

In [None]:
new_users = test.distinct().join(train_df.select('user_id').distinct(), how='left_anti', on='user_id').cache()
new_users.limit(5).show()

In [None]:
avg_values = train_df.agg(
    F.avg("order_weight").alias("avg_order_weight"),
    F.avg("user_item_interactions").alias("avg_user_item_interactions")
).collect()[0]

avg_order_weight = avg_values["avg_order_weight"]
avg_user_item_interactions = avg_values["avg_user_item_interactions"]

new_users_with_items = (
    new_users
    .crossJoin(top500_items)
    .withColumn("order_weight", F.lit(avg_order_weight))
    .withColumn("user_item_interactions", F.lit(avg_user_item_interactions))
)

print(new_users_with_items.count())
new_users_with_items.limit(5).show()

<h2>Добивка до 500 товаров для старых пользователей</h2>
<p>Формируем кандидатов (user × top500), исключаем уже существующие пары. 
Для каждого пользователя добавляем недостающие товары (до 500), сортируя по популярности.</p>
<hr>

In [None]:
user_item_existing = train_df.select("user_id", "item_id").distinct().cache()

train_users = train_df.select("user_id").distinct().cache()

cross_candidates = train_users.crossJoin(top500_items.select("item_id"))

missing_candidates = cross_candidates.join(
    user_item_existing, on=["user_id", "item_id"], how="left_anti"
)


user_item_counts = train_df.groupBy("user_id").agg(
    F.countDistinct("item_id").alias("item_count")
)

user_needs = user_item_counts.withColumn(
    "need_count", F.expr("GREATEST(500 - item_count, 0)")
)

missing_candidates = missing_candidates.join(user_needs, on="user_id", how="inner")

missing_candidates = missing_candidates.join(top500_pop, on="item_id", how="left")

w_pop = Window.partitionBy("user_id").orderBy(F.desc("amount_of_interactions"))
missing_candidates = (
    missing_candidates
    .withColumn("rn", F.row_number().over(w_pop))
    .filter(F.col("rn") <= F.col("need_count"))
    .drop("rn", "item_count", "need_count", "amount_of_interactions")
    .withColumn("order_weight", F.lit(avg_order_weight))
    .withColumn("user_item_interactions", F.lit(avg_user_item_interactions))
)



<h2>Объединение всех данных</h2>
<p>Склеиваем train, новых пользователей и добивку. 
Затем нормализуем: убираем дубликаты, оставляем ровно 500 товаров на пользователя (с приоритетом его оригинальных покупок).</p>
<hr>

In [None]:
cols = ["user_id", "item_id", "order_weight", "user_item_interactions"]

train_df_fix = train_df.select(*cols)
new_users_with_items_fix = new_users_with_items.select(*cols)
missing_candidates_fix = missing_candidates.select(*cols)

final_train_pre = (
    train_df_fix
    .unionByName(new_users_with_items_fix)
    .unionByName(missing_candidates_fix)
    .dropDuplicates(["user_id", "item_id"])
).cache()

original_pairs_flag = (
    train_df_fix.select("user_id", "item_id")
                .withColumn("is_original", F.lit(1))
                .distinct()
)

final_flagged = (
    final_train_pre
    .join(original_pairs_flag, on=["user_id", "item_id"], how="left")
    .fillna({"is_original": 0})
    .join(
        top500_pop.select("item_id", "amount_of_interactions")
                  .withColumnRenamed("amount_of_interactions", "popularity"),
        on="item_id", how="left"
    )
    .fillna({"popularity": 0})
)

w_cap = Window.partitionBy("user_id").orderBy(F.desc("is_original"), F.desc("popularity"), F.asc("item_id"))
final_train = (
    final_flagged
    .withColumn("rn", F.row_number().over(w_cap))
    .filter(F.col("rn") <= 500)
    .select(*cols)
).cache()

<h2>Санити-проверки</h2>
<p>Проверяем общее количество строк, 
а также что у каждого пользователя ровно по 500 уникальных товаров.</p>
<hr>

In [None]:
# --------- Санити-проверки ----------
print("Строк в final_train:", final_train.count())

cnts = final_train.groupBy("user_id").agg(F.countDistinct("item_id").alias("cnt"))
cnts.agg(
    F.min("cnt").alias("min"),
    F.avg("cnt").alias("avg"),
    F.max("cnt").alias("max")
).show()

# топ-10 пользователей по числу товаров — должно быть 500 у всех
cnts.orderBy(F.desc("cnt")).show(10)

final_train.show(5, truncate=False)

<h2>Формирование выборки только для тестовых пользователей</h2>
<p>Оставляем только тех пользователей, что встречаются в test. 
Сохраняем результат в parquet.</p>
<hr>

In [None]:
test_user_ids = test.select("user_id").distinct()

final_for_test_only = (
    final_train
    .join(test_user_ids, on="user_id", how="inner")
)

# final_for_test_only.write.mode("overwrite").parquet("/srv/data/preprocessed/final_test_only.parquet")

<h2>Финальная проверка</h2>
<p>Читаем сохранённый датасет и проверяем количество строк и распределение товаров по пользователям.</p>
<hr>

In [None]:
final_train_loaded = spark.read.parquet("/srv/data/preprocessed/final_test_only.parquet")
print("Количество строк:", final_train_loaded.count())
final_train_loaded.show(5)