# PROJETO INDIVIDUAL

Exercício realizado no Bootcamp de Engenharia de Dados da SoulCode

## ENUNCIADO

Nivel Infra
- O Dataset deve ser salvo em ambiente cloud(Cloud Storage).
- O arquivo original e tratado deve ser salvo em MongoDB Atlas em coleções diferentes.
- Os DataFrames devem ser obrigatoriamente salvos em uma bucket do CloudStorage.<br><br>

Nivel Pandas
- O arquivo está em outra linguagem e deve ter seus dados traduzidos para Português-BR.
- Realizar a extração corretamente para um dataframe.
- Verificar a existência de dados inconsistentes e realizar a limpeza para NaN ou NA.
- Realizar o drop(se necessário) de colunas do dataframe realizando o comentário do porque da exclusão.
- Todos os passos devem ser comentados.<br><br>

Nivel PySpark (Funções básicas vistas em aula)
- Deverá ser montada a estrutura do DataFrame utilizando o StructType.
- Verificar a existência de dados inconsistentes, nulos e realizar a limpeza.
- Verificar a necessidade de drop em colunas ou linhas. Caso seja necessário, fazer comentário do porque.
- Realizar a mudança de nome de pelo menos 2 colunas.
- Deverá criar pelo menos duas novas colunas contendo alguma informação relevante sobre as outras colunas já existentes (Funções de Agrupamento, Agregação ou Joins). (Use a sua capacidade analítica).
- Deverá utilizar filtros, ordenação e agrupamento, trazendo dados relevantes para o negócio em questão. (Use a sua capacidade analítica)
- Utilizar pelo menos duas Window Functions.<br><br>

Nivel SparkSQL
- Utilizar no minimo 5 consultas diferentes utilizando o SparkSQL, comentando o porquê de ter escolhido essas funções e explicando o que cada consulta faz.<br><br>

Nível DataStudio
- Construir um dashboard (maximo 1 pagina) para apresentação dos insights.<br><br>

Ferramentas:
- Colab ou Ides | Google Cloud | Data Studio

## INSTALL E BIBLIOTECAS

### GOOGLE

In [None]:
!pip install gcsfs

In [None]:
from google.cloud import storage
from google.colab import drive
import os

### MONGO

In [None]:
from pymongo import MongoClient
!cp /content/drive/MyDrive/SoulCode/modulos/mongo.py .
from mongo import get_mongo_path

### PANDAS

In [None]:
import pandas as pd

### SPARK

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=4155f85a9e8aa5be8e5bbe767772ef472cb8ade962fcc9e6179d53960957ba8f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## CONEXÕES

### CONEXÃO GOOGLE

Conexão com o Google Drive

In [None]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Obtendo a chave de acesso para a Google Cloud Platform(GCP) em uma pasta do Drive

In [None]:
serviceAccount = '/content/drive/MyDrive/SoulCode/Datasets/macro-mercury-349020-d9ed9a670580.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

Conexão com a GCP

In [None]:
client_bucket = storage.Client()
bucket = client_bucket.get_bucket('soulcode')



---



---



---


### CONEXÃO MONGO

Obtendo o IP do Colab para liberar conexão no MongoDB Atlas

Obtendo a string de conexão


In [None]:
conn = get_mongo_path()

Conectando com o Mongo

In [None]:
mongo_client = MongoClient(conn)
db = mongo_client['soulcode']

Criando a collection de backup

In [None]:
db.create_collection('marketing_campaing_original')
collection_backup = db['marketing_campaing_original']

### CONEXÃO SPARK

In [None]:
bucket.blob('marketing_campaign.csv')
path = 'gs://soulcode/Original/marketing_campaign.csv'
spark = (SparkSession.builder
                      .master("local")
                      .appName("Projeto Individual SoulCode")
                      .config("spark.ui.port","4050")
                      .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
                      .getOrCreate()
)


Verificação da conexão

In [None]:
spark

## ETL - PANDAS

### EXTRAÇÃO - LEITURA DE DADOS

Lendo o arquivo csv do bucket 'soulcode'

In [None]:
df = pd.read_csv('https://storage.googleapis.com/soulcode/Original/marketing_campaign.csv', sep=';')

#### BACKUP

Backup do dataframe


In [None]:
df1 = df.copy()

In [None]:
df.to_csv('gs://soulcode/Original/campanha_marketing_bruto.csv')

Enviando dados originais para o Mongo

In [None]:
df_dict = df.to_dict('records')
collection_backup.insert_many(df_dict)

<pymongo.results.InsertManyResult at 0x7f1f36d1a510>

### TRANSFORMAÇÕES

#### ANÁLISE

##### Documentação da tabela

> Bloco com recuo



Após pesquisa na internet foram encontradas as seguintes descrições de cada coluna:

Pessoas

- ID: ID do cliente
- Year_Birth: Data de nascimento
- Education: Escolaridade
- Marital_Status: Estado Civil
- Income: Renda Familiar
- Kidhome: Quantidade crianças na casa
- Teenhome: Quantidade de adolecentes na casa
- Dt_Customer: Data de cadastro
- Recency: Número de dias desde a última compra
- Complain: 1 se o cliente reclamou nos últimos 2 anos, caso contrário 0


Produtos
- MntWines: Quantidade gasta em vinho nos últimos 2 anos
- MntFruits: Quantidade gasta em frutas nos últimos 2 anos
- MntMeatProducts: Quantidade gasta em carne nos últimos 2 anos
- MntFishProducts: Quantidade gasta em peixe nos últimos 2 anos
- MntSweetProducts: Quantidade gasta em doces nos últimos 2 anos
- MntGoldProds: Quantidade gasta em ouro nos últimos 2 anos


Promoções
- NumDealsPurchases: Número de compras feitas com descontos
- AcceptedCmp1: 1 se o cliente aceitou a oferta na 1a campanha, 0 caso contrário
- AcceptedCmp2: 1 se o cliente aceitou a oferta na 2a campanha, 0 caso contrário
- AcceptedCmp3: 1 se o cliente aceitou a oferta na 3a campanha, 0 caso contrário
- AcceptedCmp4: 1 se o cliente aceitou a oferta na 4a campanha, 0 caso contrário
- AcceptedCmp5: 1 se o cliente aceitou a oferta na 5a campanha, 0 caso contrário
- Response: 1 se o cliente aceitou a oferta na última campanha, 0 caso contrário


Forma de compra
- NumWebPurchases: Número de compras feitas pelo website
- NumCatalogPurchases: Número de compras feitas usando um catálogo
- NumStorePurchases: Número de compras feitas na loja
- NumWebVisitsMonth: Número de visitas feita no website no último mês

fonte: https://www.kaggle.com/datasets/imakash3011/customer-personality-analysis

##### Análise exploratória

Determinando quantidade de linhas e colunas

In [None]:
df1.shape

Observando as primeiras linhas para ter uma compreensão básica do dataframe

In [None]:
df1.head().T

Procurando por valores Nulos

In [None]:
df1.isna().sum()

Analisando o tipo de dados de cada coluna

In [None]:
df1.dtypes

Analisando os valores únicos de cada coluna

In [None]:
df1['ID'].is_unique

In [None]:
df1.nunique()

NameError: ignored

In [None]:
sorted(pd.unique(df1['Education']))

In [None]:
sorted(pd.unique(df1['Marital_Status']))

Fazendo uma análise estatística básica de cada coluna

In [None]:
df1.describe().T

Concluões iniciais:

- Somente a coluna 'Income' possui valores nulos
- Não há duplicatas pois a coluna ID possui somente valores únicos
- O tipo de dado da coluna 'Dt_Customer' está em string
- As colunas Z_CostContact e Z_Revenue possuem somente 1 valor e podem ser dropadas
- A coluna 'Income' possui um valor muito discrepante o '666666' é mais de 10 vezes o valor do desvio padrão

#### TRADUÇÃO

Traduzindo os dados do inglês para o português

In [None]:
# Tradução do nome das colunas
(df1.rename(columns={'Year_Birth':'ano_nascimento',
            'Education':'escolaridade',
            'Marital_Status':'estado_civil',
            'Income':'renda',
            'Kidhome':'qtd_crianca',
            'Teenhome':'qtd_adolescente',
            'Dt_Customer':'data_filiação',
            'Recency':'dias_ultima_compra',
            'MntWines':'gasto_vinho',
            'MntFruits':'gasto_fruta',
            'MntMeatProducts':'gasto_carne',
            'MntFishProducts':'gasto_peixe',
            'MntSweetProducts':'gasto_doce',
            'MntGoldProds':'gasto_ouro',
            'NumDealsPurchases':'compras_promoção',
            'NumWebPurchases':'compras_website',
            'NumCatalogPurchases':'compras_catalogo',
            'NumStorePurchases':'compras_loja',
            'NumWebVisitsMonth':'visitas_website',
            'AcceptedCmp1':'campanha_1',
            'AcceptedCmp2':'campanha_2',
            'AcceptedCmp3':'campanha_3',
            'AcceptedCmp4':'campanha_4',
            'AcceptedCmp5':'campanha_5',
            'Complain':'reclamação',
            'Response':'campanha_ultima'},
            inplace=True)
)

In [None]:
# Tradução dos valores da coluna 'Education'
(df1.replace({'2n Cycle':'Pós-graduação',
              'Basic':'Médio',
              'Graduation':'Superior',
              'Master':'Mestrado',
              'PhD':'Doutorado'},
             inplace=True)
)

In [None]:
# Tradução dos valores da coluna 'Marital_status'
(df1.replace({'Absurd':'Solteiro',
              'Alone':'Solteiro',
              'Divorced':'Solteiro',
              'Married':'Casado',
              'Single':'Solteiro',
              'Together':'Casado',
              'Widow':'Solteiro',
              'YOLO':'Solteiro'},
             inplace=True)
)

#### DROP

Eliminando as colunas 'Z_CostContact','Z_Revenue' pois elas possuem um único valor

In [None]:
df1.drop(['Z_CostContact','Z_Revenue'],axis=1,inplace=True)

Elimando a linha com valor muito discrepante

In [None]:
df1 = df1[df1.renda < 600000]

#### NULOS

In [None]:
# Preenchendo os valores nulos com a média da renda
df1['renda'].fillna(df1['renda'].median(), inplace=True)

### LOAD - CARGA

In [None]:
df1.to_csv('gs://soulcode/Tratado/campanha_marketing_tratado.csv')

In [None]:
df1_dict = df1.to_dict('records')

# db.create_collection('campanha_marketing_pandas')
collection_tratado_pandas = db['campanha_marketing_pandas']

# Inserindo os dados no mongodb
collection_tratado_pandas.insert_many(df1_dict)

## ETL - PYSPARK

### EXTRAÇÃO - LEITURA DE DADOS

In [None]:
esquema = (
    StructType([
        StructField('ID', IntegerType(), False),
        StructField('ano_nascimento', IntegerType(), True),
        StructField('escolaridade', StringType(), True),
        StructField('estado_civil', StringType(), True),
        StructField('renda', FloatType(), True),
        StructField('qtd_crianca', IntegerType(), True),
        StructField('qtd_adolescente', IntegerType(), True),
        StructField('data_filiação', StringType(), True),
        StructField('dias_ultima_compra', IntegerType(), True),
        StructField('gasto_vinho', IntegerType(), True),
        StructField('gasto_fruta', IntegerType(), True),
        StructField('gasto_carne', IntegerType(), True),
        StructField('gasto_peixe', IntegerType(), True),
        StructField('gasto_doce', IntegerType(), True),
        StructField('gasto_ouro', IntegerType(), True),
        StructField('compras_promoção', IntegerType(), True),
        StructField('compras_website', IntegerType(), True),
        StructField('compras_catalogo', IntegerType(), True),
        StructField('compras_loja', IntegerType(), True),
        StructField('visitas_website', IntegerType(), True),
        StructField('campanha_3', IntegerType(), True),
        StructField('campanha_4', IntegerType(), True),
        StructField('campanha_5', IntegerType(), True),
        StructField('campanha_1', IntegerType(), True),
        StructField('campanha_2', IntegerType(), True),
        StructField('reclamação', IntegerType(), True),
        StructField('Z_CostContact', IntegerType(), True),
        StructField('Z_Revenue', IntegerType(), True),
        StructField('campanha_ultima', IntegerType(), True),
    ])
)

In [None]:
df_spark = (spark.read.format('csv')
              .option('header','true')
              .option('delimiter',';')
              .option('inferschema','false')
              .load('gs://soulcode/marketing_campaign.csv', schema = esquema)
)

In [None]:
# Backup
df2 = df_spark

### ANÁLISE

In [None]:
df_spark.printSchema()

In [None]:
df2.summary().show()

In [None]:
df2.select([F.count(F.when(F.col(c).contains('None') | \
                            F.col(c).contains('NULL') | \
                            F.col(c).contains('nao_sei') | \
                            (F.col(c) == '' ) | \
                            F.col(c).isNull() | \
                            F.isnan(c), c 
                           )).alias(c)
                    for c in df2.columns]).show()

In [None]:
df2.dropDuplicates().count()

### TRANSFORMAÇÃO

#### TRADUÇÃO

In [None]:
# tradução
df2 = ( df2.withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Absurd', 'Solteiro'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Alone', 'Solteiro'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Divorced', 'Solteiro'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Married', 'Casado'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Single', 'Solteiro'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Together', 'Casado'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'Widow', 'Solteiro'))
     .withColumn('estado_civil', F.regexp_replace(F.col('estado_civil'), 'YOLO', 'Solteiro'))
     .withColumn('escolaridade', F.regexp_replace(F.col('escolaridade'), '2n Cycle', 'Pós-graduação'))
     .withColumn('escolaridade', F.regexp_replace(F.col('escolaridade'), 'Basic', 'Médio'))
     .withColumn('escolaridade', F.regexp_replace(F.col('escolaridade'), 'Graduation', 'Superior'))
     .withColumn('escolaridade', F.regexp_replace(F.col('escolaridade'), 'Master', 'Mestrado'))
     .withColumn('escolaridade', F.regexp_replace(F.col('escolaridade'), 'PhD', 'Doutorado'))
)

#### DROP

In [None]:
# Eliminando colunas com valor único
df2 = df2.drop(F.col('Z_CostContact')).drop(F.col('Z_Revenue'))

##### VALORES NULOS

In [None]:
# nulos 
mean_list = df2.agg({'renda': 'mean'}).collect() # Cálculo da média
mean = mean_list[0].asDict() # Extraindo a média da lista e transformando em dicionário
df2 = df2.fillna(mean['avg(renda)']) # Preenchendo valores nulos com a média

##### ADIÇÃO DE COLUNAS

In [None]:
# Criando colunas
df2 = (df2.withColumn('idade', 2022 - F.col('ano_nascimento'))
          .withColumn('gasto_total', 
                F.col('gasto_vinho') + 
                F.col('gasto_fruta') + 
                F.col('gasto_carne') + 
                F.col('gasto_peixe') + 
                F.col('gasto_doce') + 
                F.col('gasto_ouro')
                      )
          .withColumn('campanhas', 
                F.col('campanha_1') +
                F.col('campanha_2') + 
                F.col('campanha_3') + 
                F.col('campanha_4') + 
                F.col('campanha_5') + 
                F.col('campanha_ultima')
                      )
          .withColumn('filhos', F.col('qtd_crianca') + F.col('qtd_adolescente'))
          .withColumn('numero_compras', 
                F.col('compras_promoção') +
                F.col('compras_website') +
                F.col('compras_catalogo') +
                F.col('compras_loja')
                      )
       
       )

In [None]:
df2.show(2)

+----+--------------+------------+------------+-------+-----------+---------------+-------------+------------------+-----------+-----------+-----------+-----------+----------+----------+----------------+---------------+----------------+------------+---------------+----------+----------+----------+----------+----------+----------+---------------+-----+-----------+---------+------+--------------+
|  ID|ano_nascimento|escolaridade|estado_civil|  renda|qtd_crianca|qtd_adolescente|data_filiação|dias_ultima_compra|gasto_vinho|gasto_fruta|gasto_carne|gasto_peixe|gasto_doce|gasto_ouro|compras_promoção|compras_website|compras_catalogo|compras_loja|visitas_website|campanha_3|campanha_4|campanha_5|campanha_1|campanha_2|reclamação|campanha_ultima|idade|gasto_total|campanhas|filhos|numero_compras|
+----+--------------+------------+------------+-------+-----------+---------------+-------------+------------------+-----------+-----------+-----------+-----------+----------+----------+----------------+-

In [None]:
# O cálculo de idade torna o ano de nascimento redundante
df2 = df2.drop(F.col('ano_nascimento'))

### LOAD - CARGA DE DADOS

In [None]:
db.create_collection('campanha_marketing_spark')

In [None]:
df2_pd = df2.toPandas()
df2_dict = df2_pd.to_dict('records')

collection_tratado_spark = db['campanha_marketing_spark']

# Inserindo os dados no mongodb
collection_tratado_spark.insert_many(df2_dict)

<pymongo.results.InsertManyResult at 0x7f801e866550>

In [None]:
df2.write.csv('gs://soulcode/Tratado', sep=',', mode='append', header=True)

### INSIGHTS

##### FILTROS

In [None]:
# escolaridade
df2.groupBy('escolaridade').count().orderBy('count').show()

In [None]:
# escolaridade x renda
df2.groupBy('escolaridade').mean('renda').orderBy('avg(renda)').show()

In [None]:
# escolaridade x gasto
df2.groupBy('escolaridade').mean('gasto_total').orderBy('avg(gasto_total)').show()

In [None]:
# estado civil
df2.groupBy('estado_civil').count().orderBy('count').show()

In [None]:
# estado civil x renda
df2.groupBy('estado_civil').mean('renda').orderBy('avg(renda)').show()

In [None]:
# estado_civil x gasto
df2.groupBy('estado_civil').mean('gasto_total').orderBy('avg(gasto_total)').show()

In [None]:
# filhos
df2.groupBy('filhos').count().orderBy('count').show()

In [None]:
# filhos x gasto
df2.groupBy('filhos').mean('gasto_total').orderBy('avg(gasto_total)', ascending=False).show()

In [None]:
# filhos x renda
df2.groupBy('filhos').mean('renda').orderBy('avg(renda)', ascending=False).show()

In [None]:
df2.groupBy('campanhas').mean('renda').orderBy('avg(renda)', ascending=False).show()

In [None]:
df2.groupBy('campanhas').mean('gasto_total').orderBy('avg(gasto_total)', ascending=False).show()

##### SQL

In [None]:
# idade
df2.select('idade').summary().show()

In [None]:
df_idade_q1 = (df2.select(F.col('renda'), 
                          F.col('estado_civil'), 
                          F.col('escolaridade'),
                          F.col('gasto_total'),
                          F.col('idade')
                          )
                  .filter(F.col('idade') < 45)
              )

In [None]:
df_idade_q2 = (df2.select(F.col('renda'), 
                          F.col('estado_civil'), 
                          F.col('escolaridade'), 
                          F.col('gasto_total'),
                          F.col('idade')
                          )
                  .filter(F.col('idade') < 52)
                  .filter(F.col('idade') > 45)
              )

In [None]:
df_idade_q3 = (df2.select(F.col('renda'), 
                          F.col('estado_civil'), 
                          F.col('escolaridade'), 
                          F.col('gasto_total'),
                          F.col('idade')
                          )
                  .filter(F.col('idade') < 63)
                  .filter(F.col('idade') > 52)
              )

In [None]:
df_idade_q4 = (df2.select(F.col('renda'), 
                          F.col('estado_civil'), 
                          F.col('escolaridade'), 
                          F.col('gasto_total'),
                          F.col('idade')
                          )
                  .filter(F.col('idade') > 63)
              )

In [None]:
df2.select(F.col('gasto_total')).filter(F.col('estado_civil')=='casado').show()

In [None]:
df2.select(F.col('gasto_total')).filter(F.col('estado_civil')=='solteiro').show()

##### WINDOW FUNCTION

In [None]:
# escolaridade window
w0 = Window.partitionBy(F.col('escolaridade')).orderBy('renda')
df2.withColumn('rank', F.rank().over(w0)).show()

In [None]:
w1 = Window.partitionBy(F.col('estado_civil')).orderBy('gasto_total')
df2.withColumn('dense', F.dense().over(w1)).show()