#### Импортируем библиотеки

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *

#### 1. С помощью модуля pandas преобразуем файл из .xlsx в .csv формат

In [2]:
data = pd.read_excel('online_retail.xlsx')
data.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom


In [3]:
data.to_csv('online_retail.csv', index=False)

#### 2. Инициализируйем Spark-сессию

In [4]:
# spark = SparkSession.builder.appName('my_first_spark').getOrCreate()
spark = SparkSession.builder \
  .master("local[1]") \
  .appName("SparkFirst") \
  .config("spark.executor.memory", "10g")\
  .config("spark.executor.cores", 5) \
  .config("spark.dynamicAllocation.enabled", "true") \
  .config("spark.dynamicAllocation.maxExecutors", 5) \
  .config("spark.shuffle.service.enabled", "true") \
  .getOrCreate()

#### 3. Создадим dataframe из скачанного файла

In [5]:
df = spark.read.csv('online_retail.csv', header=True)
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



#### 4. Подсчитаем следующие показатели:

#### a) Количество строк в файле

In [6]:
row_count = df.count()
print(f'Количество строк в файле: {row_count}')

Количество строк в файле: 541909


#### b) Количество уникальных клиентов

In [7]:
unique_customers = df.select("CustomerID").distinct().count()
print("Количество уникальных клиентов:", unique_customers)

Количество уникальных клиентов: 4373


#### c) В какой стране совершается большинство покупок

In [8]:
most_purchases_by_country = df.groupBy('Country').count().orderBy('count', ascending=False).first()[0]
print("Страна с наибольшим количеством покупок:", most_purchases_by_country)

Страна с наибольшим количеством покупок: United Kingdom


#### d) Даты самой ранней и самой последней покупки на платформе

In [9]:
earliest_date = df.select(min('InvoiceDate')).first()[0]
latest_date = df.select(max('InvoiceDate')).first()[0]

print("Самая ранняя дата покупки:", earliest_date)
print("Самая поздняя дата покупки:", latest_date)

Самая ранняя дата покупки: 2010-12-01 08:26:00
Самая поздняя дата покупки: 2011-12-09 12:50:00


#### 5. Проведем RFM-анализ клиентов платформы. Что такое RFM-анализ? Обычно RFM-анализ используется в маркетинге для оценки ценности клиента на основе его:

#### a) Recency - Давность: как давно каждый покупатель совершил покупку?

#### b) Frequency- Частота: Как часто они что-то покупали?

#### c) Monetary - Денежная ценность: сколько денег они в среднем тратят при совершении покупок?

#### Добавьте в dataframe для каждого клиента 3 новых поля : recency, frequency, monetary

In [10]:
today = current_date()

df = df.withColumn('InvoiceDate', date_format(df['InvoiceDate'], 'yyyy-MM-dd'))
df = df.withColumn('InvoiceDate', to_date(df['InvoiceDate'], 'yyyy-MM-dd'))

recency_df = df.groupBy('CustomerID').agg(max('InvoiceDate').alias('last_purchase_date'))
recency_df = recency_df.withColumn('recency', datediff(today, recency_df['last_purchase_date']))

frequency_df = df.groupBy('CustomerID').agg(count('InvoiceNo').alias('frequency'))

monetary_df = df.groupBy('CustomerID').agg(round(sum('UnitPrice'), 2).alias('monetary'))

df = df.join(recency_df.select('CustomerID', 'recency'), 'CustomerID')
df = df.join(frequency_df.select('CustomerID', 'frequency'), 'CustomerID')
df = df.join(monetary_df.select('CustomerID', 'monetary'), 'CustomerID')

df.show(5)

+----------+---------+---------+--------------------+--------+-----------+---------+--------------+-------+---------+--------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|       Country|recency|frequency|monetary|
+----------+---------+---------+--------------------+--------+-----------+---------+--------------+-------+---------+--------+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|United Kingdom|   4577|      312| 1224.51|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|United Kingdom|   4577|      312| 1224.51|
|   17850.0|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|United Kingdom|   4577|      312| 1224.51|
|   17850.0|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|United Kingdom|   4577|      312| 1224.51|
|   17850.0|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|United Kingdom|   4577|    

#### Для каждого показателя добавим стобец с разбиением клиентов на 3 группы. Допустим, у нас есть 3 клиента, первый клиент последний раз купил товар только в прошлом году, второй клиент в прошлом месяце, а третий клиент на прошлой неделе. Каждый из этих клиентов должен получить различные значения группы для показателя Recency - A, B и С, где А - отражает наибольшую “ценность”, а С - соответственно, наименьшую. 

#### Добавим итоговый столбец с “суммой” значений групп по каждому показателю

In [11]:
quantiles_recency = df.approxQuantile('recency', [0.33, 0.66], 0.01) # разбиение на три категории
df = df.withColumn('recency_group',
                   when(df['recency'] <= quantiles_recency[0], 'C')
                   .when((df['recency'] > quantiles_recency[0]) & (df['recency'] <= quantiles_recency[1]), 'B')
                   .otherwise('A'))

quantiles_frequency = df.approxQuantile('frequency', [0.33, 0.66], 0.01)
df = df.withColumn('frequency_group',
                   when(df['frequency'] <= quantiles_frequency[0], 'C')
                   .when((df['frequency'] > quantiles_frequency[0]) & (df['frequency'] <= quantiles_frequency[1]), 'B')
                   .otherwise('A'))

quantiles_monetary = df.approxQuantile('monetary', [0.33, 0.66], 0.01)
df = df.withColumn('monetary_group',
                   when(df['monetary'] <= quantiles_monetary[0], 'C')
                   .when((df['monetary'] > quantiles_monetary[0]) & (df['monetary'] <= quantiles_monetary[1]), 'B')
                   .otherwise('A'))


In [12]:
df = df.withColumn('RFM', concat_ws('', df['recency_group'], df['frequency_group'], df['monetary_group']))
df.show(5)

+----------+---------+---------+--------------------+--------+-----------+---------+--------------+-------+---------+--------+-------------+---------------+--------------+---+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|       Country|recency|frequency|monetary|recency_group|frequency_group|monetary_group|RFM|
+----------+---------+---------+--------------------+--------+-----------+---------+--------------+-------+---------+--------+-------------+---------------+--------------+---+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|United Kingdom|   4577|      312| 1224.51|            A|              B|             A|ABA|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|United Kingdom|   4577|      312| 1224.51|            A|              B|             A|ABA|
|   17850.0|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|United Kingdom|   4577|      312| 12

#### Сохраним в отдельный csv-файл Id только тех клиентов, у которых значения групп ААА.

In [15]:
result_df = (
    df.filter(df["RFM"] == "AAA")
    .select("CustomerID")
    .distinct()
    .orderBy("CustomerID", ascending=True)
)

result_df.toPandas().to_csv('result.csv', header=False, index=False)
result_df.show()

+----------+
|CustomerID|
+----------+
|   12472.0|
|   12484.0|
|   12637.0|
|   13097.0|
|   15249.0|
|   15356.0|
|   15719.0|
|   15998.0|
|   16686.0|
|   16729.0|
|   16984.0|
|   17107.0|
|   17337.0|
|   17865.0|
|   17965.0|
+----------+

