In [1]:
import numpy as np
import pandas as pd

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col, asc, desc, count,round, mean, min, max, sum, datediff, to_date
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col

In [3]:
import findspark
findspark.init()

In [4]:
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkFirst") \
.config("spark.memory.offHeap.enabled","true") \
.config("spark.memory.offHeap.size","15g")\
.getOrCreate() 

In [5]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [6]:
df = spark.read.csv("K:/DE_Projects/DE_T4.7/data/data.csv", inferSchema=True, header=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
df.show()

+---+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|_c0|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|  0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|  1|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|  2|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|  3|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|  4|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|  5|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United K

In [8]:
#Количество строк в файле
df.count()

541909

In [9]:
#Количество уникальных клиентов
pandas_df = df.toPandas()
pandas_df.nunique ()

_c0            541909
InvoiceNo       25900
StockCode        4070
Description      4223
Quantity          722
InvoiceDate     23260
UnitPrice        1630
CustomerID       4372
Country            38
dtype: int64

In [10]:
#В какой стране совершается большинство покупок
df1 = df.groupBy('Country').count().orderBy('count',ascending=False).show()

+---------------+------+
|        Country| count|
+---------------+------+
| United Kingdom|495478|
|        Germany|  9495|
|         France|  8557|
|           EIRE|  8196|
|          Spain|  2533|
|    Netherlands|  2371|
|        Belgium|  2069|
|    Switzerland|  2002|
|       Portugal|  1519|
|      Australia|  1259|
|         Norway|  1086|
|          Italy|   803|
|Channel Islands|   758|
|        Finland|   695|
|         Cyprus|   622|
|         Sweden|   462|
|    Unspecified|   446|
|        Austria|   401|
|        Denmark|   389|
|          Japan|   358|
+---------------+------+
only showing top 20 rows



In [11]:
#Даты самой ранней и самой последней покупки на платформе
df2=df.agg({'InvoiceDate':'max'}).show()

+-------------------+
|   max(InvoiceDate)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



In [12]:
df3=df.agg({'InvoiceDate':'min'}).show()

+-------------------+
|   min(InvoiceDate)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



In [13]:
#RFM-анализ клиентов платформы


In [14]:
def my_count(df_in):
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()

In [15]:
my_count(df)

+------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   _c0|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|541909|   541909|   541909|     540455|  541909|     541909|   541909|    406829| 541909|
+------+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [16]:
df_n = df.dropna(how='any')
my_count(df_n)

+------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   _c0|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+------+---------+---------+-----------+--------+-----------+---------+----------+-------+
|406829|   406829|   406829|     406829|  406829|     406829|   406829|    406829| 406829|
+------+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [17]:
df_n.select(['InvoiceDate']).dtypes

[('InvoiceDate', 'string')]

In [18]:
df_n = df_n.withColumn('TotalPrice', round( df.Quantity * df.UnitPrice, 2 ) )

In [19]:
date_max = df_n.select(max('InvoiceDate')).toPandas()
current = to_utc_timestamp( unix_timestamp(lit(str(date_max.iloc[0][0])), \
                              'yy-MM-dd HH:mm').cast('timestamp'), 'UTC' )

df_n = df_n.withColumn('Duration', datediff(lit(current), 'InvoiceDate'))

In [20]:
recency = df_n.groupBy('CustomerID').agg(min('Duration').alias('Recency'))
frequency = df_n.groupBy('CustomerID', 'InvoiceNo').count()\
                        .groupBy('CustomerID')\
                        .agg(count("*").alias("Frequency"))
monetary = df_n.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))
rfm = recency.join(frequency,'CustomerID', how = 'inner')\
             .join(monetary,'CustomerID', how = 'inner')

In [21]:
rfm.show(5)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|   13094.0|     21|       16| 1708.86|
|   17884.0|      3|        4|  717.45|
|   16561.0|      5|        2|  511.12|
|   13973.0|    287|        1|   264.7|
|   14285.0|     21|        4| 1910.01|
+----------+-------+---------+--------+
only showing top 5 rows



In [22]:
cols = ['Recency','Frequency','Monetary']
rfm.select(cols).describe().show()

+-------+------------------+-----------------+-----------------+
|summary|           Recency|        Frequency|         Monetary|
+-------+------------------+-----------------+-----------------+
|  count|              4372|             4372|             4372|
|   mean| 91.58119853613907| 5.07548032936871|1898.459700365968|
| stddev|100.77213931384843|9.338754163574727|8219.345141139713|
|    min|                 0|                1|         -4287.63|
|    max|               373|              248|        279489.02|
+-------+------------------+-----------------+-----------------+



In [23]:
def describe_pd(df_in, columns, deciles=False):
    
    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [25, 50, 75]

    percs = np.transpose([np.percentile(df_in.select(x).collect(), percentiles) for x in columns])
    percs = pd.DataFrame(percs, columns=columns)
    percs['summary'] = [str(p) + '%' for p in percentiles]

    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]

In [24]:
cols = ['Recency','Frequency','Monetary']
describe_pd(rfm,cols,1)

Unnamed: 0,summary,Recency,Frequency,Monetary
0,count,4372.0,4372.0,4372.0
1,mean,91.58119853613908,5.07548032936871,1898.459700365968
2,stddev,100.77213931384844,9.338754163574729,8219.345141139713
3,min,0.0,1.0,-4287.63
4,max,373.0,248.0,279489.02
5,0%,0.0,1.0,-4287.63
6,10%,4.0,1.0,146.022
7,20%,11.0,1.0,234.392
8,30%,21.0,1.0,337.37
9,40%,31.0,2.0,465.412


In [25]:
rfm.summary().show()

+-------+------------------+------------------+-----------------+-----------------+
|summary|        CustomerID|           Recency|        Frequency|         Monetary|
+-------+------------------+------------------+-----------------+-----------------+
|  count|              4372|              4372|             4372|             4372|
|   mean|15299.677721866423| 91.58119853613907| 5.07548032936871|1898.459700365968|
| stddev|1722.3907054276901|100.77213931384843|9.338754163574727|8219.345141139713|
|    min|           12346.0|                 0|                1|         -4287.63|
|    25%|           13812.0|                16|                1|            293.1|
|    50%|           15300.0|                50|                3|           647.74|
|    75%|           16778.0|               143|                5|          1611.59|
|    max|           18287.0|               373|              248|        279489.02|
+-------+------------------+------------------+-----------------+-----------

In [26]:
def RScore(x):
    if  x <= 21:
        return 'A'
    elif x<= 71:
        return 'B'
    else:
        return 'C'

def FScore(x):
    if  x <= 1:
        return 'C'
    elif x <= 4:
        return 'B'
    else:
        return 'A'

def MScore(x):
    if  x <= 337:
        return'C'
    elif x <= 909:
        return 'B'
    else:
        return 'A'

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())

In [27]:
rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))

In [28]:
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))

In [29]:
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))

In [30]:
rfm_seg.show(5)

+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
|   13094.0|     21|       16| 1708.86|    A|    A|    A|
|   17884.0|      3|        4|  717.45|    A|    B|    B|
|   16561.0|      5|        2|  511.12|    A|    B|    B|
|   13973.0|    287|        1|   264.7|    C|    C|    C|
|   14285.0|     21|        4| 1910.01|    A|    B|    A|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows



In [31]:
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'),F.col('f_seg'), F.col('m_seg')))

In [32]:
rfm_seg.sort(F.col('RFMScore')).show(5)

+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
|   13956.0|      5|        5| 1026.42|    A|    A|    A|     AAA|
|   13094.0|     21|       16| 1708.86|    A|    A|    A|     AAA|
|   17659.0|      3|       14| 2954.75|    A|    A|    A|     AAA|
|   15311.0|      0|      118|59419.34|    A|    A|    A|     AAA|
|   16353.0|      3|       23| 6675.71|    A|    A|    A|     AAA|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows



In [33]:
rfm_seg_filtered = rfm_seg.filter("RFMScore = 'AAA'")

In [34]:
rfm_seg_filtered.count()

724

In [35]:
rfm_final=rfm_seg_filtered.toPandas()

In [36]:
rfm_final.to_csv ("K:/DE_Projects/DE_T4.7/data/rfm_final.csv",  
                  index = True, 
                  header=True,
                  encoding='utf-8-sig')