In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import functions as f
import os, sys
import re

spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [2]:
# Criando dataframes diretamente do Hive

df_clientes = spark.sql("SELECT * FROM desafio_final.TBL_CLIENTES")
df_divisao = spark.sql("SELECT * FROM desafio_final.TBL_DIVISAO")
df_endereco = spark.sql("SELECT * FROM desafio_final.TBL_ENDERECO")
df_regiao = spark.sql("SELECT * FROM desafio_final.TBL_REGIAO")
df_vendas = spark.sql("SELECT * FROM desafio_final.TBL_VENDAS")

# Retirando todos os espaços das colunas dos dataframes

for each in df_clientes.schema.names:
    df_clientes = df_clientes.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '_')))
    
for each in df_divisao.schema.names:
    df_divisao = df_divisao.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '_')))
    
for each in df_endereco.schema.names:
    df_endereco = df_endereco.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '_')))
    
for each in df_regiao.schema.names:
    df_regiao = df_regiao.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '_')))
        
for each in df_vendas.schema.names:
    df_vendas = df_vendas.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '_')))


# Vendo os Schemas dos dataframes
df_clientes.printSchema()
df_divisao.printSchema()
df_endereco.printSchema()
df_regiao.printSchema()
df_vendas.printSchema()


root
 |-- address_number: integer (nullable = true)
 |-- business_family: string (nullable = true)
 |-- business_unit: integer (nullable = true)
 |-- customer: string (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- division: integer (nullable = true)
 |-- line_of_business: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- regional_sales_mgr: string (nullable = true)
 |-- search_type: string (nullable = true)
 |-- dt_foto: string (nullable = true)

root
 |-- division: integer (nullable = true)
 |-- division_name: string (nullable = true)
 |-- dt_foto: string (nullable = true)

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- customer_address_1: string (nullable = true)
 |-- customer_address_2: string (nullable = true)
 |-- customer_address_3: string (nullable = true)
 |-- customer_a

In [3]:
# Visualiando meu DF

# df_clientes.show()
df_clientes.toPandas()

Unnamed: 0,address_number,business_family,business_unit,customer,customerkey,customer_type,division,line_of_business,phone,region_code,regional_sales_mgr,search_type,dt_foto
0,10000000,R3,1,City Supermarket,10000000,G2,2,,816-455-8733,4,S16,C,2022-06-09
1,10000453,R3,1,A Supermarket,10000453,G1,1,,816-455-8733,5,S19,C,2022-06-09
2,10000455,R3,1,Caribian Supermarket,10000455,G2,2,,816-455-8733,1,S16,C,2022-06-09
3,10000456,R1,1,A&B Shop,10000456,G3,1,,816-455-8733,0,S2,C,2022-06-09
4,10000457,O2,1,A&G Shop,10000457,G1,1,,816-455-8733,5,S1,C,2022-06-09
...,...,...,...,...,...,...,...,...,...,...,...,...,...
679,10027560,R2,1,Zilog Shop,10027560,G2,2,,816-455-8733,2,S1,C,2022-06-09
680,10027572,R3,1,ZipLip.Com Shop,10027572,G2,2,,816-455-8733,3,S1,C,2022-06-09
681,10027575,R3,1,Zitel Shop,10027575,G2,2,,816-455-8733,2,S1,C,2022-06-09
682,10027583,R2,1,zNET Shop,10027583,G2,2,,816-455-8733,4,S5,C,2022-06-09


In [4]:
# Tratando linha line_of_business que estava com três espaços '   ' para 'Nao informado' por ser string.

df_clientes = df_clientes.withColumn('line_of_business', regexp_replace('line_of_business', '   ', 'Nao informado'))

# df_clientes.show()
df_clientes.toPandas()

Unnamed: 0,address_number,business_family,business_unit,customer,customerkey,customer_type,division,line_of_business,phone,region_code,regional_sales_mgr,search_type,dt_foto
0,10000000,R3,1,City Supermarket,10000000,G2,2,Nao informado,816-455-8733,4,S16,C,2022-06-09
1,10000453,R3,1,A Supermarket,10000453,G1,1,Nao informado,816-455-8733,5,S19,C,2022-06-09
2,10000455,R3,1,Caribian Supermarket,10000455,G2,2,Nao informado,816-455-8733,1,S16,C,2022-06-09
3,10000456,R1,1,A&B Shop,10000456,G3,1,Nao informado,816-455-8733,0,S2,C,2022-06-09
4,10000457,O2,1,A&G Shop,10000457,G1,1,Nao informado,816-455-8733,5,S1,C,2022-06-09
...,...,...,...,...,...,...,...,...,...,...,...,...,...
679,10027560,R2,1,Zilog Shop,10027560,G2,2,Nao informado,816-455-8733,2,S1,C,2022-06-09
680,10027572,R3,1,ZipLip.Com Shop,10027572,G2,2,Nao informado,816-455-8733,3,S1,C,2022-06-09
681,10027575,R3,1,Zitel Shop,10027575,G2,2,Nao informado,816-455-8733,2,S1,C,2022-06-09
682,10027583,R2,1,zNET Shop,10027583,G2,2,Nao informado,816-455-8733,4,S5,C,2022-06-09


In [5]:
# Eliminando a coluna dt_foto

df_clientes = df_clientes.drop('dt_foto')
df_clientes.toPandas()

Unnamed: 0,address_number,business_family,business_unit,customer,customerkey,customer_type,division,line_of_business,phone,region_code,regional_sales_mgr,search_type
0,10000000,R3,1,City Supermarket,10000000,G2,2,Nao informado,816-455-8733,4,S16,C
1,10000453,R3,1,A Supermarket,10000453,G1,1,Nao informado,816-455-8733,5,S19,C
2,10000455,R3,1,Caribian Supermarket,10000455,G2,2,Nao informado,816-455-8733,1,S16,C
3,10000456,R1,1,A&B Shop,10000456,G3,1,Nao informado,816-455-8733,0,S2,C
4,10000457,O2,1,A&G Shop,10000457,G1,1,Nao informado,816-455-8733,5,S1,C
...,...,...,...,...,...,...,...,...,...,...,...,...
679,10027560,R2,1,Zilog Shop,10027560,G2,2,Nao informado,816-455-8733,2,S1,C
680,10027572,R3,1,ZipLip.Com Shop,10027572,G2,2,Nao informado,816-455-8733,3,S1,C
681,10027575,R3,1,Zitel Shop,10027575,G2,2,Nao informado,816-455-8733,2,S1,C
682,10027583,R2,1,zNET Shop,10027583,G2,2,Nao informado,816-455-8733,4,S5,C


In [6]:
# Removendo duplicidades da PK do DF Clientes

df_clientes = df_clientes.dropDuplicates(["customerkey"])

In [7]:
# Conferindo meu DF Divisão e apagando a coluna dt_foto

df_divisao = df_divisao.drop('dt_foto')
df_divisao.toPandas()
# df_divisao.show()

Unnamed: 0,division,division_name
0,1,International
1,2,Domestic


In [8]:
# Verificando meu Dataframe Endereço após apagar o dt_foto

df_endereco = df_endereco.drop('dt_foto')
df_endereco.toPandas()
# df_endereco.show()

Unnamed: 0,address_number,city,country,customer_address_1,customer_address_2,customer_address_3,customer_address_4,state,zip_code
0,10000000,Akron,US,PO Box 6258,,,,OH,44312
1,10000453,,UK,,,,,,
2,10000455,Huntington Beach,US,7392 Count Circle,,,,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,,,,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,,,,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,,,,TX,79764
451,10027572,Elma,US,2210 Bowen Road,,,,NY,14059
452,10027575,Dallas,US,10400 Plano Road,,,,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,,IL,61550-0474


In [9]:
# Tratando as colunas sujas com espaço em branco e passando a informação de 'Nao informado' para as colunas vazias após tratamento

for column in df_endereco.columns:
    df_endereco = df_endereco.withColumn(column, trim(df_endereco[column]))
    df_endereco = df_endereco.withColumn(column, when(df_endereco[column] == '', "Nao informado")\
                                         .when(df_endereco[column].isNull(), "Nao informado")\
                                         .otherwise(df_endereco[column]))
    
df_endereco = df_endereco.withColumn("address_number", col("address_number").cast('integer'))
    
df_endereco.toPandas()

Unnamed: 0,address_number,city,country,customer_address_1,customer_address_2,customer_address_3,customer_address_4,state,zip_code
0,10000000,Akron,US,PO Box 6258,Nao informado,Nao informado,Nao informado,OH,44312
1,10000453,Nao informado,UK,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado,Nao informado
2,10000455,Huntington Beach,US,7392 Count Circle,Nao informado,Nao informado,Nao informado,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,Nao informado,Nao informado,Nao informado,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,Nao informado,Nao informado,Nao informado,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,Nao informado,Nao informado,Nao informado,TX,79764
451,10027572,Elma,US,2210 Bowen Road,Nao informado,Nao informado,Nao informado,NY,14059
452,10027575,Dallas,US,10400 Plano Road,Nao informado,Nao informado,Nao informado,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,Nao informado,IL,61550-0474


In [10]:
# Removendo duplicidades da PK de Endereço

df_endereco = df_endereco.dropDuplicates(["address_number"])

In [11]:
# Constatando que não há mais linhas com dados null

df_endereco.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_endereco.columns]).toPandas()

Unnamed: 0,address_number,city,country,customer_address_1,customer_address_2,customer_address_3,customer_address_4,state,zip_code
0,0,0,0,0,0,0,0,0,0


In [12]:
# Conferindo meu DF Região e apagando a coluna dt_foto

df_regiao = df_regiao.drop('dt_foto')
df_regiao.toPandas()
# df_regiao.show()

Unnamed: 0,region_code,region_name
0,0,Canada
1,1,Western
2,2,Southern
3,3,Northeast
4,4,Central
5,5,International


In [13]:
# Conferindo meu DF Vendas e apagando a coluna dt_foto

df_vendas = df_vendas.drop('dt_foto')
df_vendas.toPandas()
# df_vendas.show()

# Pelo DF vemos que será necessário excluir as linhas vazias com None e Nan da tabela.

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,28/04/2019,10000481.0,28/04/2018,-23791,30/04/2018,100012.0,,,Urban Large Eggs,2000.0,...,200015.0,28/04/2019,23791,0,0,23791,23791,1.0,184.0,EA
1,12/07/2019,10002220.0,12/07/2018,36879,14/07/2018,100233.0,P01,20910,Moms Sliced Turkey,1000.0,...,200245.0,12/07/2019,45617,82496,0,45617,45617,1.0,127.0,EA
2,14/10/2019,10002220.0,15/10/2018,10973,17/10/2018,116165.0,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000.0,...,213157.0,14/10/2019,43893,54866,0,43893,43893,1.0,127.0,EA
3,01/06/2019,10002489.0,01/06/2018,-21175,03/06/2018,100096.0,,,Kiwi Lox,1000.0,...,200107.0,01/06/2019,21175,0,0,21175,21175,1.0,160.0,EA
4,26/05/2019,10004516.0,25/05/2018,9662794,27/05/2018,103341.0,P01,60776,High Top Sweet Onion,1000.0,...,203785.0,26/05/2019,8924866,1858766,0,8924866,1961509011,455.0,124.0,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65530,,,,,,,,,,,...,,,,,,,,,,
65531,,,,,,,,,,,...,,,,,,,,,,
65532,,,,,,,,,,,...,,,,,,,,,,
65533,,,,,,,,,,,...,,,,,,,,,,


In [14]:
# Exibindo tipos dos campos de vendas pois teremos de alterar o tipo dos campos que remetem a data
# para date

df_vendas.printSchema()

root
 |-- actual_delivery_date: string (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- datekey: string (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: integer (nullable = true)
 |-- list_price: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- promised_delivery_date: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- sales_amount_based_on_list_price: string (nullable = true)
 |-- sales_cost_amount: string (nullable = true)
 |-- sales_margin_amount: string (nullable = true)
 |-- sales_price: string (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u/m: string (nullable = true)



In [15]:
# Alterando os tipos das colunas para os seus respectivos e corretos tipos

# Usando o dtypes que retorna o o nome da coluna e o tipo

for i in df_vendas.dtypes:
    if re.search("date", i[0]):
        df_vendas = df_vendas\
                    .withColumn(
                        i[0], 
                        to_date(col(i[0]), 'dd/MM/yyyy')
                        .alias(i[0])
                        )

vendastodouble = ['discount_amount', 'list_price', 'sales_amount', 
                'sales_amount_based_on_list_price', 'sales_cost_amount',
                 'sales_margin_amount', 'sales_price']

for c in vendastodouble:
    df_vendas = df_vendas.withColumn(c, regexp_replace(c, '\,', '.'))
    if c == 'sales_amount':
        df_vendas = df_vendas.withColumn(c, col(c).cast('integer'))
    else:
        df_vendas = df_vendas.withColumn(c, col(c).cast('double'))
        df_vendas = df_vendas.withColumn(c, round(c, 2))
                            
df_vendas.printSchema()

root
 |-- actual_delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- datekey: date (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: integer (nullable = true)
 |-- list_price: double (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- promised_delivery_date: date (nullable = true)
 |-- sales_amount: integer (nullable = true)
 |-- sales_amount_based_on_list_price: double (nullable = true)
 |-- sales_cost_amount: double (nullable = true)
 |-- sales_margin_amount: double (nullable = true)
 |-- sales_price: double (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u/m: string (nullable = true)



In [16]:
# DF vendas com valores alterados

df_vendas.toPandas()
# df_vendas.show()

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,2019-04-28,10000481.0,2018-04-28,-237.91,2018-04-30,100012.0,,,Urban Large Eggs,2000.0,...,200015.0,2019-04-28,237.0,0.00,0.0,237.91,237.91,1.0,184.0,EA
1,2019-07-12,10002220.0,2018-07-12,368.79,2018-07-14,100233.0,P01,20910,Moms Sliced Turkey,1000.0,...,200245.0,2019-07-12,456.0,824.96,0.0,456.17,456.17,1.0,127.0,EA
2,2019-10-14,10002220.0,2018-10-15,109.73,2018-10-17,116165.0,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000.0,...,213157.0,2019-10-14,438.0,548.66,0.0,438.93,438.93,1.0,127.0,EA
3,2019-06-01,10002489.0,2018-06-01,-211.75,2018-06-03,100096.0,,,Kiwi Lox,1000.0,...,200107.0,2019-06-01,211.0,0.00,0.0,211.75,211.75,1.0,160.0,EA
4,2019-05-26,10004516.0,2018-05-25,96627.94,2018-05-27,103341.0,P01,60776,High Top Sweet Onion,1000.0,...,203785.0,2019-05-26,89248.0,185876.60,0.0,89248.66,196.15,455.0,124.0,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65530,,,,,,,,,,,...,,,,,,,,,,
65531,,,,,,,,,,,...,,,,,,,,,,
65532,,,,,,,,,,,...,,,,,,,,,,
65533,,,,,,,,,,,...,,,,,,,,,,


In [17]:
df_vendas.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_vendas.columns]).toPandas()

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,253,253,253,255,253,253,0,0,0,253,...,253,253,253,253,253,253,254,253,253,0


In [18]:
# Retirando as linhas nulas da tabela

df_vendas = df_vendas.filter(col("customerkey").isNotNull())
df_vendas.toPandas()
# df_vendas.show()

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,2019-04-28,10000481,2018-04-28,-237.91,2018-04-30,100012,,,Urban Large Eggs,2000,...,200015,2019-04-28,237,0.00,0.00,237.91,237.91,1,184,EA
1,2019-07-12,10002220,2018-07-12,368.79,2018-07-14,100233,P01,20910,Moms Sliced Turkey,1000,...,200245,2019-07-12,456,824.96,0.00,456.17,456.17,1,127,EA
2,2019-10-14,10002220,2018-10-15,109.73,2018-10-17,116165,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000,...,213157,2019-10-14,438,548.66,0.00,438.93,438.93,1,127,EA
3,2019-06-01,10002489,2018-06-01,-211.75,2018-06-03,100096,,,Kiwi Lox,1000,...,200107,2019-06-01,211,0.00,0.00,211.75,211.75,1,160,EA
4,2019-05-26,10004516,2018-05-25,96627.94,2018-05-27,103341,P01,60776,High Top Sweet Onion,1000,...,203785,2019-05-26,89248,185876.60,0.00,89248.66,196.15,455,124,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65277,2020-03-18,10017638,2019-03-18,505.78,2019-03-21,226497,P01,13447,High Top Oranges,8000,...,320895,2020-03-18,569,1075.68,239.95,329.95,63.32,9,180,EA
65278,2020-03-18,10017638,2019-03-18,410.75,2019-03-21,226497,P01,25906,Landslide White Sugar,38000,...,320895,2020-03-18,462,873.56,423.55,39.26,231.41,2,180,EA
65279,2020-03-18,10017638,2019-03-18,876.16,2019-03-21,226497,P01,61856,Moms Potato Salad,227001,...,320895,2020-03-18,987,1863.36,574.00,413.20,123.40,8,180,EA
65280,2020-03-18,10017638,2019-03-18,24226.77,2019-03-21,226498,P01,17801,Better Fancy Canned Sardines,1000,...,320907,2020-03-18,27297,51524.28,16188.90,11108.61,758.26,36,180,EA


In [19]:
# Usando o dtypes mais uma vez para preencher os campos nulos, o i[1] representa o tipo e i[0] o nome da coluna
# Caso seja int ou double, preencher com 0, caso seja string preencher com Nao informado

for i in df_vendas.dtypes:
    if re.search("int", i[1]) or re.search("double", i[1]):
        df_vendas = df_vendas.withColumn(i[0], trim(df_vendas[i[0]]))
        df_vendas = df_vendas.withColumn(i[0], when(df_vendas[i[0]] == '', 0)\
                                         .when(df_vendas[i[0]].isNull(), 0)\
                                         .otherwise(df_vendas[i[0]]))
    if re.search("string", i[1]):
        df_vendas = df_vendas.withColumn(i[0], trim(df_vendas[i[0]]))
        df_vendas = df_vendas.withColumn(i[0], when(df_vendas[i[0]] == '', "Nao informado")\
                                         .when(df_vendas[i[0]].isNull(), "Nao informado")\
                                         .otherwise(df_vendas[i[0]]))   

df_vendas.limit(5).toPandas()

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,2019-04-28,10000481,2018-04-28,-237.91,2018-04-30,100012,Nao informado,Nao informado,Urban Large Eggs,2000,...,200015,2019-04-28,237,0.0,0.0,237.91,237.91,1,184,EA
1,2019-07-12,10002220,2018-07-12,368.79,2018-07-14,100233,P01,20910,Moms Sliced Turkey,1000,...,200245,2019-07-12,456,824.96,0.0,456.17,456.17,1,127,EA
2,2019-10-14,10002220,2018-10-15,109.73,2018-10-17,116165,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000,...,213157,2019-10-14,438,548.66,0.0,438.93,438.93,1,127,EA
3,2019-06-01,10002489,2018-06-01,-211.75,2018-06-03,100096,Nao informado,Nao informado,Kiwi Lox,1000,...,200107,2019-06-01,211,0.0,0.0,211.75,211.75,1,160,EA
4,2019-05-26,10004516,2018-05-25,96627.94,2018-05-27,103341,P01,60776,High Top Sweet Onion,1000,...,203785,2019-05-26,89248,185876.6,0.0,89248.66,196.15,455,124,SE


In [20]:
# Observando se alguma linha está nula

df_vendas.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_vendas.columns]).toPandas()

Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [21]:
### CóDIGOS REFERENTES AS TABELAS DIMENSIONAL E FATO

In [22]:
df_clientes.printSchema()
df_divisao.printSchema()
df_endereco.printSchema()
df_regiao.printSchema()
df_vendas.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- business_family: string (nullable = true)
 |-- business_unit: integer (nullable = true)
 |-- customer: string (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- division: integer (nullable = true)
 |-- line_of_business: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- regional_sales_mgr: string (nullable = true)
 |-- search_type: string (nullable = true)

root
 |-- division: integer (nullable = true)
 |-- division_name: string (nullable = true)

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- customer_address_1: string (nullable = true)
 |-- customer_address_2: string (nullable = true)
 |-- customer_address_3: string (nullable = true)
 |-- customer_address_4: string (nullable = true)
 |-- state: string (nullable = true)
 |-- z

In [23]:
# Verificando o menor dia da minha tabela vendas para utiliza-la como referência na Dim_Tempo
# Criando a dim tempo antes do "tabelão" pois irei fazer join da dim_tempo com o tabelão

df_vendas.agg(f.min("datekey")).show()

+------------+
|min(datekey)|
+------------+
|  2017-01-09|
+------------+



In [24]:
# Criando DF Dim_Tempo (Precisa ser criada pois não dá para adaptar das demais planilhas)

dim_tempo = spark.createDataFrame([(1,)], ["id"])

# Iniciando sequência com o primeiro dia da DateKey até a daa atual
dim_tempo = dim_tempo.withColumn("date", f.explode(f.expr("sequence(to_date('2017-01-09'), to_date(current_date()), interval 1 day)"))
)

# dim_tempo.toPandas()

In [25]:
# Adicionando mais colunas no Dim_Tempo e eliminando a 'id'

# Eliminando a coluna ID pois ela só foi criada para gerar o DF
dim_tempo = dim_tempo.drop('id')

# Formatando o nome da minha coluna 'date' para 'DATA'
dim_tempo = dim_tempo.withColumn('DATA', f.col('date'))

# Fatiando o ano para Inteiro e uma coluna em separado
dim_tempo = dim_tempo.withColumn('NR_ANO', f.date_format(f.col('date'), 'yyyy').cast(IntegerType()))

# Fatiando o ms para Inteiro e uma coluna em separado
dim_tempo = dim_tempo.withColumn('NR_MES', f.date_format(f.col('date'), 'MM').cast(IntegerType()))

# Atribuindo o nome dos trimestres de acordo com os meses
dim_tempo = dim_tempo.withColumn('NM_TRIMESTRE', when(dim_tempo.NR_MES <= 3, "1º Trimestre")\
                                 .when(dim_tempo.NR_MES <= 6, "2º Trimestre")\
                                 .when(dim_tempo.NR_MES <= 9, "3º Trimestre")\
                                 .when(dim_tempo.NR_MES <= 12, "4º Trimestre"))

# Atribuindo o nome dos meses de acordo com o número dos meses
dim_tempo = dim_tempo.withColumn('NM_MES', when(dim_tempo.NR_MES == 1, "Janeiro")\
                                 .when(dim_tempo.NR_MES == 2, "Fevereiro")\
                                 .when(dim_tempo.NR_MES == 3, "Março")\
                                 .when(dim_tempo.NR_MES == 4, "Abril")\
                                 .when(dim_tempo.NR_MES == 5, "Maio")\
                                 .when(dim_tempo.NR_MES == 6, "Junho")\
                                 .when(dim_tempo.NR_MES == 7, "Julho")\
                                 .when(dim_tempo.NR_MES == 8, "Agosto")\
                                 .when(dim_tempo.NR_MES == 9, "Setembro")\
                                 .when(dim_tempo.NR_MES == 10, "Outubro")\
                                 .when(dim_tempo.NR_MES == 11, "Novembro")\
                                 .when(dim_tempo.NR_MES == 12, "Dezembro"))

# Pegando os dias da semana e em seguida atribuindo nome ao número relativos a esses dias
dim_tempo = dim_tempo.withColumn('NM_DIA_SEMANA', dayofweek(col('DATA')))
dim_tempo = dim_tempo.withColumn('NM_DIA_SEMANA', when(dim_tempo.NM_DIA_SEMANA == 1, "Domingo")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 2, "Segunda-feira")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 3, "Terça-feira")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 4, "Quarta-feira")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 5, "Quinta-feira")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 6, "Sexta-feira")\
                                 .when(dim_tempo.NM_DIA_SEMANA == 7, "Sábado"))

# Definindo os números da semana no ano
dim_tempo = dim_tempo.withColumn('NR_SEMANA', weekofyear(col('DATA')))

# Definindo as colunas do dia, derivando da coluna 'date' e tranformando em Inteiro
dim_tempo = dim_tempo.withColumn('NR_DIA', f.date_format(f.col('date'), 'dd').cast(IntegerType()))

# Jogando fora a coluna date pois DATA a substituiu
dim_tempo = dim_tempo.drop('date')


dim_tempo.toPandas()

Unnamed: 0,DATA,NR_ANO,NR_MES,NM_TRIMESTRE,NM_MES,NM_DIA_SEMANA,NR_SEMANA,NR_DIA
0,2017-01-09,2017,1,1º Trimestre,Janeiro,Segunda-feira,2,9
1,2017-01-10,2017,1,1º Trimestre,Janeiro,Terça-feira,2,10
2,2017-01-11,2017,1,1º Trimestre,Janeiro,Quarta-feira,2,11
3,2017-01-12,2017,1,1º Trimestre,Janeiro,Quinta-feira,2,12
4,2017-01-13,2017,1,1º Trimestre,Janeiro,Sexta-feira,2,13
...,...,...,...,...,...,...,...,...
1977,2022-06-09,2022,6,2º Trimestre,Junho,Quinta-feira,23,9
1978,2022-06-10,2022,6,2º Trimestre,Junho,Sexta-feira,23,10
1979,2022-06-11,2022,6,2º Trimestre,Junho,Sábado,23,11
1980,2022-06-12,2022,6,2º Trimestre,Junho,Domingo,23,12


In [26]:
# Reordenando e ajustando a dimensão Tempo
dim_tempo = dim_tempo.select('DATA', 'NR_ANO', 'NM_TRIMESTRE', 'NM_MES'
                             , 'NR_DIA', 'NM_DIA_SEMANA')

dim_tempo.toPandas()

Unnamed: 0,DATA,NR_ANO,NM_TRIMESTRE,NM_MES,NR_DIA,NM_DIA_SEMANA
0,2017-01-09,2017,1º Trimestre,Janeiro,9,Segunda-feira
1,2017-01-10,2017,1º Trimestre,Janeiro,10,Terça-feira
2,2017-01-11,2017,1º Trimestre,Janeiro,11,Quarta-feira
3,2017-01-12,2017,1º Trimestre,Janeiro,12,Quinta-feira
4,2017-01-13,2017,1º Trimestre,Janeiro,13,Sexta-feira
...,...,...,...,...,...,...
1977,2022-06-09,2022,2º Trimestre,Junho,9,Quinta-feira
1978,2022-06-10,2022,2º Trimestre,Junho,10,Sexta-feira
1979,2022-06-11,2022,2º Trimestre,Junho,11,Sábado
1980,2022-06-12,2022,2º Trimestre,Junho,12,Domingo


In [27]:
df_clientes.printSchema()
df_divisao.printSchema()
df_endereco.printSchema()
df_regiao.printSchema()
df_vendas.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- business_family: string (nullable = true)
 |-- business_unit: integer (nullable = true)
 |-- customer: string (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- division: integer (nullable = true)
 |-- line_of_business: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- regional_sales_mgr: string (nullable = true)
 |-- search_type: string (nullable = true)

root
 |-- division: integer (nullable = true)
 |-- division_name: string (nullable = true)

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- customer_address_1: string (nullable = true)
 |-- customer_address_2: string (nullable = true)
 |-- customer_address_3: string (nullable = true)
 |-- customer_address_4: string (nullable = true)
 |-- state: string (nullable = true)
 |-- z

In [28]:
# Realizando o Join entre Endereço, Divisao e Regiao e fazendo Select das colunas que vou utilizar
stg_cliente = df_clientes.join(df_endereco, df_clientes.address_number == df_endereco.address_number, 'left')\
            .join(df_divisao, df_clientes.division == df_divisao.division, 'inner')\
            .join(df_regiao, df_clientes.region_code == df_regiao.region_code, 'inner')\
            .select(df_clientes.address_number, df_endereco.city, df_endereco.state, df_divisao.division_name
                   , df_regiao.region_name, df_clientes.customerkey, df_clientes.customer, df_clientes.customer_type)

stg_cliente.count()

683

In [29]:
# Verificando a quantidade de linhas nulas após o primeiro join da grande tabela staging

stg_cliente.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in stg_cliente.columns]).toPandas()

Unnamed: 0,address_number,city,state,division_name,region_name,customerkey,customer,customer_type
0,0,229,229,0,0,0,0,0


In [30]:
# Preenchendo Cidade e Estado com "Nao informado" de modo que as demais informações nao se percam, tais como produtos
# vendidos e também valores faturados

stg_cliente = stg_cliente.na.fill('Nao informado')

In [31]:
# Verificando se ainda há linhas nulas

stg_cliente.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in stg_cliente.columns]).toPandas()

Unnamed: 0,address_number,city,state,division_name,region_name,customerkey,customer,customer_type
0,0,0,0,0,0,0,0,0


In [32]:
# Realizando o Join da stg_cliente com a df_vendas

df_vendas_stg = df_vendas.join(stg_cliente, df_vendas.customerkey == stg_cliente.customerkey, 'inner')\
                .join(dim_tempo, df_vendas.datekey == dim_tempo.DATA)\
                .select(stg_cliente.address_number, stg_cliente.city, stg_cliente.state, stg_cliente.division_name
                        , stg_cliente.region_name, df_vendas.customerkey, stg_cliente.customer
                        , stg_cliente.customer_type, df_vendas.item_number, df_vendas.item, df_vendas.sales_amount
                        , df_vendas.item_class, df_vendas.datekey, dim_tempo.NR_ANO, dim_tempo.NM_TRIMESTRE
                        , dim_tempo.NM_MES, dim_tempo.NR_DIA, dim_tempo.NM_DIA_SEMANA)

df_vendas_stg.count()

65282

In [33]:
df_vendas_stg.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_vendas_stg.columns]).toPandas()

Unnamed: 0,address_number,city,state,division_name,region_name,customerkey,customer,customer_type,item_number,item,sales_amount,item_class,datekey,NR_ANO,NM_TRIMESTRE,NM_MES,NR_DIA,NM_DIA_SEMANA
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [34]:
df_vendas_stg.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- division_name: string (nullable = false)
 |-- region_name: string (nullable = false)
 |-- customerkey: string (nullable = true)
 |-- customer: string (nullable = false)
 |-- customer_type: string (nullable = false)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- datekey: date (nullable = true)
 |-- NR_ANO: integer (nullable = true)
 |-- NM_TRIMESTRE: string (nullable = true)
 |-- NM_MES: string (nullable = true)
 |-- NR_DIA: integer (nullable = true)
 |-- NM_DIA_SEMANA: string (nullable = true)



In [35]:
# Criando a SK de produto com Hash das colunas de suas dimensões

df_vendas_stg = df_vendas_stg.withColumn("SK_PRODUTO", sha2(concat_ws("", df_vendas_stg.item_number, df_vendas_stg.item
                                                                 , df_vendas_stg.item_class), 256))


In [36]:
# Criando as demais hashs

df_vendas_stg = df_vendas_stg.withColumn("SK_CLIENTE", sha2(concat_ws("", df_vendas_stg.customerkey, df_vendas_stg.customer
                                                                 , df_vendas_stg.city, df_vendas_stg.customer_type), 256))

df_vendas_stg = df_vendas_stg.withColumn("SK_LOCALIDADE", sha2(concat_ws("", df_vendas_stg.address_number, df_vendas_stg.city
                                                                 , df_vendas_stg.state, df_vendas_stg.region_name
                                                                        , df_vendas_stg.division_name), 256))
df_vendas_stg = df_vendas_stg.withColumn("SK_DATA", sha2(concat_ws("", df_vendas_stg.datekey, df_vendas_stg.NR_ANO
                                                                 , df_vendas_stg.NM_TRIMESTRE, df_vendas_stg.NM_MES
                                                                 , df_vendas_stg.NR_DIA, df_vendas_stg.NM_DIA_SEMANA), 256))

df_vendas_stg.show(truncate=False)

+--------------+-------------+-------------+-------------+-------------+-----------+--------------+-------------+-----------+----------------------------------+------------+-------------+----------+------+------------+------+------+-------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+
|address_number|city         |state        |division_name|region_name  |customerkey|customer      |customer_type|item_number|item                              |sales_amount|item_class   |datekey   |NR_ANO|NM_TRIMESTRE|NM_MES|NR_DIA|NM_DIA_SEMANA|SK_PRODUTO                                                      |SK_CLIENTE                                                      |SK_LOCALIDADE                                                   |SK_DATA                                             

In [37]:
df_vendas_stg.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- division_name: string (nullable = false)
 |-- region_name: string (nullable = false)
 |-- customerkey: string (nullable = true)
 |-- customer: string (nullable = false)
 |-- customer_type: string (nullable = false)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- datekey: date (nullable = true)
 |-- NR_ANO: integer (nullable = true)
 |-- NM_TRIMESTRE: string (nullable = true)
 |-- NM_MES: string (nullable = true)
 |-- NR_DIA: integer (nullable = true)
 |-- NM_DIA_SEMANA: string (nullable = true)
 |-- SK_PRODUTO: string (nullable = true)
 |-- SK_CLIENTE: string (nullable = true)
 |-- SK_LOCALIDADE: string (nullable = true)
 |-- SK_DATA: string (nullable = true)



In [38]:
# Selecionando as colunas da minha Dimensão Cliente da tabela staging
dim_clientes = df_vendas_stg.select('SK_CLIENTE', 'customerkey', 'customer', 'customer_type', 'city')

# Renomeando as colunas
dim_clientes = dim_clientes.withColumnRenamed("customerkey","NK_ID_CLIENTE")\
                            .withColumnRenamed("customer", "NM_CLIENTE")\
                            .withColumnRenamed("customer_type", "DESC_TIPO_CLIENTE")\
                            .withColumnRenamed("city", "NM_CIDADE_CLIENTE")

# Eliminando colunas nulas
dim_clientes = dim_clientes.na.drop()

# Eliminando duplicidades da SK_CLIENTE (Distinct)
dim_clientes = dim_clientes.dropDuplicates(["SK_CLIENTE"])

# dim_clientes.show()
dim_clientes.count()

615

In [39]:
# Selecionando as colunas da minha Dimensão Produto da tabela staging
dim_produtos = df_vendas_stg.select('SK_PRODUTO', 'item_number', 'item', 'item_class')

# Renomeando colunas
dim_produtos = dim_produtos.withColumnRenamed("item_number","NK_ID_PRODUTO")\
                            .withColumnRenamed("item", "NM_PRODUTO")\
                            .withColumnRenamed("item_class", "NM_CATEGORIA_PRODUTO")

# Eliminando colunas nulas
dim_produtos = dim_produtos.na.drop()

# Eliminando duplicidades da SK_PRODUTO (Distinct)
dim_produtos = dim_produtos.dropDuplicates(["SK_PRODUTO"])

dim_produtos.show()
dim_produtos.count()

+--------------------+------------------+--------------------+--------------------+
|          SK_PRODUTO|     NK_ID_PRODUTO|          NM_PRODUTO|NM_CATEGORIA_PRODUTO|
+--------------------+------------------+--------------------+--------------------+
|0986f08539159ff34...|             37175|  Fast Fudge Cookies|                 P01|
|244c5e81fe2bc7a23...|             60035|      Golden Waffles|                 P01|
|65b508292b3634626...|             39637|Even Better Low F...|                 P01|
|7ea506e11349e12b7...|1z9483441245619172|Imagine Orange Po...|       Nao informado|
|85fb4da8e8a33daa8...|             61501|Excellent Mango D...|                 P01|
|90141d1de6b92de05...|             11967|  High Top Firm Tofu|                 P01|
|a8a65ba63539a2908...|             61531|Excellent Strawbe...|       Nao informado|
|a9f4abbf3c2674a17...|             28671|   Ebony Fancy Plums|       Nao informado|
|b8e13dd550fbb3b5d...|     Nao informado|Just Right Chicke...|       Nao inf

1064

In [40]:
# Selecionando as colunas da minha Dimensão Localidade da tabela staging    
dim_localidade = df_vendas_stg.select('SK_LOCALIDADE', 'address_number', 'city', 'state', 'division_name', 'region_name')

# Renomeando colunas
dim_localidade = dim_localidade.withColumnRenamed("address_number","NK_ID_LOCALIDADE")\
                            .withColumnRenamed("city", "NM_CIDADE_LOCALIDADE")\
                            .withColumnRenamed("state", "NM_ESTADO_LOCALIDADE")\
                            .withColumnRenamed("division_name", "NM_DIVISAO_LOCALIDADE")\
                            .withColumnRenamed("region_name", "NM_REGIAO_LOCALIDADE")

# Eliminando colunas nulas
dim_localidade = dim_localidade.na.drop()

# Eliminando duplicidades da SK_PRODUTO (Distinct)
dim_localidade = dim_localidade.dropDuplicates(["SK_LOCALIDADE"])

dim_localidade.show()
dim_localidade.count()

+--------------------+----------------+--------------------+--------------------+---------------------+--------------------+
|       SK_LOCALIDADE|NK_ID_LOCALIDADE|NM_CIDADE_LOCALIDADE|NM_ESTADO_LOCALIDADE|NM_DIVISAO_LOCALIDADE|NM_REGIAO_LOCALIDADE|
+--------------------+----------------+--------------------+--------------------+---------------------+--------------------+
|31ddf3e5c3c0a4edf...|        10002220|       Prince George|                  BC|        International|              Canada|
|c9c1b90a519ef8223...|        10018146|        Fort Macleod|                  AB|        International|              Canada|
|3eb868eeb056b1b45...|        10025288|          Furstentum|       Nao informado|        International|       International|
|7c7f347f57488a13a...|        10017307|               Anjou|                  QU|        International|              Canada|
|e5ebc1d16317b83a9...|        10021222|            Buffdale|                  UT|             Domestic|             Western|


615

In [41]:
# Selecionando as colunas da minha Dimensão Tempo da tabela staging    
dim_tempo = df_vendas_stg.select('SK_DATA', 'datekey', 'NR_ANO', 'NM_TRIMESTRE', 'NM_MES', 'NR_DIA', 'NM_DIA_SEMANA')

# Renomeando coluna datekey
dim_tempo = dim_tempo.withColumnRenamed("datekey", "DATA")

# Eliminando possíveis nulas
dim_tempo = dim_tempo.na.drop()

# Eliminando duplicidades da SK_PRODUTO (Distinct)
dim_tempo = dim_tempo.dropDuplicates(["SK_DATA"])

dim_tempo.show()
dim_tempo.count()

+--------------------+----------+------+------------+---------+------+-------------+
|             SK_DATA|      DATA|NR_ANO|NM_TRIMESTRE|   NM_MES|NR_DIA|NM_DIA_SEMANA|
+--------------------+----------+------+------------+---------+------+-------------+
|2b54446e92236394b...|2018-02-18|  2018|1º Trimestre|Fevereiro|    18|      Domingo|
|8a6c7e8ba920146e5...|2017-10-06|  2017|4º Trimestre|  Outubro|     6|  Sexta-feira|
|fb39ea8c53b417bb9...|2018-07-30|  2018|3º Trimestre|    Julho|    30|Segunda-feira|
|130c7b486e2225977...|2017-02-09|  2017|1º Trimestre|Fevereiro|     9| Quinta-feira|
|26dd29b195cc38d00...|2017-12-12|  2017|4º Trimestre| Dezembro|    12|  Terça-feira|
|64bf5a5c1fb26b342...|2017-02-15|  2017|1º Trimestre|Fevereiro|    15| Quarta-feira|
|aefab146be9546568...|2018-08-24|  2018|3º Trimestre|   Agosto|    24|  Sexta-feira|
|af231be4610159733...|2018-09-03|  2018|3º Trimestre| Setembro|     3|Segunda-feira|
|4480e28236d6668e1...|2018-10-29|  2018|4º Trimestre|  Outubro|  

558

In [42]:
df_vendas_stg.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- division_name: string (nullable = false)
 |-- region_name: string (nullable = false)
 |-- customerkey: string (nullable = true)
 |-- customer: string (nullable = false)
 |-- customer_type: string (nullable = false)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- datekey: date (nullable = true)
 |-- NR_ANO: integer (nullable = true)
 |-- NM_TRIMESTRE: string (nullable = true)
 |-- NM_MES: string (nullable = true)
 |-- NR_DIA: integer (nullable = true)
 |-- NM_DIA_SEMANA: string (nullable = true)
 |-- SK_PRODUTO: string (nullable = true)
 |-- SK_CLIENTE: string (nullable = true)
 |-- SK_LOCALIDADE: string (nullable = true)
 |-- SK_DATA: string (nullable = true)



In [43]:
# Implementando a tabela FATO VENDAS usando SQL (facilidade na agregação)

df_vendas_stg.createOrReplaceTempView('stg_vendas')

ft_vendas = spark.sql(""" SELECT SK_CLIENTE, SK_PRODUTO, SK_LOCALIDADE, SK_DATA
                        , COUNT(item) AS QTD_VENDAS, SUM(sales_amount) AS VL_VENDAS
                        FROM stg_vendas
                        GROUP BY SK_CLIENTE, SK_PRODUTO, SK_LOCALIDADE, SK_DATA
                        ORDER BY QTD_VENDAS DESC""")
ft_vendas.toPandas()

Unnamed: 0,SK_CLIENTE,SK_PRODUTO,SK_LOCALIDADE,SK_DATA,QTD_VENDAS,VL_VENDAS
0,6a18c7b019270e11987c597b89e7875984f26039f7914c...,53ab0e8dc00419709aa7b441a3828b1d05b51cffc7ebbb...,fe0b2ec1b53f24395c7f15599173b42d5bf68adf44196e...,71c3f08dc4f69a9b2f86f30b3e9e86cc27cc3bb21950b8...,15,25341.0
1,08b4de5960157889166fa0e187e3e5b26dd1149c3c66ee...,53ab0e8dc00419709aa7b441a3828b1d05b51cffc7ebbb...,530bbc65be0b3cda608ed631902f1e57e4698d6b96a31f...,b3717717c58bc4eb5ded03c14c6d53fb3737c0b1571487...,15,24320.0
2,6a18c7b019270e11987c597b89e7875984f26039f7914c...,53ab0e8dc00419709aa7b441a3828b1d05b51cffc7ebbb...,fe0b2ec1b53f24395c7f15599173b42d5bf68adf44196e...,c8f0cb2b4426a2e286a160073d9ae32944deef1e661413...,15,25341.0
3,08b4de5960157889166fa0e187e3e5b26dd1149c3c66ee...,53ab0e8dc00419709aa7b441a3828b1d05b51cffc7ebbb...,530bbc65be0b3cda608ed631902f1e57e4698d6b96a31f...,1dca32766c4f148e3d2d117e849c2215388e0b45150fd0...,15,24320.0
4,8a07f479a84fc95d977ec161edb933cf93fc5a1abe8648...,53ab0e8dc00419709aa7b441a3828b1d05b51cffc7ebbb...,8c0ef458805f47e895c9d6c142171e28a4ad5c203a5bf8...,f700e4ec89a624ff06b009afee20a1218a1ecb65bfadcb...,15,26104.0
...,...,...,...,...,...,...
60351,d9d497795386cff69026b5f733edd5afbfc0e465e33c13...,c72ff05fb18d7d93267065037ef9cb36e799b1b933ba26...,42be5088a385591462b43ad85e5fe538a0c6daa01c5ce2...,9ef6448e140365a5bd5bc47202c5e5940211a9838edadf...,1,1857.0
60352,7f7b3c2bf6c07fcf328643ddfdbadc14ba203f32ca13ca...,875b2f01319c18889fae2b739a3db7c9801ba549e8c595...,7953c8d3e8053623c938b026078132e2780f44f20aa423...,ee3e76006dd0dec8e504de5d67dfe13d9c19f6ff0f1de9...,1,203.0
60353,2658f6657a279e2189fbff826d3d5e00bd9c1e37ff6932...,3e6da23cabb923034d4e4feb959671bd3aa395a2c525b2...,5561dbab25068e6b411e5aab50db8f3725f5e38a842459...,9c5800a7e87a84b99377cabd5f15c6bf547fb81e93f881...,1,226.0
60354,99def7bee89996dfd3edd2f94f03ad6d101c6e62bb31e9...,dbf40eb1338c872762628b40342e2fdb80e56b02625cdc...,81c413ca37e31699e6dab88a1242cf0a60028d6acfa5ce...,675195994803a34f4d07c090abe808d1a10d694ed73fae...,1,205.0


In [44]:
# Criando job para salvar meus arquivos que serão baixados do HDFS e renomea-los já com o nome corretos
# O ideal era que fosse uma função, mas para fins didáticos, dessa forma atende-se a demanda

# A variável stage diz onde o arquivo deve ser baixado no HDFS
stage = "/projeto_final/staging/"

# A varivel output diz a pasta onde ela deve ser movida após a stage
# A variável erase apaga arquivos no output, caso ja exista, para que nao haja choque de mesmo nome (exemplo de overwrite)
# A variavel rename, além de mover, renomer o arquivo entre uma pasta e outra

# Dimensao Clientes

file = 'dimclientes'
output = "/projeto_final/dados_saida/" + file
erase = "hdfs dfs -rm " + output + "/*" 
rename = "hdfs dfs -mv " + stage + "part-*" + ' ' + output + '/' + file + ".csv"

dim_clientes.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save(stage)

os.system(erase)
os.system(rename)

# Dimensao Produtos

file = 'dimprodutos'
output = "/projeto_final/dados_saida/" + file
erase = "hdfs dfs -rm " + output + "/*" 
rename = "hdfs dfs -mv " + stage + "part-*" + ' ' + output + '/' + file + ".csv"

dim_produtos.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save(stage)

os.system(erase)
os.system(rename)

# Dimensao Localidade

file = 'dimlocalidade' 
output = "/projeto_final/dados_saida/" + file
erase = "hdfs dfs -rm " + output + "/*" 
rename = "hdfs dfs -mv " + stage + "part-*" + ' ' + output + '/' + file + ".csv"

dim_localidade.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save(stage)

os.system(erase)
os.system(rename)

# Dimensao Tempo

file = 'dimtempo' 
output = "/projeto_final/dados_saida/" + file
erase = "hdfs dfs -rm " + output + "/*" 
rename = "hdfs dfs -mv " + stage + "part-*" + ' ' + output + '/' + file + ".csv"

dim_tempo.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save(stage)
os.system(erase)
os.system(rename)

# Fato Vendas

file = 'fatovendas'
output = "/projeto_final/dados_saida/" + file
erase = "hdfs dfs -rm " + output + "/*" 
rename = "hdfs dfs -mv " + stage + "part-*" + ' ' + output + '/' + file + ".csv"

ft_vendas.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save(stage)

os.system(erase)
os.system(rename)

0

In [45]:
dim_clientes.printSchema()
dim_produtos.printSchema()
dim_localidade.printSchema()
dim_tempo.printSchema()
ft_vendas.printSchema()

root
 |-- SK_CLIENTE: string (nullable = true)
 |-- NK_ID_CLIENTE: string (nullable = true)
 |-- NM_CLIENTE: string (nullable = false)
 |-- DESC_TIPO_CLIENTE: string (nullable = false)
 |-- NM_CIDADE_CLIENTE: string (nullable = false)

root
 |-- SK_PRODUTO: string (nullable = true)
 |-- NK_ID_PRODUTO: string (nullable = true)
 |-- NM_PRODUTO: string (nullable = true)
 |-- NM_CATEGORIA_PRODUTO: string (nullable = true)

root
 |-- SK_LOCALIDADE: string (nullable = true)
 |-- NK_ID_LOCALIDADE: integer (nullable = true)
 |-- NM_CIDADE_LOCALIDADE: string (nullable = false)
 |-- NM_ESTADO_LOCALIDADE: string (nullable = false)
 |-- NM_DIVISAO_LOCALIDADE: string (nullable = false)
 |-- NM_REGIAO_LOCALIDADE: string (nullable = false)

root
 |-- SK_DATA: string (nullable = true)
 |-- DATA: date (nullable = true)
 |-- NR_ANO: integer (nullable = true)
 |-- NM_TRIMESTRE: string (nullable = true)
 |-- NM_MES: string (nullable = true)
 |-- NR_DIA: integer (nullable = true)
 |-- NM_DIA_SEMANA: string