In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.functions import when, col, sum, count, isnan, round
from pyspark.sql.functions import regexp_replace, concat_ws, sha2
from pyspark.sql.functions import rtrim
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date

In [2]:
# Criação da instância spark para leitura e criação de tabelas
spark = SparkSession.builder.master("local[*]").enableHiveSupport().getOrCreate()

In [3]:
# Lendo tabela CLIENTES do HDFS:

df_clientes = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/CLIENTES.csv')

In [4]:
# Lendo tabela DIVISAO do HDFS:

df_divisao = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/DIVISAO.csv')

In [5]:
df_divisao.printSchema()

root
 |-- Division: integer (nullable = true)
 |-- Division Name: string (nullable = true)



In [6]:
# Lendo tabela ENDERECO do HDFS:

df_endereco = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/ENDERECO.csv')

In [7]:
# Lendo tabela REGIAO do HDFS:

df_regiao = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/REGIAO.csv')

In [8]:
df_regiao.printSchema()

root
 |-- Region Code: integer (nullable = true)
 |-- Region Name: string (nullable = true)



In [9]:
# Lendo tabela VENDAS do HDFS:

df_vendas = spark.read.options(delimiter=';', header=True).option('inferSchema','true')\
                                .csv('/dados_processamento/dados/VENDAS.csv')

In [10]:
## Analisando a tabela Cliente

df_clientes.printSchema()

root
 |-- Address Number: integer (nullable = true)
 |-- Business Family: string (nullable = true)
 |-- Business Unit: integer (nullable = true)
 |-- Customer: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Division: integer (nullable = true)
 |-- Line of Business: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Region Code: integer (nullable = true)
 |-- Regional Sales Mgr: string (nullable = true)
 |-- Search Type: string (nullable = true)



In [11]:
df_clientes.show()

+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|            Customer|CustomerKey|Customer Type|Division|Line of Business|       Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|      10000000|             R3|            1|    City Supermarket|   10000000|           G2|       2|                |816-455-8733|          4|               S16|          C|
|      10000453|             R3|            1|       A Supermarket|   10000453|           G1|       1|                |816-455-8733|          5|               S19|          C|
|      10000455|             R3|            1|Caribian Supermarket|   10000455|           G2|       2|                |8

In [12]:
# Função para verificar a existêncai de dados Null em um datafreme
def verificar_val_null(df):
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    
    
# Realiza a correção dos dados que são compostos apenas por espaços
def corrigir_dados_formado_por_espaços(df):
    for c in df.columns:
        df = df.withColumn(c,rtrim(df[c]))
        df = df.withColumn(c, when(df[c] == '', 'NÃO INFORMADO')\
                                         .otherwise(df[c]))
    return df

# Realiza a correção dos dados que são nulos na tabela de acordo com o tipo e a regrea de negocio
def corrigir_dados_null(df):
    for c in df.columns:
        if (str(df.schema[c].dataType) == 'StringType'):
            df = df.withColumn(c, when(df[c].isNull(), 'NÃO INFORMADO').otherwise(df[c]))
        elif (str(df.schema[c].dataType) == 'DoubleType'):
            df = df.withColumn(c, when(df[c].isNull(), 0.0).otherwise(df[c]))
        elif (str(df.schema[c].dataType) == 'IntegerType'):
            df = df.withColumn(c, when(df[c].isNull(), 0).otherwise(df[c]))
    return df


# Corrigir as colunas que estão com os tipos errados de dados (não é valido para datas)
def converter_tipo_coluna(df, colunas, tipo):
    if type(colunas) != list:
        colunas = [colunas] 
    for col_name in colunas:
        df = df.withColumn(col_name, col(col_name).cast(tipo))       
    return df

In [13]:
# Verificando a existência de dados Null ou NaN
verificar_val_null(df_clientes)

+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|Customer|CustomerKey|Customer Type|Division|Line of Business|Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+
|             0|              0|            0|       0|          0|            0|       0|               0|    0|          0|                 0|          0|
+--------------+---------------+-------------+--------+-----------+-------------+--------+----------------+-----+-----------+------------------+-----------+



In [14]:
'''
Como a tabale não possui valores Null ou NaN, é necessário apenas alterar a coluna "Line of Business" que possuia a 
str "   " (três espaços) representando o campo não informado.
'''
df_clientes = corrigir_dados_formado_por_espaços(df_clientes)
df_clientes = df_clientes.dropDuplicates(['CustomerKey'])

df_clientes.show()

+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|Address Number|Business Family|Business Unit|            Customer|CustomerKey|Customer Type|Division|Line of Business|       Phone|Region Code|Regional Sales Mgr|Search Type|
+--------------+---------------+-------------+--------------------+-----------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|      10000472|             R3|            1|         Aaron Store|   10000472|           G2|       2|   NÃO INFORMADO|816-455-8733|          1|                S4|          C|
|      10025769|             R3|            1|         UUmail Shop|   10025769|           G2|       2|   NÃO INFORMADO|816-455-8733|          1|                S1|          C|
|      10003882|             R3|            1|   Beech Supermarket|   10003882|           G2|       1|   NÃO INFORMADO|8

In [15]:
df_endereco.printSchema()

root
 |-- Address Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer Address 1: string (nullable = true)
 |-- Customer Address 2: string (nullable = true)
 |-- Customer Address 3: string (nullable = true)
 |-- Customer Address 4: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [16]:
df_endereco.show()

+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|Address Number|                City|Country|  Customer Address 1|  Customer Address 2|  Customer Address 3|  Customer Address 4|State|    Zip Code|
+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|      10000000|               Akron|     US|         PO Box 6258|                 ...|                 ...|                 ...|   OH|       44312|
|      10000453|                 ...|     UK|                 ...|                 ...|                 ...|                 ...| null|            |
|      10000455|    Huntington Beach|     US|   7392 Count Circle|                 ...|                 ...|                 ...|   CA|       92647|
|      10000456|            Edmonton|     CA|    8151 Wagner Road|                 ...|                 ..

In [17]:
# Verificando a existência de dados Null
verificar_val_null(df_endereco)

+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|Address Number|City|Country|Customer Address 1|Customer Address 2|Customer Address 3|Customer Address 4|State|Zip Code|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|             0|   0|      0|                 0|                 0|                 0|                 0|   70|       0|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+



In [18]:
'''
Pela inspeção visual foi possível perceber que uma série de colunas possui dados faltentes, onde esses campos são preenchido 
por um conjunto de espaços (space). Dependendo da coluna a quantidade de espaços variam. No intuito de uniformizar os campos
que não possuiam dados apenas espaços, foi utilizado a função rtrim (remove espaços a direita).O resultado da aplicação desta
função nos campos onde só possuiam espaços é o retorno do campo vazio (''). Logo, aplicando a substituição dos valores nos 
campos vazios, resolvemos uma parte do problema.

O segundo problema encontrado na tabela foi 70 valores Null na coluna 'state'. Então foi utilizado o método isNull para
identificar os campos nulos e a função when para subsitui-los.
'''

df_endereco = corrigir_dados_formado_por_espaços(df_endereco)
df_endereco = corrigir_dados_null(df_endereco)

# Corrigindo o valor duplicado "10021911" da colluna Address Number, que assim como na tabela de cliente possui as PK repetidas
df_endereco = df_endereco.dropDuplicates(['Address Number'])
df_endereco.show()

+--------------+------------+-------+--------------------+--------------------+------------------+------------------+-------------+----------+
|Address Number|        City|Country|  Customer Address 1|  Customer Address 2|Customer Address 3|Customer Address 4|        State|  Zip Code|
+--------------+------------+-------+--------------------+--------------------+------------------+------------------+-------------+----------+
|      10000472|    Van Nuys|     US| 8000 Haskell Avenue|       NÃO INFORMADO|     NÃO INFORMADO|     NÃO INFORMADO|           CA|     91406|
|      10025769|     Redding|     US| 850 Commerce Street|       NÃO INFORMADO|     NÃO INFORMADO|     NÃO INFORMADO|           CA|     96002|
|      10023235|     Houston|     US| 7600 South Santa Fe|       NÃO INFORMADO|     NÃO INFORMADO|     NÃO INFORMADO|           TX|     77061|
|      10003795| Baton Rouge|     US|         PO Box 2484|       NÃO INFORMADO|     NÃO INFORMADO|     NÃO INFORMADO|           LA|     70815|

In [19]:
# Verificando a existência de dados Null ou NaN
verificar_val_null(df_endereco)

+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|Address Number|City|Country|Customer Address 1|Customer Address 2|Customer Address 3|Customer Address 4|State|Zip Code|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+
|             0|   0|      0|                 0|                 0|                 0|                 0|    0|       0|
+--------------+----+-------+------------------+------------------+------------------+------------------+-----+--------+



In [20]:
df_vendas.show()

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|   DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|                Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|          28/04/2019|   10000481|28/04/2018| 

In [21]:
df_vendas.printSchema()

root
 |-- Actual Delivery Date: string (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: string (nullable = true)
 |-- Discount Amount: string (nullable = true)
 |-- Invoice Date: string (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: string (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: string (nullable = true)
 |-- Sales Amount: string (nullable = true)
 |-- Sales Amount Based on List Price: string (nullable = true)
 |-- Sales Cost Amount: string (nullable = true)
 |-- Sales Margin Amount: string (nullable = true)
 |-- Sales Price: string (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [22]:
# Verificando a existência de dados Null ou NaN
verificar_val_null(df_vendas)

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                 253|        253|    253|            255|         253|           253|      8542|      

In [23]:
'''
É possivel verificar que algumas linhas possuem todos os campos nulos, logo, iremos deletar todas estas linhas. Para selecionar
as linhas onde todos os campos são nulos, basta verificar as 3 primeiras colunas.
'''
df_vendas.filter(((col("Actual Delivery Date").isNull()) & (col("CustomerKey").isNull()) & (col("DateKey").isNull()))).show(5)

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+----+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep| U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+----+
|                null|       null|   null|           null|        null|          null|      null|   

In [24]:
# Removendo do dataframe todas as linhas que possuem todos os campos nulos
df_vendas = df_vendas.filter(((col("Actual Delivery Date").isNotNull())\
                              & (col("CustomerKey").isNotNull())\
                              & (col("DateKey").isNotNull())))

In [25]:
# Verificando novamente a existência de dados Null ou NaN para visualizar se as linhas nulas foram removidas
verificar_val_null(df_vendas)

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                   0|          0|      0|              2|           0|             0|      8289|      

In [26]:
'''
df_vendas = converter_tipo_coluna(df_vendas, 'Item Number', 'int')

cols_dates = ['Actual Delivery Date', 'DateKey', 'Invoice Date', 'Promised Delivery Date']

for col_name_data in cols_dates:
    df_vendas = df_vendas.withColumn(col_name_data, to_date(col(col_name_data),"dd/MM/yyyy"))

    
# O separador decimal utilizado na base de dados é a virgula, logo, precisamos alteralo para o ponto. E aproveitando o loop 
# vamos relizar o cast
col_lis = ['Discount Amount', 'List Price', 'Sales Amount', 'Sales Amount Based on List Price',\
           'Sales Cost Amount', 'Sales Margin Amount', 'Sales Price']

for coluna in col_lis:
    df_vendas = df_vendas.withColumn(coluna, round(regexp_replace(coluna,'\,','.').cast('double'), 2))'''


'\ndf_vendas = converter_tipo_coluna(df_vendas, \'Item Number\', \'int\')\n\ncols_dates = [\'Actual Delivery Date\', \'DateKey\', \'Invoice Date\', \'Promised Delivery Date\']\n\nfor col_name_data in cols_dates:\n    df_vendas = df_vendas.withColumn(col_name_data, to_date(col(col_name_data),"dd/MM/yyyy"))\n\n    \n# O separador decimal utilizado na base de dados é a virgula, logo, precisamos alteralo para o ponto. E aproveitando o loop \n# vamos relizar o cast\ncol_lis = [\'Discount Amount\', \'List Price\', \'Sales Amount\', \'Sales Amount Based on List Price\',           \'Sales Cost Amount\', \'Sales Margin Amount\', \'Sales Price\']\n\nfor coluna in col_lis:\n    df_vendas = df_vendas.withColumn(coluna, round(regexp_replace(coluna,\'\\,\',\'.\').cast(\'double\'), 2))'

In [27]:
# Antes de continuar com limpeza dos dados vamos corrigir as colunas que estão com os tipos errados de dados
df_vendas = converter_tipo_coluna(df_vendas, 'Item Number', 'int')

cols_dates = ['Actual Delivery Date', 'DateKey', 'Invoice Date', 'Promised Delivery Date']

for col_name_data in cols_dates:
    df_vendas = df_vendas.withColumn(col_name_data, to_date(col(col_name_data),"dd/MM/yyyy"))

    
# O separador decimal utilizado na base de dados é a virgula, logo, precisamos alteralo para o ponto. E aproveitando o loop 
# vamos relizar o cast
col_lis = ['Discount Amount', 'List Price', 'Sales Amount', 'Sales Amount Based on List Price',\
           'Sales Cost Amount', 'Sales Margin Amount', 'Sales Price']

for coluna in col_lis:
    df_vendas = df_vendas.withColumn(coluna, round(regexp_replace(coluna,'\,','.').cast('double'), 2))

In [28]:
# Realizando a limpeza dos dados que são nulos na tabela

df_vendas = corrigir_dados_null(df_vendas)

In [29]:
# Verificando novamente a existência de dados Null ou NaN para visualizar se as linhas nulas foram removidas
verificar_val_null(df_vendas)

+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|DateKey|Discount Amount|Invoice Date|Invoice Number|Item Class|Item Number|Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+-------+---------------+------------+--------------+----------+-----------+----+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|                   0|          0|      0|              0|           0|             0|         0|      

In [30]:
df_vendas.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+-------------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|Actual Delivery Date|CustomerKey|   DateKey|Discount Amount|Invoice Date|Invoice Number|   Item Class|Item Number|                Item|Line Number|List Price|Order Number|Promised Delivery Date|Sales Amount|Sales Amount Based on List Price|Sales Cost Amount|Sales Margin Amount|Sales Price|Sales Quantity|Sales Rep|U/M|
+--------------------+-----------+----------+---------------+------------+--------------+-------------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+
|          2019-04-28|   10000481|201

In [31]:
df_vendas.printSchema()

root
 |-- Actual Delivery Date: date (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- DateKey: date (nullable = true)
 |-- Discount Amount: double (nullable = true)
 |-- Invoice Date: date (nullable = true)
 |-- Invoice Number: integer (nullable = true)
 |-- Item Class: string (nullable = true)
 |-- Item Number: integer (nullable = true)
 |-- Item: string (nullable = true)
 |-- Line Number: integer (nullable = true)
 |-- List Price: double (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Promised Delivery Date: date (nullable = true)
 |-- Sales Amount: double (nullable = true)
 |-- Sales Amount Based on List Price: double (nullable = true)
 |-- Sales Cost Amount: double (nullable = true)
 |-- Sales Margin Amount: double (nullable = true)
 |-- Sales Price: double (nullable = true)
 |-- Sales Quantity: integer (nullable = true)
 |-- Sales Rep: integer (nullable = true)
 |-- U/M: string (nullable = true)



In [32]:
'''
5 - efetuar as responder as perguntas:

- quantos pedidos foram realizados
- quantos clientes tem em nossa base
- quantos clientes temos por Região
- quantidade de vendas em 2018
'''

# Criando as views:
df_endereco.createOrReplaceTempView('vw_endereco')
df_divisao.createOrReplaceTempView('vw_divisao')
df_clientes.createOrReplaceTempView('vw_clientes')
df_vendas.createOrReplaceTempView('vw_vendas')
df_regiao.createOrReplaceTempView('vw_regiao')

In [33]:
# Intervalo minimo de datas necessária para criação do DW
spark.sql('''
        SELECT MIN(DateKey), MAX(DateKey) FROM vw_vendas
''').show()

+------------+------------+
|min(DateKey)|max(DateKey)|
+------------+------------+
|  2017-01-09|  2019-03-18|
+------------+------------+



In [34]:
df_data_ST = spark.sql('''
                SELECT 
                EXPLODE(SEQUENCE(TO_DATE('2017-01-09'), TO_DATE('2019-03-18'), INTERVAL 1 DAY)) as DateKey''')

df_data_ST.show(2)

+----------+
|   DateKey|
+----------+
|2017-01-09|
|2017-01-10|
+----------+
only showing top 2 rows



In [35]:
df_time_ST = spark.sql('''
                SELECT
                DateKey,
                DAY(DateKey) AS Day, 
                MONTH(DateKey) AS Month,
                DATE_FORMAT(DateKey,"MMMM") AS Month_Name,
                YEAR(DateKey) as Year,
                QUARTER(DateKey) as Quarter
                FROM(
                SELECT 
                EXPLODE(SEQUENCE(TO_DATE('2017-01-09'), TO_DATE('2019-03-18'), INTERVAL 1 DAY)) as DateKey)''')


df_time_ST.createOrReplaceTempView('vw_time_ST')


In [36]:
# Desnormalizando os dados (apenas com as colunas de interesse):

# Junção das tabelas df_cliente, df_divisao e df_regiao
df_clientes_ST = spark.sql('''
            SELECT 
            vw_clientes.CustomerKey,
            vw_clientes.Customer,
            vw_clientes.`Business Family` as Business_Family,
            vw_clientes.`Customer Type` as Customer_Type,
            vw_clientes.`Address Number` as Address_Number,
            vw_divisao.`Division Name` as Division_Name,
            vw_regiao.`Region Name` as Region_Name          
            FROM vw_clientes
            INNER JOIN vw_divisao
            ON vw_clientes.`Division` = vw_divisao.`Division`
            INNER JOIN vw_regiao
            ON vw_clientes.`Region Code` = vw_regiao.`Region Code`
''')
df_clientes_ST.createOrReplaceTempView('vw_clientes_ST')
# df_clientes_ST.filter(col('CustomerKey') == '10013312').show()

In [37]:
spark.sql('''
            SELECT *  
            FROM vw_clientes_ST
            where CustomerKey = '10021911' 
        ''').show()

+-----------+---------+---------------+-------------+--------------+-------------+-----------+
|CustomerKey| Customer|Business_Family|Customer_Type|Address_Number|Division_Name|Region_Name|
+-----------+---------+---------------+-------------+--------------+-------------+-----------+
|   10021911|PING Shop|             R3|           G1|      10021911|     Domestic|   Southern|
+-----------+---------+---------------+-------------+--------------+-------------+-----------+



In [38]:
# Analise do resultado do Join
# df_clientes_ST.select(col('Customer_Type')).distinct().show()
#df_clientes_ST.show(5)
verificar_val_null(df_clientes_ST)

+-----------+--------+---------------+-------------+--------------+-------------+-----------+
|CustomerKey|Customer|Business_Family|Customer_Type|Address_Number|Division_Name|Region_Name|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+
|          0|       0|              0|            0|             0|            0|          0|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+



In [39]:
# Desnormalizando os dados 
# Junção das tabelas df_clientes_ST e df_endereco
df_clientes_endereco_ST = spark.sql('''
            SELECT 
            vw_clientes_ST.CustomerKey,
            vw_clientes_ST.Customer,
            vw_clientes_ST.Business_Family,
            vw_clientes_ST.Customer_Type,
            vw_clientes_ST.Address_Number,
            vw_clientes_ST.Division_Name,
            vw_clientes_ST.Region_Name, 
            
            
            vw_endereco.City,
            vw_endereco.State,
            vw_endereco.Country
            
            FROM vw_clientes_ST
            LEFT JOIN vw_endereco
            ON vw_clientes_ST.Address_Number = vw_endereco.`Address Number`
''')

In [40]:
verificar_val_null(df_clientes_endereco_ST)

+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+
|CustomerKey|Customer|Business_Family|Customer_Type|Address_Number|Division_Name|Region_Name|City|State|Country|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+
|          0|       0|              0|            0|             0|            0|          0| 229|  229|    229|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+



In [41]:
df_clientes_endereco_ST = corrigir_dados_null(df_clientes_endereco_ST)

df_clientes_endereco_ST.createOrReplaceTempView('vw_clientes_endereco_ST')

In [42]:
verificar_val_null(df_clientes_endereco_ST)

+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+
|CustomerKey|Customer|Business_Family|Customer_Type|Address_Number|Division_Name|Region_Name|City|State|Country|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+
|          0|       0|              0|            0|             0|            0|          0|   0|    0|      0|
+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+



In [43]:
df_tabela_desnormalizada_ST = spark.sql('''
            SELECT 
            
            vw_vendas.DateKey,
            vw_vendas.`Discount Amount` as Discount_Amount,
            vw_vendas.`Sales Cost Amount` as Sales_Cost_Amount,
            vw_vendas.`Sales Price` as Sales_Price, 
            vw_vendas.`Sales Quantity` as Sales_Quantity,
            
            
            vw_clientes_endereco_ST.CustomerKey,
            vw_clientes_endereco_ST.Customer,
            vw_clientes_endereco_ST.Business_Family,
            vw_clientes_endereco_ST.Customer_Type,
            
            
            vw_clientes_endereco_ST.Address_Number,
            vw_clientes_endereco_ST.Division_Name,
            vw_clientes_endereco_ST.Region_Name,
            vw_clientes_endereco_ST.City,
            vw_clientes_endereco_ST.State,
            vw_clientes_endereco_ST.Country,
            
            vw_time_ST.Day, 
            vw_time_ST.Month,
            vw_time_ST.Month_Name,
            vw_time_ST.Year,
            vw_time_ST.Quarter
            
            
            FROM vw_vendas
            LEFT JOIN vw_clientes_endereco_ST
            ON vw_vendas.CustomerKey = vw_clientes_endereco_ST.CustomerKey
            LEFT JOIN vw_time_ST
            ON vw_vendas.DateKey = vw_time_ST.DateKey
''')

In [44]:
# Criando as HASH para cada dimensão:
df_tabela_desnormalizada_ST = df_tabela_desnormalizada_ST.withColumn('sk_Customer',
                                                                     sha2(concat_ws('',df_tabela_desnormalizada_ST.CustomerKey, 
                                                                                    df_tabela_desnormalizada_ST.Customer,
                                                                                    df_tabela_desnormalizada_ST.Business_Family,
                                                                                    df_tabela_desnormalizada_ST.Customer_Type),
                                                                          256))

df_tabela_desnormalizada_ST = df_tabela_desnormalizada_ST.withColumn('sk_Locality',
                                                                     sha2(concat_ws('',df_tabela_desnormalizada_ST.Address_Number, 
                                                                                    df_tabela_desnormalizada_ST.Division_Name,
                                                                                    df_tabela_desnormalizada_ST.City,
                                                                                    df_tabela_desnormalizada_ST.State,
                                                                                    df_tabela_desnormalizada_ST.Country),
                                                                          256))

df_tabela_desnormalizada_ST = df_tabela_desnormalizada_ST.withColumn('sk_Time',
                                                                     sha2(concat_ws('',df_tabela_desnormalizada_ST.DateKey,
                                                                                    df_tabela_desnormalizada_ST.Day, 
                                                                                    df_tabela_desnormalizada_ST.Month,
                                                                                    df_tabela_desnormalizada_ST.Month_Name,
                                                                                    df_tabela_desnormalizada_ST.Year,
                                                                                    df_tabela_desnormalizada_ST.Quarter),
                                                                          256))


In [45]:
df_tabela_desnormalizada_ST.createOrReplaceTempView('vw_tabela_desnormalizada_ST')

In [46]:
# df_tabela_desnormalizada_ST.show(50)
verificar_val_null(df_tabela_desnormalizada_ST)

+-------+---------------+-----------------+-----------+--------------+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+---+-----+----------+----+-------+-----------+-----------+-------+
|DateKey|Discount_Amount|Sales_Cost_Amount|Sales_Price|Sales_Quantity|CustomerKey|Customer|Business_Family|Customer_Type|Address_Number|Division_Name|Region_Name|City|State|Country|Day|Month|Month_Name|Year|Quarter|sk_Customer|sk_Locality|sk_Time|
+-------+---------------+-----------------+-----------+--------------+-----------+--------+---------------+-------------+--------------+-------------+-----------+----+-----+-------+---+-----+----------+----+-------+-----------+-----------+-------+
|      0|              0|                0|          0|             0|          0|       0|              0|            0|             0|            0|          0|   0|    0|      0|  0|    0|         0|   0|      0|          0|          0|      0|
+-------

In [47]:
# Verificando se o tamanho da tabela desnormalizada bate com o tamanho da tbaela vendas
df_tabela_desnormalizada_ST.count() == df_vendas.count()

True

In [48]:
df_tabela_desnormalizada_ST.select('CustomerKey').distinct().count() == df_tabela_desnormalizada_ST.select('sk_Customer').distinct().count()

True

In [49]:
df_tabela_desnormalizada_ST.select('sk_Customer', 'sk_Locality').distinct().show()

+--------------------+--------------------+
|         sk_Customer|         sk_Locality|
+--------------------+--------------------+
|3e298f94d6109a0f5...|d188501b3332ec703...|
|06838117b937aa0a5...|a4516f1bae1fe8778...|
|3f65ca39da17bc6d5...|e55a9e5ba54fef512...|
|09f16ff31a0ddfd59...|7cdd8a675f320b72e...|
|3dbbc94b53c79bad3...|8e48ecee8b41696a0...|
|0ab5562a7a91a6101...|1066c93128c2a79b2...|
|f5e4d100cd36bfd2b...|c62f63735fdd734b8...|
|96f38e9ea126293a4...|4ac7c150622e18ca8...|
|63dcf56c416790b8a...|c2ac1490e5f9d551b...|
|08ca84041156cefd0...|d144517c771d1f11f...|
|bf82488f3e2393fbd...|6ed1be2a18b2234dd...|
|7478cc2f69a317d96...|4c1108e0d756da268...|
|2bf338ef508cf678e...|708d4032cda0e9323...|
|1fa4d6795bf59266c...|acb2a70736f220417...|
|8d094ef91c817aa37...|d7e35e4ba371ec9ec...|
|209c3a0734cba9742...|463329411eb8506c6...|
|e980f59bb3634fa88...|33eb172e548a424c8...|
|ad6de235bc508e236...|5c211883c8e45b9cb...|
|97d4eed9f41df5e95...|bed88fd06080bb8db...|
|6638877cadd81c759...|bdb64c83ba

In [56]:
# Criando as tabelas de dimensões:

DIM_Customer = spark.sql('''
            SELECT 
            DISTINCT sk_Customer,
            CustomerKey,
            Customer,
            Business_Family as `Business Family`,
            Customer_Type as `Customer Type`
            FROM vw_tabela_desnormalizada_ST
''')

DIM_Customer.show()

+--------------------+-----------+-------------------+---------------+-------------+
|         sk_Customer|CustomerKey|           Customer|Business Family|Customer Type|
+--------------------+-----------+-------------------+---------------+-------------+
|4e7366b435f2c93e7...|   10006862|     BL Supermarket|             R3|           G2|
|8aafc4ff103f73cb3...|   10010906|        ediSys Shop|             R3|           G2|
|6638877cadd81c759...|   10019682|       Lamson Store|             R3|           G2|
|078de160cc8f5fa19...|   10000462|           a2i Shop|             R3|           G2|
|944a0a120cb8f960a...|   10013341|Gatierf Supermarket|             R2|           G2|
|da0d246737e33c22e...|   10024010|   Screaming Market|             R2|           G2|
|cd13b37c077c1ccd8...|   10021204|      Pacific Store|             R1|           G3|
|cdac8768b50915fc7...|   10001818|    Acc Supermarket|             R3|           G2|
|9153843541d62bbad...|   10003857|     Beckman Market|           

In [51]:
DIM_Customer.count()

615

In [57]:
DIM_Time = spark.sql('''
            SELECT 
            DISTINCT sk_Time,
            DateKey,
            Day, 
            Month,
            Month_Name as `Month Name`,
            Year,
            Quarter
            FROM vw_tabela_desnormalizada_ST
''')

DIM_Time.show()

+--------------------+----------+---+-----+----------+----+-------+
|             sk_Time|   DateKey|Day|Month|Month Name|Year|Quarter|
+--------------------+----------+---+-----+----------+----+-------+
|a77297494f6f82177...|2017-04-17| 17|    4|     April|2017|      2|
|fe429bd12137a1d73...|2017-08-04|  4|    8|    August|2017|      3|
|9a8c38a902a9d3064...|2017-01-29| 29|    1|   January|2017|      1|
|ea1c90a850698457f...|2018-04-15| 15|    4|     April|2018|      2|
|6d8f50d1bf1efb54e...|2018-04-19| 19|    4|     April|2018|      2|
|bc611cbc38f0c8006...|2018-03-05|  5|    3|     March|2018|      1|
|5f6a4144632929d63...|2018-08-25| 25|    8|    August|2018|      3|
|12517115522c67574...|2018-08-31| 31|    8|    August|2018|      3|
|3155d39794ac2712f...|2018-11-11| 11|   11|  November|2018|      4|
|4144b2794ba6022d7...|2018-04-09|  9|    4|     April|2018|      2|
|26579ac5f07760a20...|2017-08-28| 28|    8|    August|2017|      3|
|646184059f72380e1...|2017-09-18| 18|    9| Sept

In [53]:
# 798
DIM_Time.count()

558

In [58]:
DIM_Locality = spark.sql('''
            SELECT 
            DISTINCT sk_Locality,
            Address_Number as `Address Number`,
            Division_Name as `Division Name`,
            Region_Name as `Region Name`,
            City,
            State,
            Country
            FROM vw_tabela_desnormalizada_ST
''')

DIM_Locality.show()

+--------------------+--------------+-------------+-------------+----------------+-------------+-------------+
|         sk_Locality|Address Number|Division Name|  Region Name|            City|        State|      Country|
+--------------------+--------------+-------------+-------------+----------------+-------------+-------------+
|bacccbe2ff48d486c...|      10022249|     Domestic|     Southern|          Shelby|           NC|           US|
|802d3fb155f4d17fb...|      10025267|International|International|   NÃO INFORMADO|NÃO INFORMADO|NÃO INFORMADO|
|e80b13f4f9859a065...|      10021936|     Domestic|     Southern|      Louisville|           KY|           US|
|219d7277344b4e0da...|      10016780|International|International|   NÃO INFORMADO|NÃO INFORMADO|NÃO INFORMADO|
|86896dceeb0ac4bb8...|      10016549|     Domestic|     Southern|    Jacksonville|           FL|           US|
|0e57c5957d600560a...|      10025291|     Domestic|      Central|           Dover|           OH|           US|
|

In [59]:
fact_orders = spark.sql('''
            SELECT 
            sk_Customer,
            sk_Time,
            sk_Locality,
            Discount_Amount as `Discount Amount`,
            Sales_Cost_Amount as `Sales Cost Amount`,
            Sales_Price as `Sales Price`, 
            Sales_Quantity as `Sales Quantity`
            FROM vw_tabela_desnormalizada_ST
''')

fact_orders.show()

+--------------------+--------------------+--------------------+---------------+-----------------+-----------+--------------+
|         sk_Customer|             sk_Time|         sk_Locality|Discount Amount|Sales Cost Amount|Sales Price|Sales Quantity|
+--------------------+--------------------+--------------------+---------------+-----------------+-----------+--------------+
|d912851bdcbd8921b...|87702de4e2a9f6a88...|8b8d5fbcde8c473ef...|         605.28|            345.2|     669.82|             1|
|d912851bdcbd8921b...|52493187d874b87a4...|8b8d5fbcde8c473ef...|         245.27|           122.21|     303.39|             1|
|d912851bdcbd8921b...|efaf1a66ba883e6ea...|8b8d5fbcde8c473ef...|         605.28|            345.2|     669.82|             1|
|555a3de7185c9783b...|ccd815428418894c9...|ee23db1ca8e5c9048...|         919.75|           605.31|     624.74|             2|
|555a3de7185c9783b...|ccd815428418894c9...|ee23db1ca8e5c9048...|         856.48|           587.13|      58.18|        