#Projeto Engenharia de Dados

Para este projeto foi utilizado o Cluster do Databricks Runtime Version 13.3 com Spark 3.4.1 Scala 2.12
Base de dados ["Brazilian E-Commerce Public Dataset by Olist" do Kaggle](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce)

###8.1. Ingestão e Carregamento de Dados: 

8.1.1. Carregue o conjunto de dados no Databricks. 

8.1.2. Explore o esquema dos dados e faça ajustes conforme necessário. 

###8.2. Transformações de Dados: 

8.2.1. Realize transformações necessárias, como tratamento de valores nulos, conversões de tipos, etc. 

8.2.2. Adicione uma coluna calculada, por exemplo, o valor total de cada transação. 

8.2.3. Agregue os dados para obter estatísticas de vendas, por exemplo, o total de vendas por produto ou por categoria. 

8.2.4. Introduza uma regra mais complexa, como identificar padrões de comportamento de compra ao longo do tempo ou criar categorias personalizadas de produtos com base em determinados critérios. 


###8.3. Saída em Parquet e Delta: 

8.3.1. Grave os dados transformados e agregados em um formato Parquet para persistência eficiente. 

8.3.2. Grave os mesmos dados em formato Delta Lake para aproveitar as funcionalidades de versionamento e transações ACID. 


###8.4. Exploração Adicional (Opcional): 

8.4.1. Execute consultas exploratórias para entender melhor os dados e validar as transformações. 

8.4.2. Crie visualizações ou relatórios para comunicar insights. 

8.4.3. Agende o notebook para execução automática em intervalos regulares para garantir a atualização contínua dos dados.


#Library
Importação das bibliotecas utilizadas nos notebooks

In [0]:
import requests
from zipfile import ZipFile
from io import BytesIO
import os
import tempfile
from pyspark.sql.types import TimestampType, IntegerType
from pyspark.sql.functions import *
from pyspark.sql.window import Window

#Criação de base de dados
Criação de bases de dados raw, tru e ref para dividir as transformações solicitadas no teste em camadas do modelo medalhão com camadas bronze, prata e ouro

1. Bronze - Banco de dados raw
2. Prata - Banco de dados tru (Trusted)
3. Ouro - Banco de dados ref (refined)

Os arquivos iniciais (csv) foram utilizados em camada temporária chamada aqui de Transient


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS raw;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS tru;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS ref;

#Parâmetros
Parâmetros a serem utilizados nos notebooks

Nestes parâmetros, adicionei o download de todos os arquivos csv da base "Brazilian E-Commerce Public Dataset by Olist", encontrado no Kaggle. Todos os arquivos foram hospedados em meu github para melhor uso neste teste.

In [0]:
# Lista de URLs dos arquivos ZIP
urls = [
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_customers_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_geolocation_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_order_items_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_order_payments_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_order_reviews_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_orders_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_products_dataset.csv.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_sellers_dataset.zip",
    "https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/product_category_name_translation.zip"
]

# Pastas para cada camada
folderTransient = 'dbfs:/datalake/transient/'
folderRaw = "dbfs:/datalake/raw/"
folderTru = "dbfs:/datalake/tru/"
folderRef = "dbfs:/datalake/ref/"  

###Criação das pastas para cada camada, onde serão gravados os arquivos csv, parquet e delta parquet

In [0]:
# Verificar se a pasta de destino existe, caso contrário, criá-la
try:
    dbutils.fs.ls(folderTransient)
except:
    dbutils.fs.mkdirs(folderTransient)
    print(f"Pasta de destino '{folderTransient}' criada.")

# Verificar se a pasta de destino existe, caso contrário, criá-la
try:
    dbutils.fs.ls(folderRaw)
except:
    dbutils.fs.mkdirs(folderRaw)
    print(f"Pasta de destino '{folderRaw}' criada.")   

# Verificar se a pasta de destino existe, caso contrário, criá-la
try:
    dbutils.fs.ls(folderTru)
except:
    dbutils.fs.mkdirs(folderTru)
    print(f"Pasta de destino '{folderTru}' criada.")   

# Verificar se a pasta de destino existe, caso contrário, criá-la
try:
    dbutils.fs.ls(folderRef)
except:
    dbutils.fs.mkdirs(folderRef)
    print(f"Pasta de destino '{folderRef}' criada.")       

#Transient
Nesta etapa é realizado o download dos arquivos zip e extraído o conteúdo csv para a pasta de destino da transient. 
Em sequência é realizado a transformação do conteúdo para arquivos parquet e excluído os arquivos csv.

In [0]:
# Função para baixar e extrair um arquivo ZIP
def download_and_extract(url, dest_folder):
    extracted_files = []
    response = requests.get(url)
    if response.status_code == 200:
        with tempfile.TemporaryDirectory() as tmpdir:
            with ZipFile(BytesIO(response.content)) as thezip:
                thezip.extractall(tmpdir)
            for filename in os.listdir(tmpdir):
                local_file = os.path.join(tmpdir, filename)
                dbfs_file = os.path.join(dest_folder, filename)
                dbutils.fs.cp(f'file:{local_file}', dbfs_file)
                extracted_files.append(dbfs_file)
        print(f"Arquivos de '{url}' extraídos em: {dest_folder}")
    else:
        print(f"Falha no download do arquivo {url}: Status code", response.status_code)
    return extracted_files

In [0]:
# Converter CSV para Parquet
def convert_csv_to_parquet(csv_paths, dest_folder_parquet):
    for csv_path in csv_paths:
        # Lendo o arquivo CSV
        df = spark.read.option("header", "true").csv(csv_path, inferSchema=True)

        # Definindo o caminho de destino Parquet
        parquet_path = csv_path.replace('.csv', '')
        
        # Salvando o DataFrame como Parquet
        df.write.mode("overwrite").parquet(parquet_path)

        print(f"Arquivo Parquet salvo em: {parquet_path}")

        # Deletar o arquivo CSV original
        print(csv_path.replace("dbfs:",""))
        dbutils.fs.rm(csv_path.replace("dbfs:",""), recurse=True)

###Atividade 8.1.1 Carregue o conjunto de dados no Databricks
###Atividade 8.3.1 Grave os dados transformados e agregados em um formato Parquet para persistência eficiente
Chamada das funções para download dos arquivos csv e transformação em arquivos parquet

In [0]:
# Processar cada URL
for url in urls:
    csv_files = download_and_extract(url, folderTransient)
    convert_csv_to_parquet(csv_files, folderTransient)  

Arquivos de 'https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_customers_dataset.csv.zip' extraídos em: dbfs:/datalake/transient/
Arquivo Parquet salvo em: dbfs:/datalake/transient/olist_customers_dataset
/datalake/transient/olist_customers_dataset.csv
Arquivos de 'https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_geolocation_dataset.csv.zip' extraídos em: dbfs:/datalake/transient/
Arquivo Parquet salvo em: dbfs:/datalake/transient/olist_geolocation_dataset
/datalake/transient/olist_geolocation_dataset.csv
Arquivos de 'https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_order_items_dataset.csv.zip' extraídos em: dbfs:/datalake/transient/
Arquivo Parquet salvo em: dbfs:/datalake/transient/olist_order_items_dataset
/datalake/transient/olist_order_items_dataset.csv
Arquivos de 'https://github.com/alcedirjunior/lanlink_dataengineer/raw/main/Dados/olist_order_payments_dataset.csv.zip' extraídos em: dbfs:/datalake/trans

#Raw
Nesta etapa é realizado a leitura dos arquivos parquet da transient e gravação de arquivos delta parquet, gerando tabelas na base de dados raw

In [0]:
# Função para converter DataFrame em tabela Delta
def create_delta_table(df, table_name_camada):
    if table_name_camada.startswith("tbl_raw"):
        delta_path = folderRaw + table_name_camada
        table_full_name = f"raw.{table_name_camada}"
        camada = "raw"    
    if table_name_camada.startswith("tbl_tru"):
        delta_path = folderTru + table_name_camada
        table_full_name = f"tru.{table_name_camada}"
        camada = "tru"

    if table_name_camada.startswith("tbl_ref"):
        delta_path = folderRef + table_name_camada
        table_full_name = f"ref.{table_name_camada}"
        camada = "ref"        
        
    # Salvar o DataFrame no formato Delta Lake
    df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_path)

    # Verificar se a tabela já existe
    table_exists = spark.sql(f"SHOW TABLES IN {camada} LIKE '{table_name_camada}'").count() 

    # Criar tabela na raw caso ela não exista
    if table_exists == 0:
        print(f"Criando a tabela '{table_name_camada}'...")
        spark.sql(f"CREATE TABLE {table_full_name} USING DELTA LOCATION '{delta_path}'")

    print(f"Operação concluída para a tabela Delta '{table_name_camada}'.")

###Atividade 8.3.2 Grave os mesmos dados em formato Delta Lake para aproveitar as funcionalidades de versionamento e transações ACID
Chamada da função para geração dos delta parquet e tabelas na raw

In [0]:
#Listagem dos arquivos da transient
subfoldersTransient = dbutils.fs.ls('dbfs:/datalake/transient/')

# Processar cada subpasta
for folder in subfoldersTransient:
    if folder.isDir():
        # Nome da tabela baseado no nome da subpasta
        table_name = "tbl_raw_" + os.path.basename(folder.path.rstrip('/'))
        print(table_name)
        
        # Caminho para os arquivos Parquet
        parquet_path = folder.path

        # Ler arquivos Parquet da subpasta
        df = spark.read.parquet(parquet_path)

        # Criar tabela Delta
        create_delta_table(df, table_name)


tbl_raw_olist_customers_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_customers_dataset'.
tbl_raw_olist_geolocation_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_geolocation_dataset'.
tbl_raw_olist_order_items_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_order_items_dataset'.
tbl_raw_olist_order_payments_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_order_payments_dataset'.
tbl_raw_olist_order_reviews_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_order_reviews_dataset'.
tbl_raw_olist_orders_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_orders_dataset'.
tbl_raw_olist_products_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_products_dataset'.
tbl_raw_olist_sellers_dataset
Operação concluída para a tabela Delta 'tbl_raw_olist_sellers_dataset'.
tbl_raw_product_category_name_translation
Operação concluída para a tabela Delta 'tbl_raw_product_category_name_translation'.


###Atividade 8.1.2 Explore o esquema dos dados e faça ajustes conforme necessário
Visualização dos schemas dos arquivos na raw para ajustes na próxima camada

In [0]:
def print_delta_schemas(folderRaw):
    # Listar todos os arquivos e diretórios no caminho fornecido
    items = dbutils.fs.ls(folderRaw)
    
    for item in items:
        # Verificar se é um diretório
        if item.isDir():
            # Caminho para os arquivos Delta dentro do diretório
            delta_path = item.path
            
            try:
                # Tentar ler o diretório como um DataFrame Delta
                df = spark.read.format("delta").load(delta_path)
                
                # Imprimir o nome do diretório e o esquema do DataFrame
                print(f"Esquema para {os.path.basename(delta_path.rstrip('/'))}:")
                df.printSchema()
            except Exception as e:
                print(f"Não foi possível ler o diretório {delta_path} como Delta: {e}")

# Chamar a função com o caminho desejado
print_delta_schemas(folderRaw)

Esquema para tbl_raw_olist_customers_dataset:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Esquema para tbl_raw_olist_geolocation_dataset:
root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)

Esquema para tbl_raw_olist_order_items_dataset:
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

Esquema para tbl_raw_olist_order_payments_d

#Trusted
Nesta etapa é realizado os tratamentos de dados, tipagem, nome de colunas, colunas cálculadas, etc, gravando os arquivos em delta parquet e na base de dados tru

###Atividade 8.1.2 Explore o esquema dos dados e faça ajustes conforme necessário
Tratado o nome de todas as colunas nas tabelas

###Atividade 8.2.1 Realize transformações necessárias, como tratamento de valores nulos, conversões de tipos, etc
Tratado valores nulos e conversões de tipos na tabela tru.tbl_tru_comentarios_pedido_venda, linha 37 do próximo notebook

Tratado categoria com valor nulo na tabela tru.tbl_tru_produto na linha 65

###Atividade 8.2.2 Adicione uma coluna calculada, por exemplo, o valor total de cada transação
Adicionado coluna calculada na tabela tru.tbl_tru_pedido_venda_item, linha 28

In [0]:
def transformation_tables(table_name):
    raw_folder = folderRaw + table_name

    # Ler os arquivos do Delta Parquet para um DataFrame do Spark
    df = spark.read.format("delta").load(raw_folder)

    if table_name == 'tbl_raw_olist_customers_dataset':
        table_name_tru = 'tbl_tru_clientes'
        # Novos nomes para as colunas
        new_column_names = ["id","id_unico","cep_prefixo","cidade","estado"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
    
    if table_name == 'tbl_raw_olist_geolocation_dataset':
        table_name_tru = 'tbl_tru_clientes_geolocalizacao'
        # Novos nomes para as colunas
        new_column_names = ["cep_prefixo","latitude","longitude","cidade","estado"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
    
    if table_name == 'tbl_raw_olist_order_items_dataset':
        table_name_tru = 'tbl_tru_pedido_venda_item'
        # Novos nomes para as colunas
        new_column_names = ["id_pedido_venda","id_item_pedido_venda","id_produto","id_vendedor","data_limite_envio","valor_pedido_venda","valor_frete"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
        # Adicionar uma nova coluna com a soma de "valor_pedido_venda" e "valor_frete"
        df = df.withColumn("total_pedido", col("valor_pedido_venda") + col("valor_frete")) 
    
    if table_name == 'tbl_raw_olist_order_payments_dataset':
        table_name_tru = 'tbl_tru_ordem_pagamento'
        # Novos nomes para as colunas
        new_column_names = ["id_ordem_pagamento","sequencia_pagamento","tipo","qtd_parcelas","valor"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
    
    if table_name == 'tbl_raw_olist_order_reviews_dataset':
        table_name_tru = 'tbl_tru_comentarios_pedido_venda'
        # Novos nomes para as colunas
        new_column_names = ["id_comentario","id_pedido_venda","score","titulo_comentario","mensagem_comentario","data_criacao","data_resposta"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
        # Substituir 'NULL' por valores nulos em colunas específicas e corrigir algumas tipagens
        df = df.withColumn("id_pedido_venda", when(col("id_pedido_venda") == "NULL", None).otherwise(col("id_pedido_venda"))) \
                .withColumn("score", when(col("score") == "NULL", None).otherwise(col("score")).cast(IntegerType())) \
                .withColumn("titulo_comentario", when(col("titulo_comentario") == "NULL", None).otherwise(col("titulo_comentario"))) \
                .withColumn("mensagem_comentario", when(col("mensagem_comentario") == "NULL", None).otherwise(col("mensagem_comentario")))\
                .withColumn("data_criacao", when(col("data_criacao") == "NULL", None).otherwise(col("data_criacao")).cast(TimestampType()))\
                .withColumn("data_resposta", when(col("data_resposta") == "NULL", None).otherwise(col("data_resposta")).cast(TimestampType()))  
    
    if table_name == 'tbl_raw_olist_orders_dataset':
        table_name_tru = 'tbl_tru_pedido_venda'
        # Novos nomes para as colunas
        new_column_names = ["id_pedido_venda","id_cliente","status_pedido_venda","data_pedido_venda","data_aprovacao_pagamento","data_postagem","data_entrega","data_estimada_entrega"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
    
    if table_name == 'tbl_raw_olist_products_dataset':
        table_name_tru = 'tbl_tru_produto'
        # Novos nomes para as colunas
        new_column_names = ["id_produto","nome_categoria","nome_produto_qtd_caracteres","descricao_produto_qtd_caracteres","qtd_fotos","peso","tamanho","altura","largura"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
        #Tratamento de valores nulos
        df = df.withColumn("nome_categoria", when(col("nome_categoria").isNull(), "Categoria nao cadastrada").otherwise(col("nome_categoria")))          
        
    if table_name == 'tbl_raw_olist_sellers_dataset':
        table_name_tru = 'tbl_tru_vendedores'
        # Novos nomes para as colunas
        new_column_names = ["id_vendedor","cep_prefixo","cidade","estado"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)    

    if table_name == 'tbl_raw_product_category_name_translation':
        table_name_tru = 'tbl_tru_produto_traducao_nome_categoria'
        # Novos nomes para as colunas
        new_column_names = ["nome_categoria","nome_categoria_ingles"]
        # Renomear todas as colunas
        df = df.toDF(*new_column_names)
    
    if 'table_name_tru' in locals():
        # Se a variável existe, chame a função create_delta_table
        create_delta_table(df, table_name_tru)

In [0]:
# Processar cada subpasta
subfoldersRaw = dbutils.fs.ls('dbfs:/datalake/raw/') 

for folder in subfoldersRaw:
    if folder.isDir():
        # Nome da tabela baseado no nome da subpasta
        table_name = os.path.basename(folder.path.rstrip('/'))
        print(table_name)

        transformation_tables(table_name)

tbl_raw_olist_customers_dataset
Operação concluída para a tabela Delta 'tbl_tru_clientes'.
tbl_raw_olist_geolocation_dataset
Operação concluída para a tabela Delta 'tbl_tru_clientes_geolocalizacao'.
tbl_raw_olist_order_items_dataset
Operação concluída para a tabela Delta 'tbl_tru_pedido_venda_item'.
tbl_raw_olist_order_payments_dataset
Operação concluída para a tabela Delta 'tbl_tru_ordem_pagamento'.
tbl_raw_olist_order_reviews_dataset
Operação concluída para a tabela Delta 'tbl_tru_comentarios_pedido_venda'.
tbl_raw_olist_orders_dataset
Operação concluída para a tabela Delta 'tbl_tru_pedido_venda'.
tbl_raw_olist_products_dataset
Operação concluída para a tabela Delta 'tbl_tru_produto'.
tbl_raw_olist_sellers_dataset
Operação concluída para a tabela Delta 'tbl_tru_vendedores'.
tbl_raw_product_category_name_translation
Operação concluída para a tabela Delta 'tbl_tru_produto_traducao_nome_categoria'.


#Refined - Relatórios
Nesta etapa eu li algumas tabelas da camada tru para geração de tabelas de relatórios, gravando em arquivos delta parquet e na base de dados ref.

###Atividade 8.2.3 Agregue os dados para obter estatísticas de vendas, por exemplo, o total de vendas por produto ou por categoria
Gerado a tabela ref.tbl_ref_total_vendido_categoria com total vendido em $ por categoria de produto.

É possível verificar os valores para a categoria "Categoria nao cadastrada", tratada na camada tru para valores nulos no campo nome_categoria

In [0]:
#Criando relatório de total de vendas por categoria
df = spark.sql("select pro.nome_categoria categoria, round(sum(pedi.total_pedido), 2) valor_total_vendido from tru.tbl_tru_pedido_venda_item pedi left join tru.tbl_tru_produto pro on pedi.id_produto = pro.id_produto group by 1 order by 2 desc")
table_name_ref = "tbl_ref_total_vendido_categoria"
create_delta_table(df, table_name_ref)


Operação concluída para a tabela Delta 'tbl_ref_total_vendido_categoria'.


###Atividade 8.2.3 Agregue os dados para obter estatísticas de vendas, por exemplo, o total de vendas por produto ou por categoria
Gerado a tabela ref.tbl_ref_total_vendido_periodo para trazer os totais em $ por período de mês e ano. Foi utilizado a coluna calculada total_pedido gerada na camada tru, 

In [0]:
df = spark.sql("select date_format(ped.data_pedido_venda, 'yyyy/MM') periodo, round(sum(pedi.total_pedido), 2) valor_total_vendido from tru.tbl_tru_pedido_venda_item pedi left join tru.tbl_tru_pedido_venda ped on pedi.id_pedido_venda = ped.id_pedido_venda group by 1 order by 1 desc")
table_name_ref = "tbl_ref_total_vendido_periodo"
create_delta_table(df, table_name_ref)


Operação concluída para a tabela Delta 'tbl_ref_total_vendido_periodo'.


###Atividade 8.2.4 Introduza uma regra mais complexa, como identificar padrões de comportamento de compra ao longo do tempo ou criar categorias personalizadas de produtos com base em determinados critérios. 
Para esta atividade criei uma métrica onde traz a média de dias entre a geração de novos pedidos de venda por vendedor, a fim de analisar a performance de cada vendedor.

1. Primeiramente coloquei em um dataframe a junção das tabelas de pedido de vendas para buscar a data do pedido de venda e o id do vendedor
2. Criei novas colunas dia_semana e mes no dataframe
3. Utilizei as funções Window.partitionBy, lag e datediff para criar as colunas data_pedido_venda_anterior e diferenca_dias_entre_pedidos_venda, sendo a última com o valor cálculado entre os pedidos de venda
4. Gravei o dataframe em uma view temporária view_pedido_venda
5. Utilizei uma consulta sql na view para trazer a média de dias entre os pedidos para cada id_vendedor e gravei em uma tabela delta parquet chamada ref.tbl_ref_vendedor_media_dias_entre_vendas

In [0]:

df = spark.sql("""
               select ped.*, pedi.id_produto, pedi.id_vendedor
               from tru.tbl_tru_pedido_venda ped left 
               join tru.tbl_tru_pedido_venda_item pedi 
                on ped.id_pedido_venda = pedi.id_pedido_venda where ped.status_pedido_venda not in ('unavailable','canceled')
               """)
df = df.withColumn("dia_semana", dayofweek(col("data_pedido_venda")))
df = df.withColumn("mes", month(col("data_pedido_venda")))

windowSpec = Window.partitionBy("id_vendedor").orderBy("data_pedido_venda")

df = df.withColumn("data_pedido_venda_anterior", lag("data_pedido_venda", 1).over(windowSpec))
df = df.withColumn("diferenca_dias_entre_pedidos_venda", datediff("data_pedido_venda", "data_pedido_venda_anterior"))

df.createOrReplaceTempView("view_pedido_venda")

df = spark.sql("""
               select 
                    id_vendedor
                    ,round(avg(diferenca_dias_entre_pedidos_venda), 0) media_dias_entre_vendas
                from view_pedido_venda vped
                where id_vendedor is not null
                group by 1
                order by 2 desc
               """)

table_name_ref = "tbl_ref_vendedor_media_dias_entre_vendas"
create_delta_table(df, table_name_ref)

Operação concluída para a tabela Delta 'tbl_ref_vendedor_media_dias_entre_vendas'.


###Atividade 8.4.1 Execute consultas exploratórias para entender melhor os dados e validar as transformações
Realizado uma consulta sql validando a quantidade de registros de pedidos de venda entre as camadas raw e tru

In [0]:
%sql
select
(select count(*) from raw.tbl_raw_olist_orders_dataset) total_pedidos_venda_raw,
(select count(*) from tru.tbl_tru_pedido_venda) total_pedidos_venda_tru

total_pedidos_venda_raw,total_pedidos_venda_tru
99441,99441


###Atividade 8.4.1 Execute consultas exploratórias para entender melhor os dados e validar as transformações
Realizado consulta sql para validar o valor total do pedido de venda entre as camadas raw, tru e ref, visto que na camada tru foi gerado uma coluna calculada

In [0]:
%sql
select
(select round(sum(price+freight_value), 2) from raw.tbl_raw_olist_order_items_dataset) total_pedido_raw
,(select round(sum(total_pedido), 2) from tru.tbl_tru_pedido_venda_item) total_pedido_tru
,(select round(sum(valor_total_vendido), 2) from ref.tbl_ref_total_vendido_categoria) total_pedido_ref


total_pedido_raw,total_pedido_tru,total_pedido_ref
15843553.24,15843553.24,15843553.24


###8.4.2 Crie visualizações ou relatórios para comunicar insights. 
Criado gráfico para análise de faturamento por período

In [0]:
%sql
select * from ref.tbl_ref_total_vendido_periodo

periodo,valor_total_vendido
2018/09,166.46
2018/08,1003308.47
2018/07,1058728.03
2018/06,1022677.11
2018/05,1149781.82
2018/04,1159698.04
2018/03,1155126.82
2018/02,986908.96
2018/01,1107301.89
2017/12,863547.23


Databricks visualization. Run in Databricks to view.