In [0]:
from pyspark.sql import SparkSession #Importando Pyspark
from pyspark.sql.functions import * #Funções
from pyspark.sql.types import * #Definir tipos específicos

In [0]:
%sql
--Criando um schema só para a camada bronze
Create DATABASE Prata

**Orders**

Nesta estapa foram feitas as seguintes análises e mudanças:

1 - Mudança no nome dos campos  
2 - Drop em algumas colunas que não irão fazer parte da análise (claro que elas poderiam ser utilizadas para diversos cenários, mas na itenção de enchugar a tabela e ajustar para as análises que pensei, resolvi dropar)  
3 - Ajustando todos os campos de data para o mesmo fuso  
4 - Verificando se haviam siglas de estados fora do padrão  
5 - Vereificação de range min e max para verificar se de fato a menor e a maior data estavam dentro do informado no case.

In [0]:
df_orders = spark.table("workspace.bronze.orders_raw")

#Removendo algumas colunas que ao meu ver, não serão necessárias para análise que pensei
df_orders = df_orders.drop("delivery_address_external_id",
                           "delivery_address_latitude",
                           "delivery_address_longitude",
                           "delivery_address_zip_code",
                           "items",
                           "merchant_latitude",
                           "merchant_longitude",
                           "customer_name")


df_orders = df_orders.withColumnRenamed("customer_id","id_cliente").withColumnRenamed("delivery_address_city","nm_cidade_entrega").withColumnRenamed("delivery_address_country","nm_pais_entrega").withColumnRenamed("delivery_address_district","nm_bairro_entrega").withColumnRenamed("delivery_address_state","nm_estado_entrega").withColumnRenamed("merchant_id","id_restaurante").withColumnRenamed("merchant_timezone","nm_fuso_restaurante").withColumnRenamed("order_created_at","dt_hr_pedido").withColumnRenamed("order_scheduled","flg_agendamento").withColumnRenamed("order_total_amount","#_vl_total_pedido").withColumnRenamed("origin_platform","nm_plataforma").withColumnRenamed("order_scheduled_date","dt_hr_agendamento")\
    .withColumn("dt_hr_loaded",current_timestamp()) #Inserindo coluna técninca para controle de carga


In [0]:
# Tratando colunas de Data

#Percebi que os dados estão em diversos fusos, então resolvi converter todas as datas em horário de brasilia

df_orders = df_orders.withColumn(
    "dt_hr_pedido_brasilia",
    from_utc_timestamp("dt_hr_pedido", "America/Sao_Paulo")
)


df_orders = df_orders.withColumn(
    "dt_hr_agendamento_brasilia",
    from_utc_timestamp("dt_hr_agendamento", "America/Sao_Paulo")
)



In [0]:
df_orders.printSchema()

In [0]:
#df_orders.select("nm_fuso_restaurante").distinct().show()

display(df_orders)

In [0]:
#Verificando se há estados fora do padrão
df_orders.groupBy("nm_estado_entrega").count().show()

In [0]:

#Verificando se de fato o range está de acordo com o informado no case dez/18 a jan/19
df_orders.agg(min("dt_hr_pedido_brasilia")).show()

In [0]:

#Verificando se de fato o range está de acordo com o informado no case dez/18 a jan/19
df_orders.agg(max("dt_hr_pedido_brasilia")).show()

In [0]:
#persistindo os dados em tabela
df_orders.write.format("delta").mode("overwrite").saveAsTable("Prata.orders")

**Users**

foram feitas nessa etapa  
1 - Alteração dos nomes dos campos  
2 - Adição de coluna técninca para data de criação do registro  
3 - Padronizaçao do horário para UTC América/São Paulo  
4 - Verificação de havia dados duplicados ou com id duplicados  
5 - Verificação se havia outros tipos sem ser ativo ou inativo  
6 - Verificaçao se havia necessidade de mudar o tipo da coluna de linguagem, porém   não houve por não haver diversidade nesses dados.  
7 - Padronização do campo nm_usuario para sempre aparecer maíusculo.  

In [0]:
df_user = spark.table("workspace.bronze.users_raw")

df_user = df_user.withColumnRenamed("customer_id","id_cliente").withColumnRenamed("language","nm_linguagem_cel").withColumnRenamed("created_at","dt_hr_criacao").withColumnRenamed("customer_name","nm_usuario").withColumnRenamed("customer_phone_area","cod_ddd").withColumnRenamed("customer_phone_number","ds_telefone")


In [0]:
#Padronizando a data para UTC correto
df_user = df_user.withColumn(
    "dt_hr_criacao",
    from_utc_timestamp("dt_hr_criacao", "America/Sao_Paulo")
)

In [0]:
df_user.printSchema()

root
 |-- id_cliente: string (nullable = true)
 |-- nm_linguagem_cel: string (nullable = true)
 |-- dt_hr_criacao: timestamp (nullable = true)
 |-- active: string (nullable = true)
 |-- nm_usuario: string (nullable = true)
 |-- cod_ddd: string (nullable = true)
 |-- ds_telefone: string (nullable = true)



In [0]:
#Verificando se de fato o id_cliente está duplicado
df_user.groupBy("id_cliente") \
    .agg(count("*").alias("qtd")) \
    .filter("qtd > 1").show()

+----------+---+
|id_cliente|qtd|
+----------+---+
+----------+---+



In [0]:
#Checkando a quantidade de usuários ativos e inativos do df
#Checkando se há algum outro registro que não seguisse o padrão de ativo e inativo
df_user.groupBy("active").count().show()

In [0]:
#Checkando quantas quantidade de linguagens estão nos registros.
df_user.groupBy("nm_linguagem_cel").count().show()

In [0]:
#Percebi que há um padrão diferente entre letras maísculas e minusculas
df_user.select("nm_usuario").show(50)

In [0]:
df_user = df_user.withColumn("nm_usuario", upper("nm_usuario"))

In [0]:
display(df_user)

In [0]:
#Persistindo em tabela
df_user.write.format("delta").mode("overwrite").saveAsTable("Prata.users")

**Restaurante**

No dataframe de restaurante foram feitas as seguintes verificações:  
1 - Mudança dos nomes dos campos para maior entendimento  
2 - Check se o id era único   
3 - Check no padrão true ou false na coluna enable  
4 - Mudança do fuso  
5 - Verificação se as siglas dos estados estavam no mesmo padrão

In [0]:
df_restaurant = spark.table("workspace.bronze.restaurant_raw")

df_restaurant = df_restaurant.withColumnRenamed("id","id_restaurante").withColumnRenamed("created_at","dt_hr_criacao_rest").withColumnRenamed("enabled","enabled").withColumnRenamed("price_range","cod_preco_medio").withColumnRenamed("average_ticket","#_vl_ticket_medio_pedidos").withColumnRenamed("takeout_time","#_vl_tempo_medio").withColumnRenamed("delivery_time","vl_tempo_entrega").withColumnRenamed("minimum_order_value","vl_min_pedido").withColumnRenamed("merchant_zip_code","cod_cep_rest").withColumnRenamed("merchant_city","nm_cidade_rest").withColumnRenamed("merchant_state","nm_estado_rest").withColumnRenamed("merchant_country","nm_pais_rest")

In [0]:
display(df_restaurant)

In [0]:
#Verificando se de fato o id_restaurante é único
df_restaurant.groupBy("id_restaurante") \
    .agg(count("*").alias("qtd")) \
    .filter("qtd > 1").show()

In [0]:
#Checkando a quantidade de restaurantes ativos e inativos do df
#Checkando se há algum outro registro que não seguisse o padrão de ativo e inativo
df_restaurant.groupBy("enabled").count().show()

In [0]:
#Padronizando a data para UTC correto
df_restaurant = df_restaurant.withColumn(
    "dt_hr_criacao_rest",
    from_utc_timestamp("dt_hr_criacao_rest", "America/Sao_Paulo")
)

In [0]:
#Verificando se há alguma sigla de estado fora do padrão
df_restaurant.groupBy("nm_estado_rest").count().show()

In [0]:
#Persistindo em tabela
df_restaurant.write.format("delta").mode("overwrite").saveAsTable("Prata.restaurant")

**Teste A/B**

Neste data frame fiz as seguintes ações:

1 - Mudança do nome da coluna customer_id  
2 - Verificando se ela é única

In [0]:
df_teste_ab = spark.table("workspace.bronze.ab_test_ref")

df_teste_ab = df_teste_ab.withColumnRenamed("customer_id","id_cliente")

In [0]:
#Verificando se de fato o id_cliente é único
df_teste_ab.groupBy("id_cliente") \
    .agg(count("*").alias("qtd")) \
    .filter("qtd > 1").show()

In [0]:
#Persistindo em tabela
df_teste_ab.write.format("delta").mode("overwrite").saveAsTable("Prata.teste_ab")

**Tabela Henriquecida**

