In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
#from pyspark.sql.window import Window
#from datetime import date



## 1. Загрузка и предварительная обработка данных
### 1.1 Загрузка и вывод схемы

In [2]:
# Create (or reuse) the Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .appName("project_avtovaz")
    .getOrCreate()
)
# Optional, to reduce console noise:
# spark.sparkContext.setLogLevel("WARN")

# Source CSV, relative to the notebook's working directory. The first row holds
# the headers and Spark infers the column types.
data_path = "retail_store_sales.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)


In [3]:
# Quick look at the raw data: first rows plus the schema Spark inferred.
df.show(n=5)
df.printSchema()

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|    Beverages| Item_16_BEV|          27.5|     9.0|      247.5|   Credit

### 1.2 Очистка названий столбцов (приведение к snake_case)

In [4]:
def to_snake(name: str) -> str:
    """Convert a raw CSV header to a snake_case column name.

    "-", "/" and "." are treated as word separators, the text is lower-cased,
    and any run of separators/whitespace (including leading/trailing
    whitespace) collapses to a single underscore, so "Foo - Bar" becomes
    "foo_bar" rather than "foo___bar".

    Args:
        name: Original column header, e.g. "Transaction ID".

    Returns:
        The snake_case name, e.g. "transaction_id".
    """
    for separator in ("-", "/", "."):
        name = name.replace(separator, " ")
    # str.split() with no argument splits on any whitespace run and drops
    # leading/trailing whitespace, which yields the collapsing behaviour.
    return "_".join(name.lower().split())

# Rename all columns in one projection instead of one withColumnRenamed call
# per column. Fail loudly if two raw headers normalise to the same name rather
# than silently producing duplicate column names.
snake_names = [to_snake(column) for column in df.columns]
collisions = sorted({n for n in snake_names if snake_names.count(n) > 1})
if collisions:
    raise ValueError(f"Column names collide after snake_case conversion: {collisions}")
df = df.toDF(*snake_names)



In [5]:
# Verify the snake_case column names after renaming.
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



### 1.3 Преобразование типов данных

In [6]:
# Enforce the expected column types explicitly instead of relying on
# inferSchema alone. Dates are parsed with the standard "yyyy-MM-dd" pattern
# (four-letter year).
df = (
    df.withColumn("transaction_id", F.col("transaction_id").cast(StringType()))
      .withColumn("customer_id", F.col("customer_id").cast(StringType()))
      .withColumn("category", F.col("category").cast(StringType()))
      .withColumn("item", F.col("item").cast(StringType()))
      .withColumn("price_per_unit", F.col("price_per_unit").cast(DoubleType()))
      .withColumn("quantity", F.col("quantity").cast(DoubleType()))
      .withColumn("total_spent", F.col("total_spent").cast(DoubleType()))
      .withColumn("payment_method", F.col("payment_method").cast(StringType()))
      .withColumn("location", F.col("location").cast(StringType()))
      .withColumn("transaction_date", F.to_date(F.col("transaction_date"), "yyyy-MM-dd"))
      .withColumn("discount_applied", F.col("discount_applied").cast(BooleanType()))
)
df.printSchema()
                  

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



## 2. Очистка и валидация данных
### 2.1 Восстановление отсутствующих item

In [7]:
# Reference table for imputing missing `item` values from (category, unit price).
# Assumption (consistent with the sample output, e.g. Item_16_* is priced 27.5
# in several categories): within a category the unit price identifies the item.
# Only keys that map to exactly one item are kept. That makes (cat, pp_unit)
# unique, so a left join against this table can never duplicate transactions.
product_lookup = (
    df.select(
        F.col("category").alias("cat"),
        F.col("item").alias("item_ref"),
        F.col("price_per_unit").alias("pp_unit"),
    )
    .dropna(subset=["cat", "item_ref", "pp_unit"])
    .groupBy("cat", "pp_unit")
    .agg(F.collect_set("item_ref").alias("candidate_items"))
    .where(F.size("candidate_items") == 1)
    .select("cat", F.col("candidate_items")[0].alias("item_ref"), "pp_unit")
)

product_lookup.show()

+--------------------+------------+-------+
|                 cat|    item_ref|pp_unit|
+--------------------+------------+-------+
|           Beverages| Item_16_BEV|   27.5|
|           Beverages|  Item_7_BEV|   14.0|
|Electric househol...| Item_23_EHE|   38.0|
|       Milk Products|Item_16_MILK|   27.5|
|Computers and ele...|  Item_1_CEA|    5.0|
|           Beverages| Item_25_BEV|   41.0|
|                Food|Item_12_FOOD|   21.5|
|           Furniture|  Item_4_FUR|    9.5|
|           Beverages| Item_17_BEV|   29.0|
|Computers and ele...|  Item_9_CEA|   17.0|
|           Beverages| Item_18_BEV|   30.5|
|                Food|Item_11_FOOD|   20.0|
|Computers and ele...|  Item_6_CEA|   12.5|
|Computers and ele...|  Item_8_CEA|   15.5|
|Electric househol...|  Item_6_EHE|   12.5|
|Electric househol...| Item_15_EHE|   26.0|
|           Beverages| Item_11_BEV|   20.0|
|          Patisserie|  Item_9_PAT|   17.0|
|           Furniture| Item_16_FUR|   27.5|
|Electric househol...|  Item_4_E

In [8]:
# Fill missing `item` values from product_lookup, matching on category and unit
# price. Rows that already have an item keep it.
df = (
    df.join(
        product_lookup,
        on=(df.category == product_lookup.cat) & (df.price_per_unit == product_lookup.pp_unit),
        how="left",
    )
    .withColumn("item", F.coalesce(F.col("item"), F.col("item_ref")))
    .drop("cat", "item_ref", "pp_unit")
)
df.show(20)

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|           Beverages| Item_16_

### 2.2 Восстановление отсутствующих total_spent

In [9]:
# Recompute a missing total as quantity * unit price. If either factor is null
# the product is null as well, so coalesce leaves such rows unchanged.
df = df.withColumn(
    "total_spent",
    F.coalesce(F.col("total_spent"), F.col("quantity") * F.col("price_per_unit")),
)

### 2.3 Заполнение отсутствующих quantity и price_per_unit

In [10]:
# Recover a missing quantity as total / unit price, rounded to a whole number.
# A null operand makes the quotient null, so such rows keep their null.
df = df.withColumn(
    "quantity",
    F.coalesce(
        F.col("quantity"),
        F.round(F.col("total_spent") / F.col("price_per_unit")),
    ),
)

In [11]:
# Recover a missing unit price as total / quantity, rounded to 2 decimals.
# A null operand makes the quotient null, so such rows keep their null.
df = df.withColumn(
    "price_per_unit",
    F.coalesce(
        F.col("price_per_unit"),
        F.round(F.col("total_spent") / F.col("quantity"), 2),
    ),
)

In [12]:
# Spot-check the table after the numeric imputation steps.
df.show(n=20)

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|           Beverages| Item_16_

### 2.4 Удаление оставшихся строк с пропусками в category, quantity, total_spent, price_per_unit

In [13]:
# Rows still missing any of these fields could not be repaired above, so they
# are removed (a single null in the subset is enough to drop the row).
required_columns = ["category", "quantity", "total_spent", "price_per_unit"]
df = df.na.drop(how="any", subset=required_columns)

In [14]:
# Spot-check the table after dropping unrecoverable rows.
df.show(n=20)

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|     2.0|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|           Beverages| Item_16_

## 3. Разведочный анализ данных
### 3.1 Самые популярные категории товаров (топ-5 по числу проданных единиц)

In [15]:
# Top categories by total units sold. `category` is a secondary sort key so that
# ties in total_quantity resolve the same way on every run; without it the
# rows kept by limit() are not guaranteed to be reproducible.
TOP_N_CATEGORIES = 5

category_sales = (
    df.groupBy("category")
    .agg(F.sum("quantity").alias("total_quantity"))
    .orderBy(F.desc("total_quantity"), F.asc("category"))
    .limit(TOP_N_CATEGORIES)
)

category_sales.show()

+--------------------+--------------+
|            category|total_quantity|
+--------------------+--------------+
|           Furniture|        8462.0|
|                Food|        8387.0|
|           Beverages|        8358.0|
|       Milk Products|        8339.0|
|Electric househol...|        8309.0|
+--------------------+--------------+



### 3.2 Анализ среднего чека

In [16]:
# Mean total_spent per payment method, rounded to 2 decimals, highest first.
avg_payment = (
    df.groupBy("payment_method")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy(F.col("avg_total_spent").desc())
)

avg_payment.show()

+--------------+---------------+
|payment_method|avg_total_spent|
+--------------+---------------+
|          Cash|         131.05|
|   Credit Card|         129.13|
|Digital Wallet|         128.72|
+--------------+---------------+



In [17]:
# Mean total_spent per purchase location (Online / In-store in this data),
# rounded to 2 decimals, highest first.
avg_location = (
    df.groupBy("location")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy(F.col("avg_total_spent").desc())
)

avg_location.show()

+--------+---------------+
|location|avg_total_spent|
+--------+---------------+
|  Online|         130.42|
|In-store|         128.86|
+--------+---------------+



## 4. Генерация признаков
### 4.1 Временные признаки

In [18]:
# Calendar features derived from transaction_date. dayofweek numbers days 1..7
# starting from Sunday (e.g. 2024-04-08, a Monday, maps to 2).
df = (
    df.withColumn("day_of_week_num", F.dayofweek("transaction_date"))
      .withColumn("transaction_month_num", F.month("transaction_date"))
)

df.show()

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+---------------+---------------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|day_of_week_num|transaction_month_num|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+---------------+---------------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    10.0|      185.0|Digital Wallet|  Online|      2024-04-08|            true|              2|                    4|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|     9.0|      261.0|Digital Wallet|  Online|      2023-07-23|            true|              1|                    7|
|   TXN_9303719|    CUST_

### 4.2 Средний чек по дням недели (1 — воскресенье, 7 — суббота)

In [19]:
# Average receipt per weekday (1 = Sunday ... 7 = Saturday), in weekday order.
avg_sales_weekday = (
    df.groupBy("day_of_week_num")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy("day_of_week_num")
)

avg_sales_weekday.show()

+---------------+---------------+
|day_of_week_num|avg_total_spent|
+---------------+---------------+
|              1|         130.18|
|              2|         125.57|
|              3|         129.51|
|              4|         126.82|
|              5|         129.28|
|              6|         134.64|
|              7|         131.49|
+---------------+---------------+



### 4.3 Средний чек по месяцам (месяцы объединены по всем годам)

In [20]:
# Average receipt per calendar month. Grouping is by month number only, so the
# same month from different years (the data includes 2022-2024) is pooled
# together. This reflects seasonality, not a trend over time.
avg_sales_month = (
    df.groupBy("transaction_month_num")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy("transaction_month_num")
)

avg_sales_month.show()

+---------------------+---------------+
|transaction_month_num|avg_total_spent|
+---------------------+---------------+
|                    1|         134.69|
|                    2|         130.66|
|                    3|         126.83|
|                    4|         131.81|
|                    5|          127.4|
|                    6|         130.95|
|                    7|         126.57|
|                    8|         124.28|
|                    9|         131.45|
|                   10|         127.85|
|                   11|         128.79|
|                   12|         133.15|
+---------------------+---------------+



### 4.4 Признаки клиента: суммарные траты (CLV), топ-10 клиентов

In [21]:
# Top customers by lifetime value (sum of total_spent).
# - Rows without a customer_id are excluded: a null group is not a customer
#   and could otherwise appear in the ranking.
# - customer_id breaks ties so the top-N cut is reproducible across runs.
TOP_N_CUSTOMERS = 10

clv = (
    df.where(F.col("customer_id").isNotNull())
    .groupBy("customer_id")
    .agg(F.sum("total_spent").alias("customer_lifetime_value"))
    .orderBy(F.desc("customer_lifetime_value"), F.asc("customer_id"))
    .limit(TOP_N_CUSTOMERS)
)

clv.show()

+-----------+-----------------------+
|customer_id|customer_lifetime_value|
+-----------+-----------------------+
|    CUST_24|                68452.0|
|    CUST_08|                67351.5|
|    CUST_05|                66974.5|
|    CUST_16|                65570.5|
|    CUST_13|                65037.0|
|    CUST_23|                64507.0|
|    CUST_10|                63155.5|
|    CUST_15|                63117.5|
|    CUST_21|                62933.0|
|    CUST_02|                62046.5|
+-----------+-----------------------+

