## Overview

ETL - DADOS HISTÓRICOS ANUAIS - 2023

A missão do Instituto Nacional de Meteorologia (INMET), órgão do Ministério da Agricultura e Pecuária, é agregar valor à produção no Brasil por meio de informações meteorológicas. Esta missão é alcançada por meio de monitoramento, análise e previsão de tempo e de clima, que se fundamentam em pesquisa aplicada, trabalho em parceria e compartilhamento do conhecimento, com ênfase em resultados práticos e confiáveis.

https://portal.inmet.gov.br/dadoshistoricos


Arquivo utilizado : INMET_CO_DF_A001_BRASILIA_01-01-2023_A_31-12-2023.CSV

Primeiras sete linhas do arquivo:

REGIAO:	CO
UF:	DF
ESTACAO:	BRASILIA
CODIGO (WMO):	A001
LATITUDE:	-15,78944444
LONGITUDE:	-47,92583332
ALTITUDE:	1160,96
DATA DE FUNDACAO:	07/05/2000





## Preparação do ambiente

In [0]:
import requests
from pyspark.dbutils import DBUtils
from pyspark.sql.functions import dayofyear, expr, col, avg, round, regexp_replace, year

# Extração

## Download

In [0]:
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

# URL do arquivo de origem
url = 'https://portal.inmet.gov.br/uploads/dadoshistoricos/2023.zip'

# Caminho na pasta no DBFS onde o arquivo será salvo
dbfs_path = '/FileStore/tables/downloads/inmet_2023.zip'

# Baixa o arquivo
r = requests.get(url, allow_redirects=True)
open('/tmp/inmet_2023.zip', 'wb').write(r.content)

# Copia o arquivo para o DBFS
dbutils.fs.cp('file:/tmp/inmet_2023.zip', dbfs_path)



Out[14]: True

### Descompactação dos dados

O arquivo com as medições meteriológicas do INMET são disponililizadas compactadas e contém os dados separados em um arquivo para cada estação meteriológica. Então nas quatro celulas a seguir faço as seguintes tarefas.

1º - Descompacto o arquivo zip na pasta /tmp do cluster

2º - Localizo o arquivo de interesse (visualmente mesmo)

3º - Removo as 8 primeiras linhas do arquivo que posuem duas colunas com informações relativas a identificação da estação que efetuaou as medições.

4º - Cópio o arquivo para a pasta de donloads no meu DBFS

In [0]:
%sh 
rm /tmp/inmet_descompactado/*
unzip /tmp/inmet_2023.zip -d /tmp/inmet_descompactado

Archive:  /tmp/inmet_2023.zip
  inflating: /tmp/inmet_descompactado/INMET_CO_DF_A001_BRASILIA_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_DF_A042_BRAZLANDIA_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_DF_A045_AGUAS EMENDADAS_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_DF_A046_GAMA (PONTE ALTA)_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_DF_A047_PARANOA (COOPA-DF)_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_GO_A002_GOIANIA_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_GO_A003_MORRINHOS_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_GO_A005_PORANGATU_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_GO_A011_SAO SIMAO_01-01-2023_A_31-12-2023.CSV  
  inflating: /tmp/inmet_descompactado/INMET_CO_GO_A012_LUZIANIA_01-01-2023_A_31-12-2023.CSV  


In [0]:
%sh tail -n +9 /tmp/inmet_descompactado/INMET_CO_DF_A001_BRASILIA_01-01-2023_A_31-12-2023.CSV > temp && mv temp /tmp/inmet_descompactado/INMET_CO_DF_A001_BRASILIA_01-01-2023_A_31-12-2023.CSV

In [0]:
# Caminho na pasta no DBFS onde o arquivo será salvo
dbfs_path_d = '/FileStore/tables/downloads/inmet_brasilia_2023.csv'

# Copiando o arquivo para a pasta apropriada no DBFS
dbutils.fs.cp('file:/tmp/inmet_descompactado/INMET_CO_DF_A001_BRASILIA_01-01-2023_A_31-12-2023.CSV', dbfs_path_d)

Out[17]: True

# Transformação

## Processamanto

In [0]:
# Iniciando variáveis
file_location = "/FileStore/tables/downloads/inmet_brasilia_2023.csv"
file_type = "csv"

# Opções para o CSV
infer_schema = "True"
first_row_is_header = "True"
delimiter = ";"

# Criando do dataframe com o arquivos baixado
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Remove colunas que não serão usadas

# colunas = df.columns
# print(f'Nomes das colunas: {colunas}')

df = df.drop('PRECIPITA��O TOTAL, HOR�RIO (mm)', 'PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)', 'PRESS�O ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)', 'PRESS�O ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)', 'RADIACAO GLOBAL (Kj/m�)', 'TEMPERATURA DO PONTO DE ORVALHO (�C)', 'TEMPERATURA M�XIMA NA HORA ANT. (AUT) (�C)', 'TEMPERATURA M�NIMA NA HORA ANT. (AUT) (�C)', 'TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (�C)', 'TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (�C)', 'UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)', 'UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)',  'VENTO, DIRE��O HORARIA (gr) (� (gr))', 'VENTO, RAJADA MAXIMA (m/s)', 'VENTO, VELOCIDADE HORARIA (m/s)', '_c19')

# Renomeia colunas que serão utilizadas

df = df.withColumnRenamed("Data", "data")
df = df.withColumnRenamed("Hora UTC", "HoraUTC")
df = df.withColumnRenamed("TEMPERATURA DO AR - BULBO SECO, HORARIA (�C)", "temperaturaArC")
df = df.withColumnRenamed("UMIDADE RELATIVA DO AR, HORARIA (%)", "umidadeArPerc")

df = df.withColumn("temperaturaArC", regexp_replace(col("temperaturaArC"), ",", ".")) # Substitui ',' por '.' para viabilizar calculo da mediana


In [0]:
# Adicionar uma nova coluna de ID que é composta pelo dia do ano e o ano.
df = df.withColumn("ClimaID", (dayofyear(df["Data"])*10000) + year(df["Data"]) )

In [0]:
# Ajusta o tipo de dado usano nas colunas de temperatura e umidade.

df = df.withColumn("temperaturaArC", col("temperaturaArC").cast("float"))
df = df.withColumn("umidadeArPerc", col("umidadeArPerc").cast("int"))

# Calcula as medianas relativas as medições de 24 horas da estação.
df_medianas = df.groupBy("climaID", "data").agg(
    round(expr("percentile_approx(TemperaturaArC, 0.5)"), 1).alias("medianaTemperaturaArC"),
    expr("percentile_approx(UmidadeArPerc, 0.5)").alias("medianaUmidadeArPerc")
)

### Teste da tabela DimClima

In [0]:
# Cria uma tabela temporária

temp_table_name = "DimClima"

df_medianas.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

SELECT * FROM DimClima LIMIT 20;

climaID,data,medianaTemperaturaArC,medianaUmidadeArPerc
12023,2023-01-01,20.1,83
22023,2023-01-02,22.5,69
32023,2023-01-03,19.7,89
42023,2023-01-04,19.7,84
52023,2023-01-05,19.7,84
62023,2023-01-06,19.5,90
72023,2023-01-07,19.1,89
82023,2023-01-08,19.8,84
92023,2023-01-09,19.0,86
102023,2023-01-10,21.1,71


# Carregamento (LOAD)

## Persistindo os dados

In [0]:
# Apaga arquivo anteriror, se houver.

dbutils.fs.rm("dbfs:/FileStore/tables/DW/DimClima.parquet", recurse=True)

Out[23]: True

In [0]:
# Define caminho no DBFS
caminho_dbfs = "dbfs:/FileStore/tables/DW/DimClima.parquet"

# Salva o DataFrame no formato Parquet no DBFS com cabeçalho
df_medianas.write.format("delta").mode("overwrite").option("header", True).save(caminho_dbfs)
