## Importando as bibliotecas necessárias

In [25]:
#Descomentar essa linha abaixo case deseje fazer a instalação dos pacotes por aqui
#!pip install pyspark numpy pandas matplotlib

In [3]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, col, isnan, to_date, floor, datediff, current_date, round
import matplotlib.pyplot as plt
from pyspark.sql.types import FloatType

### Iniciando a sessão do Apache Spark

In [4]:
spark = SparkSession.builder.appName("case-petlove").getOrCreate()
spark

### Lendo o arquivo CSV disponibilizado, mantendo o cabeçalho e inferindo um schema de acordo com os dados

In [6]:
df = spark.read.csv("initial_data/data-test-analytics.csv", header=True, inferSchema = True)

#### Verificando a leitura do arquivo

Caso esteja rodando no VSCode use apenas "df.show" para melhor visualização do arquivo.

In [13]:
df.show(n=5, truncate=True, vertical=True)

-RECORD 0----------------------------------
 id                 | 8bf7960e-3b93-468... 
 created_at         | 08/15/17 07:05 AM    
 updated_at         | 01/14/21 11:23 AM    
 deleted_at         | null                 
 name_hash          | 312d206168a318614... 
 email_hash         | 83eb3aed9a44377df... 
 address_hash       | 8b4bfaa0cbc41a16f... 
 birth_date         | 07/10/74 12:00 AM    
 status             | active               
 version            | 2.31.7               
 city               |  Peixoto da Praia    
 state              | AM                   
 neighborhood       | Aparecida 7ª Seção   
 last_date_purchase | 01/14/21 11:23 AM    
 average_ticket     | 151.142941888541     
 items_quantity     | 10                   
 all_revenue        | 906.857651331245     
 all_orders         | 6                    
 recency            | 35                   
 marketing_source   | crm                  
-RECORD 1----------------------------------
 id                 | a39535b5-4

#### Verificando se há valores nulos

In [16]:
df.select(
    [count
        (when
            (col(c).contains('None') | \
            col(c).contains('Null') | \
            (col(c) == '') | \
            col(c).isNull() | \
            isnan(c), c)
        ).alias(c)
    for c in df.columns]).show(n=5, truncate=False, vertical=True)

-RECORD 0------------------
 id                 | 0    
 created_at         | 0    
 updated_at         | 0    
 deleted_at         | 9495 
 name_hash          | 0    
 email_hash         | 0    
 address_hash       | 0    
 birth_date         | 0    
 status             | 0    
 version            | 0    
 city               | 0    
 state              | 0    
 neighborhood       | 0    
 last_date_purchase | 0    
 average_ticket     | 0    
 items_quantity     | 0    
 all_revenue        | 0    
 all_orders         | 0    
 recency            | 0    
 marketing_source   | 0    



Os valores nulos apresentados não atrapalham nossa análise, pois representam a data de quando o "id" foi deletado/cancelou sua assinatura

### Eliminando as colunas que não farão parte da análise, visto que não possuem dados/estão criptografados

In [17]:
df = df.drop('name_hash', 'email_hash', 'address_hash')

### Verificando o schema atual

In [18]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- deleted_at: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- version: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- last_date_purchase: string (nullable = true)
 |-- average_ticket: double (nullable = true)
 |-- items_quantity: integer (nullable = true)
 |-- all_revenue: double (nullable = true)
 |-- all_orders: integer (nullable = true)
 |-- recency: integer (nullable = true)
 |-- marketing_source: string (nullable = true)



### Alterando os campos de acordo com o formato correto

Inferindo o tipo "date" para as colunas de datas

In [19]:
df = df.withColumn('created_at', to_date(df['created_at'], 'MM/dd/yy hh:mm a')) \
       .withColumn('updated_at', to_date(df['updated_at'], 'MM/dd/yy hh:mm a')) \
       .withColumn('deleted_at', to_date(df['deleted_at'], 'MM/dd/yy hh:mm a')) \
       .withColumn('birth_date', to_date(df['birth_date'], 'MM/dd/yy hh:mm a')-36525) \
       .withColumn('last_date_purchase', to_date(df['last_date_purchase'], 'MM/dd/yy hh:mm a')) 

Inferindo o tipo "float" para as colunas de valor monetário (uma boa prática é se usar double ou uma biblioteca específica para isso, porém os dados estavam quebrados e com muitas casas decimais, tomei a liberdade de usar uma aproximação com float)

In [20]:
df = df.withColumn('average_ticket', round(df["average_ticket"].cast(FloatType()),2))  \
       .withColumn('all_revenue', round(df["all_revenue"].cast(FloatType()), 2) ) 

#### Criando uma nova coluna "age", facilitando o trabalho com a coluna birth_date, em análises pontuais essa é uma prática ok, mas não deve ser usada para dashboards iterativas, nesse caso deve-se usar um campo cálculado

In [21]:
df = df.withColumn('age',floor(datediff(current_date(), to_date(col('birth_date'), 'M/d/yyyy'))/365.24).cast('integer'))

### Checando nosso schema atual

In [22]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- created_at: date (nullable = true)
 |-- updated_at: date (nullable = true)
 |-- deleted_at: date (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- version: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- last_date_purchase: date (nullable = true)
 |-- average_ticket: float (nullable = true)
 |-- items_quantity: integer (nullable = true)
 |-- all_revenue: float (nullable = true)
 |-- all_orders: integer (nullable = true)
 |-- recency: integer (nullable = true)
 |-- marketing_source: string (nullable = true)
 |-- age: integer (nullable = true)



### Ao final da limpeza e criação de novas colunas obtemos o seguinte esquema com as váriaveis sendo definidas em:

1.  Numéricas
	1. Discreta
		1. created_at
		2. updated_at
		3. deleted_at
		4. birth_date
		5. last_date_purchase
		6. items_quantity
		7. recency
		8. age
	2.  Contínua
		1. average_ticket
		2. all_revenue
		3. all_orders
2. Categoricas
	1. ID
	2. status: 3 valores distintos
	3. version:
	4. city
	5. state
	6. neighborhood
	7. marketing_source




### Verificando o shape do nosso DF

In [23]:
print(f'DF Shape: {df.count()} x {len(df.columns)}')


DF Shape: 10000 x 18


### Exportando o dataframe para CSV

In [21]:
df.toPandas().to_csv("data.csv")

Nesse caso fizemos a conversão para pandas "toPandas()", pois o Spark trabalho com o dataframe de forma distribuida, necessitando de outras bibliotecas e configurações para poder exportar o CSV em um único arquivo.
