<a href="https://colab.research.google.com/github/fernandabomtorin16/churn_model/blob/main/data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.82)] [Waiting for headers] [W                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.82)] [Waiting for headers] [W                                                                               Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://pp

In [3]:
import os

# Defina as variáveis de ambiente para o Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [9]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
import dask.dataframe as dd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

spark = (
    SparkSession.builder
    .appName("Colab PySpark Session")
    .config("spark.driver.memory", "4g")  # Opcional: ajustar a memória
    .getOrCreate()
)
spark

# Extração dos dados e análise exploratória
- Loading dos arquivos parquet, referente as bases de dados do problema
- Base de transações, membros e logs
- Entendimento dos dados disponiveis

In [10]:
# Loading das bases de dados
df_transactions = spark.read.parquet("/content/drive/MyDrive/Data Master/transactions.parquet").persist()
df_members = spark.read.parquet("/content/drive/MyDrive/Data Master/members.parquet").persist()
df_logs = spark.read.parquet("/content/drive/MyDrive/Data Master/user_logs.parquet").persist()

### Base de transações

In [11]:
# Transformação das colunas de data
df_transactions = df_transactions.select(
    F.col('msno'),
    F.col('payment_method_id').cast(DoubleType()),
    F.col('payment_plan_days').cast(DoubleType()),
    F.col('plan_list_price').cast(DoubleType()),
    F.col('actual_amount_paid').cast(DoubleType()),
    F.col('is_auto_renew').cast(IntegerType()),
    F.col('transaction_date'),
    F.col('membership_expire_date'),
    F.col('is_cancel').cast(IntegerType()),
    F.col('safra')
).withColumn(
    'transaction_date',
    F.to_date(F.concat(
                F.col('transaction_date').substr(1, 4),
                F.lit('-'),
                F.col('transaction_date').substr(5, 2),
                F.lit('-'),
                F.col('transaction_date').substr(7, 2)), 'yyyy-MM-dd')
).withColumn(
    'membership_expire_date',
    F.to_date(F.concat(
                F.col('membership_expire_date').substr(1, 4),
                F.lit('-'),
                F.col('membership_expire_date').substr(5, 2),
                F.lit('-'),
                F.col('membership_expire_date').substr(7, 2)), 'yyyy-MM-dd')
).withColumn(
    'safra',
    F.to_date(
        F.concat(
            F.col('safra').substr(1, 4),
            F.lit('-'),
            F.col('safra').substr(5, 2),
            F.lit('-'),
            F.lit('01')), 'yyyy-MM-dd')
)

In [None]:
df_transactions.show(5, truncate=False)

+--------------------------------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+----------+
|msno                                        |payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|safra     |
+--------------------------------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+----------+
|+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=|38.0             |410.0            |1788.0         |1788.0            |0            |2015-11-21      |2017-01-04            |0        |2015-11-01|
|+++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0zQj6qKrqc=|41.0             |30.0             |149.0          |149.0             |1            |2015-05-26      |2015-06-26            |0        |2015-05-01|
|+++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0

In [None]:
df_transactions.summary().show()

+-------+--------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|summary|                msno| payment_method_id| payment_plan_days|   plan_list_price|actual_amount_paid|     is_auto_renew|          is_cancel|
+-------+--------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|  count|            20712225|          20712225|          20712225|          20712225|          20712225|          20712225|           20712225|
|   mean|                NULL|38.926502005458126|31.428997850303382|140.24088353617248|142.83468555406287|0.8543830998359664|0.03287691206521752|
| stddev|                NULL|3.5062855923423513|30.559829808399385|132.27679209056055|133.60944276809502|0.3527217381412334| 0.1783143916048489|
|    min|+++FOrTS7ab3tIgIh...|               1.0|               0.0|               0.0|               0.0|                 0

In [None]:
df_transactions.groupBy('safra').count().orderBy(F.desc('safra')).show(100)

+----------+-------+
|     safra|  count|
+----------+-------+
|2017-02-01| 885566|
|2017-01-01| 988576|
|2016-12-01| 968547|
|2016-11-01|1094941|
|2016-10-01|1033898|
|2016-09-01| 982640|
|2016-08-01| 966450|
|2016-07-01| 924032|
|2016-06-01| 804729|
|2016-05-01| 783956|
|2016-04-01| 774169|
|2016-03-01| 775469|
|2016-02-01| 792300|
|2016-01-01| 856716|
|2015-12-01| 861107|
|2015-11-01| 820345|
|2015-10-01| 680465|
|2015-09-01| 714610|
|2015-08-01| 705975|
|2015-07-01| 665280|
|2015-06-01| 775737|
|2015-05-01| 571552|
|2015-04-01| 564582|
|2015-03-01| 626488|
|2015-02-01| 545303|
|2015-01-01| 548792|
+----------+-------+



In [None]:
print('Total de clientes: ')
print(df_transactions.select(F.countDistinct('msno')).collect()[0][0])
print('Total de transações: ')
print(df_transactions.count())

Total de clientes: 
2363626
Total de transações: 
20712225


In [None]:
cancelamento = df_transactions.groupBy('safra').agg(
    F.mean('is_cancel').alias('percentual_cancelamento')
)
print('Média de cancelamentos mensalmente: ', cancelamento.agg(F.mean('percentual_cancelamento')).collect()[0][0])

Média de cancelamentos mensalmente:  0.03374890214137594


Observações sobre os dados de transacções:
- Nenhum valor missing
- Transações de 01-2015 à 12-2027
- 3,2% das transações foram cancelamento
- Em 85% das transações, o cliente esta com renovação automatica acionada

### Base de membros

In [None]:
df_members = df_members.select(
    F.col('msno'),
    F.col('city').cast(IntegerType()),
    F.col('bd').cast(IntegerType()),
    F.col('gender'),
    F.col('registered_via').cast(IntegerType()),
    F.col('registration_init_time'),
    F.col('is_ativo').cast(IntegerType()),
    F.col('safra')
).withColumn(
    'safra',
    F.to_date(
        F.concat(
            F.col('safra').substr(1, 4),
            F.lit('-'),
            F.col('safra').substr(5, 2),
            F.lit('-'),
            F.lit('01')), 'yyyy-MM-dd')
).withColumn(
    'registration_init_time',
    F.to_date(
        F.concat(
            F.col('registration_init_time').substr(1, 4),
            F.lit('-'),
            F.col('registration_init_time').substr(5, 2),
            F.lit('-'),
            F.col('registration_init_time').substr(7, 2)), 'yyyy-MM-dd')
)

In [8]:
df_members.summary().show()

+-------+--------------------+-----------------+------------------+--------+------------------+-------------------+
|summary|                msno|             city|                bd|  gender|    registered_via|           is_ativo|
+-------+--------------------+-----------------+------------------+--------+------------------+-------------------+
|  count|            63867246|         63867246|          63867246|25657069|          63867246|           63867246|
|   mean|                NULL|4.369941346774214|11.430160210759675|    NULL| 5.455677343594869|0.17603491154135564|
| stddev|                NULL|5.807596122789123|19.041318387984273|    NULL|2.4953863289564757| 0.3808498703308826|
|    min|+++4vcS9aMH7KWdfh...|                1|             -7168|  female|                -1|                  0|
|    25%|                NULL|                1|                 0|    NULL|                 3|                  0|
|    50%|                NULL|                1|                 0|    N

In [None]:
df_members.count()

63867246

Gender
* 40% de missing para gender. Por se tratar de uma quantidade muito grande, crio uma nova categoria para os casos em que nao foi informado o gênero.

In [None]:
# Tratamento de missing para gender
df_members = df_members.na.fill({'gender': 'nao_informado'})

In [None]:
df_members.groupBy('safra').count().show()

+----------+-------+
|     safra|  count|
+----------+-------+
|2016-03-01|4697972|
|2016-01-01|4294184|
|2016-08-01|5615921|
|2016-09-01|5778986|
|2016-05-01|5060705|
|2016-04-01|4876301|
|2016-11-01|6114345|
|2016-07-01|5439337|
|2016-12-01|6287789|
|2016-02-01|4502354|
|2016-10-01|5949288|
|2016-06-01|5250064|
+----------+-------+



In [None]:
df_members.filter("bd < 0").count()

3078

In [None]:
df_members.filter("bd > 100").count()

56654

In [None]:
# Separacao de 10 percentiles de bd para analise os valores
# df_members = df_members.withColumn("bd_quantile", F.ntile(20).over(Window.orderBy("bd")))

In [None]:
df_members.groupBy("bd_quantile").agg(F.min("bd"), F.max("bd"), F.mean('bd'), F.median('bd')).show()

+-----------+-------+-------+--------------------+----------+
|bd_quantile|min(bd)|max(bd)|             avg(bd)|median(bd)|
+-----------+-------+-------+--------------------+----------+
|          1|  -7168|      0|-0.33433906511724476|       0.0|
|          2|      0|      0|                 0.0|       0.0|
|          3|      0|      0|                 0.0|       0.0|
|          4|      0|      0|                 0.0|       0.0|
|          5|      0|      0|                 0.0|       0.0|
|          6|      0|      0|                 0.0|       0.0|
|          7|      0|      0|                 0.0|       0.0|
|          8|      0|      0|                 0.0|       0.0|
|          9|      0|      0|                 0.0|       0.0|
|         10|      0|      0|                 0.0|       0.0|
|         11|      0|      0|                 0.0|       0.0|
|         12|      0|      0|                 0.0|       0.0|
|         13|      0|     18|  10.541769144869889|      16.0|
|       

In [None]:
# bd_values = df_members.select('bd').rdd.flatMap(lambda x: x).collect()
# percentile_99_value = np.percentile(bd_values, 99)
# percentile_01_value = np.percentile(bd_values, 1)
# percentile_99_value = 51
# percentile_01_value = 0

In [None]:
# Substituição dos valores outliers de bd
# df_members = df_members.withColumn(
#     'bd',
#     F.when(F.col('bd') < percentile_01_value, percentile_01_value)
#      .when(F.col('bd') > percentile_99_value, percentile_99_value)
#      .otherwise(F.col('bd'))
# )

In [None]:
print('Total de clientes: ', df_members.select(F.countDistinct('msno')).collect()[0][0])
print('Total de membros: ', df_members.count())

Total de clientes:  6287789
Total de membros:  63867246


### Observações sobre a base de membros:
- 63MM de clientes
- Apenas 17% dos membros estao ativos

### Base logs

In [10]:
df_logs.show(5)

+--------------------+------+------+------+------+-------+-------+-------+----------+
|                msno| safra|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+------+------+------+------+-------+-------+-------+----------+
|SwlrSivYHoKF9V5wm...|201701| 121.0|  28.0|  14.0|   29.0|  704.0|  827.0|184606.903|
|rE5wSmHEF1Dhu55zh...|201605|  26.0|   2.0|   5.0|    6.0|  462.0|  256.0|119439.485|
|hx+cyaQ/Jcdr/Z5fo...|201611| 161.0|  71.0|  49.0|   34.0|  668.0|  891.0|204791.242|
|53QW6B70J23X2UCvx...|201502|  37.0|   9.0|   3.0|    9.0|  408.0|  447.0|101186.041|
|/0S1N/oRyxGLZlzxn...|201506| 205.0|  49.0|  23.0|   21.0|  225.0|  489.0| 69957.524|
+--------------------+------+------+------+------+-------+-------+-------+----------+
only showing top 5 rows



In [11]:
df_logs = df_logs.select(
    F.col('msno'),
    F.col('num_25').cast(DoubleType()),
    F.col('num_50').cast(DoubleType()),
    F.col('num_75').cast(DoubleType()),
    F.col('num_985').cast(DoubleType()),
    F.col('num_100').cast(DoubleType()),
    F.col('num_unq').cast(DoubleType()),
    F.col('total_secs').cast(DoubleType()),
    F.col('safra')
).withColumn(
    'safra',
    F.to_date(
        F.concat(
            F.col('safra').substr(1, 4),
            F.lit('-'),
            F.col('safra').substr(5, 2),
            F.lit('-'),
            F.lit('01')), 'yyyy-MM-dd')
)

In [None]:
df_logs.summary().show()

+-------+--------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+--------------------+
|summary|                msno|            safra|           num_25|           num_50|            num_75|           num_985|          num_100|           num_unq|          total_secs|
+-------+--------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+--------------------+
|  count|            26758971|         26758971|         26758971|         26758971|          26758971|          26758971|         26758971|          26758971|            26758971|
|   mean|                NULL|201572.0403868669|95.42601163549973|24.03566975725636|14.903521327483034|16.538671797207748|450.1598216538297|440.91930526775485|-2.11717488634742...|
| stddev|                NULL|61.87618074610147|175.2882595645691|39.12554591867812|  22.626721

# O Case
Dados históricos de dois anos de clientes de uma empresa que oferece streaming de musica baseado em assinatura.
Quando os usuários se inscrevem no serviço, eles podem optar por renovar o serviço manualmente ou renovar automaticamente, podendo cancelar ativamente sua associação à qualquer momento.
## Modelo de Churn
Criação de um modelo classificador para prever clientes que serão churn 3 meses no futuro (ou seja, clientes que possuem assinatura ativa no periodo analisado e 3 mses depois desse período ele não esta mais ativo, ou porque cancelou ou não renovou a assinatura) e identifique os clientes que serão direcionados para a ação de forma proativa.

# Definição de público
- Clientes que possuem assinatura ativa
- Até 2016-09-01 para que possuam apenas clientes que possamos saber o comportamento 3 meses para a frente
- Na base de membros, temos a informacao dos membros ativos, e portanto escolho essa base para selecionar o público

In [14]:
spine = df_members.filter(
    (F.col('is_ativo') == 1) &
    (F.col('safra') < F.lit('2016-09-01'))
).drop('is_ativo').dropDuplicates()

In [None]:
spine.count()

7153111

In [None]:
spine.groupBy('msno').count().orderBy(F.desc('count')).show(5, truncate=False)

+--------------------------------------------+-----+
|msno                                        |count|
+--------------------------------------------+-----+
|+f7M2fwOgbZKfdDo7U5yF2Tn9SCxtV4TVzSydzrQI40=|8    |
|+Pn5eh2MY/PSbCwd1OW+xxZkjcgXETMtzfMf/fcvywI=|8    |
|+QrgUC+7auY8Pf0w2wi6p3nbyA9QSVySmdhAVu/wYZ8=|8    |
|+YOJTYCPOTYfnWWrwjYK9bEZjWVG++pVbCeTq0eGMhA=|8    |
|+QXJIHj6VYFkRpplljnYC+btUq/8MtESX5ng9nIff0U=|8    |
+--------------------------------------------+-----+
only showing top 5 rows



In [None]:
spine.show(5, truncate=False)

+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+
|msno                                        |city|bd |gender       |registered_via|registration_init_time|safra     |
+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+
|++Bks8kE9oclzxZM3hcWs+qzsxuoXFeIE1+7pxKBCQg=|1   |0  |nao_informado|7             |2016-07-30            |2016-07-01|
|++OvJH5FmfZ5CRrYfmbQEk7tJwCZhsJnkWbxClRaUpw=|1   |0  |nao_informado|7             |2013-11-14            |2016-02-01|
|++aVm+4v87MXzbhiEXoS7OszzRxcfny8aKalISGUR9I=|15  |32 |male         |9             |2011-11-03            |2016-05-01|
|++kosgi4V03jOxcBKjM/9tPignUOxcc7jBVnZLJ+lX0=|1   |0  |nao_informado|7             |2015-07-10            |2016-07-01|
|+/EF6xPAo8HLjhC7K8VBWUkXnCYi7KYIaNf071llMV0=|5   |35 |female       |9             |2012-05-16            |2016-02-01|
+--------------------------------------------+--

# Definição da target
- Churn ou não churns, 3 meses no futuro
- Todas as variaveis sobre o cliente, estão em uma visao 3 meses antes da target
- A partir da spine, com os clientes ativos em cada safra, é feito um cruzamento buscando o cliente na safra 3 meses para o futuro
- Cria-se a variável target, 1 se o cliente estiver ativo, 0 se nao estiver ativo (churn)

In [15]:
df_members_aliased = df_members.select('msno', 'safra', 'is_ativo').withColumnRenamed('safra', 'safra_target').withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra_target')))
master = spine.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.add_months(F.col('safra'), 3)))
master = master.join(df_members_aliased.select('chave', 'is_ativo'),on='chave', how='left').drop('chave').withColumnRenamed('is_ativo', 'target')

In [None]:
master.show(5, truncate=False)

+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+
|msno                                        |city|bd |gender       |registered_via|registration_init_time|safra     |target|
+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+
|+TX/zz/YToedCiHQ8OaMmRDwaGF4LU1aaNOXeo408VM=|1   |0  |nao_informado|7             |2012-05-24            |2016-06-01|1     |
|/8HWkgjEG95U3+82KEXrWRcrmkUl62cEUeg8d37j7dQ=|22  |32 |male         |3             |2016-01-14            |2016-05-01|0     |
|+a5M+g3yn6JJoUL0JLg6075GbJZBsBJ4z3FkQ2HJgBU=|1   |0  |nao_informado|7             |2016-04-09            |2016-08-01|1     |
|+mM0jyxAgO+WXyjjj1T0lIs4vYbG0zScKm/ifZ6sQYc=|1   |0  |nao_informado|7             |2014-12-20            |2016-08-01|1     |
|+2BZVWQhXddu49omjb2OOjOwVv0MTtj5l9Io4n+IAD4=|4   |54 |male         |9             |2016-01-15            |2016-03-01|

In [16]:
master.groupBy('target').count().show()

+------+-------+
|target|  count|
+------+-------+
|     1|6134030|
|     0|1019081|
+------+-------+



In [21]:
# Obtendo uma amostra, mantendo a proporção da target
fractions = {0: 0.01, 1: 0.01}

df_sample = master.sampleBy("target", fractions, seed=42)

In [None]:
# Verifique a nova distribuição
df_sample.groupBy("target").count().show()

Target desbalanceada:
- Há uma quantidade bem maior de clientes que continuaram ativos (1), que os clientes churn. Dessa forma, na hora de criar o modelo, é interessante utilizar pesos para balancear a target.

## *Dúvidas / pontos de atenção*

1.   *será necessário apenas um cliente na base?* -> imagino que nao seria um problema o cliente aparecer em mais de um mes, de forma que estamos capturando o comportamente desse cliente no mes, por isso é necessário trazer as features sobre a transação e logs
2.   *estaria certo considerara apenas o ano de 2016?* -> Por hora, entendo que a target deve ser o cliente ativo ou nao, informação que é possivel de encontrar apenas na base de members, e portanto, temos apenas informações de 2016



# Feature Engineering
- Criação de novas features

####**Transações:**

*variaveis uteis existentes:*
- is_auto_renew -> seria um indicativo de que o cliente talvez nao renove o plano?
- payment_plan_days -> indicando quanto tempo o cliente tem para renovar o plano
- actual_amount_paid -> mostrando o quanto o cliente esta pagando, se for muito pode trazer indicios de cancelamento
- membership_expire_date -> transformar em quanto tempo para finalização do plano
- is_canceled -> indicando se o plano foi cancelado ou nao -> pode indicar que o cliente cancelou e no mes seguinte a assinatura nao estara mais ativa



In [None]:
df_transactions=df_transactions.filter(F.year(F.col('safra'))=='2016')

In [None]:
df_transactions.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra'))).groupBy('chave').count().orderBy(F.desc('count')).show(5)

+--------------------+-----+
|               chave|count|
+--------------------+-----+
|++5BmBHS2ebe4Whfg...|    1|
|++3UnufK0Ka+brv8d...|    1|
|++4yteQFM9k0Gjq5f...|    1|
|++8XaEDast796K/DQ...|    1|
|+++snpr7pmobhLKUg...|    1|
+--------------------+-----+
only showing top 5 rows



In [None]:
df_sample = df_sample.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra')))
df_sample = df_sample.join(df_transactions.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra'))).select('chave', 'is_auto_renew', 'payment_plan_days', 'actual_amount_paid', 'membership_expire_date', 'is_cancel'),
                     on='chave', how='left')

In [None]:
df_sample.count()

7153111

In [None]:
df_sample.show(5, truncate=False)

+-------------------------------------------------------+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+-------------+-----------------+------------------+----------------------+---------+
|chave                                                  |msno                                        |city|bd |gender       |registered_via|registration_init_time|safra     |target|is_auto_renew|payment_plan_days|actual_amount_paid|membership_expire_date|is_cancel|
+-------------------------------------------------------+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+-------------+-----------------+------------------+----------------------+---------+
|+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=_2016-08-01|+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=|15  |26 |male         |9             |2010-11-18            |2016-08-01|1     |1            |30.0   

####**Logs:**
Utilizar a ultima data do mês para buscar as estatisticas sobre o uso, porque dessa forma é possível compreender como foi o comportamento do cliente ao longo do mês com o uso da plataforma.

*variáveis:*
1. num_25
2. num_50
3. num_75
4. num_985
5. num_100
6. num_unq
7. total_secs

In [None]:
df_logs=df_logs.filter(F.year(F.col('safra'))=='2016')

In [None]:
df_logs.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra'))).groupBy('chave').count().orderBy(F.desc('count')).show(5)

+--------------------+-----+
|               chave|count|
+--------------------+-----+
|T2gUhlBhFMoSFA9jF...|    1|
|W9RH26bbCJ/jz8erW...|    1|
|53QW6B70J23X2UCvx...|    1|
|WppnGUHAP0pV79RBZ...|    1|
|SwlrSivYHoKF9V5wm...|    1|
+--------------------+-----+
only showing top 5 rows



In [None]:
df_sample = df_sample.join(df_logs.withColumn('chave', F.concat(F.col('msno'), F.lit('_'), F.col('safra'))).select('chave', 'num_25', 'num_50', 'num_75', 'num_985', 'num_100', 'num_unq', 'total_secs'),
                     on='chave', how='left')

In [None]:
df_sample.show(5, truncate=False)

+-------------------------------------------------------+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+-------------+-----------------+------------------+----------------------+---------+------+------+------+-------+-------+-------+------------------+
|chave                                                  |msno                                        |city|bd |gender       |registered_via|registration_init_time|safra     |target|is_auto_renew|payment_plan_days|actual_amount_paid|membership_expire_date|is_cancel|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs        |
+-------------------------------------------------------+--------------------------------------------+----+---+-------------+--------------+----------------------+----------+------+-------------+-----------------+------------------+----------------------+---------+------+------+------+-------+-------+-------+------------------+
|+++l/EXNM

# Seleção das features
- Análise das variaveis categóricas e numericas em relacao à target
- Selecao de feature a partir de feature importance usando RandomForest *


















In [None]:
master_pd = master.toPandas()

  An error occurred while calling o407.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.command

Py4JJavaError: An error occurred while calling o407.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 6 tasks (1184.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2488)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:4263)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:4267)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:4243)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:4243)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:4242)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:140)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:142)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:137)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:114)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:69)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:69)


In [None]:
# Separação das features categorias e numericas
cat_cols = ['city', 'gender', 'registered_via', 'is_auto_renew','is_cancel']
num_cols = ['bd', 'payment_plan_days', 'actual_amount_paid', 'num_25', 'num_50', 'num_75', 'num_985', 'num_100', 'num_unq', 'total_secs']

In [None]:
for i in cat_cols:
  master_pd[i] = master_pd[i].astype('string')
  plt.figure(figsize=(10, 5))
  sns.countplot(data=master_pd, x=i, hue='target')
  plt.xlabel(i)
  plt.ylabel('Count')
  plt.legend(title='Target', loc='upper right')
  plt.show()

NameError: name 'master_pd' is not defined

In [None]:
for i in num_cols:
  plt.figure(figsize=(10, 5))
  sns.boxplot(data=master_pd, x='target', y=i)
  plt.xlabel('Target')
  plt.ylabel(i)
  plt.show()

# Modelo
* Avaliação de dois algoritmos para realizar a classificação binária: XGBoost e regressão logística
*