In [61]:
import re

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, DateType, IntegerType
import pyspark.sql.functions as F

In [3]:
# Create (or reuse) the Spark session for this notebook on the local[*] master.
spark = (
    SparkSession.builder
    .appName("RetailStoreSalesAnalysis")
    .master("local[*]")
    .getOrCreate()
)

26/02/08 16:37:39 WARN Utils: Your hostname, konstantin-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
26/02/08 16:37:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/08 16:37:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1. Загрузка и предварительная обработка данных

#### 1.1. Загрузка и вывод схемы

In [6]:
# Explicit schema for the sales CSV: (column name, Spark type) pairs.
# Every column is declared nullable.
_COLUMN_TYPES = [
    ("Transaction ID", StringType()),
    ("Customer ID", StringType()),
    ("Category", StringType()),
    ("Item", StringType()),
    ("Price Per Unit", DoubleType()),
    ("Quantity", DoubleType()),
    ("Total Spent", DoubleType()),
    ("Payment Method", StringType()),
    ("Location", StringType()),
    ("Transaction Date", DateType()),
    ("Discount Applied", BooleanType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _COLUMN_TYPES]
)

In [7]:
# Load the raw sales CSV with the explicit schema (the first line is a header),
# then preview the first 5 rows.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv('retail_store_sales.csv')
)
df.show(5)

                                                                                

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|    Beverages| Item_16_BEV|          27.5|     9.0|      247.5|   Credit

In [8]:
# Print column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)



In [9]:
# Summary statistics (count, mean, stddev, min, percentiles, ...) of the raw data.
raw_summary = df.summary()
raw_summary.show()

26/02/08 16:46:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+-------+--------------+-----------+----------+-----------+------------------+------------------+-----------------+--------------+--------+
|summary|Transaction ID|Customer ID|  Category|       Item|    Price Per Unit|          Quantity|      Total Spent|Payment Method|Location|
+-------+--------------+-----------+----------+-----------+------------------+------------------+-----------------+--------------+--------+
|  count|         12575|      12575|     12575|      11362|             11966|             11971|            11971|         12575|   12575|
|   mean|          NULL|       NULL|      NULL|       NULL|23.365911749958215| 5.536379583994654|129.6525770612313|          NULL|    NULL|
| stddev|          NULL|       NULL|      NULL|       NULL|10.743519044264605|2.8578828340802986|94.75069674502284|          NULL|    NULL|
|    min|   TXN_1002182|    CUST_01| Beverages|Item_10_BEV|               5.0|               1.0|              5.0|          Cash|In-store|
|    25%|          N

                                                                                

#### 1.2. Очистка названий столбцов: преобразуйте названия всех столбцов к единому стилю — snake_case.

In [10]:
def to_snake_case(name: str) -> str:
    """Convert a column name to snake_case.

    Words are detected at any run of non-word characters or underscores
    (spaces, hyphens, dots, ...) and at lower-to-upper camelCase boundaries.
    The words are lower-cased and joined with single underscores; leading and
    trailing separators are dropped.

    Args:
        name: Original column name, e.g. ``"Price Per Unit"``.

    Returns:
        The snake_case form, e.g. ``"price_per_unit"``. An input without any
        word characters yields an empty string.

    Examples:
        "Transaction ID"  -> "transaction_id"
        "Payment-Method"  -> "payment_method"
        "discountApplied" -> "discount_applied"
    """
    # Open up camelCase / PascalCase boundaries: "discountApplied" -> "discount Applied".
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", name)
    # \W is Unicode-aware for str patterns, so non-ASCII letters are kept as word characters.
    words = re.split(r"[\W_]+", spaced)
    return "_".join(word for word in words if word).lower()

In [11]:
# Build the new column names as a list rather than a bare map() iterator:
# an iterator is exhausted by the first unpacking, so it could not be
# inspected or reused after the toDF() call.
column_snake_case = [to_snake_case(column) for column in df.columns]

# Rename all columns positionally and show the resulting schema.
df_snake = df.toDF(*column_snake_case)
df_snake.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



### 2. Очистка и валидация данных
#### 2.1. Заполнение отсутствующих Price Per Unit

In [12]:
# Count empty / NULL / NaN values of price_per_unit in ONE aggregation instead of
# three separate filter().count() jobs. count(when(cond, 1)) counts exactly the
# rows where cond is true, which matches filter(cond).count().
# NOTE(review): price_per_unit is a double column, so the == "" comparison relies
# on an implicit string/double cast and presumably can never match - confirm.
price_col = F.col("price_per_unit")
price_gaps = df_snake.agg(
    F.count(F.when(price_col == "", 1)).alias("empty_count"),
    F.count(F.when(F.isnull(price_col), 1)).alias("null_count"),
    F.count(F.when(F.isnan(price_col), 1)).alias("nan_count"),
).first()
empty_count = price_gaps["empty_count"]
null_count = price_gaps["null_count"]
nan_count = price_gaps["nan_count"]
print(f'Количество пустых значений в столбце "price_per_unit": {empty_count}')
print(f'Количество пропущенных значений в столбце "price_per_unit": {null_count}')
print(f'Количество значений NaN в столбце "price_per_unit": {nan_count}')

Количество пустых значений в столбце "price_per_unit": 0
Количество пропущенных значений в столбце "price_per_unit": 609
Количество значений NaN в столбце "price_per_unit": 0


In [None]:
# Rows where the price is missing AND quantity or total_spent is missing as well,
# i.e. rows whose price could not be derived from the other two columns.
price_missing = F.col("price_per_unit").isNull()
operand_missing = F.col("quantity").isNull() | F.col("total_spent").isNull()
df_snake.filter(price_missing & operand_missing).count()

0

In [19]:
# Every missing price_per_unit can be derived: rows that lack price_per_unit
# still have both quantity and total_spent.
# The minimum quantity is 1, so there is no zero to divide by.
derived_price = F.round(F.col("total_spent") / F.col("quantity"), 2)
df_new = df_snake.withColumn(
    "price_per_unit",
    # keep the existing price; fall back to total_spent / quantity only when it is NULL
    F.coalesce(F.col("price_per_unit"), derived_price),
)
df_new.show(5)


+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|    Beverages| Item_16_BEV|          27.5|     9.0|      247.5|   Credit

In [20]:
# Re-count the missing prices after the fill.
null_count = df_new.where(F.col("price_per_unit").isNull()).count()
print(f'Количество пропущенных значений в столбце "price_per_unit": {null_count}')

Количество пропущенных значений в столбце "price_per_unit": 0


#### 2.2. Восстановление отсутствующих Item

In [21]:
# Count empty-string and NULL items in a single aggregation (one Spark job
# instead of two). count(when(cond, 1)) counts the rows where cond is true.
item_col = F.col("item")
item_gaps = df_new.agg(
    F.count(F.when(item_col == "", 1)).alias("empty_count"),
    F.count(F.when(item_col.isNull(), 1)).alias("null_count"),
).first()
empty_count = item_gaps["empty_count"]
null_count = item_gaps["null_count"]
print(f'Количество пустых значений в столбце "item": {empty_count}')
print(f'Количество пропущенных значений в столбце "item": {null_count}')


Количество пустых значений в столбце "item": 0
Количество пропущенных значений в столбце "item": 1213


In [None]:
# Item catalogue: distinct (category, item, price_per_unit) combinations with a
# known item, sorted by category and item. The item column is renamed to
# item_price so it does not clash with "item" when joined back.
df_price = (
    df_new
    .select("category", "item", "price_per_unit")
    .distinct()
    .where(F.col("item").isNotNull())
    .orderBy("category", "item")
    .withColumnRenamed("item", "item_price")
)
df_price.show(5)

+---------+-----------+--------------+
| category| item_price|price_per_unit|
+---------+-----------+--------------+
|Beverages|Item_10_BEV|          18.5|
|Beverages|Item_11_BEV|          20.0|
|Beverages|Item_12_BEV|          21.5|
|Beverages|Item_13_BEV|          23.0|
|Beverages|Item_14_BEV|          24.5|
+---------+-----------+--------------+
only showing top 5 rows



In [None]:
# Wherever "item" is missing, both "category" and "price_per_unit" are present,
# so those rows can be matched against the item catalogue.
item_missing = F.col("item").isNull()
lookup_keys_present = F.col("category").isNotNull() & F.col("price_per_unit").isNotNull()
df_new.filter(item_missing & lookup_keys_present).count()

1213

In [31]:
# Restore missing items by looking them up in the item catalogue by
# (category, price_per_unit).
#
# Guard: collapse the catalogue to one row per join key first. A left join
# against a lookup with repeated keys emits one output row per match, which
# would silently duplicate transactions and inflate every later sum/average.
# min() makes the choice deterministic if several items ever share a key.
item_lookup = (
    df_price
    .groupBy("category", "price_per_unit")
    .agg(F.min("item_price").alias("item_price"))
)
df_join = (
    df_new
    .join(item_lookup, on=["category", "price_per_unit"], how="left")
    # keep the original item when present, otherwise take the catalogue value
    .withColumn("item", F.coalesce(F.col("item"), F.col("item_price")))
    # drop the helper column and restore the original column order
    .select(*df_new.columns)
)
df_join.show()

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|           Beverages| Item_16_

In [36]:
# Re-count the missing items after the catalogue lookup.
null_count = df_join.where(F.col("item").isNull()).count()
print(f'Количество пропущенных значений в столбце "item": {null_count}')

Количество пропущенных значений в столбце "item": 0


#### 2.3. Заполнение отсутствующих Quantity и Total Spent

In [45]:
# Count empty / NULL values of total_spent and quantity in ONE aggregation
# instead of four separate filter().count() jobs over the joined DataFrame.
# count(when(cond, 1)) counts exactly the rows where cond is true.
# NOTE(review): both columns are doubles, so the == "" comparisons rely on an
# implicit string/double cast and presumably can never match - confirm.
total_col = F.col("total_spent")
quantity_col = F.col("quantity")
gaps = df_join.agg(
    F.count(F.when(total_col == "", 1)).alias("empty_count_t_s"),
    F.count(F.when(total_col.isNull(), 1)).alias("null_count_t_s"),
    F.count(F.when(quantity_col == "", 1)).alias("empty_count_q"),
    F.count(F.when(quantity_col.isNull(), 1)).alias("null_count_q"),
).first()
empty_count_t_s = gaps["empty_count_t_s"]
null_count_t_s = gaps["null_count_t_s"]
empty_count_q = gaps["empty_count_q"]
null_count_q = gaps["null_count_q"]
print(f'Количество пустых значений в столбце "total_spent": {empty_count_t_s}')
print(f'Количество пропущенных значений в столбце "total_spent": {null_count_t_s}')
print(f'Количество пустых значений в столбце "quantity": {empty_count_q}')
print(f'Количество пропущенных значений в столбце "quantity": {null_count_q}')

Количество пустых значений в столбце "total_spent": 0
Количество пропущенных значений в столбце "total_spent": 604
Количество пустых значений в столбце "quantity": 0
Количество пропущенных значений в столбце "quantity": 604


In [38]:
# Rows where total_spent is missing but could be rebuilt from quantity and price_per_unit.
total_missing = F.col("total_spent").isNull()
factors_present = F.col("quantity").isNotNull() & F.col("price_per_unit").isNotNull()
df_join.filter(total_missing & factors_present).count()

0

In [46]:
# Rows where quantity is missing but could be rebuilt from total_spent and price_per_unit.
quantity_missing = F.col("quantity").isNull()
operands_present = F.col("total_spent").isNotNull() & F.col("price_per_unit").isNotNull()
df_join.filter(quantity_missing & operands_present).count()

0

In [47]:
# Rows where quantity and total_spent are missing at the same time.
both_missing = F.col("quantity").isNull() & F.col("total_spent").isNull()
df_join.filter(both_missing).count()

604

In [None]:
# The number of missing values is the same in both columns: wherever one of them
# is missing, the other one is missing too.

0

In [74]:
# Fill total_spent as price_per_unit * quantity where it is missing and both
# factors are known; rows lacking a factor stay NULL. The last line counts the
# rows that are still missing total_spent.
rebuilt_total = F.round(F.col("price_per_unit") * F.col("quantity"), 2)
df_clean_t_s = df_join.withColumn(
    "total_spent",
    F.coalesce(F.col("total_spent"), rebuilt_total),
)
df_clean_t_s.where(F.col("total_spent").isNull()).count()

604

In [75]:
# Fill quantity as total_spent / price_per_unit where it is missing and both
# operands are known, then store the column as an integer. The last line counts
# the rows that are still missing quantity.
# NOTE(review): the integer cast presumably drops any fractional part of a
# derived quantity - confirm that this is intended.
rebuilt_quantity = F.round(F.col("total_spent") / F.col("price_per_unit"), 2)
filled_quantity = F.coalesce(F.col("quantity"), rebuilt_quantity)
df_clean_q = df_clean_t_s.withColumn("quantity", filled_quantity.cast(IntegerType()))
df_clean_q.where(F.col("quantity").isNull()).count()

604

#### 2.4. Удаление строк с отсутствующими значениями

In [78]:
# Share (in percent) of rows that still have no quantity.
rows_without_quantity = df_clean_q.where(F.col("quantity").isNull()).count()
rows_total = df_clean_q.count()
rows_without_quantity * 100 / rows_total

4.8031809145129225

In [81]:
# Drop the 604 rows that could not be repaired - roughly 5% of the dataset.
required_columns = ["quantity", "total_spent", "price_per_unit", "category"]
df_final = df_clean_q.na.drop(subset=required_columns)
df_final.count()

11971

In [93]:
# Descriptive statistics of the cleaned data, printed without truncating cell values.
final_stats = df_final.describe()
final_stats.show(truncate=False)

+-------+--------------+-----------+----------+-----------+-----------------+------------------+-----------------+--------------+--------+
|summary|transaction_id|customer_id|category  |item       |price_per_unit   |quantity          |total_spent      |payment_method|location|
+-------+--------------+-----------+----------+-----------+-----------------+------------------+-----------------+--------------+--------+
|count  |11971         |11971      |11971     |11971      |11971            |11971             |11971            |11971         |11971   |
|mean   |NULL          |NULL       |NULL      |NULL       |23.36087210759335|5.536379583994654 |129.6525770612313|NULL          |NULL    |
|stddev |NULL          |NULL       |NULL      |NULL       |10.74188910271391|2.8578828340802986|94.75069674502284|NULL          |NULL    |
|min    |TXN_1002182   |CUST_01    |Beverages |Item_10_BEV|5.0              |1                 |5.0              |Cash          |In-store|
|max    |TXN_9999729   |CUS

In [94]:
df_final.show(5)

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|       2|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|    Beverages| Item_16_BEV|          27.5|       9|      247.5|   Credit

In [None]:
# No column has missing values any more, except for "discount_applied".
discount_missing = F.col("discount_applied").isNull()
df_final.where(discount_missing).count()

3988

### 3. Разведочный анализ данных

In [92]:
# Top 5 most popular product categories, ranked by total quantity sold.
category_totals = df_final.groupBy("category").agg(
    F.sum("quantity").alias("total_quantity")
)
(
    category_totals
    .orderBy(F.desc("total_quantity"))
    .limit(5)
    .show(truncate=False)
)

+-----------------------------+--------------+
|category                     |total_quantity|
+-----------------------------+--------------+
|Furniture                    |8462          |
|Food                         |8387          |
|Beverages                    |8358          |
|Milk Products                |8339          |
|Electric household essentials|8309          |
+-----------------------------+--------------+



In [None]:
# Average receipt analysis: mean total_spent (rounded to 2 decimals) per
# payment method, highest first.
avg_by_payment = df_final.groupBy("payment_method").agg(
    F.round(F.avg("total_spent"), 2).alias("total_spent_avg")
)
avg_by_payment.orderBy(F.desc("total_spent_avg")).show()

+--------------+---------------+
|payment_method|total_spent_avg|
+--------------+---------------+
|          Cash|         131.05|
|   Credit Card|         129.13|
|Digital Wallet|         128.72|
+--------------+---------------+



In [102]:
# Mean total_spent (rounded to 2 decimals) per sales location, highest first.
avg_by_location = df_final.groupBy("location").agg(
    F.round(F.avg("total_spent"), 2).alias("total_spent_avg")
)
avg_by_location.orderBy(F.desc("total_spent_avg")).show()

+--------+---------------+
|location|total_spent_avg|
+--------+---------------+
|  Online|         130.42|
|In-store|         128.86|
+--------+---------------+



### 4. Генерация признаков

In [105]:
# Calendar features derived from the transaction date: day of week and month.
# NOTE(review): in the preview 2023-07-23 maps to day_of_week 1 and 2024-04-08
# to 2, so the numbering presumably starts with Sunday = 1 - confirm in the Spark docs.
txn_date = F.col("transaction_date")
df_feature = df_final.select(
    "*",
    F.dayofweek(txn_date).alias("day_of_week"),
    F.month(txn_date).alias("transaction_month"),
)
df_feature.show(5)

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+-----------+-----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|day_of_week|transaction_month|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+-----------+-----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true|          2|                4|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|          1|                7|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|       2|       43.0|   Credit

In [109]:
# Sales by day of week: mean total_spent (rounded to 2 decimals), ordered by day number.
avg_by_weekday = df_feature.groupBy("day_of_week").agg(
    F.round(F.avg("total_spent"), 2).alias("total_spent_avg")
)
avg_by_weekday.orderBy("day_of_week").show()

+-----------+---------------+
|day_of_week|total_spent_avg|
+-----------+---------------+
|          1|         130.18|
|          2|         125.57|
|          3|         129.51|
|          4|         126.82|
|          5|         129.28|
|          6|         134.64|
|          7|         131.49|
+-----------+---------------+



In [110]:
# Sales by month: mean total_spent (rounded to 2 decimals), ordered by month number.
avg_by_month = df_feature.groupBy("transaction_month").agg(
    F.round(F.avg("total_spent"), 2).alias("total_spent_avg")
)
avg_by_month.orderBy("transaction_month").show()

+-----------------+---------------+
|transaction_month|total_spent_avg|
+-----------------+---------------+
|                1|         134.69|
|                2|         130.66|
|                3|         126.83|
|                4|         131.81|
|                5|          127.4|
|                6|         130.95|
|                7|         126.57|
|                8|         124.28|
|                9|         131.45|
|               10|         127.85|
|               11|         128.79|
|               12|         133.15|
+-----------------+---------------+



In [114]:
# Customer features: total spend per customer (column "CLV"), top 10 customers.
customer_totals = df_feature.groupBy("customer_id").agg(
    F.sum("total_spent").alias("CLV")
)
(
    customer_totals
    .orderBy(F.desc("CLV"))
    .limit(10)
    .show()
)

+-----------+-------+
|customer_id|    CLV|
+-----------+-------+
|    CUST_24|68452.0|
|    CUST_08|67351.5|
|    CUST_05|66974.5|
|    CUST_16|65570.5|
|    CUST_13|65037.0|
|    CUST_23|64507.0|
|    CUST_10|63155.5|
|    CUST_15|63117.5|
|    CUST_21|62933.0|
|    CUST_02|62046.5|
+-----------+-------+



In [115]:
# Stop the Spark session at the end of the notebook.
spark.stop()