# Processo de ETL do esquema alarmes_aju.
Será utilizado como guia a arquitetura medallion:

![medallion.png](./medallion.png "medallion.png")

O objetivo é progressivamente melhorar a qualidade e estrutura dos dados enquanto ela passa por cada nível sugerido por essa arquitetura. Nível Bronze (dados brutos) => Nível Prata (dados tratados e filtratos) => Nível Ouro (dados prontos para ser usados em ambientes de B.I.).

In [0]:
%sql
USE CATALOG conecta

# 1. Extração de dados (Nível Bronze)

Serão criados dataframes a partir de duas fontes de dados:
1. Banco de dados de pontos de serviço extraídos do sistema de gestão da empresa Exati. Pontos de serviço são estruturas em que são instalados um ou mais pontos de iluminação pública.
2. Banco de dados de LCUs - dispositivos IoT - e alarmes, extraídos do sistema de telegestão Bright City, intermediado por um banco de dados no MongoDB.

A partir da fonte de dados de pontos de serviço será criado o dataframe "df_ponto_servico".
A partir da fonte de dados de LCUs e alarmes será criado o dataframe "df".

Também será criado um esquema chamado Staging_ETL dentro do catálogo conecta para armazenar tabelas que auxiliam o processo de ETL.

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS Staging_ETL
COMMENT 'Schema utilizado para armazenar delta tables criadas no processo de extração do ETL. Esse schema foi criado para deixar a estrutura de dados mais organizada.'

### 1.1 Dimensão Ponto Servico.
Para extração da tabela DimPontoServico, será necessário fazer um request na API da Exati que armazena os dados de Pontos de Serviço no sistema da empresa.
O código da célula 8 chama as funções das células 6 e 7.
O dataframe df_ponto_servico será a base para alimentar a dimensão de ponto de serviço.

In [0]:
from time import sleep

import requests

def gerar_post_exati(session: requests.sessions.Session, payload: dict, depth: int = 1):
    '''
    O sistema da Exati possui instabilidades e normalmente retorna erros nas requisições.
    Esse método foi criado para fazer tentativas de requests até um depth=4.
    '''
    uri = dbutils.secrets.get(scope = "alarmes_aju", key = "uri_exati")
    response = session.post(url=uri, data=payload).json()
    try:
        message = response['RAIZ']['MESSAGES']['ERRORS']
    except KeyError:
        print(f'KeyError de RAIZ, {response}')
        response = gerar_post_exati(session=session, payload=payload, depth=depth + 1)
        message = response['RAIZ']['MESSAGES']['ERRORS']
    if message:
        print(f'{message}, depth = {depth}')
        if depth > 3:
            return response
        sleep(0.25)
        response = gerar_post_exati(session=session, payload=payload, depth=depth + 1)
    return response


In [0]:
from base64 import b64encode

def auth_exati(session: requests.sessions.Session) -> str:
    '''
    Faz o login no sistema da Exati e retorna um requests.session objeto
    '''
    user = dbutils.secrets.get(scope = "alarmes_aju", key = "user_exati")
    password = dbutils.secrets.get(scope = "alarmes_aju", key = "pass_exati")
    basic_auth = f'Basic {b64encode(f"{user}:{password}".encode()).decode()}'
    payload = {
        'CMD_PLATAFORM': 'GUIA',
        'CMD_COMMAND': 'Login',
        'parser': 'json'
    }
    session.headers = {'Authorization': basic_auth}
    response = gerar_post_exati(session=session, payload=payload)
    return response['RAIZ']['AUTH_TOKEN']


In [0]:
import requests
import pyspark.pandas as ps

with requests.sessions.Session() as session:
    jwt = auth_exati(session=session)
    session.headers = {'Authorization': jwt}
    data = {
            'CMD_IDS_PARQUE_SERVICO': 1,
            'CMD_COMMAND': 'ConsultarPontosServicos',
            'CMD_SEM_PAGINACAO': 0,
            'CMD_ATRIBUTOS_EXPORTACAO': '17,478',
            'CMD_FILTRO_ATRIBUTOS': '394;0;407',
            'parser': 'json'
        }
    response = gerar_post_exati(session=session, payload=data)
records = response['RAIZ']['PONTOS_SERVICOS']['PONTO_SERVICO']
df_ponto_servico = ps.DataFrame.from_records(records) 
display(df_ponto_servico.head())

Tried to attach usage logger `pyspark.databricks.pandas.usage_logger`, but an exception was raised: JVM wasn't initialised. Did you call it on executor side?


ID_PONTO_SERVICO,ID_VERSAO_ATUAL,GEOMETRIA,PONTOS,ID_TIPO_PONTO_SERVICO,ID_STATUS_PONTO_SERVICO,ID_PARQUE_SERVICO,NUMERO_IDENTIFICACAO,NUMERO_IDENTIFICACAO_ANTIGO,ID_POSICAO_GEOGRAFICA,PONTOS_LUMINOSOS,COR,SHAPE,ENDERECO,ATIVO,ID_AUTENT_VALIDO,ID_AUTENT_INSTALACAO,PERMITIR_LINK_COMPARTILHAMENTO,AUTENTICACAO_COMPARTILHAMENTO,DESC_STATUS_PONTO_SERVICO,DESC_TIPO_PONTO_SERVICO,TIPO_GEOMETRIA,TIPO_EXIBICAO,NOME_PARQUE_SERVICO,ID_CLIENTE,NOME_CLIENTE,ID_DOMINIO,DESC_DOMINIO,ID_ESTRUTURA_PS,DESC_ESTRUTURA_PS,TIPO_LATITUDE,LATITUDE_GRAUS,LATITUDE_MINUTOS,LATITUDE_SEGUNDOS,TIPO_LONGITUDE,LONGITUDE_GRAUS,LONGITUDE_MINUTOS,LONGITUDE_SEGUNDOS,LATITUDE_TOTAL,LONGITUDE_TOTAL,NOME_MUNICIPIO,SIGLA_UF,NOME_REGIAO,ID_BAIRRO,NOME_BAIRRO,NOME_LOGRADOURO_COMPLETO,TIPO_ORIGEM_VALIDACAO,GEOMETRIA_JSON,PONTO_CONECTOR,BBOX_PONTO,ALTURA_DO_POSTE,MARCO,NUMERO_LOCAL_INICIAL,NUMERO_LOCAL_ORDEM,ID_LOCALIZACAO,DESC_TIPO_LOGRADOURO,NOME_LOGRADOURO,OBS,ID_AUTENT_DESATIVACAO
30720,5,POINT(-37.099503 -10.988866),"-10.988866,-37.099503",1,4,1,1201,1201.0,182735,0,6832588,C,Avenida Auxiliadora 2,1,460453,532,0,0,Instalado,Ponto IP,P,P,Aracaju,3,PM de Aracaju,1,Iluminação Pública,212431,Possui,S,10,59,19.9176,O,37,5,58.2108,-10.988866,-37.099503,Aracaju,SE,Aracaju,38,Santa Maria,Avenida Auxiliadora 2,Cadastro,"{""type"":""Point"",""coordinates"":[-37.099503,-10.988866]}",0,"-37.100502999999996,-10.989866,-37.098503,-10.987866",11.0,II e III,,,,,,,
30722,7,POINT(-37.099476305555555 -10.9886045),"-10.9886045,-37.099476305555555",1,4,1,1202,1202.0,182714,0,6832588,C,Avenida Auxiliadora 2,1,460453,532,0,0,Instalado,Ponto IP,P,P,Aracaju,3,PM de Aracaju,1,Iluminação Pública,212430,Possui,S,10,59,18.9762,O,37,5,58.1147,-10.9886045,-37.09947630555556,Aracaju,SE,Aracaju,38,Santa Maria,Avenida Auxiliadora 2,Cadastro,"{""type"":""Point"",""coordinates"":[-37.099476306,-10.9886045]}",0,"-37.10047630555555,-10.989604499999999,-37.09847630555556,-10.9876045",11.0,II e III,185.0,,,,,,
23882,7,POINT(-37.099434 -10.988378972222222),"-10.988378972222222,-37.099434",1,4,1,1203,,453280,0,1882125,C,Avenida Vasco da Gama,1,460453,163,0,0,Instalado,Ponto IP,P,P,Aracaju,3,PM de Aracaju,1,Iluminação Pública,212429,Possui,S,10,59,18.1643,O,37,5,57.9624,-10.988378972222222,-37.099434,Aracaju,SE,Aracaju,38,Santa Maria,Avenida Vasco da Gama,Cadastro,"{""type"":""Point"",""coordinates"":[-37.099434,-10.988378972]}",0,"-37.100434,-10.989378972222221,-37.098434000000005,-10.987378972222222",11.0,II e III,1294.0,,,,,,
55907,7,POINT(-37.09936183333333 -10.987983666666667),"-10.987983666666667,-37.09936183333333",1,4,1,1245,1245.0,181893,0,6994531,C,Rua Três,1,460453,532,0,0,Instalado,Ponto IP,P,P,Aracaju,3,PM de Aracaju,1,Iluminação Pública,212426,Possui,S,10,59,16.7412,O,37,5,57.7026,-10.987983666666667,-37.099361833333326,Aracaju,SE,Aracaju,38,Santa Maria,Rua Três,Cadastro,"{""type"":""Point"",""coordinates"":[-37.099361833,-10.987983667]}",0,"-37.10036183333333,-10.988983666666666,-37.098361833333335,-10.986983666666667",11.0,II e III,767.0,,,,,,
24327,8,POINT(-37.099287805555555 -10.987419277777779),"-10.987419277777779,-37.099287805555555",1,4,1,1269,1269.0,182990,0,6832588,C,Rua Quatro,1,460453,163,0,0,Instalado,Ponto IP,P,P,Aracaju,3,PM de Aracaju,1,Iluminação Pública,216924,Possui,S,10,59,14.7094,O,37,5,57.4361,-10.987419277777777,-37.09928780555556,Aracaju,SE,Aracaju,38,Santa Maria,Rua Quatro,Cadastro,"{""type"":""Point"",""coordinates"":[-37.099287806,-10.987419278]}",0,"-37.10028780555555,-10.988419277777778,-37.09828780555556,-10.98641927777778",11.0,II e III,705.0,,,,,,


### 1.2 Outras dimensões

Dados do sistema de telegestão de Aracaju são coletados duas vezes por dia - uma vez às 12h00 e outra às 22h00 - do site de telegestão Bright City e armazenados em uma coleção no MongoDB no formato JSON. Quase todas as tabelas dimensões podem ser derivadas a partir dos documentos armazenados no MongoDB, com exceção da tabela DimPontoServico.

Será necessário para essa etapa utilizar o timestamp mais recente da tabela FatoAlarme, armazenada na coluna DateTimeUltimaModificacao. Isso porque só precisamos extrair do MongoDB registros após o timestamp mais recente.

In [0]:
last_datetime = spark.sql(
    """
    SELECT
        COALESCE(
            MAX (DateTimeUltimaModificacao),
            CAST ('1900-01-01' AS TIMESTAMP)
            ) AS last_datetime
        FROM alarmes_aju.fatoalarme
    """
)
last_datetime = last_datetime.first()['last_datetime']

In [0]:
from pymongo import MongoClient

mongo_db_pass = dbutils.secrets.get(scope = "alarmes_aju", key = "mongodb_pass")
query = {'insert_datetime': {'$gt': last_datetime}}

with MongoClient(mongo_db_pass) as client:
    db = client.get_database('conecta')
    collection = db.get_collection('bc_alarmes')
    dados = collection.find(query)
    docs = dados.to_list()
for doc in docs:
    doc['_id'] = str(doc['_id'])

In [0]:
df = ps.DataFrame.from_records(docs)
display(df.head())



_id,id,ID_PONTO_SERVICO,pole_id_s,name_s,ALTURA_DO_POSTE,MARCO,ocorrencia_criada,address1_s,latitude_f,longitude_f,dimming_level_set_i,work_order_s,barcode_s,ctlatitude_f,ctlongitude_f,last_update_dt,alarm_names_ssci,distance,insert_datetime
692c5c02144e2516bb20d675,2543196311690,52743.0,52743,LCU 108A,11.0,I,False,253 Rua Moacir Leite,-10.92699,-37.052944,0,1649058,2502250108A,-10.9269285,-37.05294,2025-11-30T08:58:17Z,List(DayBurner),3.330507681386353,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d676,2543196312102,17679.0,17679,LCU 1226,16.0,I,False,7501 Avenida Presidente Tancredo Neves,-10.914472,-37.088814,0,1363514,25022501226,-10.914376,-37.089046,2025-11-30T13:47:17Z,List(DayBurner),3.0681934426109443,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d677,2543196312219,67167.0,67167,LCU 129B,16.0,I,True,Avenida Vereador Manoel Nunes Resende,-11.004645,-37.103138,0,143188,2502250129B,-11.00466,-37.103153,2025-05-29T17:40:41Z,List(LostCommunication),1.1119579863579596,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d678,2543196312253,24467.0,7888,LCU 12BD,16.0,I,False,Avenida Mel??cio Machado,-11.004372,-37.072433,0,1411741,250225012BD,-11.004358,-37.072422,2025-11-30T12:55:58Z,List(DayBurner),1.4563306741209223,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d679,2543196312259,1805.0,1805,LCU 12C3,16.0,I,False,Avenida Paulo Barreto de Menezes,-10.927903,-37.04331,0,1357114,250225012C3,-10.927901,-37.04335,2025-11-30T13:17:50Z,List(DayBurner),1.7314846445917806,2025-11-30T15:00:18.867Z


### 1.3 Fim da fase Bronze
Com os dataframes gerados, completamos a fase Bronze da arquitetura Medallion, de ingestão de dados raw. 

# 2. Filtro e Limpeza de dados (Nível Prata)
Nesse estágio serão feitos filtros de dados, checagem de qualidade de dados, imposição de estrutura e esquema de dados nas tabelas obtidas no nível anterior. O objetivo dessa etapa é alimentar as tabelas do esquema alarmes_aju com qualidade e segurança a fim de podermos criar vistas desse DW para utilização por ferramentas OLAP no nível Ouro.

### 2.1 Descarte de linhas obsoletas
Na base de dados operacional hospedada no MongoDB Atlas, existem registros com campos null que devem ser descartados. Esses dados são de LCUs que estão cadastradas no sistema operacional, mas que não estão associadas a nenhum ponto de fixação.

In [0]:
df_slice = df.loc[df['ID_PONTO_SERVICO'].isnull()]
display(df_slice)
index_null = df_slice.index.tolist()
df = df.drop(index_null)



_id,id,ID_PONTO_SERVICO,pole_id_s,name_s,ALTURA_DO_POSTE,MARCO,ocorrencia_criada,address1_s,latitude_f,longitude_f,dimming_level_set_i,work_order_s,barcode_s,ctlatitude_f,ctlongitude_f,last_update_dt,alarm_names_ssci,distance,insert_datetime
692c5c02144e2516bb20d6dd,2543197363949,,SP-DIRETORIA,LCU 601EED,,,False,Avenida São João Batista,-22.91359,-47.055107,0,0,25022601EED,-10.928351,-37.074314,2025-11-18T13:56:50Z,List(LostCommunication),,2025-11-30T15:00:18.867Z
692ce8a6dc090557d2b0c2ce,2543196321587,,30995,LCU 3733,,,False,Rua Jos?? Menezes Prudente,-10.987384,-37.080227,100,1445416,25022503733,-10.990315,-37.078712,2025-11-30T23:40:20Z,List(BallastFailure),,2025-12-01T01:00:22.894Z
692ce8a6dc090557d2b0c2eb,2543197363949,,SP-DIRETORIA,LCU 601EED,,,False,Avenida São João Batista,-22.91359,-47.055107,0,0,25022601EED,-10.928351,-37.074314,2025-11-18T13:56:50Z,List(LostCommunication),,2025-12-01T01:00:22.894Z




### 2.2 Tratamento de dados para a tabela DimLCU

### Observação
Utilizo a consulta a seguir para listar colunas, data_type e comentários das tabelas. Por algum motivo, após alguns ETLs, começou a aparecer alguns registros repetidos nessa consulta, que não entendo o motivo. Navegando pela UI do databricks, a tabela está normal (sem registros repetidos).

In [0]:
%sql
SELECT column_name, full_data_type, comment
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_catalog = 'conecta' AND table_name = 'dimlcu'

column_name,full_data_type,comment
LCUKey,bigint,
IDLCU,bigint,Identificador natural único de uma LCU - dispositivo IoT.
CodigoBarras,string,Código de barras da LCU.
NomeLCU,string,Nome da LCU.
Latitude,double,Latitude da LCU.
Longitude,double,Longitude da LCU.
DistanciaPontoServico,double,A distância entre as coordenadas da LCU para as coodernadas do ponto de serviço ao qual a LCU foi associada.
StartDate,timestamp,
EndDate,timestamp,
IsLateArriving,int,


In [0]:
df.rename(columns={
    'id': 'IDLCU', 'barcode_s': 'CodigoBarras', 'name_s': 'NomeLCU',
    'latitude_f': 'Latitude', 'longitude_f': 'Longitude', 'distance': 'DistanciaPontoServico'
    }, inplace=True)
df['IDLCU'] = df['IDLCU'].apply(int)
display(df[['IDLCU', 'CodigoBarras', 'NomeLCU', 'Latitude', 'Longitude', 'DistanciaPontoServico']].head())



IDLCU,CodigoBarras,NomeLCU,Latitude,Longitude,DistanciaPontoServico
2543196311690,2502250108A,LCU 108A,-10.92699,-37.052944,3.330507681386353
2543196312102,25022501226,LCU 1226,-10.914472,-37.088814,3.0681934426109443
2543196312219,2502250129B,LCU 129B,-11.004645,-37.103138,1.1119579863579596
2543196312253,250225012BD,LCU 12BD,-11.004372,-37.072433,1.4563306741209223
2543196312259,250225012C3,LCU 12C3,-10.927903,-37.04331,1.7314846445917806


### 2.3 Tratamento de dados para a tabela DimTipoAlarme
Nos dados "raw" ingeridos do MongoDB, temos uma coluna tipo array com n strings armazenadas. 
A tabela dimensão TipoAlarme precisa ser atômica e, felizmente, para a operação só é necesssário levar em consideração um tipo de alarme, o mais alto na hierarquia a seguir:

1. Sem Comunicação
2. Falha no Driver
3. Aceso Durante o Dia
4. Apagada Durante a Noite
5. Tensão Alta
6. Tensão Baixa
7. Falha no node

A hierarquia acima também representa todo o domínio da dimensão TipoAlarme.

Existe a possibilidade de criar um registro para cada alarme da array, porém realmente só é necessário o registro do alarme mais alto da hierarquia.

In [0]:
%sql
SELECT column_name, full_data_type, comment
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_catalog = 'conecta' AND table_name = 'dimtipoalarme'

column_name,full_data_type,comment
AlarmeKey,bigint,
NomeAlarme,string,Indica o nome do alarme so sistema de telegestão.
StartDate,timestamp,
EndDate,timestamp,
IsLateArriving,int,
NomeAlarme,string,


In [0]:
def atomizar_alarme(sr):
    if 'LostCommunication' in sr:
        sr = 'Sem Comunicação'
    elif 'BallastFailure' in sr:
        sr = 'Falha no Driver'
    elif 'LampFailure' in sr:
        sr = 'Falha no Driver'
    elif 'DayBurner' in sr:
        sr = 'Acesa Durante o Dia'
    elif 'NightOff' in sr:
        sr = 'Apagada Durante a Noite'
    elif 'HighVoltage' in sr:
        sr = 'Tensão Alta'
    elif 'LowVoltage' in sr:
        sr = 'Tensão Baixa'
    elif 'NodeFailure' in sr:
        sr = 'Falha no Node'
    return sr

In [0]:
df.rename(columns={
    'alarm_names_ssci': 'NomeAlarme'
}, inplace=True)
df['NomeAlarme'] = df['NomeAlarme'].apply(atomizar_alarme)
display(df.head())



_id,IDLCU,ID_PONTO_SERVICO,pole_id_s,NomeLCU,ALTURA_DO_POSTE,MARCO,ocorrencia_criada,address1_s,Latitude,Longitude,dimming_level_set_i,work_order_s,CodigoBarras,ctlatitude_f,ctlongitude_f,last_update_dt,NomeAlarme,DistanciaPontoServico,insert_datetime
692c5c02144e2516bb20d675,2543196311690,52743.0,52743,LCU 108A,11.0,I,False,253 Rua Moacir Leite,-10.92699,-37.052944,0,1649058,2502250108A,-10.9269285,-37.05294,2025-11-30T08:58:17Z,Acesa Durante o Dia,3.330507681386353,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d676,2543196312102,17679.0,17679,LCU 1226,16.0,I,False,7501 Avenida Presidente Tancredo Neves,-10.914472,-37.088814,0,1363514,25022501226,-10.914376,-37.089046,2025-11-30T13:47:17Z,Acesa Durante o Dia,3.0681934426109443,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d677,2543196312219,67167.0,67167,LCU 129B,16.0,I,True,Avenida Vereador Manoel Nunes Resende,-11.004645,-37.103138,0,143188,2502250129B,-11.00466,-37.103153,2025-05-29T17:40:41Z,Sem Comunicação,1.1119579863579596,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d678,2543196312253,24467.0,7888,LCU 12BD,16.0,I,False,Avenida Mel??cio Machado,-11.004372,-37.072433,0,1411741,250225012BD,-11.004358,-37.072422,2025-11-30T12:55:58Z,Acesa Durante o Dia,1.4563306741209223,2025-11-30T15:00:18.867Z
692c5c02144e2516bb20d679,2543196312259,1805.0,1805,LCU 12C3,16.0,I,False,Avenida Paulo Barreto de Menezes,-10.927903,-37.04331,0,1357114,250225012C3,-10.927901,-37.04335,2025-11-30T13:17:50Z,Acesa Durante o Dia,1.7314846445917806,2025-11-30T15:00:18.867Z


### 2.4 Tratamento de dados para a tabela DimData
Os dados ingeridos do MongoDB possuem uma coluna chamada insert_datetime. Essa coluna possui dados tipo timestamp em que registra o momento em que o dado foi adicionado no banco de dados.
Como a granularidade escolhida para o DW foi o dia, e são registrados dados no banco de dados do MongoDB duas vezes ao dia (às 12h e às 22h) será necessário fazer alguns ajustes.

1. Criação de novas colunas:
É necessário, a partir da coluna insert_datetime, criar as colunas Data, Mês, Ano, Trimestre e TrimestreContrato.
2. Descarte de registros:
Se uma LCU registrou o mesmo alarme duas vezes no mesmo dia, posso descartar um dos registros, assumindo o estado daquele alerta para o dia. Ou seja, caso a chave natural IDLCU e as colunas NomeAlarme e Data se repitam, eu posso descartar um dos registros.
3. Converter o datetime da coluna insert_datetime para UTC -03:00. Com UTC 00:00 os dados registrados às 22h horário de brasília iriam cair no dia seguinte às 01h da manhã, o que geraria erros de data.

In [0]:
%sql
SELECT column_name, full_data_type, comment
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_catalog = 'conecta' AND table_name = 'dimdata'

column_name,full_data_type,comment
DataKey,bigint,
Data,date,Data do alarme.
Mes,string,Mês do alarme com a informação do ano.
Ano,int,Ano do alarme.
Trimestre,string,Trimestre do alarme.
TrimestreContrato,string,"Trimestre do contrato. Os trimestres do contrato são enumerados da seguinte forma: T01, T02, T03... Tn. O início do primeiro trimestre do contrato foi em dezembro de 2021, o que significa que a mensuração dos trimestres do contrato seguem uma lógica diferente ao do trimestre normal."
StartDate,timestamp,
EndDate,timestamp,
IsLateArriving,int,
Data,date,


In [0]:
def find_trimestre_contrato(sr, inicio_contrato):
    '''
    Utiliza o datetime do python para achar o número do trimestre do contrato.
    '''
    months_incio = 12 - inicio_contrato.month + 1
    months_sr = sr.month + (sr.year - inicio_contrato.year - 1) * 12
    return f'T{(months_sr + months_incio - 1) // 3 + 1}'

In [0]:
from datetime import datetime, timedelta
inicio_contrato = datetime.strptime('01/12/2021', '%d/%m/%Y')
df['insert_datetime'] = df['insert_datetime'].apply(lambda x: x - timedelta(hours=3))

df['Data'] = df['insert_datetime'].apply(lambda x: x.date())
df['Mes'] = df['insert_datetime'].apply(lambda x: x.strftime("%m %Y"))
df['Ano'] = df['insert_datetime'].apply(lambda x: x.year)
df['Trimestre'] = df['insert_datetime'].apply(lambda x: f'{(x.month - 1) // 3 + 1}º Trimestre {x.year}')
df['TrimestreContrato'] = df['insert_datetime'].apply(find_trimestre_contrato, args=(inicio_contrato,))

display(df[['Data', 'Mes', 'Ano', 'Trimestre', 'TrimestreContrato']].head())



Data,Mes,Ano,Trimestre,TrimestreContrato
2025-11-30,11 2025,2025,4º Trimestre 2025,T16
2025-11-30,11 2025,2025,4º Trimestre 2025,T16
2025-11-30,11 2025,2025,4º Trimestre 2025,T16
2025-11-30,11 2025,2025,4º Trimestre 2025,T16
2025-11-30,11 2025,2025,4º Trimestre 2025,T16


In [0]:
print(df.shape)
df = df.drop_duplicates(subset=['IDLCU', 'NomeAlarme', 'Data'], keep='last')
print(df.shape)

(277, 25)
(188, 25)


### 2.5 Tratamento de dados para a tabela DimPontoServico
Os dados dessa tabelas estão contidos no dataframe df_ponto_servico, criada a partir da extração de dados do sistema da Exati.
O tratamento a ser feito nos dados será:
1. Renomear nome das colunas para seguir a tabela DimPontoServico;
2. Remover colunas desnecessárias;
3. Assegurar o tipo de dados de algumas colunas;

In [0]:
%sql
SELECT column_name, full_data_type, comment
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_catalog = 'conecta' AND table_name = 'dimpontoservico'

column_name,full_data_type,comment
PontoServicoKey,bigint,
IDPontoServico,int,Identificador natural único de um ponto de serviço.
Bairro,string,Bairro onde o ponto de serviço se encontra.
Latitude,double,Latitude do ponto de serviço.
Longitude,double,Longitude do ponto de serviço.
MarcoContrato,string,Marco do contrato - I ou II e III - em que o ponto de serviço está associado.
AlturaInstalacaoLuminaria,double,Altura de instalação média da luminária de iluminação pública.
StartDate,timestamp,
EndDate,timestamp,
IsLateArriving,int,


In [0]:
df_ponto_servico.rename(columns={
    'ID_PONTO_SERVICO': 'IDPontoServico',
    'NOME_BAIRRO': 'Bairro',
    'LATITUDE_TOTAL': 'Latitude',
    'LONGITUDE_TOTAL': 'Longitude',
    'MARCO': 'MarcoContrato',
    'ALTURA_DO_POSTE': 'AlturaInstalacaoLuminaria'
}, inplace=True)

In [0]:
df_ponto_servico = df_ponto_servico[['IDPontoServico', 'Bairro', 'Latitude', 'Longitude', 'MarcoContrato', 'AlturaInstalacaoLuminaria']]
df_ponto_servico['AlturaInstalacaoLuminaria'] = df_ponto_servico['AlturaInstalacaoLuminaria'].apply(float)
df_ponto_servico['Bairro'] = df_ponto_servico['Bairro'].apply(lambda x: x.capitalize())
df_ponto_servico['Latitude'] = df_ponto_servico['Latitude'].apply(float)
df_ponto_servico['Longitude'] = df_ponto_servico['Longitude'].apply(float)
df_ponto_servico['TipoPoste'] = df_ponto_servico.apply(lambda x: 'Poste Baixo' if x['AlturaInstalacaoLuminaria'] < 13 else 'Poste Alto', axis=1)
display(df_ponto_servico.head())



IDPontoServico,Bairro,Latitude,Longitude,MarcoContrato,AlturaInstalacaoLuminaria,TipoPoste
30720,Santa maria,-10.988866,-37.099503,II e III,11.0,Poste Baixo
30722,Santa maria,-10.9886045,-37.09947630555556,II e III,11.0,Poste Baixo
23882,Santa maria,-10.988378972222222,-37.099434,II e III,11.0,Poste Baixo
55907,Santa maria,-10.987983666666668,-37.09936183333333,II e III,11.0,Poste Baixo
24327,Santa maria,-10.98741927777778,-37.09928780555556,II e III,11.0,Poste Baixo


### 2.6 Filtrando apenas colunas necessárias do df com as outras dimensões

Nessa etapa é necessário manter, minimamente, as chaves naturais de cada tabela dimensão.

In [0]:
df.rename(columns={'ID_PONTO_SERVICO': 'IDPontoServico'}, inplace=True)
df['IDPontoServico'] = df['IDPontoServico'].apply(int)
df = df[['IDPontoServico', 'IDLCU', 'CodigoBarras', 'NomeLCU', 'Latitude', 'Longitude', 'DistanciaPontoServico', 'NomeAlarme', 'Data', 'Mes', 'Ano', 'Trimestre', 'TrimestreContrato']]
display(df.head())



IDPontoServico,IDLCU,CodigoBarras,NomeLCU,Latitude,Longitude,DistanciaPontoServico,NomeAlarme,Data,Mes,Ano,Trimestre,TrimestreContrato
52743,2543196311690,2502250108A,LCU 108A,-10.92699,-37.052944,3.330507681386353,Acesa Durante o Dia,2025-11-30,11 2025,2025,4º Trimestre 2025,T16
5617,2543196311825,25022501111,LCU 1111,-11.0014715,-37.0702,2.896712334535035,Falha no Driver,2025-11-30,11 2025,2025,4º Trimestre 2025,T16
17679,2543196312102,25022501226,LCU 1226,-10.914472,-37.088814,3.0681934426109443,Acesa Durante o Dia,2025-11-30,11 2025,2025,4º Trimestre 2025,T16
5617,2543196312212,25022501294,LCU 1294,-11.001489,-37.070225,5.171111708190519,Falha no Driver,2025-11-30,11 2025,2025,4º Trimestre 2025,T16
67167,2543196312219,2502250129B,LCU 129B,-11.004645,-37.103138,1.1119579863579596,Sem Comunicação,2025-11-30,11 2025,2025,4º Trimestre 2025,T16


### 2.7 Finalizando a fase Prata da arquitetura Medallion

Com os dataframes df e df_ponto_servico, será criado tabelas no schema staging_etl para auxiliar na alimentação das tabelas dimensões e tabela fato na fase Ouro.

In [0]:
%sql
DROP TABLE IF EXISTS staging_etl.alarmes_aju_prata;
DROP TABLE IF EXISTS staging_etl.pontos_servico_prata;

In [0]:
df.to_table(name='staging_etl.alarmes_aju_prata', mode='overwrite', format='delta')
df_ponto_servico.to_table(name='staging_etl.pontos_servico_prata', mode='overwrite', format='delta')



# 3. Nível Ouro

Com os dados filtrados e tratados no esquema staging_etl, é possível fazer a carga nas tabelas dimensões e tabela fato do esquema estrela proposto no notebook de criação de esquema.

Será utilizado como guia o material presente em https://www.databricks.com/blog/implementing-dimensional-data-warehouse-databricks-sql-part-2 para o precesso de ETL nas tabelas dimensão e o material em https://www.databricks.com/blog/implementing-dimensional-data-warehouse-databricks-sql-part-3 para o processo de ETL na tabela fato.

### 3.1 Expirando registros versionados nas tabelas dimensões
Quando um valor de um registro em uma tabela dimensão é modificado, esse precisa ser versionado para manter seu histórico de alterações. Isso é possível através das colunas de metadados StartDate e EndDate.

As únicas tabelas que podem sofrer com versionamento são as DimPontoServico e DimLCU.

In [0]:
%sql
DECLARE OR REPLACE VARIABLE run_ts TIMESTAMP DEFAULT current_timestamp();

In [0]:
%sql
CREATE OR REPLACE VIEW DimLCU AS (
  SELECT DISTINCT IDLCU, CodigoBarras, NomeLCU, Latitude, Longitude, DistanciaPontoServico
  FROM staging_etl.alarmes_aju_prata
  );

In [0]:
%sql
MERGE INTO alarmes_aju.dimLCU a
USING DimLCU b
ON a.IDLCU = b.IDLCU
AND a.EndDate is NULL
WHEN MATCHED AND
NOT (
  EQUAL_NULL(a.CodigoBarras, b.CodigoBarras) AND
  EQUAL_NULL(a.NomeLCU, b.NomeLCU) AND
  EQUAL_NULL(a.Latitude, b.Latitude) AND
  EQUAL_NULL(a.Longitude, b.Longitude) AND
  EQUAL_NULL(a.DistanciaPontoServico, b.DistanciaPontoServico)
)
THEN UPDATE SET a.EndDate = run_ts

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
CREATE OR REPLACE VIEW DimPontoServico AS (
  SELECT DISTINCT IDPontoServico, Bairro, Latitude, Longitude, MarcoContrato, AlturaInstalacaoLuminaria, TipoPoste
  FROM staging_etl.pontos_servico_prata
  );

In [0]:
%sql
MERGE INTO alarmes_aju.dimPontoServico a
USING DimPontoServico b
ON a.IDPontoServico = b.IDPontoServico
AND a.EndDate is NULL
WHEN MATCHED AND
NOT (
  EQUAL_NULL(a.Bairro, b.Bairro) AND
  EQUAL_NULL(a.Latitude, b.Latitude) AND
  EQUAL_NULL(a.Longitude, b.Longitude) AND
  EQUAL_NULL(a.MarcoContrato, b.MarcoContrato) AND
  EQUAL_NULL(a.AlturaInstalacaoLuminaria, b.AlturaInstalacaoLuminaria) AND
  EQUAL_NULL(a.TipoPoste, b.TipoPoste)
)
THEN UPDATE SET a.EndDate = run_ts

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


### 3.2 Inserindo novos valores nas tabelas dimensões

In [0]:
%sql
MERGE INTO alarmes_aju.dimLCU a
USING dimlcu b
ON a.EndDate IS NULL
AND a.IDLCU = b.IDLCU
WHEN NOT MATCHED THEN
INSERT (
  IDLCU,
  CodigoBarras,
  NomeLCU,
  Latitude,
  Longitude,
  DistanciaPontoServico,
  StartDate
)
VALUES (
  b.IDLCU,
  b.CodigoBarras,
  b.NomeLCU,
  b.Latitude,
  b.Longitude,
  b.DistanciaPontoServico,
  run_ts
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
18,0,0,18


In [0]:
%sql
MERGE INTO alarmes_aju.dimPontoServico a
USING DimPontoServico b
ON a.EndDate IS NULL
AND a.IDPontoServico = b.IDPontoServico
WHEN NOT MATCHED THEN
INSERT (
  IDPontoServico,
  Bairro,
  Latitude,
  Longitude,
  MarcoContrato,
  AlturaInstalacaoLuminaria,
  TipoPoste,
  StartDate
)
VALUES (
  b.IDPontoServico,
  b.Bairro,
  b.Latitude,
  b.Longitude,
  b.MarcoContrato,
  b.AlturaInstalacaoLuminaria,
  b.TipoPoste,
  run_ts
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
CREATE OR REPLACE VIEW dimdata AS (
  SELECT DISTINCT Data, Mes, Ano, Trimestre, TrimestreContrato
  FROM staging_etl.alarmes_aju_prata
  );

In [0]:
%sql
MERGE INTO alarmes_aju.dimData a
USING dimdata b
ON a.EndDate IS NULL
AND a.Data = b.Data
WHEN NOT MATCHED THEN
INSERT (
  Data,
  Mes,
  Ano,
  Trimestre,
  TrimestreContrato,
  StartDate
)
VALUES (
  b.Data,
  b.Mes,
  b.Ano,
  b.Trimestre,
  b.TrimestreContrato,
  run_ts
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,0,1


In [0]:
%sql
CREATE OR REPLACE VIEW dimTipoAlarme AS (
  SELECT DISTINCT NomeAlarme
  FROM staging_etl.alarmes_aju_prata
  );

In [0]:
%sql
MERGE INTO alarmes_aju.DimTipoAlarme a
USING dimTipoAlarme b
ON a.EndDate IS NULL
AND a.NomeAlarme = b.NomeAlarme
WHEN NOT MATCHED THEN
INSERT (
  NomeAlarme,
  StartDate
)
VALUES (
  b.NomeAlarme,
  run_ts
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


### 3.3 Inserindo novos valores na tabela fato

In [0]:
%sql
SELECT column_name, full_data_type, comment
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_catalog = 'conecta' AND table_name = 'fatoalarme'

column_name,full_data_type,comment
AlarmeKey,bigint,
DataKey,bigint,
LCUKey,bigint,
PontoServicoKey,bigint,
DateTimeUltimaModificacao,timestamp,


In [0]:
%sql
CREATE OR REPLACE VIEW fatoAlarmeView AS (
    SELECT AlarmeKey, DataKey, LCUKey, PontoServicoKey
    FROM staging_etl.alarmes_aju_prata a
    LEFT JOIN alarmes_aju.dimdata dimData
    ON a.Data = dimdata.Data
    AND dimData.EndDate IS NULL
    LEFT JOIN alarmes_aju.dimlcu dimLCU
    ON a.IDLCU = dimLCU.IDLCU
    AND dimLCU.EndDate IS NULL
    LEFT JOIN alarmes_aju.dimpontoservico dimPS
    ON a.IDPontoServico = dimPS.IDPontoServico
    AND dimPS.EndDate IS NULL
    LEFT JOIN alarmes_aju.dimtipoalarme dimTipoAlarme
    ON a.NomeAlarme = dimtipoalarme.NomeAlarme
    AND dimtipoalarme.EndDate IS NULL
)

In [0]:
%sql
MERGE INTO alarmes_aju.fatoalarme a
USING fatoAlarmeView b
ON a.AlarmeKey = b.AlarmeKey
AND a.DataKey = b.DataKey
AND a.LCUKey = b.LCUKey
AND a.PontoServicoKey = b.PontoServicoKey
WHEN NOT MATCHED THEN
INSERT (
  AlarmeKey,
  DataKey,
  LCUKey,
  PontoServicoKey,
  DateTimeUltimaModificacao
)
VALUES (
  b.AlarmeKey,
  b.DataKey,
  b.LCUKey,
  b.PontoServicoKey,
  run_ts
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
188,0,0,188
