Исходные данные: Вам предоставлен датасет `retail_store_sales.csv` ,который содержит информацию о транзакциях в розничном магазине. Он включает информацию о продажах из восьми товарных категорий, в каждой из которых по 25 товаров со статичными ценами. Датасет содержит пропущенные значения, несогласованные данные и ошибки.

## 1. Загрузка и предварительная обработка данных

### Инициализация

In [1]:
# Импортируем необходимые библиотеки
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, isnan

In [2]:
# Создаем сессию Spark
spark = SparkSession.builder \
    .appName("RetailStore") \
    .getOrCreate()

In [3]:
# Загружаем данные о продажах из CSV-файла
sales_df = spark.read.csv("retail_store_sales.csv", header=True, inferSchema=True)

In [4]:
# Выводим схему и первые 5 строк данных
sales_df.printSchema()
sales_df.show(5)

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5

### Переименование столбцов в snake_case

In [5]:
def rename_columns_to_snake_case(df):
    new_column_names = [col.replace(" ", "_").lower() for col in df.columns]
    return df.toDF(*new_column_names)

# Переименовываем столбцы
sales_df = rename_columns_to_snake_case(sales_df)

In [6]:
# Выводим схему и случайные 5 строк после переименования
sales_df.printSchema()
sales_df.sample(withReplacement=False, fraction=0.01).show(5)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_1110407|    CUST_25|     Butchers| Item_24_BUT|          39.5

### Преобразование типов

In [7]:
# Строковые столбцы
string_cols = [
    "transaction_id", "customer_id", "category",
    "item", "payment_method", "location"
]
df_clean = sales_df
for c in string_cols:
    df_clean = df_clean.withColumn(
        c,
        F.when(
            F.lower(F.trim(F.col(c))).isin("", "null", "na", "none"),
            F.lit(None)
        ).otherwise(F.col(c))
    )

# Числовые столбцы
float_cols = ["price_per_unit", "quantity", "total_spent"]
for c in float_cols:
    df_clean = df_clean.withColumn(c, F.col(c).cast("double"))

# Дата
df_clean = df_clean.withColumn(
    "transaction_date",
    F.to_date(F.col("transaction_date"), "yyyy-MM-dd")
)

# Логический признак
df_clean = df_clean.withColumn(
    "discount_applied",
    F.col("discount_applied").cast("boolean")
)

In [8]:
# Функция проверки некорректных значений
def check_invalid_values(df):
    # Проверяем количество null значений в каждом столбце
    print("\nКоличество null значений по столбцам:")
    null_counts = []
    for col_name in df.columns:
        null_count = df.filter(col(col_name).isNull()).count()
        null_counts.append((col_name, null_count))
    
    for col_name, count in null_counts:
        print(f"{col_name}: {count}")
    
    # Дополнительная проверка на некорректные значения в числовых столбцах
    print("\nПроверка на NaN в числовых столбцах:")
    for col_name in ['price_per_unit', 'quantity', 'total_spent']:
        nan_count = df.filter(isnan(col(col_name))).count()
        print(f"{col_name} NaN count: {nan_count}")

In [9]:
check_invalid_values(df_clean)


Количество null значений по столбцам:
transaction_id: 0
customer_id: 0
category: 0
item: 1213
price_per_unit: 609
quantity: 604
total_spent: 604
payment_method: 0
location: 0
transaction_date: 0
discount_applied: 4199

Проверка на NaN в числовых столбцах:
price_per_unit NaN count: 0
quantity NaN count: 0
total_spent NaN count: 0


In [10]:
# Сохраняем очищенный DataFrame (заменяем исходный)
sales_df = df_clean

In [11]:
# Выводим схему и случайные 5 строк после преобразования типов
sales_df.printSchema()
sales_df.sample(withReplacement=False, fraction=0.01).show(5)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)

+--------------+-----------+--------------------+-----------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|       item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+-----------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_9619859|    CUST_01|            Butchers|It

## 2. Очистка и валидация данных

### Восстановление отсутствующих `item`

In [12]:
# Создаем справочник товаров в отдельный DataFrame с "category", "item", "price_per_unit".
items_ref = (
    sales_df
    .filter(F.col("item").isNotNull())         # только строки, где item задан
    .groupBy("category", "price_per_unit")     # ключ: категория + цена
    .agg(F.first("item").alias("item_ref"))    # берём первый встретившийся товар
)

In [13]:
# Заполняем пропуски
df_filled = (
    sales_df
    .join(items_ref, on=["category", "price_per_unit"], how="left")
    .withColumn("item", F.coalesce("item", "item_ref"))  # если item пуст – берём из справочника
    .drop("item_ref")
)

In [14]:
# Выводим 20 строк, демонстрирующих восстановленные значения
orig = df_clean.alias("o")
filled = df_filled.alias("f")
(
    orig.join(filled, on="transaction_id")
        .where(F.col("o.item").isNull() & F.col("f.item").isNotNull())
        .select(
            "transaction_id",
            F.col("o.category").alias("category"),
            F.col("o.price_per_unit").alias("price_per_unit"),
            F.col("f.item").alias("restored_item")
        )
        .show(20, truncate=False)
)

+--------------+----------------------------------+--------------+-------------+
|transaction_id|category                          |price_per_unit|restored_item|
+--------------+----------------------------------+--------------+-------------+
|TXN_1372952   |Furniture                         |33.5          |Item_20_FUR  |
|TXN_1809665   |Beverages                         |24.5          |Item_14_BEV  |
|TXN_4206593   |Furniture                         |35.0          |Item_21_FUR  |
|TXN_3481599   |Furniture                         |39.5          |Item_24_FUR  |
|TXN_1621497   |Patisserie                        |23.0          |Item_13_PAT  |
|TXN_4532214   |Patisserie                        |15.5          |Item_8_PAT   |
|TXN_5419965   |Computers and electric accessories|20.0          |Item_11_CEA  |
|TXN_8210621   |Beverages                         |39.5          |Item_24_BEV  |
|TXN_8586300   |Computers and electric accessories|27.5          |Item_16_CEA  |
|TXN_1154680   |Furniture   

In [15]:
# Проверяем статистику
sales_df = df_filled
check_invalid_values(sales_df)


Количество null значений по столбцам:
category: 0
price_per_unit: 609
transaction_id: 0
customer_id: 0
item: 609
quantity: 604
total_spent: 604
payment_method: 0
location: 0
transaction_date: 0
discount_applied: 4199

Проверка на NaN в числовых столбцах:
price_per_unit NaN count: 0
quantity NaN count: 0
total_spent NaN count: 0


Количество пропущенных значений в столбце `item` немного уменьшилось: 1213 -> 609

### Восстановление `total_spent`

In [16]:
# Проверим возможность восстановления 
sales_df.filter(
    F.col("total_spent").isNull() &
    F.col("quantity").isNotNull() &
    F.col("price_per_unit").isNotNull()
).show()

+--------+--------------+--------------+-----------+----+--------+-----------+--------------+--------+----------------+----------------+
|category|price_per_unit|transaction_id|customer_id|item|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------+--------------+--------------+-----------+----+--------+-----------+--------------+--------+----------------+----------------+
+--------+--------------+--------------+-----------+----+--------+-----------+--------------+--------+----------------+----------------+



Очевидно смысла заморачиваться нет, т.к. у нас нет строк, где `total_spent` не задан, но есть и `quantity` и `price_per_unit`.

### Заполнение отсутствующих `quantity` и `price_per_unit`

In [17]:
# Восстанавливаем `quantity`, когда
# - quantity IS NULL
# - price_per_unit и total_spent заданы

fill_qty_cond = (
    F.col("quantity").isNull() &
    F.col("price_per_unit").isNotNull() &
    F.col("total_spent").isNotNull()
)

sales_with_qty = sales_df.withColumn(
    "quantity",
    F.when(
        fill_qty_cond,
        F.round(F.col("total_spent") / F.col("price_per_unit"), 0)    # округляем до целого
    ).otherwise(F.col("quantity"))
)

# ----------------------------------------------------------------------

# Восстанавливаем `price_per_unit`, когда
# - price_per_unit IS NULL
# - quantity задано и не равно 0
# - total_spent задано

fill_price_cond = (
    F.col("price_per_unit").isNull() &
    F.col("quantity").isNotNull() &
    (F.col("quantity") != 0) &
    F.col("total_spent").isNotNull()
)

sales_filled = sales_with_qty.withColumn(
    "price_per_unit",
    F.when(
        fill_price_cond,
        F.round(F.col("total_spent") / F.col("quantity"), 2)
    ).otherwise(F.col("price_per_unit"))
)

# ----------------------------------------------------------------------

print("Восстановленные quantity")
(
    sales_df.alias("old")
    .join(sales_with_qty.alias("new"), on="transaction_id")
    .where(F.col("old.quantity").isNull() & F.col("new.quantity").isNotNull())
    .select(
        "transaction_id",
        F.col("new.price_per_unit"),
        F.col("new.total_spent"),
        F.col("new.quantity").alias("restored_quantity")
    )
    .show(20, truncate=False)
)

print("Восстановленные price_per_unit")
(
    sales_with_qty.alias("old")
    .join(sales_filled.alias("new"), on="transaction_id")
    .where(F.col("old.price_per_unit").isNull() & F.col("new.price_per_unit").isNotNull())
    .select(
        "transaction_id",
        F.col("new.quantity"),
        F.col("new.total_spent"),
        F.col("new.price_per_unit").alias("restored_price_per_unit")
    )
    .show(20, truncate=False)
)

Восстановленные quantity
+--------------+--------------+-----------+-----------------+
|transaction_id|price_per_unit|total_spent|restored_quantity|
+--------------+--------------+-----------+-----------------+
+--------------+--------------+-----------+-----------------+

Восстановленные price_per_unit
+--------------+--------+-----------+-----------------------+
|transaction_id|quantity|total_spent|restored_price_per_unit|
+--------------+--------+-----------+-----------------------+
|TXN_7482416   |10.0    |200.0      |20.0                   |
|TXN_5422631   |8.0     |52.0       |6.5                    |
|TXN_9634894   |10.0    |275.0      |27.5                   |
|TXN_8685338   |3.0     |105.0      |35.0                   |
|TXN_1543244   |8.0     |196.0      |24.5                   |
|TXN_4608122   |10.0    |275.0      |27.5                   |
|TXN_3822751   |9.0     |126.0      |14.0                   |
|TXN_7804271   |8.0     |40.0       |5.0                    |
|TXN_3433195 

In [18]:
# Проверяем статистику
sales_df = sales_filled
check_invalid_values(sales_df)


Количество null значений по столбцам:
category: 0
price_per_unit: 0
transaction_id: 0
customer_id: 0
item: 609
quantity: 604
total_spent: 604
payment_method: 0
location: 0
transaction_date: 0
discount_applied: 4199

Проверка на NaN в числовых столбцах:
price_per_unit NaN count: 0
quantity NaN count: 0
total_spent NaN count: 0


Количество пропущенных значений в `price_per_unit` уменьшилось до 0. Количество пропущенных значений в `quantity` осталось равным количеству пропущенных значений в `total_spent`, что очевидно свидетельствует о невозможности восстановления этих значений.

### Удаление строк с пропущенными значениями

In [19]:
# Посчитать текущее количество строк
current_row_count = sales_df.count()

# Удаляем строки с пропущенными значениями в критически важных столбцах
sales_df = sales_df.dropna(subset=["price_per_unit", "total_spent", "quantity", "item"])

# Проверяем количество строк после удаления
new_row_count = sales_df.count()

# Посчитаем долю удаленных строк
deleted_row_count = current_row_count - new_row_count
deleted_row_fraction = deleted_row_count / current_row_count
print(f"Удалено строк: {deleted_row_count} ({deleted_row_fraction:.2%})")

Удалено строк: 1213 (9.65%)


In [20]:
# Проверяем статистику после удаления строк
check_invalid_values(sales_df)


Количество null значений по столбцам:
category: 0
price_per_unit: 0
transaction_id: 0
customer_id: 0
item: 0
quantity: 0
total_spent: 0
payment_method: 0
location: 0
transaction_date: 0
discount_applied: 3783

Проверка на NaN в числовых столбцах:
price_per_unit NaN count: 0
quantity NaN count: 0
total_spent NaN count: 0


Очищено всё кроме `discount_applied`, который не является критически важным для анализа. Количество строк уменьшилось на 9.65% от общего числа.

## 3. Разведочный анализ данных

### Самые популярные категории товаров

In [21]:
# Рассчитайте общее количество проданных единиц товара для каждой категории
category_sales = (
    sales_df.groupBy("category")
    .agg(F.sum("quantity").alias("total_quantity"))
    .orderBy(F.desc("total_quantity"))
)

# Выводим топ-5 категорий по количеству проданных единиц
category_sales.show(5, truncate=False)

+-----------------------------+--------------+
|category                     |total_quantity|
+-----------------------------+--------------+
|Furniture                    |8083.0        |
|Beverages                    |7974.0        |
|Food                         |7925.0        |
|Electric household essentials|7897.0        |
|Milk Products                |7889.0        |
+-----------------------------+--------------+
only showing top 5 rows


### Анализ среднего чека

In [22]:
# Рассчитайте среднее значение `total_spent` для каждого метода оплаты
avg_spent_by_payment = (
    sales_df.groupBy("payment_method")
    .agg(F.avg("total_spent").alias("avg_total_spent"))
    .orderBy(F.desc("avg_total_spent"))
)

# Округлите до двух знаков после запятой.
avg_spent_by_payment = avg_spent_by_payment.withColumn(
    "avg_total_spent", F.round(F.col("avg_total_spent"), 2)
)

# Выводим результаты
avg_spent_by_payment.show(truncate=False)

+--------------+---------------+
|payment_method|avg_total_spent|
+--------------+---------------+
|Cash          |131.14         |
|Credit Card   |129.03         |
|Digital Wallet|128.68         |
+--------------+---------------+



In [23]:
# Рассчитайте среднее значение `total_spent` для каждой места, где прошла оплата. 
avg_spent_by_location = (
    sales_df.groupBy("location")
    .agg(F.avg("total_spent").alias("avg_total_spent"))
    .orderBy(F.desc("avg_total_spent"))
)

# Округлите до двух знаков после запятой.
avg_spent_by_location = avg_spent_by_location.withColumn(
    "avg_total_spent", F.round(F.col("avg_total_spent"), 2)
)

# Выводим результаты
avg_spent_by_location.show(truncate=False)

+--------+---------------+
|location|avg_total_spent|
+--------+---------------+
|Online  |130.45         |
|In-store|128.82         |
+--------+---------------+



## 4. Генерация признаков

### Временные признаки

In [24]:
# Добавьте два новых столбца на основе `transaction_date`: `day_of_week` и `transaction_month`
sales_df = sales_df.withColumn(
    "day_of_week", F.date_format(F.col("transaction_date"), "EEEE")
).withColumn(
    "transaction_month", F.date_format(F.col("transaction_date"), "MMMM")
).withColumn(
    "weekday_num", F.dayofweek("transaction_date")
).withColumn(
    "month_num", F.month("transaction_date"))

# Сместим по православному первый день недели на понедельник
sales_df = sales_df.withColumn(
    "weekday_num_monday_first",
    F.when(F.col("weekday_num") == 1, 7).otherwise(F.col("weekday_num") - 1)
)

# Переименовываем столбец weekday_num_monday_first в weekday_num
sales_df = (
    sales_df
    .drop("weekday_num")
    .withColumnRenamed("weekday_num_monday_first", "weekday_num")
)

# Выводим схему и случайные 5 строк после добавления временных признаков
sales_df.printSchema()
sales_df.sample(withReplacement=False, fraction=0.01).show(5)

root
 |-- category: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- transaction_month: string (nullable = true)
 |-- month_num: integer (nullable = true)
 |-- weekday_num: integer (nullable = true)

+--------------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+-----------+-----------------+---------+-----------+
|            category|price_per_unit|transaction_id|customer_id|        item|quantity|total_spent|payment_method|location|transactio

### Продажи по дням недели

In [25]:
# Рассчитайте среднюю сумму продаж (`total_spent`) для каждого дня недели.
avg_spent_by_day = (
    sales_df.groupBy("day_of_week", "weekday_num")
    .agg(F.avg("total_spent").alias("avg_total_spent"))
    .orderBy("weekday_num")
)

# Выводим результаты
avg_spent_by_day.show(truncate=False)

+-----------+-----------+------------------+
|day_of_week|weekday_num|avg_total_spent   |
+-----------+-----------+------------------+
|Monday     |1          |126.08145363408521|
|Tuesday    |2          |129.02869619463505|
|Wednesday  |3          |126.62033067973056|
|Thursday   |4          |129.6309894212819 |
|Friday     |5          |134.51823708206686|
|Saturday   |6          |131.17064220183485|
|Sunday     |7          |130.3134510042605 |
+-----------+-----------+------------------+



### Продажи по месяцам

In [26]:
# Рассчитайте среднюю сумму продаж (`total_spent`) для каждого месяца. Выведите результаты, отсортированные по месяцам.
avg_spent_by_month = (
    sales_df.groupBy("transaction_month", "month_num")
    .agg(F.avg("total_spent").alias("avg_total_spent"))
    .orderBy("month_num")
)

# Выводим результаты
avg_spent_by_month.show(truncate=False)

+-----------------+---------+------------------+
|transaction_month|month_num|avg_total_spent   |
+-----------------+---------+------------------+
|January          |1        |135.1926530612245 |
|February         |2        |129.7631880733945 |
|March            |3        |126.93729372937294|
|April            |4        |132.00166481687015|
|May              |5        |127.4728144989339 |
|June             |6        |131.323093220339  |
|July             |7        |127.64733059548254|
|August           |8        |122.76838235294117|
|September        |9        |130.6153846153846 |
|October          |10       |128.05379388448472|
|November         |11       |130.04082774049218|
|December         |12       |132.32119914346896|
+-----------------+---------+------------------+



### Признаки клиента

In [27]:
# Рассчитайте customer_lifetime_value (CLV) для каждого клиента как общую сумму (`total_spent`), потраченную этим клиентом за все транзакции.
clv_by_customer = (
    sales_df.groupBy("customer_id")
    .agg(F.sum("total_spent").alias("clv"))
    .orderBy(F.desc("clv"))
)

# Выведите Топ-10 клиентов по их CLV (customer_id и их CLV)
clv_by_customer.show(10, truncate=False)

+-----------+-------+
|customer_id|clv    |
+-----------+-------+
|CUST_24    |64608.0|
|CUST_05    |63855.5|
|CUST_16    |63185.5|
|CUST_13    |63015.5|
|CUST_08    |62850.5|
|CUST_15    |60749.5|
|CUST_10    |60367.5|
|CUST_23    |59738.5|
|CUST_21    |59639.0|
|CUST_02    |59512.5|
+-----------+-------+
only showing top 10 rows
