In [0]:
df = spark.read.parquet("dbfs:///projeto_1/DadosEcommerce/Silver")

In [0]:
df.createOrReplaceTempView("DadosEcommerce")

In [0]:
%sql
select * from DadosEcommerce

count(1)
420718


In [0]:
%sql
create or replace temporary view MONETARY as (select user_id, round( sum(price),2 ) as Monetary from DadosEcommerce group by user_id)

In [0]:
%sql

select * from MONETARY

count(1)
203478


In [0]:
%sql
create or replace temporary view FREQUENCY as (select user_id, count(distinct(order_id)) as Frequency from DadosEcommerce group by user_id)

In [0]:
%sql

select * from FREQUENCY

user_id,Frequency
1515915625512202175,1
1515915625512611915,4
1515915625512959303,1
1515915625514157876,1
1515915625514716804,2
1515915625498275020,2
1515915625514801548,1
1515915625511934842,1
1515915625484655166,7
1515915625514719684,1


In [0]:
%sql
create or replace temporary view RECENCY as (select user_id, min(datediff(current_timestamp,event_time)) as Recency from DadosEcommerce GROUP BY user_id)

In [0]:
%sql 
select * from RECENCY

user_id,Recency
1515915625480625124,726
1515915625511678467,727
1515915625477852636,725
1515915625498341537,722
1515915625484571294,707
1515915625511828479,724
1515915625498342301,703
1515915625511848535,723
1515915625498177839,711
1515915625511735041,681


In [0]:
%sql
show tables

database,tableName,isTemporary
default,rfm,False
,dadosecommerce,True
,frequency,True
,monetary,True
,recency,True


In [0]:
%sql
select r.user_id, r.RECENCY Recency, f.FREQUENCY Frequency, round(m.MONETARY,2) Monetary from frequency f 
join recency r on f.user_id = r.user_id
join monetary m on f.user_id = m.user_id

user_id,Recency,Frequency,Monetary
1515915625512202175,714,1,41.64
1515915625512611915,705,4,912.97
1515915625512959303,699,1,162.01
1515915625514157876,686,1,2.99
1515915625514716804,671,2,145.55
1515915625498275020,706,2,972.18
1515915625514801548,670,1,11.55
1515915625511934842,720,1,115.72
1515915625484655166,671,7,1039.39
1515915625514719684,671,1,6.92


In [0]:
%sql
create or replace table RFM as (
select r.user_id, r.RECENCY Recency, f.FREQUENCY Frequency, round(m.MONETARY,2) Monetary from frequency f 
join recency r on f.user_id = r.user_id
join monetary m on f.user_id = m.user_id
)

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from RFM

user_id,Recency,Frequency,Monetary
1515915625512202175,714,1,41.64
1515915625512611915,705,4,912.97
1515915625512959303,699,1,162.01
1515915625514157876,686,1,2.99
1515915625514716804,671,2,145.55
1515915625498275020,706,2,972.18
1515915625514801548,670,1,11.55
1515915625511934842,720,1,115.72
1515915625484655166,671,7,1039.39
1515915625514719684,671,1,6.92


In [0]:
import pandas

df = spark.sql('select * from RFM').toPandas()

In [0]:
df['recency_score'] = pandas.qcut(df['Recency'], 5, labels=[5,4,3,2,1])
df['frequency_score'] = pandas.qcut(df['Frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
df['monetary_score'] = pandas.qcut(df['Monetary'], 5, labels=[1,2,3,4,5])

In [0]:
df_rfm = df[['user_id', 'recency_score','frequency_score','monetary_score']]
df_rfm = spark.createDataFrame(df_rfm)
display(df_rfm)

user_id,recency_score,frequency_score,monetary_score
1515915625498183345,5,5,4
1515915625514157054,5,1,2
1515915625514163508,5,1,4
1515915625512084413,4,1,1
1515915625512118466,4,1,3
1515915625512558294,5,5,5
1515915625514597050,5,1,1
1515915625514576496,5,1,4
1515915625514594791,5,4,3
1515915625513722733,5,5,5


In [0]:
df_rfm = df_rfm.toPandas()

In [0]:
df_rfm['FM_media'] = (df_rfm['monetary_score'] + df_rfm['frequency_score'])/2

In [0]:
df_rfm_pronto = df_rfm
df_rfm_pronto

Unnamed: 0,user_id,recency_score,frequency_score,monetary_score,FM_media
0,1515915625498183345,5,5,4,4.5
1,1515915625514157054,5,1,2,1.5
2,1515915625514163508,5,1,4,2.5
3,1515915625512084413,4,1,1,1.0
4,1515915625512118466,4,1,3,2.0
...,...,...,...,...,...
203473,1515915625484653838,1,4,2,3.0
203474,1515915625484645645,1,4,1,2.5
203475,1515915625484651609,1,4,1,2.5
203476,1515915625484636729,1,4,2,3.0


In [0]:
import numpy as np
 
regras = [
  (df_rfm_pronto['recency_score'] >= 4) & (df_rfm_pronto['FM_media'] >= 4),
  (df_rfm_pronto['recency_score'] >= 2) & (df_rfm_pronto['FM_media'] >= 3),
  (df_rfm_pronto['recency_score'] >= 3) & (df_rfm_pronto['FM_media'] >= 1) & (df_rfm_pronto['FM_media'] <= 3),
  (df_rfm_pronto['recency_score'] >= 4) & (df_rfm_pronto['FM_media'] <= 1),
  (df_rfm_pronto['recency_score'] >= 3) & (df_rfm_pronto['recency_score'] <= 4) & (df_rfm_pronto['FM_media'] <= 1),
  (df_rfm_pronto['recency_score'] >= 2) & (df_rfm_pronto['recency_score'] <= 3) & (df_rfm_pronto['FM_media'] >= 2) & (df_rfm_pronto['FM_media'] <= 3),
  (df_rfm_pronto['recency_score'] >= 2) & (df_rfm_pronto['recency_score'] <= 3) & (df_rfm_pronto['FM_media'] <= 2),
  (df_rfm_pronto['recency_score'] <= 2) & (df_rfm_pronto['FM_media'] >= 2),
  (df_rfm_pronto['recency_score'] <= 1) & (df_rfm_pronto['FM_media'] >= 4),
  (df_rfm_pronto['recency_score'] >= 1) & (df_rfm_pronto['recency_score'] <= 2) & (df_rfm_pronto['FM_media'] >= 1) & (df_rfm_pronto['FM_media'] <= 2),
  (df_rfm_pronto['recency_score'] <= 2) & (df_rfm_pronto['FM_media'] <= 2)
]
 
segmentos = ['Champions', 'Loyal Costumers', 'Potential Loyalist', 'New Costumers', 'Promising', 'Customers Needing Attention', 'About to Sleep', 'At Risk', 'Cant Lose Them', 'Hibernating', 'Lost']
 
df_rfm_pronto['GRUPOS'] = np.select(regras, segmentos)

df_rfm_pronto

Unnamed: 0,user_id,recency_score,frequency_score,monetary_score,FM_media,GRUPOS
0,1515915625498183345,5,5,4,4.5,Champions
1,1515915625514157054,5,1,2,1.5,Potential Loyalist
2,1515915625514163508,5,1,4,2.5,Potential Loyalist
3,1515915625512084413,4,1,1,1.0,Potential Loyalist
4,1515915625512118466,4,1,3,2.0,Potential Loyalist
...,...,...,...,...,...,...
203473,1515915625484653838,1,4,2,3.0,At Risk
203474,1515915625484645645,1,4,1,2.5,At Risk
203475,1515915625484651609,1,4,1,2.5,At Risk
203476,1515915625484636729,1,4,2,3.0,At Risk


In [0]:
df_rfm_pronto['GRUPOS'].unique()

Out[17]: array(['Champions', 'Potential Loyalist', 'Loyal Costumers',
       'Customers Needing Attention', 'About to Sleep', 'At Risk',
       'Hibernating'], dtype=object)

In [0]:
df_rfm_pronto = spark.createDataFrame(df_rfm_pronto)


In [0]:
display(df_rfm_pronto)

user_id,recency_score,frequency_score,monetary_score,FM_media,GRUPOS
1515915625498183345,5,5,4,4.5,Champions
1515915625514157054,5,1,2,1.5,Potential Loyalist
1515915625514163508,5,1,4,2.5,Potential Loyalist
1515915625512084413,4,1,1,1.0,Potential Loyalist
1515915625512118466,4,1,3,2.0,Potential Loyalist
1515915625512558294,5,5,5,5.0,Champions
1515915625514597050,5,1,1,1.0,Potential Loyalist
1515915625514576496,5,1,4,2.5,Potential Loyalist
1515915625514594791,5,4,3,3.5,Loyal Costumers
1515915625513722733,5,5,5,5.0,Champions
