In [87]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

spark = SparkSession.builder \
      .master("local") \
      .appName("Spark") \
      .config("spark.executor.memory", "10g")\
      .config("spark.executor.cores", 5) \
      .config("spark.dynamicAllocation.enabled", "true") \
      .config("spark.dynamicAllocation.maxExecutors", 5) \
      .config("spark.shuffle.service.enabled", "true") \
      .getOrCreate() 

In [89]:
import pandas as pd
df_excel = pd.read_excel('spark_example_files/online_retail.xlsx', header=0) #, sheetname='<your sheet>'
df_cvs = df_excel.to_csv('online_retail.csv', index= False, sep=';')

In [90]:
df_raw = spark.read.csv('online_retail.csv', sep =";", header = True, inferSchema='true')
df = df_raw.dropna(how='any')

                                                                                

In [91]:
df_raw.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [93]:
#Количество строк в файле
print(df.count())

#Количество уникальных клиентов
df.select(f.count_distinct('CustomerId')).show()

#В какой стране совершается большинство покупок
max_sale_country = df.groupBy('Country').count().orderBy(f.count_distinct('InvoiceNo'), ascending=False).limit(1)
max_sale_country.show()

#Даты самой ранней и самой последней покупки на платформе
df.select([f.min("InvoiceDate")]).alias('MinInvoiceDate').show()
df.select([f.max("InvoiceDate")]).alias('MaxInvoiceDate').show()

#TotalPrice
df = df.withColumn('TotalPrice', f.round(df.Quantity * df.UnitPrice, 2))

#Duration
df = df.withColumn('Duration', f.datediff(f.current_date(), 'InvoiceDate'))

#Recency
recency = df.groupBy('CustomerID').agg(f.min('Duration').alias('Recency'))

#Frequency
frequency = df.groupBy('CustomerID', 'InvoiceNo').count().groupBy('CustomerID').agg(f.count("*").alias("Frequency"))

#Monetary
monetary = df.groupBy('CustomerID').agg(f.round(f.sum('TotalPrice'), 2).alias('Monetary'))

#RFM
rfm = recency.join(frequency,'CustomerID', how = 'inner')\
             .join(monetary,'CustomerID', how = 'inner')

rfm.show()



                                                                                

406829


                                                                                

+--------------------------+
|count(DISTINCT CustomerId)|
+--------------------------+
|                      4372|
+--------------------------+



                                                                                

+--------------+------+
|       Country| count|
+--------------+------+
|United Kingdom|361878|
+--------------+------+



                                                                                

+-------------------+
|   min(InvoiceDate)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



                                                                                

+-------------------+
|   max(InvoiceDate)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



[Stage 549:>  (0 + 1) / 1][Stage 551:>  (0 + 0) / 1][Stage 553:>  (0 + 0) / 1]1]

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|   13094.0|   4299|       16| 1708.86|
|   17884.0|   4281|        4|  717.45|
|   16561.0|   4283|        2|  511.12|
|   13973.0|   4565|        1|   264.7|
|   14285.0|   4299|        4| 1910.01|
|   17633.0|   4309|        6| 1242.34|
|   13956.0|   4283|        5| 1026.42|
|   13533.0|   4460|        3|  270.79|
|   13918.0|   4327|        2| 1212.84|
|   12493.0|   4443|        3|  416.79|
|   17267.0|   4405|        2|  317.62|
|   14768.0|   4295|        3|   139.5|
|   15776.0|   4411|        1|  241.62|
|   16629.0|   4351|        2|  417.73|
|   14473.0|   4352|        2|  234.34|
|   18114.0|   4568|        1|   220.1|
|   13607.0|   4318|        1|  678.01|
|   16596.0|   4293|        2|  250.15|
|   14024.0|   4399|        2|   327.7|
|   14452.0|   4288|        2|  264.44|
+----------+-------+---------+--------+
only showing top 20 rows



                                                                                

In [96]:
import numpy as np

def describe_pd(df_in, columns, deciles=False):
    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [25, 50, 75]

    percs = np.transpose([np.percentile(df_in.select(x).collect(), percentiles) for x in columns])
    percs = pd.DataFrame(percs, columns=columns)
    percs['summary'] = [str(p) + '%' for p in percentiles]

    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]

cols = ['Recency','Frequency','Monetary']
describe_pd(rfm , cols, 1)

                                                                                

Unnamed: 0,summary,Recency,Frequency,Monetary
0,count,4372.0,4372.0,4372.0
1,mean,4369.581198536139,5.07548032936871,1898.459700365968
2,stddev,100.77213931384824,9.338754163574729,8219.345141139713
3,min,4278.0,1.0,-4287.63
4,max,4651.0,248.0,279489.02
5,0%,4278.0,1.0,-4287.63
6,10%,4282.0,1.0,146.022
7,20%,4289.0,1.0,234.392
8,30%,4299.0,1.0,337.37
9,40%,4309.0,2.0,465.412


In [97]:
def Recency(x):
    if  x <= 4289:
        return 'A'
    elif x<= 4328:
        return 'B'
    else:
        return 'C'

def Frequency(x):
    if  x <= 1:
        return 'C'
    elif x <= 3:
        return 'B'
    else:
        return 'A'

def Monetary(x):
    if  x <= 234:
        return 'C'
    elif x <= 648:
        return 'B'
    else:
        return 'A'

Recency_udf = f.udf(lambda x: Recency(x), StringType())
Frequency_udf = f.udf(lambda x: Frequency(x), StringType())
Monetary_udf = f.udf(lambda x: Monetary(x), StringType())

rfm_segmet = rfm.withColumn('r_segment', Recency_udf('Recency'))
rfm_segmet = rfm_segmet.withColumn('f_segment', Frequency_udf('Frequency'))
rfm_segmet = rfm_segmet.withColumn('m_segment', Monetary_udf('Monetary'))

rfm_segmet.show(5)


[Stage 643:>  (0 + 1) / 1][Stage 645:>  (0 + 0) / 1][Stage 647:>  (0 + 0) / 1]1]

+----------+-------+---------+--------+---------+---------+---------+
|CustomerID|Recency|Frequency|Monetary|r_segment|f_segment|m_segment|
+----------+-------+---------+--------+---------+---------+---------+
|   13094.0|   4299|       16| 1708.86|        B|        A|        A|
|   17884.0|   4281|        4|  717.45|        A|        A|        A|
|   16561.0|   4283|        2|  511.12|        A|        B|        B|
|   13973.0|   4565|        1|   264.7|        C|        C|        B|
|   14285.0|   4299|        4| 1910.01|        B|        A|        A|
+----------+-------+---------+--------+---------+---------+---------+
only showing top 5 rows



                                                                                

In [98]:
rfm_segmet = rfm_segmet.withColumn('RFMScore',f.concat(f.col('r_segment'),f.col('f_segment'), f.col('m_segment')))
rfm_segmet.sort(f.col('RFMScore')).show(5)

[Stage 655:>  (0 + 1) / 1][Stage 657:>  (0 + 0) / 1][Stage 659:>  (0 + 0) / 1]1]

+----------+-------+---------+--------+---------+---------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|r_segment|f_segment|m_segment|RFMScore|
+----------+-------+---------+--------+---------+---------+---------+--------+
|   15145.0|   4282|        4| 1194.73|        A|        A|        A|     AAA|
|   15311.0|   4278|      118|59419.34|        A|        A|        A|     AAA|
|   13956.0|   4283|        5| 1026.42|        A|        A|        A|     AAA|
|   16353.0|   4281|       23| 6675.71|        A|        A|        A|     AAA|
|   17659.0|   4281|       14| 2954.75|        A|        A|        A|     AAA|
+----------+-------+---------+--------+---------+---------+---------+--------+
only showing top 5 rows



                                                                                

In [100]:
top_cust = rfm_segmet.select(f.col('CustomerID')).filter(f.col('RFMScore') == 'AAA')
top_cust.write.csv('result')

                                                                                