# Импорт библиотек

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import count_distinct
from pyspark.sql.functions import count
from pyspark.sql.functions import round
from pyspark.sql.functions import sum

# 1. Преобразование файла из .xlsx в .csv формат

In [2]:
# Read the source Excel workbook into a pandas DataFrame.
df = pd.read_excel('online_retail.xlsx')

In [3]:
# Column dtypes and non-null counts (Description and CustomerID contain missing values).
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   InvoiceNo    541909 non-null  object        
 1   StockCode    541909 non-null  object        
 2   Description  540455 non-null  object        
 3   Quantity     541909 non-null  int64         
 4   InvoiceDate  541909 non-null  datetime64[ns]
 5   UnitPrice    541909 non-null  float64       
 6   CustomerID   406829 non-null  float64       
 7   Country      541909 non-null  object        
dtypes: datetime64[ns](1), float64(2), int64(1), object(4)
memory usage: 33.1+ MB


In [4]:
# Preview the first rows of the raw data.
df.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom


In [5]:
# Preview the last rows of the raw data.
df.tail()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
541904,581587,22613,PACK OF 20 SPACEBOY NAPKINS,12,2011-12-09 12:50:00,0.85,12680.0,France
541905,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,2011-12-09 12:50:00,2.1,12680.0,France
541906,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,2011-12-09 12:50:00,4.15,12680.0,France
541907,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,2011-12-09 12:50:00,4.15,12680.0,France
541908,581587,22138,BAKING SET 9 PIECE RETROSPOT,3,2011-12-09 12:50:00,4.95,12680.0,France


In [6]:
# Export to CSV for Spark: ';' as the delimiter, pandas index column omitted.
df.to_csv('online_retail.csv', sep=';', index=False)

# 2. Инициализация Spark-сессии

In [7]:
# Start (or reuse) a Spark session running locally on all available cores.
# NOTE(review): the master is "local[*]", so the executor, dynamic-allocation and
# shuffle-service options below presumably have no effect - confirm and prune.
session_options = {
    "spark.executor.memory": "10g",
    "spark.executor.cores": 5,
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": 5,
    "spark.shuffle.service.enabled": "true",
}

spark_builder = SparkSession.builder.master("local[*]").appName("SparkFirst")
for option_name, option_value in session_options.items():
    spark_builder = spark_builder.config(option_name, option_value)

spark = spark_builder.getOrCreate()

# 3. Создание dataframe из скачанного файла

In [8]:
# Explicit schema for the exported CSV; every column is nullable.
# CustomerID is written by pandas as text such as "17850.0" (it was float64 because of
# missing values), which is why it is declared as a floating-point column here.
column_types = [
    ("InvoiceNo", StringType()),
    ("StockCode", StringType()),
    ("Description", StringType()),
    ("Quantity", IntegerType()),
    ("InvoiceDate", TimestampNTZType()),
    ("UnitPrice", DoubleType()),
    ("CustomerID", FloatType()),
    ("Country", StringType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in column_types]
)

# The file was written with ';' as the delimiter and a header row.
df_spark = spark.read.csv('online_retail.csv', schema=schema, header=True, sep=";")

In [9]:
# Preview the first 10 rows of the Spark DataFrame.
df_spark.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [10]:
# Confirm that the explicit schema was applied to every column.
df_spark.dtypes

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'int'),
 ('InvoiceDate', 'timestamp_ntz'),
 ('UnitPrice', 'double'),
 ('CustomerID', 'float'),
 ('Country', 'string')]

# 4. Расчет показателей:

## a. Количество строк в файле

In [11]:
# Total number of rows loaded from the CSV file.
df_spark.count()

541909

## b. Количество уникальных клиентов

In [12]:
# Number of distinct customer IDs in the dataset.
df_spark.agg(count_distinct("CustomerID")).show()

+--------------------------+
|count(DISTINCT CustomerID)|
+--------------------------+
|                      4372|
+--------------------------+



## c. В какой стране совершается больше всего покупок

In [13]:
# Country with the largest number of rows in the dataset.
# NOTE(review): this counts invoice lines, not distinct invoices - confirm that is the intended metric.
rows_per_country = df_spark.groupBy("Country").count()
rows_per_country.orderBy(F.col("count").desc()).limit(1).show()

+--------------+------+
|       Country| count|
+--------------+------+
|United Kingdom|495478|
+--------------+------+



## d. Дата самой ранней и самой последней покупки на платформе

In [14]:
# Date of the earliest purchase
df_spark.select(F.min('InvoiceDate')).show()

+-------------------+
|   min(InvoiceDate)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



In [15]:
# Date of the most recent purchase
df_spark.select(F.max('InvoiceDate')).show()

+-------------------+
|   max(InvoiceDate)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



# 5. RFM-анализ клиентов платформы

In [16]:
# Sanity check of the dataset: preview the first rows and print the schema
df_spark.show(5)
df_spark.printSchema()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows

roo

In [17]:
# Check for missing values
def my_count(df_in):
    """Show the number of non-null values in every column of ``df_in``.

    Builds one ``count`` aggregate per column (aliased to the column name) and
    prints the resulting single-row table; nothing is returned.
    """
    non_null_counts = [count(column_name).alias(column_name) for column_name in df_in.columns]
    df_in.agg(*non_null_counts).show()

In [18]:
# Non-null counts per column; a count lower than the row total means missing values.
my_count(df_spark)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     540455|  541909|     541909|   541909|    406829| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [19]:
# Drop every row that has a missing value in any column, then re-check the counts
df_spark_clean = df_spark.na.drop(how='any')
my_count(df_spark_clean)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   406829|   406829|     406829|  406829|     406829|   406829|    406829| 406829|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [20]:
# Add a "recency" column: whole days between the run date and the invoice date.
# NOTE(review): recency is measured from the current date, so its absolute values
# change from run to run; a fixed reference date would make it reproducible.
days_since_invoice = F.datediff(F.current_date(), F.col('InvoiceDate'))
recency = df_spark_clean.withColumn('recency', days_since_invoice)

In [21]:
# Preview the rows with the new "recency" column.
recency.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|recency|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|   4651|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|   4651|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|   4651|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|   4651|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|   4651|
+---------+---------+--------------------+--------+-------------------+-

In [22]:
# "frequency" metric as a new dataset: number of distinct invoices per customer.
# df_spark_clean contains no nulls, so counting distinct InvoiceNo values per customer
# equals counting the (CustomerID, InvoiceNo) groups.
frequency = df_spark_clean.groupBy('CustomerID').agg(
    count_distinct('InvoiceNo').alias('frequency')
)
frequency.show(5)

+----------+---------+
|CustomerID|frequency|
+----------+---------+
|   17323.0|        9|
|   13999.0|       15|
|   15512.0|        2|
|   16156.0|       13|
|   13883.0|        9|
+----------+---------+
only showing top 5 rows



In [23]:
# Add a "TotalPrice" column: line total = Quantity * UnitPrice, rounded to 2 decimals.
# The columns are resolved by name on df_spark_clean itself rather than through the
# parent df_spark, and F.round is called explicitly so the expression does not depend
# on the name `round` being rebound from the Python built-in to the Spark function.
total_price_clean = df_spark_clean.withColumn(
    'TotalPrice',
    F.round(F.col('Quantity') * F.col('UnitPrice'), 2),
)

In [24]:
# "monetary" metric as a new dataset: total spend per customer, rounded to 2 decimals.
total_spend = F.round(F.sum('TotalPrice'), 2).alias('monetary')
monetary = total_price_clean.groupBy("CustomerID").agg(total_spend)
monetary.show(5)

+----------+--------+
|CustomerID|monetary|
+----------+--------+
|   13999.0| 3865.26|
|   15512.0|   121.0|
|   13305.0| 2000.86|
|   15640.0|12433.34|
|   13094.0| 1708.86|
+----------+--------+
only showing top 5 rows



In [25]:
# Attach the per-customer "frequency" and "monetary" metrics to every invoice line.
customer_metrics = frequency.join(monetary, on='CustomerID', how='inner')
total_price_new = recency.join(customer_metrics, on='CustomerID', how='inner')
total_price_new.show(5)

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-------+---------+--------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|recency|frequency|monetary|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-------+---------+--------+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|United Kingdom|   4651|       35| 5288.63|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|United Kingdom|   4651|       35| 5288.63|
|   17850.0|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|United Kingdom|   4651|       35| 5288.63|
|   17850.0|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|United Kingdom|   4651|       35| 5288.63|
|   17850.0|   536365|   84029E|RED WOOLLY HOTTIE...|  

In [26]:
# Every customer's purchases are more than a year old, so splitting "recency" into
# "last purchase last year" / "last purchase last week" / "last purchase last month"
# makes no sense here, and there are no other criteria. Derive the groups from descriptive statistics instead.
# Descriptive statistics for the new columns
total_price_new.select('recency','frequency','monetary').summary().show()

+-------+------------------+-----------------+------------------+
|summary|           recency|        frequency|          monetary|
+-------+------------------+-----------------+------------------+
|  count|            406829|           406829|            406829|
|   mean| 4429.863667535992|23.22480698278638|11067.338028559558|
| stddev|112.85058929698144|45.49945551637789|30144.694797603723|
|    min|              4278|                1|          -4287.63|
|    25%|              4328|                4|           1084.34|
|    50%|              4409|                8|           2616.32|
|    75%|              4525|               18|            6147.4|
|    max|              4651|              248|         279489.02|
+-------+------------------+-----------------+------------------+



In [28]:
# Split customers into groups A, B, C, where A has the highest priority and C the lowest.
# The cut-offs are the quartiles of the current data instead of hand-typed numbers:
# "recency" is counted from the run date, so fixed day counts go stale as soon as the
# notebook is re-run on another day (every row would fall into group C).
rfm_columns = ['recency', 'frequency', 'monetary']
# relativeError=0.0 requests exact quantiles; one list per column, ordered like the probabilities.
recency_q, frequency_q, monetary_q = total_price_new.approxQuantile(rfm_columns, [0.25, 0.5, 0.75], 0.0)
recency_q25, recency_q50 = recency_q[0], recency_q[1]
frequency_q50, frequency_q75 = frequency_q[1], frequency_q[2]
monetary_q50, monetary_q75 = monetary_q[1], monetary_q[2]

total_price_new = (
    total_price_new
    # recency: fewer days since the purchase is better, so large values go to C
    .withColumn('recency_group',
                F.when(F.col('recency') > recency_q50, 'C')
                 .when(F.col('recency') > recency_q25, 'B')
                 .otherwise('A'))
    # frequency: more invoices is better, so small values go to C
    .withColumn('frequency_group',
                F.when(F.col('frequency') < frequency_q50, 'C')
                 .when(F.col('frequency') < frequency_q75, 'B')
                 .otherwise('A'))
    # monetary: higher total spend is better, so small values go to C
    .withColumn('monetary_group',
                F.when(F.col('monetary') < monetary_q50, 'C')
                 .when(F.col('monetary') < monetary_q75, 'B')
                 .otherwise('A'))
)
total_price_new.show(5)

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-------+---------+--------+-------------+---------------+--------------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|recency|frequency|monetary|recency_group|frequency_group|monetary_group|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-------+---------+--------+-------------+---------------+--------------+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|United Kingdom|   4651|       35| 5288.63|            C|              A|             B|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|United Kingdom|   4651|       35| 5288.63|            C|              A|             B|
|   17850.0|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|United K

In [29]:
# Combine the three group letters into one label (e.g. "ABC") and inspect the result.
group_columns = ['recency_group', 'frequency_group', 'monetary_group']
total_price_new = total_price_new.withColumn('groups', F.concat(*group_columns))

preview_columns = ['CustomerID', 'recency', 'frequency', 'monetary', *group_columns, 'groups']
total_price_new.select(preview_columns).show(5)

+----------+-------+---------+--------+-------------+---------------+--------------+------+
|CustomerID|recency|frequency|monetary|recency_group|frequency_group|monetary_group|groups|
+----------+-------+---------+--------+-------------+---------------+--------------+------+
|   17323.0|   4293|        9|  908.99|            A|              B|             C|   ABC|
|   17323.0|   4293|        9|  908.99|            A|              B|             C|   ABC|
|   17323.0|   4293|        9|  908.99|            A|              B|             C|   ABC|
|   17323.0|   4297|        9|  908.99|            A|              B|             C|   ABC|
|   17323.0|   4297|        9|  908.99|            A|              B|             C|   ABC|
+----------+-------+---------+--------+-------------+---------------+--------------+------+
only showing top 5 rows



In [30]:
# IDs of the customers whose "recency", "frequency" and "monetary" scores are all in group A.
is_top_customer = F.col('groups') == 'AAA'
result = total_price_new.where(is_top_customer).select('CustomerID').distinct()
result.show(5)

+----------+
|CustomerID|
+----------+
|   17315.0|
|   16746.0|
|   12748.0|
|   15039.0|
|   16729.0|
+----------+
only showing top 5 rows



In [31]:
# Save the result to a separate CSV file (collected to pandas first, index column omitted).
result_pdf = result.toPandas()
result_pdf.to_csv('4_7_result.csv', index=False)