##Atualizando os Pontos de Montagem

In [0]:
dbutils.fs.refreshMounts()

### Definindo uma função para montar um ADLS com um ponto de montagem com ADLS SAS 

In [0]:
storageAccountName = "datalakea554ff8ebf909b7f"
storageAccountAccessKey = ""
sasToken = "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2025-06-21T08:52:06Z&st=2025-06-21T00:52:06Z&spr=https&sig=o8hXtkjAZe2AnVEy5dNqQl4LSl0Nkac98k6pJrV1NfI%3D"

def mount_adls(blobContainerName):
    try:
      dbutils.fs.mount(
        source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
        mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
        #extra_configs = {'fs.azure.account.key.' + storageAccountName + '.blob.core.windows.net': storageAccountAccessKey}
        extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
      )
      print("OK!")
    except Exception as e:
      print("Falha", e)

###Mostrando os pontos de montagem no cluster Databricks

In [0]:
display(dbutils.fs.mounts())

### Mostrando todos os arquivos da camada silver

In [0]:
display(dbutils.fs.ls(f"/mnt/{storageAccountName}/silver"))

###Gerando um dataframe dos delta lake no container silver do Azure Data Lake Storage

In [0]:
df_autor   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/autor")
df_cliente   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/cliente")
df_editora   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/editora")
df_endereco   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/endereco")
df_estoque   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/estoque")
df_item_pedido   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/item_pedido")
df_livro  = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/livro")
df_pagamento  = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/pagamento")
df_pedido  = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/pedido")

### DIM TEMPO

In [None]:
%sql
CREATE TABLE dim_tempo (
    sk_tempo BIGINT GENERATED BY DEFAULT AS IDENTITY,
    id_tempo INT,
    data DATE,
    dia INT,
    mes INT,
    ano INT,
    trimestre INT,
    semestre INT,
    dia_semana VARCHAR(15),
    flag_feriado BOOLEAN,
    descricao_feriado VARCHAR(100)
)
USING delta
LOCATION '/mnt/{storageAccountName}/gold/dim_tempo';

In [None]:
from pyspark.sql.functions import col, dayofmonth, month, year, quarter, date_format, lit, when
from pyspark.sql import SparkSession
import pandas as pd

# Exemplo de feriados nacionais fixos (pode expandir com mais datas se quiser)
feriados = {
    "01-01": "Ano Novo",
    "04-21": "Tiradentes",
    "05-01": "Dia do Trabalho",
    "09-07": "Independência",
    "10-12": "Nossa Senhora Aparecida",
    "11-02": "Finados",
    "11-15": "Proclamação da República",
    "12-25": "Natal"
}

# Criar um DataFrame pandas com todas as datas do período
datas = pd.date_range(start="2022-01-01", end="2024-12-31")
df_tempo_pd = pd.DataFrame({"data": datas})

# Adicionar campos derivados
df_tempo_pd["id_tempo"] = df_tempo_pd.index + 1
df_tempo_pd["dia"] = df_tempo_pd["data"].dt.day
df_tempo_pd["mes"] = df_tempo_pd["data"].dt.month
df_tempo_pd["ano"] = df_tempo_pd["data"].dt.year
df_tempo_pd["trimestre"] = df_tempo_pd["data"].dt.quarter
df_tempo_pd["semestre"] = df_tempo_pd["mes"].apply(lambda m: 1 if m <= 6 else 2)
df_tempo_pd["dia_semana"] = df_tempo_pd["data"].dt.day_name()
df_tempo_pd["flag_feriado"] = df_tempo_pd["data"].dt.strftime("%m-%d").isin(feriados.keys())
df_tempo_pd["descricao_feriado"] = df_tempo_pd["data"].dt.strftime("%m-%d").map(feriados).fillna("")

# Converter para Spark DataFrame
df_tempo = spark.createDataFrame(df_tempo_pd)

# Criar view temporária para MERGE
df_tempo.createOrReplaceTempView("tempo_relacional")

In [None]:
%sql
MERGE INTO dim_tempo AS dt
USING tempo_relacional AS tr
ON dt.data = tr.data

WHEN MATCHED AND (
    dt.dia <> tr.dia OR
    dt.mes <> tr.mes OR
    dt.ano <> tr.ano OR
    dt.trimestre <> tr.trimestre OR
    dt.semestre <> tr.semestre OR
    dt.dia_semana <> tr.dia_semana OR
    dt.flag_feriado <> tr.flag_feriado OR
    dt.descricao_feriado <> tr.descricao_feriado
) THEN
    UPDATE SET
        id_tempo = tr.id_tempo,
        dia = tr.dia,
        mes = tr.mes,
        ano = tr.ano,
        trimestre = tr.trimestre,
        semestre = tr.semestre,
        dia_semana = tr.dia_semana,
        flag_feriado = tr.flag_feriado,
        descricao_feriado = tr.descricao_feriado

WHEN NOT MATCHED THEN
    INSERT (
        id_tempo, data, dia, mes, ano, trimestre, semestre,
        dia_semana, flag_feriado, descricao_feriado
    )
    VALUES (
        tr.id_tempo, tr.data, tr.dia, tr.mes, tr.ano, tr.trimestre, tr.semestre,
        tr.dia_semana, tr.flag_feriado, tr.descricao_feriado
    );

### DIM CLIENTE

In [0]:
%sql
drop table if exists dim_cliente

In [0]:
%sql
create table dim_cliente (
  SK_CARRO             bigint generated by default as identity,
  CODIGO_CLIENTE       int,
  NOME                 varchar(100),
  EMAIL                varchar(100),
  TELEFONE             varchar(20),
  CIDADE               varchar(100),
  ESTADO               varchar(50),
  CEP                  varchar(20)
)
USING delta
LOCATION '/mnt/{storageAccountName}/gold/dim_cliente'

In [0]:
%sql
DESCRIBE TABLE EXTENDED dim_cliente

In [0]:
df_cliente.createOrReplaceTempView("cliente")
df_endereco.createOrReplaceTempView("endereco")

In [0]:
%sql

WITH cliente_relacional AS (
	SELECT a.codigo_cliente
		 , a.nome
		 , a.email
		 , a.telefone
		 , b.cidade
		 , b.estado
		 , b.cep
	  FROM cliente AS a
	  LEFT JOIN endereco AS b ON b.codigo_cliente = a.codigo_cliente 
)
MERGE INTO
	dim_cliente AS dc
USING
	cliente_relacional AS cr
ON dc.codigo_cliente = cr.codigo_cliente   

WHEN MATCHED AND (dc.codigo_cliente <> cr.codigo_cliente OR dc.nome <> cr.nome OR dc.email <> cr.email OR dc.telefone <> cr.telefone OR dc.cidade <> cr.cidade OR dc.estado <> cr.estado OR dc.cep <> cr.cep) THEN

	UPDATE SET codigo_cliente = cr.codigo_cliente,
			   nome = cr.nome,
			   email = cr.email,
			   telefone = cr.telefone,
			   cidade = cr.cidade,
			   estado = cr.estado,
			   cep = cr.cep

WHEN NOT MATCHED THEN
	INSERT (codigo_cliente, nome, email, telefone, cidade, estado, cep)
	VALUES (cr.codigo_cliente, cr.nome, cr.email, cr.telefone, cr.cidade, cr.estado, cr.cep)

In [0]:
%sql
select * from dim_cliente

### DIM LIVRO

In [0]:
%sql
drop table if exists dim_livro

In [0]:
%sql
create table dim_livro (
   SK_LIVRO                bigint generated by default as identity,
   CODIGO_LIVRO            int,
   CODIGO_AUTOR            int,
   CODIGO_EDITORA          int,
   TITULO                  varchar(150),
   NOME_AUTOR              varchar(100),
   NACIONALIDADE_AUTOR     varchar(100),
   NOME_EDITORA            varchar(100),
   CONTATO_EDITORA         varchar(100),
   ANO_PUBLICACAO          int,
   PRECO                   float,
)
USING delta
LOCATION '/mnt/{storageAccountName}/gold/dim_livro'


In [0]:
df_livro.createOrReplaceTempView("livro")
df_autor.createOrReplaceTempView("autor")
df_editora.createOrReplaceTempView("editora")

In [0]:
%sql

WITH livro_relacional AS (
	SELECT l.codigo_livro
		 , l.codigo_autor
		 , l.codigo_editora
		 , l.titulo
		 , a.nome AS nome_autor
		 , a.nacionalidade AS nacionalidade_autor
		 , e.nome AS nome_editora
		 , e.contato AS contato_editora
		 , l.ano_publicacao
		 , l.preco
	FROM livro AS l
	LEFT JOIN autor AS a ON a.codigo_autor = l.codigo_autor
	LEFT JOIN editora AS e ON e.codigo_editora = l.codigo_editora
)
MERGE INTO
	dim_livro AS dl
USING
	livro_relacional AS lr
ON lr.codigo_livro = dl.codigo_livro   

WHEN MATCHED AND (dl.codigo_livro <> lr.codigo_livro OR
				  dl.codigo_autor <> lr.codigo_autor OR
				  dl.codigo_editora <> lr.codigo_editora OR
				  dl.titulo <> lr.titulo OR
				  dl.nome_autor <> lr.nome_autor OR
				  dl.nacionalidade_autor <> lr.nacionalidade_autor OR
				  dl.nome_editora <> lr.nome_editora OR
				  dl.contato_editora <> lr.contato_editora OR
				  dl.ano_publicacao <> lr.ano_publicacao OR
				  dl.preco <> lr.preco) THEN

	UPDATE SET codigo_livro = lr.codigo_livro,
			   codigo_autor = lr.codigo_autor,
			   codigo_editora = lr.codigo_editora,
			   titulo = lr.titulo,
			   nome_autor = lr.nome_autor,
			   nacionalidade_autor = lr.nacionalidade_autor,
			   nome_editora = lr.nome_editora,
			   contato_editora = lr.contato_editora,
			   ano_publicacao = lr.ano_publicacao,
			   preco = lr.preco,

WHEN NOT MATCHED THEN

	INSERT (codigo_livro, codigo_autor, codigo_editora, titulo, nome_autor, nacionalidade_autor, nome_editora, contato_editora, ano_publicacao, preco)
	VALUES (lr.codigo_livro, lr.codigo_autor, lr.codigo_editora, lr.titulo, lr.nome_autor, lr.nacionalidade_autor, lr.nome_editora, lr.contato_editora, lr.ano_publicacao, lr.preco)

In [0]:
%sql
select * from dim_livro

### DIM STATUS PEDIDO

In [None]:
%sql
DROP TABLE IF EXISTS dim_status_pedido

In [None]:
%sql
CREATE TABLE dim_status_pedido (
    SK_STATUS_PEDIDO   BIGINT GENERATED BY DEFAULT AS IDENTITY,
    ID_STATUS          INT,
    STATUS             VARCHAR(50),
    DESCRICAO          VARCHAR(100)
)
USING delta
LOCATION '/mnt/{storageAccountName}/gold/dim_status_pedido';

In [None]:
df_pedido.createOrReplaceTempView("pedido")

In [None]:
%sql
WITH status_relacional AS (
    SELECT DISTINCT
        CAST(MONOTONICALLY_INCREASING_ID() AS INT) AS id_status,
        status,
        CASE
            WHEN LOWER(status) = 'Pago' THEN 'Pedido pago ao cliente'
            WHEN LOWER(status) = 'Pendente' THEN 'Aguardando pagamento'
            WHEN LOWER(status) = 'Cancelado' THEN 'Pedido cancelado'
            ELSE 'Outro status'
        END AS descricao
    FROM pedido
)

MERGE INTO dim_status_pedido AS dsp
USING status_relacional AS sr
ON dsp.status = sr.status

WHEN MATCHED AND (
    dsp.descricao <> sr.descricao
) THEN
    UPDATE SET
        descricao = sr.descricao

WHEN NOT MATCHED THEN
    INSERT (id_status, status, descricao)
    VALUES (sr.id_status, sr.status, sr.descricao)

In [None]:
%sql
SELECT * FROM dim_status_pedido;

### DIM FORMA DE PAGAMENTO

In [None]:
%sql
DROP TABLE IF EXISTS dim_forma_pagamento

In [None]:
%sql
CREATE TABLE dim_forma_pagamento (
    SK_FORMA_PAGAMENTO     BIGINT GENERATED BY DEFAULT AS IDENTITY,
    ID_FORMA_PAGAMENTO     INT,
    FORMA_PAGAMENTO        VARCHAR(50)
)
USING DELTA
LOCATION '/mnt/{storageAccountName}/gold/dim_forma_pagamento';

In [None]:
df_pagamento.createOrReplaceTempView("forma_pagamento")

In [None]:
%sql
MERGE INTO dim_forma_pagamento AS d
USING (
    SELECT cod_tipo
         , CASE
            WHEN cod_tipo = 1 THEN 'Cartão'
            WHEN cod_tipo = 2 THEN 'Boleto'
            WHEN cod_tipo = 3 'Cancelado' THEN 'Pix'
            ELSE 'Outro status'
           END AS nome
    FROM forma_pagamento
) AS r
ON d.id_forma_pagamento = r.id_forma_pagamento

WHEN MATCHED AND (
    d.forma_pagamento <> r.forma_pagamento
) THEN
    UPDATE SET
        forma_pagamento = r.forma_pagamento

WHEN NOT MATCHED THEN
    INSERT (id_forma_pagamento, forma_pagamento)
    VALUES (r.id_forma_pagamento, r.forma_pagamento);

In [None]:
%sql
SELECT * FROM dim_forma_pagamento;

### FATO PEDIDOS

In [0]:
%sql
drop table if exists fato_pedidos

In [0]:
%sql
CREATE TABLE IF NOT EXISTS fato_pedidos (
    sk_fato_pedido BIGINT GENERATED BY DEFAULT AS IDENTITY,
    codigo_pedido INT,
    id_tempo INT,
    CODIGO_CLIENTE INT,
    codigo_livro INT,
    id_forma_pagamento INT,
    id_status INT,
    quantidade INT,
    preco_unitario DECIMAL(10,2),
    valor_total DECIMAL(10,2),
    custo_unitario DECIMAL(10,2),
    custo_total DECIMAL(10,2),
    margem_lucro DECIMAL(10,2)
)
USING delta
LOCATION '/mnt/${storageAccountName}/gold/fato_pedidos';

In [None]:
df_item_pedido.createOrReplaceTempView("item_pedido")

In [0]:
%sql
 WITH fato_stage AS (
  SELECT 
    p.codigo_pedido,
    t.id_tempo,
    p.CODIGO_CLIENTE,
    ip.codigo_livro,
    fp.id_forma_pagamento,
    s.id_status,
    ip.quantidade,
    ip.preco_unitario,
    pag.valor_total,
    ip.preco_unitario AS custo_unitario,
    ip.quantidade * ip.preco_unitario AS custo_total,
    (ip.quantidade * ip.preco_unitario) - pag.valor_pago AS margem_lucro
  FROM pedido p
  JOIN item_pedido ip ON ip.codigo_pedido = p.codigo_pedido
  JOIN forma_pagamento pag ON pag.codigo_pedido = p.codigo_pedido
  JOIN dim_tempo t ON t.data = p.data
  JOIN dim_cliente c ON c.CODIGO_CLIENTE = p.CODIGO_CLIENTE
  JOIN dim_livro l ON l.codigo_livro = ip.codigo_livro
  LEFT JOIN dim_forma_pagamento fp ON fp.id_forma_pagamento = pag.cod_tipo
  LEFT JOIN dim_status_pedido s ON s.status = p.status
)

MERGE INTO fato_pedidos AS f
USING fato_stage AS s
ON f.codigo_pedido = s.codigo_pedido AND f.codigo_livro = s.codigo_livro

WHEN MATCHED THEN UPDATE SET
  f.id_tempo = s.id_tempo,
  f.codigo_livro = s.codigo_livro,
  f.id_forma_pagamento = s.id_forma_pagamento,
  f.id_status = s.id_status,
  f.quantidade = s.quantidade,
  f.preco_unitario = s.preco_unitario,
  f.valor_total = s.valor_total,
  f.custo_unitario = s.custo_unitario,
  f.custo_total = s.custo_total,
  f.margem_lucro = s.margem_lucro,
  f.desconto_aplicado = s.desconto_aplicado

WHEN NOT MATCHED THEN INSERT (
  codigo_pedido, id_tempo, codigo_livro, id_forma_pagamento, id_status,
  quantidade, preco_unitario, valor_total, custo_unitario, custo_total, margem_lucro, desconto_aplicado
) VALUES (
  s.codigo_pedido, s.id_tempo, s.codigo_livro, s.id_forma_pagamento, s.id_status,
  s.quantidade, s.preco_unitario, s.valor_total, s.custo_unitario, s.custo_total, s.margem_lucro, s.desconto_aplicado
);

In [0]:
%sql
select * from fato_pedidos