# Анализ данных retail_store_sales

## 1. Загрузка и предварительная обработка данных

### 1.1. Загрузка и вывод схемы
Загрузите файл retail_store_sales.csv.  Выведите первые 5 строк загруженного DataFrame и его схему (df.printSchema()).

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, round as _round, sum as _sum, mean as _mean,
    dayofweek, month, date_format
)

spark = SparkSession.builder \
    .appName("RetailStoreSales") \
    .getOrCreate()

df = spark.read.csv("retail_store_sales.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|    Beverages| Item_16_BEV|          27.5|     9.0|      247.5|   Credit

### 1.2. Очистка названий столбцов
Преобразуйте названия всех столбцов к единому регистру - snake_case.  Выведите обновленную схему DataFrame  или названия столбцов, чтобы убедиться в изменении названий.

In [7]:
df = df.toDF(*[c.strip().lower().replace(" ", "_") for c in df.columns])
print("\nНазвания столбцов после преобразования:")
print(df.columns)


Названия столбцов после преобразования:
['transaction_id', 'customer_id', 'category', 'item', 'price_per_unit', 'quantity', 'total_spent', 'payment_method', 'location', 'transaction_date', 'discount_applied']


### 1.3. Преобразование типов данных
Проанализируйте к каким типам данных относятся данные в столбцах и приведите столбец к соответствующему типу. Убедитесь, что некорректные или отсутствующие значения преобразуются в null в соответствующих типах данных.

In [8]:
from pyspark.sql.types import StringType, DoubleType, IntegerType, BooleanType, DateType

df = df \
    .withColumn("transaction_id", col("transaction_id").cast(StringType())) \
    .withColumn("customer_id", col("customer_id").cast(StringType())) \
    .withColumn("category", col("category").cast(StringType())) \
    .withColumn("item", col("item").cast(StringType())) \
    .withColumn("price_per_unit", col("price_per_unit").cast(DoubleType())) \
    .withColumn("quantity", col("quantity").cast(IntegerType())) \
    .withColumn("total_spent", col("total_spent").cast(DoubleType())) \
    .withColumn("payment_method", col("payment_method").cast(StringType())) \
    .withColumn("location", col("location").cast(StringType())) \
    .withColumn("transaction_date", col("transaction_date").cast(DateType())) \
    .withColumn("discount_applied",
        when(col("discount_applied") == "True", True)
        .when(col("discount_applied") == "False", False)
        .otherwise(None)
    )

print("\nПроверка типов:")
print(df.dtypes)


Проверка типов:
[('transaction_id', 'string'), ('customer_id', 'string'), ('category', 'string'), ('item', 'string'), ('price_per_unit', 'double'), ('quantity', 'int'), ('total_spent', 'double'), ('payment_method', 'string'), ('location', 'string'), ('transaction_date', 'date'), ('discount_applied', 'boolean')]


## 2. Очистка и валидация данных

### 2.1. Восстановление отсутствующих item
Так как данные статические для каждого товара, то  составьте справочник товаров в отдельный DataFrame с Category, Item и Rrice Rer Unit.
Для транзакций, где отсутствует название товара , но имеется категория и цена , попытайтесь определить название товара, путём объединения (join) с загруженным справочником товаров. Выведите 20 строк, демонстрирующих восстановленные значения.

In [9]:
product_ref = df.dropna(subset=["category", "item", "price_per_unit"]) \
    .dropDuplicates(["category", "price_per_unit"]) \
    .select("category", "item", "price_per_unit")

df = df.alias("t").join(
    product_ref.alias("r"),
    on=["category", "price_per_unit"],
    how="left"
).withColumn(
    "item",
    when(col("t.item").isNotNull(), col("t.item")).otherwise(col("r.item"))
).drop("r.item")

df.show(20)

+--------------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+
|            category|price_per_unit|transaction_id|customer_id|        item|quantity|total_spent|payment_method|location|transaction_date|discount_applied|        item|
+--------------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+
|          Patisserie|          18.5|   TXN_6867343|    CUST_09| Item_10_PAT|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true| Item_10_PAT|
|       Milk Products|          29.0|   TXN_3731986|    CUST_22|Item_17_MILK|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|Item_17_MILK|
|            Butchers|          21.5|   TXN_9303719|    CUST_02| Item_12_BUT|       2|       43.0|   Credit Card|  Online|      2022-10-05|           

### 2.2. Восстановление Total Spent
Найдите все транзакции, с пропусками в общей сумме и обновите ее, пересчитав её как quantity * price_per_unit для всех записей.

In [14]:
df.filter(col("quantity").isNotNull() & col("price_per_unit").isNotNull()) \
  .select("quantity", "price_per_unit", "total_spent") \
  .show(20, truncate=False)

+--------+--------------+-----------+
|quantity|price_per_unit|total_spent|
+--------+--------------+-----------+
|10      |18.5          |185.0      |
|9       |29.0          |261.0      |
|2       |21.5          |43.0       |
|9       |27.5          |247.5      |
|7       |12.5          |87.5       |
|8       |5.0           |40.0       |
|1       |27.5          |27.5       |
|3       |36.5          |109.5      |
|9       |8.0           |72.0       |
|7       |6.5           |45.5       |
|6       |39.5          |237.0      |
|2       |27.5          |55.0       |
|8       |29.0          |232.0      |
|1       |23.0          |23.0       |
|9       |14.0          |126.0      |
|7       |9.5           |66.5       |
|1       |18.5          |18.5       |
|2       |24.5          |49.0       |
|4       |33.5          |134.0      |
|10      |41.0          |410.0      |
+--------+--------------+-----------+
only showing top 20 rows



### 2.3. Заполнение отсутствующих Quantity и Price Per Unit
Для транзакций, где отсутствуют значения о количестве проданного товара , но имеются сумма транзакции и цена за товар , вычислите количество проданного товара и заполните пропущенные значения. Результат приведите к целому числу. 
Аналогично, если  отсутствует цена за единицу товара , но общая сумма и количество имеются, вычислите цену за единицу и заполните пропущенные значения. Округлите до двух знаков после запятой. Выведите 20 строк, демонстрирующих заполненные значения.

In [15]:
df = df.withColumn(
    "quantity",
    when(col("quantity").isNull() & col("total_spent").isNotNull() & col("price_per_unit").isNotNull(),
         _round(col("total_spent") / col("price_per_unit")).cast(IntegerType())
    ).otherwise(col("quantity"))
)

df = df.withColumn(
    "price_per_unit",
    when(col("price_per_unit").isNull() & col("total_spent").isNotNull() & col("quantity").isNotNull(),
         _round(col("total_spent") / col("quantity"), 2)
    ).otherwise(col("price_per_unit"))
)

df.show(20)


+--------------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+
|            category|price_per_unit|transaction_id|customer_id|        item|quantity|total_spent|payment_method|location|transaction_date|discount_applied|        item|
+--------------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+
|          Patisserie|          18.5|   TXN_6867343|    CUST_09| Item_10_PAT|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true| Item_10_PAT|
|       Milk Products|          29.0|   TXN_3731986|    CUST_22|Item_17_MILK|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|Item_17_MILK|
|            Butchers|          21.5|   TXN_9303719|    CUST_02| Item_12_BUT|       2|       43.0|   Credit Card|  Online|      2022-10-05|           

### 2.4. Удаление оставшихся строк с пропусками
Удалите оставшийся строки с пропусками в Category, Quantity ,Total Spent и Rrice Rer Unit

In [21]:
df = df.dropna(subset=["category", "quantity", "total_spent", "price_per_unit"])
# количество строк после очистки
print(f"Количество строк после очистки: {df.count()}")

Количество строк после очистки: 11971


## 3. Разведочный анализ данных

### 3.1. Самые популярные категории товаров
Рассчитайте общее количество проданных единиц товара  для каждой категории. Определите Топ-5 категорий по общему количеству проданных единиц.

In [29]:
top5 = (
    df.groupby("category", dropna=False)["quantity"]
      .sum()
      .sort_values(ascending=False)
      .head(5)
)
display(top5)

category
Furniture                        8462.0
Food                             8387.0
Beverages                        8358.0
Milk Products                    8339.0
Electric household essentials    8309.0
Name: quantity, dtype: float64

### 3.2. Анализ среднего чека
Рассчитайте среднее значение Total Spent для каждого метода оплаты. Округлите до двух знаков после запятой.
Рассчитайте среднее значение Total Spent для каждой места где прошла оплата. Округлите до двух знаков после запятой.

In [23]:
avg_payment = df.groupBy("payment_method").agg(_round(_mean("total_spent"), 2).alias("avg_spent"))
avg_location = df.groupBy("location").agg(_round(_mean("total_spent"), 2).alias("avg_spent"))

avg_payment.show()
avg_location.show()

+--------------+---------+
|payment_method|avg_spent|
+--------------+---------+
|   Credit Card|   129.13|
|Digital Wallet|   128.72|
|          Cash|   131.05|
+--------------+---------+

+--------+---------+
|location|avg_spent|
+--------+---------+
|In-store|   128.86|
|  Online|   130.42|
+--------+---------+



## 4. Генерация признаков

### 4.1. Временные признаки
Добавьте два новых столбца на основе Transaction Date:
- day_of_week: День недели
- transaction_month: Месяц транзакции 

In [27]:
df = df.withColumn("day_of_week", date_format(col("transaction_date"), "EEEE")) \
       .withColumn("transaction_month", month(col("transaction_date")))
df.show(5)

+-------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+-----------+-----------------+
|     category|price_per_unit|transaction_id|customer_id|        item|quantity|total_spent|payment_method|location|transaction_date|discount_applied|        item|day_of_week|transaction_month|
+-------------+--------------+--------------+-----------+------------+--------+-----------+--------------+--------+----------------+----------------+------------+-----------+-----------------+
|   Patisserie|          18.5|   TXN_6867343|    CUST_09| Item_10_PAT|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true| Item_10_PAT|     Monday|                4|
|Milk Products|          29.0|   TXN_3731986|    CUST_22|Item_17_MILK|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|Item_17_MILK|     Sunday|                7|
|     Butchers|          21.5|   TX

### 4.2. Продажи по дням недели
Рассчитайте среднюю сумму продаж (Total Spent) для каждого дня недели. Выведите результаты, отсортированные по дням недели.

In [28]:
avg_weekday = df.groupBy("day_of_week").agg(_round(_mean("total_spent"), 2).alias("avg_spent"))
avg_weekday.show()

+-----------+---------+
|day_of_week|avg_spent|
+-----------+---------+
|  Wednesday|   126.82|
|    Tuesday|   129.51|
|     Friday|   134.64|
|   Thursday|   129.28|
|   Saturday|   131.49|
|     Monday|   125.57|
|     Sunday|   130.18|
+-----------+---------+



### 4.3. Продажи по месяцам
Рассчитайте среднюю сумму продаж (Total Spent)  для каждого месяца. Выведите результаты, отсортированные по месяцам.

In [29]:
avg_month = df.groupBy("transaction_month").agg(_round(_mean("total_spent"), 2).alias("avg_spent")) \
    .orderBy("transaction_month")
avg_month.show()

+-----------------+---------+
|transaction_month|avg_spent|
+-----------------+---------+
|                1|   134.69|
|                2|   130.66|
|                3|   126.83|
|                4|   131.81|
|                5|    127.4|
|                6|   130.95|
|                7|   126.57|
|                8|   124.28|
|                9|   131.45|
|               10|   127.85|
|               11|   128.79|
|               12|   133.15|
+-----------------+---------+



### 4.4. Признаки клиента (CLV)
Рассчитайте customer_lifetime_value (CLV) для каждого клиента как общую сумму (Total Spent), потраченную этим клиентом за все транзакции. Выведите Топ-10 клиентов по их CLV (customer_id и их CLV).

In [30]:
clv = df.groupBy("customer_id").agg(_sum("total_spent").alias("clv")) \
    .orderBy(col("clv").desc()).limit(10)
clv.show()

+-----------+-------+
|customer_id|    clv|
+-----------+-------+
|    CUST_24|68452.0|
|    CUST_08|67351.5|
|    CUST_05|66974.5|
|    CUST_16|65570.5|
|    CUST_13|65037.0|
|    CUST_23|64507.0|
|    CUST_10|63155.5|
|    CUST_15|63117.5|
|    CUST_21|62933.0|
|    CUST_02|62046.5|
+-----------+-------+

