In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, DateType, IntegerType
import re

# 1. Загружаем и предварительно обработываем данные
# 1.1. Загружаем и выводим схему: 
spark = SparkSession.builder \
    .appName("SensorLogsProcessing") \
    .getOrCreate()

# Загрузим файл retail_store_sales.csv.  
# Определяем схему
schema = StructType([
    StructField("Transaction ID", StringType(), False),
    StructField("Customer ID", StringType(), False),
    StructField("Category", StringType(), False),
    StructField("Item", StringType(), True),
    StructField("Price Per Unit", DoubleType(), True),
    StructField("Quantity", DoubleType(), True),
    StructField("Total Spent", DoubleType(), True),
    StructField("Payment Method", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Transaction Date", DateType(), False),
    StructField("Discount Applied", BooleanType(), True)
])

df_csv = spark.read.csv(
    "retail_store_sales.csv",      # Указываем файл
    header=True,            # Указываем, что первая строка содержит заголовки столбцов
    schema=schema         # Автоматически определяет типы данных для каждого столбца
)

# Выведем первые 5 строк загруженного DataFrame и его схему (df_csv.printSchema())
print("Схема DataFrame:")
df_csv.printSchema()

print("Первые 5 строк DataFrame:")
df_csv.show(5)
print(f"Количество строк в исходном DF: {df_csv.count()}")

Схема DataFrame:
root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)

Первые 5 строк DataFrame:
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|     Category|        Item|Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_0

In [2]:
# 1.2. Очищаем названия столбцов: Преобразовываем названия всех столбцов к единому регистру - snake_case.  
# Выведем обновленную схему DataFrame, чтобы убедиться в изменении названий столбцов.
def to_snake_case(name):
    """
       Заменяет пробелы и специальные символы.
       Приводит к нижнему регистру и убирает лишние _
    """
    name = re.sub(r'[\s\-\.]+', '_', name)
    name = re.sub(r'_+', '_', name)
    return name.strip('_').lower()
    
df_renamed_columns = df_csv

for old_name in df_csv.columns:
    new_name = to_snake_case(old_name)
    df_renamed_columns = df_renamed_columns.withColumnRenamed(old_name, new_name)
    
print("Обновленная схема DataFram, чтобы убедиться в изменении названий столбцов:")    
df_renamed_columns.printSchema()

Обновленная схема DataFram, чтобы убедиться в изменении названий столбцов:
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



In [3]:
# 1.3. Убедимся, что типы данных столбцов соответсвуют тем данным, которые в них находятся
# А также убедимся, что некорректные или отсутствующие значения преобразуются в null в соответствующих типах данных.
print("Посмотрим, есть ли некорректные значения в столбце `transaction_id`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `transaction_id` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("transaction_id")))
                   .groupby(F.col("transaction_id"))
                   .agg(F.count("*").alias("transaction_id_cnt"))
                   .orderBy(F.col("transaction_id"))
                   .show(5)
)

(df_renamed_columns.select((F.col("transaction_id")))
                   .groupby(F.col("transaction_id"))
                   .agg(F.count("*").alias("transaction_id_cnt"))
                   .orderBy(F.desc("transaction_id"))
                   .show(5)
)

cnt_err_transaction_id = (df_renamed_columns.select((F.col("transaction_id")))
                   .where("transaction_id NOT LIKE 'TXN%'")
                   .orderBy(F.col("transaction_id"))
                   .count()
)

print(f"Найдено {cnt_err_transaction_id} значений в `transaction_id`, которые не соответствуют шаблону 'TXN%'.")

Посмотрим, есть ли некорректные значения в столбце `transaction_id`.
Для этого выведем значения по этому столбцам, отсортируем по `transaction_id` и посчитаем количество значений.
+--------------+------------------+
|transaction_id|transaction_id_cnt|
+--------------+------------------+
|   TXN_1002182|                 1|
|   TXN_1003865|                 1|
|   TXN_1003940|                 1|
|   TXN_1004091|                 1|
|   TXN_1004124|                 1|
+--------------+------------------+
only showing top 5 rows

+--------------+------------------+
|transaction_id|transaction_id_cnt|
+--------------+------------------+
|   TXN_9999729|                 1|
|   TXN_9999124|                 1|
|   TXN_9998575|                 1|
|   TXN_9997234|                 1|
|   TXN_9996909|                 1|
+--------------+------------------+
only showing top 5 rows

Найдено 0 значений в `transaction_id`, которые не соответствуют шаблону 'TXN%'.


In [4]:
print("Посмотрим, есть ли некорректные значения в столбце `customer_id`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `customer_id` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("customer_id")))
                   .groupby(F.col("customer_id"))
                   .agg(F.count("*").alias("customer_id_cnt"))
                   .orderBy(F.col("customer_id"))
                   .show(5)
)

(df_renamed_columns.select((F.col("customer_id")))
                   .groupby(F.col("customer_id"))
                   .agg(F.count("*").alias("customer_id_cnt"))
                   .orderBy(F.desc("customer_id"))
                   .show(5)
)

cnt_err_customer_id = (df_renamed_columns.select((F.col("customer_id")))
                   .where("customer_id NOT LIKE 'CUST%'")
                   .orderBy(F.asc("customer_id"))
                   .count()
)
print(f"Найдено {cnt_err_customer_id} значений в `customer_id`, которые не соответсвуют шаблону 'CUST%'.")

Посмотрим, есть ли некорректные значения в столбце `customer_id`.
Для этого выведем значения по этому столбцам, отсортируем по `customer_id` и посчитаем количество значений.
+-----------+---------------+
|customer_id|customer_id_cnt|
+-----------+---------------+
|    CUST_01|            507|
|    CUST_02|            488|
|    CUST_03|            465|
|    CUST_04|            474|
|    CUST_05|            544|
+-----------+---------------+
only showing top 5 rows

+-----------+---------------+
|customer_id|customer_id_cnt|
+-----------+---------------+
|    CUST_25|            476|
|    CUST_24|            543|
|    CUST_23|            513|
|    CUST_22|            501|
|    CUST_21|            498|
+-----------+---------------+
only showing top 5 rows

Найдено 0 значений в `customer_id`, которые не соответсвуют шаблону 'CUST%'.


In [5]:
print("Посмотрим, есть ли некорректные значения в столбце `category`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `category` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("category")))
                   .groupby(F.col("category"))
                   .agg(F.count("*").alias("category_cnt"))
                   .orderBy(F.col("category"))
                   .show(5, truncate=False)
)


Посмотрим, есть ли некорректные значения в столбце `category`.
Для этого выведем значения по этому столбцам, отсортируем по `category` и посчитаем количество значений.
+----------------------------------+------------+
|category                          |category_cnt|
+----------------------------------+------------+
|Beverages                         |1567        |
|Butchers                          |1568        |
|Computers and electric accessories|1558        |
|Electric household essentials     |1591        |
|Food                              |1588        |
+----------------------------------+------------+
only showing top 5 rows



In [6]:
print("Посмотрим, есть ли некорректные значения в столбце `item`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `item` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("item")))
                   .groupby(F.col("item"))
                   .agg(F.count("*").alias("item_cnt"))
                   .orderBy(F.col("item"))
                   .show(5)
)

(df_renamed_columns.select((F.col("item")))
                   .groupby(F.col("item"))
                   .agg(F.count("*").alias("item_cnt"))
                   .orderBy(F.desc("item"))
                   .show(5)
)

cnt_err_item = (df_renamed_columns.select((F.col("item")))
                   .where("item NOT LIKE  'Item%'")
                   .orderBy(F.col("item"))
                   .count()
)

print("Видим, что в столбце `item` присутствуют NULL значения.")
print(f"Найдено {cnt_err_item} значений в `item`, которые не соответсвуют шаблону 'Item%'.")

Посмотрим, есть ли некорректные значения в столбце `item`.
Для этого выведем значения по этому столбцам, отсортируем по `item` и посчитаем количество значений.
+-----------+--------+
|       item|item_cnt|
+-----------+--------+
|       NULL|    1213|
|Item_10_BEV|      23|
|Item_10_BUT|      16|
|Item_10_CEA|      73|
|Item_10_EHE|      41|
+-----------+--------+
only showing top 5 rows

+-----------+--------+
|       item|item_cnt|
+-----------+--------+
| Item_9_PAT|      54|
|Item_9_MILK|      10|
| Item_9_FUR|      14|
|Item_9_FOOD|      71|
| Item_9_EHE|      59|
+-----------+--------+
only showing top 5 rows

Видим, что в столбце `item` присутствуют NULL значения.
Найдено 0 значений в `item`, которые не соответсвуют шаблону 'Item%'.


In [7]:
print("Посмотрим, есть ли некорректные значения в столбце `payment_method`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `payment_method` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("payment_method")))
                   .groupby(F.col("payment_method"))
                   .agg(F.count("*").alias("payment_method_cnt"))
                   .orderBy(F.col("payment_method"))
                   .show(5)
)


Посмотрим, есть ли некорректные значения в столбце `payment_method`.
Для этого выведем значения по этому столбцам, отсортируем по `payment_method` и посчитаем количество значений.
+--------------+------------------+
|payment_method|payment_method_cnt|
+--------------+------------------+
|          Cash|              4310|
|   Credit Card|              4121|
|Digital Wallet|              4144|
+--------------+------------------+



In [8]:
print("Посмотрим, есть ли некорректные значения в столбце `location`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `location` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("location")))
                   .groupby(F.col("location"))
                   .agg(F.count("*").alias("location_cnt"))
                   .orderBy(F.col("location"))
                   .show(5)
)


Посмотрим, есть ли некорректные значения в столбце `location`.
Для этого выведем значения по этому столбцам, отсортируем по `location` и посчитаем количество значений.
+--------+------------+
|location|location_cnt|
+--------+------------+
|In-store|        6221|
|  Online|        6354|
+--------+------------+



In [9]:
print("Посмотрим, есть ли некорректные значения в столбце `discount_applied`.") 
print("Для этого выведем значения по этому столбцам, отсортируем по `discount_applied` и посчитаем количество значений.")

(df_renamed_columns.select((F.col("discount_applied")))
                   .groupby(F.col("discount_applied"))
                   .agg(F.count("*").alias("discount_applied_cnt"))
                   .orderBy(F.col("discount_applied"))
                   .show(5)
)

print("В столбце `discount_applied` тоже присутствуют NULL значения.")

Посмотрим, есть ли некорректные значения в столбце `discount_applied`.
Для этого выведем значения по этому столбцам, отсортируем по `discount_applied` и посчитаем количество значений.
+----------------+--------------------+
|discount_applied|discount_applied_cnt|
+----------------+--------------------+
|            NULL|                4199|
|           false|                4157|
|            true|                4219|
+----------------+--------------------+

В столбце `discount_applied` тоже присутствуют NULL значения.


In [10]:
print("Проверим, есть ли дубли в нашем DF `df_renamed_columns`.")
duplicate_transaction_count = (df_renamed_columns.groupBy(F.col("transaction_id"))
                                                 .agg(
                                                        F.count("*").alias("transaction_id_count")
                                                       )
                                                 .filter(F.col("transaction_id_count") > 1)
                                                 .count()
                              )

print(f"Найдено {duplicate_transaction_count} записей с дубликатами по столбцу `transaction_id`.")

duplicate_count = (df_renamed_columns.groupBy("transaction_id", "customer_id", "category", "item", "price_per_unit"
                                              , "quantity", "total_spent", "payment_method", "location"
                                              , "transaction_date", "discount_applied")
                                                 .agg(
                                                        F.count("*").alias("transaction_id_count")
                                                       )
                                                 .filter(F.col("transaction_id_count") > 1)
                                                 .count()
                              )
print(f"Найдено {duplicate_count} записей с дубликатами по всем столбцам DF `df_renamed_columns`.")

Проверим, есть ли дубли в нашем DF `df_renamed_columns`.
Найдено 0 записей с дубликатами по столбцу `transaction_id`.
Найдено 0 записей с дубликатами по всем столбцам DF `df_renamed_columns`.


In [11]:
price_per_unit_null =  (df_renamed_columns.select((F.col("price_per_unit")))
                                          .where((F.col("price_per_unit").isNull()) 
                                                 | (F.col("price_per_unit") <= 0.0) 
                                                )
                       )

print("Посмотрим, есть ли некорректные значения в столбце `price_per_unit`:")
price_per_unit_null.show(5)
price_per_unit_null_count =  price_per_unit_null.count()
print(f"Количество пустых записей и записей <=0 в столбце `price_per_unit`: {price_per_unit_null_count}")

print("Отсортируем столбец `price_per_unit` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:")
df_renamed_columns.select((F.col("price_per_unit"))).orderBy(F.desc("price_per_unit")).show(5)

Посмотрим, есть ли некорректные значения в столбце `price_per_unit`:
+--------------+
|price_per_unit|
+--------------+
|          NULL|
|          NULL|
|          NULL|
|          NULL|
|          NULL|
+--------------+
only showing top 5 rows

Количество пустых записей и записей <=0 в столбце `price_per_unit`: 609
Отсортируем столбец `price_per_unit` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:
+--------------+
|price_per_unit|
+--------------+
|          41.0|
|          41.0|
|          41.0|
|          41.0|
|          41.0|
+--------------+
only showing top 5 rows



In [12]:
quantity_null =  (df_renamed_columns.select((F.col("quantity")))
                                    .where((F.col("quantity").isNull()) | (F.col("quantity") <= 0.0) 
                                          )
                 )
print("Посмотрим, есть ли некорректные значения в столбце `quantity`:")
quantity_null.show(5)
quantity_null_count = quantity_null.count()
print(f"Количество пустых записей и записей <=0 в столбце `quantity`: {quantity_null_count}")

print("Отсортируем столбец `quantity` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:")
df_renamed_columns.select((F.col("quantity"))).orderBy(F.desc("quantity")).show(5)

print("Преобразуем столбец `quantity` в IntegerType()")
df_renamed_columns = df_renamed_columns.withColumn("quantity", F.col("quantity").cast(IntegerType()))
df_renamed_columns.select((F.col("quantity"))).orderBy(F.desc("quantity")).show(5)

print("Убедимся, что в схеме `df_renamed_columns` поменялся тип для поля `quantity` на IntegerType()")
df_renamed_columns.printSchema()

Посмотрим, есть ли некорректные значения в столбце `quantity`:
+--------+
|quantity|
+--------+
|    NULL|
|    NULL|
|    NULL|
|    NULL|
|    NULL|
+--------+
only showing top 5 rows

Количество пустых записей и записей <=0 в столбце `quantity`: 604
Отсортируем столбец `quantity` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:
+--------+
|quantity|
+--------+
|    10.0|
|    10.0|
|    10.0|
|    10.0|
|    10.0|
+--------+
only showing top 5 rows

Преобразуем столбец `quantity` в IntegerType()
+--------+
|quantity|
+--------+
|      10|
|      10|
|      10|
|      10|
|      10|
+--------+
only showing top 5 rows

Убедимся, что в схеме `df_renamed_columns` поменялся тип для поля `quantity` на IntegerType()
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: integer (nullable = true)

In [13]:
total_spent_null =  (df_renamed_columns.select((F.col("total_spent")))
                                       .where((F.col("total_spent").isNull()) 
                                              | (F.col("total_spent") <= 0.0) 
                                             )
                    )

print("Посмотрим, есть ли некорректные значения в столбце `total_spent`:")
total_spent_null.show(5)
total_spent_null_count = total_spent_null.count()
print(f"Количество пустых записей и записей <=0 в столбце total_spent: {total_spent_null_count}")

print("Отсортируем `total_spent` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:")
df_renamed_columns.select((F.col("total_spent"))).orderBy(F.desc("total_spent")).show(5)

Посмотрим, есть ли некорректные значения в столбце `total_spent`:
+-----------+
|total_spent|
+-----------+
|       NULL|
|       NULL|
|       NULL|
|       NULL|
|       NULL|
+-----------+
only showing top 5 rows

Количество пустых записей и записей <=0 в столбце total_spent: 604
Отсортируем `total_spent` в порядке убывания, чтобы посмотреть, нет ли слишком больших значений:
+-----------+
|total_spent|
+-----------+
|      410.0|
|      410.0|
|      410.0|
|      410.0|
|      410.0|
+-----------+
only showing top 5 rows



In [14]:
item_null = df_renamed_columns.select((F.col("item"))).where((F.col("item").isNull()))
item_null_count = item_null.count()
print(f"Количество не заполненных записей в столбце `item`: {item_null_count}")

discount_applied_null = (df_renamed_columns.select((F.col("discount_applied")))
                                           .where((F.col("discount_applied").isNull())
                                                 )
                        )

discount_applied_null_count = discount_applied_null.count()
print(f"Количество не заполненных записей в столбце `discount_applied`: {discount_applied_null_count}")
print("Столбец `discount_applied` в дальнейших заданиях не используется.")

Количество не заполненных записей в столбце `item`: 1213
Количество не заполненных записей в столбце `discount_applied`: 4199
Столбец `discount_applied` в дальнейших заданиях не используется.


In [15]:
# Выведем по 5 записей из df_renamed_columns, где item isNull() и discount_applied isNull()
print("Выведем 5 записей из  df_renamed_columns, где `item` isNull():")
df_renamed_columns.where((F.col("item").isNull())).orderBy(F.col("item")).show(5)

print("Выведем 5 записей из  df_renamed_columns, где `discount_applied` isNull():")
(df_renamed_columns.where((F.col("discount_applied").isNull()))
                   .orderBy(F.col("discount_applied"))
                   .show(3, vertical=True)
)

Выведем 5 записей из  df_renamed_columns, где `item` isNull():
+--------------+-----------+-------------+----+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|     category|item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+-------------+----+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_7482416|    CUST_09|   Patisserie|NULL|          NULL|      10|      200.0|   Credit Card|  Online|      2023-11-30|            NULL|
|   TXN_1372952|    CUST_21|    Furniture|NULL|          33.5|    NULL|       NULL|Digital Wallet|In-store|      2024-04-02|            true|
|   TXN_5422631|    CUST_09|Milk Products|NULL|          NULL|       8|       52.0|Digital Wallet|In-store|      2025-01-12|            true|
|   TXN_1809665|    CUST_14|    Beverages|NULL|          24.5|    NULL|       NULL|  

In [16]:
# Создадим дополнительное поле price_per_unit_filled, если в столбце price_per_unit значения = Null, 
# то мы рассчитаем значение как total_spent / quantity.
# Если же Значение в столбце price_per_unit не Null, то оно запишется в столбец price_per_unit_filled из price_per_unit.
df_renamed_columns = df_renamed_columns.withColumn(
    "price_per_unit_filled",
    F.when(
        ((F.col("price_per_unit").isNull()) | (F.col("price_per_unit") == 0.0)) & 
        (F.col("total_spent").isNotNull()) & 
        (F.col("quantity").isNotNull()) & 
        (F.col("quantity") != 0),
        F.col("total_spent") / F.col("quantity")
    ).otherwise(F.col("price_per_unit"))
)
print("В новый столбец `price_per_unit_filled` запишем данные как `total_spent` / `quantity`, для Null значений", 
      "столбца `price_per_unit`, если значения в `price_per_unit` были корректные, то они остаются без изменений.", sep="\n")
print("\n")
print("Посмотрим, как заполнились значения в столбце `price_per_unit_filled` из столбца `price_per_unit`:")
(df_renamed_columns.select(F.col("transaction_id"), F.col("customer_id"), F.col("category"), F.col("item")
                           , F.col("quantity"), F.col("total_spent"), F.col("price_per_unit")
                           , F.col("price_per_unit_filled"))
                   .where((F.col("price_per_unit").isNull()) & (F.col("price_per_unit_filled").isNotNull()))
                   .orderBy(F.col("item"))
                   .show(10)
)
null_price_per_unit_filled = df_renamed_columns.where(F.col("price_per_unit_filled").isNull()).count()

# Нет пустых значений в столбце price_per_unit_filled 
print(f"Количество пустых записей в столбце `price_per_unit_filled`: {null_price_per_unit_filled}")
print("Столбец `price_per_unit_filled` полностью заполнен.")


В новый столбец `price_per_unit_filled` запишем данные как `total_spent` / `quantity`, для Null значений
столбца `price_per_unit`, если значения в `price_per_unit` были корректные, то они остаются без изменений.


Посмотрим, как заполнились значения в столбце `price_per_unit_filled` из столбца `price_per_unit`:
+--------------+-----------+--------------------+----+--------+-----------+--------------+---------------------+
|transaction_id|customer_id|            category|item|quantity|total_spent|price_per_unit|price_per_unit_filled|
+--------------+-----------+--------------------+----+--------+-----------+--------------+---------------------+
|   TXN_7482416|    CUST_09|          Patisserie|NULL|      10|      200.0|          NULL|                 20.0|
|   TXN_5422631|    CUST_09|       Milk Products|NULL|       8|       52.0|          NULL|                  6.5|
|   TXN_9634894|    CUST_15|       Milk Products|NULL|      10|      275.0|          NULL|                 27.5|
|   TXN_8

In [17]:
# Проверим, есть ли записи, где total_spent Null, но при этом quantity не пустое
count_null_total_spent_quantity_full = (df_renamed_columns.where((F.col("quantity").isNotNull()) & 
                                       (F.col("total_spent").isNull())).orderBy(F.col("item")).count())

print(f"Количество записей где столбец `total_spent` = Null, но при этом столбец `quantity`",
      f"не Null: {count_null_total_spent_quantity_full}")
print("===============================================================================================")
print("Создадим справочник товаров, где присутствуют столбцы: `category`, `item`, `price_per_unit_filled`.")
print("Этот справочник нам нужен для того, чтобы заполнить пустые `item`.")
print("Создадим справочник, аналогичный distinct_goods, где все столбцы `item`,",
      "`price_per_unit_filled`, `category` -  не Null")
dict_unique_goods = (df_renamed_columns.select(F.col("category"), F.col("item"), F.col("price_per_unit_filled"))
                                       .where((F.col("item").isNotNull()) 
                                           & (F.col("price_per_unit_filled").isNotNull()) 
                                           & (F.col("category").isNotNull()))
                                    .orderBy(F.col("category"), F.col("item"))
                                    .distinct()
                    )
dict_unique_goods.show(5, truncate=False)
print(f"Количество записей в справочнике товаров `dict_unique_goods`: {dict_unique_goods.count()}")

Количество записей где столбец `total_spent` = Null, но при этом столбец `quantity` не Null: 0
Создадим справочник товаров, где присутствуют столбцы: `category`, `item`, `price_per_unit_filled`.
Этот справочник нам нужен для того, чтобы заполнить пустые `item`.
Создадим справочник, аналогичный distinct_goods, где все столбцы `item`, `price_per_unit_filled`, `category` -  не Null
+----------------------------------+------------+---------------------+
|category                          |item        |price_per_unit_filled|
+----------------------------------+------------+---------------------+
|Beverages                         |Item_16_BEV |27.5                 |
|Beverages                         |Item_7_BEV  |14.0                 |
|Electric household essentials     |Item_23_EHE |38.0                 |
|Milk Products                     |Item_16_MILK|27.5                 |
|Computers and electric accessories|Item_1_CEA  |5.0                  |
+----------------------------------+------

In [18]:
# Проверим, сколько значений в dict_unique_goods по столбцам category и price_per_unit_filled без item, 
# item нам нужно будет заполнить
unique_value = (dict_unique_goods.groupBy(F.col("category"), F.col("price_per_unit_filled"))
                                 .agg(F.count('*').alias("cnt"))
                                 .filter(F.col("cnt") == 1)
                                 .count()
               )
print(f"Количество уникальных записей в `dict_unique_goods`только по столбцам"
       f"`category` и `price_per_unit_filled` без `item`: {unique_value}")

not_unique_value = (dict_unique_goods.groupBy(F.col("category"), F.col("price_per_unit_filled"))
                                     .agg(F.count('*').alias("cnt"))
                                     .filter(F.col("cnt") > 1)
                                     .count()
                   )
print(f"Количество НЕ уникальных записей в `dict_unique_goods` по столбцам `category`",
      f"и `price_per_unit_filled` без `item`: {not_unique_value}")

print("Делаем вывод, что столбец `item`, где значение = Null мы можем заполнить из",
      "`dict_unique_goods`, потому что сочетание категории и цены уникально.")

print("===============================================================================================")

result_goods = (df_renamed_columns.alias("a")
           .join(dict_unique_goods.alias("b"),
                    (F.col("a.category") == F.col("b.category")) & 
                    (F.col("a.price_per_unit_filled") == F.col("b.price_per_unit_filled")),
                "left")
           .withColumn(  # Используем withColumn 
               "item_final",
                F.when(
                    F.col("a.item").isNotNull(),  # если в a есть значение
                    F.col("a.item")
                ).otherwise(
                    F.col("b.item")  # иначе берем из b
                )
           )
          .select(
              "a.*",
              "item_final",
              F.col("b.item").alias("item_from_dict")
           )
)
print("Для этого создадим новый столбец `item_final` и новый DF `result_goods`.")
print(f"Убедимся, что количество записей в новом DF `result_goods` совпадает",
      f"с количеством записей в первоначальном DF: {result_goods.count()}")

print("Выведем столбцы `item` и `item_from_dict` из result_goods, чтобы убедиться, что столбец `item_from_dict` заполнен:")
(result_goods.select(F.col("item"), F.col("item_from_dict"))
             .where((F.col("item").isNull()))
             .show(5)
)

ccount_null_item_from_dict = result_goods.where((F.col("item_from_dict").isNull())).count()
print(f"Количество записей в result_goods, в новом столбце `item_from_dict` со значениями = Null: {ccount_null_item_from_dict}")
print("Убедились, что столбец `item_from_dict` заполнен полностью без пропусков.")

Количество уникальных записей в `dict_unique_goods`только по столбцам`category` и `price_per_unit_filled` без `item`: 200
Количество НЕ уникальных записей в `dict_unique_goods` по столбцам `category` и `price_per_unit_filled` без `item`: 0
Делаем вывод, что столбец `item`, где значение = Null мы можем заполнить из `dict_unique_goods`, потому что сочетание категории и цены уникально.
Для этого создадим новый столбец `item_final` и новый DF `result_goods`.
Убедимся, что количество записей в новом DF `result_goods` совпадает с количеством записей в первоначальном DF: 12575
Выведем столбцы `item` и `item_from_dict` из result_goods, чтобы убедиться, что столбец `item_from_dict` заполнен:
+----+--------------+
|item|item_from_dict|
+----+--------------+
|NULL|   Item_11_PAT|
|NULL|   Item_20_FUR|
|NULL|   Item_2_MILK|
|NULL|   Item_14_BEV|
|NULL|  Item_16_MILK|
+----+--------------+
only showing top 5 rows

Количество записей в result_goods, в новом столбце `item_from_dict` со значениями = N

In [19]:
# 2.4. Удалим оставшийся строки с пропусками в Category, Quantity, Total Spent и Rrice Rer Unit

print(f"Всего записей в DF result_goods: {result_goods.count()}")
empty_recordes = (result_goods.where((F.col("quantity").isNull()) | (F.col("total_spent").isNull()) 
                         | (F.col("category").isNull()) ).count())

print(f"Количество записей которые необходимо удалить, где `quantity` и `total_spent` = Null:", 
      f"{empty_recordes}. Восстановить эти данные мы не можем.")

(result_goods.select(F.col("transaction_id"), F.col("customer_id"), F.col("category"), F.col("item_final")
                     , F.col("price_per_unit_filled"), F.col("quantity"), F.col("total_spent"))
              .where((F.col("quantity").isNull()) | (F.col("total_spent").isNull()))
              .show(5)
)

cnt_null_value = (result_goods.where((F.col("item_final").isNull()) 
                                     | (F.col("category").isNull()) 
                                     | (F.col("price_per_unit_filled").isNull()))
                              .count()
                 )

print(f"Проверим еще количество записей, в которых `item_final` или `category`", 
      f"или `price_per_unit_filled` пустые: {cnt_null_value}.")

print("Убедились, что таких записей нет.")
# Уберем данные, где пустые значения для этого создадим новый DF
clear_goods =  result_goods.filter((F.col("quantity").isNotNull()) & (F.col("total_spent").isNotNull()))
print(f"Всего записей в новом DF `clear_goods` без пустых `quantity` и `total_spent`: {clear_goods.count()}")
print("Дальше работаем с DF `clear_goods`")

Всего записей в DF result_goods: 12575
Количество записей которые необходимо удалить, где `quantity` и `total_spent` = Null: 604. Восстановить эти данные мы не можем.
+--------------+-----------+----------+-----------+---------------------+--------+-----------+
|transaction_id|customer_id|  category| item_final|price_per_unit_filled|quantity|total_spent|
+--------------+-----------+----------+-----------+---------------------+--------+-----------+
|   TXN_1372952|    CUST_21| Furniture|Item_20_FUR|                 33.5|    NULL|       NULL|
|   TXN_1809665|    CUST_14| Beverages|Item_14_BEV|                 24.5|    NULL|       NULL|
|   TXN_4206593|    CUST_01| Furniture|Item_21_FUR|                 35.0|    NULL|       NULL|
|   TXN_3481599|    CUST_05| Furniture|Item_24_FUR|                 39.5|    NULL|       NULL|
|   TXN_1621497|    CUST_06|Patisserie|Item_13_PAT|                 23.0|    NULL|       NULL|
+--------------+-----------+----------+-----------+---------------------+

In [20]:
# 3.1. Самые популярные категории товаров: 
print("Рассчитаем общее количество проданных единиц товара для каждой категории.") 
print("Определим Топ-5 категорий по общему количеству проданных единиц:")
(clear_goods.groupBy("category")
            .agg(F.sum("quantity").alias("sum_quantity"))
            .orderBy(F.desc("sum_quantity"))
            .show(5, truncate=False)
)

Рассчитаем общее количество проданных единиц товара для каждой категории.
Определим Топ-5 категорий по общему количеству проданных единиц:
+-----------------------------+------------+
|category                     |sum_quantity|
+-----------------------------+------------+
|Furniture                    |8462        |
|Food                         |8387        |
|Beverages                    |8358        |
|Milk Products                |8339        |
|Electric household essentials|8309        |
+-----------------------------+------------+
only showing top 5 rows



In [21]:
# 3.2. Анализ среднего чека, округлим до двух знаков после запятой: 
print("Рассчитаем среднее значение Total Spent для каждого метода оплаты:")
(clear_goods.groupBy("payment_method")
            .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
            .orderBy(F.col("payment_method"))
            .show()
)
print("Рассчитаем среднее значение Total Spent для каждого места где прошла оплата:")
(clear_goods.groupBy("location")
            .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
            .orderBy(F.col("location"))
            .show()
)

Рассчитаем среднее значение Total Spent для каждого метода оплаты:
+--------------+---------------+
|payment_method|avg_total_spent|
+--------------+---------------+
|          Cash|         131.05|
|   Credit Card|         129.13|
|Digital Wallet|         128.72|
+--------------+---------------+

Рассчитаем среднее значение Total Spent для каждого места где прошла оплата:
+--------+---------------+
|location|avg_total_spent|
+--------+---------------+
|In-store|         128.86|
|  Online|         130.42|
+--------+---------------+



In [22]:
# Временные признаки: Добавим новые столбцы на основе Transaction Date:
# day_of_week: День недели
# transaction_month: Месяц транзакции 
# day_of_week_num: День недели числовой, для сортировки
# month_num: Месяц транзакции числовой, для сортировки 

goods_with_new_fields = (clear_goods.withColumn(
                    "day_of_week", F.date_format(F.col("transaction_date"), "EEEE")
                     ).withColumn(
                    "month_num", F.month(F.col("transaction_date"))
                     ).withColumn(
                    "day_of_week_num", 
                        F.when(F.dayofweek(F.col("transaction_date")) == 1, 8)
                         .otherwise(F.dayofweek(F.col("transaction_date")))
                    ).withColumn(
                    "transaction_month", F.date_format(F.col("transaction_date"), "MMMM"))
                    )
goods_with_new_fields.show(3, vertical=True)
goods_with_new_fields.coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("my_file.csv")

-RECORD 0-------------------------------
 transaction_id        | TXN_6867343    
 customer_id           | CUST_09        
 category              | Patisserie     
 item                  | Item_10_PAT    
 price_per_unit        | 18.5           
 quantity              | 10             
 total_spent           | 185.0          
 payment_method        | Digital Wallet 
 location              | Online         
 transaction_date      | 2024-04-08     
 discount_applied      | true           
 price_per_unit_filled | 18.5           
 item_final            | Item_10_PAT    
 item_from_dict        | Item_10_PAT    
 day_of_week           | Monday         
 month_num             | 4              
 day_of_week_num       | 2              
 transaction_month     | April          
-RECORD 1-------------------------------
 transaction_id        | TXN_3731986    
 customer_id           | CUST_22        
 category              | Milk Products  
 item                  | Item_17_MILK   
 price_per_unit 

In [23]:
# 4.2. Продажи по дням недели: 
print("Рассчитаем среднюю сумму продаж (Total Spent) для каждого дня недели.")
print("Выведем результаты, отсортированные по дням недели:")
df_avg_total_spent_in_weekday = (goods_with_new_fields
                                    .groupBy("day_of_week", "day_of_week_num")
                                    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
                                    .orderBy(F.col("day_of_week_num"))
                                )
df_avg_total_spent_in_weekday.select("day_of_week", "avg_total_spent").show()

# 4.3.Продажи по месяцам:
print("Рассчитаем среднюю сумму продаж (Total Spent) для каждого месяца.")
print("Выведем результаты, отсортированные по месяцам:")
df_avg_total_spent_in_month = (goods_with_new_fields
                                   .groupBy("transaction_month", "month_num")
                                   .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
                                   .orderBy(F.col("month_num"))
                              )
df_avg_total_spent_in_month.select("transaction_month", "avg_total_spent").show()

# 4.4. Признаки клиента: 
print("Рассчитаем `customer_lifetime_value` (CLV) для каждого клиента как общую сумму", 
      "(Total Spent), потраченную этим клиентом за все транзакции.")
print("Выведем Топ-10 клиентов по их CLV (customer_id и их CLV):") 
df_sum_total_spent_by_customers = (goods_with_new_fields
                                       .groupBy("customer_id")
                                       .agg(F.round(F.sum("total_spent"), 2).alias("sum_total_spent"))
                                       .orderBy(F.desc("sum_total_spent"))
                                  )
df_sum_total_spent_by_customers.show(10)

Рассчитаем среднюю сумму продаж (Total Spent) для каждого дня недели.
Выведем результаты, отсортированные по дням недели:
+-----------+---------------+
|day_of_week|avg_total_spent|
+-----------+---------------+
|     Monday|         125.57|
|    Tuesday|         129.51|
|  Wednesday|         126.82|
|   Thursday|         129.28|
|     Friday|         134.64|
|   Saturday|         131.49|
|     Sunday|         130.18|
+-----------+---------------+

Рассчитаем среднюю сумму продаж (Total Spent) для каждого месяца.
Выведем результаты, отсортированные по месяцам:
+-----------------+---------------+
|transaction_month|avg_total_spent|
+-----------------+---------------+
|          January|         134.69|
|         February|         130.66|
|            March|         126.83|
|            April|         131.81|
|              May|          127.4|
|             June|         130.95|
|             July|         126.57|
|           August|         124.28|
|        September|         131.45|
|

In [24]:
# Останавливаем SparkSession
spark.stop()