# RFM (PYSPARK)

In [1]:
import numpy as np
import pandas as pd
import os
import datetime
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *

Inicializamos Spark y leemos el archivo.

In [2]:
spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)



In [6]:
df = sqlContext.read.csv('2019-Oct-Nov-transformed.csv', header='True')
df.show(5)

+----------+----------+----------+--------------------+-------+------+---------+
|event_time|event_type|product_id|       category_code|  brand| price|  user_id|
+----------+----------+----------+--------------------+-------+------+---------+
|2019-10-01|  purchase|   1004856|electronics.smart...|samsung|130.76|543272936|
|2019-10-01|  purchase|   1002532|electronics.smart...|  apple|642.69|551377651|
|2019-10-01|  purchase|   5100816|         no category| xiaomi| 29.51|514591159|
|2019-10-01|  purchase|  13800054|furniture.bathroo...|santeri| 54.42|555332717|
|2019-10-01|  purchase|   4804055|electronics.audio...|  apple|189.91|524601178|
+----------+----------+----------+--------------------+-------+------+---------+
only showing top 5 rows



### Filtramos solo las columnas que necesitamos y parseamos a tipo fecha event_time.

In [7]:
from pyspark.sql.functions import *

df = df.select(to_date(df.event_time, 'yyyy-MM-dd').alias('event_time'), 
             df.event_type, df.product_id, df.category_code, df.brand, df.price, df.user_id)
df.show(5) 

+----------+----------+----------+--------------------+-------+------+---------+
|event_time|event_type|product_id|       category_code|  brand| price|  user_id|
+----------+----------+----------+--------------------+-------+------+---------+
|2019-10-01|  purchase|   1004856|electronics.smart...|samsung|130.76|543272936|
|2019-10-01|  purchase|   1002532|electronics.smart...|  apple|642.69|551377651|
|2019-10-01|  purchase|   5100816|         no category| xiaomi| 29.51|514591159|
|2019-10-01|  purchase|  13800054|furniture.bathroo...|santeri| 54.42|555332717|
|2019-10-01|  purchase|   4804055|electronics.audio...|  apple|189.91|524601178|
+----------+----------+----------+--------------------+-------+------+---------+
only showing top 5 rows



In [None]:
#df.filter("user_id == 549473770").collect()
#df.show(10)

### RECENCY

In [8]:
window= np.datetime64('2019-12-01','ns')
window = window.astype("datetime64[D]")

In [9]:
from pyspark.sql.functions import col, max as max_

df_recency = (df.withColumn("event_time", col("event_time").cast("timestamp"))
    .groupBy("user_id")
    .agg(max_("event_time")))

df_recency.show(5)

+---------+-------------------+
|  user_id|    max(event_time)|
+---------+-------------------+
|549473770|2019-10-01 00:00:00|
|545206959|2019-10-01 00:00:00|
|512977906|2019-10-01 00:00:00|
|525595314|2019-10-11 00:00:00|
|516885410|2019-10-01 00:00:00|
+---------+-------------------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import datediff, to_date, lit

df_recency = df_recency.withColumn("Recency", 
              datediff(to_date(lit("2019-12-01")),
                       to_date("max(event_time)","yyyy/MM/dd")))
df_recency.show(5)

+---------+-------------------+-------+
|  user_id|    max(event_time)|Recency|
+---------+-------------------+-------+
|549473770|2019-10-01 00:00:00|     61|
|545206959|2019-10-01 00:00:00|     61|
|512977906|2019-10-01 00:00:00|     61|
|525595314|2019-10-11 00:00:00|     51|
|516885410|2019-10-01 00:00:00|     61|
+---------+-------------------+-------+
only showing top 5 rows



In [11]:
df_recency = df_recency.drop('max(event_time)')
df_recency.show(5)

+---------+-------+
|  user_id|Recency|
+---------+-------+
|549473770|     61|
|545206959|     61|
|512977906|     61|
|525595314|     51|
|516885410|     61|
+---------+-------+
only showing top 5 rows



### FREQUENCY

In [12]:
df_frequency = df.dropDuplicates()

In [13]:
df_frequency = df_frequency.groupBy("user_id").count()

In [14]:
df_frequency.show(5)

+---------+-----+
|  user_id|count|
+---------+-----+
|553669125|    1|
|525490930|    3|
|548334045|    7|
|537328726|    1|
|541758012|    1|
+---------+-----+
only showing top 5 rows



In [15]:
df_frequency=df_frequency.withColumnRenamed("count","Frequency")

### MONETARY

In [16]:
df_monetary = df.groupBy("user_id").agg({"price":"sum"})
df_monetary=df_monetary.withColumnRenamed("sum(price)","Monetary")
df_monetary.show(5)

+---------+-----------------+
|  user_id|         Monetary|
+---------+-----------------+
|549473770|           165.24|
|545206959|           295.49|
|512977906|           286.86|
|525595314|948.6399999999999|
|516885410|           130.76|
+---------+-----------------+
only showing top 5 rows



### MERGE DE LOS TRES DF

In [17]:
df_rfm = df_recency.join(df_frequency, how='inner', on='user_id')
df_rfm.show(5)

+---------+-------+---------+
|  user_id|Recency|Frequency|
+---------+-------+---------+
|138340325|     20|        1|
|225644257|     19|        1|
|253299396|     25|        1|
|256164170|     11|        1|
|264649825|     56|        2|
+---------+-------+---------+
only showing top 5 rows



In [18]:
df_rfm = df_rfm.join(df_monetary, how='inner', on='user_id')
df_rfm.show(5)

+---------+-------+---------+--------+
|  user_id|Recency|Frequency|Monetary|
+---------+-------+---------+--------+
|138340325|     20|        1|    93.5|
|225644257|     19|        1|   40.91|
|253299396|     25|        1|  246.85|
|256164170|     11|        1|  113.23|
|264649825|     56|        2| 1240.04|
+---------+-------+---------+--------+
only showing top 5 rows



### Eliminamos outliers utilizando el percentil 98.

Chequeamos máx y min de Recency

In [19]:
print(df_rfm.agg({"Recency": "max"}).collect()[0][0])
print(df_rfm.agg({"Recency": "min"}).collect()[0][0])

61
1


Chequeamos máx y mín de Frequency

In [23]:
print(df_rfm.agg({"Frequency": "max"}).collect()[0][0])
print(df_rfm.agg({"Frequency": "min"}).collect()[0][0])

9
1


Chequeamos máx y mín para Monetary

In [25]:
print(df_rfm.agg({"Monetary": "max"}).collect()[0][0])
print(df_rfm.agg({"Monetary": "min"}).collect()[0][0])

3429.41
17.990000000000002


Para Frequency, calculamos el percentil 98 (q98) y nos quedamos con los valores menores a él.

In [None]:
#from pyspark.sql.functions import percentile_approx

#base.approxQuantile("R", [0.98], 0)

#Outlier_M=base.approxQuantile("M", [0.99], 0)[0]
#Outlier_F=base.approxQuantile("F", [0.99], 0)[0]
#base_outliers=base.filter((base['M']>=Outlier_M) & (base['F']>=Outlier_F))

#df_rfm.select(percentile_approx("Recency", [0.98]).alias("quantiles"))


In [22]:
p98 = df_rfm.approxQuantile("Frequency", [0.98], 0)[0]
df_rfm = df_rfm.filter(df_rfm["Frequency"] <= p98)


+---------+-------+---------+--------+
|  user_id|Recency|Frequency|Monetary|
+---------+-------+---------+--------+
|138340325|     20|        1|    93.5|
|225644257|     19|        1|   40.91|
|253299396|     25|        1|  246.85|
|256164170|     11|        1|  113.23|
|264649825|     56|        2| 1240.04|
|267316896|     17|        1|  189.71|
|276604124|     19|        1|    7.18|
+---------+-------+---------+--------+
only showing top 7 rows



Para Monetary, calculamos el percentil 2 y el percentil 98 (p2 y p98) y nos quedamos con el intervalo entre estos dos valores.

In [24]:
p2 = df_rfm.approxQuantile("Monetary", [0.02], 0)[0]
p98 = df_rfm.approxQuantile("Monetary", [0.98], 0)[0]
df_rfm = df_rfm.filter((df_rfm["Monetary"] <= p98) & (df_rfm["Monetary"] >= p2))

df_rfm.show(7)

+---------+-------+---------+--------+
|  user_id|Recency|Frequency|Monetary|
+---------+-------+---------+--------+
|138340325|     20|        1|    93.5|
|225644257|     19|        1|   40.91|
|253299396|     25|        1|  246.85|
|256164170|     11|        1|  113.23|
|264649825|     56|        2| 1240.04|
|267316896|     17|        1|  189.71|
|282274853|     23|        1|   79.15|
+---------+-------+---------+--------+
only showing top 7 rows



### Asignamos quintiles

In [26]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from itertools import chain
from pyspark.sql.functions import create_map, lit

#Asignamos cuantiles a Recency
rfm = df_rfm.select("user_id","Recency","Frequency",'Monetary', F.ntile(5).over(Window.partitionBy().orderBy(df_rfm['Recency'])).alias("R_")) 

#Remapeamos los números para que los 1 sean 5, 2 sean 4, etc.
simple_dict = {1:5, 2:4, 3:3, 4:2, 5:1}
mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])

rfm =rfm.withColumn('R', mapping_expr[rfm['R_']])
rfm = rfm.drop('R_')

rfm.show(truncate=False)

+---------+-------+---------+------------------+---+
|user_id  |Recency|Frequency|Monetary          |R  |
+---------+-------+---------+------------------+---+
|296197073|1      |2        |61.52             |5  |
|453412907|1      |1        |64.35             |5  |
|457173341|1      |1        |35.78             |5  |
|467991797|1      |1        |282.89            |5  |
|471516534|1      |2        |157.81            |5  |
|474035876|1      |1        |308.82            |5  |
|483102654|1      |1        |229.59            |5  |
|488123390|1      |7        |1334.1299999999999|5  |
|494367224|1      |1        |141.55            |5  |
|495059418|1      |1        |36.53             |5  |
|499480531|1      |1        |167.03            |5  |
|501673590|1      |1        |916.37            |5  |
|505654193|1      |2        |478.77            |5  |
|512363666|1      |2        |982.65            |5  |
|512363681|1      |4        |259.96000000000004|5  |
|512364123|1      |4        |904.39           

In [27]:
#Para Frequency
simple_dict = {1:1, 2:1, 3:2, 4:2, 5:3, 6:3, 7:4, 8:4, 9:5,10:5, 11:5}
mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])
rfm = rfm.withColumn('F', mapping_expr[rfm['Frequency']])
rfm.show(5)

+---------+-------+---------+--------+---+---+
|  user_id|Recency|Frequency|Monetary|  R|  F|
+---------+-------+---------+--------+---+---+
|296197073|      1|        2|   61.52|  5|  1|
|453412907|      1|        1|   64.35|  5|  1|
|457173341|      1|        1|   35.78|  5|  1|
|467991797|      1|        1|  282.89|  5|  1|
|471516534|      1|        2|  157.81|  5|  1|
+---------+-------+---------+--------+---+---+
only showing top 5 rows



In [28]:
#Monetary
rfm.collect()
rfm = rfm.select("user_id","Recency","Frequency",'Monetary',"R","F", F.ntile(5).over(Window.partitionBy().orderBy(df_rfm['Monetary'])).alias("M"))
rfm.show(5)

+---------+-------+---------+------------------+---+---+---+
|  user_id|Recency|Frequency|          Monetary|  R|  F|  M|
+---------+-------+---------+------------------+---+---+---+
|520238048|      3|        2|17.990000000000002|  5|  1|  1|
|536598093|     48|        2|17.990000000000002|  1|  1|  1|
|571296638|     17|        1|              18.0|  3|  1|  1|
|513493902|     61|        1|              18.0|  1|  1|  1|
|545022470|     15|        2|18.009999999999998|  3|  1|  1|
+---------+-------+---------+------------------+---+---+---+
only showing top 5 rows



In [29]:
#Creamos la columna RFM_SCORE concatenando las columnas R, F, M.
rfm=rfm.select('user_id','Recency','Frequency','R','F','M',concat(rfm.R, rfm.F, rfm.M).alias("RFM_SCORE"))
rfm.show(5)

+---------+-------+---------+---+---+---+---------+
|  user_id|Recency|Frequency|  R|  F|  M|RFM_SCORE|
+---------+-------+---------+---+---+---+---------+
|520238048|      3|        2|  5|  1|  1|      511|
|536598093|     48|        2|  1|  1|  1|      111|
|571296638|     17|        1|  3|  1|  1|      311|
|513493902|     61|        1|  1|  1|  1|      111|
|545022470|     15|        2|  3|  1|  1|      311|
+---------+-------+---------+---+---+---+---------+
only showing top 5 rows



### A partir del RFM_SCORE, mapeamos los segmentos.

In [None]:
seg_map = sqlContext.read.csv('/content/drive/MyDrive/?TESIS/segmentos_rfm.csv', header='True')
seg_map = seg_map.drop("Value")
seg_map.show(5)

+---------+-------+
|RFM_SCORE|segment|
+---------+-------+
|      111|   Lost|
|      112|   Lost|
|      113|   Lost|
|      114|At Risk|
|      115|At Risk|
+---------+-------+
only showing top 5 rows



In [None]:
rfm = rfm.join(seg_map, how='inner', on='RFM_SCORE')
rfm.show(5)

+---------+---------+-------+---------+---+---+---+-------------+---------+-------------+-------------+
|RFM_SCORE|  user_id|Recency|Frequency|  R|  F|  M|      segment|    Value|      segment|      segment|
+---------+---------+-------+---------+---+---+---+-------------+---------+-------------+-------------+
|      411|520573734|     14|        1|  4|  1|  1|    Promising|     Good|    Promising|    Promising|
|      411|513378422|     11|        1|  4|  1|  1|    Promising|     Good|    Promising|    Promising|
|      511|517557126|      5|        1|  5|  1|  1|New Customers|Very Good|New Customers|New Customers|
|      511|524830675|      8|        1|  5|  1|  1|New Customers|Very Good|New Customers|New Customers|
|      411|513377188|     12|        1|  4|  1|  1|    Promising|     Good|    Promising|    Promising|
+---------+---------+-------+---------+---+---+---+-------------+---------+-------------+-------------+
only showing top 5 rows



Exportamos el dataset

In [None]:
rfm.repartition(1).write.csv('work/RFM/RFM PYSPARK Resultados.csv')
