In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import functions as f
import os
import re

In [2]:
spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [3]:
df_regiao = spark.sql("select * from desafio_curso.tbl_regiao")

In [4]:
df_regiao.show()

+-----------+-------------+--------+
|region_code|  region_name| dt_foto|
+-----------+-------------+--------+
|          0|       Canada|20230624|
|          1|      Western|20230624|
|          2|     Southern|20230624|
|          3|    Northeast|20230624|
|          4|      Central|20230624|
|          5|International|20230624|
+-----------+-------------+--------+



In [5]:
df_divisao = spark.sql("select * from desafio_curso.tbl_divisao")

In [6]:
df_divisao.show()

+--------+-------------+--------+
|division|division_name| dt_foto|
+--------+-------------+--------+
|       1|International|20230624|
|       2|     Domestic|20230624|
+--------+-------------+--------+



In [7]:
#Campos com espaço "Não Informado"
query_clientes = '''
select address_number,business_family,business_unit,customer,customerkey,customer_type,division,
case when length(trim(line_of_business)) = 0 then 'Não Informado' else line_of_business end as line_of_business,
phone,region_code,regional_sales_mgr,search_type,dt_foto
from desafio_curso.tbl_clientes'''

df_clientes = spark.sql(query_clientes)

In [8]:
df_clientes.show()

+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+--------+
|address_number|business_family|business_unit|            customer|customerkey|customer_type|division|line_of_business|       phone|region_code|regional_sales_mgr|search_type| dt_foto|
+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+--------+
|      10000000|             R3|            1|    City Supermarket|   10000000|           G2|       2|   Não Informado|816-455-8733|          4|               S16|          C|20230624|
|      10000453|             R3|            1|       A Supermarket|   10000453|           G1|       1|   Não Informado|816-455-8733|          5|               S19|          C|20230624|
|      10000455|             R3|            1|Caribian Supermarket|   10000

In [9]:
#Campos com espaço "Não Informado"
query_endereco = '''
select address_number,
case when length(trim(city)) = 0 then 'Não Informado' else city end as city,country,
case when length(trim(customer_address_1)) = 0 then 'Não Informado' else customer_address_1 end as customer_address_1,
case when length(trim(customer_address_2)) = 0 then 'Não Informado' else customer_address_2 end as customer_address_2,
case when length(trim(customer_address_3)) = 0 then 'Não Informado' else customer_address_3 end as customer_address_3,
case when length(trim(customer_address_4)) = 0 then 'Não Informado' else customer_address_4 end as customer_address_4,
case when length(trim(state)) = 0 then 'Não Informado' else state end as state,
case when length(trim(zip_code)) = 0 then 'Não Informado' else zip_code end as zip_code,dt_foto
from desafio_curso.tbl_endereco'''

df_endereco = spark.sql(query_endereco)

In [10]:
df_endereco.show()

+--------------+----------------+-------+--------------------+--------------------+------------------+------------------+-------------+-------------+--------+
|address_number|            city|country|  customer_address_1|  customer_address_2|customer_address_3|customer_address_4|        state|     zip_code| dt_foto|
+--------------+----------------+-------+--------------------+--------------------+------------------+------------------+-------------+-------------+--------+
|      10000000|           Akron|     US|         PO Box 6258|       Não Informado|     Não Informado|     Não Informado|           OH|        44312|20230624|
|      10000453|   Não Informado|     UK|       Não Informado|       Não Informado|     Não Informado|     Não Informado|Não Informado|Não Informado|20230624|
|      10000455|Huntington Beach|     US|   7392 Count Circle|       Não Informado|     Não Informado|     Não Informado|           CA|        92647|20230624|
|      10000456|        Edmonton|     CA|    8

In [11]:
query_vendas = '''
select actual_delivery_date,customerkey,datekey,
nvl(replace(discount_amount,',','.'),0) as discount_amount,
invoice_date,invoice_number,
nvl(item_class,'Não Informado') as item_class,
nvl(item_number,0) as item_number,
item,line_number,
replace(list_price,',','.') as list_price,
order_number,promise_delivery_date,
replace(sales_amount,',','.') as sales_amount,
replace(sales_amount_based_on_list_price,',','.') as sales_amount_based_on_list_price,
replace(sales_cost_amount,',','.') as sales_cost_amount,
replace(sales_margin_amount,',','.') as sales_margin_amount,
nvl(replace(sales_price,',','.'),0) as sales_price,
sales_quantity,sales_rep,u_m
from desafio_curso.tbl_vendas'''

df_vendas = spark.sql(query_vendas)

In [12]:
df_vendas.show()

+--------------------+-----------+----------+---------------+------------+--------------+-------------+-----------+--------------------+-----------+----------+------------+---------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|   item_class|item_number|                item|line_number|list_price|order_number|promise_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|
+--------------------+-----------+----------+---------------+------------+--------------+-------------+-----------+--------------------+-----------+----------+------------+---------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|          28/04/2019|   10000481|28/04/

In [13]:
df_clientes.createOrReplaceTempView('clientes')
df_regiao.createOrReplaceTempView('regiao')
df_divisao.createOrReplaceTempView('divisao')
df_endereco.createOrReplaceTempView('endereco')
df_vendas.createOrReplaceTempView('vendas')

In [14]:
spark.sql("select sum(sales_amount) as total_sales from vendas").show()

+------------------+
|       total_sales|
+------------------+
|1.86186769050001E8|
+------------------+



In [15]:
query_stage = '''
select
    v.actual_delivery_date,
    v.customerkey,
    v.datekey,
    v.discount_amount,
    v.invoice_date,
    v.invoice_number,
    v.item_class,
    v.item_number,
    v.item,
    v.line_number,
    v.list_price,
    v.order_number,
    v.promise_delivery_date,
    v.sales_amount,
    v.sales_amount_based_on_list_price,
    v.sales_cost_amount,
    v.sales_margin_amount,
    v.sales_price,
    v.sales_quantity,
    v.sales_rep,
    v.u_m,
    c.address_number,
    c.business_family,
    c.business_unit,
    c.customer,
    c.customer_type,
    c.division,
    c.line_of_business,
    c.phone,
    c.region_code,
    c.regional_sales_mgr,
    c.search_type,
    d.division_name,
    r.region_name,
    e.city,
    e.country,
    e.customer_address_1,
    e.customer_address_2,
    e.customer_address_3,
    e.customer_address_4,
    e.state,
    e.zip_code
from vendas v
left join clientes c on v.customerkey = c.customerkey
left join endereco e on c.address_number = e.address_number
inner join regiao r on c.region_code = r.region_code
inner join divisao d on c.division = d.division    
'''

df_stage = spark.sql(query_stage)

In [16]:
df_stage.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+---------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+--------------+-------------+--------+----------------+------------+-----------+------------------+-----------+-------------+-----------+-------+-------+--------------------+------------------+------------------+------------------+-----+--------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promise_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit|      customer|customer_type|di

In [17]:
df_stage = (df_stage
            .withColumn('Ano',f.year(f.to_timestamp('invoice_date','dd/MM/yyyy')))
            .withColumn('Mes',f.month(f.to_timestamp('invoice_date','dd/MM/yyyy')))
            .withColumn('Dia',f.dayofmonth(f.to_timestamp('invoice_date','dd/MM/yyyy')))
            .withColumn('Trimestre',f.quarter(f.to_timestamp('invoice_date','dd/MM/yyyy')))
           )

In [18]:
df_stage.show()

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+---------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+--------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+-------------+-----------+--------+-------+--------------------+------------------+------------------+------------------+-----+--------+----+---+---+---------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promise_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit| 

In [19]:
df_stage = df_stage.withColumn("DW_LOCALIDADE", sha2(concat_ws("", df_stage.address_number,df_stage.city,df_stage.country,
            df_stage.customer_address_1,df_stage.customer_address_2,df_stage.customer_address_3,df_stage.customer_address_4,
            df_stage.state,df_stage.zip_code), 256))
df_stage = df_stage.withColumn("DW_CLIENTES", sha2(concat_ws("", df_stage.address_number,df_stage.business_family,
            df_stage.business_unit,df_stage.customer,df_stage.customerkey,df_stage.customer_type,df_stage.division,
            df_stage.division_name,df_stage.line_of_business,df_stage.phone,df_stage.region_code,df_stage.region_name,                                               
            df_stage.regional_sales_mgr,df_stage.search_type), 256))
df_stage = df_stage.withColumn("DW_TEMPO", sha2(concat_ws("", df_stage.invoice_date,df_stage.Ano,df_stage.Mes,df_stage.Dia,
            df_stage.Trimestre), 256))

In [20]:
df_stage.show()

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+---------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+--------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+-------------+-----------+--------+-------+--------------------+------------------+------------------+------------------+-----+--------+----+---+---+---------+--------------------+--------------------+--------------------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promise_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quanti

In [21]:
df_stage.count()

66872

In [22]:
df_stage.createOrReplaceTempView('stage')

In [23]:
#Criando a dimensão Localidade
dim_localidade = spark.sql('''
    SELECT DISTINCT
        DW_LOCALIDADE,
        address_number,
        city,
        country,
        customer_address_1,
        customer_address_2,
        customer_address_3,
        customer_address_4,
        state,
        zip_code
    FROM stage    
''')

In [24]:
#Criando a dimensão Tempo
dim_tempo = spark.sql('''
    SELECT DISTINCT
        DW_TEMPO,
        invoice_date,
        ano,
        mes,
        dia,
        trimestre
    FROM stage    
''')

In [25]:
#Criando a dimensão Cliente
dim_cliente = spark.sql('''
    SELECT DISTINCT
        DW_CLIENTES,
        address_number,
        business_family,
        business_unit,
        customer,
        customerkey,
        customer_type,
        division,
        division_name,
        line_of_business,
        phone,
        region_code,
        region_name,
        regional_sales_mgr,
        search_type
    FROM stage    
''')

In [26]:
spark.sql("select sum(sales_amount) from stage").show()



+---------------------------------+
|sum(CAST(sales_amount AS DOUBLE))|
+---------------------------------+
|              1.876844056200009E8|
+---------------------------------+



In [28]:
ft_vendas = spark.sql('''
    SELECT 
        DW_CLIENTES,
        DW_LOCALIDADE,
        DW_TEMPO,
        sum(sales_amount) as vl_total
    FROM stage
    group by 
        DW_CLIENTES,
        DW_LOCALIDADE,
        DW_TEMPO
''')

In [29]:
ft_vendas.show()

+--------------------+--------------------+--------------------+------------------+
|         DW_CLIENTES|       DW_LOCALIDADE|            DW_TEMPO|          vl_total|
+--------------------+--------------------+--------------------+------------------+
|713a19409e0296ecd...|a08ba97c7c50e3aad...|a978a08e9d0f3d3ff...|            269.63|
|745d15a4cb7077123...|33d9c0440b6293a3f...|d3797e82160177080...|          15234.05|
|323d45bb01a8177e4...|b2cf91c7cc9005a64...|ba0c740fcc4088d60...|            869.24|
|81ab9349dd78f01b5...|e6972dda4cde40525...|12ead547d43638a7b...|2907.3499999999995|
|6557c72fb2228bc3f...|7a597d06bd3db55b1...|4c6f2434fed3f3850...|181496.84999999998|
|8d1235751cf1a4313...|a11391910776a95d3...|2e044170d9463dae8...|             761.3|
|f6d5675559b0d8a1a...|6811d6c14128f6c1b...|783970d0628640a47...|            2560.0|
|16643e965f0c0302b...|4cca9e63ef11eb97b...|96feadf1c07fcbf16...|            6756.1|
|4856dc0aa4abf75c9...|6f3243d467d80d2ec...|17ebda81b9cbfd8a2...|           1

In [32]:
# função para salvar os dados
def salvar_df(df, file):
    output = "/input/desafio_curso/gold/" + file
    erase = "hdfs dfs -rm " + output + "/*"
    rename = "hdfs dfs -get /datalake/gold/"+file+"/part-* /input/desafio_curso/gold/"+file+".csv"
    print(rename)

    df.coalesce(1).write\
        .format("csv")\
        .option("header", True)\
        .option("delimiter", ";")\
        .mode("overwrite")\
        .save("/datalake/gold/"+file+"/")

    os.system(erase)
    os.system(rename)

In [33]:
salvar_df(ft_vendas, 'ft_vendas')
salvar_df(dim_cliente, 'dim_cliente')
salvar_df(dim_tempo, 'dim_tempo')
salvar_df(dim_localidade, 'dim_localidade')

hdfs dfs -get /datalake/gold/ft_vendas/part-* /input/desafio_curso/gold/ft_vendas.csv
hdfs dfs -get /datalake/gold/dim_cliente/part-* /input/desafio_curso/gold/dim_cliente.csv
hdfs dfs -get /datalake/gold/dim_tempo/part-* /input/desafio_curso/gold/dim_tempo.csv
hdfs dfs -get /datalake/gold/dim_localidade/part-* /input/desafio_curso/gold/dim_localidade.csv
