## Загрузка и сохранение файла исходных данных в формат csv

In [None]:
import numpy as np
import pandas as pd

In [None]:
# Download the workbook from GitHub and load its 'Online Retail' sheet
# into a pandas DataFrame.
df_r = pd.read_excel(
    io=r'https://github.com/sultanmurad/spark_example_files/raw/main/online_retail.xlsx',
    sheet_name='Online Retail',
)

In [None]:
# Quick look at the loaded data: dimensions, per-column summary and the
# first ten rows.
print(df_r.shape)
# DataFrame.info() writes its report to stdout itself and returns None, so it
# is called directly; wrapping it in print() would emit an extra "None" line.
df_r.info()
print(df_r.head(10))

In [None]:
# Persist the sheet as a semicolon-delimited CSV without the pandas index
# column.
df_r.to_csv('online_retail.csv', sep=';', index=False)

## Создание сессии Spark

In [None]:
import findspark
# Locate the local Spark installation so that pyspark can be imported below.
findspark.init()
# Last expression of the cell: the notebook displays whatever find() returns
# (presumably the Spark home path — confirm against the findspark docs).
findspark.find()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse) the Spark session used by the rest of the notebook.
# NOTE(review): the master is local[1], so the executor, dynamic-allocation and
# shuffle-service settings below presumably have no effect here — confirm
# whether they are still needed.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("task_47")
    .config("spark.executor.memory", "10g")
    .config("spark.executor.cores", 5)
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", 5)
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)

In [None]:
from pyspark.sql.types import *

# Explicit schema for 'online_retail.csv' (columns in file order).
# CustomerID is read as a string and InvoiceDate as a date.
# NOTE(review): pandas presumably wrote InvoiceDate with a time-of-day part;
# confirm that dropping it through DateType is intended.
data_schema = [
    StructField('InvoiceNo', StringType(), True),
    StructField('StockCode', StringType(), True),
    StructField('Description', StringType(), True),
    StructField('Quantity', IntegerType(), True),
    StructField('InvoiceDate', DateType(), True),
    StructField('UnitPrice', DoubleType(), True),
    StructField('CustomerID', StringType(), True),
    StructField('Country', StringType(), True),
]

final_struc = StructType(fields=data_schema)

# The file is produced by pandas, which quotes fields with '"' and escapes an
# embedded quote by doubling it. Spark's CSV reader escapes with a backslash
# by default, so the escape character is set to '"' to keep quoted text
# fields (e.g. Description) in one column.
df_s = spark.read.csv(
    'online_retail.csv',
    sep=";",
    header=True,
    schema=final_struc,
    quote='"',
    escape='"',
)

In [None]:
# Print the column names and types of the Spark DataFrame read from the CSV.
df_s.printSchema()

In [None]:
# Show the first five rows, one field per line, without truncating values.
df_s.show(n=5, truncate=False, vertical=True)

## Анализ DataFrame

In [None]:
from pyspark.sql import functions as f

In [None]:
print('Rows count: ', df_s.count()) # number of rows in the DataFrame read from the file

In [None]:
# Number of distinct (non-null) CustomerID values.
df_s.agg(
    f.count_distinct('CustomerID').alias('Число уникальных покупателей')
).show(truncate=False)

In [None]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL.
temp_table_name = 'temp'
df_s.createOrReplaceTempView(temp_table_name)

In [None]:
# Which country accounts for most purchases?
# Countries are ranked by the number of rows with a non-null CustomerID; every
# country tied for the highest count is returned. GROUP BY already yields one
# row per country, so no DISTINCT is needed.
# NOTE(review): this counts invoice line items, not distinct invoices —
# confirm that this is the intended meaning of "purchases".
sql = """
    select country
    from temp
    group by country
    having count(CustomerID) = (select count(CustomerID) from temp group by country order by 1 desc limit 1)

"""
# The display header is written entirely in Cyrillic (no Latin look-alike 'C').
df = spark.sql(sql).withColumnRenamed('country', 'Страна с большинством покупок')
df.show(truncate=False)

In [None]:
# Dates of the earliest and the latest purchase on the platform.

# 1. Spark functions
df_s.agg(
    f.min('InvoiceDate').alias('Минимальная дата'),
    f.max('InvoiceDate').alias('Максимальная дата'),
).show(truncate=False)

# 2. Spark SQL query
sql = """
    SELECT min(InvoiceDate), max(InvoiceDate)
    FROM temp
"""
df = (
    spark.sql(sql)
    .withColumnRenamed('min(InvoiceDate)', 'Минимальная дата')
    .withColumnRenamed('max(InvoiceDate)', 'Максимальная дата')
)
df.show(truncate=False)

## RFM-анализ клиентов платформы

In [None]:
# The data spans 2010-12-01 .. 2011-12-09, so the analysis date ("Now") is
# assumed to be the latest date in the dataset plus one day.

In [None]:
# Collapse line items into one row per (InvoiceNo, InvoiceDate, CustomerID)
# with the invoice total (Price), skipping rows without a CustomerID.
# "Now" is the latest InvoiceDate among the grouped rows plus one day and is
# repeated on every row.
# NOTE(review): the window has no PARTITION BY, so Spark will presumably
# evaluate it in a single partition — confirm this is acceptable for larger inputs.
sql = """
SELECT
  InvoiceNo
  , InvoiceDate
  , CustomerID
  , SUM(Quantity*UnitPrice) AS Price
  , MAX(InvoiceDate) OVER() + INTERVAL '1' DAY  AS Now
FROM 
  temp
WHERE 
  CustomerID IS NOT NULL
GROUP BY 
  InvoiceNo
  , InvoiceDate
  , CustomerID
"""

df = spark.sql(sql)

# Expose the per-invoice rows to later queries as the 'orders' view.
df.createOrReplaceTempView('orders')

df.show()

In [None]:
# Compute Recency, Frequency and Monetary over the 365 days preceding "Now" (the last year of data, not calendar year 2011)

In [None]:
# Per-customer RFM inputs over invoices dated within 365 days before "Now":
#   Recency   - days between "Now" and the customer's most recent invoice
#   Frequency - number of distinct invoices
#   Monetary  - sum of invoice totals
sql = """
SELECT 
  CustomerID
  , MIN(datediff(Now, InvoiceDate)) AS Recency
  , COUNT(DISTINCT InvoiceNo) AS Frequency
  , SUM(Price) AS Monetary
FROM
  orders
WHERE 
    InvoiceDate >= Now - INTERVAL '365' DAY
GROUP BY 
  CustomerID
"""

df = spark.sql(sql)

# Expose the per-customer metrics to later queries as the 'base' view.
df.createOrReplaceTempView('base')

df.show()

In [None]:
#расчет границ групп Recency, Frequency и Monetary за последний год (по перцентилям - 33%, 67%, 100%) 

In [None]:
# Single-row table with the 33rd and 67th percentiles of Recency, Frequency
# and Monetary across all customers in 'base'; these are the group boundaries.
sql = """
SELECT
     percentile(Recency, 0.33) AS R_33
    , percentile(Recency, 0.67) AS R_67

    , percentile(Frequency, 0.33) AS F_33
    , percentile(Frequency, 0.67) AS F_67

    , percentile(Monetary, 0.33) AS M_33
    , percentile(Monetary, 0.67) AS M_67

FROM 
    base
"""

df = spark.sql(sql)

# Expose the boundaries to later queries as the 'bins' view.
df.createOrReplaceTempView('bins')

df.show()

In [None]:
# Assign every customer an R, F and M label by comparing the metrics in 'base'
# with the percentile boundaries in 'bins' (joined via CROSS JOIN), then
# concatenate the three labels into RFM_Score.
#   R: 'C' up to R_33 days since last purchase, 'B' up to R_67, otherwise 'A'
#   F: 'A' up to F_33 invoices, 'B' up to F_67, otherwise 'C'
#   M: 'A' up to M_33 spend, 'B' up to M_67, otherwise 'C'
# So 'AAA' marks customers with the oldest last purchase, the fewest invoices
# and the lowest spend.
# NOTE(review): confirm this label direction matches the task definition.
# All labels are plain ASCII letters so that a score filter typed in Latin
# characters (e.g. 'CCC') matches.
sql = """
WITH rfm AS (
    SELECT
        CustomerID
        , Recency
        , Frequency
        , Monetary
        , CASE
              WHEN Recency <= R_33 THEN 'C'
              WHEN Recency <= R_67 THEN 'B'
              ELSE 'A'
          END AS R
        , CASE
              WHEN Frequency <= F_33 THEN 'A'
              WHEN Frequency <= F_67 THEN 'B'
              ELSE 'C'
          END AS F
        , CASE
              WHEN Monetary <= M_33 THEN 'A'
              WHEN Monetary <= M_67 THEN 'B'
              ELSE 'C'
          END AS M
    FROM
        base
    CROSS JOIN
        bins
)
SELECT
    *
    , Concat(r, f, m) AS RFM_Score
FROM
    rfm
ORDER BY
    CustomerID
"""

df = spark.sql(sql)

df.show()

In [None]:
# CustomerIDs whose RFM_Score is 'AAA'. The filter runs before the projection
# so the predicate refers to a column that is still present in the frame.
# NOTE(review): this rebinds df_r, which earlier held the pandas DataFrame
# loaded from the workbook.
df_r = df.filter(df.RFM_Score == 'AAA').select('CustomerID')
df_r.show()

In [None]:
# Writing the result directly from Spark raised Py4JJavaError (most likely
# caused by running Jupyter Notebook on Windows; no clear fix was found),
# so the resulting Spark DataFrame is converted to pandas before it is written.

In [None]:
# Collect the selected customers into a pandas DataFrame and keep only the part
# of each CustomerID before the first '.'.
p_d = df_r.toPandas()
p_d['CustomerID'] = p_d['CustomerID'].apply(lambda cid: cid.partition('.')[0])
p_d

In [None]:
# Write the result to a comma-separated file without the pandas index column.
p_d.to_csv('result.csv', sep=',', index=False)