In [24]:
%%sql

-- Insere novos valores de exemplo
INSERT INTO tb_log_eventos (Matricula, Nome, Filial, Departamento, Cargo, DataAdmissao, DataDesligamento, Operacao, DataLog) VALUES 
(98, 'FLAVIO FAGUNDES', 'CAMPINAS', 'PRODUCAO', 'SUPERVISOR DE PRODUCAO', '2021-12-07', '2025-01-28', 'DESLIGAMENTO', '2025-01-28 10:00:00'),
(143, 'JOSE DA SILVA', 'JUNDIAI', 'MANUTENCAO', 'ELETRICISTA DE MANUTENCAO XP', '2022-02-05', NULL, 'ALTERACAO', '2025-01-29 09:00:00');


StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 44, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [25]:
%%sql

-- Maiores 'DataLog' e 'MatriculaSk' na tabela 'tb_funcionarios'
DROP TABLE IF EXISTS staging_ultima_carga;
CREATE TABLE staging_ultima_carga AS
SELECT 
    MAX(DataLog) AS UltimaDataLog, 
    MAX(MatriculaSk) AS UltimaMatriculaSk 
FROM tb_funcionarios;

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 46, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [26]:
%%sql
-- Tabela temporária com o log após o último 'DataLog' e quando 'Operacao' <> 'DESLIGAMENTO'
DROP TABLE IF EXISTS staging_logs;
CREATE TABLE staging_logs AS 
SELECT 
    Matricula,
    Nome, 
    Filial,
    Departamento,
    Cargo,
    DataAdmissao,
    DataDesligamento,
    DataLog,
    DataLog AS DataVigenciaInicial,

    -- Window function para trazer o próximo 'DataLog' por 'Matricula'
    LEAD(DataLog) OVER (PARTITION BY Matricula ORDER BY DataLog) AS ProximoDataLog,
    
    -- Window function para criar a coluna 'MatriculaSk' a partir da ordem do 'DataLog'
    -- partindo do maior já existente
    (SELECT COALESCE(UltimaMatriculaSk, 0) FROM staging_ultima_carga) + 
    ROW_NUMBER() OVER (ORDER BY DataLog) AS MatriculaSk  

FROM tb_log_eventos
WHERE DataLog > (SELECT UltimaDataLog FROM staging_ultima_carga)
    AND Operacao <> 'DESLIGAMENTO';

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 48, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [27]:
%%sql

-- Tabela temporária contendo os novos eventos
DROP TABLE IF EXISTS staging_novos_eventos;
CREATE TABLE staging_novos_eventos AS
SELECT 
    MatriculaSk,  
    Matricula,
    Nome, 
    Filial,
    Departamento,
    Cargo,
    DataAdmissao,
    DataDesligamento,
    DataLog,

    -- 'DataVigenciaInicial' é a data de alteração
    CAST(DataVigenciaInicial AS DATE) AS DataVigenciaInicial,

    -- Coluna 'DataVigenciaFinal'
    -- Quando o 'ProximoDataLog' não estiver vazio, pega o
    -- 'ProximoDataLog' e subtrai um dia
    -- Caso estiver vazio então quer dizer que é o último
    -- Logo adiciona '9999-12-31'
    CASE 
        WHEN ProximoDataLog IS NOT NULL 
        THEN CAST(DATEADD(DAY, -1, ProximoDataLog) AS DATE)
        ELSE CAST('9999-12-31' AS DATE)
    END AS DataVigenciaFinal,

    -- 'EstaAtivo' traz 1 quando for o último evento
    CASE 
        WHEN ProximoDataLog IS NULL THEN 1 
        ELSE 0 
    END AS EstaAtivo,
    
    -- Criado o número da linha para capturar a menor 'DataLog' por Matricula
    ROW_NUMBER() OVER (PARTITION BY Matricula ORDER BY DataLog ASC) AS RN 

FROM staging_logs;

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 50, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [28]:
%%sql

-- Tabela temporária com os desligamentos após a última carga
DROP TABLE IF EXISTS staging_desligamentos;
CREATE TABLE staging_desligamentos AS
SELECT 
    Matricula,
    Nome, 
    Filial,
    Departamento,
    Cargo,
    DataAdmissao,
    DataDesligamento,
    DataLog,
    CAST(DataLog AS DATE) AS DataVigenciaInicial
FROM tb_log_eventos
WHERE DataLog > (SELECT UltimaDataLog FROM staging_ultima_carga)
    AND Operacao = 'DESLIGAMENTO';


StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 52, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [29]:
%%sql

-- Atualiza a tabela 'tb_funcionarios'
MERGE INTO tb_funcionarios AS f

-- Filtra apenas a menor 'DataLog' de cada 'Matricula' de 'staging_novos_eventos'
USING (SELECT * FROM staging_novos_eventos WHERE RN = 1) AS n  

-- nas linhas onde as matriculas correspondem e estão ativas
ON f.Matricula = n.Matricula AND f.EstaAtivo = 1

-- Atualizando os registros que já existem
WHEN MATCHED THEN 
    UPDATE SET 

        -- Atribuindo a data anterior a 'DataVigenciaInicial' de 'staging_novos_eventos'
        -- em 'DataVigenciaFinal' da tabela 'tb_funcionarios'
        f.DataVigenciaFinal = CAST(DATEADD(DAY, -1, n.DataVigenciaInicial) AS DATE),
        
        -- E coloca 'EstaAtivo' como 0
        f.EstaAtivo = 0;

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 53, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [30]:
%%sql

-- Insere na tabela 'tb_funcionarios' todas as novas linhas em 'staging_novos_eventos'
INSERT INTO tb_funcionarios(
    MatriculaSk, 
    Matricula, 
    Nome, 
    Filial, 
    Departamento, 
    Cargo, 
    DataAdmissao, 
    DataDesligamento, 
    DataLog, 
    DataVigenciaInicial, 
    DataVigenciaFinal, 
    EstaAtivo
)
SELECT 
    MatriculaSk, 
    Matricula, 
    Nome, 
    Filial, 
    Departamento, 
    Cargo, 
    DataAdmissao, 
    DataDesligamento, 
    DataLog, 
    DataVigenciaInicial, 
    DataVigenciaFinal, 
    EstaAtivo
FROM staging_novos_eventos;

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 54, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [31]:
%%sql

-- Atualiza na tabela 'tb_funcionarios'
MERGE INTO tb_funcionarios AS f

-- Com os dados de 'staging_desligamentos'
USING (SELECT * FROM staging_desligamentos) AS d  

-- nas linhas onde as matriculas correspondem e estão ativas
ON f.Matricula = d.Matricula AND f.EstaAtivo = 1

-- Atualizando os registros que já existem
WHEN MATCHED THEN 
    UPDATE SET 

        -- As colunas 'DataDesligamento' e 'DataVigenciaFinal' em tb_funcionarios
        -- recebem o dia anterior da 'DataVigenciaInicial' da 'staging_desligamentos'
        f.DataDesligamento = CAST(DATEADD(DAY, -1, d.DataVigenciaInicial) AS DATE),
        f.DataVigenciaFinal = CAST(DATEADD(DAY, -1, d.DataVigenciaInicial) AS DATE),

        -- E coloca 'EstaAtivo' como 0
        f.EstaAtivo = 0;



StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 55, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [32]:
%%sql

-- Exclui as tabelas temporárias
DROP TABLE IF EXISTS staging_ultima_carga;
DROP TABLE IF EXISTS staging_logs;
DROP TABLE IF EXISTS staging_novos_eventos;
DROP TABLE IF EXISTS staging_desligamentos;


StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 59, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [33]:
%%sql

-- Exibe os resultados
SELECT * FROM tb_funcionarios
ORDER BY DataLog;

StatementMeta(, f5405ab1-6c01-4126-92a4-05b6885e763f, 60, Finished, Available, Finished)

<Spark SQL result set with 14 rows and 12 fields>