<a href="https://colab.research.google.com/github/Difesoares/Feature_Engineering/blob/main/Feature_Engineering_SPARK_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Feature Engineering**

Esse projeto tem o objetivo de criar variáveis explicativas para melhorar o modelo de Machine Learning. Serão utilizadas duas tabelas:

*   **Base de Churn** com informações cadastrais de 1000 pessoas;
*   **Base de Transações** contendo as transações históricas de compras feitas por essas pessoas, incluindo detalhes como data da compra, valor gasto e categoria do produto.




## **Iniciando Spark**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## **Lendo dados com SPARK**

In [None]:
df_transacoes = spark.read.csv("/content/drive/MyDrive/POD Academy/Feature_Engineering_Eng_dados/base_transacoes.csv",
                               header=True,
                               inferSchema=True)

## Habilitando uso do SparkSQL
df_transacoes.createOrReplaceTempView("df_transacoes")

In [None]:
df_publico = spark.read.csv("/content/drive/MyDrive/POD Academy/Feature_Engineering_Eng_dados/base_churn.csv",
                            header=True,
                            inferSchema=True)

# Habilitando uso do SparkSQL
df_publico.createOrReplaceTempView("df_publico")

## **Vendo Schemas e abrindo as tabelas**

In [None]:
df_publico.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Idade: integer (nullable = true)
 |-- Gênero: string (nullable = true)
 |-- Dias desde a Inscrição: integer (nullable = true)
 |-- Usou Suporte: integer (nullable = true)
 |-- Plano: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [None]:
df_transacoes.printSchema()

root
 |-- ID Transação: integer (nullable = true)
 |-- ID Cliente: integer (nullable = true)
 |-- Data: string (nullable = true)
 |-- Valor: double (nullable = true)
 |-- Categoria: string (nullable = true)



In [None]:
df_publico.show(10)

+---+-----+------+----------------------+------------+-------------+-----+
| ID|Idade|Gênero|Dias desde a Inscrição|Usou Suporte|        Plano|Churn|
+---+-----+------+----------------------+------------+-------------+-----+
|  1|   21|     F|                  1331|           1|Intermediário|    1|
|  2|   21|     M|                  1160|           0|Intermediário|    0|
|  3|   62|     M|                   454|           1|       Básico|    0|
|  4|   64|     M|                   226|           1|Intermediário|    0|
|  5|   61|     M|                   474|           1|     Avançado|    0|
|  6|   18|     M|                   419|           0|       Básico|    0|
|  7|   52|     M|                  1334|           0|       Básico|    0|
|  8|   44|     M|                  1124|           1|Intermediário|    0|
|  9|   52|     M|                  1256|           1|Intermediário|    0|
| 10|   64|     F|                  1197|           0|       Básico|    0|
+---+-----+------+-------

In [None]:
df_transacoes.show(10, False)

+------------+----------+-----------------------------+------------------+-----------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |
+------------+----------+-----------------------------+------------------+-----------+
|1           |1         |2022-11-25 13:50:26.548672560|57.287427536330505|Esportes   |
|2           |1         |2020-01-19 12:27:36.637168141|97.07199340552512 |Alimentos  |
|3           |1         |2021-12-28 12:33:58.938053096|169.10581012381087|Livros     |
|4           |1         |2022-02-05 01:39:49.380530968|199.38694865538451|Roupas     |
|5           |1         |2022-11-16 23:06:54.159292032|160.00228343317622|Eletrônicos|
|6           |1         |2020-04-25 12:42:28.672566372|9.842481270765422 |Alimentos  |
|7           |1         |2022-10-31 22:05:18.584070800|76.90706330227667 |Eletrônicos|
|8           |1         |2020-10-06 12:14:52.035398228|53.20607404593595 |Roupas     |
|9           |1         |2022-11-02 12:50:5

## **Criando variáveis explicativas**



In [None]:
# Tempo desde a Última Transação

df_temp_ult_trans = spark.sql ("""
SELECT
    `ID Cliente`,
    DATEDIFF(
        (SELECT MAX(Data) FROM df_transacoes),
        MAX(Data)
    ) AS dias_desde_ultima_transacao
FROM
    df_transacoes
GROUP BY
    `ID Cliente`;

""").show()

+----------+---------------------------+
|ID Cliente|dias_desde_ultima_transacao|
+----------+---------------------------+
|       148|                         26|
|       463|                         51|
|       471|                         14|
|       496|                        487|
|       833|                        175|
|       243|                        179|
|       392|                         21|
|       540|                          4|
|       623|                        140|
|       737|                        120|
|       858|                        862|
|       897|                         70|
|        31|                         30|
|       516|                        903|
|        85|                         11|
|       137|                         18|
|       251|                        258|
|       451|                         65|
|       580|                         39|
|       808|                         15|
+----------+---------------------------+
only showing top

In [None]:
# FREQUENCIA DE COMPRA

df_freq_compra_mensal = spark.sql("""
SELECT `ID Cliente`, (COUNT(*) / (p.`Dias desde a Inscrição`/ 30)) as FREQ_COMPRA_MENSAL
FROM
  df_transacoes as t
LEFT JOIN
  df_publico as p
on
  t.`ID Cliente` = p.`ID`

GROUP BY t.`ID Cliente`, p.`Dias desde a Inscrição`

""").show()

+----------+--------------------+
|ID Cliente|  FREQ_COMPRA_MENSAL|
+----------+--------------------+
|       594| 0.06569343065693431|
|       327|  0.8042895442359249|
|       627|  0.1748704663212435|
|       692|  2.7127659574468086|
|       204| 0.08995502248875561|
|       728|  0.6060606060606061|
|       361|  0.7090103397341211|
|        57| 0.11335012594458439|
|       283| 0.22751895991332613|
|       615|  1.3255813953488371|
|       709|  0.0494641384995878|
|       900| 0.07042253521126761|
|       926|  0.1868629671574179|
|       942|  1.0169491525423728|
|        14|0.059721300597213006|
|        42|  0.4113924050632911|
|       225|  1.5489130434782608|
|       330| 0.27892561983471076|
|       562| 0.11406844106463877|
|       786|  0.4444444444444444|
+----------+--------------------+
only showing top 20 rows



In [None]:
# VALOR TOTAL DE GASTO

df_total_gasto = spark.sql("""
SELECT `ID Cliente`, ROUND(SUM(Valor),2) as VALOR_TOTAL_GASTO
FROM df_transacoes
GROUP BY `ID Cliente`
ORDER BY `VALOR_TOTAL_GASTO` DESC

""").show()

+----------+-----------------+
|ID Cliente|VALOR_TOTAL_GASTO|
+----------+-----------------+
|       759|          2498.27|
|       910|          2370.01|
|       470|          2361.46|
|       718|          2347.81|
|       983|          2346.24|
|       692|          2329.25|
|       471|          2307.39|
|       407|          2297.79|
|       685|          2283.31|
|       699|          2278.31|
|       668|          2269.66|
|       246|          2261.08|
|       167|          2255.57|
|       162|          2254.72|
|       917|          2232.73|
|       861|          2229.12|
|       480|          2193.45|
|       506|          2185.63|
|       794|           2185.2|
|       630|          2171.44|
+----------+-----------------+
only showing top 20 rows



In [None]:
# GASTO MÉDIO POR TRANSAÇÃO

df_gasto_medio_trans = spark.sql("""
SELECT `ID Cliente`,
    ROUND(SUM(VALOR) / COUNT(*), 2) as VALOR_TOTAL_POR_TRANS
FROM df_transacoes
GROUP BY `ID Cliente`

""").show()


+----------+---------------------+
|ID Cliente|VALOR_TOTAL_POR_TRANS|
+----------+---------------------+
|       148|                98.22|
|       463|                93.29|
|       471|               135.73|
|       496|                85.31|
|       833|               101.95|
|       243|                98.88|
|       392|                114.6|
|       540|                92.87|
|       623|                93.95|
|       737|               105.95|
|       858|                129.1|
|       897|                88.58|
|        31|               102.14|
|       516|               136.59|
|        85|                65.46|
|       137|               118.51|
|       251|                88.28|
|       451|                114.2|
|       580|               103.75|
|       808|               100.19|
+----------+---------------------+
only showing top 20 rows



## **Criando flags de janelas para histórico**

Criar uma janela temporal movel que indica se a data da transação está dentro dos últimos meses (3, 6 ou 12 meses) em relação à data mais recente de transação para cada cliente.

*   Últimos 3 meses;
*   Últimos 6 meses;
*   Últimos 12 meses





In [None]:
df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -90) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_3_meses_flag,
    CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -180) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_6_meses_flag,
    CASE
        WHEN Data BETWEEN DATE_ADD(MAX(Data) OVER (PARTITION BY `ID Cliente`), -365) AND MAX(Data) OVER (PARTITION BY `ID Cliente`) THEN 1
        ELSE 0
    END AS ultimos_12_meses_flag
FROM df_transacoes
ORDER BY `ID Cliente`;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.count()

10171

In [None]:
df_temp_01.show(10, False)

+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+---------------------+
|ID Transação|ID Cliente|Data                         |Valor             |Categoria  |ultimos_3_meses_flag|ultimos_6_meses_flag|ultimos_12_meses_flag|
+------------+----------+-----------------------------+------------------+-----------+--------------------+--------------------+---------------------+
|10          |1         |2020-09-06 21:37:41.946902652|56.524638713138636|Alimentos  |0                   |0                   |0                    |
|9           |1         |2022-11-02 12:50:58.407079648|193.00568791656067|Livros     |1                   |1                   |1                    |
|4           |1         |2022-02-05 01:39:49.380530968|199.38694865538451|Roupas     |0                   |0                   |1                    |
|8           |1         |2020-10-06 12:14:52.035398228|53.20607404593595 |Roupas     |0       

## **Criando variáveis explicativas de primeira camada**

Aqui será criado variáveis como a média, minimo, soma e máximo dos valores para cada tipo de consumo.

In [None]:
df_temp_02 = spark.sql("""
select
    `ID Cliente`,
    round(sum(Valor),2) as VL_TOT_CONSUMO,
    round(avg(Valor),2) as VL_MED_CONSUMO,
    round(max(Valor),2) as VL_MAX_CONSUMO,
    round(min(Valor),2) as VL_MIN_CONSUMO,
    round(sum(case when Categoria = 'Esportes' then Valor else 0 end),2) as VL_TOT_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' then Valor else NULL end),2) as VL_MED_CONS_ESPORTES,
    round(max(case when Categoria = 'Esportes' then Valor else NULL end), 2) AS VL_MAX_CONS_ESPORTES,
    round(min(case when Categoria = 'Esportes' then Valor else NULL end), 2) AS VL_MIN_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_ESPORTES,
    round(avg(case when Categoria = 'Esportes' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_ESPORTES,
    round(sum(case when Categoria = 'Alimentos' then Valor else 0 end),2) as VL_TOT_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' then Valor else NULL end),2) as VL_MED_CONS_Alimentos,
    round(max(case when Categoria = 'Alimentos' then Valor else NULL end), 2) AS VL_MAX_CONS_Alimentos,
    round(min(case when Categoria = 'Alimentos' then Valor else NULL end), 2) AS VL_MIN_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_Alimentos,
    round(avg(case when Categoria = 'Alimentos' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_Alimentos,
    round(sum(case when Categoria = 'Livros' then Valor else 0 end),2) as VL_TOT_CONS_Livros,
    round(avg(case when Categoria = 'Livros' then Valor else 0 end),2) as VL_MED_CONS_Livros,
    round(max(case when Categoria = 'Livros' then Valor else 0 end),2) as VL_MAX_CONS_Livros,
    round(min(case when Categoria = 'Livros' then Valor else 0 end),2) as VL_MIN_CONS_Livros,
    round(avg(case when Categoria = 'Livros' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_Livros,
    round(avg(case when Categoria = 'Livros' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_Livros,
    round(avg(case when Categoria = 'Livros' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_Livros,
    round(sum(case when Categoria = 'Roupas' then Valor else 0 end),2) as VL_TOT_CONS_Roupas,
    round(avg(case when Categoria = 'Roupas' then Valor else 0 end),2) as VL_MED_CONS_Roupas,
    round(max(case when Categoria = 'Roupas' then Valor else 0 end),2) as VL_MAX_CONS_Roupas,
    round(min(case when Categoria = 'Roupas' then Valor else 0 end),2) as VL_MIN_CONS_Roupas,
    round(avg(case when Categoria = 'Roupas' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_Roupas,
    round(avg(case when Categoria = 'Roupas' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_Roupas,
    round(avg(case when Categoria = 'Roupas' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_Roupas,
    round(sum(case when Categoria = 'Eletronicos' then Valor else 0 end),2) as VL_TOT_CONS_Eletronicos,
    round(avg(case when Categoria = 'Eletronicos' then Valor else 0 end),2) as VL_MED_CONS_Eletronicos,
    round(max(case when Categoria = 'Eletronicos' then Valor else 0 end),2) as VL_MAX_CONS_Eletronicos,
    round(min(case when Categoria = 'Eletronicos' then Valor else 0 end),2) as VL_MIN_CONS_Eletronicos,
    round(avg(case when Categoria = 'Eletronicos' and ultimos_3_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U3M_CONS_Eletronicos,
    round(avg(case when Categoria = 'Eletronicos' and ultimos_6_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U6M_CONS_Eletronicos,
    round(avg(case when Categoria = 'Eletronicos' and ultimos_12_meses_flag = 1 then Valor else NULL end),2) as VL_MED_U12M_CONS_Eletronicos
from df_temp_01
group by `ID Cliente`
order by `ID Cliente`
""")

df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.count()

1000

In [None]:
df_temp_02.show()

+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------------------+---------------------------+----------------------------+
|ID Cliente|VL_TOT_CONSUMO|VL_MED_CONSUMO|VL_MAX_CONSUMO|VL_MIN_CONSUMO|VL_TOT_CONS_ESPORTES|VL_MED_CONS_ESPORTES|VL_MAX_CONS_E

## **Criando variaveis explicativas de segunda camada**

Razão entre variáveis históricas. A razão captura as tendências ao longo de um período de tempo.

In [None]:
df_temp_03 = spark.sql("""
Select
*,
round(VL_MED_U3M_CONS_ESPORTES/VL_MED_U6M_CONS_ESPORTES,2) as VL_RAZ_MED_U3M_U6M_CONS_ESP,
round(VL_MED_U6M_CONS_ESPORTES/VL_MED_U12M_CONS_ESPORTES,2) as VL_RAZ_MED_U6M_U12M_CONS_ESP,
round(VL_MED_U3M_CONS_Alimentos/VL_MED_U6M_CONS_ALimentos,2) as VL_RAZ_MED_U3M_U6M_CONS_ALI,
round(VL_MED_U6M_CONS_Alimentos/VL_MED_U12M_CONS_Alimentos,2) as VL_RAZ_MED_U6M_U12M_CONS_ALI,
round(VL_MED_U3M_CONS_Livros/VL_MED_U6M_CONS_Livros,2) as VL_RAZ_MED_U3M_U6M_CONS_Livros,
round(VL_MED_U6M_CONS_Livros/VL_MED_U12M_CONS_Livros,2) as VL_RAZ_MED_U6M_U12M_CONS_Livros,
round(VL_MED_U3M_CONS_Roupas/VL_MED_U6M_CONS_Roupas,2) as VL_RAZ_MED_U3M_U6M_CONS_Roupas,
round(VL_MED_U6M_CONS_Roupas/VL_MED_U12M_CONS_Roupas,2) as VL_RAZ_MED_U6M_U12M_CONS_Roupas,
round(VL_MED_U3M_CONS_Eletronicos/VL_MED_U6M_CONS_Eletronicos,2) as VL_RAZ_MED_U3M_U6M_CONS_Elet,
round(VL_MED_U6M_CONS_Eletronicos/VL_MED_U12M_CONS_Eletronicos,2) as VL_RAZ_MED_U6M_U12M_CONS_Elet
from df_temp_02

""")

df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.count()

1000

In [None]:
df_temp_03.show()

+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------------------+---------------------------+----------------------------+---------------------------+----------------------------+---------------------------+----------------------------+--------------

## **Trazer as variáveis explicativas para a base de público**

In [None]:
abt_geral = spark.sql("""
select
  p.*,
  t.*,
  date_format(CURRENT_DATE, 'yyyyMM') as PK_datref,
  CURRENT_DATE as PK_DAT_PROC
from
  df_publico as p
left join
  df_temp_03 as t
on
  p.ID = t.`ID Cliente`

""")

abt_geral.count()

1000

In [None]:
abt_geral.show()

+---+-----+------+----------------------+------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------

## **Renomeando campos fora do padrão**

Vamos padronizar os nomes das features retirando os espaços, acentos e qualquer outro detalhe que destoa.

In [None]:
# Dicionário de renomeação

rename_dict = {
      "Gênero": "GENERO",
      "Dias desde a Inscrição": "QT_DIAS_DESDE_INSCR",
      "Usou Suporte": "FL_USOU_SUPORTE",
      "ID Cliente": "ID_CLIENTE",

}

# Aplicando renomeações
for old_name, new_name in rename_dict.items():
    abt_geral = abt_geral.withColumnRenamed(old_name, new_name)

In [None]:
abt_geral.show()

+---+-----+------+-------------------+---------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------------------+---------------------------+----------------------------+---------------------------+---------------------------

## **Dropar a coluna ID**

A coluna ID ja está sendo representada pela coluna ID_CLIENTE.

In [None]:
abt_final = abt_geral.drop("ID")
abt_final.count()

1000

In [None]:
abt_final.show()

+-----+------+-------------------+---------------+-------------+-----+----------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+-------------------------+--------------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+---------------------------+---------------------------+----------------------------+---------------------------+----------------------------+--