In [187]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, BooleanType  # Import BooleanType

# Создаем SparkSession
spark = SparkSession.builder.appName("RetailStoreSalesAnalysis").getOrCreate()

# 1. Загрузка и предварительная обработка данных
1.1. Загрузка и вывод схемы

In [188]:
# Путь к файлу с данными
file_path = "/content/retail_store_sales.csv"


# df = spark.read.csv(file_path, header=True, inferSchema=True) # Вывод схемы по умолчанию

1.3. Явное преобразование типов данных

In [189]:
# Определяем схему
custom_schema = StructType([
    StructField("Transaction ID", StringType(), False),
    StructField("Customer ID", StringType(), False),
    StructField("Category", StringType(), False),
    StructField("Item", StringType(), True),
    StructField("Price Per Unit", DoubleType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Total Spent", DoubleType(), True),
    StructField("Payment Method", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Transaction Date", DateType(), False),
    StructField("Discount Applied", BooleanType(), True)
])

# Загрузка данных с заданной схемой
df = spark.read.csv(file_path, header=True, schema=custom_schema, sep=",")

print("\nСхема DataFrame:")
df.printSchema()

print("\nВывод первых 5 строк загруженного DataFrame:")
df.show(5)


Схема DataFrame:
root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)


Вывод первых 5 строк загруженного DataFrame:
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   

1.2. Очистка названий столбцов (snake_case)

In [190]:
# Очистка названий столбцов
df = df.toDF(*(c.lower().replace(" ", "_") for c in df.columns))

print("\nОбновленные названия столбцов:")
print(df.columns)

print("\nОбновленная схема DataFrame:")
df.printSchema()


Обновленные названия столбцов:
['transaction_id', 'customer_id', 'category', 'item', 'price_per_unit', 'quantity', 'total_spent', 'payment_method', 'location', 'transaction_date', 'discount_applied']

Обновленная схема DataFrame:
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



# 2. Очистка и валидация данных
2.1. Восстановление отсутствующих item

In [191]:
# Вывод 20 строк до совершения восстановления значений
print("\nВывод 20 строк до совершения восстановления значений:")
df.show(20)

# Количество NULL в столбце item до восстановления
null_count_item_before = df.filter(F.col("item").isNull()).count()

# Создаем справочник товаров (item_catalog)
item_catalog = df.filter(F.col("item").isNotNull() & (F.col("item") != ""))\
                  .select("category", "item", "price_per_unit")\
                  .distinct()

# Переименовываем столбец item в item_catalog
item_catalog = item_catalog.withColumnRenamed("item", "item_catalog_item")

print("\nВывод 20 строк каталога:")
item_catalog.show()




Вывод 20 строк до совершения восстановления значений:
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    NULL|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|    NULL|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|    NULL|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|  

In [192]:
# Восстанавливаем отсутствующие item путем соединения с item_catalog
df_joined = df.join(item_catalog, on=["category", "price_per_unit"], how="left")

# Явно выбираем столбцы, используя coalesce для item
df = df_joined.select(
    df_joined["transaction_id"],
    df_joined["customer_id"],
    df_joined["category"],
    F.coalesce(df_joined["item"], df_joined["item_catalog_item"]).alias("item"),
    df_joined["price_per_unit"],
    df_joined["quantity"],
    df_joined["total_spent"],
    df_joined["payment_method"],
    df_joined["location"],
    df_joined["transaction_date"],
    df_joined["discount_applied"]
)

# Выводим 20 строк после факта восстановленния некоторых значений
print("\nВывод 20 строк после факта восстановленния некоторых значений :")
df.show(20)

# Дополнительная проверка - сколько было NULL и сколько осталось в item после восстановления
print(f"\nКоличество NULL в столбце item до восстановления: {null_count_item_before}")

null_count_item_after = df.filter(F.col("item").isNull()).count()
print(f"\nКоличество NULL в столбце item после восстановления: {null_count_item_after}")

null_count_price = df.filter(F.col("price_per_unit").isNull()).count()
print(f"\nКоличество NULL в столбце price_per_unit после восстановления: {null_count_price}")


Вывод 20 строк после факта восстановленния некоторых значений :
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|    NULL|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|    NULL|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|    NULL|       43.0|   Credit Card|  Online|      2022-10-05|           

Поскольку по условию в столбце "category" не может быть значений NULL и количество NULL в столбце "price_per_unit" равно количеству не восстановленных значений "item", можем сделать вывод, что не восстановились те значения, которые имеют NULL в "price_per_unit"

2.2. Восстановление total_spent (если отсутствует)

In [193]:
# Считаем количество значений Null в столбце total_spent до восстановления
null_count_total_spent__before = df.filter(F.col("total_spent").isNull()).count()
print(f"\nКоличество NULL в столбце total_spent до восстановления: {null_count_total_spent__before}")

# Восстановление total_spent:
df = df.withColumn("total_spent", F.coalesce(F.col("total_spent"), F.col("quantity") * F.col("price_per_unit")))

# Считаем количество значений Null в столбце total_spent после восстановления
null_count_total_spent_after = df.filter(F.col("total_spent").isNull()).count()
print(f"\nКоличество NULL в столбце total_spent после восстановления: {null_count_total_spent_after}")


Количество NULL в столбце total_spent до восстановления: 604

Количество NULL в столбце total_spent после восстановления: 604


In [194]:
# Так как ни одно значение total_spent, как мы видим не восстановилось, посчитаем были ли вообще не нулевые значения которыми можно было заменить
quantity_price_per_unit_isnotnull=df.filter((F.col("quantity") * F.col("price_per_unit")).isNotNull()).count()
print(f"\nКоличество не Null значений для замены отсутствующих в столбце price_per_unit: {quantity_price_per_unit_isnotnull}")


Количество не Null значений для замены отсутствующих в столбце price_per_unit: 0


2.3. Заполнение отсутствующих quantity и price_per_unit

In [195]:
# Подсчет количества Null значений до замены отсутствующих в столбце quantity и price_per_unit
quantity_isnull_before=df.filter(F.col("quantity").isNull()).count()
price_per_unit_before=df.filter(F.col("price_per_unit").isNull()).count()

In [196]:
# Заполнение quantity и price_per_unit (если отсутствуют)
df = df.withColumn("quantity", F.coalesce(F.col("quantity"), F.round(F.col("total_spent") / F.col("price_per_unit")).cast("integer")))
df = df.withColumn("price_per_unit", F.coalesce(F.col("price_per_unit"), F.round(F.col("total_spent") / F.col("quantity"), 2)))

# Подсчет количества Null значений в столбце quantity и price_per_unit после замены
quantity_isnull_after=df.filter(F.col("quantity").isNull()).count()
price_per_unit_after=df.filter(F.col("price_per_unit").isNull()).count()

# Количество замен в столбце quantity и price_per_unit
quantity_change = quantity_isnull_before - quantity_isnull_after
price_per_unit_change = price_per_unit_before - price_per_unit_after

# Вывод количества замен Null значений quantity и price_per_unit на экран
print(f"\nКоличество замен в столбце quantity: {quantity_change}")
print(f"\nКоличество замен в столбце price_per_unit: {price_per_unit_change}")


Количество замен в столбце quantity: 11362

Количество замен в столбце price_per_unit: 0


2.4. Удаление оставшихся строк с пропусками в category, quantity ,total_spent и price_per_unit

In [197]:
# Подсчитаем общее количество строк до удаления строк с пропусками
df_dropna_befor  = df.count()

# Составим список столбцов для проверки наличия значений Null
columns_to_check = ["category", "quantity", "total_spent", "price_per_unit"]

# Удаляем строки, где есть NULL хотя бы в одном из указанных столбцов
df = df.dropna(subset=columns_to_check)

# Подсчитаем общее количество строк после удаления строк с пропусками
df_dropna_after  = df.count()

# Выводим количество строк до и после удаления (для проверки)
print(f"Количество строк до удаления пропусков: {df_dropna_befor}")
print(f"Количество строк после удаления пропусков: {df_dropna_after}")

# Отображаем первые несколько строк (для проверки)
df.show(5)

# Проверим остались ли в этих столбцах пропущенные значения
for column in columns_to_check:
  null_count = df.filter(F.col(column).isNull()).count()
  print(f"Количество NULL значений в столбце {column}: {null_count}")

Количество строк до удаления пропусков: 12575
Количество строк после удаления пропусков: 11362
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|     Butchers| Item_12_BUT|          21.5|       2|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   T

# 3. Разведочный анализ данных
3.1. Самые популярные категории товаров

In [198]:
# Общее количество проданных единиц товара для каждой категории.
category_sales = df.groupBy("category").agg(F.sum("quantity").alias("total_quantity")).orderBy(F.desc("total_quantity"))
print("Топ-5 категорий по общему количеству проданных единиц - Category Sales:")
category_sales.show(5)

Топ-5 категорий по общему количеству проданных единиц - Category Sales:
+--------------------+--------------+
|            category|total_quantity|
+--------------------+--------------+
|           Furniture|          8083|
|           Beverages|          7974|
|                Food|          7925|
|Electric househol...|          7897|
|       Milk Products|          7889|
+--------------------+--------------+
only showing top 5 rows



3.2. Анализ среднего чека

In [199]:
# Среднее значение Total Spent для каждого метода оплаты
payment_method_avg = df.groupBy("payment_method").agg(F.round(F.avg("total_spent").alias("avg_spent"), 2))
print("Среднее значение для каждого метода оплаты - Payment Method Average:")
payment_method_avg.show()

Среднее значение для каждого метода оплаты - Payment Method Average:
+--------------+---------------------------------------+
|payment_method|round(avg(total_spent) AS avg_spent, 2)|
+--------------+---------------------------------------+
|   Credit Card|                                 129.03|
|Digital Wallet|                                 128.68|
|          Cash|                                 131.14|
+--------------+---------------------------------------+



In [200]:
# Cреднее значение Total Spent для каждого места где прошла оплата
location_avg = df.groupBy("location").agg(F.round(F.avg("total_spent").alias("avg_spent"), 2))
print("Cреднее значение Total Spent для каждого места где прошла оплата - Location Average:")
location_avg.show()

Cреднее значение Total Spent для каждого места где прошла оплата - Location Average:
+--------+---------------------------------------+
|location|round(avg(total_spent) AS avg_spent, 2)|
+--------+---------------------------------------+
|In-store|                                 128.82|
|  Online|                                 130.45|
+--------+---------------------------------------+



# 4. Генерация признаков
4.1. Временные признаки

In [201]:
# Добавим два новых столбца day_of_week и transaction_month
df = df.withColumn("day_of_week", F.dayofweek(F.col("transaction_date")))
df = df.withColumn("transaction_month", F.month(F.col("transaction_date")))
print("Добавление новых столбцов по временным признакам - day_of_week and transaction_month:")
df.show(5)

Добавление новых столбцов по временным признакам - day_of_week and transaction_month:
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+-----------+-----------------+
|transaction_id|customer_id|     category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|day_of_week|transaction_month|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+-----------+-----------------+
|   TXN_6867343|    CUST_09|   Patisserie| Item_10_PAT|          18.5|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true|          2|                4|
|   TXN_3731986|    CUST_22|Milk Products|Item_17_MILK|          29.0|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|          1|                7|
|   TXN_930371

4.2. Продажи по дням недели

In [202]:
# Рассчитаем среднюю сумму продаж (total_spent) для каждого дня недели
daily_sales = df.groupBy("day_of_week").agg(F.avg("total_spent").alias("avg_spent")).orderBy("day_of_week")
print("Средняя сумма продаж для каждого дня недели - Daily Sales:")
daily_sales.show()

Средняя сумма продаж для каждого дня недели - Daily Sales:
+-----------+------------------+
|day_of_week|         avg_spent|
+-----------+------------------+
|          1| 130.3134510042605|
|          2|126.08145363408521|
|          3|129.02869619463505|
|          4|126.62033067973056|
|          5| 129.6309894212819|
|          6|134.51823708206686|
|          7|131.17064220183485|
+-----------+------------------+



4.3. Продажи по месяцам

In [203]:
# Рассчет средней суммы продаж (total_spent)  для каждого месяца
monthly_sales = df.groupBy("transaction_month").agg(F.avg("total_spent").alias("avg_spent")).orderBy("transaction_month")
print("Средняя сумма продаж для каждого месяца - Monthly Sales:")
monthly_sales.show()

Средняя сумма продаж для каждого месяца - Monthly Sales:
+-----------------+------------------+
|transaction_month|         avg_spent|
+-----------------+------------------+
|                1| 135.1926530612245|
|                2| 129.7631880733945|
|                3|126.93729372937294|
|                4|132.00166481687015|
|                5| 127.4728144989339|
|                6|  131.323093220339|
|                7|127.64733059548254|
|                8|122.76838235294117|
|                9| 130.6153846153846|
|               10|128.05379388448472|
|               11|130.04082774049218|
|               12|132.32119914346896|
+-----------------+------------------+



4.4. Признаки клиента

In [204]:
# Рассчитем customer_lifetime_value (CLV) для каждого клиента как общую сумму (total_spent), потраченную этим клиентом за все транзакции.
customer_clv = df.groupBy("customer_id").agg(F.sum("total_spent").alias("clv")).orderBy(F.desc("clv"))
print("Топ-10 клиентов по их CLV - Customer Lifetime Value:")
customer_clv.show(10)

Топ-10 клиентов по их CLV - Customer Lifetime Value:
+-----------+-------+
|customer_id|    clv|
+-----------+-------+
|    CUST_24|64608.0|
|    CUST_05|63855.5|
|    CUST_16|63185.5|
|    CUST_13|63015.5|
|    CUST_08|62850.5|
|    CUST_15|60749.5|
|    CUST_10|60367.5|
|    CUST_23|59738.5|
|    CUST_21|59639.0|
|    CUST_02|59512.5|
+-----------+-------+
only showing top 10 rows



In [205]:
# Останавливаем SparkSession
spark.stop()