In [None]:
# Leitura dos arquivos Parquet na pasta Novos
df = spark.read.parquet("Files/Cotacoes/Novos/*.parquet")

df.createOrReplaceTempView("df")

df = spark.sql("""
    SELECT 
        cotacaoCompra AS Cotacao,
        CAST(dataHoraCotacao AS DATE) AS Data,
        moeda AS Moeda
    FROM
        df
    ORDER BY Data ASC
""").dropDuplicates(["Moeda", "Data"])

display(df)

In [None]:
%%sql

-- Cria uma tabela no lake se não existir
CREATE TABLE IF NOT EXISTS cotacoes (
    Cotacao DOUBLE,
    Data DATE,
    Moeda STRING
)
USING DELTA
PARTITIONED BY (Moeda)

In [None]:
# Executa o merge na tabela cotacoes

df.createOrReplaceTempView("df_novos")

spark.sql("""
    MERGE INTO cotacoes AS e
    USING (
        SELECT
            Cotacao,
            Data,
            Moeda
        FROM
            df_novos
    ) as n
    ON e.Moeda = n.Moeda
        AND e.Data = n.Data
    WHEN NOT MATCHED THEN
        INSERT (Cotacao, Data, Moeda)
        VALUES (n.Cotacao, n.Data, n. Moeda)
""")

In [None]:
%%sql

-- Contagem de linhas
SELECT COUNT(*) FROM cotacoes

In [None]:
# Movimentação dos arquivos parquet da pasta Novos para Carregados

from notebookutils import mssparkutils

origem = "abfss://cf10c457-47e4-4028-8236-9438de701b5e@onelake.dfs.fabric.microsoft.com/7f5a4751-36d0-4ff3-b27a-ba3396e5e7ea/Files/Cotacoes/Novos/" 
destino = "abfss://cf10c457-47e4-4028-8236-9438de701b5e@onelake.dfs.fabric.microsoft.com/7f5a4751-36d0-4ff3-b27a-ba3396e5e7ea/Files/Cotacoes/Carregados/"

if not mssparkutils.fs.exists(destino):
    mssparkutils.fs.mkdirs(destino)

arquivos = mssparkutils.fs.ls(origem)

for arquivo in arquivos:
    caminho_origem = arquivo.path
    nome_arquivo = arquivo.name
    caminho_destino = f"{destino}{nome_arquivo}"
    
    mssparkutils.fs.mv(caminho_origem, caminho_destino)
