<div id='id1' />
#Business case

Objetivo: Segundo a regra do NPS (NET PROMETER SCORE), que mensura o grau de lealdade do cliente. Qual seria o grau de instisfação dos possíveis clientes defratores, baseados em dados do consumo e review?

<div name = 'Business' id='id1' />
NPS - OLIST
<img src="https://blog.customerradar.com/hs-fs/hubfs/NPS-1-5.png?width=750&height=456&name=NPS-1-5.png" alt="nps" style="width: 750px; height: 460px">

##Importar bibliotecas

In [5]:
from pyspark.sql.functions import datediff
from pyspark.sql.functions import to_date
from pyspark.sql.functions import lit
from pyspark.sql.functions import current_date
from pyspark.sql.functions import month
from pyspark.sql.functions import months_between
from pyspark.sql.functions import max
from pyspark.sql.functions import min
from pyspark.sql.functions import year
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
from pyspark.sql.functions import mean
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import count
from pyspark.sql.functions import length
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd

##Funções uteis

In [7]:
def UDF_summary(quantitative_data):
  """
  * Faz um summary de todas as variáveis do dataset
  * Faz calcula se tem outlier baseado no intervalo interquartis
  * Faz o calculo do coeficiente de variação 
  """
  for nome, tipo in quantitative_data.dtypes:
    if tipo in ('double','int'):
      summ = quantitative_data[[nome]].summary('count',
                                               'mean',
                                               'stddev',
                                               'min',
                                               '10%',
                                               '20%',
                                               '25%',
                                               '30%',
                                               '50%',
                                               '60%',
                                               '75%',
                                               '80%',
                                               '90%',
                                               'max')
      summ_collect = summ.collect()
      q1 = float(summ_collect[4][1])
      mdn = float(summ_collect[5][1])
      q3 = float(summ_collect[6][1])
      stddev = float(summ_collect[2][1])
      mean = float(summ_collect[1][1])
      # para evitar erro com media igual a zero, criamos esta funçao para aproximar 0 para 0.00000000001
      coef_var = lambda stddev,mean: (100*(stddev/mean)) if mean>0 else (100*(stddev/0.00000000001))
      coef = coef_var(stddev,mean)
      #para calcular outlier vamos usar os intervalos interquartis
      outs = quantitative_data.filter(quantitative_data[nome]>q3 + 1.5*(q3-q1)).count()
      outi = quantitative_data.filter(quantitative_data[nome]<q1 - 1.5*(q3-q1)).count()

      spaces = len(nome)
      if spaces < 20:
        spaces = 2

      print("+"+"-"*13+"+"+"-"*spaces+"+")
      print("|     summary |"+" "*(spaces-len(nome))+nome+"|")
      print("+"+"-"*13+"+"+"-"*spaces+"+")
      for linha in summ_collect:
        print("|"+" "*(13-len(linha[0]))+linha[0]+"|"+" "*(spaces-len(linha[1]))+linha[1]+"|")
      print("| outlier sup |"+" "*(spaces-len(str(outs)))+str(outs)+"|")
      print("| outlier inf |"+" "*(spaces-len(str(outi)))+str(outi)+"|")
      print("|    coef var |"+" "*(spaces-len(str(coef)))+str(coef)+"|")
      print("+"+"-"*13+"+"+"-"*spaces+"+")
      print('\n')

In [8]:
def num_columns(df_parametro):
  return len(df_parametro.columns)

def num_rows(df_parametro):
  return df_parametro.count()

def shape(df_aux):
  return (num_rows(df_aux),num_columns(df_aux))

##Carregar as bases

####Carregar a base orders

In [11]:
df_orders = (spark.read.csv('/mnt/datasets/brazilian-ecommerce/olist_orders_dataset.csv', header=True, sep=',', inferSchema=True)
             .repartition(2)
             .cache()
             .dropna()
             .dropDuplicates())

####Carregar a base customers

In [13]:
df_customers = (spark.read
                .csv('/mnt/datasets/brazilian-ecommerce/olist_customers_dataset.csv', 
                     header=True, 
                     sep=',', 
                     inferSchema=True)
                .dropna())

####Carregar a base order_itens

In [15]:
df_order_items = (spark.read.csv('/mnt/datasets/brazilian-ecommerce/olist_order_items_dataset.csv', header=True, sep=',', inferSchema=True)
                  .repartition(2)
                  .cache())

####Carregar a base products

In [17]:
df_products = (spark.read.csv('/mnt/datasets/brazilian-ecommerce/olist_products_dataset.csv', header=True, sep=',', inferSchema=True)
               .repartition(2)
               .cache())

####Carregar a base order_review

In [19]:
df_order_reviews = (spark.read.csv('/mnt/datasets/brazilian-ecommerce/olist_order_reviews_dataset.csv', header=True, sep=',', inferSchema=True, multiLine=True)
                            .repartition(2)
                            .cache())

##Juntar as bases

####Join order vs customer

In [22]:
# df_customers ja foi carregado anteriormente
# df_orders ja foi carregado anteriormente
df_orders_customers = (df_orders.join(df_customers, 
                                      df_orders['customer_id'] == df_customers['customer_id'], 
                                      how='left')
                       .drop(df_customers['customer_id'])
                       .drop(df_orders['order_delivered_carrier_date'])
                       .repartition(2)
                       .cache())

df_orders.unpersist()
shape(df_orders_customers)

In [23]:
display(df_orders_customers)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_customer_date,order_estimated_delivery_date,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
bc7401d98fe2814849de2404e3ebd9ad,28a451491753dc3f4c52bb8dfea902aa,delivered,2017-03-31T13:14:16.000+0000,2017-03-31T13:25:11.000+0000,2017-05-17T17:14:59.000+0000,2017-04-27T00:00:00.000+0000,40cbf2d82a88f3a67c2c60fa1def7157,88106,sao jose,SC
2c1a279890c82005e7d51b4717cc0080,8cd7393d7826172d4f6f6e4f433defc6,delivered,2018-05-15T18:52:41.000+0000,2018-05-15T19:12:39.000+0000,2018-05-21T16:37:00.000+0000,2018-05-25T00:00:00.000+0000,959f91ac4059b1cbc8831bd99a36035b,2460,sao paulo,SP
3287906d23c6e6cb0ebb48f0b766001b,1c01b6e99332e712ccd834ae02122d20,delivered,2017-09-06T14:39:40.000+0000,2017-09-06T14:52:27.000+0000,2017-09-19T16:59:01.000+0000,2017-09-27T00:00:00.000+0000,88f7972f3f521975d199f30aae8053ff,14040,ribeirao preto,SP
c70d255a582e4ffcdf4bbe6bb0ffdc12,54c67eba6900fd3785f06de81c7612bc,delivered,2017-07-26T15:20:13.000+0000,2017-07-26T15:35:06.000+0000,2017-08-03T16:59:51.000+0000,2017-08-17T00:00:00.000+0000,b5d7ca7682fea3d714991982983f5035,94828,alvorada,RS
2cd487f56c4c8e2600c902f8ac475597,376c8b9583b94167b5869fad6d6155db,delivered,2017-12-19T15:16:43.000+0000,2017-12-19T16:10:20.000+0000,2017-12-28T20:43:41.000+0000,2018-01-12T00:00:00.000+0000,f0cfb215c2ab08fbf7c3bdc30436ca23,12490,sao bento do sapucai,SP
8100c4de96dbf94a93a19c78c69a006d,b4e369402f94a6036f05434ba8ac3a21,delivered,2017-06-10T19:59:23.000+0000,2017-06-10T20:10:18.000+0000,2017-06-19T12:08:01.000+0000,2017-07-03T00:00:00.000+0000,f62d6572c9c25b3796d058d7b833f8b7,5756,sao paulo,SP
3e30e6710e775ebfeecbd90fad476396,0c16f77e243787b61748a79eb1634329,delivered,2017-12-11T18:18:04.000+0000,2017-12-12T03:59:41.000+0000,2017-12-13T17:29:13.000+0000,2017-12-29T00:00:00.000+0000,7d2a53716c3b4ab11b666372a985773a,9846,sao bernardo do campo,SP
d3af64a47d8c61129648bf0d3d695842,eaa2eb92cb3d0bbf3b5492946f3caf79,delivered,2018-03-20T16:19:37.000+0000,2018-03-20T16:35:37.000+0000,2018-04-02T22:41:37.000+0000,2018-04-13T00:00:00.000+0000,871835b56495eba643a02722f152f2eb,36400,conselheiro lafaiete,MG
946fbd026e6cdf0c0b3c2f6a5d8a149b,0c5f776f27bd2e3d271019993998b0d7,delivered,2017-10-22T14:13:17.000+0000,2017-10-22T14:28:19.000+0000,2017-11-06T21:12:00.000+0000,2017-11-03T00:00:00.000+0000,83fd2e966327fb235dba7f279badb8e9,13060,campinas,SP
19c0bfcd4496e4f2c70c779226c8605b,089da4e76a40d4222c2797bb79c2988f,delivered,2018-05-24T10:11:14.000+0000,2018-05-24T10:33:08.000+0000,2018-06-04T18:11:55.000+0000,2018-06-19T00:00:00.000+0000,5263247be265218671c9a7dc834fb48b,17055,bauru,SP


####Join orders_itens vs products
nesta etapa é feito um join inicial somente para colocar a variável customer_unique_id para poder agregar no final

In [25]:
df_orders_customers_aux = df_orders_customers.select('customer_unique_id',"order_id")

In [26]:
df_orders_orders_itens = (df_orders_customers_aux.join(df_order_items, df_orders_customers_aux['order_id'] == df_order_items['order_id'], how='left')
                       .drop(df_order_items['order_id'])
                       .drop(df_order_items['seller_id'])
                       .drop(df_order_items['shipping_limit_date'])
                       .drop(df_order_items['freight_value'])
                       .repartition(2)
                       .cache())
df_order_items.unpersist()

In [27]:
df_orders_orders_itens_products = (df_orders_orders_itens.join(df_products, df_orders_orders_itens['product_id'] == df_products['product_id'], how='left')
                                   .drop(df_products['product_id'])
                                   .drop(df_products['product_name_lenght'])
                                   .drop(df_products['product_weight_g'])
                                   .drop(df_products['product_length_cm'])
                                   .drop(df_products['product_height_cm'])
                                   .drop(df_products['product_width_cm'])
                                   .repartition(2)
                                   .cache())
df_products.unpersist()
df_orders_orders_itens.unpersist()
shape(df_orders_orders_itens_products)

####Join orders vs reviews
nesta etapa é feito um join somente para colocar a variável customer_unique_id para poder agrupar os reviews por customer_unique_id no final

In [29]:
df_orders_to_review_join = df_orders_customers.select('customer_unique_id',
                                                      "order_id",
                                                      "order_delivered_customer_date")

In [30]:

df_orders_orders_reviews = (df_orders_to_review_join.join(df_order_reviews, df_orders_to_review_join['order_id'] == df_order_reviews['order_id'], how='left')
                           .drop(df_order_reviews['order_id'])
                           .drop(df_order_reviews['review_id'])
                           #.drop(df_order_reviews['review_creation_date'])
                           .repartition(2)
                           .cache())

##Criar Features

####Criar features para orders e customer

###### Criar variáveis relativas a qtd de compras por região

In [34]:
df_regiao = (df_orders_customers
             .select("customer_state","customer_unique_id")
             .withColumn("sudeste",
                         when((col("customer_state") == "SP") | 
                              (col("customer_state") == "RJ")| 
                              (col("customer_state") == "MG")| 
                              (col("customer_state") == "ES"),1)
                         .otherwise(0))
             .withColumn("sul",
                         when((col("customer_state") == "RS") | 
                              (col("customer_state") == "SC")| 
                              (col("customer_state") == "PR"),1)
                         .otherwise(0))
             .withColumn("centro_oeste",
                         when((col("customer_state") == "MT") | 
                              (col("customer_state") == "MS")|
                              (col("customer_state") == "DF")|
                              (col("customer_state") == "GO"),1)
                         .otherwise(0))
             .withColumn("norte",
                         when((col("customer_state") == "TO") | 
                              (col("customer_state") == "PA")| 
                              (col("customer_state") == "AM")|
                              (col("customer_state") == "AP")|
                              (col("customer_state") == "RR")|
                              (col("customer_state") == "AC")|
                              (col("customer_state") == "RO"),1)
                         .otherwise(0))
             .withColumn("nordeste",
                         when((col("customer_state") == "MA") | 
                              (col("customer_state") == "PI")| 
                              (col("customer_state") == "BA")|
                              (col("customer_state") == "CE")|
                              (col("customer_state") == "RN")|
                              (col("customer_state") == "PB")|
                              (col("customer_state") == "PE")|
                              (col("customer_state") == "AL")|
                              (col("customer_state") == "SE"),1)
                         .otherwise(0))
             .drop("customer_state")
            )
shape(df_regiao)

######Criar variáveis relativas a qtd de compras aprovadas no periodo ano

In [36]:
df_compras_ano = (df_orders_customers
                  .select("order_approved_at","customer_unique_id")
                  .withColumn("approved_year",
                                year("order_approved_at"))
                  .withColumn("approved_year_2016",
                              when(col("approved_year") == 2016,1)
                              .otherwise(0))
                  .withColumn("approved_year_2017",
                              when(col("approved_year") == 2017,1)
                              .otherwise(0))
                  .withColumn("approved_year_2018",
                              when(col("approved_year") == 2018,1)
                              .otherwise(0))
                  .drop('approved_year')
                  .drop('order_approved_at')
                  )
shape(df_compras_ano)

######Criar variáveis relativas a qtd de compras aprovadas no periodo trimestre

In [38]:
df_compras_trimestre = (df_orders_customers
                        .select("order_approved_at","customer_unique_id")
                        .withColumn("approved_year",
                                    year("order_approved_at"))
                        .withColumn("approved_month",
                                    month("order_approved_at"))
                        .withColumn("1_trimestre_2016",
                                    when((col("approved_year") == 2016)&
                                         (col("approved_month") >= 1) & 
                                         (col("approved_month") <= 3),1)
                                    .otherwise(0))
                        .withColumn("2_trimestre_2016",
                                    when((col("approved_year") == 2016)&
                                         (col("approved_month") >= 4) & 
                                         (col("approved_month") <= 6),1)
                                    .otherwise(0))
                        .withColumn("3_trimestre_2016",
                                    when((col("approved_year") == 2016)&
                                         (col("approved_month") >= 7) & 
                                         (col("approved_month") <= 9),1)
                                    .otherwise(0))
                        .withColumn("4_trimestre_2016",
                                    when((col("approved_year") == 2016)&
                                         (col("approved_month") >= 10) & 
                                         (col("approved_month") <= 12),1)
                                    .otherwise(0))
                        .withColumn("1_trimestre_2017",
                                    when((col("approved_year") == 2017)&
                                         (col("approved_month") >= 1) & 
                                         (col("approved_month") <= 3),1)
                                    .otherwise(0))
                        .withColumn("2_trimestre_2017",
                                    when((col("approved_year") == 2017)&
                                         (col("approved_month") >= 4) & 
                                         (col("approved_month") <= 6),1)
                                    .otherwise(0))
                        .withColumn("3_trimestre_2017",
                                    when((col("approved_year") == 2017)&
                                         (col("approved_month") >= 7) & 
                                         (col("approved_month") <= 9),1)
                                    .otherwise(0))
                        .withColumn("4_trimestre_2017",
                                    when((col("approved_year") == 2017)&
                                         (col("approved_month") >= 10) & 
                                         (col("approved_month") <= 12),1)
                                    .otherwise(0))
                        .withColumn("1_trimestre_2018",
                                    when((col("approved_year") == 2018)&
                                         (col("approved_month") >= 1) & 
                                         (col("approved_month") <= 3),1)
                                    .otherwise(0))
                        .withColumn("2_trimestre_2018",
                                    when((col("approved_year") == 2018)&
                                         (col("approved_month") >= 4) & 
                                         (col("approved_month") <= 6),1)
                                    .otherwise(0))
                        .withColumn("3_trimestre_2018",
                                    when((col("approved_year") == 2018)&
                                         (col("approved_month") >= 7) & 
                                         (col("approved_month") <= 9),1)
                                    .otherwise(0))
                        .withColumn("4_trimestre_2018",
                                    when((col("approved_year") == 2018)&
                                         (col("approved_month") >= 10) & 
                                         (col("approved_month") <= 12),1)
                                    .otherwise(0))
                        .drop('approved_year')
                        .drop('approved_month')
                        #.drop('order_approved_at')
                        )
shape(df_compras_trimestre)

######Criar variáveis relativas a qtd de compras aprovadas no periodo semestre

In [40]:
df_compras_semestre = (df_orders_customers
                       .select("order_approved_at","customer_unique_id")
                       .withColumn("approved_year",
                                   year("order_approved_at"))
                       .withColumn("approved_month",
                                   month("order_approved_at"))
                       .withColumn("1_semestre_2016",
                                   when((col("approved_year") == 2016)&
                                        (col("approved_month") >= 1) & 
                                        (col("approved_month") <= 6),1)
                                   .otherwise(0))
                       .withColumn("2_semestre_2016",
                                   when((col("approved_year") == 2016)&
                                        (col("approved_month") >= 7) & 
                                        (col("approved_month") <= 12),1)
                                   .otherwise(0))
                       .withColumn("1_semestre_2017",
                                   when((col("approved_year") == 2017)&
                                        (col("approved_month") >= 1) & 
                                        (col("approved_month") <= 6),1)
                                   .otherwise(0))
                       .withColumn("2_semestre_2017",
                                   when((col("approved_year") == 2017)&
                                        (col("approved_month") >= 7) & 
                                        (col("approved_month") <= 12),1)
                                   .otherwise(0))
                       .withColumn("1_semestre_2018",
                                   when((col("approved_year") == 2018)&
                                        (col("approved_month") >= 1) & 
                                        (col("approved_month") <= 6),1)
                                   .otherwise(0))
                       .withColumn("2_semestre_2018",
                                   when((col("approved_year") == 2018)&
                                        (col("approved_month") >= 7) & 
                                        (col("approved_month") <= 12),1)
                                   .otherwise(0))
                       .drop('approved_year')
                       .drop('approved_month')
                       )
shape(df_compras_semestre)

######Criar variáveis relativas a qtd de compras atrasadas

In [42]:
df_compras_atrasadas = (df_orders_customers
                        .select("order_estimated_delivery_date","order_delivered_customer_date","customer_unique_id")
                        .withColumn("delta_da_entrega",
                                    datediff("order_delivered_customer_date","order_estimated_delivery_date"))
                        .withColumn("entrega_atrasada_ate_1_semana",
                                    when((col("delta_da_entrega") > 0) & (col("delta_da_entrega") < 8),1)
                                    .otherwise(0))
                        .withColumn("entrega_atrasada_de_1_a_2_semanas",
                                    when((col("delta_da_entrega") > 7) & (col("delta_da_entrega") < 16),1)
                                    .otherwise(0))
                        .withColumn("entrega_super_atrasada",
                                    when((col("delta_da_entrega") > 15),1)
                                    .otherwise(0))
                        )
df_compras_atrasadas = (df_compras_atrasadas
                        .drop("order_estimated_delivery_date")
                        .drop("order_delivered_customer_date")
                        .drop('delta_da_entrega'))
shape(df_compras_atrasadas)

######Criar variáveis relativas a qtd de compras adiantadas

In [44]:
df_compras_adiantadas = (df_orders_customers
                         .select("order_estimated_delivery_date","order_delivered_customer_date","customer_unique_id")
                         .withColumn("delta_da_entrega",
                                     datediff("order_delivered_customer_date","order_estimated_delivery_date"))
                         .withColumn("entrega_adiantada_ate_8_dias",
                                     when((col("delta_da_entrega") > -9)&(col("delta_da_entrega") < 0),1)
                                     .otherwise(0))
                         .withColumn("entrega_super_adiantada",
                                     when((col("delta_da_entrega") < -8)&(col("delta_da_entrega") > -19),1)
                                     .otherwise(0))
                         .drop("order_estimated_delivery_date")
                         .drop("order_delivered_customer_date")
                         .drop('delta_da_entrega')
                         )
shape(df_compras_adiantadas)

######Criar variáveis relativas a qtd de compras por estado

In [46]:
states = df_orders_customers.select("customer_state").distinct().rdd.flatMap(lambda x: x).collect()
#print(type(states))
df_estados = df_orders_customers.select('customer_unique_id',"customer_state")
for estado in states:
  df_estados = df_estados.withColumn(estado,when(col("customer_state") == estado, 1).otherwise(0))
df_estados = df_estados.drop("customer_state")
shape(df_estados)

######Criar variáveis relativas a qtd de compras por cidade

In [48]:
# cidades = df_orders_customers.select("customer_city").distinct().rdd.flatMap(lambda x: x).collect()
# df_cidades = df_orders_customers.select('customer_unique_id',"customer_city")
# for cidade in cidades:
#   df_cidades = df_cidades.withColumn(cidade,when(col("customer_city") == cidade, 1).otherwise(0))
# df_cidades = df_cidades.drop('customer_city')
# shape(df_cidades)

####Criar features para orders_itens e products

######Criar variáveis relativas a qtd de compras com mais de um item por pedido

In [51]:
df_itens_products_agrupada_customer_unique_id = (df_orders_orders_itens_products
                                                 .withColumn("compras_com_mais_de_1_item",
                                                             when(col("order_item_id") > 1,1)
                                                             .otherwise(0))
                                                )

######Criar variáveis relativas a novas categorias de produtos

In [53]:
df_itens_products_agrupada_customer_unique_id = (df_itens_products_agrupada_customer_unique_id
                                                 .withColumn("product_category_utilidades_domesticas",
                                                             when((col("product_category_name") == 'cama_mesa_banho' )|
                                                                  (col("product_category_name") == 'moveis_decoracao' )|
                                                                  (col("product_category_name") == 'utilidades_domesticas' )|
                                                                  (col("product_category_name") == 'ferramentas_jardim' )|
                                                                  (col("product_category_name") == 'construcao_ferramentas_construcao' )|
                                                                  (col("product_category_name") == 'moveis_sala' )|
                                                                  (col("product_category_name") == 'casa_conforto' )|
                                                                  (col("product_category_name") == 'construcao_ferramentas_iluminacao' )|
                                                                  (col("product_category_name") == 'moveis_escritorio' )|
                                                                  (col("product_category_name") == 'moveis_cozinha_area_de_servico_jantar_e_jardim' )|
                                                                  (col("product_category_name") == 'construcao_ferramentas_jardim' )|
                                                                  (col("product_category_name") == 'artigos_de_natal' )|
                                                                  (col("product_category_name") == 'moveis_quarto' )|
                                                                  (col("product_category_name") == 'construcao_ferramentas_ferramentas' )|
                                                                  (col("product_category_name") == 'portateis_casa_forno_e_cafe' )|
                                                                  (col("product_category_name") == 'sinalizacao_e_seguranca' )|
                                                                  (col("product_category_name") == 'construcao_ferramentas_seguranca' )|
                                                                  (col("product_category_name") == 'moveis_colchao_e_estofado' )|
                                                                  (col("product_category_name") == 'la_cuisine' )|
                                                                  (col("product_category_name") == 'portateis_cozinha_e_preparadores_de_alimentos' )|
                                                                  (col("product_category_name") == 'casa_conforto_2'),1)
                                                             .otherwise(0)))
#21

In [54]:
df_itens_products_agrupada_customer_unique_id = (df_itens_products_agrupada_customer_unique_id
                                                 .withColumn("product_category_Eletro",
                                                             when((col("product_category_name") == 'informatica_acessorios' )|
                                                                  (col("product_category_name") == 'telefonia' )|
                                                                  (col("product_category_name") == 'eletronicos' )|
                                                                  (col("product_category_name") == 'consoles_games' )|
                                                                  (col("product_category_name") == 'eletrodomesticos' )|
                                                                  (col("product_category_name") == 'eletroportateis' )|
                                                                  (col("product_category_name") == 'pcs' )|
                                                                  (col("product_category_name") == 'eletrodomesticos_2' )|
                                                                  (col("product_category_name") == 'telefonia_fixa' )|
                                                                  (col("product_category_name") == 'tablets_impressao_imagem' )|
                                                                  (col("product_category_name") == 'dvds_blu_ray' )|
                                                                  (col("product_category_name") == 'cds_dvds_musicais' )|
                                                                  (col("product_category_name") == 'pc_gamer' )|
                                                                  (col("product_category_name") == 'climatizacao'),1)
                                                             .otherwise(0)))
#14

In [55]:
df_itens_products_agrupada_customer_unique_id = (df_itens_products_agrupada_customer_unique_id
                                                 .withColumn("product_category_Comida_beleza_saude",
                                                             when((col("product_category_name") == 'beleza_saude' )|
                                                                  (col("product_category_name") == 'perfumaria' )|
                                                                  (col("product_category_name") == 'bebes' )|
                                                                  (col("product_category_name") == 'pet_shop' )|
                                                                  (col("product_category_name") == 'alimentos' )|
                                                                  (col("product_category_name") == 'bebidas' )|
                                                                  (col("product_category_name") == 'alimentos_bebidas' )|
                                                                  (col("product_category_name") == 'fraldas_higiene'),1)
                                                             .otherwise(0)))
#8

In [56]:
df_itens_products_agrupada_customer_unique_id = (df_itens_products_agrupada_customer_unique_id
                                                 .withColumn("product_category_Pessoal",
                                                             when((col("product_category_name") == 'relogios_presentes' )|
                                                                  (col("product_category_name") == 'esporte_lazer' )|
                                                                  (col("product_category_name") == 'brinquedos' )|
                                                                  (col("product_category_name") == 'papelaria' )|
                                                                  (col("product_category_name") == 'fashion_bolsas_e_acessorios' )|
                                                                  (col("product_category_name") == 'malas_acessorios' )|
                                                                  (col("product_category_name") == 'livros_interesse_geral' )|
                                                                  (col("product_category_name") == 'audio' )|
                                                                  (col("product_category_name") == 'livros_tecnicos' )|
                                                                  (col("product_category_name") == 'fashion_calcados' )|
                                                                  (col("product_category_name") == 'artes' )|
                                                                  (col("product_category_name") == 'fashion_underwear_e_moda_praia' )|
                                                                  (col("product_category_name") == 'fashion_roupa_masculina' )|
                                                                  (col("product_category_name") == 'cine_foto' )|
                                                                  (col("product_category_name") == 'livros_importados' )|
                                                                  (col("product_category_name") == 'fashion_roupa_feminina' )|
                                                                  (col("product_category_name") == 'artigos_de_festas' )|
                                                                  (col("product_category_name") == 'musica' )|
                                                                  (col("product_category_name") == 'instrumentos_musicais' )|
                                                                  (col("product_category_name") == 'fashion_roupa_infanto_juvenil' )|
                                                                  (col("product_category_name") == 'fashion_esporte'),1)
                                                             .otherwise(0)))
#20

In [57]:
df_itens_products_agrupada_customer_unique_id = (df_itens_products_agrupada_customer_unique_id
         .withColumn("product_category_Outros",
                     when((col("product_category_Pessoal") == 0 )&
                          (col("product_category_Comida_beleza_saude") == 0 )&
                          (col("product_category_Eletro") == 0 )&
                          (col("product_category_utilidades_domesticas") == 0 ),1)
                     .otherwise(0))
        )

####criar features para orders_review

######Criar variáveis relativas ao score do review

In [60]:
df_score_reviews = (df_orders_orders_reviews
                    .select('review_score','customer_unique_id')
                    .withColumn("review_score_1",
                                when(col("review_score") == 1,1)
                                .otherwise(0))
                    .withColumn("review_score_2",
                                when(col("review_score") == 2,1)
                                .otherwise(0))
                    .withColumn("review_score_3",
                                when(col("review_score") == 3,1)
                                .otherwise(0))
                    .withColumn("review_score_4",
                                when(col("review_score") == 4,1)
                                .otherwise(0))
                    .withColumn("review_score_5",
                                when(col("review_score") == 5,1)
                                .otherwise(0))
                    .drop('review_score')
                            )
shape(df_score_reviews)

######Criar variáveis relativas a diferenças de datas

In [62]:
df_datediff_reviews = (df_orders_orders_reviews
                       .select('review_answer_timestamp','order_delivered_customer_date','review_creation_date','customer_unique_id')
                       .withColumn("dias_entre_delivered_e_review",
                                   datediff("review_answer_timestamp",
                                            "order_delivered_customer_date"))
                       .withColumn("dias_receber_e_responder_review",
                                   datediff("review_answer_timestamp",
                                            "review_creation_date"))
                       .withColumn("dias_entre_delivered_e_review_m",
                                   datediff("review_answer_timestamp",
                                            "order_delivered_customer_date"))
                       .withColumn("dias_receber_e_responder_review_m",
                                   datediff("review_creation_date",
                                            "review_answer_timestamp"))
                       .drop('review_answer_timestamp')
                       .drop('order_delivered_customer_date')
                       .drop('review_creation_date')
                      )
shape(df_datediff_reviews)

######Criar variáveis relativas ao tamanho do review

In [64]:
df_length_reviews = (df_orders_orders_reviews
                     .select('review_comment_title','review_comment_message','customer_unique_id')
                     .withColumn('review_comment_title_length',
                                 length(col("review_comment_title")))
                     .withColumn('review_comment_message_length',
                                 length(col("review_comment_message")))
                     .withColumn('review_comment_title_length_m',
                                 length(col("review_comment_title")))
                     .withColumn('review_comment_message_length_m',
                                 length(col("review_comment_message")))
                     .drop('review_comment_title')
                     .drop('review_comment_message')
                    )
shape(df_length_reviews)

##Agrupar as junções com relacão ao customer_unique_id

####Agrupar features criadas para orders e customer

######Agrupar variáveis relativas a qtd de compras por região

In [68]:
df_regiao.columns

In [69]:
df_regiao_agrupado = (df_regiao.
                      groupBy("customer_unique_id")
                      .agg({'sudeste':'sum',
                            'sul':'sum',
                            'centro_oeste':'sum',
                            'norte':'sum',
                            'nordeste':'sum'
                           })
                     )

In [70]:
df_regiao_agrupado.columns

In [71]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_regiao_agrupado.columns]

In [72]:
df_regiao_agrupado = (df_regiao_agrupado
                      .withColumnRenamed('sum(centro_oeste)',
                                         'soma_orders_regiao_centro_oeste')
                      .withColumnRenamed('sum(sudeste)',
                                         'soma_orders_regiao_sudeste')
                      .withColumnRenamed('sum(nordeste)',
                                         'soma_orders_regiao_nordeste')
                      .withColumnRenamed('sum(sul)',
                                         'soma_orders_regiao_sul')
                      .withColumnRenamed('sum(norte)',
                                         'soma_orders_regiao_norte')
                     )

In [73]:
df_regiao_agrupado.columns

######Agrupar variáveis relativas a qtd de compras por ano

In [75]:
df_compras_ano.columns

In [76]:
df_compras_ano_agrupado = (df_compras_ano
                           .groupBy("customer_unique_id")
                           .agg({'approved_year_2016':'sum',
                                 'approved_year_2017':'sum',
                                 'approved_year_2018':'sum',
                                })
                          )

In [77]:
df_compras_ano_agrupado.columns

In [78]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_compras_ano_agrupado.columns]

In [79]:
df_compras_ano_agrupado = (df_compras_ano_agrupado
                           .withColumnRenamed('sum(approved_year_2018)',
                                              'soma_orders_approved_year_2018')
                           .withColumnRenamed('sum(approved_year_2016)',
                                              'soma_orders_approved_year_2016')
                           .withColumnRenamed('sum(approved_year_2017)',
                                              'soma_orders_approved_year_2017')
                          )

In [80]:
df_compras_ano_agrupado.columns

######Agrupar variáveis relativas a qtd de compras por semestre

In [82]:
df_compras_semestre.columns

In [83]:
df_compras_semestre_agrupado = (df_compras_semestre
                                .groupBy("customer_unique_id")
                                .agg({'1_semestre_2016':'sum',
                                      '2_semestre_2016':'sum',
                                      '1_semestre_2017':'sum',
                                      '2_semestre_2017':'sum',
                                      '1_semestre_2018':'sum',
                                      '2_semestre_2018':'sum',
                                     })
                               )

In [84]:
df_compras_semestre_agrupado.columns

In [85]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_compras_semestre_agrupado.columns]

In [86]:
df_compras_semestre_agrupado = (df_compras_semestre_agrupado
                                .withColumnRenamed('sum(1_semestre_2017)',
                                                   'soma_orders_1_semestre_2017')
                                .withColumnRenamed('sum(2_semestre_2018)',
                                                   'soma_orders_2_semestre_2018')
                                .withColumnRenamed('sum(1_semestre_2016)',
                                                   'soma_orders_1_semestre_2016')
                                .withColumnRenamed('sum(2_semestre_2017)',
                                                   'soma_orders_2_semestre_2017')
                                .withColumnRenamed('sum(2_semestre_2016)',
                                                   'soma_orders_2_semestre_2016')
                                .withColumnRenamed('sum(1_semestre_2018)',
                                                   'soma_orders_1_semestre_2018')
                               )

In [87]:
df_compras_semestre_agrupado.columns

######Agrupar variáveis relativas a qtd de compras por trimestre

In [89]:
df_compras_trimestre.columns

In [90]:
df_compras_trimestre_agrupado = (df_compras_trimestre
                                 .groupBy("customer_unique_id")
                                 .agg({'1_trimestre_2016':'sum',
                                       '2_trimestre_2016':'sum',
                                       '3_trimestre_2016':'sum',
                                       '4_trimestre_2016':'sum',
                                       '1_trimestre_2017':'sum',
                                       '2_trimestre_2017':'sum',
                                       '3_trimestre_2017':'sum',
                                       '4_trimestre_2017':'sum',
                                       '1_trimestre_2018':'sum',
                                       '2_trimestre_2018':'sum',
                                       '3_trimestre_2018':'sum',
                                       '4_trimestre_2018':'sum',
                                       'order_approved_at':'max'
                                      })
                                )

In [91]:
df_compras_trimestre_agrupado.columns

In [92]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_compras_trimestre_agrupado.columns]

In [93]:
df_compras_trimestre_agrupado = (df_compras_trimestre_agrupado
                                 .withColumnRenamed('sum(4_trimestre_2017)',
                                                    'soma_orders_4_trimestre_2017')
                                 .withColumnRenamed('sum(1_trimestre_2017)',
                                                    'soma_orders_1_trimestre_2017')
                                 .withColumnRenamed('sum(2_trimestre_2016)',
                                                    'soma_orders_2_trimestre_2016')
                                 .withColumnRenamed('sum(4_trimestre_2016)',
                                                    'soma_orders_4_trimestre_2016')
                                 .withColumnRenamed('sum(3_trimestre_2018)',
                                                    'soma_orders_3_trimestre_2018')
                                 .withColumnRenamed('sum(3_trimestre_2016)',
                                                    'soma_orders_3_trimestre_2016')
                                 .withColumnRenamed('sum(2_trimestre_2017)',
                                                    'soma_orders_2_trimestre_2017')
                                 .withColumnRenamed('sum(1_trimestre_2018)',
                                                    'soma_orders_1_trimestre_2018')
                                 .withColumnRenamed('sum(1_trimestre_2016)',
                                                    'soma_orders_1_trimestre_2016')
                                 .withColumnRenamed('sum(4_trimestre_2018)',
                                                    'soma_orders_4_trimestre_2018')
                                 .withColumnRenamed('sum(2_trimestre_2018)',
                                                    'soma_orders_2_trimestre_2018')
                                 .withColumnRenamed('sum(3_trimestre_2017)',
                                                    'soma_orders_3_trimestre_2017')
                                 .withColumnRenamed('max(order_approved_at)',
                                                    'max_order_approved_at')
                                 )

In [94]:
df_compras_trimestre_agrupado.columns

######Agrupar variáveis relativas a qtd de compras atrasadas

In [96]:
df_compras_atrasadas.columns

In [97]:
df_compras_atrasadas_agrupado = (df_compras_atrasadas
                                 .groupBy("customer_unique_id")
                                 .agg({'entrega_atrasada_ate_1_semana':'sum',
                                       'entrega_atrasada_de_1_a_2_semanas':'sum',
                                       'entrega_super_atrasada':'sum'
                                      })
                                )

In [98]:
df_compras_atrasadas_agrupado.columns

In [99]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_compras_atrasadas_agrupado.columns]

In [100]:
df_compras_atrasadas_agrupado = (df_compras_atrasadas_agrupado
                                 .withColumnRenamed('sum(entrega_atrasada_ate_1_semana)',
                                                    'soma_orders_entrega_atrasada_ate_1_semana')
                                 .withColumnRenamed('sum(entrega_atrasada_de_1_a_2_semanas)',
                                                    'soma_orders_entrega_atrasada_de_1_a_2_semanas')
                                 .withColumnRenamed('sum(entrega_super_atrasada)',
                                                    'soma_orders_entrega_super_atrasada')
                                 )

In [101]:
df_compras_atrasadas_agrupado.columns

######Agrupar variáveis relativas a qtd de compras adiantadas

In [103]:
df_compras_adiantadas.columns

In [104]:
df_compras_adiantadas_agrupadas = (df_compras_adiantadas
                                   .groupBy("customer_unique_id")
                                   .agg({'entrega_adiantada_ate_8_dias':'sum',
                                         'entrega_super_adiantada':'sum'
                                        })
                                  )

In [105]:
df_compras_adiantadas_agrupadas.columns

In [106]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_compras_adiantadas_agrupadas.columns]

In [107]:
df_compras_adiantadas_agrupadas = (df_compras_adiantadas_agrupadas
                                     .withColumnRenamed('sum(entrega_super_adiantada)',
                                                        'soma_orders_entrega_super_adiantada')
                                     .withColumnRenamed('sum(entrega_adiantada_ate_8_dias)',
                                                        'soma_orders_entrega_adiantada_ate_8_dias')
                                     )

In [108]:
df_compras_adiantadas_agrupadas.columns

######Agrupar variáveis relativas a qtd por estado

In [110]:
print(df_estados.columns)

In [111]:
df_estados_agrupados = (df_estados
                        .groupBy("customer_unique_id")
                        .agg({'SC':'sum',
                              'RO':'sum', 
                              'PI':'sum', 
                              'AM':'sum', 
                              'RR':'sum', 
                              'GO':'sum', 
                              'TO':'sum', 
                              'MT':'sum', 
                              'SP':'sum',
                              'ES':'sum',
                              'PB':'sum',
                              'RS':'sum',
                              'MS':'sum',
                              'AL':'sum',
                              'MG':'sum',
                              'PA':'sum',
                              'BA':'sum',
                              'SE':'sum',
                              'PE':'sum',
                              'CE':'sum',
                              'RN':'sum',
                              'RJ':'sum',
                              'MA':'sum',
                              'AC':'sum',
                              'DF':'sum',
                              'PR':'sum',
                              'AP':'sum'
                             })
                       )

In [112]:
print(df_estados_agrupados.columns)

In [113]:
# #ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_estados_agrupados.columns]

In [114]:
df_estados_agrupados = (df_estados_agrupados
                        .withColumnRenamed('sum(MA)',
                                           'soma_orders_MA')
                        .withColumnRenamed('sum(GO)',
                                           'soma_orders_GO')
                        .withColumnRenamed('sum(CE)',
                                           'soma_orders_CE')
                        .withColumnRenamed('sum(AC)',
                                           'soma_orders_AC')
                        .withColumnRenamed('sum(PR)',
                                           'soma_orders_PR')
                        .withColumnRenamed('sum(DF)',
                                           'soma_orders_DF')
                        .withColumnRenamed('sum(ES)',
                                           'soma_orders_ES')
                        .withColumnRenamed('sum(RJ)',
                                           'soma_orders_RJ')
                        .withColumnRenamed('sum(RO)',
                                           'soma_orders_RO')
                        .withColumnRenamed('sum(SP)',
                                           'soma_orders_SP')
                        .withColumnRenamed('sum(PB)',
                                           'soma_orders_PB')
                        .withColumnRenamed('sum(SE)',
                                           'soma_orders_SE')
                        .withColumnRenamed('sum(RR)',
                                           'soma_orders_RR')
                        .withColumnRenamed('sum(RS)',
                                           'soma_orders_RS')
                        .withColumnRenamed('sum(BA)',
                                           'soma_orders_BA')
                        .withColumnRenamed('sum(PI)',
                                           'soma_orders_PI')
                        .withColumnRenamed('sum(RN)',
                                           'soma_orders_RN')
                        .withColumnRenamed('sum(MT)',
                                           'soma_orders_MT')
                        .withColumnRenamed('sum(AM)',
                                           'soma_orders_AM')
                        .withColumnRenamed('sum(PA)',
                                           'soma_orders_PA')
                        .withColumnRenamed('sum(MG)',
                                           'soma_orders_MG')
                        .withColumnRenamed('sum(AP)',
                                           'soma_orders_AP')
                        .withColumnRenamed('sum(TO)',
                                           'soma_orders_TO')
                        .withColumnRenamed('sum(AL)',
                                           'soma_orders_AL')
                        .withColumnRenamed('sum(PE)',
                                           'soma_orders_PE')
                        .withColumnRenamed('sum(SC)',
                                           'soma_orders_SC')
                        .withColumnRenamed('sum(MS)',
                                           'soma_orders_MS')
                       )

In [115]:
print(df_estados_agrupados.columns)

####Agrupar features para orders_itens e products

In [117]:
print(df_itens_products_agrupada_customer_unique_id.columns)

In [118]:
df_itens_products_agrupada_customer_unique_id_select = (df_itens_products_agrupada_customer_unique_id
                                                       .select('customer_unique_id', 
                                                               #'order_id', 
                                                               'order_item_id', 
                                                               #'product_id', 
                                                               'price', 
                                                               #'product_category_name', 
                                                               'product_description_lenght', 
                                                               'product_photos_qty', 
                                                               'compras_com_mais_de_1_item', 
                                                               'product_category_Outros', 
                                                               'product_category_utilidades_domesticas', 
                                                               'product_category_Eletro', 
                                                               'product_category_Comida_beleza_saude', 
                                                               'product_category_Pessoal')) 
                                                   
display(df_itens_products_agrupada_customer_unique_id_select)

customer_unique_id,order_item_id,price,product_description_lenght,product_photos_qty,compras_com_mais_de_1_item,product_category_Outros,product_category_utilidades_domesticas,product_category_Eletro,product_category_Comida_beleza_saude,product_category_Pessoal
95525127999a756443df590cc5aabe88,5,130.0,1031.0,6.0,1,0,1,0,0,0
dce2ed744c77bee28e2a2491c49baa59,1,39.9,106.0,1.0,0,1,0,0,0,0
fb374d327bb02e056ee964f122594ee7,1,299.0,2043.0,5.0,0,0,1,0,0,0
f288d458425986ef2a58327fc0c6ae4c,1,115.9,262.0,1.0,0,0,1,0,0,0
8c7f90968d1dd891c54bbceea9b34696,1,39.0,2833.0,1.0,0,0,0,0,0,1
b65c0b9fab51aaa116ae66138edd6624,1,239.9,2417.0,1.0,0,0,1,0,0,0
2c57d7c4918e4227b7d39e653ae2ad6c,1,148.0,1908.0,2.0,0,0,1,0,0,0
39483d5b72e19a377671e29c4768ba5d,1,35.97,516.0,1.0,0,0,1,0,0,0
921304187180808f410aba8fe524f13e,1,109.9,1166.0,5.0,0,1,0,0,0,0
361b5c19f63c93dd2accd5e3e001bc06,1,24.9,74.0,1.0,0,0,1,0,0,0


In [119]:
df_itens_products_agrupada_customer_unique = (df_itens_products_agrupada_customer_unique_id_select
                                              .groupBy("customer_unique_id")
                                              .agg({#'order_id':'sum', 
                                                    'order_item_id':'mean', 
                                                    #'product_id':'sum', 
                                                    'price':'mean', 
                                                    #'product_category_name':'sum', 
                                                    'product_description_lenght':'mean', 
                                                    'product_photos_qty':'sum', 
                                                    'compras_com_mais_de_1_item':'sum', 
                                                    'product_category_Outros':'sum', 
                                                    'product_category_utilidades_domesticas':'sum', 
                                                    'product_category_Eletro':'sum', 
                                                    'product_category_Comida_beleza_saude':'sum', 
                                                    'product_category_Pessoal':'sum'})
                                             )
shape(df_itens_products_agrupada_customer_unique)

In [120]:
df_itens_products_agrupado = df_itens_products_agrupada_customer_unique

In [121]:
df_itens_products_agrupado.columns

In [122]:
#ajuda para renomear as colunas 
# [".withColumnRenamed('"+item+"','soma_orders_')," for item in df_itens_products_agrupado.columns]

In [123]:
df_itens_products_agrupado = (df_itens_products_agrupado
                              .withColumnRenamed('sum(product_category_utilidades_domesticas)',
                                                 'soma_orders_product_category_utilidades_domesticas')
                              .withColumnRenamed('avg(order_item_id)',
                                                 'media_itens_por_orders')
                              .withColumnRenamed('avg(price)',
                                                 'media_orders_price')
                              .withColumnRenamed('sum(product_category_Pessoal)',
                                                 'soma_orders_product_category_Pessoal')
                              .withColumnRenamed('sum(compras_com_mais_de_1_item)',
                                                 'soma_orders_compras_com_mais_de_1_item')
                              .withColumnRenamed('avg(product_description_lenght)',
                                                 'media_orders_product_description_lenght')
                              .withColumnRenamed('sum(product_category_Comida_beleza_saude)',
                                                 'soma_orders_product_category_Comida_beleza_saude')
                              .withColumnRenamed('sum(product_photos_qty)',
                                                 'soma_orders_product_photos_qty')
                              .withColumnRenamed('sum(product_category_Outros)',
                                                 'soma_orders_product_category_Outros')
                              .withColumnRenamed('sum(product_category_Eletro)',
                                                 'soma_orders_product_category_Eletro')
                             )

In [124]:
df_itens_products_agrupado.columns

####Agrupar features para orders_review

######Agrupar variáveis relativas ao score do review

In [127]:
df_score_reviews.columns

In [128]:
df_score_reviews_agrupado = (df_score_reviews
                             .groupBy("customer_unique_id")
#                              .agg(sum(col('review_score_1')).alias('sum_review_score_1'),
#                                   sum(col('review_score_2')).alias('sum_review_score_2'),
#                                   sum(col('review_score_3')).alias('sum_review_score_3'),
#                                   sum(col('review_score_4')).alias('sum_review_score_4'),
#                                   sum(col('review_score_5')).alias('sum_review_score_5')
#                                  )
                             .agg({'review_score_1':'sum',
                                   'review_score_2':'sum',
                                   'review_score_3':'sum',
                                   'review_score_4':'sum',
                                   'review_score_5':'sum'
                                  })
                            )

In [129]:
df_score_reviews_agrupado.columns

In [130]:
df_score_reviews_agrupado = (df_score_reviews_agrupado
                             .withColumnRenamed('sum(review_score_2)',
                                                "soma_review_score_2")
                             .withColumnRenamed('sum(review_score_1)',
                                                "soma_review_score_1")
                             .withColumnRenamed('sum(review_score_4)',
                                                "soma_review_score_4")
                             .withColumnRenamed('sum(review_score_5)',
                                                "soma_review_score_5")
                             .withColumnRenamed('sum(review_score_3)',
                                                "soma_review_score_3")
                            )

In [131]:
df_score_reviews_agrupado.columns

######Agrupar variáveis relativas a diferenças de datas

In [133]:
df_datediff_reviews.columns

In [134]:
df_datediff_reviews_agrupado = (df_datediff_reviews
                                .groupBy("customer_unique_id")
                                .agg({'dias_entre_delivered_e_review':'mean',
                                      'dias_entre_delivered_e_review_m':'max',
                                      'dias_receber_e_responder_review':'mean',
                                      'dias_receber_e_responder_review_m':'max'
                                      })
                               )

In [135]:
df_datediff_reviews_agrupado.columns

In [136]:
df_datediff_reviews_agrupado = (df_datediff_reviews_agrupado
                                .withColumnRenamed('avg(dias_entre_delivered_e_review)',
                                                   "media_dias_entre_delivered_e_review")
                                .withColumnRenamed('avg(dias_receber_e_responder_review)',
                                                   "media_dias_receber_e_responder_review")
                                .withColumnRenamed('max(dias_receber_e_responder_review_m)',
                                                   "max_dias_receber_e_responder_review")
                                .withColumnRenamed('max(dias_entre_delivered_e_review_m)',
                                                   "max_dias_entre_delivered_e_review")
                               )

In [137]:
df_datediff_reviews_agrupado.columns

######Agrupar variáveis relativas ao tamanho do review

In [139]:
df_length_reviews.columns

In [140]:
df_length_reviews_agrupado = (df_length_reviews
                              .groupBy("customer_unique_id")
                              .agg({'review_comment_message_length':'mean',
                                    'review_comment_title_length':'mean',
                                    'review_comment_message_length_m':'max',
                                    'review_comment_title_length_m':'max'
                                   })
                             )

In [141]:
df_length_reviews_agrupado.columns

In [142]:
df_length_reviews_agrupado = (df_length_reviews_agrupado
                              .withColumnRenamed('avg(review_comment_title_length)',
                                                 "media_review_comment_title_length")
                              .withColumnRenamed('avg(review_comment_message_length)',
                                                 "media_review_comment_message_length")
                              .withColumnRenamed('max(review_comment_title_length_m)',
                                                 "max_review_comment_title_length")
                              .withColumnRenamed('max(review_comment_message_length_m)',
                                                 "max_review_comment_message_length")
                             )

In [143]:
df_length_reviews_agrupado.columns

##Fazer o Join das features desejadas

####Listar todas as features

In [146]:
print(df_regiao_agrupado.columns)
print("")
print(df_compras_ano_agrupado.columns)
print("")
print(df_compras_trimestre_agrupado.columns)
print("")
print(df_compras_semestre_agrupado.columns)
print("")
print(df_compras_atrasadas_agrupado.columns)
print("")
print(df_compras_adiantadas_agrupadas.columns)
print("")
print(df_estados_agrupados.columns)
print("")
print(df_score_reviews_agrupado.columns)
print("")
print(df_datediff_reviews_agrupado.columns)
print("")
print(df_length_reviews_agrupado.columns)
print("")

####Unir todas as features

In [148]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
"""
df_tabelao = (df_regiao_agrupado
              .join(df_compras_ano_agrupado, 
                    df_regiao_agrupado['customer_unique_id'] == df_compras_ano_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_compras_ano_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [149]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_compras_trimestre_agrupado, 
                    df_tabelao['customer_unique_id'] == df_compras_trimestre_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_compras_trimestre_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [150]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_compras_semestre_agrupado, 
                    df_tabelao['customer_unique_id'] == df_compras_semestre_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_compras_semestre_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [151]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_compras_atrasadas_agrupado, 
                    df_tabelao['customer_unique_id'] == df_compras_atrasadas_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_compras_atrasadas_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [152]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
com df_compras_adiantadas_agrupadas
"""
df_tabelao = (df_tabelao
              .join(df_compras_adiantadas_agrupadas, 
                    df_tabelao['customer_unique_id'] == df_compras_adiantadas_agrupadas['customer_unique_id'],
                    how='left')
              .drop(df_compras_adiantadas_agrupadas['customer_unique_id'])
              .repartition(2)
              .cache())

In [153]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
com df_compras_adiantadas_agrupadas
com df_estados_agrupados
"""
df_tabelao = (df_tabelao
              .join(df_estados_agrupados, 
                    df_tabelao['customer_unique_id'] == df_estados_agrupados['customer_unique_id'],
                    how='left')
              .drop(df_estados_agrupados['customer_unique_id'])
              .repartition(2)
              .cache())

In [154]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
com df_compras_adiantadas_agrupadas
com df_estados_agrupados
com df_score_reviews_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_score_reviews_agrupado, 
                    df_tabelao['customer_unique_id'] == df_score_reviews_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_score_reviews_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [155]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
com df_compras_adiantadas_agrupadas
com df_estados_agrupados
com df_score_reviews_agrupado
com df_datediff_reviews_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_datediff_reviews_agrupado, 
                    df_tabelao['customer_unique_id'] == df_datediff_reviews_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_datediff_reviews_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [156]:
"""Une as tabelas 
(df_regiao_agrupado e df_regiao_agrupado) 
com df_compras_trimestre_agrupado) 
com df_compras_semestre_agrupado
com df_compras_atrasadas_agrupado
com df_compras_adiantadas_agrupadas
com df_estados_agrupados
com df_score_reviews_agrupado
com df_datediff_reviews_agrupado
com df_length_reviews_agrupado
"""
df_tabelao = (df_tabelao
              .join(df_length_reviews_agrupado, 
                    df_tabelao['customer_unique_id'] == df_length_reviews_agrupado['customer_unique_id'],
                    how='left')
              .drop(df_length_reviews_agrupado['customer_unique_id'])
              .repartition(2)
              .cache())

In [157]:
shape(df_tabelao)

##Calcular a target baseada no BC

In [159]:
def gera_targets(df_principal):
  df_with_targets = (df_principal
                     .withColumn('quantidade_mes_ultma_compra',
                                 #ultima_data_compras_aprovadas
                                 months_between(to_date(lit("2018-09-03")),
                                                'max_order_approved_at'))
                     .withColumn("target",
                                  when(#(col("quantidade_mes_ultma_compra") > 8)&
                                       ((col('soma_review_score_1')>0)|
                                        (col('soma_review_score_2')>0)|
                                        (col('soma_review_score_3')>0)),1)
                                  .otherwise(0))
                    )
  df_with_targets = (df_with_targets
                     .drop('max_order_approved_at')
                     .drop('quantidade_mes_ultma_compra')
                     .drop('soma_review_score_2')
                     .drop('soma_review_score_1')
                     .drop('soma_review_score_4')
                     .drop('soma_review_score_5')
                     .drop('soma_review_score_3')
                     #.drop('customer_unique_id')
                    )
  
  return df_with_targets
#shape(gera_targets(df_tabelao))

##Criar a Analytical Base Table (ABT)
ABT  é uma tabela de dados que você cria com base nas suas consultas aos bancos de dados e é usada para realizar análises estatísticas. Com base nas ABT’s você pode criar modelos analíticos, gerar informações e chegar a conclusões para os problemas de negócios.

In [161]:
abt = gera_targets(df_tabelao)

####Verificar a proporção da target

In [163]:
abt.select("target").groupBy("target").count().show()

####Verificar nulos na ABT

In [165]:
# for col in abt.columns:
#   qtd_nulos = abt.filter(abt[col].isNull()).count()
#   total = abt.count()
#   print(f'{col} possui {qtd_nulos} dados nulos, {round(qtd_nulos/total*100,2)}%')

######Trocar nulos por zeros na ABT

In [167]:
#coloca 0 nos nulos
abt_c_null = abt
abt = abt.fillna(0)

####Salvar a ABT

In [169]:
#esta etapa de salvar a abt com o customer unique idfoi para poder add features novas na entrega 3
abt_c_null.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/ABT_com_nulo_e_unique_id.parquet')

In [170]:
# a abt não pode ter unique id por isso vamos colocar aqui sem
abt = abt.drop('customer_unique_id')
abt_c_null = abt_c_null.drop('customer_unique_id')

In [171]:
abt_c_null.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/ABT_com_nulo.parquet')
df_tabelao.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/DataFrame_completo_para_modelo.parquet')
abt.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/ABT_nulos_replace_zero_entrega2.parquet')

In [172]:
%fs
ls /FileStore/olist

path,name,size
dbfs:/FileStore/olist/ENTREGA_2/,ENTREGA_2/,0
dbfs:/FileStore/olist/ENTREGA_3/,ENTREGA_3/,0


In [173]:
%fs
ls /FileStore/olist/ENTREGA_2/

path,name,size
dbfs:/FileStore/olist/ENTREGA_2/0_bases/,0_bases/,0
dbfs:/FileStore/olist/ENTREGA_2/1_modelos/,1_modelos/,0


In [174]:
%fs
ls /FileStore/olist/ENTREGA_2/0_bases/

path,name,size
dbfs:/FileStore/olist/ENTREGA_2/0_bases/ABT_com_nulo_e_unique_id.parquet/,ABT_com_nulo_e_unique_id.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/ABT_nulos_replace_zero_entrega2.parquet/,ABT_nulos_replace_zero_entrega2.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/DataFrame_completo_para_modelo.parquet/,DataFrame_completo_para_modelo.parquet/,0


##Modelagem

####Criar um vetor com as features chamado 'features'
VectorAssembler é um transformador que combina uma determinada lista de colunas em um unico vetor coluna. 
É útil para combinar features brutas e as features geradas por diferentes transformadores em um único vetor de features, a fim de treinar modelos de ML, como regressão logística e árvores de decisão. 
VectorAssembler aceita os seguintes tipos de coluna de entrada: todos os tipos numéricos, tipo booleano e tipo de vetor. Em cada linha, os valores das colunas de entrada serão concatenados em um vetor na ordem especificada.

In [177]:
assembler = VectorAssembler(
    inputCols = abt.columns[:-1] , 
    outputCol = 'features'
)
df_assembler = assembler.transform(abt)

In [178]:
df_assembler = assembler.transform(abt)

In [179]:
#conferindo que foi gerado sem problemas
display(df_assembler.select('features'))

features
"List(0, 66, List(1, 6, 14, 20, 38, 58, 59, 60, 61, 62, 65), List(1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, -1.0, 2.0, 57.0, 57.0))"
"List(0, 66, List(1, 7, 18, 25, 30, 40), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"List(0, 66, List(1, 7, 15, 25, 30, 40, 58, 59, 60, 61), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, -1.0, 2.0))"
"List(0, 66, List(1, 7, 18, 25, 38, 58, 59, 60, 61), List(1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, -1.0, 2.0))"
"List(0, 66, List(3, 6, 19, 23, 35, 58, 61), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
"List(0, 66, List(1, 7, 18, 25, 29, 40, 58, 59, 60, 61, 62, 63, 64, 65), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, -1.0, 2.0, 116.0, 8.0, 8.0, 116.0))"
"List(0, 66, List(4, 6, 14, 20, 50, 58, 61, 62, 65), List(1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 61.0, 61.0))"
"List(0, 66, List(1, 7, 18, 25, 51, 58, 61, 62, 63, 64, 65), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 53.0, 17.0, 17.0, 53.0))"
"List(0, 66, List(2, 6, 19, 23, 30, 45, 58, 59, 60, 61), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 5.0, 4.0, -4.0, 5.0))"
"List(0, 66, List(1, 7, 15, 25, 29, 40, 58, 59, 60, 61), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, -1.0, 2.0))"


####Dividir a base 70% treino, 30% teste

In [181]:
treino, teste = df_assembler.randomSplit([0.7, 0.3], seed=42)

In [182]:
treino.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/treino_entrega2.parquet')
teste.write.mode("overwrite").parquet('/FileStore/olist/ENTREGA_2/0_bases/teste_entrega2.parquet')
#validacao.write.mode("overwrite").parquet('/FileStore/olist/validacao_entrega2.parquet')

In [183]:
shape(treino)

In [184]:
shape(teste)

In [185]:
#shape(validacao)

##Regressão Logistica
A Regressão Logística faz parte de uma família de modelos chamada Modelos Lineares Generalizados (GLM) e é adequada quando a variável de interesse (resposta) é binária, isto é, “sim” ou “não”. Através da Regressão Logística é possível avaliar os fatores de influenciam a ocorrência de determinado evento.

In [187]:
#Cria o modelo 
reglog = LogisticRegression(maxIter=10, regParam=0.1, labelCol='target')

In [188]:
#guarda o modelo treinado
modelo = reglog.fit(treino)

#### Informacões do modelo

In [190]:
print(f"intercepto = {modelo.intercept}")

In [191]:
dicionario = {'colunas':abt.columns[:-1],'coeficiente':modelo.coefficients}

In [192]:
df_feature_importance = pd.DataFrame(dicionario)
df_feature_importance['coeficiente_absoluto'] = df_feature_importance['coeficiente'].apply(lambda x:abs(x))
df_feature_importance.sort_values('coeficiente_absoluto',ascending=False)

Unnamed: 0,colunas,coeficiente,coeficiente_absoluto
28,soma_orders_entrega_atrasada_de_1_a_2_semanas,2.019866,2.019866
27,soma_orders_entrega_super_atrasada,1.781707,1.781707
26,soma_orders_entrega_atrasada_ate_1_semana,1.296418,1.296418
13,soma_orders_3_trimestre_2016,1.139707,1.139707
52,soma_orders_AP,-0.273890,0.273890
43,soma_orders_RR,0.222794,0.222794
15,soma_orders_1_trimestre_2018,0.166895,0.166895
31,soma_orders_MA,0.148537,0.148537
49,soma_orders_AM,-0.132489,0.132489
47,soma_orders_RN,-0.118406,0.118406


#### Usar o modelo no teste e na validacao

In [194]:
predicoes = modelo.evaluate(teste)

In [195]:
#predicoes2 = modelo.evaluate(validacao)

#### Avaliar

In [197]:
# Recupera o resumo do treino realizado
from pyspark.ml.regression import LinearRegressionTrainingSummary
resumo_treino = modelo.summary

In [198]:
# Recuperar as métricas
print(f'+-------------|----------------------------------------------------+')
print(f'|areaUnderROC |treino < {resumo_treino.areaUnderROC} >                       |')
print(f'|Acurácia     |treino < {resumo_treino.accuracy} >                       |')
print(f'|Precision    |treino < {resumo_treino.precisionByLabel} >  |')
print(f'|Recall       |treino < {resumo_treino.recallByLabel} >|')
print(f'+-------------|----------------------------------------------------+')

In [199]:
# Recuperar as métricas
print(f'+-------------|-----------------------------------------------------|')#+--------------------------------------------------------+')
print(f'|areaUnderROC | teste < {predicoes.areaUnderROC} >                        |')#| validacao < {predicoes2.areaUnderROC} >                       |')
print(f'|Acurácia     | teste < {predicoes.accuracy} >                        |')#| validacao < {predicoes2.accuracy} >                       |')
print(f'|Precision    | teste < {predicoes.precisionByLabel} >  |')#| validacao < {predicoes2.precisionByLabel} > |')
print(f'|Recall       | teste < {predicoes.recallByLabel} > |')#| validacao < {predicoes2.recallByLabel} > |')
print(f'+-------------|-----------------------------------------------------|')#+--------------------------------------------------------+')

#### Salvar modelo

In [201]:
#dbutils.fs.rm('/FileStore/olist/ENTREGA_2/1_modelos/LogisticRegression_entrega2',True)

In [202]:
%fs
ls /FileStore/olist/ENTREGA_2/0_bases/

path,name,size
dbfs:/FileStore/olist/ENTREGA_2/0_bases/ABT_com_nulo_e_unique_id.parquet/,ABT_com_nulo_e_unique_id.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/ABT_nulos_replace_zero_entrega2.parquet/,ABT_nulos_replace_zero_entrega2.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/DataFrame_completo_para_modelo.parquet/,DataFrame_completo_para_modelo.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/teste_entrega2.parquet/,teste_entrega2.parquet/,0
dbfs:/FileStore/olist/ENTREGA_2/0_bases/treino_entrega2.parquet/,treino_entrega2.parquet/,0


In [203]:
modelo.save('/FileStore/olist/ENTREGA_2/1_modelos/LogisticRegression_entrega2')

##Árvore Decisão
Uma árvore de decisão é um mapa dos possíveis resultados de uma série de escolhas relacionadas. Permite que um indivíduo ou organização compare possíveis ações com base em seus custos, probabilidades e benefícios. Podem pode ser usadas tanto para conduzir diálogos informais quanto para mapear um algoritmo que prevê a melhor escolha, matematicamente.

In [205]:
evaluator = BinaryClassificationEvaluator(labelCol='target', metricName='areaUnderROC')

In [206]:
decision_tree = DecisionTreeClassifier(featuresCol='features', labelCol = 'target', maxDepth=5, impurity='gini')

In [207]:
modelo2 = decision_tree.fit(treino)

#### Informacões do modelo

In [209]:
predicoes_treino = modelo2.transform(treino)

In [210]:
colunas = [abt.columns[index] for index in modelo2.featureImportances.indices]
coeficiente = modelo2.featureImportances.values

In [211]:
dicionario2 = {'indices':modelo2.featureImportances.indices,'colunas':colunas,'coeficiente':coeficiente}

In [212]:
df_feature_importance2 = pd.DataFrame(dicionario2)

In [213]:
df_feature_importance2.sort_values("coeficiente",ascending=False)

Unnamed: 0,indices,colunas,coeficiente
6,58,media_dias_entre_delivered_e_review,0.49228
11,65,max_review_comment_message_length,0.428912
3,30,soma_orders_entrega_super_adiantada,0.031893
2,29,soma_orders_entrega_adiantada_ate_8_dias,0.0243
1,26,soma_orders_entrega_atrasada_ate_1_semana,0.007156
8,61,max_dias_entre_delivered_e_review,0.006564
9,62,media_review_comment_message_length,0.005144
10,63,media_review_comment_title_length,0.001291
7,59,media_dias_receber_e_responder_review,0.001046
4,47,soma_orders_RN,0.000608


###### Visualizar a árvore gerada

In [215]:
display(modelo2)

treeNode
"{""index"":21,""featureType"":""continuous"",""prediction"":null,""threshold"":0.75,""categories"":null,""feature"":58,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":0.5,""categories"":null,""feature"":30,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":0.5,""categories"":null,""feature"":29,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":0.5,""categories"":null,""feature"":61,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":80.5,""categories"":null,""feature"":65,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":15,""featureType"":""continuous"",""prediction"":null,""threshold"":80.5,""categories"":null,""feature"":65,""overflow"":false}"


####usar o modelo no teste e na validacao

In [217]:
predicoes_teste = modelo2.transform(teste)

In [218]:
#predicoes_validacao = modelo2.transform(validacao)

#### Avaliar

In [220]:
areaUnderROC1 = evaluator.evaluate(predicoes_treino)
areaUnderROC2 = evaluator.evaluate(predicoes_teste)
#areaUnderROC3 = evaluator.evaluate(predicoes_validacao)

In [221]:
print(f'areaUnderROC treino    : {areaUnderROC1}')
print(f'areaUnderROC teste     : {areaUnderROC2}')
#print(f'areaUnderROC validacao : {areaUnderROC3}')

In [222]:
#ToDo: outras metricas

#### Salvar modelo

In [224]:
#dbutils.fs.rm('/FileStore/olist/ENTREGA_2/1_modelos/DecisionTreeClassifier_entrega2',True)

In [225]:
modelo2.save('/FileStore/olist/ENTREGA_2/1_modelos/DecisionTreeClassifier_entrega2')

##Comparar os dois modelos

In [227]:
df_comparacao = spark.createDataFrame([("treino", resumo_treino.areaUnderROC,areaUnderROC1),
                                       ("teste", predicoes.areaUnderROC,areaUnderROC2)#, 
                                       #("validacao", predicoes2.areaUnderROC,areaUnderROC3)
                                      ],
                                      ["Base", 
                                       "log_reg", 
                                       "DecisionTree"])

In [228]:
display(df_comparacao)

Base,log_reg,DecisionTree
treino,0.7611568022802209,0.3136852607823182
teste,0.7607256347407678,0.312676866533393
