In [None]:
# Инициализация подключения к базе данных и сессии Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, date_trunc, expr, count, countDistinct, when, array_distinct, collect_list
import psycopg2
from datetime import datetime, timedelta

# Подключение к базе данных PostgreSQL
db_conn = psycopg2.connect(
    host="*******",
    database="***",
    user="***",
    password="***"
)
db_cursor = db_conn.cursor()

# Настройка сессии Spark с параметрами для работы с большими данными
spark = SparkSession.builder \
    .appName("AvitoTech Mart Build") \
    .config("spark.jars", "/jars/postgresql-42.7.4.jar") \
    .config("spark.sql.shuffle.partitions", "20") \
    .config("spark.default.parallelism", "20") \
    .getOrCreate()

INGEST_DATE = "2026-02-14"
JDBC_URL = "jdbc:postgresql://postgres:***/***"
JDBC_PROPS = {"user": "***", "password": "***", "driver": "org.postgresql.Driver"}

print("Подключение к базе данных и инициализация Spark завершены")
print(f"Дата обрабатываемой партиции: {INGEST_DATE}")

16:51:38 [WARN] o.a.h.u.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16:51:39 [WARN] o.a.s.SparkConf - Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
16:51:40 [WARN] o.a.s.u.Utils - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Подключение к базе данных и инициализация Spark завершены
Дата обрабатываемой партиции: 2026-02-14


In [5]:
# Создание структуры таблиц витрин с индексами для ускорения запросов
db_cursor.execute("""
CREATE TABLE IF NOT EXISTS marts.mart_user_interactions (
    cookie BIGINT NOT NULL,
    item BIGINT NOT NULL,
    event_date TIMESTAMP NOT NULL,
    event_day DATE NOT NULL,
    event_week DATE NOT NULL,
    event_hour INTEGER NOT NULL,
    surface_id BIGINT NOT NULL,
    platform_id BIGINT NOT NULL,
    category_id BIGINT,
    location_id BIGINT,
    is_contact BOOLEAN NOT NULL,
    ingest_date DATE NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mart_ui_event_day ON marts.mart_user_interactions(event_day);
CREATE INDEX IF NOT EXISTS idx_mart_ui_ingest_date ON marts.mart_user_interactions(ingest_date);
""")

db_cursor.execute("""
CREATE TABLE IF NOT EXISTS marts.mart_daily_surface_stats (
    event_day DATE NOT NULL,
    surface_id BIGINT NOT NULL,
    platform_id BIGINT NOT NULL,
    category_id BIGINT,
    location_id BIGINT,
    total_interactions BIGINT NOT NULL,
    contacts BIGINT NOT NULL,
    contact_rate NUMERIC(5,2) NOT NULL,
    unique_users BIGINT NOT NULL,
    unique_items BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mart_dss_event_day ON marts.mart_daily_surface_stats(event_day);
""")

db_cursor.execute("""
CREATE TABLE IF NOT EXISTS marts.mart_user_behavior (
    cookie BIGINT PRIMARY KEY,
    first_seen_date DATE NOT NULL,
    last_seen_date DATE NOT NULL,
    total_interactions BIGINT NOT NULL,
    total_contacts BIGINT NOT NULL,
    lifetime_contact_rate NUMERIC(5,2) NOT NULL,
    days_since_first_interaction INTEGER NOT NULL,
    recent_platforms BIGINT[] NOT NULL,
    recent_surfaces BIGINT[] NOT NULL
);
""")

db_conn.commit()
print("Структура витрин создана")

Структура витрин создана


In [6]:
# Загрузка детальных данных взаимодействий за указанную дату партиции
# Удаление ранее загруженных данных за эту дату для обеспечения идемпотентности
db_cursor.execute("DELETE FROM marts.mart_user_interactions WHERE ingest_date = %s", (INGEST_DATE,))
db_conn.commit()
print(f"Удалены данные за дату партиции {INGEST_DATE} из витрины mart_user_interactions")

# Чтение данных из фактовой таблицы с фильтрацией на стороне базы данных
# Параметр predicates обеспечивает выполнение WHERE-условия в PostgreSQL
JDBC_PROPS_FETCH = JDBC_PROPS.copy()
JDBC_PROPS_FETCH["fetchsize"] = "10000"

df_new = spark.read.jdbc(
    url=JDBC_URL,
    table="core.fct_user_interactions",
    predicates=[f"ingest_date = '{INGEST_DATE}'"],
    properties=JDBC_PROPS_FETCH) \
    .withColumn("event_day", date_trunc("day", col("event_date")).cast("date")) \
    .withColumn("event_week", date_trunc("week", col("event_date")).cast("date")) \
    .withColumn("event_hour", expr("EXTRACT(HOUR FROM event_date)")) \
    .select(
        "cookie", "item", "event_date", "event_day", "event_week", "event_hour",
        "surface_id", "platform_id", "category_id", "location_id", "is_contact", "ingest_date"
    )

# Запись данных в витрину с контролем размера партиций и пакетов вставки
df_new.repartition(4).write \
    .option("batchsize", "10000") \
    .jdbc(JDBC_URL, "marts.mart_user_interactions", mode="append", properties=JDBC_PROPS)

inserted_count = df_new.count()
print(f"Загружено {inserted_count:,} строк в витрину mart_user_interactions")

Удалены данные за дату партиции 2026-02-14 из витрины mart_user_interactions


[Stage 3:>                                                          (0 + 1) / 1]

Загружено 68,806,152 строк в витрину mart_user_interactions


                                                                                

In [None]:
# Обновление ежедневных агрегатов по поверхностям и платформам
# Определение дней, для которых необходимо пересчитать агрегаты
db_cursor.execute("""
    SELECT DISTINCT DATE_TRUNC('day', event_date)::date 
    FROM core.fct_user_interactions 
    WHERE ingest_date = %s
""", (INGEST_DATE,))
affected_days = [row[0] for row in db_cursor.fetchall()]

if not affected_days:
    print("Нет дней, требующих пересчета агрегатов")
else:
    print(f"Пересчет агрегатов для дней: {affected_days}")
    
    # Удаление старых агрегатов за затронутые дни
    placeholders = ','.join(['%s'] * len(affected_days))
    db_cursor.execute(f"DELETE FROM marts.mart_daily_surface_stats WHERE event_day IN ({placeholders})", affected_days)
    db_conn.commit()
    
    # Чтение данных за затронутые дни с фильтрацией на стороне базы данных
    predicates = [f"DATE_TRUNC('day', event_date)::date = '{day}'" for day in affected_days]
    df_agg = spark.read.jdbc(
        url=JDBC_URL,
        table="core.fct_user_interactions",
        predicates=predicates,
        properties=JDBC_PROPS_FETCH
    ).groupBy(
        date_trunc("day", col("event_date")).cast("date").alias("event_day"),
        "surface_id", "platform_id", "category_id", "location_id"
    ).agg(
        count("*").alias("total_interactions"),
        count(when(col("is_contact"), 1)).alias("contacts"),
        (count(when(col("is_contact"), 1)) * 100.0 / count("*")).alias("contact_rate"),
        countDistinct("cookie").alias("unique_users"),
        countDistinct("item").alias("unique_items")
    ).withColumn("contact_rate", expr("ROUND(contact_rate, 2)")) \
     .select(
        "event_day", "surface_id", "platform_id", "category_id", "location_id",
        "total_interactions", "contacts", "contact_rate", "unique_users", "unique_items"
     )
    
    # Запись пересчитанных агрегатов в витрину
    df_agg.repartition(2).write \
        .option("batchsize", "10000") \
        .jdbc(JDBC_URL, "marts.mart_daily_surface_stats", mode="append", properties=JDBC_PROPS)
    
    print(f"Пересчитано {df_agg.count():,} агрегатов")

In [12]:
# Пересчет витрины поведения пользователей
print("Пересчет витрины поведения пользователей")

# Определение максимальной даты события в фактовой таблице для расчета окна анализа
db_cursor.execute("SELECT MAX(event_date) FROM core.fct_user_interactions")
max_event_date = db_cursor.fetchone()[0]
print(f"Максимальная дата события в фактах: {max_event_date}")

# Очистка существующих данных в витрине поведения пользователей
db_cursor.execute("DELETE FROM marts.mart_user_behavior")
db_conn.commit()

# Чтение данных за последний год с фильтрацией на стороне базы данных
one_year_ago = (max_event_date - timedelta(days=365)).strftime('%Y-%m-%d')
predicates = [f"event_date >= '{one_year_ago}' AND event_date <= '{max_event_date}'"]

from pyspark.sql.functions import datediff, min as spark_min, max as spark_max, count, when, col, array_distinct, collect_list, expr, lit

thirty_days_ago = (max_event_date - timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S')

df_behavior = spark.read.jdbc(
    url=JDBC_URL,
    table="core.fct_user_interactions",
    predicates=predicates,
    properties=JDBC_PROPS_FETCH
).groupBy("cookie") \
 .agg(
    spark_min("event_date").alias("first_seen_date"),
    spark_max("event_date").alias("last_seen_date"),
    count("*").alias("total_interactions"),
    count(when(col("is_contact"), 1)).alias("total_contacts"),
    ((count(when(col("is_contact"), 1)) * 100.0) / count("*")).alias("lifetime_contact_rate"),
    datediff(
        spark_max("event_date").cast("date"),
        spark_min("event_date").cast("date")
    ).alias("days_since_first_interaction"),
    array_distinct(collect_list(
        when(col("event_date") >= lit(thirty_days_ago), col("platform_id"))
    )).alias("recent_platforms"),
    array_distinct(collect_list(
        when(col("event_date") >= lit(thirty_days_ago), col("surface_id"))
    )).alias("recent_surfaces")
 ).withColumn("lifetime_contact_rate", expr("ROUND(lifetime_contact_rate, 2)")) \
  .withColumn("first_seen_date", col("first_seen_date").cast("date")) \
  .withColumn("last_seen_date", col("last_seen_date").cast("date")) \
  .select(
    "cookie", "first_seen_date", "last_seen_date", "total_interactions",
    "total_contacts", "lifetime_contact_rate", "days_since_first_interaction",
    "recent_platforms", "recent_surfaces"
 )

# Запись результатов в витрину поведения пользователей
df_behavior.repartition(4).write \
    .option("batchsize", "10000") \
    .jdbc(JDBC_URL, "marts.mart_user_behavior", mode="append", properties=JDBC_PROPS)

user_count = df_behavior.count()
print(f"Рассчитаны сегменты для {user_count:,} пользователей")

Пересчет витрины поведения пользователей
Максимальная дата события в фактах: 2025-02-23 03:00:00


[Stage 28:>                                                         (0 + 1) / 1]

Рассчитаны сегменты для 134,294 пользователей


                                                                                

In [13]:
# Проверка результатов загрузки витрин
db_cursor.execute("SELECT COUNT(*) FROM marts.mart_user_interactions WHERE ingest_date = %s", (INGEST_DATE,))
mart1_total = db_cursor.fetchone()[0]

db_cursor.execute("""
    SELECT COUNT(*) FROM marts.mart_daily_surface_stats 
    WHERE event_day IN (
        SELECT DISTINCT DATE_TRUNC('day', event_date)::date 
        FROM core.fct_user_interactions WHERE ingest_date = %s
    )
""", (INGEST_DATE,))
mart2_total = db_cursor.fetchone()[0]

db_cursor.execute("SELECT COUNT(*) FROM marts.mart_user_behavior")
mart3_total = db_cursor.fetchone()[0]

db_conn.close()

print("=" * 70)
print("ЗАГРУЗКА ВИТРИН ЗАВЕРШЕНА")
print("=" * 70)
print(f"mart_user_interactions   : {mart1_total:>12,} строк")
print(f"mart_daily_surface_stats : {mart2_total:>12,} агрегатов")
print(f"mart_user_behavior       : {mart3_total:>12,} пользователей")
print("=" * 70)

ЗАГРУЗКА ВИТРИН ЗАВЕРШЕНА
mart_user_interactions   :   68,806,152 строк
mart_daily_surface_stats :    7,326,691 агрегатов
mart_user_behavior       :      134,294 пользователей
