In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
file_exel_path = 'spark_example_files\\online_retail.xlsx'
file_csv_path = 'spark_example_files\\online_retail.csv'

In [3]:
df = pd.read_excel(file_exel_path)

In [4]:
df.to_csv(file_csv_path, index=False)

In [5]:
spark = SparkSession.builder.appName('Spark Example').getOrCreate()

In [6]:
dfs = spark.read.csv(file_csv_path, header=True, inferSchema=True)
dfs.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
dfs.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [8]:
print(f'Количество строк: {dfs.count()}')

Количество строк: 541909


In [9]:
unique_count = dfs.select('CustomerID').distinct().count()
print("Количество уникальных клиентов: ", unique_count)

Количество уникальных клиентов:  4373


In [10]:
from pyspark.sql import functions as F

# Группировка по столбцу и подсчет количества записей
grouped_dfs = dfs.groupBy('Country').count()

# Сортировка по убыванию количества записей
sorted_dfs = grouped_dfs.orderBy(F.desc('count'))

# Получение наиболее повторяющейся записи
most_frequent_record = sorted_dfs.first()

print("Наибольшее количество покупок в:", most_frequent_record)

Наибольшее количество покупок в: Row(Country='United Kingdom', count=495478)


In [11]:
from pyspark.sql.functions import min, max

result_dfs_date = dfs.agg(min('InvoiceDate').alias('first_date'), max('InvoiceDate').alias('last_date'))
result_dfs_date.show(truncate=False)


+-------------------+-------------------+
|first_date         |last_date          |
+-------------------+-------------------+
|2010-12-01 08:26:00|2011-12-09 12:50:00|
+-------------------+-------------------+



In [12]:
from pyspark.sql.functions import col, lit, when, count, avg

# Предположим, что текущая дата - это последняя дата в дата фрейме
current_date = lit('2011-12-09').cast('date')

dfs = dfs.withColumn('InvoiceDate', col('InvoiceDate'))

# Рассчитываем Recency, Frequency и Monetary
recency_df = dfs.groupBy('CustomerID').agg(
    (current_date - max('InvoiceDate').cast('date')).alias('Recency')
)

# recency_df = dfs.groupBy('CustomerID').agg(
#     (max('InvoiceDate').cast('timestamp') - current_date).alias('Recency')
# )

frequency_df = dfs.groupBy('CustomerID').agg(
    count('InvoiceNo').alias('Frequency')
)

monetary_df = dfs.groupBy('CustomerID').agg(
    avg(col('UnitPrice') * col('Quantity')).alias('Monetary')
)

# Объединяем все параметры в один DataFrame
rfm_df = recency_df.join(frequency_df, 'CustomerID').join(monetary_df, 'CustomerID')


In [13]:
rfm_df.show()


+----------+------------------+---------+------------------+
|CustomerID|           Recency|Frequency|          Monetary|
+----------+------------------+---------+------------------+
|   16916.0| INTERVAL '23' DAY|      143|4.0297902097902085|
|   17884.0|  INTERVAL '3' DAY|      117| 6.132051282051282|
|   13094.0| INTERVAL '21' DAY|       30| 56.96200000000001|
|   16596.0| INTERVAL '15' DAY|       12|20.845833333333335|
|   17633.0| INTERVAL '31' DAY|       72| 17.25472222222222|
|   16858.0|INTERVAL '366' DAY|       13|26.606923076923085|
|   13649.0|INTERVAL '256' DAY|       23|15.069565217391306|
|   16656.0| INTERVAL '22' DAY|       80|           107.103|
|   15160.0|INTERVAL '357' DAY|        4|39.540000000000006|
|   18277.0| INTERVAL '58' DAY|        9|10.847777777777777|
|   15311.0|  INTERVAL '0' DAY|     2491|23.853608992372532|
|   17659.0|  INTERVAL '3' DAY|      161|18.352484472049685|
|   12967.0|  INTERVAL '3' DAY|       33| 36.20454545454545|
|   17062.0|INTERVAL '31

In [14]:
# Разбиваем на группы (A, B, C) для каждого параметра
# Давность (Recency)
rfm_df = rfm_df.withColumn('RecencyGroup',
                           when(col('Recency').cast('int') <= 30, lit('A'))
                           .when((col('Recency').cast('int') > 30) & (col('Recency').cast('int') <= 60), lit('B'))
                           .otherwise(lit('C')))

# Частота (Frequency)
rfm_df = rfm_df.withColumn('FrequencyGroup',
                           when(col('Frequency') >= 100, lit('A'))
                           .when((col('Frequency') < 100) & (col('Frequency') >= 50), lit('B'))
                           .otherwise(lit('C')))

# Денежная ценность (Monetary)
rfm_df = rfm_df.withColumn('MonetaryGroup',
                           when(col('Monetary') >= 100, lit('A'))
                           .when((col('Monetary') < 100) & (col('Monetary') >= 50), lit('B'))
                           .otherwise(lit('C')))



In [15]:
from pyspark.sql.functions import concat
# Итоговый столбец с суммой значений групп по каждому параметру
rfm_df = rfm_df.withColumn('RFMSum',
                           concat(col('RecencyGroup'), col('FrequencyGroup'), col('MonetaryGroup')))


In [16]:
result_df = rfm_df.filter(col('RFMSum') == 'AAA')

In [18]:
result_df.count()

19

In [17]:
# Сохраняем результат в CSV-файл
result_df.toPandas().to_csv('online_retail_rfm.csv', index=False)
