<b>Banco de Dados Meteorológicos do INMET</b> - https://bdmep.inmet.gov.br/

Este notebookcarrega os dados climáticos obtidos do INMET.


## Amostra dos dados brutos (INMET)

Os arquivos do INMET normalmente incluem **metadados nas primeiras linhas** (ex.: `REGIAO:`, `UF:`, `ESTACAO:`) e o cabeçalho real dos dados aparece a partir da 9ª linha.


In [0]:
arquivo = open("/Volumes/aula/python/root/dados/INMET_S_SC_A841_JOACABA_01-01-2025_A_31-10-2025.CSV", "r", encoding="latin1").read()
print(arquivo[:1100]) # Imprime os primeiros 500 caracteres

REGIAO:;S
UF:;SC
ESTACAO:;JOACABA
CODIGO (WMO):;A841
LATITUDE:;-27,16916666
LONGITUDE:;-51,55888888
ALTITUDE:;767,63
DATA DE FUNDACAO:;20/09/07
Data;Hora UTC;PRECIPITAÇÃO TOTAL, HORÁRIO (mm);PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB);PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB);PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB);RADIACAO GLOBAL (Kj/m²);TEMPERATURA DO AR - BULBO SECO, HORARIA (°C);TEMPERATURA DO PONTO DE ORVALHO (°C);TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C);TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C);TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C);TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C);UMIDADE REL. MAX. NA HORA ANT. (AUT) (%);UMIDADE REL. MIN. NA HORA ANT. (AUT) (%);UMIDADE RELATIVA DO AR, HORARIA (%);VENTO, DIREÇÃO HORARIA (gr) (° (gr));VENTO, RAJADA MAXIMA (m/s);VENTO, VELOCIDADE HORARIA (m/s);
2025/01/01;0000 UTC;;;;;;;;;;;;;;;;;;
2025/01/01;0100 UTC;;;;;;;;;;;;;;;;;;
2025/01/01;0200 UTC;;;;;;;;;;;;;;;;;;
2025/01/01;0300 UTC;;;;;;;;;;;;;;;;;;
2


## 1. **Coleta/Aquisição:**

 Simular a ingestão de dados de uma fonte de logs ou transações em tempo real (pode ser um dataset CSV/JSON grande) na camada Raw (Bruta).

In [0]:
from pyspark.sql.types import *

# definicao do schema na camada RAW
schema_inmet = {
    "Data": ("DATA", "string"),
    "Hora UTC": ("HORA_UTC", "string"),
    "PRECIPITAÇÃO TOTAL, HORÁRIO (mm)": ("PRECIPITACAO_TOTAL_MM", "string"),
    "PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)": ("PRESSAO_ATMOSFERICA_ESTACAO_MB", "string"),
    "PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)": ("PRESSAO_ATMOSFERICA_MAX_HORA_ANT_MB", "string"),
    "PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)": ("PRESSAO_ATMOSFERICA_MIN_HORA_ANT_MB", "string"),
    "RADIACAO GLOBAL (Kj/m²)": ("RADIACAO_GLOBAL_KJ_M2", "string"),
    "TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)": ("TEMPERATURA_AR_C", "string"),
    "TEMPERATURA DO PONTO DE ORVALHO (°C)": ("TEMPERATURA_PONTO_ORVALHO_C", "string"),
    "TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)": ("TEMPERATURA_MAX_HORA_ANT_C", "string"),
    "TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)": ("TEMPERATURA_MIN_HORA_ANT_C", "string"),
    "TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)": ("TEMPERATURA_ORVALHO_MAX_HORA_ANT_C", "string"),
    "TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)": ("TEMPERATURA_ORVALHO_MIN_HORA_ANT_C", "string"),
    "UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)": ("UMIDADE_REL_MAX_HORA_ANT_PCT", "string"),
    "UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)": ("UMIDADE_REL_MIN_HORA_ANT_PCT", "string"),
    "UMIDADE RELATIVA DO AR, HORARIA (%)": ("UMIDADE_RELATIVA_PCT", "string"),
    "VENTO, DIREÇÃO HORARIA (gr) (° (gr))": ("VENTO_DIRECAO_GRAUS", "string"),
    "VENTO, RAJADA MAXIMA (m/s)": ("VENTO_RAJADA_MAX_MS", "string"),
    "VENTO, VELOCIDADE HORARIA (m/s)": ("VENTO_VELOCIDADE_MS", "string"),
    "Unnamed: 19": ("COLUNA_EXTRA", "string"),
    "METADADOS": ("METADADOS", "string")
}

In [0]:
import glob
arquivos = glob.glob("/Volumes/aula/python/root/dados/*.CSV")  # Lista os arquivos de uma pasta
qtd = len(arquivos)
print(arquivos[:5])
print(qtd)

['/Volumes/aula/python/root/dados/INMET_CO_DF_A001_BRASILIA_01-01-2025_A_31-10-2025.CSV', '/Volumes/aula/python/root/dados/INMET_CO_DF_A042_BRAZLANDIA_01-01-2025_A_31-10-2025.CSV', '/Volumes/aula/python/root/dados/INMET_CO_DF_A045_AGUAS EMENDADAS_01-01-2025_A_31-10-2025.CSV', '/Volumes/aula/python/root/dados/INMET_CO_DF_A046_GAMA (PONTE ALTA)_01-01-2025_A_31-10-2025.CSV', '/Volumes/aula/python/root/dados/INMET_CO_DF_A047_PARANOA (COOPA-DF)_01-01-2025_A_31-10-2025.CSV']
570


In [0]:
import json
import pandas as pd
import pyspark.sql.functions as F

# DataFrame vazio para acumular os arquivos
df_raw_inmet = None
n = 0
METADADOS = {}

# Loop sobre cada arquivo
for arq in arquivos[:35]:
    print(f" - Processando arquivo {n} de {qtd}: {arq}")

    # acessa o cabeçalho do arquivo e cria um json com os metadados
    with open(arquivos[n], "r", encoding="latin1") as f:
        for i in range(8):
            chave, valor = f.readline().strip().split(";", 1)
            METADADOS[chave.replace(":", "")] = valor
    
    metadata_json = json.dumps(METADADOS, ensure_ascii=False)
    
    df_pd = pd.read_csv(arquivos[n],
                    sep=";",
                    encoding="latin1",
                    header=0,
                    skiprows=8
                )  
    n += 1

    # Convert pandas DataFrame to Spark DataFrame
    df = spark.createDataFrame(df_pd)

    # Adicionar coluna de metadados
    df = df.withColumn("METADADOS", F.lit(metadata_json))

    # Aplica schema correto para o dataframe
    for old_name, (new_name, new_type) in schema_inmet.items():
        df = df.withColumnRenamed(old_name, new_name).withColumn(new_name, F.col(new_name).cast(new_type))
    
    # append
    if df_raw_inmet is None:
        df_raw_inmet = df
    else:
        df_raw_inmet = df_raw_inmet.unionByName(df, allowMissingColumns=True)

print(" ----- - Quantidade de linhas carregadas:", df_raw_inmet.count())

 - Processando arquivo 0 de 570: /Volumes/aula/python/root/dados/INMET_CO_DF_A001_BRASILIA_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 1 de 570: /Volumes/aula/python/root/dados/INMET_CO_DF_A042_BRAZLANDIA_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 2 de 570: /Volumes/aula/python/root/dados/INMET_CO_DF_A045_AGUAS EMENDADAS_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 3 de 570: /Volumes/aula/python/root/dados/INMET_CO_DF_A046_GAMA (PONTE ALTA)_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 4 de 570: /Volumes/aula/python/root/dados/INMET_CO_DF_A047_PARANOA (COOPA-DF)_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 5 de 570: /Volumes/aula/python/root/dados/INMET_CO_GO_A002_GOIANIA_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 6 de 570: /Volumes/aula/python/root/dados/INMET_CO_GO_A003_MORRINHOS_01-01-2025_A_31-10-2025.CSV
 - Processando arquivo 7 de 570: /Volumes/aula/python/root/dados/INMET_CO_GO_A005_PORANGATU_01-01-2025_A_31-10-2025.CSV
 - Processando arqu

In [0]:
# Definindo o caminho de saída (simulação de um Data Lake)
caminho_parquet = "/Volumes/aula/python/root/raw/RAW_INMET"

# Ação: Salvar o DataFrame tratado em Parquet
(
    df_raw_inmet.write
    .mode("overwrite") # Sobrescreve se o diretório já existir
    .option("compression", "snappy") # Snappy é o padrão (bom trade-off)
    .parquet(caminho_parquet)
)

In [0]:
dados = spark.read.parquet("/Volumes/aula/python/root/raw/RAW_INMET")
display(dados.limit(2))

DATA,HORA_UTC,PRECIPITACAO_TOTAL_MM,PRESSAO_ATMOSFERICA_ESTACAO_MB,PRESSAO_ATMOSFERICA_MAX_HORA_ANT_MB,PRESSAO_ATMOSFERICA_MIN_HORA_ANT_MB,RADIACAO_GLOBAL_KJ_M2,TEMPERATURA_AR_C,TEMPERATURA_PONTO_ORVALHO_C,TEMPERATURA_MAX_HORA_ANT_C,TEMPERATURA_MIN_HORA_ANT_C,TEMPERATURA_ORVALHO_MAX_HORA_ANT_C,TEMPERATURA_ORVALHO_MIN_HORA_ANT_C,UMIDADE_REL_MAX_HORA_ANT_PCT,UMIDADE_REL_MIN_HORA_ANT_PCT,UMIDADE_RELATIVA_PCT,VENTO_DIRECAO_GRAUS,VENTO_RAJADA_MAX_MS,VENTO_VELOCIDADE_MS,COLUNA_EXTRA,METADADOS
2025/07/10,0000 UTC,0,9148,9148,9143,,167,92,178,164,93,88,63.0,56.0,61.0,129.0,55,18,,"{""REGIAO"": ""CO"", ""UF"": ""GO"", ""ESTACAO"": ""CRISTALINA (FAZENDA SANTA MONICA)"", ""CODIGO (WMO)"": ""A056"", ""LATITUDE"": ""-16,39944444"", ""LONGITUDE"": ""-47,62583332"", ""ALTITUDE"": ""932"", ""DATA DE FUNDACAO"": ""25/01/18""}"
2025/07/10,0100 UTC,0,9151,9151,9148,,159,87,167,154,93,86,65.0,61.0,62.0,117.0,55,12,,"{""REGIAO"": ""CO"", ""UF"": ""GO"", ""ESTACAO"": ""CRISTALINA (FAZENDA SANTA MONICA)"", ""CODIGO (WMO)"": ""A056"", ""LATITUDE"": ""-16,39944444"", ""LONGITUDE"": ""-47,62583332"", ""ALTITUDE"": ""932"", ""DATA DE FUNDACAO"": ""25/01/18""}"



## 2. Pré-processamento:

Aplicar pelo menos 3 transformações de qualidade (ex: limpeza de nulos, casting de tipos, desduplicação). Mover para a camada Curated (Tratada).


In [0]:
dados = spark.read.parquet("/Volumes/aula/python/root/raw/RAW_INMET")

df_silver = dados.withColumn("DataHora",
                    F.to_timestamp(
                                F.concat_ws(" ", 
                                            F.to_date(F.col("Data"), "yyyy/MM/dd"), # formatando o campo de data
                                            F.date_format(F.concat(F.substring("HORA_UTC",1,2), F.lit(":"), F.substring("HORA_UTC",3,2)), "HHmm") # formatando hora
                                           ),  # junta Data e Hora
                                "yyyy-MM-dd HHmm"
                                )
                    ) \
            .withColumn("Chuva", F.regexp_replace(F.col("PRECIPITACAO_TOTAL_MM"), ",", ".").cast(DoubleType()) ) \
            .withColumn("Temperatura", F.regexp_replace(F.col("TEMPERATURA_AR_C"), ",", ".").cast(DoubleType()) ) \
            .withColumn("Umidade_Perc", F.regexp_replace(F.col("UMIDADE_RELATIVA_PCT"), ",", ".").cast(DoubleType()) ) \
            .withColumn("Vento_m_s", F.regexp_replace(F.col("VENTO_VELOCIDADE_MS"), ",", ".").cast(DoubleType()) ) \
            .withColumn("UF", F.get_json_object(F.col("METADADOS"), "$.UF")) \
            .withColumn("estacao_met", F.get_json_object(F.col("METADADOS"), "$.ESTACAO")) \
            .select("DataHora", "Chuva", "Temperatura", "Umidade_Perc", "Vento_m_s", "uf", "estacao_met")

display(df_silver.limit(2))

DataHora,Chuva,Temperatura,Umidade_Perc,Vento_m_s,uf,estacao_met
2025-07-10T00:00:00.000Z,0.0,16.7,61.0,1.8,GO,CRISTALINA (FAZENDA SANTA MONICA)
2025-07-10T01:00:00.000Z,0.0,15.9,62.0,1.2,GO,CRISTALINA (FAZENDA SANTA MONICA)



## 3. Persistência: 

Utilizar um dos Formatos Open Data (Delta Lake, Apache Iceberg ou Apache Hudi) para armazenar a tabela final.


In [0]:
# Dropar tabela case exista
spark.sql("DROP TABLE IF EXISTS aula.default.clima")

DataFrame[]

In [0]:
# Definindo o caminho de saída (simulação de um Data Lake)
caminho_parquet = "aula.default.clima"

df_silver = df_silver.withColumn("regiao", F.lit("")) # cria uma coluna vazia

df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .partitionBy("UF") \
    .saveAsTable(caminho_parquet)

In [0]:
spark.table("aula.default.clima").limit(2).display()

DataHora,Chuva,Temperatura,Umidade_Perc,Vento_m_s,uf,estacao_met,regiao
2025-07-10T00:00:00.000Z,0.0,16.7,61.0,1.8,GO,CRISTALINA (FAZENDA SANTA MONICA),
2025-07-10T01:00:00.000Z,0.0,15.9,62.0,1.2,GO,CRISTALINA (FAZENDA SANTA MONICA),



## 4. Recurso Avançado (Obrigatório)

Demonstrar uma operação de UPSERT (Update ou Insert) ou DELETE na tabela final.

In [0]:
%sql
-- update na coluna regiao
update aula.default.clima
SET regiao = 
    CASE 
        WHEN uf IN ('AC', 'AM', 'AP', 'PA', 'RO', 'RR', 'TO') THEN 'Norte'
        WHEN uf IN ('AL', 'BA', 'CE', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE') THEN 'Nordeste'
        WHEN uf IN ('DF', 'GO', 'MT', 'MS') THEN 'Centro-Oeste'
        WHEN uf IN ('ES', 'MG', 'RJ', 'SP') THEN 'Sudeste'
        WHEN uf IN ('PR', 'RS', 'SC') THEN 'Sul'
        ELSE 'Região Não Mapeada' 
    END

num_affected_rows
255360


In [0]:
spark.table("aula.default.clima").limit(2).display()

DataHora,Chuva,Temperatura,Umidade_Perc,Vento_m_s,uf,estacao_met,regiao
2025-01-01T00:00:00.000Z,0.0,26.0,56.0,0.9,MS,CAMPO GRANDE,Centro-Oeste
2025-01-01T01:00:00.000Z,0.0,25.7,56.0,1.0,MS,CAMPO GRANDE,Centro-Oeste


In [0]:
spark.table("aula.default.clima").groupBy("UF").count().display()

UF,count
MS,29184
DF,36480
GO,189696


In [0]:
spark.sql("""
    DELETE FROM aula.default.clima
    WHERE UF = 'DF'
""")

DataFrame[num_affected_rows: bigint]

In [0]:
spark.table("aula.default.clima").groupBy("UF").count().display()

UF,count
MS,29184
GO,189696


## RESTORE - trabalhando com versionamento

In [0]:
df = spark.sql("DESCRIBE HISTORY aula.default.clima")  # visualizar o histórico de alterações da tabela
display(df.select("version", "timestamp", "userId", "operation"))

version,timestamp,userId,operation
2,2025-12-03T15:37:31.000Z,8602734324472921,DELETE
1,2025-12-03T15:37:27.000Z,8602734324472921,UPDATE
0,2025-12-03T15:37:23.000Z,8602734324472921,CREATE OR REPLACE TABLE AS SELECT


In [0]:
%sql
RESTORE TABLE aula.default.clima TO VERSION AS OF 1; -- realiza o restore da versão 1

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
1083575,3,0,1,0,174343


In [0]:
spark.table("aula.default.clima").groupBy("UF").count().display()

UF,count
DF,36480
GO,189696
MS,29184


In [0]:
df = spark.sql("DESCRIBE HISTORY aula.default.clima")  # visualizar o histórico de alterações da tabela
display(df.select("version", "timestamp", "userId", "operation"))

version,timestamp,userId,operation
3,2025-12-03T15:37:38.000Z,8602734324472921,RESTORE
2,2025-12-03T15:37:31.000Z,8602734324472921,DELETE
1,2025-12-03T15:37:27.000Z,8602734324472921,UPDATE
0,2025-12-03T15:37:23.000Z,8602734324472921,CREATE OR REPLACE TABLE AS SELECT
