In [1]:
import socket, os
from pyspark.sql import SparkSession


# Resolve this machine's name/address. The driver binds/advertises here, and the master URL
# below also points at this address, i.e. the Spark standalone master is expected on this host.
spark_hostname = socket.gethostname()
spark_ip_address = socket.gethostbyname(spark_hostname)

# MinIO (S3-compatible) endpoint and credentials are read from the environment when set.
# NOTE(review): the fallbacks reproduce the previous hard-coded values so the notebook keeps
# running; drop them once MINIO_HOST / MINIO_ACCESS_KEY / MINIO_SECRET_KEY are configured so
# no secret has to live in the notebook.
minio_ip_address = os.environ.get("MINIO_HOST", "62.72.11.215")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minio123")

print(f"SPARK: {spark_hostname} - {spark_ip_address}")
print(f"MINIO: {minio_ip_address}")

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,com.amazonaws:aws-java-sdk-bundle:1.12.469,org.apache.hadoop:hadoop-aws:3.3.4")

    # Enable the Delta Lake SQL extension and catalog
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # Cap the application's total cores and the memory of each executor
    .config("spark.cores.max", "2")
    .config("spark.executor.memory", "4g")

    # Driver networking, plus S3A settings for talking to MinIO over plain HTTP with path-style URLs
    .config("spark.driver.bindAddress", f"{spark_hostname}")
    .config("spark.driver.host", f"{spark_ip_address}")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{minio_ip_address}:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")

    # Connect to the standalone master and create (or reuse) the session
    .master(f"spark://{spark_ip_address}:7077")
    .appName("rascunho")
    .getOrCreate()
)

SPARK: note_rns - 192.168.18.118
MINIO: 62.72.11.215


# SALVE RAW

In [None]:
# Read the trusted municipalities table (Delta format) from the data lake and show its schema.
municipios = (
    spark.read
    .format("delta")
    .load("s3a://datalake/trusted/municipios_brasileiros")
)
municipios.printSchema()

In [None]:
import os
import gzip
import json
# import boto3 
import requests
from uuid import uuid4
from datetime import datetime

# Download hourly temperature / relative humidity / wind speed from the Open-Meteo ERA5
# archive endpoint for every municipality and year, saving each response as gzipped JSON at
#   <RAW_LOCAL_DIR>/<year>/<regiao>/<codigo_ibge>_<YYYYmmddHHMMSS>_<uuid>.json.gz
# NOTE(review): absolute, user-specific path — make it configurable before sharing the notebook.
RAW_LOCAL_DIR = "C:/Users/Ronildo/Downloads/clima_municipios_brasileiros"
# Seconds to wait on the API before failing; without a timeout a stalled request blocks forever.
REQUEST_TIMEOUT_SECONDS = 120

years = [
    2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022
]

# The municipality list is identical for every year: bring it to the driver once, not per year.
municipios_rows = municipios.orderBy("regiao", "codigo_ibge").collect()

for year in years:
    base_path = f'{RAW_LOCAL_DIR}/{year}'

    for row in municipios_rows:
        resp = requests.get(
            f"https://archive-api.open-meteo.com/v1/era5?latitude={row.latitude}&longitude={row.longitude}&start_date={year}-01-01&end_date={year}-12-31&hourly=temperature_2m,relativehumidity_2m,windspeed_10m&timezone=America/Sao_Paulo",
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()

        # Files land in a per-region subfolder; it must exist before gzip.open can create the file.
        region_path = f'{base_path}/{row.regiao}'
        os.makedirs(region_path, exist_ok=True)

        with gzip.open(f'{region_path}/{row.codigo_ibge}_{datetime.today().strftime("%Y%m%d%H%M%S")}_{uuid4()}.json.gz', 'wb') as f_out:
            # Keep the municipality code next to the raw API payload so the refined layer can join on it.
            encoded = json.dumps({ "codigo_ibge": row.codigo_ibge, "payload": resp.json() }).encode('utf-8')
            f_out.write(encoded)

    print(f"{year} salvo")

In [None]:
# import gzip
# import glob

# files = glob.glob("C:/Users/Ronildo/Downloads/clima_municipios_brasileiros/2000_2022/Nordeste/*")

# for file in files:
#     with open(file, 'rb') as f_in, \
#         gzip.open(f'{file}.gz', 'wb') as f_out:
#         f_out.writelines(f_in)

# SALVE REFINED

In [2]:
# Load every raw JSON file below the 2000_2022 prefix, descending into nested folders
# (e.g. the per-region subfolders Centro-Oeste, Nordeste, Norte, Sudeste, Sul), and expose
# the result to Spark SQL as the temp view `clima_municipios`.
raw_prefix = "s3a://datalake/raw/clima_municipios_brasileiros/2000_2022"

clima_municipios = (
    spark.read
    .option("recursiveFileLookup", True)
    .json(raw_prefix)
)

clima_municipios.createOrReplaceTempView("clima_municipios")
clima_municipios.printSchema()

root
 |-- codigo_ibge: long (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- elevation: double (nullable = true)
 |    |-- generationtime_ms: double (nullable = true)
 |    |-- hourly: struct (nullable = true)
 |    |    |-- relativehumidity_2m: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- temperature_2m: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- windspeed_10m: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |-- hourly_units: struct (nullable = true)
 |    |    |-- relativehumidity_2m: string (nullable = true)
 |    |    |-- temperature_2m: string (nullable = true)
 |    |    |-- time: string (nullable = true)
 |    |    |-- windspeed_10m: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longit

In [None]:
# refined - TEMPORALIDADE (timeline)
# One row per (municipality, position in the hourly arrays): the hourly timestamp plus the
# location/timezone attributes of the API response. `idx` is the array position that the
# humidity / temperature / wind refined tables share.
# A single DISTINCT on the final projection is enough; deduplicating the exploded rows
# first as well gives the same result with an extra aggregation.
refined = spark.sql("""
    WITH _time AS (
        SELECT
            codigo_ibge,
            posexplode(payload.hourly.time) AS (idx, time),
            payload.elevation,
            payload.latitude,
            payload.longitude,
            payload.timezone,
            payload.timezone_abbreviation,
            payload.utc_offset_seconds
        FROM clima_municipios
    )
    SELECT DISTINCT
        _time.idx,
        _time.codigo_ibge,
        CAST(_time.time AS TIMESTAMP) AS time,
        _time.elevation,
        _time.latitude,
        _time.longitude,
        _time.timezone,
        _time.timezone_abbreviation,
        _time.utc_offset_seconds
    FROM _time
""")

In [3]:
# refined - HUMIDADE RELATIVA (relative humidity)
# One row per (municipality, position in the hourly arrays) with the 2 m relative humidity and
# its unit. Rows carry no timestamp; they align with the timeline table through `idx`.
# A single DISTINCT on the final projection is enough (the nested one was redundant).
refined = spark.sql("""
    WITH _relativehumidity AS (
        SELECT
            codigo_ibge,
            posexplode(payload.hourly.relativehumidity_2m) AS (idx, relativehumidity_2m), 
            payload.hourly_units.relativehumidity_2m AS unit_relativehumidity_2m
        FROM clima_municipios
    )
    SELECT DISTINCT
        _relativehumidity.idx,
        _relativehumidity.codigo_ibge,
        _relativehumidity.relativehumidity_2m,
        _relativehumidity.unit_relativehumidity_2m
    FROM _relativehumidity
""")

In [5]:
# refined - TEMPERATURA (temperature)
# One row per (municipality, position in the hourly arrays) with the 2 m air temperature and
# its unit. Rows carry no timestamp; they align with the timeline table through `idx`.
# A single DISTINCT on the final projection is enough (the nested one was redundant).
refined = spark.sql("""
    WITH _temperature AS (
        SELECT
            codigo_ibge,
            posexplode(payload.hourly.temperature_2m) AS (idx, temperature_2m),
            payload.hourly_units.temperature_2m AS unit_temperature_2m
        FROM clima_municipios
    )
    SELECT DISTINCT
        _temperature.idx,
        _temperature.codigo_ibge,
        _temperature.temperature_2m,
        _temperature.unit_temperature_2m
    FROM _temperature
""")

In [7]:
# refined - VELOCIDADE DO VENTO (wind speed)
# One row per (municipality, position in the hourly arrays) with the 10 m wind speed and its
# unit. Rows carry no timestamp; they align with the timeline table through `idx`.
# A single DISTINCT on the final projection is enough (the nested one was redundant).
refined = spark.sql("""
    WITH _windspeed AS (
        SELECT
            codigo_ibge,
            posexplode(payload.hourly.windspeed_10m) AS (idx, windspeed_10m),
            payload.hourly_units.windspeed_10m AS unit_windspeed_10m
        FROM clima_municipios
    )
    SELECT DISTINCT
        _windspeed.idx,
        _windspeed.codigo_ibge,
        _windspeed.windspeed_10m,
        _windspeed.unit_windspeed_10m
    FROM _windspeed
""")

In [None]:
# # refined - FULL QUERY
# refined = spark.sql("""
#     WITH _time AS (
#         SELECT DISTINCT
#             codigo_ibge,
#             posexplode(payload.hourly.time) AS (idx, time),
#             payload.elevation,
#             payload.latitude,
#             payload.longitude,
#             payload.timezone,
#             payload.timezone_abbreviation,
#             payload.utc_offset_seconds,                    
#             payload.hourly AS hourly,                    
#             payload.hourly_units AS hourly_units
#         FROM clima_municipios
#     ), _relativehumidity AS (
#         SELECT DISTINCT
#             codigo_ibge,
#             time,
#             posexplode(hourly.relativehumidity_2m) AS (idx, relativehumidity_2m), 
#             hourly_units.relativehumidity_2m AS unit_relativehumidity_2m
#         FROM _time        
#         WHERE CAST(time AS DATE) BETWEEN '2022-01-01' AND '2022-12-31'
#     ), _temperature AS (
#         SELECT DISTINCT
#             codigo_ibge,
#             time,
#             posexplode(hourly.temperature_2m) AS (idx, temperature_2m),
#             hourly_units.temperature_2m AS unit_temperature_2m
#         FROM _time
#         WHERE CAST(time AS DATE) BETWEEN '2022-01-01' AND '2022-12-31'
#     ), _windspeed AS (
#         SELECT DISTINCT
#             codigo_ibge,
#             time,
#             posexplode(hourly.windspeed_10m) AS (idx, windspeed_10m),
#             hourly_units.windspeed_10m AS unit_windspeed_10m
#         FROM _time
#         WHERE CAST(time AS DATE) BETWEEN '2022-01-01' AND '2022-12-31'
#     )
#     SELECT DISTINCT
#         _time.codigo_ibge,
#         CAST(_time.time AS TIMESTAMP) AS time,
#         _relativehumidity.relativehumidity_2m,
#         _relativehumidity.unit_relativehumidity_2m,
#         _temperature.temperature_2m,
#         _temperature.unit_temperature_2m,
#         _windspeed.windspeed_10m,
#         _windspeed.unit_windspeed_10m,
#         _time.elevation,
#         _time.latitude,
#         _time.longitude,
#         _time.timezone,
#         _time.timezone_abbreviation,
#         _time.utc_offset_seconds
#     FROM _time
#     INNER JOIN _relativehumidity ON _time.codigo_ibge = _relativehumidity.codigo_ibge AND _time.time = _relativehumidity.time AND _time.idx = _relativehumidity.idx
#     INNER JOIN _temperature ON _time.codigo_ibge = _temperature.codigo_ibge AND _time.time = _temperature.time AND _time.idx = _temperature.idx
#     INNER JOIN _windspeed ON _time.codigo_ibge = _windspeed.codigo_ibge AND _time.time = _windspeed.time AND _time.idx = _windspeed.idx
#     ORDER BY time
# """)

In [8]:
# INSERT: persist the current `refined` DataFrame as parquet, replacing any previous output.
# NOTE(review): each of the four refined cells above reassigns `refined`, so this writes
# whichever one ran last. The target must be switched by hand to match it — sibling targets
# under s3a://datalake/refined/clima_municipios_brasileiros/ are
# temporalidade/2000_2022, humidade/2000_2022, temperatura/2000_2022 and
# velocidade_do_vento/2000_2022.
refined.write.parquet(
    "s3a://datalake/refined/clima_municipios_brasileiros/velocidade_do_vento/2000_2022",
    mode="overwrite",
)

In [None]:
# UPSERT: merge `refined` into the Delta table at .../clima_municipios_brasileiros/tempo,
# updating rows that match on (codigo_ibge, time) and inserting the rest.
# NOTE(review): the write cell above stores parquet under .../<dataset>/2000_2022, not a Delta
# table at .../tempo — confirm this merge target is still current.

from delta.tables import DeltaTable

# Among the refined queries in this notebook only TEMPORALIDADE produces a `time` column; fail
# with a clear message instead of an AnalysisException from inside the merge.
merge_keys = {"codigo_ibge", "time"}
missing_merge_keys = merge_keys - set(refined.columns)
if missing_merge_keys:
    raise ValueError(
        f"`refined` lacks merge key column(s) {sorted(missing_merge_keys)}; "
        "run the TEMPORALIDADE refined cell before this merge"
    )

refined_old = DeltaTable.forPath(spark, "s3a://datalake/refined/clima_municipios_brasileiros/tempo")

(
    refined_old.alias("old")
    .merge(
        source=refined.alias("new"),
        condition="old.codigo_ibge = new.codigo_ibge AND old.time = new.time"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# SALVE TRUSTED

In [None]:
# Register each refined parquet dataset as the temp view clima_municipios_<dataset>.
refined_base_path = "s3a://datalake/refined/clima_municipios_brasileiros"
for refined_dataset in ("temporalidade", "humidade", "temperatura", "velocidade_do_vento"):
    (
        spark.read
        .load(path=f"{refined_base_path}/{refined_dataset}/2000_2022", format="parquet")
        .createOrReplaceTempView(f"clima_municipios_{refined_dataset}")
    )

# Build the trusted table: one row per municipality and hour, with Portuguese column names.
# The timeline view is registered as `clima_municipios_temporalidade`. The query previously
# read from `clima_municipios_tempo`, which is never registered and fails on a fresh kernel.
# NOTE(review): measures are aligned to timestamps only by (codigo_ibge, idx), where idx is
# the position inside one raw file's hourly arrays. The download cell writes one file per
# municipality *per year*. If several such files exist per municipality, idx values repeat
# and these joins mismatch and multiply rows — confirm there is one raw file per municipality,
# or carry `time` into the measure tables and join on it.
trusted = spark.sql(
    """
    SELECT
        tm.codigo_ibge,
        CAST(tm.time AS TIMESTAMP) AS data_hora,
        hm.relativehumidity_2m AS humidade_relativa,
        hm.unit_relativehumidity_2m AS unidade_humidade_relativa,
        tp.temperature_2m AS temperatura,
        tp.unit_temperature_2m AS unidade_temperatura,
        vv.windspeed_10m AS velocidade_do_vento,
        vv.unit_windspeed_10m AS unidade_velocidade_do_vento,
        tm.elevation AS elevacao,
        tm.latitude,
        tm.longitude,
        tm.timezone AS fuso_horario,
        tm.timezone_abbreviation AS abreviacao_fuso_horario,
        tm.utc_offset_seconds AS deslocamento_fuso_horario_em_segundos,
        current_timestamp() AS gerado_em
    FROM clima_municipios_temporalidade tm
    INNER JOIN clima_municipios_humidade hm ON hm.codigo_ibge = tm.codigo_ibge AND hm.idx = tm.idx
    INNER JOIN clima_municipios_temperatura tp ON tp.codigo_ibge = tm.codigo_ibge AND tp.idx = tm.idx
    INNER JOIN clima_municipios_velocidade_do_vento vv ON vv.codigo_ibge = tm.codigo_ibge AND vv.idx = tm.idx
    """
)

In [None]:
# Attach a data-dictionary entry (type + Portuguese description) to every trusted column's metadata.
trusted = (
    trusted
    .withMetadata("codigo_ibge", { "tipo": "NUMERICO", "descricao": "Código composto de 7 dígitos, sendo os dois primeiros referentes ao código da Unidade da Federação" })
    .withMetadata("data_hora", { "tipo": "DATA E HORA", "descricao": "Data e hora dos dados meteorológicos" })
    .withMetadata("humidade_relativa", { "tipo": "NUMERICO", "descricao": "Umidade relativa a 2 metros acima do solo" })
    .withMetadata("unidade_humidade_relativa", { "tipo": "TEXTO", "descricao": "Unidade de medida da umidade relativa" })
    .withMetadata("temperatura", { "tipo": "NUMERICO", "descricao": "Temperatura do ar a 2 metros acima do solo" })
    .withMetadata("unidade_temperatura", { "tipo": "TEXTO", "descricao": "Unidade de medida da temperatura do ar" })
    .withMetadata("velocidade_do_vento", { "tipo": "NUMERICO", "descricao": "Velocidade do vento a 10 metros acima do solo" })
    .withMetadata("unidade_velocidade_do_vento", { "tipo": "TEXTO", "descricao": "Unidade de medida da velocidade do vento" })
    .withMetadata("elevacao", { "tipo": "NUMERICO", "descricao": "A elevação usada para redução de escala estatística. Por padrão, um modelo de elevação digital de 90 metros é usado" })
    .withMetadata("latitude", { "tipo": "NUMERICO", "descricao": "Latitude do município" })
    .withMetadata("longitude", { "tipo": "NUMERICO", "descricao": "Longitude do município" })
    .withMetadata("fuso_horario", { "tipo": "TEXTO", "descricao": "Fuso horário considerado para o campo time" })
    .withMetadata("abreviacao_fuso_horario", { "tipo": "TEXTO", "descricao": "Abreviação do fuso horário" })
    .withMetadata("deslocamento_fuso_horario_em_segundos", { "tipo": "NUMERICO", "descricao": "Deslocamento do fuso horário em segundos" })
    .withMetadata("gerado_em", { "tipo": "DATA E HORA", "descricao": "Identifica a data e hora que o registro foi gerado pela orquestração" })
)

# Write the trusted layer as parquet, partitioned by municipality only.
# Partitioning by data_hora as well produced one directory per (municipality, hourly timestamp)
# pair, i.e. a directory with a tiny file for every hour of every municipality. That is a
# small-files explosion that makes both the write and any later read impractical.
# repartition("codigo_ibge") routes each municipality to a single task, so every partition
# directory receives one file instead of one per upstream task.
(
    trusted
    .repartition("codigo_ibge")
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("codigo_ibge")
    .save("s3a://datalake/trusted/clima_municipios_brasileiros")
)