# INSTALAÇÃO DAS BIBLIOTECAS E IMPORTAÇÕES

In [None]:
pip install pyspark

In [None]:
pip install gcsfs

In [None]:
pip install pymongo[srv]

In [None]:
import numpy as np
import pymongo
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
from google.cloud import storage
from pyspark.sql.window import Window
import os 

# CRIANDO A CONEXAO COM O GCP

In [None]:
#Indicando o caminho da chave(bucket-gcp) e fazendo a validacao com o 'os.environ'
serviceAccount = ('/content/firm-champion-339219-167196b437af.json')

os.environ['GOOGLE_APPLICATION_CREDENTIALS']  = serviceAccount

In [None]:
#Realizando a conexão com a bucket
cliente = storage.Client()
bucket = cliente.get_bucket('data-engineer-gsantos')


In [None]:
#Selecionando o arquivo
bucket.blob('marketing_campaign.csv')

#Caminho do arquivo
path = ('gs://data-engineer-gsantos/Datasets Brutos/marketing_campaign.csv')

# CRIANDO A CONEXAO COM MONGO_DB

In [None]:
client = pymongo.MongoClient("mongodb+srv://USER:PASSWORD@cluster0.chlaz.mongodb.net/myFirstDatabase?retryWrites=true&w=majority")

# SPARK SESSION

In [None]:
spark = ( 
    SparkSession.builder
          .master('local')
          .appName('Campanha_Marketing')
          .config('spark.ui.port','4050')
          .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
          .getOrCreate()
)

In [None]:
spark

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# CRIANDO O DATAFRAME

In [None]:
df = (
    spark.read.format('csv')
    .option('delimiter',',')
    .option('header','true')
    .option('inferschema','true')
    .load(path)

)

In [None]:
df.show()

# ENVIANDO O ARQUIVO BRUTO PARA O MONGO-DB



*   Realizar a extração corretamente para um dataframe




In [None]:
#Extraindo do Pyspark para o pandas
df_pandas = df.toPandas()


#Criando o Banco e a Coleção dados_brutos
db = client['Campanha_Marketing']
colecao = db.dados_brutos

#Transformando o dataframe para dicionario e inseriNDO na coleção desejada.
df_dici = df_pandas.to_dict('records')
colecao.insert_many(df_dici)


# PANDAS

## TRATAMENTOS PANDAS

In [None]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2240 entries, 0 to 2239
Data columns (total 29 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   ID                   2240 non-null   int32  
 1   Year_Birth           2240 non-null   int32  
 2   Education            2240 non-null   object 
 3   Marital_Status       2240 non-null   object 
 4   Income               2216 non-null   float64
 5   Kidhome              2240 non-null   int32  
 6   Teenhome             2240 non-null   int32  
 7   Dt_Customer          2240 non-null   object 
 8   Recency              2240 non-null   int32  
 9   MntWines             2240 non-null   int32  
 10  MntFruits            2240 non-null   int32  
 11  MntMeatProducts      2240 non-null   int32  
 12  MntFishProducts      2240 non-null   int32  
 13  MntSweetProducts     2240 non-null   int32  
 14  MntGoldProds         2240 non-null   int32  
 15  NumDealsPurchases    2240 non-null   i

In [None]:
#Renomeando as colunas e traduzindo
df_pandas.rename ( columns= {
    
              'ID':'id',
              'Year_Birth':'dt_nasc', 
              'Education':'formacao',
              'Marital_Status':'est_civil',
              'Income':'renda_anual_fam',
              'Kidhome':'qnt_criancas',
              'Teenhome':'qnt_adolesc',
              'Dt_Customer':'dt_cadastro',
              'Recency':'dias_sem_comprar',
              'MntWines':'vl_vinho_2y', 
              'MntFruits':'vl_fruta_2y',
              'MntMeatProducts':'vl_carne_2y',
              'MntFishProducts':'vl_peixe_2y',
              'MntSweetProducts':'vl_doce_2y',
              'MntGoldProds':'vl_ouro_2y',
              'NumDealsPurchases': 'comp_desconto',
              'NumWebPurchases':'comp_site', 
              'NumCatalogPurchasese':'comp_catalogo',
              'NumStorePurchases':'comp_presencial',
              'NumWebVisitsMonth':'ac_site_ult_mes',
              'AcceptedCmp1':'comp_camp_1', 
              'AcceptedCmp2':'comp_camp_2',
              'AcceptedCmp3':'comp_camp_3', 
              'AcceptedCmp4':'comp_camp_4', 
              'AcceptedCmp5':'comp_camp_5',
              'Response':'comp_ult_camp',
              'Complain':'reclamacao_2y' 

},inplace = True)



* Realizar o drop(se necessário) de colunas do dataframe realizando o comentário do porque da exclusão 




In [None]:
# Dropando colunas desnecessárias pois possuem um único valor : 
pd.unique(df_pandas['Z_Revenue'])
df_pandas.drop(columns = ['Z_Revenue'],inplace=True)


In [None]:
# Dropando colunas desnecessárias pois possuem um único valor : 
pd.unique(df_pandas['Z_CostContact'])
df_pandas.drop(columns = ['Z_CostContact'],inplace=True)

## TRADUÇÃO DO DF_PANDAS




*  O arquivo está em outra linguagem e deve ter seus dados traduzidos para Português-BR





In [None]:
df_pandas.dtypes

id                       int32
dt_nasc                  int32
formacao                object
est_civil               object
renda_anual_fam        float64
qnt_criancas             int32
qnt_adolesc              int32
dt_cadastro             object
dias_sem_comprar         int32
vl_vinho_2y              int32
vl_fruta_2y              int32
vl_carne_2y              int32
vl_peixe_2y              int32
vl_doce_2y               int32
vl_ouro_2y               int32
comp_desconto            int32
comp_site                int32
NumCatalogPurchases      int32
comp_presencial          int32
ac_site_ult_mes          int32
comp_camp_3              int32
comp_camp_4              int32
comp_camp_5              int32
comp_camp_1              int32
comp_camp_2              int32
reclamacao_2y            int32
comp_ult_camp            int32
dtype: object

In [None]:
#backup antes da alteração
df2_pandas = df_pandas.copy()

In [None]:
#verificando os valores que precisam ser traduzidos
pd.unique(df_pandas['formacao'])


array(['Graduation', 'PhD', 'Master', 'Basic', '2n Cycle'], dtype=object)

In [None]:
# tradução utilizando o df.loc para localizar o parametro e substituir
df_pandas.loc[df_pandas.formacao == 'Graduation',['formacao']] = 'Graduacao'

In [None]:
df_pandas.loc[df_pandas.formacao == 'PhD',['formacao']] = 'Doutorado'

In [None]:
df_pandas.loc[df_pandas.formacao == 'Master',['formacao']] = 'Mestrado'

In [None]:
df_pandas.loc[df_pandas.formacao == 'Basic',['formacao']] = 'Basico'

In [None]:
df_pandas.loc[df_pandas.formacao == 'Graduation',['formacao']] = 'Graduacao'

In [None]:
df_pandas.loc[df_pandas.formacao == '2n Cycle',['formacao']] = 'Pos_grad'

In [None]:
#verificando as alterações na coluna formacao
pd.unique(df_pandas['formacao'])

array(['Graduacao', 'Doutorado', 'Mestrado', 'Basico', 'Pos_grad'],
      dtype=object)

In [None]:
#verificando os valores que precisam ser traduzidos col 'est_civil'
pd.unique(df_pandas['est_civil'])

array(['Single', 'Together', 'Married', 'Divorced', 'Widow', 'Alone',
       'Absurd', 'YOLO'], dtype=object)



*  Verificar a existência de dados inconsistentes e realizar a limpeza para NaN ou NA




In [None]:
# os dados 'Absurd' e 'YOLO' serão substituidos por 'NaN' para um futuro tratamento
df_pandas.replace(['Single','Together','Married','Divorced','Widow','Alone','Absurd','YOLO'],['Solteiro','Uniao estavel','Casado','Divorciado','Viuva','Solteiro',np.nan,np.nan],inplace=True)

In [None]:
#verificando as alterações
pd.unique(df_pandas['est_civil'])

array(['Solteiro', 'Uniao estavel', 'Casado', 'Divorciado', 'Viuva', nan],
      dtype=object)

In [None]:
# a média de dados nan  na coluna est_civil é muito baixa o que não irá interferir no resultado final da análise.
df_pandas.isna().mean()


id                     0.000000
dt_nasc                0.000000
formacao               0.000000
est_civil              0.001786
renda_anual_fam        0.010714
qnt_criancas           0.000000
qnt_adolesc            0.000000
dt_cadastro            0.000000
dias_sem_comprar       0.000000
vl_vinho_2y            0.000000
vl_fruta_2y            0.000000
vl_carne_2y            0.000000
vl_peixe_2y            0.000000
vl_doce_2y             0.000000
vl_ouro_2y             0.000000
comp_desconto          0.000000
comp_site              0.000000
NumCatalogPurchases    0.000000
comp_presencial        0.000000
ac_site_ult_mes        0.000000
comp_camp_3            0.000000
comp_camp_4            0.000000
comp_camp_5            0.000000
comp_camp_1            0.000000
comp_camp_2            0.000000
reclamacao_2y          0.000000
comp_ult_camp          0.000000
dtype: float64

In [None]:
df_pandas.dropna(subset=['est_civil'],inplace=True)

#PYSPARK

## MONTANDO A ESTRUTURA PYSPARK





In [None]:
#Definindo o Schema

schema = (
    StructType([
        StructField("id", LongType(), True),
        StructField("dt_nasc", LongType(),True),
        StructField("formacao", StringType(),True),
        StructField("est_civil", StringType(),True),
        StructField("renda_anual_fam", DoubleType(), True),
        StructField("qnt_criancas", IntegerType(), True),
        StructField("qnt_adolesc", IntegerType(), True),
        StructField("dt_cadastro", StringType(), True),
        StructField("dias_sem_comprar", LongType(),True),
        StructField("vl_vinho_2y",LongType(),True),
        StructField("vl_fruta_2y",LongType(),True),
        StructField("vl_carne_2y",LongType(),True),
        StructField("vl_peixe_2y",LongType(),True),
        StructField("vl_doce_2y",LongType(),True),
        StructField("vl_ouro_2y",LongType(),True),
        StructField("comp_desconto",LongType(),True),
        StructField("NumCatalogPurchases",LongType(),True),
        StructField("comp_presencial",LongType(),True),
        StructField("comp_site",LongType(),True),
        StructField("ac_site_ult_mes",LongType(),True),
        StructField("comp_camp_1",LongType(),True),
        StructField("comp_camp_2",LongType(),True),
        StructField("comp_camp_3",LongType(),True),
        StructField("comp_camp_4",LongType(),True),
        StructField("comp_camp_5",LongType(),True),
        StructField("comp_ult_camp",LongType(),True),
        StructField("reclamacao_2y",IntegerType(),True)        



        ]))




In [None]:
df2 = spark.createDataFrame(data=df_pandas,schema=schema)

In [None]:
df2.printSchema()

root
 |-- id: long (nullable = true)
 |-- dt_nasc: long (nullable = true)
 |-- formacao: string (nullable = true)
 |-- est_civil: string (nullable = true)
 |-- renda_anual_fam: double (nullable = true)
 |-- qnt_criancas: integer (nullable = true)
 |-- qnt_adolesc: integer (nullable = true)
 |-- dt_cadastro: string (nullable = true)
 |-- dias_sem_comprar: long (nullable = true)
 |-- vl_vinho_2y: long (nullable = true)
 |-- vl_fruta_2y: long (nullable = true)
 |-- vl_carne_2y: long (nullable = true)
 |-- vl_peixe_2y: long (nullable = true)
 |-- vl_doce_2y: long (nullable = true)
 |-- vl_ouro_2y: long (nullable = true)
 |-- comp_desconto: long (nullable = true)
 |-- NumCatalogPurchases: long (nullable = true)
 |-- comp_presencial: long (nullable = true)
 |-- comp_site: long (nullable = true)
 |-- ac_site_ult_mes: long (nullable = true)
 |-- comp_camp_1: long (nullable = true)
 |-- comp_camp_2: long (nullable = true)
 |-- comp_camp_3: long (nullable = true)
 |-- comp_camp_4: long (nullable

## TRATAMENTOS PYSPARK 

In [None]:
# criando o backup
df3 = df2

In [None]:
df2.show()

+----+-------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-------------+
|  id|dt_nasc| formacao|    est_civil|renda_anual_fam|qnt_criancas|qnt_adolesc|dt_cadastro|dias_sem_comprar|vl_vinho_2y|vl_fruta_2y|vl_carne_2y|vl_peixe_2y|vl_doce_2y|vl_ouro_2y|comp_desconto|NumCatalogPurchases|comp_presencial|comp_site|ac_site_ult_mes|comp_camp_1|comp_camp_2|comp_camp_3|comp_camp_4|comp_camp_5|comp_ult_camp|reclamacao_2y|
+----+-------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+

In [None]:
#transformando a coluna dt_cadastro para data
df4 = df2.withColumn('dt_cadastro', F.to_date(F.col('dt_cadastro'), 'dd-mm-yyyy'))

In [None]:
df4.printSchema()

root
 |-- id: long (nullable = true)
 |-- dt_nasc: long (nullable = true)
 |-- formacao: string (nullable = true)
 |-- est_civil: string (nullable = true)
 |-- renda_anual_fam: double (nullable = true)
 |-- qnt_criancas: integer (nullable = true)
 |-- qnt_adolesc: integer (nullable = true)
 |-- dt_cadastro: date (nullable = true)
 |-- dias_sem_comprar: long (nullable = true)
 |-- vl_vinho_2y: long (nullable = true)
 |-- vl_fruta_2y: long (nullable = true)
 |-- vl_carne_2y: long (nullable = true)
 |-- vl_peixe_2y: long (nullable = true)
 |-- vl_doce_2y: long (nullable = true)
 |-- vl_ouro_2y: long (nullable = true)
 |-- comp_desconto: long (nullable = true)
 |-- NumCatalogPurchases: long (nullable = true)
 |-- comp_presencial: long (nullable = true)
 |-- comp_site: long (nullable = true)
 |-- ac_site_ult_mes: long (nullable = true)
 |-- comp_camp_1: long (nullable = true)
 |-- comp_camp_2: long (nullable = true)
 |-- comp_camp_3: long (nullable = true)
 |-- comp_camp_4: long (nullable =



*   Realizar a mudança de nome de pelo menos 2 colunas



In [None]:
#renomeando colunas 
df5 = df4.withColumnRenamed("dt_nasc", "ano_nasc").withColumnRenamed("NumCatalogPurchases", "comp_catalogo")



*   Deverá criar pelo menos duas novas colunas contendo alguma informação relevante sobre as outras colunas já existentes (Funções de Agrupamento, Agregação ou Joins). (Use a sua capacidade analítica)



In [None]:
df5.show(5)

+----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-------------+
|  id|ano_nasc| formacao|    est_civil|renda_anual_fam|qnt_criancas|qnt_adolesc|dt_cadastro|dias_sem_comprar|vl_vinho_2y|vl_fruta_2y|vl_carne_2y|vl_peixe_2y|vl_doce_2y|vl_ouro_2y|comp_desconto|comp_catalogo|comp_presencial|comp_site|ac_site_ult_mes|comp_camp_1|comp_camp_2|comp_camp_3|comp_camp_4|comp_camp_5|comp_ult_camp|reclamacao_2y|
+----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-

In [None]:
# Criando uma nova coluna para saber a idade do consumidor
df6=df5.withColumn("idade", 2022 - F.col("ano_nasc"))

In [None]:
# Criando uma nova coluna para saber o valor total gasto pelo cliente
df7 = df6.withColumn("total_comp",F.col("vl_vinho_2y") + F.col("vl_fruta_2y") + F.col('vl_carne_2y') + F.col('vl_peixe_2y') + F.col('vl_doce_2y') + F.col('vl_ouro_2y'))



*   Deverá utilizar filtros, ordenação e agrupamento, trazendo dados relevantes para o negócio em questão. (Use a sua capacidade analítica)




In [None]:
# subgrupos para facilitar a análise por faixa etaria
df8=(df7.withColumn('fx_etaria', F.when((F.col('idade') <= 30), F.lit('até 30 anos'))
                                .when((F.col('idade') > 30) & (F.col('idade') <= 45), F.lit('Entre 30 e 45'))
                                .when((F.col('idade') > 45) & (F.col('idade') <=60), F.lit('Entre 45 e 60'))                             
                                .otherwise(F.lit('Acima de 60'))
))




In [None]:
df8.show(5)

+----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-------------+-----+----------+-------------+
|  id|ano_nasc| formacao|    est_civil|renda_anual_fam|qnt_criancas|qnt_adolesc|dt_cadastro|dias_sem_comprar|vl_vinho_2y|vl_fruta_2y|vl_carne_2y|vl_peixe_2y|vl_doce_2y|vl_ouro_2y|comp_desconto|comp_catalogo|comp_presencial|comp_site|ac_site_ult_mes|comp_camp_1|comp_camp_2|comp_camp_3|comp_camp_4|comp_camp_5|comp_ult_camp|reclamacao_2y|idade|total_comp|    fx_etaria|
+----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-



*    Verificar a existência de dados inconsistentes, nulos e realizar a limpeza.







In [None]:
df8.toPandas().isna().sum()

id                   0
ano_nasc             0
formacao             0
est_civil            0
renda_anual_fam     24
qnt_criancas         0
qnt_adolesc          0
dt_cadastro          0
dias_sem_comprar     0
vl_vinho_2y          0
vl_fruta_2y          0
vl_carne_2y          0
vl_peixe_2y          0
vl_doce_2y           0
vl_ouro_2y           0
comp_desconto        0
comp_catalogo        0
comp_presencial      0
comp_site            0
ac_site_ult_mes      0
comp_camp_1          0
comp_camp_2          0
comp_camp_3          0
comp_camp_4          0
comp_camp_5          0
comp_ult_camp        0
reclamacao_2y        0
idade                0
total_comp           0
fx_etaria            0
dtype: int64

In [None]:
# os dados NAN serão substituido por 0 no momento.Posteriormente serão substituidos pela media da renda mensal de acordo com a formação.
df9 = df8.na.fill({'renda_anual_fam':0})

In [None]:
# group by por formação para pegar a media da renda e substituir pelo valores 0 .

df9.groupBy(F.col("formacao")).agg(F.avg("renda_anual_fam")).show()                


+---------+--------------------+
| formacao|avg(renda_anual_fam)|
+---------+--------------------+
|Doutorado|   55597.17355371901|
|Graduacao|   52181.78774422735|
| Pos_grad|  46929.251231527094|
|   Basico|   20306.25925925926|
| Mestrado|   52166.43089430894|
+---------+--------------------+



In [None]:
# substituição dos valores 0 pela média da renda por formacao no pandas
df3_pandas = df9.toPandas()

In [None]:
df3_pandas

Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
0,5524,1957,Graduacao,Solteiro,58138.0,0,0,2012-01-04,58,635,...,0,0,0,0,0,0,1,65,1617,Acima de 60
1,2174,1954,Graduacao,Solteiro,46344.0,1,1,2014-01-08,38,11,...,0,0,0,0,0,0,0,68,27,Acima de 60
2,4141,1965,Graduacao,Uniao estavel,71613.0,0,0,2013-01-21,26,426,...,0,0,0,0,0,0,0,57,776,Entre 45 e 60
3,6182,1984,Graduacao,Uniao estavel,26646.0,1,0,2014-01-10,26,11,...,0,0,0,0,0,0,0,38,53,Entre 30 e 45
4,5324,1981,Doutorado,Casado,58293.0,1,0,2014-01-19,94,173,...,0,0,0,0,0,0,0,41,422,Entre 30 e 45
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2231,10870,1967,Graduacao,Casado,61223.0,0,1,2013-01-13,46,709,...,0,0,0,0,0,0,0,55,1341,Entre 45 e 60
2232,4001,1946,Doutorado,Uniao estavel,64014.0,2,1,2014-01-10,56,406,...,0,0,0,1,0,0,0,76,444,Acima de 60
2233,7270,1981,Graduacao,Divorciado,56981.0,0,0,2014-01-25,91,908,...,0,1,0,0,0,0,0,41,1241,Entre 30 e 45
2234,8235,1956,Mestrado,Uniao estavel,69245.0,0,1,2014-01-24,8,428,...,0,0,0,0,0,0,0,66,843,Acima de 60


In [None]:
# definindo os filtros

filtrodr = df3_pandas.formacao == 'Doutorado'
filtrogr = df3_pandas.formacao == 'Graduacao'
filtromt = df3_pandas.formacao == 'Mestrado'
filtropos = df3_pandas.formacao == 'Pos_grad'
filtrorenda = df3_pandas.renda_anual_fam == 0

In [None]:
# substituindo os valores 0 de doutorado pela media 55597
df3_pandas.loc[filtrodr & filtrorenda,['renda_anual_fam']] =  55597

In [None]:
df3_pandas.loc[df3_pandas.renda_anual_fam == 55597]

Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
43,7281,1959,Doutorado,Solteiro,55597.0,0,0,2013-01-05,80,81,...,0,0,0,0,0,0,0,63,186,Acima de 60
90,8996,1957,Doutorado,Casado,55597.0,2,1,2012-01-19,4,230,...,0,0,0,0,0,0,0,65,603,Acima de 60
128,8268,1961,Doutorado,Casado,55597.0,0,1,2013-01-11,23,352,...,0,0,0,0,0,0,0,61,404,Acima de 60
1386,3769,1972,Doutorado,Uniao estavel,55597.0,1,0,2014-01-02,17,25,...,0,0,0,0,0,0,0,50,42,Entre 45 e 60
2061,1612,1981,Doutorado,Solteiro,55597.0,1,0,2013-01-31,82,23,...,0,0,0,0,0,0,0,41,47,Entre 30 e 45


In [None]:
# substituindo os valores 0 de graduação pela media 52181
df3_pandas.loc[filtrogr & filtrorenda,['renda_anual_fam']] =  52181

In [None]:
df3_pandas.loc[df3_pandas.renda_anual_fam == 52181]

Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
10,1994,1983,Graduacao,Casado,52181.0,1,0,2013-01-15,11,5,...,0,0,0,0,0,0,0,39,19,Entre 30 e 45
27,5255,1986,Graduacao,Solteiro,52181.0,1,0,2013-01-20,19,5,...,0,0,0,0,0,0,0,36,637,Entre 30 e 45
48,7244,1951,Graduacao,Solteiro,52181.0,2,1,2014-01-01,96,48,...,0,0,0,0,0,0,0,71,124,Acima de 60
58,8557,1982,Graduacao,Solteiro,52181.0,1,0,2013-01-17,57,11,...,0,0,0,0,0,0,0,40,46,Entre 30 e 45
91,9235,1957,Graduacao,Solteiro,52181.0,1,1,2014-01-27,45,7,...,0,0,0,0,0,0,0,65,18,Acima de 60
133,1295,1963,Graduacao,Casado,52181.0,0,1,2013-01-11,96,231,...,0,0,0,0,0,0,0,59,725,Entre 45 e 60
312,2437,1989,Graduacao,Casado,52181.0,0,0,2013-01-03,69,861,...,0,1,0,1,0,0,0,33,1611,Entre 30 e 45
319,2863,1970,Graduacao,Solteiro,52181.0,1,2,2013-01-23,67,738,...,0,1,0,1,0,0,0,52,1052,Entre 45 e 60
1382,2902,1958,Graduacao,Uniao estavel,52181.0,1,1,2012-01-03,87,19,...,0,0,0,0,0,0,0,64,45,Acima de 60
2078,5079,1971,Graduacao,Casado,52181.0,1,1,2013-01-03,82,71,...,0,0,0,0,0,0,0,51,97,Entre 45 e 60


In [None]:
# substituindo os valores 0 de mestrado pela media 52166
df3_pandas.loc[filtromt & filtrorenda,['renda_anual_fam']] =  52166

In [None]:
df3_pandas.loc[df3_pandas.renda_anual_fam == 52166]

Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
92,5798,1973,Mestrado,Uniao estavel,52166.0,0,0,2013-01-23,87,445,...,0,0,0,0,0,0,0,49,985,Entre 45 e 60
1379,10475,1970,Mestrado,Uniao estavel,52166.0,0,1,2013-01-01,39,187,...,0,0,0,0,0,0,0,52,317,Entre 45 e 60
2059,7187,1969,Mestrado,Uniao estavel,52166.0,1,1,2013-01-18,52,375,...,0,0,0,0,0,0,0,53,721,Entre 45 e 60
2079,10339,1954,Mestrado,Uniao estavel,52166.0,0,1,2013-01-23,83,161,...,0,0,0,0,0,0,0,68,207,Acima de 60
2084,5250,1943,Mestrado,Viuva,52166.0,0,0,2013-01-30,75,532,...,0,0,1,0,0,0,1,79,1564,Acima de 60


In [None]:
# substituindo os valores 0 de pos_grad pela media 46929
df3_pandas.loc[filtropos & filtrorenda,['renda_anual_fam']] =  46929

In [None]:
df3_pandas.loc[df3_pandas.renda_anual_fam == 46929]

Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
71,10629,1973,Pos_grad,Casado,46929.0,1,0,2012-01-14,25,25,...,0,0,0,0,0,0,0,49,109,Entre 45 e 60
1383,4345,1964,Pos_grad,Solteiro,46929.0,1,1,2014-01-12,49,5,...,0,0,0,0,0,0,0,58,21,Entre 45 e 60
2224,8720,1978,Pos_grad,Uniao estavel,46929.0,0,0,2012-01-12,53,32,...,0,1,0,0,0,0,0,44,1679,Entre 30 e 45


In [None]:
# ensino basico não possuia valores nulos.

In [None]:
# realizando o drop das idades inconsistentes
df3_pandas.loc[df3_pandas.idade > 100]


Unnamed: 0,id,ano_nasc,formacao,est_civil,renda_anual_fam,qnt_criancas,qnt_adolesc,dt_cadastro,dias_sem_comprar,vl_vinho_2y,...,comp_camp_1,comp_camp_2,comp_camp_3,comp_camp_4,comp_camp_5,comp_ult_camp,reclamacao_2y,idade,total_comp,fx_etaria
192,7829,1900,Pos_grad,Divorciado,36640.0,1,0,2013-01-26,99,15,...,0,0,0,0,0,1,0,122,65,Acima de 60
239,11004,1893,Pos_grad,Solteiro,60182.0,0,1,2014-01-17,23,8,...,0,0,0,0,0,0,0,129,22,Acima de 60
339,1150,1899,Doutorado,Uniao estavel,83532.0,0,0,2013-01-26,36,755,...,0,0,1,0,0,0,0,123,1853,Acima de 60


In [None]:
df3_pandas.drop([192,239,339], axis = 0,inplace = True)

In [None]:
df11 = spark.createDataFrame(df3_pandas)

In [None]:
df11.select(F.col('idade')).filter(F.col('idade') > 100).show()

+-----+
|idade|
+-----+
+-----+



In [None]:
# group by por faixa etaria para verificar o que cada grupo consome mais.

df10 = ( df11.groupBy(F.col("fx_etaria"))
               .agg(F.sum("vl_vinho_2y"),
                F.sum("vl_fruta_2y"),
                F.sum("vl_carne_2y"),
                F.sum('vl_peixe_2y'),
                F.sum('vl_doce_2y'),
                F.sum('vl_ouro_2y')


               ))


In [None]:
df10.show()

+-------------+----------------+----------------+----------------+----------------+---------------+---------------+
|    fx_etaria|sum(vl_vinho_2y)|sum(vl_fruta_2y)|sum(vl_carne_2y)|sum(vl_peixe_2y)|sum(vl_doce_2y)|sum(vl_ouro_2y)|
+-------------+----------------+----------------+----------------+----------------+---------------+---------------+
|Entre 30 e 45|          135201|           15263|           96509|           20901|          15616|          24163|
|  Acima de 60|          245778|           18546|          123131|           27801|          19422|          31105|
|  até 30 anos|            8175|             991|            8676|            1670|           1054|           1402|
|Entre 45 e 60|          289529|           23792|          144352|           33148|          24394|          41196|
+-------------+----------------+----------------+----------------+----------------+---------------+---------------+



In [None]:
#group by por est_civil solteiro e gastos com vinho
df12=df11.groupBy(F.col('est_civil')).agg(F.sum('vl_vinho_2y') ).filter(F.col('est_civil') == 'Solteiro')

In [None]:
df14 = df12.withColumnRenamed('sum(vl_vinho_2y)', 'vl_vinho_2y')

In [None]:
df14.show()

+---------+-----------+
|est_civil|vl_vinho_2y|
+---------+-----------+
| Solteiro|     138945|
+---------+-----------+



In [None]:
# Rank por formação e renda
w0 = Window.partitionBy(F.col('formacao')).orderBy('renda_anual_fam')

In [None]:
df11.withColumn('rank', F.rank().over(w0)).show()

+-----+--------+--------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-------------+-----+----------+-------------+----+
|   id|ano_nasc|formacao|    est_civil|renda_anual_fam|qnt_criancas|qnt_adolesc|dt_cadastro|dias_sem_comprar|vl_vinho_2y|vl_fruta_2y|vl_carne_2y|vl_peixe_2y|vl_doce_2y|vl_ouro_2y|comp_desconto|comp_catalogo|comp_presencial|comp_site|ac_site_ult_mes|comp_camp_1|comp_camp_2|comp_camp_3|comp_camp_4|comp_camp_5|comp_ult_camp|reclamacao_2y|idade|total_comp|    fx_etaria|rank|
+-----+--------+--------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+---

In [None]:
# Dense rank por faixa etaria e gasto total
w1 = Window.partitionBy(F.col('fx_etaria')).orderBy('total_comp')

In [None]:
df11.withColumn('dense_rank', F.dense_rank().over(w1)).show()

+-----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+---------------+-----------+-----------+-----------+-----------+-----------+-------------+-------------+-----+----------+-----------+----------+
|   id|ano_nasc| formacao|    est_civil|renda_anual_fam|qnt_criancas|qnt_adolesc|dt_cadastro|dias_sem_comprar|vl_vinho_2y|vl_fruta_2y|vl_carne_2y|vl_peixe_2y|vl_doce_2y|vl_ouro_2y|comp_desconto|comp_catalogo|comp_presencial|comp_site|ac_site_ult_mes|comp_camp_1|comp_camp_2|comp_camp_3|comp_camp_4|comp_camp_5|comp_ult_camp|reclamacao_2y|idade|total_comp|  fx_etaria|dense_rank|
+-----+--------+---------+-------------+---------------+------------+-----------+-----------+----------------+-----------+-----------+-----------+-----------+----------+----------+-------------+-------------+---------------+---------+--------

# SPARK-SQL

In [None]:
df11.createOrReplaceTempView("campanha")

In [None]:
#Utilizando a função select para fazer a consulta no dataframe, fazendo o group by por estado civil para saber quem gasta mais com vinhos e ordenando.
spark.sql("SELECT est_civil ,SUM (vl_vinho_2y) As gasto_vinho FROM campanha GROUP BY est_civil ORDER BY gasto_vinho desc  ").show()

+-------------+-----------+
|    est_civil|gasto_vinho|
+-------------+-----------+
|       Casado|        864|
|Uniao estavel|        579|
|     Solteiro|        482|
|   Divorciado|        231|
|        Viuva|         77|
+-------------+-----------+



In [None]:
# Utilizando o select para fazer a consulta ,group by por faixa etaria para verificar com o que cada grupo mais gasta.

# Através dessa consulta podemos observar que as pessoas que mais gastam em vinhos são as que mais tem gasto com carne também.
# As pessoas abaixo de 30 anos são as que menos tem gastos com frutas.

In [None]:
spark.sql("SELECT fx_etaria, SUM(vl_vinho_2y) As gasto_vinho , SUM(vl_carne_2y) As gasto_carne , SUM(vl_fruta_2y) As gasto_fruta, \
SUM(vl_carne_2y)As gasto_carne,SUM(vl_peixe_2y) As gasto_peixe,SUM(vl_doce_2y)As gasto_doce,SUM(vl_ouro_2y)As gasto_ouro FROM campanha GROUP BY fx_etaria ORDER BY gasto_vinho desc").show()

+-------------+-----------+-----------+-----------+-----------+-----------+----------+----------+
|    fx_etaria|gasto_vinho|gasto_carne|gasto_fruta|gasto_carne|gasto_peixe|gasto_doce|gasto_ouro|
+-------------+-----------+-----------+-----------+-----------+-----------+----------+----------+
|Entre 45 e 60|     289529|     144352|      23792|     144352|      33148|     24394|     41196|
|  Acima de 60|     245778|     123131|      18546|     123131|      27801|     19422|     31105|
|Entre 30 e 45|     135201|      96509|      15263|      96509|      20901|     15616|     24163|
|  até 30 anos|       8175|       8676|        991|       8676|       1670|      1054|      1402|
+-------------+-----------+-----------+-----------+-----------+-----------+----------+----------+



In [None]:
#Group By por formação para verificar a renda de cada grupo e o gasto.

In [None]:
spark.sql('SELECT formacao, AVG(renda_anual_fam)  As renda, AVG(total_comp)As gasto FROM campanha GROUP BY formacao ORDER BY renda desc').show()

+---------+------------------+-----------------+
| formacao|             renda|            gasto|
+---------+------------------+-----------------+
|Doutorado|56114.875776397515|670.9937888198758|
| Mestrado|52873.287262872625|610.2710027100271|
|Graduacao|52691.548845470694|619.3694493783304|
| Pos_grad| 47614.94029850746|501.0348258706468|
|   Basico| 20306.25925925926|81.79629629629629|
+---------+------------------+-----------------+



In [None]:
# consulta para verificar a relação entre quem tem mais crianças / adolescentes em casa e o gasto com doces.

In [None]:
spark.sql('SELECT est_civil , (SUM(qnt_criancas)+(SUM(qnt_adolesc)))As filhos , SUM(vl_doce_2y)As gasto_doce FROM campanha GROUP BY est_civil ORDER BY filhos desc').show()

+-------------+------+----------+
|    est_civil|filhos|gasto_doce|
+-------------+------+----------+
|       Casado|   836|     23070|
|Uniao estavel|   568|     15087|
|     Solteiro|   422|     13107|
|   Divorciado|   232|      6218|
|        Viuva|    67|      3004|
+-------------+------+----------+



In [None]:
# consulta para verificar o grupo que ficou mais tempo sem comprar e se existe alguma relação com a renda

In [None]:
spark.sql('SELECT formacao , AVG(renda_anual_fam)As renda, AVG(dias_sem_comprar)As dias_sem_comprar FROM campanha GROUP BY formacao ORDER BY dias_sem_comprar desc ').show()

+---------+------------------+------------------+
| formacao|             renda|  dias_sem_comprar|
+---------+------------------+------------------+
|Graduacao|52691.548845470694| 50.02841918294849|
|Doutorado|56114.875776397515|48.697722567287784|
|   Basico| 20306.25925925926| 48.44444444444444|
| Pos_grad| 47614.94029850746| 48.29353233830846|
| Mestrado|52873.287262872625| 47.58536585365854|
+---------+------------------+------------------+



In [None]:
# transformando a data em string para enviar ao MONGO-DB
df_final = df11.withColumn("dt_cadastro", df11.dt_cadastro.cast("string"))

In [None]:
df_final.printSchema()

root
 |-- id: long (nullable = true)
 |-- ano_nasc: long (nullable = true)
 |-- formacao: string (nullable = true)
 |-- est_civil: string (nullable = true)
 |-- renda_anual_fam: double (nullable = true)
 |-- qnt_criancas: long (nullable = true)
 |-- qnt_adolesc: long (nullable = true)
 |-- dt_cadastro: string (nullable = true)
 |-- dias_sem_comprar: long (nullable = true)
 |-- vl_vinho_2y: long (nullable = true)
 |-- vl_fruta_2y: long (nullable = true)
 |-- vl_carne_2y: long (nullable = true)
 |-- vl_peixe_2y: long (nullable = true)
 |-- vl_doce_2y: long (nullable = true)
 |-- vl_ouro_2y: long (nullable = true)
 |-- comp_desconto: long (nullable = true)
 |-- comp_catalogo: long (nullable = true)
 |-- comp_presencial: long (nullable = true)
 |-- comp_site: long (nullable = true)
 |-- ac_site_ult_mes: long (nullable = true)
 |-- comp_camp_1: long (nullable = true)
 |-- comp_camp_2: long (nullable = true)
 |-- comp_camp_3: long (nullable = true)
 |-- comp_camp_4: long (nullable = true)
 |

# LOAD ARQUIVO TRATADO PARA O MONGO-DB

In [None]:
#Extraindo do Pyspark para o pandas
df_tratado = df_final.toPandas()

#Criando o Banco e a Coleção dados_brutos
db = client['Campanha_Marketing']
colecao = db.df_tratado

#Transformando o dataframe para dicionario e inseriNDO na coleção desejada.
df_tratado_dici = df_tratado.to_dict('records')
colecao.insert_many(df_tratado_dici)


# LOAD ARQUIVO TRATADO PARA A BUCKET

In [None]:
(df_final.write.format("csv").option("header", "true")
                        .option("inferschema", "true")
                        .option("delimiter", ",")
                        .save('gs://data-engineer-gsantos/Datasets Tratados/marketing_campaign_tratado'))