In [21]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

In [22]:
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1" # Выпадало предупреждение при создании Спарк Сессии

In [23]:
spark = SparkSession.builder \
    .appName("ClickHouseToPySpark") \
    .config("spark.jars.packages", "com.clickhouse:clickhouse-jdbc:0.9.6") \
    .config("spark.driver.extraJavaOptions", "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED") \
    .config("spark.executor.extraJavaOptions", "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED") \
    .getOrCreate()

In [24]:
ch_options_purcases = {
    "url": "jdbc:clickhouse://localhost:9123/silver",
    "user": "user",            # логин из docker-compose
    "password": "strongpassword", # пароль
    "dbtable": "purchases",     # таблица, которую хочешь забрать
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

# Создаем ДФ с покупками
df_purchases = spark.read.format("jdbc").options(**ch_options_purcases).load()


In [5]:
# Оказывается Спарк не хочет читать Array(категории) и Tuple(координаты), поэтому на этапе выгрузки подменяем саму таблицу stores
# на select запрос, в котором склеиваем (или дропаем) значения, в качестве обучения я склеил категории и дропнул координаты

In [25]:
request = "(SELECT store_id, store_name, store_network, store_description, type, arrayStringConcat(categories, ', ') as categories_str, manager_name, manager_phone, manager_email, country, city, street, house, postal_code, accept_online_orders, delivery_available, warehouse_connected, last_inventory_date, load_datetime FROM silver.stores) AS t"

In [26]:
ch_options_stores = {
    "url": "jdbc:clickhouse://localhost:9123/silver",
    "user": "user",            # логин из docker-compose
    "password": "strongpassword", # пароль
    "dbtable": request,     # таблица, которую хочешь забрать
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
    #"customSchema": f"{purchase_schema}"
}

# Создаем ДФ с магазинами
df_stores = spark.read.format("jdbc").options(**ch_options_stores).load()

In [27]:
ch_options_customers = {
    "url": "jdbc:clickhouse://localhost:9123/silver",
    "user": "user",            # логин из docker-compose
    "password": "strongpassword", # пароль
    "dbtable": "customers",     # таблица, которую хочешь забрать
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
    #"customSchema": f"{purchase_schema}"
}

# Создаем ДФ с покупателями
df_customers = spark.read.format("jdbc").options(**ch_options_customers).load()

In [28]:
ch_options_products = {
    "url": "jdbc:clickhouse://localhost:9123/silver",
    "user": "user",            # логин из docker-compose
    "password": "strongpassword", # пароль
    "dbtable": "products",     # таблица, которую хочешь забрать
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
    #"customSchema": f"{purchase_schema}"
}

# Создаем ДФ с продуктами
df_products = spark.read.format("jdbc").options(**ch_options_products).load()

In [10]:
# Собираем данные в одну большую аналитическую таблицу (витрину данных)

In [29]:
wide_df = df_purchases.alias("p") \
    .join(df_products.alias("pr"), df_purchases.product_id == df_products.id, "left") \
    .join(df_customers.alias("c"), df_purchases.customer_id == df_customers.customer_id, "left") \
    .join(df_stores.alias("s"), df_purchases.store_id == df_stores.store_id, "left") \
    .select(
        "p.purchase_id",
        "p.purchase_datetime",
        "p.total_item_price",
        "p.price_per_unit",
        "p.total_amount",
        "p.payment_method",
        "p.quantity",
        "p.is_delivery",
        # Данные о товаре
        F.col("pr.id").alias('product_id'),
        F.col("pr.name").alias('product_name'),
        "pr.product_group",
        F.col("pr.description").alias("product_description"),
        "pr.kbju_calories",
        "pr.kbju_protein",
        "pr.kbju_fat",
        "pr.kbju_carbohydrates",
        F.col("pr.price").alias("product_price"),
        F.col("pr.unit").alias("product_unit"), 
        F.col("pr.origin_country").alias("product_origin_country"),
        F.col("pr.expiry_days").alias("product_expiry_date"),
        F.col("pr.is_organic").alias("product_is_organic"),
        F.col("pr.barcode").alias("product_barcode"),
        F.col("pr.manufacturer_name").alias("product_manufacturer_name"),
        "pr.manufacturer_inn",
        # Данные о клиенте
        "c.customer_id",
        F.col("c.first_name").alias("customer_first_name"),
        F.col("c.last_name").alias("customer_last_name"),
        F.col("c.email").alias("customer_email"),
        F.col("c.phone").alias("customer_phone"),
        F.col("c.birth_date").alias("customer_birth_date"),
        F.col("c.gender").alias("customer_gender"),
        F.col("c.registration_date").alias("customer_registration_date"),
        F.col("c.is_loyalty_member").alias("customer_is_loyalty_member"),
        F.col("c.loyalty_card_number").alias("customer_loyalty_card_number"),
        F.col("c.city").alias("customer_city"),
        F.col("c.country").alias('customer_country'),
        F.col("c.city").alias('customer_city'),
        F.col("c.street").alias('customer_street'),
        F.col("c.house").alias('customer_house'),
        F.col("c.apartment").alias('customer_apartment'),
        F.col("c.postal_code").alias('customer_postal_code'),
        F.col("c.preferred_language").alias('customer_preferred_language'),
        F.col("c.preferred_payment_method").alias('customer_preferred_payment_method'),
        F.col("c.receive_promotions").alias("customer_receive_promotions"),
        # Данные о магазине
        "s.store_id",
        "s.store_name",
        "s.store_network",
        "s.store_description",
        F.col("s.type").alias("store_type"),
        "s.manager_name",
        "s.manager_phone",
        "s.manager_email",
        F.col("s.country").alias("store_country"),
        F.col("s.city").alias("store_city"),
        F.col("s.street").alias("store_street"),
        F.col("s.house").alias("store_house"),
        F.col("s.postal_code").alias("store_postal_code"),
        "s.accept_online_orders",
        "delivery_available",
        "warehouse_connected",
        F.col("last_inventory_date").alias("store_last_inventory_date")
        
    )

In [30]:
# 1. Находим точку отсчета
max_date = wide_df.select(F.max("purchase_datetime")).collect()[0][0]

# 2. Формируем витрину признаков
features_df = wide_df.groupby("customer_id").agg(
    # 1. Покупал молоко за последние 30 дней
    F.max(
        F.when((F.col("product_group") == "молочные продукты") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 30)), 1).otherwise(0)
    ).alias("bought_milk_last_30d"),

    # 2. Покупал фрукты за последние 14 дней
    F.max(
        F.when((F.col("product_group") == "фрукты и ягоды") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 14)), 1).otherwise(0)
    ).alias("bought_fruits_last_14d"),

    # 3. НЕ покупал овощи за последние 14 дней
    (F.lit(1) - F.max(
        F.when((F.col("product_group") == "овощи и зелень") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 14)), 1).otherwise(0)
    )).alias("not_bought_veggies_14d"),

    # 4. Количество уникальных покупок за последние 30 дней
    F.countDistinct(
        F.when(F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 30), F.col("purchase_id"))
    ).alias("sum_purchases"),

    # 5. Потерянный клиент (покупка была 14-30 дней назад)
    F.max(
        F.when(F.datediff(F.lit(max_date), F.col("purchase_datetime")).between(14, 30), 1).otherwise(0)
    ).alias('lost_client'),

    # 6. Новый клиент (регистрация в последние 30 дней)
    F.max(
        F.when(F.col("customer_registration_date") >= F.date_sub(F.lit(max_date), 30), 1).otherwise(0)
    ).alias("new_customer"),

    # 7. Пользовался доставкой
    F.max(F.when(F.col("is_delivery") > 0, 1).otherwise(0)).alias("delivery_user"),

    # 8. Предпочитает органику (используем исправленное имя колонки)
    F.max(F.when(F.col("product_is_organic") > 0, 1).otherwise(0)).alias("organic_preference"),

    # 9. Средняя корзина > 1000₽ (убрали внешний max)
    F.when(F.avg(F.col("total_amount")) > 1000, 1).otherwise(0).alias("bulk_buyer"),

    # 10. Средняя корзина < 200₽
    F.when(F.avg(F.col("total_amount")) < 200, 1).otherwise(0).alias("low_cost_buyer"),

    # 11. Покупал выпечку
    F.max(
        F.when(F.col("product_group") == "Зерновые и хлебобулочные изделия", 1).otherwise(0)
    ).alias("buys_bakery"),

    # 12. Достаем флаг лояльности для использования в withColumn ниже
    F.max(F.col("customer_is_loyalty_member")).alias("is_loyalty_member"),

    # 13. Делал покупки в разных городах !!!! Пока не придумал

    # 14. Покупал мясо/рыбу/яйца за последнюю неделю
    F.max(
        F.when((F.col("product_group") == "Мясо, рыба, яйца и бобовые") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 7)), 1).otherwise(0)
    ).alias("bought_meat_last_week"),

    # 15. Делал покупки после 20:00
    F.max(
        F.when(F.hour(F.col("purchase_datetime")) > 20, 1).otherwise(0)
    ).alias("night_shopper"),

    # 16. Делал покупки до 10:00
    F.max(
        F.when(F.hour(F.col("purchase_datetime")) < 10, 1).otherwise(0)
    ).alias("morning_shopper"),

    # 17. Оплачивал наличными ≥ 70% покупок !!! Пока не знаю
    # Расчитал уже в другом ДФ, потом склею joinом

    # 18. Оплачивал картой ≥ 70% покупок !!! Тоже пока не знаю
    # Расчитал уже в другом ДФ, потом склею joinом

    # 19. Делал ≥ 60% покупок в выходные
    # Расчитал уже в другом ДФ, потом склею joinом

    # 20. Делал ≥ 60% покупок в будни
    # Расчитал уже в другом ДФ, потом склею joinом

    # 21. ≥50% покупок — 1 товар в корзине
    # Рассчитал ниже

    # 22. Покупал ≥4 разных категорий продуктов
    # Рассчитал ниже

    # 23. Ходит только в один магазин
    # Рассчитал ниже

    # 24. Ходит в разные магазины
    # Рассчитал ниже

    # 25. Среднее кол-во позиций в корзине ≥4
    # Рассчитал ниже

    # 26. Покупка в промежутке между 12 и 15 часами дня
    F.max(
        F.when((F.hour(F.col("purchase_datetime")) >= 12) &
               (F.hour(F.col("purchase_datetime")) <= 15), 1).otherwise(0)
    ).alias("early_bird"),

    # 27. Не совершал ни одной покупки (только регистрация)
    F.max(
        F.when(F.col("purchase_datetime").isNull(), 1).otherwise(0)).alias("no_purchases"),

    # 28. Купил на сумму >2000₽ за последние 7 дней
    # Рассчитал ниже

    # 29. ≥3 покупок фруктов за 30 дней
    # Рассчитал ниже

    # 30. Не купил ни одного мясного продукта за 90 дней
    F.max(
        F.when((F.col("product_group") != "Мясо, рыба, яйца и бобовые") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 90)), 1).otherwise(0)
    ).alias("vegetarian_profile")
).withColumn(
    "recurrent_buyer", F.when(F.col("sum_purchases") > 2, 1).otherwise(0)
).withColumn(
    "loyal_customer", F.when((F.col("sum_purchases") >= 3) & (F.col("is_loyalty_member") == 1), 1).otherwise(0)
).drop("is_loyalty_member") # удаляем вспомогательную колонку

In [31]:
# Количество товаров в корзине и ≥50% покупок — 1 товар в корзине
bucket_df = wide_df.groupby("customer_id").agg(
    F.countDistinct("purchase_id").alias("total_purchases")).join(
    # Вспомогательный DF: считаем количество товаров в каждом чеке
    wide_df.groupby("customer_id", "purchase_id").agg(F.count("*").alias("items_in_check")),
    "customer_id", "left"
).groupby("customer_id").agg(
    F.max("total_purchases").alias("total_purchases"),
    # Считаем чеки, где был только 1 товар
    F.sum(F.when(F.col("items_in_check") == 1, 1).otherwise(0)).alias("single_item_purchases")
).withColumn(
    # Рассчитываем процент и флаг >= 50%
    "percent_single_item", (F.col("single_item_purchases") / F.col("total_purchases")) * 100
).withColumn(
    "prefers_single_item", F.when(F.col("percent_single_item") >= 50, 1).otherwise(0)
)

In [32]:
diff_categories = wide_df.groupby("customer_id").agg(
    # Считаем количество групп товаров купленных у каждого клиента
    F.countDistinct("product_group").alias("num_categories"),
    # Если попадает под условие, то 1
    F.when(F.col("num_categories") >= 4, 1).otherwise(0).alias("varied_shopper"),
    # Такая же механика, считаем уникальные магазины у клиента и дальше добавляем в матрицу
    F.countDistinct("store_id").alias("num_stores"),
    F.when(F.col("num_stores") == 1, 1).otherwise(0).alias("store_loyal"),
    F.when(F.col("num_stores") > 1, 1).otherwise(0).alias("switching_store"),
    F.countDistinct("store_city").alias("different_cities"),
    F.when(F.col("different_cities") > 1, 1).otherwise(0).alias("multicity_buyer"),
    # Покупок более чем на 2000 за последние 7 дней. Вначале проходим условием, если покупка попадает в наш промежуток, то она
    # попадает в рассчет, если нет, то 0б затем все складываем и выводим показатель
    F.sum(F.when(F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 7), F.col("total_item_price")).otherwise(0)).alias("sum_7_days"),
    F.when(F.col("sum_7_days") >= 2000, 1).otherwise(0).alias("recent_high_spender"),
    # Похожая логика, считаем кол-во фруктов за врменной промежуток и затем выводим признак
    F.sum(F.when((F.col("product_group") == "фрукты и ягоды") & 
               (F.col("purchase_datetime") >= F.date_sub(F.lit(max_date), 30)), 1).otherwise(0)).alias("sum_fruits"),
    F.when(F.col("sum_fruits") >= 3, 1).otherwise(0).alias("fruit_lover"))


In [33]:
other_features_df = wide_df.groupby("customer_id").agg(
    F.countDistinct(F.col('purchase_id')).alias("sum_sales"),
    F.sum(F.when(F.col("payment_method") == "cash", 1).otherwise(0)).alias("sum_cash_purchases"),
    F.count("purchase_id").alias("all_items"),
    # В Spark dayofweek: 1=Sun, 2=Mon, ..., 6=Fri, 7=Sat. Будни: 2,3,4,5,6
    F.sum(F.when(F.dayofweek("purchase_datetime").between(2, 6), 1).otherwise(0)).alias("week_day_purchases")
).withColumn(
    "sum_card_purchases", F.col("sum_sales") - F.col("sum_cash_purchases")
).withColumn(
    "percent_cash", F.round((F.col("sum_cash_purchases") / F.col("sum_sales")) * 100)
).withColumn(
    "percent_card", F.round((F.col("sum_card_purchases") / F.col("sum_sales")) * 100)
).withColumn(
    "prefers_cash", F.when(F.col("percent_cash") > 70, 1).otherwise(0)
).withColumn(
    "prefers_card", F.when(F.col("percent_card") > 70, 1).otherwise(0)
).withColumn(
    "week_end_purchases", F.col("all_items") - F.col("week_day_purchases")
).withColumn(
    "persent_weekday", (F.col("week_day_purchases") / F.col("all_items")) * 100
).withColumn(
    "persent_weekend", 100 - F.col("persent_weekday")
).withColumn(
    "weekday_shopper", F.when(F.col("persent_weekday") >= 70, 1).otherwise(0)
).withColumn(
    "weekend_shopper", F.when(F.col("persent_weekend") >= 70, 1).otherwise(0)
)

In [36]:
# Собираем все датафреймы вместе, фильтруем столбцы которые нам больше не нужны, оставляем customer_id и набор признаков

result_df = features_df.alias("f") \
    .join(other_features_df.alias("o"), F.col("f.customer_id") == F.col("o.customer_id")) \
    .join(bucket_df.alias("b"), F.col("f.customer_id") == F.col("b.customer_id")) \
    .join(diff_categories.alias("d"), F.col("f.customer_id") == F.col("d.customer_id")) \
    .select(
        "f.customer_id",
        "f.bought_milk_last_30d",
        "f.bought_fruits_last_14d",
        "f.not_bought_veggies_14d",
        "f.recurrent_buyer",
        "f.lost_client",
        "f.new_customer",
        "f.delivery_user",
        "f.organic_preference",
        "f.bulk_buyer",
        "f.low_cost_buyer",
        "f.buys_bakery",
        "f.loyal_customer",
        "f.bought_meat_last_week",
        "f.night_shopper",
        "f.morning_shopper",
        "o.prefers_cash",
        "o.prefers_card",
        "o.weekday_shopper",
        "o.weekend_shopper",
        "b.prefers_single_item",
        "d.varied_shopper",
        "d.store_loyal",
        "d.switching_store",
        "d.multicity_buyer",
        "d.recent_high_spender",
        "f.early_bird",
        "f.no_purchases",
        "d.fruit_lover",
        "f.vegetarian_profile"
    ) 

result_df.show(5)

+-----------+--------------------+----------------------+----------------------+---------------+-----------+------------+-------------+------------------+----------+--------------+-----------+--------------+---------------------+-------------+---------------+------------+------------+---------------+---------------+-------------------+--------------+-----------+---------------+---------------+-------------------+----------+------------+-----------+------------------+
|customer_id|bought_milk_last_30d|bought_fruits_last_14d|not_bought_veggies_14d|recurrent_buyer|lost_client|new_customer|delivery_user|organic_preference|bulk_buyer|low_cost_buyer|buys_bakery|loyal_customer|bought_meat_last_week|night_shopper|morning_shopper|prefers_cash|prefers_card|weekday_shopper|weekend_shopper|prefers_single_item|varied_shopper|store_loyal|switching_store|multicity_buyer|recent_high_spender|early_bird|no_purchases|fruit_lover|vegetarian_profile|
+-----------+--------------------+----------------------

In [37]:
# Настраиваем подключение к Selenium 
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
import os
import glob

In [38]:
load_dotenv() 

python-dotenv could not parse statement starting at line 8


True

In [39]:
class S3Client:
    '''
    Создаем идентичный класс и первого задания
    '''
    def __init__(self, endpoint, access_key, secret_key, bucket):
        """
        Инициализация клиента для работы с S3-совместимым хранилищем.
        """
        self.bucket = bucket

        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint,            # URL S3-хранилища (Selectel / MinIO / Yandex)
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4'),
            region_name="us-east-1",
            verify=False    # Пришлось отключить проверку, тк не грузил из-за сертификатов
        )


    def upload(self, file_path, object_name):
        """
        Загружает файл в бакет.
        """
        self.s3.upload_file(file_path, self.bucket, object_name)
        print(f"Загружено: {object_name}")

In [40]:
# Записываем нашу матрицу на диск (записывает он странно конечно по-умолчанию)
result_df.coalesce(1).write.mode("overwrite").csv("analytic/result_matrix.csv", header=True)

                                                                                

In [41]:
# Подключаемся к нашему хранилещу 
endpoint = os.getenv("S3_ENDPOINT")
bucket = os.getenv("S3_BUCKET")
access_key = os.getenv("S3_ACCESS_KEY")
secret_key = os.getenv("S3_SECRET_KEY")

de_bucket = S3Client(endpoint, access_key, secret_key, bucket)

In [42]:
path_to_folder = "analytic/result_matrix.csv"
actual_file_path = glob.glob(f"{path_to_folder}/part-*.csv")[0] 

# 3. Загружаем именно этот файл
de_bucket.upload(actual_file_path, "result_matrix_final.csv")



Загружено: result_matrix_final.csv


In [18]:
%history -l 100


other_features_df = wide_df.groupby("customer_id").agg(
    F.countDistinct(F.col('purchase_id')).alias("sum_sales"),
    F.sum(F.when(F.col("payment_method") == "cash", 1).otherwise(0)).alias("sum_cash_purchases"),
    F.sum(F.when(F.dayofweek("purchase_datetime") < 6, 1).otherwise(0)
).withColumn(
    "sum_card_purchases", F.col("sum_sales") - F.col("sum_cash_purchases")
).withColumn("percent_cash", F.round((F.col("sum_cash_purchases") / F.col("sum_sales")) * 100)
).withColumn("percent_card", F.round((F.col("sum_card_purchases") / F.col("sum_sales")) * 100)
).withColumn("prefers_cash", F.when(F.col("percent_cash") > 70, 1).otherwise(0)
).withColumn("prefers_card", F.when(F.col("percent_card") > 70, 1).otherwise(0)
)

other_features_df.show(5)
other_features_df = wide_df.groupby("customer_id").agg(
    F.countDistinct(F.col('purchase_id')).alias("sum_sales"),
    F.sum(F.when(F.col("payment_method") == "cash", 1).otherwise(0)).alias("sum_cash_purchases"),
    F.sum(F.when(F.dayofweek(