# Импорты - и спарк сессия

In [16]:

import sys
import re
import requests
import io

from urllib.parse import urlencode

import pandas as pd


sys.path.append('../') 
from my_spark import get_context

from pyspark.sql import functions as F
from pyspark.sql import types as T 
from pyspark.sql import Window



spark = get_context()


In [2]:
spark

## 1. Загрузка и предварительная обработка данных
 

### 1.1. Загрузка и вывод схемы:

In [3]:
#Загрузите файл retail_store_sales.csv.  Выведите первые 5 строк загруженного DataFrame и его схему

base_url = 'https://cloud-api.yandex.net/v1/disk/public/resources/download?'
public_key = 'https://disk.yandex.ru/d/5yhVs9f0xe4w-Q'  

final_url = base_url + urlencode(dict(public_key=public_key))
response = requests.get(final_url)
download_url = response.json()['href']

download_response = requests.get(download_url)

# Читаем в pandas DataFrame из памяти чтобы не сохранять на диск
pdf = pd.read_csv(io.BytesIO(download_response.content))

# Преобразуем pandas DataFrame в список словарей.Можно в спарк сразу из пандас, но так не на всех версиях панадас работает
records = pdf.to_dict(orient='records')

# Создаем DataFrame из RDD 
rdd = spark.sparkContext.parallelize(records)
spark_df = spark.createDataFrame(rdd)

spark_df.printSchema()
spark_df.show(5)

root
 |-- Category: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Discount Applied: boolean (nullable = true)
 |-- Item: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Transaction Date: string (nullable = true)
 |-- Transaction ID: string (nullable = true)

+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|     Category|Customer ID|Discount Applied|        Item|Location|Payment Method|Price Per Unit|Quantity|Total Spent|Transaction Date|Transaction ID|
+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|   Patisserie|    CUST_09|            true| Item_10_PAT|  Online|D

### 1.2. Очистка названий столбцов

Преобразуйте названия всех столбцов к единому регистру - snake_case.  Выведите обновленную схему DataFrame  или названия столбцов, чтобы убедиться в изменении названий.

In [6]:
# сделаем функцию чтобы переиспользовать
def clean_and_snake_case_names(columns: list) -> list:
    """
    Очищает список названий колонок:
    - все знаки препинания заменяет на пробел
    - убирает повторяющиеся пробелы
    - пробелы заменяет на _
    - приводит к нижнему регистру
    """
    cleaned = []
    for col in columns:
        col = re.sub(r'[^\w\s]', ' ', col)      # знаки препинания на пробел
        col = col.strip()                       # убрать пробелы по краям
        col = re.sub(r'\s+', ' ', col)          # несколько пробелов на один
        col = col.replace(' ', '_')             # пробелы на _
        col = col.lower()                       # в нижний регистр
        cleaned.append(col)
    return cleaned

In [8]:
#Чистим названия колонок и приводим к snake_case
spark_df = spark_df.toDF(*clean_and_snake_case_names(spark_df.columns))

display(spark_df.columns)
spark_df.printSchema()

['category',
 'customer_id',
 'discount_applied',
 'item',
 'location',
 'payment_method',
 'price_per_unit',
 'quantity',
 'total_spent',
 'transaction_date',
 'transaction_id']

root
 |-- category: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- discount_applied: boolean (nullable = true)
 |-- item: string (nullable = true)
 |-- location: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_id: string (nullable = true)



### 1.3. Преобразование типов данных:

In [9]:
# Приводим типы, для денег - беру Decimal, для количества - целое
spark_df = (
    spark_df
    .withColumn('price_per_unit', F.col('price_per_unit').cast(T.DecimalType(10, 2)))
    .withColumn('total_spent', F.col('total_spent').cast(T.DecimalType(10, 2)))
    .withColumn('quantity', F.col('quantity').cast(T.IntegerType()))
)
# в item Nan залетел как текст , чистим NaN -> Null
spark_df = (
    spark_df
    .withColumn(
        'item',
        F.when(F.col('item') == 'NaN', None).otherwise(F.col('item'))
    )
)
spark_df.printSchema()


root
 |-- category: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- discount_applied: boolean (nullable = true)
 |-- item: string (nullable = true)
 |-- location: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- price_per_unit: decimal(10,2) (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_spent: decimal(10,2) (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_id: string (nullable = true)



In [19]:
#cохраним его поближе
spark_df.persist()

DataFrame[category: string, customer_id: string, discount_applied: boolean, item: string, location: string, payment_method: string, price_per_unit: decimal(10,2), quantity: int, total_spent: decimal(10,2), transaction_date: string, transaction_id: string]

## 2. Очистка и валидация данных


In [10]:
# Зафиксируем нашу стартовую позицию
(
     spark_df
    .agg(
        F.sum(
            F.when(
                F.col('category').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_category_row'),
        F.sum(
            F.when(
                F.col('item').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_item_row'),
        F.sum(
            F.when(
                F.col('price_per_unit').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_price_row'),
        F.sum(
            F.when(
                F.col('quantity') == 0, F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('0_quantity_row'),# проверяем на 0 так как после преобразования null кастанулся в 0
        F.sum(
            F.when(
                F.col('total_spent').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_total_spent_row')
    )
).show(10)

+-----------------+-------------+--------------+--------------+--------------------+
|null_category_row|null_item_row|null_price_row|0_quantity_row|null_total_spent_row|
+-----------------+-------------+--------------+--------------+--------------------+
|                0|         1213|           609|           604|                 604|
+-----------------+-------------+--------------+--------------+--------------------+



Гипотеза - если цена товара одинакова внутри категории - то это один и тот же товар .
По этой причине предлагаю  изменить предложенную очередность заданий и востановить цену товара пункт задания 2.3 


### 2.3. Заполнение отсутствующих Quantity и price Rer Unit

In [11]:
# расчитываем price_per_unit_recovery как total_spent / quantity
# Востанавливаем  price_per_unit выбирая непустое значение  из двух цен
# После востановления цепны Обновляем quantity total_spent / price_per_unit
prep_dim_item = (
    spark_df
    .withColumn(
        'price_per_unit_recovery',
        (F.col('total_spent') / F.col('quantity')).cast(T.DecimalType(10, 2))
    )

    .withColumn('price_per_unit', F.coalesce('price_per_unit', 'price_per_unit_recovery'))
    .drop('price_per_unit_recovery')
    .withColumn(
        'quantity',
        (F.col('total_spent') / F.col('price_per_unit')).cast(T.IntegerType())
    )
)

prep_dim_item.show(3)

+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|     category|customer_id|discount_applied|        item|location|payment_method|price_per_unit|quantity|total_spent|transaction_date|transaction_id|
+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|   Patisserie|    CUST_09|            true| Item_10_PAT|  Online|Digital Wallet|         18.50|      10|     185.00|      2024-04-08|   TXN_6867343|
|Milk Products|    CUST_22|            true|Item_17_MILK|  Online|Digital Wallet|         29.00|       9|     261.00|      2023-07-23|   TXN_3731986|
|     Butchers|    CUST_02|           false| Item_12_BUT|  Online|   Credit Card|         21.50|       2|      43.00|      2022-10-05|   TXN_9303719|
+-------------+-----------+----------------+------------+--------+--------------+--------------+----

### 2.1. Восстановление отсутствующих item

Проверяем гипотзеу: если цена товара одинакова внутри категории - то это один и тот же товар .


для начала нам нужно проверить - товары внутри категории
разметим товары внутри категоии с одинаковой ценой одинаковым псевдо айдишником
и посчитаем сколько товаров в категории получиться, по условию их не должно быть строго 25

Для нашей задачи подойдет как rank так и dense_rank, я воспользуюсь последним
дадим одинаковые номера тем товарам у которых одинаковая цена в пределах категории

In [17]:
window_spec = Window.partitionBy('category').orderBy('price_per_unit')

df_ranked = (
    prep_dim_item
    .select('category', 'item', 'price_per_unit')
    .distinct()
    .withColumn(
        'rank',
        F.dense_rank().over(window_spec)
    )

    .orderBy('category', 'price_per_unit')
)

df_ranked.show(10)

+---------+----------+--------------+----+
| category|      item|price_per_unit|rank|
+---------+----------+--------------+----+
|Beverages|Item_1_BEV|          5.00|   1|
|Beverages|      null|          5.00|   1|
|Beverages|Item_2_BEV|          6.50|   2|
|Beverages|      null|          6.50|   2|
|Beverages|      null|          8.00|   3|
|Beverages|Item_3_BEV|          8.00|   3|
|Beverages|      null|          9.50|   4|
|Beverages|Item_4_BEV|          9.50|   4|
|Beverages|Item_5_BEV|         11.00|   5|
|Beverages|Item_6_BEV|         12.50|   6|
+---------+----------+--------------+----+
only showing top 10 rows



In [18]:
# проверяем кол-во товаров
(
     df_ranked
     .groupBy('category')
    .agg(
        F.countDistinct('rank').alias('cnt_psevdo_item'),
           
    )

).show(10,False)

+----------------------------------+---------------+
|category                          |cnt_psevdo_item|
+----------------------------------+---------------+
|Beverages                         |25             |
|Butchers                          |25             |
|Computers and electric accessories|25             |
|Electric household essentials     |25             |
|Food                              |25             |
|Furniture                         |25             |
|Milk Products                     |25             |
|Patisserie                        |25             |
+----------------------------------+---------------+



In [20]:
# проверяем кол-во товаров v2 
#в целом можно не выпендирваться с оконками и проверить кол-во цен в категории
(
     df_ranked
     .groupBy('category')
    .agg(
        F.countDistinct('price_per_unit').alias('cnt_price_per_unit'),
    )

).show(10,False)

+----------------------------------+------------------+
|category                          |cnt_price_per_unit|
+----------------------------------+------------------+
|Food                              |25                |
|Furniture                         |25                |
|Electric household essentials     |25                |
|Milk Products                     |25                |
|Computers and electric accessories|25                |
|Patisserie                        |25                |
|Beverages                         |25                |
|Butchers                          |25                |
+----------------------------------+------------------+



In [21]:
#создаем справочник товаров, для каждой цены может быт заполнен item а может быть Null,
# воспользуемся группирвокой и агрегатной функцией. тем самым она вытащит нам действительно значение для цены
dim_item = (
    prep_dim_item
    .groupBy('category', 'price_per_unit')
    .agg(
        F.max('item').alias('item')
    )
    .select('category', 'item', 'price_per_unit')
    .orderBy('price_per_unit')
)
dim_item.show(5)

+--------------------+----------+--------------+
|            category|      item|price_per_unit|
+--------------------+----------+--------------+
|            Butchers|Item_1_BUT|          5.00|
|Computers and ele...|Item_1_CEA|          5.00|
|           Beverages|Item_1_BEV|          5.00|
|           Furniture|Item_1_FUR|          5.00|
|Electric househol...|Item_1_EHE|          5.00|
+--------------------+----------+--------------+
only showing top 5 rows



In [22]:
# проверим справочник кол-во пустых строк в справочнике

(
     dim_item
    .agg(
        F.sum(
            F.when(
                F.col('item').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_item_row'),
        F.sum(
            F.when(
                F.col('price_per_unit').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_price_row')
    )
).show(10)


+-------------+--------------+
|null_item_row|null_price_row|
+-------------+--------------+
|            0|             0|
+-------------+--------------+



In [23]:
# джойним по цене и кагории для обогащения поля item.
df_with_item = (
    prep_dim_item.alias('prep')
    .join(
        dim_item.alias('dim'),
        on=['category','price_per_unit'],
        how='left'
    )
    .select(
        'prep.category',
        'prep.customer_id',
        'prep.discount_applied',
        F.coalesce('prep.item','dim.item').alias('item'),
        'prep.location',
        'prep.price_per_unit',
        'prep.quantity',
        'prep.total_spent',
        'prep.transaction_date',
        'prep.transaction_id', 
        'prep.payment_method'
    )
    
)
prep_dim_item.show(3)

+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|     category|customer_id|discount_applied|        item|location|payment_method|price_per_unit|quantity|total_spent|transaction_date|transaction_id|
+-------------+-----------+----------------+------------+--------+--------------+--------------+--------+-----------+----------------+--------------+
|   Patisserie|    CUST_09|            true| Item_10_PAT|  Online|Digital Wallet|         18.50|      10|     185.00|      2024-04-08|   TXN_6867343|
|Milk Products|    CUST_22|            true|Item_17_MILK|  Online|Digital Wallet|         29.00|       9|     261.00|      2023-07-23|   TXN_3731986|
|     Butchers|    CUST_02|           false| Item_12_BUT|  Online|   Credit Card|         21.50|       2|      43.00|      2022-10-05|   TXN_9303719|
+-------------+-----------+----------------+------------+--------+--------------+--------------+----

### 2.2. Восстановление Total Spent:

Найдите все транзакции, с пропусками в общей сумме и обновите ее, пересчитав её как quantity * price_per_unit для всех записей.

In [None]:
#Забегая вперед - тут мы по сути проверям соглосованность Total Spent =  quantity * price_per_unit , так как часть данных не востановима

In [24]:
result_df = (
    df_with_item
    .withColumn('total_spent', F.col('quantity') * F.col('price_per_unit'))    
    
)

In [25]:
#проверим насколько нам удалось востановить

(
     result_df
    .agg(
        F.sum(
            F.when(
                F.col('category').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_category_row'),
        F.sum(
            F.when(
                F.col('item').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_item_row'),
        F.sum(
            F.when(
                F.col('price_per_unit').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_price_row'),
        F.sum(
            F.when(
                F.col('quantity').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('0_quantity_row'),# тут уже проверфем на Null 
        F.sum(
            F.when(
                F.col('total_spent').isNull(), F.lit(1)
            ).otherwise(F.lit(0))
        ).alias('null_total_spent_row')
    )
).show(10)

+-----------------+-------------+--------------+--------------+--------------------+
|null_category_row|null_item_row|null_price_row|0_quantity_row|null_total_spent_row|
+-----------------+-------------+--------------+--------------+--------------------+
|                0|            0|             0|           604|                 604|
+-----------------+-------------+--------------+--------------+--------------------+



In [378]:
#было / стало сравниваем со значениями на старте - видим что уже неплохо
#quantity - восстановить не удалось, так как везде где quantity is Null то и Total_spend Null

# +-----------------+-------------+--------------+--------------+--------------------+
# |null_category_row|null_item_row|null_price_row|0_quantity_row|null_total_spent_row|
# +-----------------+-------------+--------------+--------------+--------------------+
# |                0|         1213|           609|           604|                 604|
# +-----------------+-------------+--------------+--------------+--------------------+

### 2.4. Удалите оставшийся строки с пропусками в Category, Quantity ,Total Spent и Price Rer Unit



In [26]:
result_df =(
    result_df
    .filter(F.col('total_spent').isNotNull())
    .filter(F.col('quantity').isNotNull())
    .filter(F.col('price_per_unit').isNotNull())
)
result_df.show(20)

    

+--------------------+-----------+----------------+------------+--------+--------------+--------+-----------+----------------+--------------+--------------+
|            category|customer_id|discount_applied|        item|location|price_per_unit|quantity|total_spent|transaction_date|transaction_id|payment_method|
+--------------------+-----------+----------------+------------+--------+--------------+--------+-----------+----------------+--------------+--------------+
|          Patisserie|    CUST_09|            true| Item_10_PAT|  Online|         18.50|      10|     185.00|      2024-04-08|   TXN_6867343|Digital Wallet|
|       Milk Products|    CUST_22|            true|Item_17_MILK|  Online|         29.00|       9|     261.00|      2023-07-23|   TXN_3731986|Digital Wallet|
|            Butchers|    CUST_02|           false| Item_12_BUT|  Online|         21.50|       2|      43.00|      2022-10-05|   TXN_9303719|   Credit Card|
|           Beverages|    CUST_06|            null| Item_1

## 3. Разведочный анализ данных

### 3.1. Самые популярные категории товаров

In [27]:
# Рассчитайте общее количество проданных единиц товара  для каждой категории.

(
    result_df
    .groupBy('category')
    .agg(
        F.sum('quantity').alias('cnt')
    )
    .orderBy(F.desc('cnt'))
    
).show(10, False)

+----------------------------------+----+
|category                          |cnt |
+----------------------------------+----+
|Furniture                         |8462|
|Food                              |8387|
|Beverages                         |8358|
|Milk Products                     |8339|
|Electric household essentials     |8309|
|Computers and electric accessories|8272|
|Butchers                          |8206|
|Patisserie                        |7943|
+----------------------------------+----+



In [28]:
#Определите Топ-5 категорий по общему количеству проданных единиц. 

(
    result_df
    .groupBy('category')
    .agg(
        F.sum('quantity').alias('cnt')
    )
    .orderBy(F.desc('cnt'))
    
).show(5, False)

+-----------------------------+----+
|category                     |cnt |
+-----------------------------+----+
|Furniture                    |8462|
|Food                         |8387|
|Beverages                    |8358|
|Milk Products                |8339|
|Electric household essentials|8309|
+-----------------------------+----+
only showing top 5 rows



### 3.2. Анализ среднего чека: 

In [29]:
#Рассчитайте среднее значение Total Spent для каждого метода оплаты. Округлите до двух знаков после запятой.
# Тут возможно надо переписать - так как по условию - это не средний чек, он обычно считается на заказ/пользователя

(
    result_df
    .groupBy('payment_method')
    .agg(
        F.round(F.avg('total_spent'),2).alias('avg_total_spent')
    )
    .orderBy(F.desc('avg_total_spent'))
    
).show(5, False)


+--------------+---------------+
|payment_method|avg_total_spent|
+--------------+---------------+
|Cash          |131.05         |
|Credit Card   |129.13         |
|Digital Wallet|128.72         |
+--------------+---------------+



In [30]:
#Рассчитайте среднее значение Total Spent для каждой места где прошла оплата. Округлите до двух знаков после запятой.

(
    result_df
    .groupBy('location')
    .agg(
        F.round(F.avg('total_spent'),2).alias('avg_total_spent')
    )
    .orderBy(F.desc('avg_total_spent'))
    
).show(5, False)



+--------+---------------+
|location|avg_total_spent|
+--------+---------------+
|Online  |130.42         |
|In-store|128.86         |
+--------+---------------+



## 4. Генерация признаков 

### 4.1. Временные признаки



In [31]:
#Добавьте два новых столбца на основе Transaction Date:day_of_week, transaction_month
# Текст или число - не оговаривается, заложим оба варианта
# добавим вспомгательную колонку day_of_week_num для сортировки и сделаем так чтобы неделя была с понедельника

result_df_w_ds = (
    
    result_df
    .withColumn("day_of_week", F.date_format(F.col("transaction_date"), "E"))
    .withColumn("day_of_week_num", ((F.dayofweek(F.col("transaction_date")) + 5) % 7 + 1))
    .withColumn("transaction_month_name", F.date_format(F.col("transaction_date"), "MMMM"))
    .withColumn("transaction_month", F.month(F.col("transaction_date")))
    
)
result_df_w_ds.show(1,False,True)

-RECORD 0--------------------------------
 category               | Patisserie     
 customer_id            | CUST_09        
 discount_applied       | true           
 item                   | Item_10_PAT    
 location               | Online         
 price_per_unit         | 18.50          
 quantity               | 10             
 total_spent            | 185.00         
 transaction_date       | 2024-04-08     
 transaction_id         | TXN_6867343    
 payment_method         | Digital Wallet 
 day_of_week            | Mon            
 day_of_week_num        | 1              
 transaction_month_name | April          
 transaction_month      | 4              
only showing top 1 row



### 4.2. Продажи по дням недели

In [32]:
#Рассчитайте среднюю сумму продаж (Total Spent) для каждого дня недели. Выведите результаты, отсортированные по дням недели.

(
    result_df_w_ds
    .groupBy('day_of_week', 'day_of_week_num')
    .agg(
        F.sum('total_spent').alias('total_spent')
    )
    .orderBy(F.asc('day_of_week_num'))
    .drop('day_of_week_num')
    
).show(10, False)


+-----------+-----------+
|day_of_week|total_spent|
+-----------+-----------+
|Mon        |213090.50  |
|Tue        |219650.00  |
|Wed        |216982.00  |
|Thu        |219380.50  |
|Fri        |232786.00  |
|Sat        |224191.00  |
|Sun        |225991.00  |
+-----------+-----------+



### 4.3.Продажи по месяцам

In [33]:
# Рассчитайте среднюю сумму продаж (Total Spent)  для каждого месяца. Выведите результаты, отсортированные по месяцам.
(
    result_df_w_ds
    .groupBy('transaction_month_name', 'transaction_month')
    .agg(
        F.sum('total_spent').alias('total_spent')
    )
    .orderBy(F.asc('transaction_month'))
    .drop('transaction_month')
    
).show(12, False)


+----------------------+-----------+
|transaction_month_name|total_spent|
+----------------------+-----------+
|January               |174421.00  |
|February              |119685.00  |
|March                 |122392.00  |
|April                 |125618.50  |
|May                   |124594.50  |
|June                  |129771.00  |
|July                  |131509.00  |
|August                |123287.50  |
|September             |129344.00  |
|October               |119413.50  |
|November              |122346.50  |
|December              |129688.50  |
+----------------------+-----------+



### 4.4. Признаки клиента

Рассчитайте customer_lifetime_value (CLV) для каждого клиента как общую сумму (Total Spent), потраченную этим клиентом за все транзакции. Выведите Топ-10 клиентов по их CLV (customer_id и их CLV). 

In [34]:
(
    result_df_w_ds
    .groupBy('customer_id')
    .agg(
        F.sum('total_spent').alias('clv')
    )
    .orderBy(F.desc('clv'))
    .limit(10)
    
).show()

+-----------+--------+
|customer_id|     clv|
+-----------+--------+
|    CUST_24|68452.00|
|    CUST_08|67351.50|
|    CUST_05|66974.50|
|    CUST_16|65570.50|
|    CUST_13|65037.00|
|    CUST_23|64507.00|
|    CUST_10|63155.50|
|    CUST_15|63117.50|
|    CUST_21|62933.00|
|    CUST_02|62046.50|
+-----------+--------+



In [None]:
# всем спасибо

spark.stop()