In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import functions as f
import re

spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [2]:
# Criação de Data Frames do Hive

df_clientes = spark.sql("SELECT * FROM desafio_final.TB_CLIENTES")
df_divisao = spark.sql("SELECT * FROM desafio_final.TB_DIVISAO")
df_endereco = spark.sql("SELECT * FROM desafio_final.TB_ENDERECO")
df_regiao = spark.sql("SELECT * FROM desafio_final.TB_REGIAO")
df_vendas = spark.sql("SELECT * FROM desafio_final.TB_VENDAS")


In [3]:
# Visualizando a Df Clientes e Removendo dt_foto.
df_clientes = df_clientes.drop('dt_foto')
df_clientes.toPandas()

#df_clientes.show()

Unnamed: 0,address_number,business_family,business_unit,customer,customerkey,customer_type,division,line_of_business,phone,region_code,regional_sales_mgr,search_type
0,10000000,R3,1,City Supermarket,10000000,G2,2,,816-455-8733,4,S16,C
1,10000453,R3,1,A Supermarket,10000453,G1,1,,816-455-8733,5,S19,C
2,10000455,R3,1,Caribian Supermarket,10000455,G2,2,,816-455-8733,1,S16,C
3,10000456,R1,1,A&B Shop,10000456,G3,1,,816-455-8733,0,S2,C
4,10000457,O2,1,A&G Shop,10000457,G1,1,,816-455-8733,5,S1,C
...,...,...,...,...,...,...,...,...,...,...,...,...
679,10027560,R2,1,Zilog Shop,10027560,G2,2,,816-455-8733,2,S1,C
680,10027572,R3,1,ZipLip.Com Shop,10027572,G2,2,,816-455-8733,3,S1,C
681,10027575,R3,1,Zitel Shop,10027575,G2,2,,816-455-8733,2,S1,C
682,10027583,R2,1,zNET Shop,10027583,G2,2,,816-455-8733,4,S5,C


In [4]:
# Revomendo as duplicadas da Df Clientes

df_clientes = df_clientes.dropDuplicates(["customerkey"])

In [5]:
# Executei o comando para substituir na coluna LIne of Business os 3 espaços em branco por 'Não Informado'.
df_clientes = df_clientes.withColumn('line_of_business', regexp_replace('line_of_business','   ', 'Não Informado'))
df_clientes.toPandas()

Unnamed: 0,address_number,business_family,business_unit,customer,customerkey,customer_type,division,line_of_business,phone,region_code,regional_sales_mgr,search_type
0,10013312,R2,1,Gate9th Shop,10013312,G2,1,Não Informado,816-455-8733,5,S9,C
1,10018352,R3,1,Johnson Store,10018352,G3,1,Não Informado,816-455-8733,5,S5,C
2,10022746,R3,1,Rdlabs Shop,10022746,G1,1,M1,816-455-8733,5,S19,C
3,10024916,R3,1,T Market,10024916,G2,1,Não Informado,816-455-8733,5,S9,C
4,10025249,R1,1,Texasgulf Market,10025249,G2,2,Não Informado,816-455-8733,2,S11,C
...,...,...,...,...,...,...,...,...,...,...,...,...
678,10025051,R3,1,Taos Supermarket,10025051,G2,2,Não Informado,816-455-8733,1,S16,C
679,10011521,R2,1,Farr Market,10011521,G2,2,Não Informado,816-455-8733,2,S5,C
680,10013376,R3,1,GCC Store,10013376,G3,1,Não Informado,816-455-8733,5,S5,C
681,10024248,R3,1,Sifton Supermarket,10024248,G2,2,M1,816-455-8733,1,S16,C


In [6]:
# Visualizando a Df Endereço e Removendo dt_foto.
df_endereco = df_endereco.drop('dt_foto')
df_endereco.toPandas()


Unnamed: 0,address_number,city,country,customer_address_1,customer_address_2,customer_address_3,customer_address_4,state,zip_code
0,10000000,Akron,US,PO Box 6258,,,,OH,44312
1,10000453,,UK,,,,,,
2,10000455,Huntington Beach,US,7392 Count Circle,,,,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,,,,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,,,,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,,,,TX,79764
451,10027572,Elma,US,2210 Bowen Road,,,,NY,14059
452,10027575,Dallas,US,10400 Plano Road,,,,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,,IL,61550-0474


In [7]:
# Executando o for ele vai percorrer as colunas a direita e trocar as colunas que estão vazias ou em branco pelo texto 'Não Informado'
for a in df_endereco.columns:
    df_endereco = df_endereco.withColumn(a, rtrim(df_endereco[a]))
    df_endereco = df_endereco.withColumn(a, when(df_endereco[a] == '', "Não Informado")\
                                    .when(df_endereco[a].isNull(), "Não informado")\
                                    .otherwise(df_endereco[a]))
    
df_endereco.toPandas()                                         

Unnamed: 0,address_number,city,country,customer_address_1,customer_address_2,customer_address_3,customer_address_4,state,zip_code
0,10000000,Akron,US,PO Box 6258,Não Informado,Não Informado,Não Informado,OH,44312
1,10000453,Não Informado,UK,Não Informado,Não Informado,Não Informado,Não Informado,Não Informado,Não Informado
2,10000455,Huntington Beach,US,7392 Count Circle,Não Informado,Não Informado,Não Informado,CA,92647
3,10000456,Edmonton,CA,8151 Wagner Road,Não Informado,Não Informado,Não Informado,AB,T6E 4N6
4,10000458,Saginaw,US,PO Box 840,Não Informado,Não Informado,Não Informado,MI,48606
...,...,...,...,...,...,...,...,...,...
450,10027560,Odessa,US,3356 Kermit Highway,Não Informado,Não Informado,Não Informado,TX,79764
451,10027572,Elma,US,2210 Bowen Road,Não Informado,Não Informado,Não Informado,NY,14059
452,10027575,Dallas,US,10400 Plano Road,Não Informado,Não Informado,Não Informado,TX,75238
453,10027583,Morton,US,Attention: Charlene Hoyer,500 North Morton Avenue,PO Box 474,Não Informado,IL,61550-0474


In [8]:
# Removendo as Duplicadas da Df Endereço

df_endereco = df_endereco.dropDuplicates(["address_number"])

In [9]:
# Visualizando a Df Vendas e Removendo dt_foto.

df_vendas = df_vendas.drop('dt_foto')
df_vendas.toPandas()


Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,28/04/2019,10000481.0,28/04/2018,-23791,30/04/2018,100012.0,,,Urban Large Eggs,2000.0,...,200015.0,28/04/2019,23791,0,0,23791,23791,1.0,184.0,EA
1,12/07/2019,10002220.0,12/07/2018,36879,14/07/2018,100233.0,P01,20910,Moms Sliced Turkey,1000.0,...,200245.0,12/07/2019,45617,82496,0,45617,45617,1.0,127.0,EA
2,14/10/2019,10002220.0,15/10/2018,10973,17/10/2018,116165.0,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000.0,...,213157.0,14/10/2019,43893,54866,0,43893,43893,1.0,127.0,EA
3,01/06/2019,10002489.0,01/06/2018,-21175,03/06/2018,100096.0,,,Kiwi Lox,1000.0,...,200107.0,01/06/2019,21175,0,0,21175,21175,1.0,160.0,EA
4,26/05/2019,10004516.0,25/05/2018,9662794,27/05/2018,103341.0,P01,60776,High Top Sweet Onion,1000.0,...,203785.0,26/05/2019,8924866,1858766,0,8924866,1961509011,455.0,124.0,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65530,,,,,,,,,,,...,,,,,,,,,,
65531,,,,,,,,,,,...,,,,,,,,,,
65532,,,,,,,,,,,...,,,,,,,,,,
65533,,,,,,,,,,,...,,,,,,,,,,


In [10]:
# Usamos o código abaixo para retirar as linhas Nulas da Df Vendas.


df_vendas = df_vendas.where(col("customerkey").isNotNull())
df_vendas.toPandas()



Unnamed: 0,actual_delivery_date,customerkey,datekey,discount_amount,invoice_date,invoice_number,item_class,item_number,item,line_number,...,order_number,promised_delivery_date,sales_amount,sales_amount_based_on_list_price,sales_cost_amount,sales_margin_amount,sales_price,sales_quantity,sales_rep,u/m
0,28/04/2019,10000481,28/04/2018,-23791,30/04/2018,100012,,,Urban Large Eggs,2000,...,200015,28/04/2019,23791,0,0,23791,23791,1,184,EA
1,12/07/2019,10002220,12/07/2018,36879,14/07/2018,100233,P01,20910,Moms Sliced Turkey,1000,...,200245,12/07/2019,45617,82496,0,45617,45617,1,127,EA
2,14/10/2019,10002220,15/10/2018,10973,17/10/2018,116165,P01,38076,Cutting Edge Foot-Long Hot Dogs,1000,...,213157,14/10/2019,43893,54866,0,43893,43893,1,127,EA
3,01/06/2019,10002489,01/06/2018,-21175,03/06/2018,100096,,,Kiwi Lox,1000,...,200107,01/06/2019,21175,0,0,21175,21175,1,160,EA
4,26/05/2019,10004516,25/05/2018,9662794,27/05/2018,103341,P01,60776,High Top Sweet Onion,1000,...,203785,26/05/2019,8924866,1858766,0,8924866,1961509011,455,124,SE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65277,18/03/2020,10017638,18/03/2019,50578,21/03/2019,226497,P01,13447,High Top Oranges,8000,...,320895,18/03/2020,5699,107568,23995,32995,6332222222,9,180,EA
65278,18/03/2020,10017638,18/03/2019,41075,21/03/2019,226497,P01,25906,Landslide White Sugar,38000,...,320895,18/03/2020,46281,87356,42355,3926,231405,2,180,EA
65279,18/03/2020,10017638,18/03/2019,87616,21/03/2019,226497,P01,61856,Moms Potato Salad,227001,...,320895,18/03/2020,9872,186336,574,4132,1234,8,180,EA
65280,18/03/2020,10017638,18/03/2019,2422677,21/03/2019,226498,P01,17801,Better Fancy Canned Sardines,1000,...,320907,18/03/2020,2729751,5152428,161889,1110861,7582641667,36,180,EA


In [34]:
# Visualizando os tipos de dados da Df Vendas e vimos que alguns colunas estão com o tipo de dados string.

df_vendas.printSchema()

root
 |-- actual_delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- datekey: date (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: integer (nullable = true)
 |-- list_price: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- promised_delivery_date: date (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- sales_amount_based_on_list_price: string (nullable = true)
 |-- sales_cost_amount: string (nullable = true)
 |-- sales_margin_amount: string (nullable = true)
 |-- sales_price: string (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u/m: string (nullable = true)



In [12]:
# Como as datas estão como string executamos esse código para transformar no formato de date.

data = ['actual_delivery_date', 'datekey', 'invoice_date', 'promised_delivery_date']

for d in data:
    df_vendas = df_vendas.withColumn(d, to_date(col(d), 'dd/MM/yyyy').alias(d))
  


df_vendas.printSchema()

root
 |-- actual_delivery_date: date (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- datekey: date (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: integer (nullable = true)
 |-- list_price: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- promised_delivery_date: date (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- sales_amount_based_on_list_price: string (nullable = true)
 |-- sales_cost_amount: string (nullable = true)
 |-- sales_margin_amount: string (nullable = true)
 |-- sales_price: string (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u/m: string (nullable = true)



In [13]:
# Visualizando a Df Divisão e Removendo dt_foto.

df_divisao = df_divisao.drop('dt_foto')
df_divisao.toPandas()

Unnamed: 0,division,division_name
0,1,International
1,2,Domestic


In [14]:
# Visualizando a Df Região e Removendo dt_foto.


df_regiao = df_regiao.drop('dt_foto')
df_regiao.toPandas()

Unnamed: 0,region_code,region_name
0,0,Canada
1,1,Western
2,2,Southern
3,3,Northeast
4,4,Central
5,5,International


In [15]:
# Criando as Views Clientes, Vendas, Endereço, Divisão, Região

df_clientes.createOrReplaceTempView('vw_clientes')
df_vendas.createOrReplaceTempView('vw_vendas')
df_endereco.createOrReplaceTempView('vw_endereco')
df_divisao.createOrReplaceTempView('vw_divisao')
df_regiao.createOrReplaceTempView('vw_regiao')

In [16]:
# Achando a data minima e máxima das df Vendas

df_vendas.agg(f.min("datekey")).show() 


df_vendas.agg(f.max("datekey")).show()

+------------+
|min(datekey)|
+------------+
|  2017-01-09|
+------------+

+------------+
|max(datekey)|
+------------+
|  2019-03-18|
+------------+



In [17]:
# Criando a Df Tempo e colocando algumas colunas de data.

df_tempo = spark.sql('''
                SELECT
                datekey,
                DATE_FORMAT(DateKey,"MMMM") AS Mes,
                MONTH(DateKey) AS MesNumero,
                DAY(DateKey) AS Dia,                     
                YEAR(DateKey) as Ano     
                FROM(
                SELECT 
                EXPLODE(SEQUENCE(TO_DATE('2017-01-09'), TO_DATE('2019-03-18'), INTERVAL 1 DAY)) as DateKey)''')




df_tempo.show()
# Criando View Tempo
df_tempo.createOrReplaceTempView('vw_tempo')

+----------+-------+---------+---+----+
|   datekey|    Mes|MesNumero|Dia| Ano|
+----------+-------+---------+---+----+
|2017-01-09|January|        1|  9|2017|
|2017-01-10|January|        1| 10|2017|
|2017-01-11|January|        1| 11|2017|
|2017-01-12|January|        1| 12|2017|
|2017-01-13|January|        1| 13|2017|
|2017-01-14|January|        1| 14|2017|
|2017-01-15|January|        1| 15|2017|
|2017-01-16|January|        1| 16|2017|
|2017-01-17|January|        1| 17|2017|
|2017-01-18|January|        1| 18|2017|
|2017-01-19|January|        1| 19|2017|
|2017-01-20|January|        1| 20|2017|
|2017-01-21|January|        1| 21|2017|
|2017-01-22|January|        1| 22|2017|
|2017-01-23|January|        1| 23|2017|
|2017-01-24|January|        1| 24|2017|
|2017-01-25|January|        1| 25|2017|
|2017-01-26|January|        1| 26|2017|
|2017-01-27|January|        1| 27|2017|
|2017-01-28|January|        1| 28|2017|
+----------+-------+---------+---+----+
only showing top 20 rows



In [18]:
# União das Datas Frames Df Clientes, Df Endereço, Df Divisão e Df Região .

df_clientes_STG = spark.sql('''
            SELECT 
            vw_clientes.`customerkey`,             
            vw_clientes.`customer`,              
            vw_clientes.`customer_type`,         
            vw_clientes.`address_number`,       
            vw_divisao.`division_name`,          
            vw_regiao.`region_name`,             
            vw_endereco.`city`,                  
            vw_endereco.`state` ,                
            vw_endereco.`country`               
      
         
            
            
            from vw_clientes
            INNER JOIN vw_divisao
            ON vw_clientes.`division` = vw_divisao.`division`
            INNER JOIN vw_regiao
            ON vw_clientes.`region_code` = vw_regiao.`region_code`
            LEFT JOIN vw_endereco
            ON vw_clientes.`address_number` = vw_endereco.`address_number`
            
''')

df_clientes_STG.toPandas()
df_clientes_STG.createOrReplaceTempView('vw_clientes_STG')

In [19]:
# Verificando as colunas em Branco.

df_clientes_STG.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in df_clientes_STG.columns]).toPandas()

Unnamed: 0,customerkey,customer,customer_type,address_number,division_name,region_name,city,state,country
0,0,0,0,0,0,0,229,229,229


In [20]:
# Tirando as colunas em Branco para 'Não Informado'.

df_clientes_STG = df_clientes_STG.na.fill('Nao informado')

In [21]:
# União das Datas Frames Df Clientes, Df Endereço, Df Divisão e Df Região e Df Vendas e Df tempo.

df_clientes_vendas = spark.sql('''
            SELECT 
            vw_clientes_STG.`customerkey`,             
            vw_clientes_STG.`customer`,              
            vw_clientes_STG.`customer_type`,         
            vw_clientes_STG.`address_number`,       
            vw_clientes_STG.`division_name`,          
            vw_clientes_STG.`region_name`,             
            vw_clientes_STG.`city`,                  
            vw_clientes_STG.`state` ,                
            vw_clientes_STG.`country`,                                
            vw_vendas.`discount_amount`,         
            vw_vendas.`sales_amount`,            
            vw_vendas.`sales_margin_amount`,     
            vw_vendas.`sales_price`,          
            vw_vendas.`sales_quantity`,       
            vw_tempo.datekey,
            vw_tempo.Mes,
            vw_tempo.MesNumero,
            vw_tempo.Dia,                     
            vw_tempo.Ano
          
          
            from vw_vendas
            INNER JOIN vw_clientes_STG
            ON vw_clientes_STG.`customerkey` = vw_vendas.`customerkey`
            INNER JOIN vw_tempo
            ON vw_vendas.`datekey` = vw_tempo.`datekey`
          
''')

df_clientes_vendas.toPandas()


Unnamed: 0,customerkey,customer,customer_type,address_number,division_name,region_name,city,state,country,discount_amount,sales_amount,sales_margin_amount,sales_price,sales_quantity,datekey,Mes,MesNumero,Dia,Ano
0,10013312,Gate9th Shop,G2,10013312,International,International,,,,60528,66982,32462,66982,1,2017-02-12,February,2,12,2017
1,10013312,Gate9th Shop,G2,10013312,International,International,,,,24527,30339,18118,30339,1,2017-05-31,May,5,31,2017
2,10013312,Gate9th Shop,G2,10013312,International,International,,,,60528,66982,32462,66982,1,2017-08-14,August,8,14,2017
3,10018352,Johnson Store,G3,10018352,International,International,,,,91975,124947,64416,624735,2,2018-02-19,February,2,19,2018
4,10018352,Johnson Store,G3,10018352,International,International,,,,85648,116352,57639,58176,20,2018-02-19,February,2,19,2018
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65277,10026336,WBM Store,G2,10026336,Domestic,Southern,Columbia,SC,US,235156,291144,110248,145572,20,2018-08-23,August,8,23,2018
65278,10026336,WBM Store,G2,10026336,Domestic,Southern,Columbia,SC,US,18682,2313,4638,11565,2,2018-09-06,September,9,6,2018
65279,10026336,WBM Store,G2,10026336,Domestic,Southern,Columbia,SC,US,19175,24853,14245,2071083333,12,2018-11-04,November,11,4,2018
65280,10026336,WBM Store,G2,10026336,Domestic,Southern,Columbia,SC,US,5363,6951,23756,6951,10,2018-11-08,November,11,8,2018


In [22]:
# Criando as Hash's das dimensões.
df_clientes_vendas = df_clientes_vendas.withColumn('SK_Cliente',
                                                                     sha2(concat_ws('', df_clientes_vendas.customerkey, 
                                                                                    df_clientes_vendas.customer,
                                                                                    df_clientes_vendas.customer_type),
                                                                          256))

df_clientes_vendas = df_clientes_vendas.withColumn('SK_Localidade',
                                                                     sha2(concat_ws('', df_clientes_vendas.address_number, 
                                                                                    df_clientes_vendas.division_name,
                                                                                    df_clientes_vendas.city,
                                                                                    df_clientes_vendas.state,
                                                                                    df_clientes_vendas.region_name,
                                                                                    df_clientes_vendas.country),
                                                                          256))

df_clientes_vendas = df_clientes_vendas.withColumn('SK_Tempo',
                                                                     sha2(concat_ws('', df_clientes_vendas.datekey,
                                                                                    df_clientes_vendas.Mes,
                                                                                    df_clientes_vendas.MesNumero,
                                                                                    df_clientes_vendas.Dia,         
                                                                                    df_clientes_vendas.Ano),                                                                    
                                                                       256))

In [23]:
# Verificando se as Hash's foram criadas na Df clientes_vendas

df_clientes_vendas.toPandas()

# Criando View clientes_vendas.
df_clientes_vendas.createOrReplaceTempView('vw_clientes_vendas')

In [24]:
# Criando a dimensão Cliente


DIM_Cliente = spark.sql('''
            SELECT 
            DISTINCT SK_Cliente,
            customerkey,
            customer,
            Customer_Type 
            FROM vw_clientes_vendas
''')

DIM_Cliente.toPandas()



Unnamed: 0,SK_Cliente,customerkey,customer,Customer_Type
0,db8082b414031bad260a5e4319d116b6831bfbca9d9f51...,10013312,Gate9th Shop,G2
1,657dbf19c1c35841bc77f9a39459507700ae84fa2f9b18...,10018352,Johnson Store,G3
2,babda1652a55c156134e659c44bd856502bafd3513c2e6...,10022746,Rdlabs Shop,G1
3,bacefe514a553fc5dc46c0f5fa2dbb70963eac9d2e8665...,10025249,Texasgulf Market,G2
4,011731ebdf1714c0ef5f085cb40834e8a47267a7b3e077...,10013252,Gate Shop,G2
...,...,...,...,...
610,6e231078e9a21fecd3f392e237dbf5436f06d5462d7b33...,10019066,KENROB Megaplace,G2
611,35f3e88bc1743a35916a395cf3fb98db8a2e6a9a2af169...,10025051,Taos Supermarket,G2
612,c3ae2cf46dc233d30c10ce622d06e7de600d5049618e31...,10011521,Farr Market,G2
613,db1566de445c0a8fe6cd92f76f422077ef192aef46cc0d...,10024248,Sifton Supermarket,G2


In [25]:
# Criando a dimensão localidade.

DIM_Localidade = spark.sql('''
            SELECT 
            DISTINCT SK_Localidade,
            Address_Number,
            Division_Name,
            Region_Name,
            City,
            State,
            Country
            FROM vw_clientes_vendas
''')

DIM_Localidade.toPandas()

Unnamed: 0,SK_Localidade,Address_Number,Division_Name,Region_Name,City,State,Country
0,8c9484743e14cc806669136b37dad37b838a8a96f720ca...,10026355,Domestic,Western,Casper,WY,US
1,0c850a682d85ef7d0564d19f30bf255b74001e748a05a1...,10025508,International,Canada,Moncton,NB,CA
2,d0a8f5ffdca4549e9d2933d99e09d488cad6e32ae57d70...,10013574,International,International,,,
3,cefd7c251326b6446eeb805940b05c438715ba37d399c7...,10015495,Domestic,Central,Kansas City,KS,US
4,b4a549383fb61c1b953c91d3793f4799801c1e26000c71...,10026843,Domestic,Western,Tulare,CA,US
...,...,...,...,...,...,...,...
610,58cfde1f1d57c8f3f6e243143170352b71ce0a452481f6...,10021470,Domestic,Northeast,Wrightsville,PA,US
611,01773f85955fb23b4a0c16719c5b78efd4861232faa39d...,10020492,International,International,,,
612,cd18bf716888190e25e48c4804950a05b579e1922fd0be...,10016113,International,International,,,
613,de7c21937987213414a07fafb86329b98c34918c5da3a7...,10023511,International,International,Schiphol-Rijk,Não Informado,IR


In [26]:
# Criando a dimensão Tempo.

DIM_Tempo = spark.sql('''
            SELECT 
            DISTINCT SK_Tempo,
            DateKey,
            Dia, 
            Mes,
            MesNumero,
            Ano
            FROM vw_clientes_vendas
''')

DIM_Tempo.toPandas()

Unnamed: 0,SK_Tempo,DateKey,Dia,Mes,MesNumero,Ano
0,b4809d6dad3dad6d57cfb42a532315700e5d080545af1e...,2018-12-08,8,December,12,2018
1,d896c44a96151a0db96e0225c53ba12d93cc194a64255b...,2017-01-25,25,January,1,2017
2,5ec121a7ae20ba9a91b2817668c82f3d73a65cb1ab8153...,2017-06-14,14,June,6,2017
3,889a32178a60957e6ec3e89068aded4a10f036365a8b55...,2017-09-27,27,September,9,2017
4,e6343f12cbea834447522db7ce9a79dfc46d4c1c63678f...,2017-11-01,1,November,11,2017
...,...,...,...,...,...,...
553,d3df76e555bd24d8e69c07a156f57b931461a0573a3fe2...,2017-07-20,20,July,7,2017
554,a2ce03854003ec4eab115ccb67e1a21837be44f521213e...,2017-08-16,16,August,8,2017
555,d2754d2b5ff0e293a9d451e5f40c17ff781292c4876a6d...,2019-02-19,19,February,2,2019
556,28822a95f53377217877910eabf63aee61f4ff8a89dcf7...,2018-08-10,10,August,8,2018


In [27]:
# Criando a tabela Fato

Fato = spark.sql('''
            SELECT 
            SK_Cliente,
            SK_Localidade,
            SK_Tempo,
            discount_amount,
            sales_amount,
            sales_margin_amount,
            sales_price,
            sales_quantity
            
            FROM vw_clientes_vendas
''')

Fato.toPandas()

Unnamed: 0,SK_Cliente,SK_Localidade,SK_Tempo,discount_amount,sales_amount,sales_margin_amount,sales_price,sales_quantity
0,db8082b414031bad260a5e4319d116b6831bfbca9d9f51...,26e1d9957f47ff5e433c1cc229f84a4b5924ea88c361db...,4d38a48bd1785da540104dafdea0b690fbdf3e0f8de737...,60528,66982,32462,66982,1
1,db8082b414031bad260a5e4319d116b6831bfbca9d9f51...,26e1d9957f47ff5e433c1cc229f84a4b5924ea88c361db...,c631954223002f5da35cd8140465014697f5e1fe8379c4...,24527,30339,18118,30339,1
2,db8082b414031bad260a5e4319d116b6831bfbca9d9f51...,26e1d9957f47ff5e433c1cc229f84a4b5924ea88c361db...,c7421196ea37fdedb567f9eb512ad4973b9b91a247c440...,60528,66982,32462,66982,1
3,657dbf19c1c35841bc77f9a39459507700ae84fa2f9b18...,daee3ab02ff74d4582fc251682e800624b76efe64a5747...,ae3d6638059b728fb51594d652501ffc4bc6b8a1113896...,91975,124947,64416,624735,2
4,657dbf19c1c35841bc77f9a39459507700ae84fa2f9b18...,daee3ab02ff74d4582fc251682e800624b76efe64a5747...,ae3d6638059b728fb51594d652501ffc4bc6b8a1113896...,85648,116352,57639,58176,20
...,...,...,...,...,...,...,...,...
65277,cfa43ba5ac87e2de0ba5f6611976b42865553ca33c1e72...,d86014d0cbe470af10fde8b7fdf94f144fdcb19aae4333...,87f3b6d768bfbd495a8286f6d05d6e1312f5ebd7ccbf73...,235156,291144,110248,145572,20
65278,cfa43ba5ac87e2de0ba5f6611976b42865553ca33c1e72...,d86014d0cbe470af10fde8b7fdf94f144fdcb19aae4333...,953e568b1fb616ac19c5c56e0a859cd7c5e54aa6b8a49a...,18682,2313,4638,11565,2
65279,cfa43ba5ac87e2de0ba5f6611976b42865553ca33c1e72...,d86014d0cbe470af10fde8b7fdf94f144fdcb19aae4333...,a737e355f224b6022f8ab72439e2126ae6a8fa95e1c544...,19175,24853,14245,2071083333,12
65280,cfa43ba5ac87e2de0ba5f6611976b42865553ca33c1e72...,d86014d0cbe470af10fde8b7fdf94f144fdcb19aae4333...,600a3bdc61797ba7242f8b025381192803c3e2004dfadd...,5363,6951,23756,6951,10


In [28]:
# Salvando as dimensões e fato na dados_saida.

DIM_Cliente.coalesce(1).write.mode('overwrite').options(header='True', delimiter=';').csv("/final/dados_saida/DIM_Cliente.csv")

DIM_Tempo.coalesce(1).write.mode('overwrite').options(header='True', delimiter=';').csv("/final/dados_saida/DIM_Tempo.csv")

DIM_Localidade.coalesce(1).write.mode('overwrite').options(header='True', delimiter=';').csv("/final/dados_saida/DIM_Localidade.csv")

Fato.coalesce(1).write.mode('overwrite').options(header='True', delimiter=';').csv("/final/dados_saida/Fato.csv")