In [44]:
!pip install pyspark py4j
!pip install boto3
from pyspark.sql import SparkSession
import traceback
from datetime import datetime
import boto3
from botocore.client import Config
import os, glob
from config import *



In [80]:
# -- Конфигурация подключений к БД Clickhouse и S3 подгружается из файла config.py
# --- Инициализация SparkSession ---
spark = SparkSession.builder \
    .appName("ClickHouse External Tables Creator") \
    .config("spark.driver.extraClassPath", JDBC_DRIVER_PATH) \
    .config("spark.executor.extraClassPath", JDBC_DRIVER_PATH) \
    .getOrCreate()

# Проверяем подключение к ClickHouse
try:
    spark.read.jdbc(url=CLICKHOUSE_JDBC_URL, table="system.one", properties=CLICKHOUSE_PROPERTIES).limit(1).count()
    print("Успешное подключение к ClickHouse!")
    connection_ok = True
except Exception as e:
    print(f"Ошибка подключения к ClickHouse: {e}")
    connection_ok = False

# Если подключение успешно, получаем список таблиц и создаем внешние таблицы
if connection_ok:
    # Получаем список таблиц из ClickHouse
    query_for_tables = f"""
    (SELECT name
    FROM system.tables
    WHERE database = '{ACTUAL_CLICKHOUSE_DB_NAME}') as ch_user_tables
    """
    tables_df = spark.read.jdbc(url=CLICKHOUSE_JDBC_URL, table=query_for_tables, properties=CLICKHOUSE_PROPERTIES)
    all_tables = [row.name for row in tables_df.collect()]
        
    # Фильтруем таблицы, исключая те, что начинаются с "mv_"
    tables = [table for table in all_tables if not table.startswith("mv_")]
        
    created_tables = []
        
    # Создаем или заменяем внешние таблицы Spark
    for table_name in tables:
        qualified_ch_table_name = f"{ACTUAL_CLICKHOUSE_DB_NAME}.{table_name}"
            
        # Удаляем таблицу, если она уже существует
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")
            
        create_table_sql = f"""
        CREATE TABLE {table_name}
        USING org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
        OPTIONS (
            url '{CLICKHOUSE_JDBC_URL}',
            dbtable '{qualified_ch_table_name}',
            user '{CLICKHOUSE_PROPERTIES['user']}',
            password '{CLICKHOUSE_PROPERTIES['password']}',
            driver '{CLICKHOUSE_PROPERTIES['driver']}'
        )
        """
        try:
            spark.sql(create_table_sql)
            created_tables.append(table_name)
        except Exception as e:
            print(f"Ошибка при создании таблицы {table_name}: {e}")
        
    # Выводим список созданных таблиц
    print("\nСозданы следующие внешние таблицы:")
    for table in created_tables:
        print(f"- {table}")


Успешное подключение к ClickHouse!

Созданы следующие внешние таблицы:
- addresses
- categories
- customers
- dim_customers
- dim_products
- dim_stores
- duplicate_analysis_results
- fact_purchase_items
- fact_purchases
- manufacturers
- products
- purchases
- store_categories
- store_managers
- store_networks
- stores


In [48]:
# Функция для выполнения SQL-запросов и получения результата в виде DataFrame
def execute_query(query):
    return spark.sql(query)

# Создаем базовый DataFrame с основной информацией о клиентах
base_query = """
SELECT
    customer_id,
    first_name,
    last_name,
    loyalty_card_number
FROM dim_customers
"""
base_df = execute_query(base_query)

In [49]:
# Теперь последовательно добавим каждый признак
# 1. bought_milk_last_30d - Покупал молочные продукты за последние 30 дней
bought_milk_last_30d = """
SELECT
    cu.customer_id,
    CAST(MAX(c.category_name = 'молочные продукты' AND i.purchase_datetime BETWEEN NOW() - INTERVAL 30 DAY AND NOW()) AS INT) AS bought_milk_last_30d
FROM dim_customers cu
LEFT JOIN fact_purchases pu ON cu.customer_id = pu.customer_id
LEFT JOIN fact_purchase_items i ON pu.purchase_id = i.purchase_id
LEFT JOIN dim_products pr ON i.product_id = pr.product_id
LEFT JOIN categories c ON pr.category_id = c.category_id
GROUP BY cu.customer_id
"""
bought_milk_last_30d_df = execute_query(bought_milk_last_30d)

In [50]:
# 2. bought_fruits_last_14d - Покупал фрукты и ягоды за последние 14 дней
bought_fruits_last_14d = """
SELECT
    cu.customer_id,
    CAST(MAX(c.category_name = 'фрукты и ягоды' AND i.purchase_datetime BETWEEN NOW() - INTERVAL 14 DAY AND NOW()) AS INT) AS bought_fruits_last_14d
FROM dim_customers cu
LEFT JOIN fact_purchases pu ON cu.customer_id = pu.customer_id
LEFT JOIN fact_purchase_items i ON pu.purchase_id = i.purchase_id
LEFT JOIN dim_products pr ON i.product_id = pr.product_id
LEFT JOIN categories c ON pr.category_id = c.category_id
GROUP BY cu.customer_id
"""
bought_fruits_last_14d_df = execute_query(bought_fruits_last_14d)

In [51]:
# 3. not_bought_veggies_14d - Не покупал овощи и зелень за последние 14 дней
not_bought_veggies_14d = """
SELECT
    cu.customer_id,
    CAST(NOT MAX(c.category_name = 'овощи и зелень' AND i.purchase_datetime >= NOW() - INTERVAL '30' DAY) AS INT) AS not_bought_veggies_14d
FROM dim_customers cu
LEFT JOIN fact_purchases pu ON cu.customer_id = pu.customer_id
LEFT JOIN fact_purchase_items i ON pu.purchase_id = i.purchase_id
LEFT JOIN dim_products pr ON i.product_id = pr.product_id
LEFT JOIN categories c ON pr.category_id = c.category_id
GROUP BY cu.customer_id
"""
not_bought_veggies_14d_df = execute_query(not_bought_veggies_14d)

In [52]:
# 4. recurrent_buyer - Делал более 2 покупок за последние 30 дней
recurrent_buyer = """
SELECT
    cu.customer_id,
    CAST(COUNT(CASE
        WHEN pu.purchase_datetime BETWEEN NOW() - INTERVAL '30' DAY AND NOW()
        THEN pu.purchase_id ELSE NULL END) > 2 AS INT) AS recurrent_buyer
FROM dim_customers cu
LEFT JOIN fact_purchases pu ON cu.customer_id = pu.customer_id
GROUP BY cu.customer_id
"""
recurrent_buyer_df = execute_query(recurrent_buyer)

In [53]:
# 5. inactive_14_30 - Не покупал 14–30 дней (ушедший клиент?)
inactive_14_30 = """
SELECT
    cu.customer_id,
    CAST(COUNT(CASE WHEN pu.purchase_datetime BETWEEN NOW() - INTERVAL 30 DAY AND NOW() - INTERVAL 14 DAY THEN 1 END) = 0 AS INT) AS inactive_14_30
FROM dim_customers cu
LEFT JOIN fact_purchases pu ON cu.customer_id = pu.customer_id
GROUP BY cu.customer_id
"""
inactive_14_30_df = execute_query(inactive_14_30)

In [54]:
# 6. new_customer - Покупатель зарегистрировался менее 30 дней назад
new_customer = """
SELECT
    customer_id,
    CASE 
        WHEN registration_date BETWEEN NOW() - INTERVAL 30 DAY AND NOW() 
        THEN 1 
        ELSE 0 
    END AS new_customer
FROM dim_customers
"""
new_customer_df = execute_query(new_customer)

In [55]:
# 7. delivery_user - Пользовался доставкой хотя бы раз
delivery_user = """
SELECT
    customer_id,
    CAST(MAX(is_delivery = true) AS INT) AS delivery_user
FROM fact_purchases
GROUP BY customer_id
"""
delivery_user_df = execute_query(delivery_user)

In [56]:
# 8. organic_preference	- Купил хотя бы 1 органический продукт
organic_preference = """
SELECT
    fp.customer_id,
    CAST(MAX(dp.is_organic = true) AS INT) AS organic_preference
FROM fact_purchases fp
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id=fpi.purchase_id
LEFT JOIN dim_products dp ON fpi.product_id=dp.product_id
GROUP BY customer_id
"""
organic_preference_df = execute_query(organic_preference)

In [57]:
# 9. bulk_buyer	- Средняя корзина > 1000₽
bulk_buyer = """
SELECT
    customer_id,
    CAST(AVG(total_amount) > 1000 AS INT) AS bulk_buyer
FROM fact_purchases
GROUP BY customer_id
"""
bulk_buyer_df = execute_query(bulk_buyer)

In [58]:
# 10. low_cost_buyer - Средняя корзина < 200₽
low_cost_buyer = """
SELECT
    customer_id,
    CAST(AVG(total_amount) < 200 AS INT) AS low_cost_buyer
FROM fact_purchases
GROUP BY customer_id
"""
low_cost_buyer_df = execute_query(low_cost_buyer)

In [59]:
# 11. buys_bakery - Лояльный клиент (карта и ≥3 покупки)
buys_bakery = """
SELECT
    cu.customer_id,
    CAST(MAX(c.category_name = 'зерновые и хлебобулочные изделия') AS INT) AS buys_bakery
FROM dim_customers cu
LEFT JOIN fact_purchases fp ON cu.customer_id = fp.customer_id
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id = fpi.purchase_id
LEFT JOIN dim_products dp ON fpi.product_id = dp.product_id
LEFT JOIN categories c ON dp.category_id = c.category_id
GROUP BY cu.customer_id
"""
buys_bakery_df = execute_query(buys_bakery)

In [60]:
# 12. loyal_customer - Лояльный клиент (карта и ≥3 покупки)
loyal_customer = """
SELECT
    c.customer_id,
    CAST(c.is_loyalty_member AND COUNT(DISTINCT fp.purchase_id) >= 3 AS INT) AS loyal_customer
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id, c.is_loyalty_member
"""
loyal_customer_df = execute_query(loyal_customer)

In [61]:
# 13. multicity_buyer - Делал покупки в разных городах
multicity_buyer = """
SELECT 
    c.customer_id,
    CAST(COUNT(DISTINCT a.city) > 1 AS INT) AS multicity_buyer
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN dim_stores s ON fp.store_id = s.store_id
LEFT JOIN addresses a ON s.address_id = a.address_id
GROUP BY c.customer_id
"""
multicity_buyer_df = execute_query(multicity_buyer)

In [62]:
# 14. bought_meat_last_week - Покупал мясо/рыбу/яйца за последнюю неделю (в контексте задания покупал продукты категории 'мясо, рыба, яйца и бобовые')
bought_meat_last_week = """
SELECT
    c.customer_id,
    CAST(MAX(CASE WHEN ct.category_name IN ('мясо, рыба, яйца и бобовые') 
             AND fp.purchase_datetime >= NOW() - INTERVAL 1 WEEK THEN 1 ELSE 0 END) AS INT) AS bought_meat_last_week
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id = fpi.purchase_id
LEFT JOIN dim_products p ON fpi.product_id = p.product_id
LEFT JOIN categories ct ON p.category_id = ct.category_id
GROUP BY c.customer_id
"""
bought_meat_last_week_df = execute_query(bought_meat_last_week)

In [63]:
# 15. night_shopper - Делал покупки после 20:00
night_shopper = """
SELECT
    c.customer_id,
    CAST(MAX(CASE WHEN HOUR(fp.purchase_datetime) > 20 THEN 1 ELSE 0 END) AS INT) AS night_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
night_shopper_df = execute_query(night_shopper)

In [64]:
# 16. morning_shopper - Делал покупки до 10:00
morning_shopper = """
SELECT
    c.customer_id,
    CAST(MAX(CASE WHEN HOUR(fp.purchase_datetime) < 10 THEN 1 ELSE 0 END) AS INT) AS morning_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
morning_shopper_df = execute_query(morning_shopper)

In [65]:
# 17. prefers_cash - Оплачивал наличными ≥ 70% покупок
prefers_cash = """
SELECT 
    c.customer_id,
    CAST(COUNT(fp.payment_method = 'cash') / nullif(COUNT(fp.purchase_id), 0) >= 0.7 AS INT) AS prefers_cash
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
prefers_cash_df = execute_query(prefers_cash)

In [66]:
# 18. prefers_card - Оплачивал картой ≥ 70% покупок
prefers_card = """
SELECT 
    c.customer_id,
    CAST(COUNT(fp.payment_method = 'card') / nullif(COUNT(fp.purchase_id), 0) >= 0.7 AS INT) AS prefers_card
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
prefers_card_df = execute_query(prefers_card)

In [67]:
# 19. weekend_shopper - Делал ≥ 60% покупок в выходные
weekend_shopper = """
SELECT
    c.customer_id,
    CAST(SUM(CASE WHEN DAYOFWEEK(fp.purchase_datetime) IN (1, 7) THEN 1 ELSE 0 END) / NULLIF(COUNT(fp.purchase_id), 0) >= 0.6 AS INT) AS weekend_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
weekend_shopper_df = execute_query(weekend_shopper)

In [68]:
# 20. weekday_shopper - Делал ≥ 60% покупок в будни
weekday_shopper = """
SELECT
    c.customer_id,
    CAST(SUM(CASE WHEN DAYOFWEEK(fp.purchase_datetime) BETWEEN 1 AND 5 THEN 1 ELSE 0 END) / NULLIF(COUNT(fp.purchase_id), 0) >= 0.6 AS INT) AS weekday_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
weekday_shopper_df = execute_query(weekday_shopper)

In [69]:
# 21. single_item_buyer - ≥50% покупок — 1 товар в корзине
single_item_buyer = """
SELECT
    c.customer_id,
    CAST(SUM(item_counts.is_single) * 1.0 / NULLIF(COUNT(fp.purchase_id), 0) >= 0.5 AS INT) AS single_item_buyer
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN (
    SELECT
        purchase_id,
        CAST(COUNT(1) = 1 AS INT) AS is_single
    FROM fact_purchase_items
    GROUP BY purchase_id
) item_counts ON fp.purchase_id = item_counts.purchase_id
GROUP BY c.customer_id
"""
single_item_buyer_df = execute_query(single_item_buyer)

In [70]:
# 22. varied_shopper - Покупал ≥4 разных категорий продуктов
varied_shopper = """
SELECT
    c.customer_id,
    CAST(COUNT(DISTINCT p.category_id) >= 4 AS INT) AS varied_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id = fpi.purchase_id
LEFT JOIN dim_products p ON fpi.product_id = p.product_id
GROUP BY c.customer_id
"""
varied_shopper_df = execute_query(varied_shopper)

In [71]:
# 23. store_loyal - Ходит - только в один магазин
store_loyal = """
SELECT
    c.customer_id,
    CAST(COUNT(DISTINCT fp.store_id) = 1 AND COUNT(fp.purchase_id) > 0 AS INT) AS store_loyal
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
store_loyal_df = execute_query(store_loyal)

In [72]:
# 24. switching_store - Ходит в разные магазины
switching_store = """
SELECT
    c.customer_id,
    CAST(COUNT(DISTINCT fp.store_id) > 1 AS INT) AS switching_store
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
switching_store_df = execute_query(switching_store)

In [73]:
# 25. family_shopper - Среднее кол-во позиций в корзине ≥4
family_shopper = """
SELECT
    c.customer_id,
    CAST(AVG(item_counts.count) >= 4 AND COUNT(fp.purchase_id) > 0 AS INT) AS family_shopper
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN (
    SELECT
        purchase_id,
        COUNT(1) AS count -- Spark SQL эквивалент COUNT()
    FROM fact_purchase_items
    GROUP BY purchase_id
) item_counts ON fp.purchase_id = item_counts.purchase_id
GROUP BY c.customer_id
"""
family_shopper_df = execute_query(family_shopper)

In [74]:
# 26. early_bird - Покупка в промежутке между 12 и 15 часами дня
early_bird = """
SELECT
    c.customer_id,
    CAST(MAX(CASE WHEN HOUR(fp.purchase_datetime) BETWEEN 12 AND 14 THEN 1 ELSE 0 END) AS INT) AS early_bird
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
early_bird_df = execute_query(early_bird)

In [75]:
# 27. no_purchases - Не совершал ни одной покупки (только регистрация)
no_purchases = """
SELECT
    c.customer_id,
    CAST(COUNT(fp.purchase_id) = 0 AS INT) AS no_purchases
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
no_purchases_df = execute_query(no_purchases)

In [76]:
# 28. recent_high_spender - Купил на сумму >2000₽ за последние 7 дней
recent_high_spender = """
SELECT
    c.customer_id,
    CAST(SUM(CASE WHEN fp.purchase_datetime >= NOW() - INTERVAL '7' DAY THEN fp.total_amount ELSE 0 END) > 2000 AS INT) AS recent_high_spender
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
GROUP BY c.customer_id
"""
recent_high_spender_df = execute_query(recent_high_spender)

In [77]:
# 29. fruit_lover - ≥3 покупок фруктов за 30 дней (в контексте нашей таблицы "покупок продуктов категории "фрукты и ягоды"")
fruit_lover = """
SELECT
    c.customer_id,
    CAST(COUNT(DISTINCT
        CASE
            WHEN ct.category_name = 'фрукты и ягоды' AND fp.purchase_datetime >= NOW() - INTERVAL '30' DAY
            THEN fp.purchase_id
            ELSE NULL
        END
    ) >= 3 AS INT) AS fruit_lover
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id = fpi.purchase_id
LEFT JOIN dim_products p ON fpi.product_id = p.product_id
LEFT JOIN categories ct ON p.category_id = ct.category_id
GROUP BY c.customer_id
"""
fruit_lover_df = execute_query(fruit_lover)

In [78]:
# 30. vegetarian_profile - Не купил ни одного мясного продукта за 90 дней (в контексте нашей таблицы "Не купил ни одного продукта категории "мясо, рыба, яйца и бобовые"")
vegetarian_profile = """
SELECT
    c.customer_id,
    CAST(SUM(CASE WHEN ct.category_name = 'мясо, рыба, яйца и бобовые' AND fp.purchase_datetime >= NOW() - INTERVAL '90' DAY THEN 1 ELSE 0 END) = 0 AS INT) AS vegetarian_profile
FROM dim_customers c
LEFT JOIN fact_purchases fp ON c.customer_id = fp.customer_id
LEFT JOIN fact_purchase_items fpi ON fp.purchase_id = fpi.purchase_id
LEFT JOIN dim_products p ON fpi.product_id = p.product_id
LEFT JOIN categories ct ON p.category_id = ct.category_id
GROUP BY c.customer_id
"""
vegetarian_profile_df = execute_query(vegetarian_profile)

In [81]:
# Объединяем все DataFrame в итоговую таблицу
feature_matrix_df = base_df
feature_matrix_df = feature_matrix_df.join(bought_milk_last_30d_df, on="customer_id", how="left") # - Покупал молочные продукты за последние 30 дней
feature_matrix_df = feature_matrix_df.join(bought_fruits_last_14d_df, on="customer_id", how="left") # - Покупал фрукты и ягоды за последние 14 дней
feature_matrix_df = feature_matrix_df.join(not_bought_veggies_14d_df, on="customer_id", how="left") # - Не покупал овощи и зелень за последние 14 дней
feature_matrix_df = feature_matrix_df.join(recurrent_buyer_df, on="customer_id", how="left") # - Делал более 2 покупок за последние 30 дней
feature_matrix_df = feature_matrix_df.join(inactive_14_30_df, on="customer_id", how="left") # - Не покупал 14–30 дней (ушедший клиент?)
feature_matrix_df = feature_matrix_df.join(new_customer_df, on="customer_id", how="left") # - Покупатель зарегистрировался менее 30 дней назад
feature_matrix_df = feature_matrix_df.join(delivery_user_df, on="customer_id", how="left") # - Пользовался доставкой хотя бы раз
feature_matrix_df = feature_matrix_df.join(organic_preference_df, on="customer_id", how="left") # - Купил хотя бы 1 органический продукт
feature_matrix_df = feature_matrix_df.join(bulk_buyer_df, on="customer_id", how="left") # - Средняя корзина > 1000₽
feature_matrix_df = feature_matrix_df.join(low_cost_buyer_df, on="customer_id", how="left") # - Средняя корзина < 200₽
feature_matrix_df = feature_matrix_df.join(buys_bakery_df, on="customer_id", how="left") # - Покупал хлеб/выпечку хотя бы раз
feature_matrix_df = feature_matrix_df.join(loyal_customer_df, on="customer_id", how="left") # - Лояльный клиент (карта и ≥3 покупки)
feature_matrix_df = feature_matrix_df.join(multicity_buyer_df, on="customer_id", how="left") # - Делал покупки в разных городах
feature_matrix_df = feature_matrix_df.join(bought_meat_last_week_df, on="customer_id", how="left") # - Покупал мясо/рыбу/яйца за последнюю неделю
feature_matrix_df = feature_matrix_df.join(night_shopper_df, on="customer_id", how="left") # - Делал покупки после 20:00
feature_matrix_df = feature_matrix_df.join(morning_shopper_df, on="customer_id", how="left") # - Делал покупки до 10:00
feature_matrix_df = feature_matrix_df.join(prefers_cash_df, on="customer_id", how="left") #  - Оплачивал наличными ≥ 70% покупок
feature_matrix_df = feature_matrix_df.join(prefers_card_df, on="customer_id", how="left") # - Оплачивал картой ≥ 70% покупок
feature_matrix_df = feature_matrix_df.join(weekend_shopper_df, on="customer_id", how="left") # - Делал ≥ 60% покупок в выходные
feature_matrix_df = feature_matrix_df.join(weekday_shopper_df, on="customer_id", how="left") # - Оплачивал картой ≥ 70% покупок
feature_matrix_df = feature_matrix_df.join(single_item_buyer_df, on="customer_id", how="left") # - ≥50% покупок — 1 товар в корзине
feature_matrix_df = feature_matrix_df.join(varied_shopper_df, on="customer_id", how="left") # - Покупал ≥4 разных категорий продуктов
feature_matrix_df = feature_matrix_df.join(store_loyal_df, on="customer_id", how="left") # -  Ходит только в один магазин
feature_matrix_df = feature_matrix_df.join(switching_store_df, on="customer_id", how="left") # - Ходит в разные магазины
feature_matrix_df = feature_matrix_df.join(family_shopper_df, on="customer_id", how="left") # - Среднее кол-во позиций в корзине ≥4
feature_matrix_df = feature_matrix_df.join(early_bird_df, on="customer_id", how="left") # - Покупка в промежутке между 12 и 15 часами дня
feature_matrix_df = feature_matrix_df.join(no_purchases_df, on="customer_id", how="left") # - Не совершал ни одной покупки (только регистрация)
feature_matrix_df = feature_matrix_df.join(recent_high_spender_df, on="customer_id", how="left") # - Купил на сумму >2000₽ за последние 7 дней
feature_matrix_df = feature_matrix_df.join(fruit_lover_df, on="customer_id", how="left") # - ≥3 покупок фруктов за 30 дней
feature_matrix_df = feature_matrix_df.join(vegetarian_profile_df, on="customer_id", how="left") # - Не купил ни одного мясного продукта за 90 дней 

# Заполняем null значения нулями
feature_matrix_df = feature_matrix_df.fillna(0)

# Показываем первые несколько строк результата
feature_matrix_df.show(5, vertical=True)

-RECORD 0----------------------------------
 customer_id            | cus-1000         
 first_name             | нестор           
 last_name              | фадеев           
 loyalty_card_number    | loyal-90f7f71182 
 bought_milk_last_30d   | 0                
 bought_fruits_last_14d | 0                
 not_bought_veggies_14d | 1                
 recurrent_buyer        | 0                
 inactive_14_30         | 1                
 new_customer           | 0                
 delivery_user          | 1                
 organic_preference     | 1                
 bulk_buyer             | 1                
 low_cost_buyer         | 0                
 buys_bakery            | 1                
 loyal_customer         | 1                
 multicity_buyer        | 1                
 bought_meat_last_week  | 0                
 night_shopper          | 1                
 morning_shopper        | 1                
 prefers_cash           | 1                
 prefers_card           | 1     

# Формируем имя файла с текущей датой
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_filename = f"feature_matrix_{current_time}.csv"

# Сохраняем в один файл CSV
feature_matrix_df.coalesce(1).write.option("header", "true").csv("temp_output")

# Переименовываем единственный партиционный файл
import os, glob
os.rename(glob.glob("temp_output/part-*.csv")[0], output_filename)
os.system("rm -rf temp_output")

print(f"Витрина данных успешно создана и сохранена в CSV файл: {output_filename}")

# Загружаем файл в S3
s3_client = boto3.client(
    's3',
    endpoint_url=ENDPOINT,
    aws_access_key_id=KEY_ID,
    aws_secret_access_key=SECRET,
    config=Config(signature_version='s3v4')
)

s3_client.upload_file(output_filename, CONTAINER, output_filename)
print(f"Файл успешно загружен в S3: {CONTAINER}/{output_filename}")

# Закрываем сессию
spark.stop()


In [41]:
# Закрываем сессию
spark.stop()