In [0]:
storage_account = 'adlsprojetocotacao'

In [0]:

spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW temp_cotacoes_gold AS
    -- Extrai mínimas e máximas datas
    WITH min_max AS (
        SELECT MIN(Data) as min_data,
            MAX(Data) as max_data
        FROM silver.cotacoes
    ),

    -- Gera as sequências de datas
    datas_periodo AS (
        SELECT explode(
            sequence(
                d.min_data,
                d.max_data,
                interval 1 day
            )
        ) AS Data
        FROM min_max d
    ),

    -- Moedas distintas
    moedas AS (
        SELECT DISTINCT Moeda
        FROM dim_moedas
    ),

    -- Faz o plano cartesiano entre moedas e datas
    cross_tb AS (
        SELECT m.Moeda,
            d.Data
        FROM moedas m
        CROSS JOIN datas_periodo d
    ),

    -- Dias sem cotações
    datas_faltantes AS (
        SELECT cr.Moeda, cr.Data
        FROM cross_tb cr
        LEFT JOIN (
            SELECT DISTINCT Moeda, Data
            FROM silver.cotacoes
        ) c
        ON cr.Moeda = c.Moeda AND cr.Data = c.Data
        WHERE c.Data IS NULL
    ),

    -- Adiciona as linhas faltantes
    linhas_adicionadas AS (
        SELECT 
            dt_falt.Moeda, 
            dt_falt.Data, -- Corrigido para Data
            NULL AS Cotacao -- Valores nulos para as cotações adicionadas
        FROM datas_faltantes as dt_falt
    ),

    -- Combina cotações existentes com as linhas adicionadas
    append AS (
        SELECT
            Moeda,
            Data,
            Cotacao
        FROM silver.cotacoes

        UNION ALL

        SELECT 
            Data,
            Moeda,
            Cotacao
        FROM linhas_adicionadas
    ),

    -- Forward fill: Preenche os valores de cotação para as novas linhas
    forward_fill AS (
    SELECT 
        Moeda,
        Data,
        last_value(Cotacao, true)
            OVER(
                PARTITION BY Moeda
                ORDER BY Data
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS Cotacao
        FROM append
        )

    SELECT
    Moeda,
    Data,
    Cotacao,
    year(Data) AS Ano,
    month(Data) AS Mes,
    day(Data) AS Dia
    FROM forward_fill
    ORDER BY Data, Moeda
""")

In [0]:
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS gold
""")

In [0]:
# Criar tabela particionada por ano e mês para melhor performance
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.fact_cotacoes (
        Moeda STRING,
        Data DATE,
        Cotacao DOUBLE,
        Ano INT,
        Mes INT,
        Dia INT
    )
    USING DELTA
    PARTITIONED BY (Ano, Mes)
    COMMENT 'Tabela de fatos com cotações históricas de moedas, incluindo preenchimento de valores faltantes (forward fill)'
""")

In [0]:
# Inserir os dados na tabela
spark.sql("""
    INSERT OVERWRITE gold.fact_cotacoes
    SELECT * FROM temp_cotacoes_gold         
""")