In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import functions as f
import re

spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [2]:
# Passando configurações gerais para os .csv
config_csv = {
  "delimiter" : ";"
  ,"encode" : "utf-8"
  ,"header" : True
  ,"inferSchema" : True
}
# Passando os path em comum para os .csv
root_path = "/user/amadeus/dados_processamento/dados/"

In [3]:
# Criando o dataframe Clientes e usando exceptions em caso de erro.
try:
  df_clientes = spark.read.options(**config_csv).csv(f'{root_path}CLIENTES.csv')
except Exception as e:
  raise(e)
# Tratando linha Line of Business que estava com três espaços '   ' para '' vazio.
df_clientes.withColumn('Line of Business', regexp_replace('Line of Business', '   ', '')) \
  .toPandas()

Unnamed: 0,Address Number,Business Family,Business Unit,Customer,CustomerKey,Customer Type,Division,Line of Business,Phone,Region Code,Regional Sales Mgr,Search Type
0,10000000,R3,1,City Supermarket,10000000,G2,2,,816-455-8733,4,S16,C
1,10000453,R3,1,A Supermarket,10000453,G1,1,,816-455-8733,5,S19,C
2,10000455,R3,1,Caribian Supermarket,10000455,G2,2,,816-455-8733,1,S16,C
3,10000456,R1,1,A&B Shop,10000456,G3,1,,816-455-8733,0,S2,C
4,10000457,O2,1,A&G Shop,10000457,G1,1,,816-455-8733,5,S1,C
...,...,...,...,...,...,...,...,...,...,...,...,...
679,10027560,R2,1,Zilog Shop,10027560,G2,2,,816-455-8733,2,S1,C
680,10027572,R3,1,ZipLip.Com Shop,10027572,G2,2,,816-455-8733,3,S1,C
681,10027575,R3,1,Zitel Shop,10027575,G2,2,,816-455-8733,2,S1,C
682,10027583,R2,1,zNET Shop,10027583,G2,2,,816-455-8733,4,S5,C


In [4]:
# Criando o dataframe Divisao e usando exceptions em caso de erro.
try:
  df_divisao = spark.read.options(**config_csv).csv(f'{root_path}DIVISAO.csv')
except Exception as e:
  raise(e)

In [67]:
# Criando o dataframe Endereço e usando exceptions em caso de erro.
try:
  df_endereco = spark.read.options(**config_csv).csv(f'{root_path}ENDERECO.csv')
except Exception as e:
  raise(e)

for column in df_endereco.columns:
    df_endereco = df_endereco.withColumn(column, rtrim(df_endereco[column]))
    df_endereco = df_endereco.withColumn(column, when(df_endereco[column] == '', "Nao informado")\
                                         .when(df_endereco[column].isNull(), "Nao informado")\
                                         .otherwise(df_endereco[column]))
    
df_endereco.toPandas()

Unnamed: 0,Address Number,City,Country,Customer Address 1,Customer Address 2,Customer Address 3,Customer Address 4,State,Zip Code
0,10000000,Akron,US,PO Box 6258,Nao informado,Nao informado,Nao informado,OH,44312
1,10000453,Nao informado,UK,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado
2,10000455,Huntington Beach,US,7392 Count Circle,Nao informado,Nao informado,Nao informado,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,Nao informado,Nao informado,Nao informado,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,Nao informado,Nao informado,Nao informado,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,Nao informado,Nao informado,Nao informado,TX,79764
451,10027572,Elma,US,2210 Bowen Road,Nao informado,Nao informado,Nao informado,NY,14059
452,10027575,Dallas,US,10400 Plano Road,Nao informado,Nao informado,Nao informado,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,Nao informado,IL,61550-0474


In [68]:
df_endereco.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_endereco.columns]).toPandas()

Unnamed: 0,Address Number,City,Country,Customer Address 1,Customer Address 2,Customer Address 3,Customer Address 4,State,Zip Code
0,0,0,0,0,0,0,0,0,0


In [70]:
# Como os campos são todos String, passada a informação Nao informado

df_endereco = df_endereco.na.fill("Nao informado")

df_endereco.toPandas()

Unnamed: 0,Address Number,City,Country,Customer Address 1,Customer Address 2,Customer Address 3,Customer Address 4,State,Zip Code
0,10000000,Akron,US,PO Box 6258,Nao informado,Nao informado,Nao informado,OH,44312
1,10000453,Nao informado,UK,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado
2,10000455,Huntington Beach,US,7392 Count Circle,Nao informado,Nao informado,Nao informado,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,Nao informado,Nao informado,Nao informado,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,Nao informado,Nao informado,Nao informado,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,Nao informado,Nao informado,Nao informado,TX,79764
451,10027572,Elma,US,2210 Bowen Road,Nao informado,Nao informado,Nao informado,NY,14059
452,10027575,Dallas,US,10400 Plano Road,Nao informado,Nao informado,Nao informado,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,Nao informado,IL,61550-0474


In [71]:
# Criando o dataframe Região e usando exceptions em caso de erro.
try:
  df_regiao = spark.read.options(**config_csv).csv(f'{root_path}REGIAO.csv')
except Exception as e:
  raise(e)

In [72]:
# Criando o dataframe Vendas e usando exceptions em caso de erro.
try:
  df_vendas = spark.read.options(**config_csv).csv(f'{root_path}VENDAS.csv')
except Exception as e:
  raise(e)

df_vendas.toPandas() # Pelo df vemos que será necessário excluir as linhas vazias com None e Nan da tabela.

Unnamed: 0,Actual Delivery Date,CustomerKey,DateKey,Discount Amount,Invoice Date,Invoice Number,Item Class,Item Number,Item,Line Number,...,Order Number,Promised Delivery Date,Sales Amount,Sales Amount Based on List Price,Sales Cost Amount,Sales Margin Amount,Sales Price,Sales Quantity,Sales Rep,U/M
0,28/04/2019,10000481.0,28/04/2018,-23791,30/04/2018,100012.0,,,Urban Large Eggs,2000.0,...,200015.0,28/04/2019,23791,0,0,23791,23791,1.0,184.0,EA
1,12/07/2019,10002220.0,12/07/2018,36879,14/07/2018,100233.0,P01,20910,Moms Sliced Turkey,1000.0,...,200245.0,12/07/2019,45617,82496,0,45617,45617,1.0,127.0,EA
2,14/10/2019,10002220.0,15/10/2018,10973,17/10/2018,116165.0,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000.0,...,213157.0,14/10/2019,43893,54866,0,43893,43893,1.0,127.0,EA
3,01/06/2019,10002489.0,01/06/2018,-21175,03/06/2018,100096.0,,,Kiwi Lox,1000.0,...,200107.0,01/06/2019,21175,0,0,21175,21175,1.0,160.0,EA
4,26/05/2019,10004516.0,25/05/2018,9662794,27/05/2018,103341.0,P01,60776,High Top Sweet Onion,1000.0,...,203785.0,26/05/2019,8924866,1858766,0,8924866,1961509011,455.0,124.0,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65530,,,,,,,,,,,...,,,,,,,,,,
65531,,,,,,,,,,,...,,,,,,,,,,
65532,,,,,,,,,,,...,,,,,,,,,,
65533,,,,,,,,,,,...,,,,,,,,,,


In [74]:
# Exibindo tipos dos campos de cliente pois teremos de alterar o tipo de DateKey para date (antes como String)
df_vendas.printSchema()

root
 |-- Actual Delivery Date: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: string (nullable = true)
 |-- Discount Amount: string (nullable = true)
 |-- Invoice Date: string (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: string (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: string (nullable = true)
 |-- Sales Amount: string (nullable = true)
 |-- Sales Amount Based on List Price: string (nullable = true)
 |-- Sales Cost Amount: string (nullable = true)
 |-- Sales Margin Amount: string (nullable = true)
 |-- Sales Price: string (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [75]:
# Alterando os tipos das colunas para os seus respectivos e corretos tipos

# Usando o dtypes que retorna o o nome da coluna e o tipo

for i in df_vendas.dtypes:
    if re.search("Date", i[0]):
        df_vendas = df_vendas\
                    .withColumn(
                        i[0], 
                        to_date(col(i[0]), 'dd/MM/yyyy')
                        .alias(i[0])
                        )

vendastodouble = ['Discount Amount', 'List Price', 'Sales Amount', 
                'Sales Amount Based on List Price', 'Sales Cost Amount',
                 'Sales Margin Amount', 'Sales Price']

for columntodouble in vendastodouble:
    df_vendas = df_vendas\
                    .withColumn(
                    columntodouble,
                    col(columntodouble)
                    .cast('double')
                    )
    
df_vendas.printSchema()

root
 |-- Actual Delivery Date: date (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: date (nullable = true)
 |-- Discount Amount: double (nullable = true)
 |-- Invoice Date: date (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: double (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: date (nullable = true)
 |-- Sales Amount: double (nullable = true)
 |-- Sales Amount Based on List Price: double (nullable = true)
 |-- Sales Cost Amount: double (nullable = true)
 |-- Sales Margin Amount: double (nullable = true)
 |-- Sales Price: double (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [76]:
# DF vendas com valores alterados
df_vendas.toPandas()

Unnamed: 0,Actual Delivery Date,CustomerKey,DateKey,Discount Amount,Invoice Date,Invoice Number,Item Class,Item Number,Item,Line Number,...,Order Number,Promised Delivery Date,Sales Amount,Sales Amount Based on List Price,Sales Cost Amount,Sales Margin Amount,Sales Price,Sales Quantity,Sales Rep,U/M
0,2019-04-28,10000481.0,2018-04-28,,2018-04-30,100012.0,,,Urban Large Eggs,2000.0,...,200015.0,2019-04-28,,0.0,0.0,,,1.0,184.0,EA
1,2019-07-12,10002220.0,2018-07-12,,2018-07-14,100233.0,P01,20910,Moms Sliced Turkey,1000.0,...,200245.0,2019-07-12,,,0.0,,,1.0,127.0,EA
2,2019-10-14,10002220.0,2018-10-15,,2018-10-17,116165.0,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000.0,...,213157.0,2019-10-14,,,0.0,,,1.0,127.0,EA
3,2019-06-01,10002489.0,2018-06-01,,2018-06-03,100096.0,,,Kiwi Lox,1000.0,...,200107.0,2019-06-01,,0.0,0.0,,,1.0,160.0,EA
4,2019-05-26,10004516.0,2018-05-25,,2018-05-27,103341.0,P01,60776,High Top Sweet Onion,1000.0,...,203785.0,2019-05-26,,,0.0,,,455.0,124.0,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65530,,,,,,,,,,,...,,,,,,,,,,
65531,,,,,,,,,,,...,,,,,,,,,,
65532,,,,,,,,,,,...,,,,,,,,,,
65533,,,,,,,,,,,...,,,,,,,,,,


In [77]:
# Retirando as variáveis "PK" DateKey (dia da venda) nulas da tabela
df_vendas = df_vendas.filter(col("CustomerKey").isNotNull())
df_vendas.toPandas()

Unnamed: 0,Actual Delivery Date,CustomerKey,DateKey,Discount Amount,Invoice Date,Invoice Number,Item Class,Item Number,Item,Line Number,...,Order Number,Promised Delivery Date,Sales Amount,Sales Amount Based on List Price,Sales Cost Amount,Sales Margin Amount,Sales Price,Sales Quantity,Sales Rep,U/M
0,2019-04-28,10000481,2018-04-28,,2018-04-30,100012,,,Urban Large Eggs,2000,...,200015,2019-04-28,,0.0,0.0,,,1,184,EA
1,2019-07-12,10002220,2018-07-12,,2018-07-14,100233,P01,20910,Moms Sliced Turkey,1000,...,200245,2019-07-12,,,0.0,,,1,127,EA
2,2019-10-14,10002220,2018-10-15,,2018-10-17,116165,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000,...,213157,2019-10-14,,,0.0,,,1,127,EA
3,2019-06-01,10002489,2018-06-01,,2018-06-03,100096,,,Kiwi Lox,1000,...,200107,2019-06-01,,0.0,0.0,,,1,160,EA
4,2019-05-26,10004516,2018-05-25,,2018-05-27,103341,P01,60776,High Top Sweet Onion,1000,...,203785,2019-05-26,,,0.0,,,455,124,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65277,2020-03-18,10017638,2019-03-18,,2019-03-21,226497,P01,13447,High Top Oranges,8000,...,320895,2020-03-18,,,,,,9,180,EA
65278,2020-03-18,10017638,2019-03-18,,2019-03-21,226497,P01,25906,Landslide White Sugar,38000,...,320895,2020-03-18,,,,,,2,180,EA
65279,2020-03-18,10017638,2019-03-18,,2019-03-21,226497,P01,61856,Moms Potato Salad,227001,...,320895,2020-03-18,,,574.0,,,8,180,EA
65280,2020-03-18,10017638,2019-03-18,,2019-03-21,226498,P01,17801,Better Fancy Canned Sardines,1000,...,320907,2020-03-18,,,,,,36,180,EA


In [78]:
# Usando o dtypes mais uma vez para preencher os campos nulos, o i[1] representa o tipo
# Caso seja int, preencher com 0, caso seja string preencher com Nao informado
for i in df_vendas.dtypes:
    if re.search("int", i[1]):
      df_vendas = df_vendas.na.fill(0)
    if re.search("string", i[1]):
      df_vendas = df_vendas.na.fill("Nao informado")
df_vendas.limit(5).toPandas()

Unnamed: 0,Actual Delivery Date,CustomerKey,DateKey,Discount Amount,Invoice Date,Invoice Number,Item Class,Item Number,Item,Line Number,...,Order Number,Promised Delivery Date,Sales Amount,Sales Amount Based on List Price,Sales Cost Amount,Sales Margin Amount,Sales Price,Sales Quantity,Sales Rep,U/M
0,2019-04-28,10000481,2018-04-28,0.0,2018-04-30,100012,Nao informado,Nao informado,Urban Large Eggs,2000,...,200015,2019-04-28,0.0,0.0,0.0,0.0,0.0,1,184,EA
1,2019-07-12,10002220,2018-07-12,0.0,2018-07-14,100233,P01,20910,Moms Sliced Turkey,1000,...,200245,2019-07-12,0.0,0.0,0.0,0.0,0.0,1,127,EA
2,2019-10-14,10002220,2018-10-15,0.0,2018-10-17,116165,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000,...,213157,2019-10-14,0.0,0.0,0.0,0.0,0.0,1,127,EA
3,2019-06-01,10002489,2018-06-01,0.0,2018-06-03,100096,Nao informado,Nao informado,Kiwi Lox,1000,...,200107,2019-06-01,0.0,0.0,0.0,0.0,0.0,1,160,EA
4,2019-05-26,10004516,2018-05-25,0.0,2018-05-27,103341,P01,60776,High Top Sweet Onion,1000,...,203785,2019-05-26,0.0,0.0,0.0,0.0,0.0,455,124,SE


In [79]:
# Observando se alguma linha está nula
df_vendas.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_vendas.columns]).toPandas()

Unnamed: 0,Actual Delivery Date,CustomerKey,DateKey,Discount Amount,Invoice Date,Invoice Number,Item Class,Item Number,Item,Line Number,...,Order Number,Promised Delivery Date,Sales Amount,Sales Amount Based on List Price,Sales Cost Amount,Sales Margin Amount,Sales Price,Sales Quantity,Sales Rep,U/M
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [238]:
# Respondendo o exercício:
# Quantos pedidos foram realizados?
# Quantos clientes tem em nossa base?
# Quantos clientes temos por Região?
# contar distinct Customer por Region Code
# Quantidade de vendas em 2018.
# contar * de vendas where year('DateKey') = 2018

In [88]:
# Question #1: Quantos pedidos foram realizados?
# Answer: Observar a tabela vendas e fazer o count * da tabelas vendas e contando os itens totais de venda
# Answer bonus: Verificar quantidade de itens vendidos nas vendas, somando a coluna SalesQuantity
df_vendas.createOrReplaceTempView("vendasView")
df_vendas_pedidos = spark\
                    .sql("""
                        SELECT COUNT(*) as Total_Vendas, sum(`Sales Quantity`) as Quantitade_Itens_Vendidos
                        FROM VendasView
                    """)
df_vendas_pedidos.toPandas()

Unnamed: 0,Total_Vendas,Quantitade_Itens_Vendidos
0,65282,2943194


In [91]:
# Question #2: Quantos clientes tem em nossa base?
# Answer: Contar o * de clientes
tot_clientes = df_clientes.count()
print(f"Temos em nossa base {tot_clientes} clientes.")

Temos em nossa base 684 clientes.


In [255]:
# Question #3: Quantos clientes temos por Região?
# Answer: Vejo as chaves que interligam a tabela (Region Code)
df_clientes.printSchema()
df_regiao.printSchema()

root
 |-- Address Number: integer (nullable = true)
 |-- Business Family: string (nullable = true)
 |-- Business Unit: integer (nullable = true)
 |-- Customer: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Division: integer (nullable = true)
 |-- Line of Business: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Region Code: integer (nullable = true)
 |-- Regional Sales Mgr: string (nullable = true)
 |-- Search Type: string (nullable = true)

root
 |-- Region Code: integer (nullable = true)
 |-- Region Name: string (nullable = true)



In [274]:
# Crio as views para trabalhar em SQL
df_clientes.createOrReplaceTempView("clientesView")
df_regiao.createOrReplaceTempView("regiaoView")
clientes_regiao = spark\
                    .sql("""
                        SELECT r.`Region Name`, count(Customer) AS Tot_Clientes
                        FROM clientesView c
                        INNER JOIN regiaoView r
                        ON c.`Region Code` = r.`Region Code`
                        GROUP BY r.`Region Name`
                        ORDER BY tot_clientes DESC
                    """)
clientes_regiao.toPandas()
# clientes_regiao.show()
# display(clientes_regiao)

Unnamed: 0,Region Name,Tot_Clientes
0,International,299
1,Central,117
2,Southern,100
3,Western,89
4,Northeast,42
5,Canada,37


In [93]:
# Question #4: Qual a quantidade de vendas em 2018?
# Answer: Puxo uma seleção de count com o DateKey == 2018 (Ano de venda)
df_vendas2018 = spark\
                    .sql("""
                        SELECT COUNT(*) Vendas_2018
                        FROM VendasView
                        WHERE YEAR(DateKey) = 2018
                    """)
df_vendas2018.toPandas()

Unnamed: 0,Vendas_2018
0,30560
