In [6]:
import findspark
findspark.init()

In [162]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, BooleanType  


In [19]:
# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("Retail Sales Data Quality & Analytics") \
    .getOrCreate()

print(f"Spark сессия создана: {spark.sparkContext.appName}")

Spark сессия создана: Retail Sales Data Quality & Analytics


In [None]:
# 1. Загрузка и предварительная обработка данных

In [28]:
# 1.1. Загрузка и вывод схемы: 

df = spark.read.csv(
    "retail_store_sales.csv",      
    header=True,            
    inferSchema=True         
)

print("Схема DataFrame:")
df.printSchema()

print("Вывод первых 5 строк загруженного DataFrame:")
df.show(5)

Схема DataFrame:
root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)

Вывод первых 5 строк загруженного DataFrame:
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN

In [27]:
# 1.2. Очистка названий столбцов

# Преобразование названий столбцов в snake_case
for column in df.columns:
    new_name = column.lower().replace(' ', '_')
    df = df.withColumnRenamed(column, new_name)

# Вывод обновленных названий столбцов
print(df.columns)

# Вывод схемы
df.printSchema()

['transaction_id', 'customer_id', 'category', 'item', 'price_per_unit', 'quantity', 'total_spent', 'payment_method', 'location', 'transaction_date', 'discount_applied']
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



In [33]:
# 1.3. Преобразование типов данных

# Определение схемы вручную
schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("category", StringType(), False),
    StructField("item", StringType(), True),
    StructField("price_per_unit", DoubleType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("total_spent", DoubleType(), True),
    StructField("payment_method", StringType(), True),
    StructField("location", StringType(), True),
    StructField("transaction_date", DateType(), False),
    StructField("discount_applied", BooleanType(), True)
])

# Перезагрузка данных 
df = spark.read.csv(
    "retail_store_sales.csv",      
    header=True,            
    schema=schema         
)

# Проверка схемы
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



In [None]:
#2. Очистка и валидация данных

In [63]:
# 2.1 Заполнение отсутствующих Price Per Unit

null_price_count = df.filter(F.col("price_per_unit").isNull()).count()
print(f"Количество NULL записей в столбце `price_per_unit` до заполнения: {null_price_count}")

# Условия для вычисления цены
conditions = (
    F.col("price_per_unit").isNull() &
    F.col("total_spent").isNotNull() &
    F.col("quantity").isNotNull() &
    (F.col("quantity") != 0)
)

# Вычисление цены за единицу где это возможно
calculated_price = F.round(F.col("total_spent") / F.col("quantity"), 2)

df_filled = df.withColumn(
    "price_per_unit",
    F.when(conditions, calculated_price)
     .otherwise(F.col("price_per_unit"))
)

null_price_count = df_filled.filter(F.col("price_per_unit").isNull()).count()
print(f"Количество NULL записей в столбце `price_per_unit` после заполнения: {null_price_count}")

Количество NULL записей в столбце `price_per_unit` до заполнения: 609
Количество NULL записей в столбце `price_per_unit` после заполнения: 0


In [87]:
# 2.2. Восстановление отсутствующих Item

# Создание справочника товаров

item_catalog = df_filled.filter(
    F.col("item").isNotNull() &
    F.col("category").isNotNull() &
    F.col("price_per_unit").isNotNull()
).select(
    "category",
    "item", 
    "price_per_unit"
).distinct().orderBy("category", "item")

# Переименовываем столбец item в item_catalog
item_catalog = item_catalog.withColumnRenamed("item", "item_catalog")

print(f"Размер справочника: {item_catalog.count()} уникальных товаров")
item_catalog.show(20, truncate=False)

Размер справочника: 200 уникальных товаров
+---------+------------+--------------+
|category |item_catalog|price_per_unit|
+---------+------------+--------------+
|Beverages|Item_10_BEV |18.5          |
|Beverages|Item_11_BEV |20.0          |
|Beverages|Item_12_BEV |21.5          |
|Beverages|Item_13_BEV |23.0          |
|Beverages|Item_14_BEV |24.5          |
|Beverages|Item_15_BEV |26.0          |
|Beverages|Item_16_BEV |27.5          |
|Beverages|Item_17_BEV |29.0          |
|Beverages|Item_18_BEV |30.5          |
|Beverages|Item_19_BEV |32.0          |
|Beverages|Item_1_BEV  |5.0           |
|Beverages|Item_20_BEV |33.5          |
|Beverages|Item_21_BEV |35.0          |
|Beverages|Item_22_BEV |36.5          |
|Beverages|Item_23_BEV |38.0          |
|Beverages|Item_24_BEV |39.5          |
|Beverages|Item_25_BEV |41.0          |
|Beverages|Item_2_BEV  |6.5           |
|Beverages|Item_3_BEV  |8.0           |
|Beverages|Item_4_BEV  |9.5           |
+---------+------------+-------------

In [101]:
# Пропуски до восстановления

missing_before = df_filled.filter(F.col("item").isNull()).count()
print(f"Количество строк с пропусками в item до восстановления: {missing_before}")

Количество строк с пропусками в item до восстановления: 1213


In [102]:
# Восстанавливаем item через join с справочником

# Восстанавление отсутствующи item путем соединения с item_catalog
df_joined = df_filled.join(item_catalog, on=["category", "price_per_unit"], how="left")

df_filled_item = df_joined.select(
                df_joined["transaction_id"],
                df_joined["customer_id"],
                df_joined["category"],
                F.coalesce(df_joined["item"], df_joined["item_catalog_item"]).alias("item"),
                df_joined["price_per_unit"],
                df_joined["quantity"],
                df_joined["total_spent"],
                df_joined["payment_method"],
                df_joined["location"],
                df_joined["transaction_date"],
                df_joined["discount_applied"]
)

In [128]:
# Пропуски после восстановления
missing_after = df_filled_item.filter(F.col("item").isNull()).count()
print(f"Количество строк с пропусками в item после восстановления: {missing_after}")

Количество строк с пропусками в item после восстановления: 0


In [140]:
# 2.3. Заполнение отсутствующих Quantity и Total Spent

# Шаг 1: Проверка пропусков до восстановления
print("Проверка пропусков до восстановления:")
null_counts_before = df_filled_item.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in ["quantity", "total_spent", "price_per_unit"]
]).collect()[0]

print(f"Пропусков в quantity: {null_counts_before['quantity']}")
print(f"Пропусков в total_spent: {null_counts_before['total_spent']}")
print(f"Пропусков в price_per_unit: {null_counts_before['price_per_unit']}")


Проверка пропусков до восстановления:
Пропусков в quantity: 604
Пропусков в total_spent: 604
Пропусков в price_per_unit: 0


In [142]:
# Шаг 2: Восстановление Total Spent (если есть quantity и price_per_unit)

print("Восстановление Total Spent")
df_filled_total = df_filled_item.withColumn(
    "total_spent",
    F.when(
        (F.col("total_spent").isNull()) &
        (F.col("quantity").isNotNull()) &
        (F.col("price_per_unit").isNotNull()),
        F.round(F.col("quantity") * F.col("price_per_unit"), 2)
    ).otherwise(F.col("total_spent"))
)

Восстановление Total Spent


In [143]:
# Шаг 3: Восстановление Quantity (если есть total_spent и price_per_unit)

print("Восстановление Quantity:")
df_filled_quantity = df_filled_total.withColumn(
    "quantity",
    F.when(
        (F.col("quantity").isNull()) &
        (F.col("total_spent").isNotNull()) &
        (F.col("price_per_unit").isNotNull()) &
        (F.col("price_per_unit") != 0),
        F.round(F.col("total_spent") / F.col("price_per_unit"), 0).cast("integer")
    ).otherwise(F.col("quantity"))
)

Восстановление Quantity:


In [144]:
# Шаг 4: Проверка результатов

print("Итоговая проверка после восстановления:")
null_counts_after = df_filled_quantity.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in ["quantity", "total_spent", "price_per_unit"]
]).collect()[0]
print(f"Пропусков в quantity после восстановления: {null_counts_after['quantity']}")
print(f"Пропусков в total_spent после восстановления: {null_counts_after['total_spent']}")

print(f"\nИтого восстановлено:")
print(f"- total_spent: {null_counts_before['total_spent'] - null_counts_after['total_spent']}")
print(f"- quantity: {null_counts_before['quantity'] - null_counts_after['quantity']}")

Итоговая проверка после восстановления:
Пропусков в quantity после восстановления: 604
Пропусков в total_spent после восстановления: 604

Итого восстановлено:
- total_spent: 0
- quantity: 0


In [145]:
# 2.4. Удаление строк с пропусками в Category, Quantity ,Total Spent и Price Per Unit

print("Количество строк до удаления:")
row_count_before = df_filled_quantity.count()
print(f"Всего строк: {row_count_before}")

Количество строк до удаления:
Всего строк: 12575


In [146]:
# Проверка пропусков по каждому полю
print("\nПропуски по полям перед удалением:")
null_counts = df_filled_quantity.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in ["category", "quantity", "total_spent", "price_per_unit"]
]).collect()[0]

for field, count in null_counts.asDict().items():
    print(f"{field}: {count} пропусков")


Пропуски по полям перед удалением:
category: 0 пропусков
quantity: 604 пропусков
total_spent: 604 пропусков
price_per_unit: 0 пропусков


In [148]:
type(null_counts)

pyspark.sql.types.Row

In [149]:
null_counts

Row(category=0, quantity=604, total_spent=604, price_per_unit=0)

In [150]:
null_counts = df_filled_quantity.agg(
    F.sum(F.col("category").isNull().cast("int")).alias("category_nulls"),
    F.sum(F.col("quantity").isNull().cast("int")).alias("quantity_nulls"),
    F.sum(F.col("total_spent").isNull().cast("int")).alias("total_spent_nulls"),
    F.sum(F.col("price_per_unit").isNull().cast("int")).alias("price_nulls")
).collect()[0]

In [151]:
null_counts

Row(category_nulls=0, quantity_nulls=604, total_spent_nulls=604, price_nulls=0)

In [153]:
# Идентифицикация строк для удаления (есть хотя бы один пропуск в указанных полях)
rows_to_delete = df_filled_quantity.filter(
    F.col("category").isNull() |
    F.col("quantity").isNull() |
    F.col("total_spent").isNull() |
    F.col("price_per_unit").isNull()
)
print(f"\nНайдено строк для удаления: {rows_to_delete.count()}")


Найдено строк для удаления: 604


In [154]:
# Удаление строк с пропусками
df_cleaned = df_filled_quantity.filter(
    F.col("category").isNotNull() &
    F.col("quantity").isNotNull() &
    F.col("total_spent").isNotNull() &
    F.col("price_per_unit").isNotNull()
)
print("\nКоличество строк после удаления:")
row_count_after = df_cleaned.count()
print(f"Всего строк: {row_count_after}")
print(f"Удалено строк: {row_count_before - row_count_after}")


Количество строк после удаления:
Всего строк: 11971
Удалено строк: 604


In [160]:
# Проверка отсутствия пропусков
print("Проверка отсутствия пропусков после удаления:")
null_counts_after = df_cleaned.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in ["category", "quantity", "total_spent", "price_per_unit"]
]).collect()[0]

for field, count in null_counts_after.asDict().items():
    print(f"{field}: {count} пропусков")

Проверка отсутствия пропусков после удаления:
category: 0 пропусков
quantity: 0 пропусков
total_spent: 0 пропусков
price_per_unit: 0 пропусков


In [159]:
print(f"Итоговое количество записей: {df_cleaned.count()}")

Итоговое количество записей: 11971


In [161]:
#3. Разведочный анализ данных

In [166]:
#3.1. Самые популярные категории товаров

# Общее количество проданных единиц по категориям
category_sales = df_cleaned.groupBy("category") \
    .agg(
        F.sum("quantity").alias("total_units_sold"),
        F.count("transaction_id").alias("transaction_count"),
        F.round(F.sum("total_spent"), 2).alias("total_revenue")
    ) \
    .orderBy(F.desc("total_units_sold"))

print("Все категории товаров (по количеству проданных единиц):")
category_sales.show(truncate=False)


Все категории товаров (по количеству проданных единиц):
+----------------------------------+----------------+-----------------+-------------+
|category                          |total_units_sold|transaction_count|total_revenue|
+----------------------------------+----------------+-----------------+-------------+
|Furniture                         |8462.0          |1525             |195310.0     |
|Food                              |8387.0          |1507             |194812.0     |
|Beverages                         |8358.0          |1496             |197047.5     |
|Milk Products                     |8339.0          |1513             |180112.0     |
|Electric household essentials     |8309.0          |1516             |203813.5     |
|Computers and electric accessories|8272.0          |1477             |190692.5     |
|Butchers                          |8206.0          |1496             |208118.0     |
|Patisserie                        |7943.0          |1441             |182165.5    

In [167]:
# Топ-5 категорий по количеству проданных единиц
top_5_categories = category_sales.limit(5)

print("Топ-5 категорий по общему количеству проданных единиц:")
top_5_categories.show(truncate=False)

Топ-5 категорий по общему количеству проданных единиц:
+-----------------------------+----------------+-----------------+-------------+
|category                     |total_units_sold|transaction_count|total_revenue|
+-----------------------------+----------------+-----------------+-------------+
|Furniture                    |8462.0          |1525             |195310.0     |
|Food                         |8387.0          |1507             |194812.0     |
|Beverages                    |8358.0          |1496             |197047.5     |
|Milk Products                |8339.0          |1513             |180112.0     |
|Electric household essentials|8309.0          |1516             |203813.5     |
+-----------------------------+----------------+-----------------+-------------+



In [169]:
# Визуализация результатов
print("Детальная статистика Топ-5 категорий:")
for row in top_5_categories.collect():
    category = row["category"]
    units = row["total_units_sold"]
    transactions = row["transaction_count"]
    revenue = row["total_revenue"]
    avg_per_transaction = round(revenue / transactions, 2) if transactions > 0 else 0
    
    print(f"\nКатегория: {category}")
    print(f"  Продано единиц: {int(units):,}")
    print(f"  Количество транзакций: {transactions:,}")
    print(f"  Общая выручка: ${revenue:,.2f}")
    print(f"  Средний чек: ${avg_per_transaction:,.2f}")

Детальная статистика Топ-5 категорий:

Категория: Furniture
  Продано единиц: 8,462
  Количество транзакций: 1,525
  Общая выручка: $195,310.00
  Средний чек: $128.07

Категория: Food
  Продано единиц: 8,387
  Количество транзакций: 1,507
  Общая выручка: $194,812.00
  Средний чек: $129.27

Категория: Beverages
  Продано единиц: 8,358
  Количество транзакций: 1,496
  Общая выручка: $197,047.50
  Средний чек: $131.72

Категория: Milk Products
  Продано единиц: 8,339
  Количество транзакций: 1,513
  Общая выручка: $180,112.00
  Средний чек: $119.04

Категория: Electric household essentials
  Продано единиц: 8,309
  Количество транзакций: 1,516
  Общая выручка: $203,813.50
  Средний чек: $134.44


In [170]:
print("Процентное распределение Топ-5 категорий:")
total_units = df_cleaned.agg(F.sum("quantity")).collect()[0][0]

for row in top_5_categories.collect():
    category = row["category"]
    units = row["total_units_sold"]
    percentage = (units / total_units) * 100
    print(f"{category}: {percentage:.1f}%")

print(f"\nОбщее количество проданных единиц товара: {int(total_units):,}")

Процентное распределение Топ-5 категорий:
Furniture: 12.8%
Food: 12.7%
Beverages: 12.6%
Milk Products: 12.6%
Electric household essentials: 12.5%

Общее количество проданных единиц товара: 66,276


In [173]:
#3.2. Анализ среднего чека

# 1. Средний чек по методам оплаты
print("1. Средний чек по методам оплаты:")
avg_by_payment = df_cleaned.groupBy("payment_method") \
    .agg(
        F.round(F.avg("total_spent"), 2).alias("avg_check"),
        F.round(F.sum("total_spent"), 2).alias("total_revenue"),
        F.count("transaction_id").alias("transaction_count"),
        F.round(F.stddev("total_spent"), 2).alias("std_dev_check")
    ) \
    .orderBy(F.desc("avg_check"))

avg_by_payment.show(truncate=False)

1. Средний чек по методам оплаты:
+--------------+---------+-------------+-----------------+-------------+
|payment_method|avg_check|total_revenue|transaction_count|std_dev_check|
+--------------+---------+-------------+-----------------+-------------+
|Cash          |131.05   |537710.0     |4103             |94.78        |
|Credit Card   |129.13   |507082.0     |3927             |95.23        |
|Digital Wallet|128.72   |507279.0     |3941             |94.25        |
+--------------+---------+-------------+-----------------+-------------+



In [174]:
# Детальная информация по методам оплаты
print("Детальная статистика по методам оплаты:")
for row in avg_by_payment.collect():
    method = row["payment_method"]
    avg = row["avg_check"]
    total = row["total_revenue"]
    count = row["transaction_count"]
    
    print(f"\nМетод оплаты: {method}")
    print(f"  Средний чек: ${avg:,.2f}")
    print(f"  Общая выручка: ${total:,.2f}")
    print(f"  Количество транзакций: {count:,}")
    print(f"  Доля от общей выручки: {(total / df_cleaned.agg(F.sum('total_spent')).collect()[0][0] * 100):.1f}%")

Детальная статистика по методам оплаты:

Метод оплаты: Cash
  Средний чек: $131.05
  Общая выручка: $537,710.00
  Количество транзакций: 4,103
  Доля от общей выручки: 34.6%

Метод оплаты: Credit Card
  Средний чек: $129.13
  Общая выручка: $507,082.00
  Количество транзакций: 3,927
  Доля от общей выручки: 32.7%

Метод оплаты: Digital Wallet
  Средний чек: $128.72
  Общая выручка: $507,279.00
  Количество транзакций: 3,941
  Доля от общей выручки: 32.7%


In [183]:
# 2. Средний чек по месту оплаты (Location)
print("\n\n2. Средний чек по месту оплаты:")
avg_by_location = df_cleaned.groupBy("location") \
    .agg(
        F.round(F.avg("total_spent"), 2).alias("avg_check"),
        F.round(F.sum("total_spent"), 2).alias("total_revenue"),
        F.count("transaction_id").alias("transaction_count"),
        F.round(F.min("total_spent"), 2).alias("min_check"),
        F.round(F.max("total_spent"), 2).alias("max_check")
    ) \
    .orderBy(F.desc("avg_check"))

avg_by_location.show(truncate=False)



2. Средний чек по месту оплаты:
+--------+---------+-------------+-----------------+---------+---------+
|location|avg_check|total_revenue|transaction_count|min_check|max_check|
+--------+---------+-------------+-----------------+---------+---------+
|Online  |130.42   |791401.0     |6068             |5.0      |410.0    |
|In-store|128.86   |760670.0     |5903             |5.0      |410.0    |
+--------+---------+-------------+-----------------+---------+---------+



In [184]:
# Детальная информация по местам оплаты
print("\nДетальная статистика по местам оплаты:")
for row in avg_by_location.collect():
    location = row["location"]
    avg = row["avg_check"]
    total = row["total_revenue"]
    count = row["transaction_count"]
    min_check = row["min_check"]
    max_check = row["max_check"]
    
    print(f"\nМесто оплаты: {location}")
    print(f"  Средний чек: ${avg:,.2f}")
    print(f"  Минимальный чек: ${min_check:,.2f}")
    print(f"  Максимальный чек: ${max_check:,.2f}")
    print(f"  Общая выручка: ${total:,.2f}")
    print(f"  Количество транзакций: {count:,}")


Детальная статистика по местам оплаты:

Место оплаты: Online
  Средний чек: $130.42
  Минимальный чек: $5.00
  Максимальный чек: $410.00
  Общая выручка: $791,401.00
  Количество транзакций: 6,068

Место оплаты: In-store
  Средний чек: $128.86
  Минимальный чек: $5.00
  Максимальный чек: $410.00
  Общая выручка: $760,670.00
  Количество транзакций: 5,903


In [None]:
# 4. Генерация признаков 

In [None]:
# 4.1. Временные признаки

In [202]:
# Создание копии DataFrame для добавления новых признаков
df_with_features = df_cleaned

# Добавление дня недели (1=воскресенье, 2=понедельник, ..., 7=суббота)
df_with_features = df_with_features.withColumn(
    "day_of_week", 
    F.dayofweek("transaction_date")
)

# Добавление месяца транзакции
df_with_features = df_with_features.withColumn(
    "transaction_month", 
    F.month("transaction_date")
)

# Добавление дня месяца и года для полноты анализа
df_with_features = df_with_features.withColumn(
    "transaction_day", 
    F.dayofmonth("transaction_date")
)

df_with_features = df_with_features.withColumn(
    "transaction_year", 
    F.year("transaction_date")
)

In [203]:
# Добавление названия дней недели для наглядности
df_with_features = df_with_features.withColumn(
    "day_of_week_name",
    F.when(F.col("day_of_week") == 1, "Sunday")
     .when(F.col("day_of_week") == 2, "Monday")
     .when(F.col("day_of_week") == 3, "Tuesday")
     .when(F.col("day_of_week") == 4, "Wednesday")
     .when(F.col("day_of_week") == 5, "Thursday")
     .when(F.col("day_of_week") == 6, "Friday")
     .when(F.col("day_of_week") == 7, "Saturday")
     .otherwise("Unknown")
)

In [208]:
# Добавление названия месяцев
df_with_features = df_with_features.withColumn(
    "transaction_month_name",
    F.when(F.col("transaction_month") == 1, "January")
     .when(F.col("transaction_month") == 2, "February")
     .when(F.col("transaction_month") == 3, "March")
     .when(F.col("transaction_month") == 4, "April")
     .when(F.col("transaction_month") == 5, "May")
     .when(F.col("transaction_month") == 6, "June")
     .when(F.col("transaction_month") == 7, "July")
     .when(F.col("transaction_month") == 8, "August")
     .when(F.col("transaction_month") == 9, "September")
     .when(F.col("transaction_month") == 10, "October")
     .when(F.col("transaction_month") == 11, "November")
     .when(F.col("transaction_month") == 12, "December")
     .otherwise("Unknown")
)


In [214]:
print("\nПримеры данных с временными признаками (первые 10 строк):")
df_with_features.select(
    "transaction_id",
    "transaction_date",
    "day_of_week_name",
    "transaction_day",
    "transaction_month_name",
    "transaction_year",
    "category",
    "total_spent"
).limit(10).show(truncate=False)


Примеры данных с временными признаками (первые 10 строк):
+--------------+----------------+----------------+---------------+----------------------+----------------+-------------+-----------+
|transaction_id|transaction_date|day_of_week_name|transaction_day|transaction_month_name|transaction_year|category     |total_spent|
+--------------+----------------+----------------+---------------+----------------------+----------------+-------------+-----------+
|TXN_6867343   |2024-04-08      |Monday          |8              |April                 |2024            |Patisserie   |185.0      |
|TXN_3731986   |2023-07-23      |Sunday          |23             |July                  |2023            |Milk Products|261.0      |
|TXN_9303719   |2022-10-05      |Wednesday       |5              |October               |2022            |Butchers     |43.0       |
|TXN_9458126   |2022-05-07      |Saturday        |7              |May                   |2022            |Beverages    |247.5      |
|TXN_45753

In [226]:
#4.2. Продажи по дням недели

print("\nРаспределение транзакций, общего дохода и среднего чека по дням недели:")
day_distribution = df_with_features.groupBy("day_of_week", "day_of_week_name") \
    .agg(
        F.count("transaction_id").alias("transaction_count"),
        F.round(F.sum("total_spent"), 2).alias("total_revenue"),
        F.round(F.avg("total_spent"), 2).alias("avg_check")
    ) \
    .orderBy("day_of_week")

day_distribution.show(truncate=False)


Распределение транзакций, общего дохода и среднего чека по дням недели:
+-----------+----------------+-----------------+-------------+---------+
|day_of_week|day_of_week_name|transaction_count|total_revenue|avg_check|
+-----------+----------------+-----------------+-------------+---------+
|1          |Sunday          |1736             |225991.0     |130.18   |
|2          |Monday          |1697             |213090.5     |125.57   |
|3          |Tuesday         |1696             |219650.0     |129.51   |
|4          |Wednesday       |1711             |216982.0     |126.82   |
|5          |Thursday        |1697             |219380.5     |129.28   |
|6          |Friday          |1729             |232786.0     |134.64   |
|7          |Saturday        |1705             |224191.0     |131.49   |
+-----------+----------------+-----------------+-------------+---------+



In [228]:
#4.3.Продажи по месяцам

print("\nРаспределение транзакций, общего дохода и среднего чека по месяцам:")
month_distribution = df_with_features.groupBy("transaction_month", "transaction_month_name") \
    .agg(
        F.count("transaction_id").alias("transaction_count"),
        F.round(F.sum("total_spent"), 2).alias("total_revenue"),
        F.round(F.avg("total_spent"), 2).alias("avg_check")
    ) \
    .orderBy("transaction_month")

month_distribution.show(truncate=False)


Распределение транзакций, общего дохода и среднего чека по месяцам:
+-----------------+----------------------+-----------------+-------------+---------+
|transaction_month|transaction_month_name|transaction_count|total_revenue|avg_check|
+-----------------+----------------------+-----------------+-------------+---------+
|1                |January               |1295             |174421.0     |134.69   |
|2                |February              |916              |119685.0     |130.66   |
|3                |March                 |965              |122392.0     |126.83   |
|4                |April                 |953              |125618.5     |131.81   |
|5                |May                   |978              |124594.5     |127.4    |
|6                |June                  |991              |129771.0     |130.95   |
|7                |July                  |1039             |131509.0     |126.57   |
|8                |August                |992              |123287.5     |124.28 

In [229]:
#4.4. Признаки клиента: Customer Lifetime Value (CLV)

# 1. Рассчитываем CLV для каждого клиента 
customer_clv = df_with_features.groupBy("customer_id") \
    .agg(
        F.round(F.sum("total_spent"), 2).alias("customer_lifetime_value"),
        F.count("transaction_id").alias("total_transactions"),
        F.round(F.avg("total_spent"), 2).alias("avg_transaction_value"),
        F.min("transaction_date").alias("first_purchase_date"),
        F.max("transaction_date").alias("last_purchase_date")
    ) \
    .orderBy(F.desc("customer_lifetime_value"))

In [232]:
print("\nОбщая статистика по клиентам:")
print(f"Всего уникальных клиентов: {customer_clv.count()}")


Общая статистика по клиентам:
Всего уникальных клиентов: 25


In [233]:
# 2. Топ-10 клиентов по CLV
print("\nТоп-10 клиентов по Customer Lifetime Value:")
top_10_customers = customer_clv.limit(10)
top_10_customers.show(truncate=False)


Топ-10 клиентов по Customer Lifetime Value:
+-----------+-----------------------+------------------+---------------------+-------------------+------------------+
|customer_id|customer_lifetime_value|total_transactions|avg_transaction_value|first_purchase_date|last_purchase_date|
+-----------+-----------------------+------------------+---------------------+-------------------+------------------+
|CUST_24    |68452.0                |519               |131.89               |2022-01-01         |2025-01-18        |
|CUST_08    |67351.5                |507               |132.84               |2022-01-03         |2025-01-17        |
|CUST_05    |66974.5                |516               |129.8                |2022-01-03         |2025-01-14        |
|CUST_16    |65570.5                |494               |132.73               |2022-01-01         |2025-01-16        |
|CUST_13    |65037.0                |508               |128.03               |2022-01-04         |2025-01-18        |
|CUST_23   

In [238]:
# 3. Детальная информация о топ-10 клиентах
print("\nДетальная информация о топ-10 клиентах:")
for row in top_10_customers.collect():
    customer_id = row["customer_id"]
    clv = row["customer_lifetime_value"]
    transactions = row["total_transactions"]
    avg_value = row["avg_transaction_value"]
    first_date = row["first_purchase_date"]
    last_date = row["last_purchase_date"]
    
    print(f"\nКлиент: {customer_id}")
    print(f"  CLV: ${clv:,.2f}")
    print(f"  Количество транзакций: {transactions}")
    print(f"  Средний чек: ${avg_value:,.2f}")
    print(f"  Первая покупка: {first_date}")
    print(f"  Последняя покупка: {last_date}")


Детальная информация о топ-10 клиентах:

Клиент: CUST_24
  CLV: $68,452.00
  Количество транзакций: 519
  Средний чек: $131.89
  Первая покупка: 2022-01-01
  Последняя покупка: 2025-01-18

Клиент: CUST_08
  CLV: $67,351.50
  Количество транзакций: 507
  Средний чек: $132.84
  Первая покупка: 2022-01-03
  Последняя покупка: 2025-01-17

Клиент: CUST_05
  CLV: $66,974.50
  Количество транзакций: 516
  Средний чек: $129.80
  Первая покупка: 2022-01-03
  Последняя покупка: 2025-01-14

Клиент: CUST_16
  CLV: $65,570.50
  Количество транзакций: 494
  Средний чек: $132.73
  Первая покупка: 2022-01-01
  Последняя покупка: 2025-01-16

Клиент: CUST_13
  CLV: $65,037.00
  Количество транзакций: 508
  Средний чек: $128.03
  Первая покупка: 2022-01-04
  Последняя покупка: 2025-01-18

Клиент: CUST_23
  CLV: $64,507.00
  Количество транзакций: 481
  Средний чек: $134.11
  Первая покупка: 2022-01-02
  Последняя покупка: 2025-01-14

Клиент: CUST_10
  CLV: $63,155.50
  Количество транзакций: 481
  Средн