In [1]:
import argparse
import importlib
import json
from typing import Any, Dict, Tuple

import pandas
from pyspark.sql import DataFrame, HiveContext, SparkSession
from pyspark.sql.types import BooleanType, DataType, StringType
from pyspark.sql.functions import *

In [2]:
def tratar_campos_vazios(texto):
    """Return True when a column value should be treated as "not informed".

    A value counts as blank when it is None (a SQL NULL handed to the UDF),
    the empty string, or a string made only of whitespace characters.

    Args:
        texto: Raw column value; normally a string, but may be None.
            Non-string values are converted with ``str()`` before checking.

    Returns:
        bool: True if the value is missing/blank, False otherwise.
    """
    # Spark passes SQL NULLs to Python UDFs as None; calling .isspace() on None
    # would raise AttributeError and fail the Spark task.
    if texto is None:
        return True
    if not isinstance(texto, str):
        texto = str(texto)
    # "".isspace() is False, so the empty string needs its own check.
    return texto == "" or texto.isspace()

# Spark UDF wrapper around tratar_campos_vazios. The Python function returns a
# bool, so the declared return type is BooleanType; declaring StringType made the
# result a string and left every `== True` comparison dependent on an implicit cast.
tratar_campos_vazios_udf = udf(tratar_campos_vazios, BooleanType())

In [3]:
# Attach to the active Spark session (or create one with Hive support) so this
# notebook does not depend on a `spark` variable pre-injected by the kernel.
# getOrCreate() returns the already-running session when there is one.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Raw customer table from the desafio_curso database.
df_base_clientes = spark.sql("select * from desafio_curso.tbl_clientes")

In [4]:
# Replace lineofbusiness values flagged by the blank-field UDF with the
# "not informed" placeholder; all other values are kept as-is.
df_base_clientes = df_base_clientes.withColumn(
    "lineofbusiness",
    when(tratar_campos_vazios_udf(col("lineofbusiness")) == lit(True), "Não informado")
    .otherwise(df_base_clientes["lineofbusiness"]),
)

In [5]:
# Region lookup table; joined onto the customers by regioncode in a later cell.
df_base_regiao = spark.sql(sqlQuery="select * from desafio_curso.tbl_regiao")

In [6]:
# Division lookup table; joined onto the customers by division in a later cell.
df_base_divisao = spark.sql(sqlQuery="select * from desafio_curso.tbl_divisao")

In [7]:
# Enrich customers with their region and division attributes. Both joins are
# inner joins, so customers whose regioncode/division has no match in the lookup
# tables are dropped. The lookup-side join keys are dropped so only the
# customer's own regioncode/division columns remain.
# The address table (tbl_endereco) is not joined here; it is loaded and cleaned
# separately further down.
# A parenthesised chain is used instead of trailing backslashes, which previously
# only parsed because the following line happened to be a comment.
df_base_clientes = (
    df_base_clientes
    .join(df_base_regiao, df_base_clientes.regioncode == df_base_regiao.regioncode, 'inner')
    .drop(df_base_regiao.regioncode)
    .join(df_base_divisao, df_base_clientes.division == df_base_divisao.division, 'inner')
    .drop(df_base_divisao.division)
)

In [8]:
# Raw customer address table from the desafio_curso database.
df_base_endereco = spark.sql(sqlQuery="select * from desafio_curso.tbl_endereco")

In [9]:
# Replace city values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "city",
    when(tratar_campos_vazios_udf(col("city")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["city"]),
)

In [10]:
# Replace customeraddress2 values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "customeraddress2",
    when(tratar_campos_vazios_udf(col("customeraddress2")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["customeraddress2"]),
)

In [11]:
# Replace customeraddress3 values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "customeraddress3",
    when(tratar_campos_vazios_udf(col("customeraddress3")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["customeraddress3"]),
)

In [12]:
# Replace customeraddress4 values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "customeraddress4",
    when(tratar_campos_vazios_udf(col("customeraddress4")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["customeraddress4"]),
)

In [13]:
# Replace state values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "state",
    when(tratar_campos_vazios_udf(col("state")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["state"]),
)

In [14]:
# Replace zipcode values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_endereco = df_base_endereco.withColumn(
    "zipcode",
    when(tratar_campos_vazios_udf(col("zipcode")) == lit(True), "Não informado")
    .otherwise(df_base_endereco["zipcode"]),
)

In [15]:
# Raw sales table from the desafio_curso database.
df_base_vendas = spark.sql(sqlQuery="select * from desafio_curso.tbl_vendas")

In [16]:
# Discard every sales row that has a NULL in any column (how='any' is the default).
df_base_vendas = df_base_vendas.na.drop()

In [17]:
# Blank discountamount values (as flagged by the blank-field UDF) become the string "0".
df_base_vendas = df_base_vendas.withColumn(
    "discountamount",
    when(tratar_campos_vazios_udf(col("discountamount")) == lit(True), "0")
    .otherwise(df_base_vendas["discountamount"]),
)

In [18]:
# Replace itemclass values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_vendas = df_base_vendas.withColumn(
    "itemclass",
    when(tratar_campos_vazios_udf(col("itemclass")) == lit(True), "Não informado")
    .otherwise(df_base_vendas["itemclass"]),
)

In [19]:
# Replace itemnumber values flagged by the blank-field UDF with the "not informed" placeholder.
df_base_vendas = df_base_vendas.withColumn(
    "itemnumber",
    when(tratar_campos_vazios_udf(col("itemnumber")) == lit(True), "Não informado")
    .otherwise(df_base_vendas["itemnumber"]),
)

In [20]:
# Blank salesprice values (as flagged by the blank-field UDF) become the string "0".
df_base_vendas = df_base_vendas.withColumn(
    "salesprice",
    when(tratar_campos_vazios_udf(col("salesprice")) == lit(True), "0")
    .otherwise(df_base_vendas["salesprice"]),
)

In [21]:
# Parse invoicedate with the day/month/year pattern into a Spark date column.
# NOTE(review): assumes the raw column is text formatted like 31/12/2020 — confirm against the source table.
df_base_vendas = df_base_vendas.withColumn(
    "invoicedate",
    to_date(col("invoicedate"), "dd/MM/yyyy"),
)

In [22]:
# Split the parsed invoice date into year / month / day-of-month / quarter;
# these columns feed the DW_TEMPO key and the time dimension.
df_base_vendas = (
    df_base_vendas
    .withColumn("ano", year(col("invoicedate")))
    .withColumn("mes", month(col("invoicedate")))
    .withColumn("dia", dayofmonth(col("invoicedate")))
    .withColumn("trimestre", quarter(col("invoicedate")))
)

In [23]:
# Location surrogate key: SHA-256 of city, state and country concatenated with no separator.
# NOTE(review): with an empty separator different tuples can produce the same key
# (e.g. "AB"+"C" vs "A"+"BC"), and concat_ws skips NULLs; consider a delimiter, but
# only if every place that recomputes this key is changed the same way.
df_base_endereco = df_base_endereco.withColumn(
    "DW_LOCALIDADE",
    sha2(concat_ws("", col("city"), col("state"), col("country")), 256),
)

In [24]:
# Keep one address row per distinct (location key, city, state, country) combination.
dim_localidade = df_base_endereco.dropDuplicates(
    subset=["DW_LOCALIDADE", "city", "state", "country"]
)

In [25]:
# Project the location dimension down to its key and descriptive columns.
dim_localidade = dim_localidade.select("DW_LOCALIDADE", "city", "state", "country")

In [26]:
# Time surrogate key: SHA-256 over the invoice date and its derived parts,
# concatenated with no separator.
df_base_vendas = df_base_vendas.withColumn(
    "DW_TEMPO",
    sha2(
        concat_ws("", *[col(nome) for nome in ("invoicedate", "dia", "mes", "ano", "trimestre")]),
        256,
    ),
)

In [27]:
# Keep one sales row per distinct calendar combination for the time dimension.
dim_tempo = df_base_vendas.dropDuplicates(
    subset=["DW_TEMPO", "invoicedate", "dia", "mes", "ano", "trimestre"]
)

In [28]:
# Project the time dimension down to its key and calendar columns.
dim_tempo = dim_tempo.select(["DW_TEMPO", "invoicedate", "dia", "mes", "ano", "trimestre"])

In [29]:
# Customer surrogate key: SHA-256 of customer and customerkey concatenated with no separator.
# NOTE(review): an empty separator lets distinct pairs collide (e.g. "Ana1"+"2" vs "Ana"+"12");
# consider a delimiter, but only if every place that recomputes this key changes with it.
df_base_clientes = df_base_clientes.withColumn(
    "DW_CLIENTES",
    sha2(concat_ws("", col("customer"), col("customerkey")), 256),
)

In [30]:
# Keep one customer row per distinct (customer key, customer, customerkey) combination.
dim_clientes = df_base_clientes.dropDuplicates(
    subset=["DW_CLIENTES", "customer", "customerkey"]
)

In [31]:
# Project the customer dimension down to its key and identifying columns.
dim_clientes = dim_clientes.select(["DW_CLIENTES", "customer", "customerkey"])