In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName('ecommerce-rfm').getOrCreate()

## I - Data Ingestion

In [3]:
schema = T.StructType([ \
    T.StructField('InvoiceNo', T.StringType(), True), \
    T.StructField('StockCode', T.StringType(), True), \
    T.StructField('Description', T.StringType(), True), \
    T.StructField('Quantity', T.IntegerType(), True), \
    T.StructField('InvoiceDate', T.StringType(), True), \
    T.StructField('UnitPrice', T.FloatType(), True), \
    T.StructField('CustomerID', T.StringType(), True), \
    T.StructField('Country', T.StringType(), True) \
])

In [4]:
df_raw = spark.read.csv('data/trusted/data.csv', header=True, schema=schema)
df_raw.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [5]:
df_raw.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [6]:
df_raw.count()

541909

## II - Data Transformation

### 1 - Conversão de InvoiceDate para Timestamp

In [7]:
#Conversão de Data
#PS: Nova versão do Pyspark pede que oo formato contenha somente uma letra. Ex: M/d/y ao inves de MM/dd/yyyy
df = df_raw.withColumn('InvoiceDate', F.to_timestamp(df_raw.InvoiceDate,'M/d/y h:m'))

In [8]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS S

### 2 - Eliminar linhas por valores

In [9]:
# Cria uma expressão SQL que filtra varios valores para todas as colunas da tabela

def expression_drop_columns_by_value(value):
    dfexpr = ' AND '.join(f'({col_name} NOT IN ({value}))' for col_name in df.columns)
    return str(dfexpr)

dfexpr = expression_drop_columns_by_value('"?", "??", "missing", "NaN"')
dfexpr

'(InvoiceNo NOT IN ("?", "??", "missing", "NaN")) AND (StockCode NOT IN ("?", "??", "missing", "NaN")) AND (Description NOT IN ("?", "??", "missing", "NaN")) AND (Quantity NOT IN ("?", "??", "missing", "NaN")) AND (InvoiceDate NOT IN ("?", "??", "missing", "NaN")) AND (UnitPrice NOT IN ("?", "??", "missing", "NaN")) AND (CustomerID NOT IN ("?", "??", "missing", "NaN")) AND (Country NOT IN ("?", "??", "missing", "NaN"))'

In [10]:
df = df.filter(dfexpr)


In [11]:
df.count()

194222

### 3 - Verificar e eliminar Nulos

In [12]:
#Identificar valores nulos para todas as colunas no PySpark

df.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [13]:
df = df.dropna()

In [14]:
df.count()

194222

### 4 - Ajustar Texto

In [15]:
# Letras Minusculas
df = df.withColumn('Description',F.lower(df.Description))

In [16]:
# Eliminar espaços em branco na esquerda e direita - .strip() em pandas
df = df.withColumn('Description',F.trim(df.Description))

In [17]:
# Replace
df = df.withColumn('Description',F.regexp_replace(df.Description,'\n', ''))

In [18]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|white hanging hea...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| white metal lantern|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|cream cupid heart...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|knitted union fla...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|red woolly hottie...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|set 7 babushka ne...|       2|2010-12-01 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|glass s

### 5 - Filtrar Descrições que não são itens

In [19]:
#Eliminar valores especificos
df = df.filter(~df.Description.isin(["amazon fee", "samples", "postage", "packing charge","manual","discount","adjust bad debt","bank charges","cruk commission","next day carriage"]))
df.count()

193245

In [20]:
#Eliminar valores nulos e negativos
df = df.filter((df.UnitPrice > 0) & (df.Quantity > 0))
df.count()

189391

In [21]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|white hanging hea...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| white metal lantern|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|cream cupid heart...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|knitted union fla...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|red woolly hottie...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|set 7 babushka ne...|       2|2010-12-01 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|glass s

### 6 - Adicionar Coluna para diferença de tempo entre hoje e ultima compra

In [22]:
#Codigo se for para o dia atual
#df.withColumn('DaysFromToday', F.datediff(F.current_date(),F.col('InvoiceDate'))).show(5000)

#Codigo para data determinada
df = df.withColumn('DaysFromToday', F.datediff(F.lit('2011-12-09 00:00:00.0000'),F.col('InvoiceDate')))
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DaysFromToday|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|   536365|   85123A|white hanging hea...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|          373|
|   536365|    71053| white metal lantern|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|          373|
|   536365|   84406B|cream cupid heart...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|          373|
|   536365|   84029G|knitted union fla...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|          373|
|   536365|   84029E|red woolly hottie...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|          373|
|   536365|    22752|set

##### Funcoes que usam DateType e TimestampType aceitam no seguinte formato:
###### DateType default format is yyyy-MM-dd 
###### TimestampType default format is yyyy-MM-dd HH:mm:ss.SSSS

## III - Criação da Tabela RFM

### 1 - Agrupar dados para RFM

In [23]:
df_rfm = df.groupBy('CustomerID').agg(F.min('DaysFromToday').alias('MinDaysFromToday'), \
                             F.approx_count_distinct('InvoiceNo').alias('CountUniqueItems'), \
                             F.sum('UnitPrice').alias('SumUnitPrice'))
df_rfm.show()

+----------+----------------+----------------+------------------+
|CustomerID|MinDaysFromToday|CountUniqueItems|      SumUnitPrice|
+----------+----------------+----------------+------------------+
|     16250|             373|               1|  47.2699992954731|
|     15555|              33|               5| 424.4699979946017|
|     15574|             177|               3|169.06999880075455|
|     15271|              36|               6| 267.1999993920326|
|     17714|             320|               1|              20.5|
|     12637|              91|               4| 440.4699971526861|
|     13107|              85|               3| 99.52999895811081|
|     13027|             113|               4| 7.600000113248825|
|     12957|               9|               5|403.70999773964286|
|     17128|             334|               1|55.790000200271606|
|     14439|             319|               1|207.38999783992767|
|     14810|              40|               9| 497.9999971687794|
|     1749

###### SumUnitPrice -> Soma dos Preços
###### CountUniqueItems -> Contagem distinta dos itens comprados
###### MinDaysFromToday -> Dias entre hoje e data da última compra

### 2 - Arredondar valores de Preço

In [24]:
df_rfm = df_rfm.withColumn('SumUnitPrice', F.round('SumUnitPrice',2))
df_rfm.show()

+----------+----------------+----------------+------------+
|CustomerID|MinDaysFromToday|CountUniqueItems|SumUnitPrice|
+----------+----------------+----------------+------------+
|     16250|             373|               1|       47.27|
|     15555|              33|               5|      424.47|
|     15574|             177|               3|      169.07|
|     15271|              36|               6|       267.2|
|     17714|             320|               1|        20.5|
|     12637|              91|               4|      440.47|
|     13107|              85|               3|       99.53|
|     13027|             113|               4|         7.6|
|     12957|               9|               5|      403.71|
|     17128|             334|               1|       55.79|
|     14439|             319|               1|      207.39|
|     14810|              40|               9|       498.0|
|     17491|              36|               5|      135.49|
|     17917|             362|           

### 3 - Visualizar ordenação por dias desde a última compra

In [25]:
df_rfm.orderBy('MinDaysFromToday').show()

+----------+----------------+----------------+------------+
|CustomerID|MinDaysFromToday|CountUniqueItems|SumUnitPrice|
+----------+----------------+----------------+------------+
|     12423|               0|               8|       244.2|
|     16705|               0|              12|      666.71|
|     16954|               0|               7|      152.07|
|     15910|               0|               5|      320.89|
|     13069|               0|              13|       433.2|
|     17389|               0|              13|      377.52|
|     16558|               0|               8|      584.08|
|     13777|               0|              16|      244.16|
|     15694|               0|               7|      216.84|
|     14051|               0|               9|      436.58|
|     12748|               0|              68|     3854.81|
|     17001|               0|               7|      295.21|
|     15311|               0|              31|      2088.9|
|     17581|               0|           

### 4 - Renomear Colunas

In [26]:
df_rfm = df_rfm.withColumnRenamed('MinDaysFromToday', 'Recency') \
               .withColumnRenamed('CountUniqueItems', 'Frequency') \
               .withColumnRenamed('SumUnitPrice', 'Monetary')
df_rfm.show()

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     16250|    373|        1|   47.27|
|     15555|     33|        5|  424.47|
|     15574|    177|        3|  169.07|
|     15271|     36|        6|   267.2|
|     17714|    320|        1|    20.5|
|     12637|     91|        4|  440.47|
|     13107|     85|        3|   99.53|
|     13027|    113|        4|     7.6|
|     12957|      9|        5|  403.71|
|     17128|    334|        1|   55.79|
|     14439|    319|        1|  207.39|
|     14810|     40|        9|   498.0|
|     17491|     36|        5|  135.49|
|     17917|    362|        1|    67.6|
|     18106|    329|        1|   153.5|
|     17855|    372|        1|   30.02|
|     14443|     53|        3|  198.89|
|     12386|    337|        2|   23.91|
|     15107|    332|        1|     8.5|
|     18161|     64|        3|  168.06|
+----------+-------+---------+--------+
only showing top 20 rows



### 5 - Definir Quartis

In [27]:
recency_quartile = df_rfm.approxQuantile("Recency", [0.25,0.5,0.75], 0)
recency_quartile

[22.0, 67.0, 177.0]

In [28]:
frequency_quartile = df_rfm.approxQuantile("Frequency", [0.25,0.5,0.75], 0)
frequency_quartile

[1.0, 2.0, 3.0]

In [29]:
monetary_quartile = df_rfm.approxQuantile("Monetary", [0.25,0.5,0.75], 0)
monetary_quartile

[37.69, 89.43, 198.79]

### 6 - Definir pontuações RFM

In [30]:
# Quanto menor Recency, melhor
df_rfm = df_rfm.withColumn('RecencyQuartile',\
                           F.when(df_rfm.Recency >= recency_quartile[2], 1).\
                           when(df_rfm.Recency >= recency_quartile[1], 2).\
                           when(df_rfm.Recency >= recency_quartile[0], 3).\
                           otherwise(4))
df_rfm.show()

+----------+-------+---------+--------+---------------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|
+----------+-------+---------+--------+---------------+
|     16250|    373|        1|   47.27|              1|
|     15555|     33|        5|  424.47|              3|
|     15574|    177|        3|  169.07|              1|
|     15271|     36|        6|   267.2|              3|
|     17714|    320|        1|    20.5|              1|
|     12637|     91|        4|  440.47|              2|
|     13107|     85|        3|   99.53|              2|
|     13027|    113|        4|     7.6|              2|
|     12957|      9|        5|  403.71|              4|
|     17128|    334|        1|   55.79|              1|
|     14439|    319|        1|  207.39|              1|
|     14810|     40|        9|   498.0|              3|
|     17491|     36|        5|  135.49|              3|
|     17917|    362|        1|    67.6|              1|
|     18106|    329|        1|   153.5|         

##### Não precisa repetir function F no encadeamento dos 'when'. Somente o primeiro já é o suficiente

In [31]:
# Quanto maior Frequency, melhor
df_rfm = df_rfm.withColumn('FrequencyQuartile',\
                           F.when(df_rfm.Frequency > frequency_quartile[2], 4).\
                           when(df_rfm.Frequency > frequency_quartile[1], 3).\
                           when(df_rfm.Frequency > frequency_quartile[0], 2).\
                           otherwise(1))
df_rfm.show()

+----------+-------+---------+--------+---------------+-----------------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|FrequencyQuartile|
+----------+-------+---------+--------+---------------+-----------------+
|     16250|    373|        1|   47.27|              1|                1|
|     15555|     33|        5|  424.47|              3|                4|
|     15574|    177|        3|  169.07|              1|                3|
|     15271|     36|        6|   267.2|              3|                4|
|     17714|    320|        1|    20.5|              1|                1|
|     12637|     91|        4|  440.47|              2|                4|
|     13107|     85|        3|   99.53|              2|                3|
|     13027|    113|        4|     7.6|              2|                4|
|     12957|      9|        5|  403.71|              4|                4|
|     17128|    334|        1|   55.79|              1|                1|
|     14439|    319|        1|  207.39

In [32]:
# Quanto maior Monetary, melhor
df_rfm = df_rfm.withColumn('MonetaryQuartile',\
                           F.when(df_rfm.Monetary >= monetary_quartile[2], 4).\
                           when(df_rfm.Monetary >= monetary_quartile[1], 3).\
                           when(df_rfm.Monetary >= monetary_quartile[0], 2).\
                           otherwise(1))
df_rfm.show()

+----------+-------+---------+--------+---------------+-----------------+----------------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|FrequencyQuartile|MonetaryQuartile|
+----------+-------+---------+--------+---------------+-----------------+----------------+
|     16250|    373|        1|   47.27|              1|                1|               2|
|     15555|     33|        5|  424.47|              3|                4|               4|
|     15574|    177|        3|  169.07|              1|                3|               3|
|     15271|     36|        6|   267.2|              3|                4|               4|
|     17714|    320|        1|    20.5|              1|                1|               1|
|     12637|     91|        4|  440.47|              2|                4|               4|
|     13107|     85|        3|   99.53|              2|                3|               3|
|     13027|    113|        4|     7.6|              2|                4|               1|

In [33]:
df_rfm = df_rfm.withColumn('RFMScore', F.concat(df_rfm.RecencyQuartile, \
                                                df_rfm.FrequencyQuartile, \
                                                df_rfm.MonetaryQuartile).\
                                                cast('String'))
df_rfm.show()

+----------+-------+---------+--------+---------------+-----------------+----------------+--------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|FrequencyQuartile|MonetaryQuartile|RFMScore|
+----------+-------+---------+--------+---------------+-----------------+----------------+--------+
|     16250|    373|        1|   47.27|              1|                1|               2|     112|
|     15555|     33|        5|  424.47|              3|                4|               4|     344|
|     15574|    177|        3|  169.07|              1|                3|               3|     133|
|     15271|     36|        6|   267.2|              3|                4|               4|     344|
|     17714|    320|        1|    20.5|              1|                1|               1|     111|
|     12637|     91|        4|  440.47|              2|                4|               4|     244|
|     13107|     85|        3|   99.53|              2|                3|               3|     233|


In [34]:
df_rfm = df_rfm.withColumn('RFScore', F.concat(df_rfm.RecencyQuartile, \
                                                df_rfm.FrequencyQuartile).\
                                                cast('String'))
df_rfm.show()

+----------+-------+---------+--------+---------------+-----------------+----------------+--------+-------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|FrequencyQuartile|MonetaryQuartile|RFMScore|RFScore|
+----------+-------+---------+--------+---------------+-----------------+----------------+--------+-------+
|     16250|    373|        1|   47.27|              1|                1|               2|     112|     11|
|     15555|     33|        5|  424.47|              3|                4|               4|     344|     34|
|     15574|    177|        3|  169.07|              1|                3|               3|     133|     13|
|     15271|     36|        6|   267.2|              3|                4|               4|     344|     34|
|     17714|    320|        1|    20.5|              1|                1|               1|     111|     11|
|     12637|     91|        4|  440.47|              2|                4|               4|     244|     24|
|     13107|     85|        

### 7 - Definir Classe de Cliente

In [35]:
df_rfm = df_rfm.withColumn('CustomerClass', F.expr("CASE "+
                                          "WHEN RFScore IN ('11','12') THEN 'Hibernando'" +
                                          "WHEN RFScore IN ('13') THEN 'Em Risco'" +
                                          "WHEN RFScore IN ('21', '22') THEN 'Indo Hibernar'" +
                                          "WHEN RFScore IN ('23') THEN 'Precisa de Atenção'" +
                                          "WHEN RFScore IN ('31') THEN 'Promissor'" +
                                          "WHEN RFScore IN ('41') THEN 'Novos Clientes'" +
                                          "WHEN RFScore IN ('32', '42', '33', '43') THEN 'Fieis em Potencial'" +
                                          "WHEN RFScore IN ('14') THEN 'Não pode Perde-los'" +
                                          "WHEN RFScore IN ('24', '34') THEN 'Clientes Fieis'" +
                                          "WHEN RFScore IN ('44') THEN 'Campeões'" +
                                          "ELSE RFScore END"))
df_rfm.show()

+----------+-------+---------+--------+---------------+-----------------+----------------+--------+-------+------------------+
|CustomerID|Recency|Frequency|Monetary|RecencyQuartile|FrequencyQuartile|MonetaryQuartile|RFMScore|RFScore|     CustomerClass|
+----------+-------+---------+--------+---------------+-----------------+----------------+--------+-------+------------------+
|     16250|    373|        1|   47.27|              1|                1|               2|     112|     11|        Hibernando|
|     15555|     33|        5|  424.47|              3|                4|               4|     344|     34|    Clientes Fieis|
|     15574|    177|        3|  169.07|              1|                3|               3|     133|     13|          Em Risco|
|     15271|     36|        6|   267.2|              3|                4|               4|     344|     34|    Clientes Fieis|
|     17714|    320|        1|    20.5|              1|                1|               1|     111|     11|    

### 8 - Estudar RFM para cada classe de cliente

In [36]:
df_customer = df_rfm.groupBy('CustomerClass').agg(F.count('CustomerID').alias('Count'), \
                                                  F.mean('Recency').alias('MeanRecency'), \
                                                  F.mean('Frequency').alias('MeanFrequency'), \
                                                  F.mean('Monetary').alias('MeanMonetary')). \
                                                  orderBy(F.col('MeanMonetary').desc())
df_customer.show()

+------------------+-----+------------------+------------------+------------------+
|     CustomerClass|Count|       MeanRecency|     MeanFrequency|      MeanMonetary|
+------------------+-----+------------------+------------------+------------------+
|          Campeões|  335| 8.495522388059701|10.170149253731344| 562.2347164179102|
|    Clientes Fieis|  327|  58.2262996941896| 6.382262996941896| 369.1680122324161|
|Não pode Perde-los|   25|            224.68|               5.4|313.43600000000004|
|          Em Risco|   44|235.04545454545453|               3.0| 164.8968181818182|
|Precisa de Atenção|  129|112.98449612403101|               3.0|156.63441860465122|
|Fieis em Potencial|  535|25.781308411214955|2.4299065420560746|154.32712149532705|
|     Indo Hibernar|  555|114.61981981981982|1.3711711711711712| 79.88812612612617|
|        Hibernando|  713|  267.945301542777| 1.187938288920056| 70.69516129032256|
|         Promissor|  301| 43.82392026578073|               1.0| 63.49332225

In [37]:
df_customer = df_customer.withColumn('MeanRecency', F.round('MeanRecency',2))\
                         .withColumn('MeanFrequency', F.round('MeanFrequency',2))\
                         .withColumn('MeanMonetary', F.round('MeanMonetary',2))
df_customer.show(truncate=False)

+------------------+-----+-----------+-------------+------------+
|CustomerClass     |Count|MeanRecency|MeanFrequency|MeanMonetary|
+------------------+-----+-----------+-------------+------------+
|Campeões          |335  |8.5        |10.17        |562.23      |
|Clientes Fieis    |327  |58.23      |6.38         |369.17      |
|Não pode Perde-los|25   |224.68     |5.4          |313.44      |
|Em Risco          |44   |235.05     |3.0          |164.9       |
|Precisa de Atenção|129  |112.98     |3.0          |156.63      |
|Fieis em Potencial|535  |25.78      |2.43         |154.33      |
|Indo Hibernar     |555  |114.62     |1.37         |79.89       |
|Hibernando        |713  |267.95     |1.19         |70.7        |
|Promissor         |301  |43.82      |1.0          |63.49       |
|Novos Clientes    |153  |12.46      |1.0          |58.79       |
+------------------+-----+-----------+-------------+------------+



### 9 - Obter a porcentagem de clientes

##### collect() captura a celula unitaria para ser usada como variavel

In [38]:
total_customers = df_customer.select(F.sum('Count')).collect()[0][0]

In [39]:
df_customer = df_customer.withColumn('%', F.round((F.col('Count') / F.lit(total_customers) * 100),2))
df_customer.show()

+------------------+-----+-----------+-------------+------------+-----+
|     CustomerClass|Count|MeanRecency|MeanFrequency|MeanMonetary|    %|
+------------------+-----+-----------+-------------+------------+-----+
|          Campeões|  335|        8.5|        10.17|      562.23|10.75|
|    Clientes Fieis|  327|      58.23|         6.38|      369.17|10.49|
|Não pode Perde-los|   25|     224.68|          5.4|      313.44|  0.8|
|          Em Risco|   44|     235.05|          3.0|       164.9| 1.41|
|Precisa de Atenção|  129|     112.98|          3.0|      156.63| 4.14|
|Fieis em Potencial|  535|      25.78|         2.43|      154.33|17.16|
|     Indo Hibernar|  555|     114.62|         1.37|       79.89|17.81|
|        Hibernando|  713|     267.95|         1.19|        70.7|22.87|
|         Promissor|  301|      43.82|          1.0|       63.49| 9.66|
|    Novos Clientes|  153|      12.46|          1.0|       58.79| 4.91|
+------------------+-----+-----------+-------------+------------

##### Pode-se unificar as funções, desde que a saída seja uma coluna

### 10 - Rearranjar colunas

In [40]:
df_customer = df_customer.select('CustomerClass', 'Count', '%', 'MeanRecency', 'MeanFrequency', 'MeanMonetary')
df_customer.show()

+------------------+-----+-----+-----------+-------------+------------+
|     CustomerClass|Count|    %|MeanRecency|MeanFrequency|MeanMonetary|
+------------------+-----+-----+-----------+-------------+------------+
|          Campeões|  335|10.75|        8.5|        10.17|      562.23|
|    Clientes Fieis|  327|10.49|      58.23|         6.38|      369.17|
|Não pode Perde-los|   25|  0.8|     224.68|          5.4|      313.44|
|          Em Risco|   44| 1.41|     235.05|          3.0|       164.9|
|Precisa de Atenção|  129| 4.14|     112.98|          3.0|      156.63|
|Fieis em Potencial|  535|17.16|      25.78|         2.43|      154.33|
|     Indo Hibernar|  555|17.81|     114.62|         1.37|       79.89|
|        Hibernando|  713|22.87|     267.95|         1.19|        70.7|
|         Promissor|  301| 9.66|      43.82|          1.0|       63.49|
|    Novos Clientes|  153| 4.91|      12.46|          1.0|       58.79|
+------------------+-----+-----+-----------+-------------+------

### IV - Export

In [41]:
#df_rfm.write.mode('overwrite').parquet('output_rfm')
#df_customer.write.mode('overwrite').parquet('output_customer')

In [42]:
df_rfm_pandas = df_rfm.toPandas()
df_customer_pandas = df_customer.toPandas()

In [46]:
df_rfm_pandas.to_csv('data/refined/output_rfm.csv', index=False, encoding='utf-8')
df_customer_pandas.to_csv('data/refined/output_customer.csv', index=False, encoding='utf-8')