In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, abs, to_timestamp, hour, expr, when
import os

In [3]:
# MinIO/S3 configs
minio_endpoint = os.environ['MINIO_ENDPOINT']
minio_user = os.environ['MINIO_ROOT_USER']
minio_password = os.environ['MINIO_ROOT_PASSWORD']
bucket = "raw"
bucket_refined = "refined"

In [4]:
# Spark session
spark = (
    SparkSession
     .builder
    .master("spark://spark-master:7077")
    .appName("Weather Anomaly Detection - Notebook")
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_user)
    .config("spark.hadoop.fs.s3a.secret.key", minio_password)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.11.1026.jar")
    .getOrCreate()
)
         

spark

25/06/12 22:38:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Temperatura

In [89]:
# Analiza os dados de clima (JSON)
df_weather = spark.read.json(f"s3a://{bucket}/weather/*/*.json")
df_weather.printSchema()

# Converte o campo de tempo para timestamp se necessário
df_weather = df_weather.withColumn("timestamp", to_timestamp(col("time")))

# Adiciona coluna com hora do dia
df_weather = df_weather.withColumn("hour", hour("timestamp"))

# Remove outliers extremos (temperatura fora do 5º-95º percentil)
quantiles = df_weather.approxQuantile("temperature", [0.05, 0.95], 0.05)
df_weather = df_weather.filter(
    (col("temperature") >= quantiles[0]) & (col("temperature") <= quantiles[1])
)

# Estatísticas por cidade e hora
stats_weather = df_weather.groupBy("city", "hour").agg(
    mean("temperature").alias("avg_temp"),
    stddev("temperature").alias("std_temp")
)

# Junta as estatísticas no dataframe
df_weather = df_weather.join(stats_weather, on=["city", "hour"])

# Calcula z-score
df_weather = df_weather.withColumn(
    "temp_z_score", abs((col("temperature") - col("avg_temp")) / col("std_temp"))
)

# Classificação da severidade
df_weather = df_weather.withColumn(
    "anomaly_level",
    when(col("temp_z_score") > 3, "extrema")
    .when(col("temp_z_score") > 2.5, "severa")
    .when(col("temp_z_score") > 2, "moderada")
)

# Estatísticas de IQR (por cidade)
iqr_stats = df_weather.groupBy("city").agg(
    expr("percentile_approx(temperature, 0.25)").alias("Q1"),
    expr("percentile_approx(temperature, 0.75)").alias("Q3")
)
df_weather = df_weather.join(iqr_stats, on="city")
df_weather = df_weather.withColumn("IQR", col("Q3") - col("Q1"))

# Detecta anomalias por IQR
df_weather = df_weather.withColumn(
    "is_iqr_anomaly",
    (col("temperature") < (col("Q1") - 1.5 * col("IQR"))) |
    (col("temperature") > (col("Q3") + 1.5 * col("IQR")))
)

# Filtra apenas registros anômalos por z-score ou IQR
df_weather_anomalies = df_weather.filter(
    (col("temp_z_score") > 2) | (col("is_iqr_anomaly") == True)
)

# Exibe os resultados
df_weather_anomalies.select(
    "timestamp", "city", "temperature", "avg_temp", "std_temp",
    "temp_z_score", "anomaly_level", "is_iqr_anomaly"
).show(10, truncate=False)

# Salva os resultados no S3
df_weather_anomalies.write.mode("overwrite").parquet(
    f"s3a://{bucket}/anomalies/weather"
)

                                                                                

root
 |-- city: string (nullable = true)
 |-- interval: long (nullable = true)
 |-- is_day: long (nullable = true)
 |-- temperature: double (nullable = true)
 |-- time: string (nullable = true)
 |-- weathercode: long (nullable = true)
 |-- winddirection: long (nullable = true)
 |-- windspeed: double (nullable = true)



                                                                                

+-------------------+------------------------+-----------+------------------+-------------------+------------------+-------------+--------------+
|timestamp          |city                    |temperature|avg_temp          |std_temp           |temp_z_score      |anomaly_level|is_iqr_anomaly|
+-------------------+------------------------+-----------+------------------+-------------------+------------------+-------------+--------------+
|2025-06-03 21:15:00|SP-São José do Rio Preto|22.0       |20.548484848484847|0.6279808323416396 |2.3114004070835827|moderada     |false         |
|2025-06-03 21:15:00|SP-São José do Rio Preto|22.0       |20.548484848484847|0.6279808323416396 |2.3114004070835827|moderada     |false         |
|2025-06-03 21:15:00|SP-São José do Rio Preto|22.0       |20.548484848484847|0.6279808323416396 |2.3114004070835827|moderada     |false         |
|2025-06-03 22:00:00|SP-São José do Rio Preto|20.4       |19.86458333333334 |0.23394968719865897|2.2885974889635516|moderada

                                                                                

## Qualidade do Ar

In [90]:
# Analisa os dados de qualidade do ar (Parquet)
df_airquality = spark.read.parquet(f"s3a://{bucket}/airquality/*/*/*/*.parquet")
df_airquality.printSchema()

# Lê os dados Parquet da qualidade do ar
df_airquality = spark.read.parquet(f"s3a://{bucket}/airquality/*/*/*/*.parquet")

# Ajusta timestamp se for string
df_airquality = df_airquality.withColumn("timestamp", to_timestamp(col("timestamp")))

# Filtro para apenas parâmetros relevantes de AQI
aqi_params = ["pm25", "pm10", "no2", "o3", "co", "so2"]
df_airquality = df_airquality.filter(col("parameter").isin(aqi_params))

# Adiciona coluna com hora do dia
df_airquality = df_airquality.withColumn("hour", hour("timestamp"))

# Remove valores extremos antes de calcular estatísticas (5º e 95º percentil)
quantiles = df_airquality.approxQuantile("value", [0.05, 0.95], 0.05)
df_airquality = df_airquality.filter(
    (col("value") >= quantiles[0]) & (col("value") <= quantiles[1])
)

# Calcula estatísticas por cidade, parâmetro e hora
stats_hourly = df_airquality.groupBy("city", "parameter", "hour").agg(
    mean("value").alias("avg_value"),
    stddev("value").alias("std_value")
)

# Junta as estatísticas no DataFrame original
df_airquality = df_airquality.join(stats_hourly, on=["city", "parameter", "hour"])

# Calcula Z-score e marca anomalias com severidade
df_airquality = df_airquality.withColumn(
    "aqi_z_score", abs((col("value") - col("avg_value")) / col("std_value"))
)

df_airquality = df_airquality.withColumn(
    "anomaly_level",
    when(col("aqi_z_score") > 3, "extrema")
    .when(col("aqi_z_score") > 2.5, "severa")
    .when(col("aqi_z_score") > 2, "moderada")
)

# Calcula IQR para reforçar detecção robusta de outliers
iqr_stats = df_airquality.groupBy("city", "parameter").agg(
    expr("percentile_approx(value, 0.25)").alias("Q1"),
    expr("percentile_approx(value, 0.75)").alias("Q3")
)
df_airquality = df_airquality.join(iqr_stats, on=["city", "parameter"])
df_airquality = df_airquality.withColumn("IQR", col("Q3") - col("Q1"))

# Marca anomalias com base no IQR (regra de 1.5*IQR)
df_airquality = df_airquality.withColumn(
    "is_iqr_anomaly",
    (col("value") < (col("Q1") - 1.5 * col("IQR"))) |
    (col("value") > (col("Q3") + 1.5 * col("IQR")))
)

# Filtra anomalias detectadas por Z-score OU por IQR
df_airquality_anomalies = df_airquality.filter(
    (col("aqi_z_score") > 2) | (col("is_iqr_anomaly") == True)
)

# Mostra exemplos
df_airquality_anomalies.select(
    "timestamp", "city", "parameter", "value",
    "avg_value", "std_value", "aqi_z_score", "anomaly_level", "is_iqr_anomaly"
).show(10, truncate=False)

# Salva os resultados
df_airquality_anomalies.write.mode("overwrite").parquet(f"s3a://{bucket}/anomalies/airquality")

root
 |-- city: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: double (nullable = true)
 |-- unit: string (nullable = true)
 |-- timestamp: timestamp_ntz (nullable = true)
 |-- source: string (nullable = true)

+-------------------+------------------------+---------+-----+------------------+------------------+------------------+-------------+--------------+
|timestamp          |city                    |parameter|value|avg_value         |std_value         |aqi_z_score       |anomaly_level|is_iqr_anomaly|
+-------------------+------------------------+---------+-----+------------------+------------------+------------------+-------------+--------------+
|2025-06-09 17:00:00|SP-Ribeirão Preto       |pm25     |9.0  |15.0              |8.48528137423857  |0.7071067811865476|NULL         |true          |
|2025-06-09 17:00:00|SP-Sorocaba             |no2      |11.9 |11.9              |NULL              |NULL              |NULL         |true          |
|2025-06-03 1

## Combinação

In [91]:
import pandas as pd

# Lê os arquivos de cidades com lat/long
df_cities = pd.read_json("../shared/cities.json", orient="index").reset_index()
df_cities.columns = ["city", "lat", "long"]
cities_dict = df_cities.to_dict(orient="index")

# Converte para DataFrame do Spark
df_cities = spark.createDataFrame(list(cities_dict.values()))
df_cities.show()

+--------------------+--------+--------+
|                city|     lat|    long|
+--------------------+--------+--------+
|        SP-São Paulo|-23.5505|-46.6333|
|        SP-Guarulhos|-23.4545|-46.5333|
|         SP-Campinas|-22.9099|-47.0626|
|SP-São Bernardo d...|-23.6914|-46.5646|
|SP-São José dos C...|-23.1896|-45.8841|
|      SP-Santo André|-23.6639|-46.5383|
|   SP-Ribeirão Preto|-21.1784|-47.8064|
|           SP-Osasco|-23.5329|-46.7926|
|         SP-Sorocaba|-23.5018|-47.4583|
|             SP-Mauá|-23.6677|-46.4613|
|SP-São José do Ri...|-20.8114|-49.3759|
|  SP-Mogi das Cruzes|-23.5208|-46.1854|
|           SP-Santos|-23.9608|-46.3336|
|          SP-Diadema|-23.6813|-46.6228|
|          SP-Jundiaí|-23.1857|-46.8978|
|       SP-Piracicaba|-22.7253|-47.6492|
|      SP-Carapicuíba|-23.5225|-46.8355|
|            SP-Bauru|-22.3145|-49.0586|
|  SP-Itaquaquecetuba|-23.4867|-46.3486|
|           SP-Franca|-20.5382|-47.4009|
+--------------------+--------+--------+
only showing top

In [92]:
# Le os dados de clima (JSON)
df_weather = spark.read.json(f"s3a://{bucket}/weather/*/*.json")
df_weather.printSchema()



root
 |-- city: string (nullable = true)
 |-- interval: long (nullable = true)
 |-- is_day: long (nullable = true)
 |-- temperature: double (nullable = true)
 |-- time: string (nullable = true)
 |-- weathercode: long (nullable = true)
 |-- winddirection: long (nullable = true)
 |-- windspeed: double (nullable = true)



                                                                                

In [93]:
# Lê os dados de qualidade do ar (Parquet)
df_airquality = spark.read.parquet(f"s3a://{bucket}/airquality/*/*/*/*.parquet")
df_airquality.printSchema()

root
 |-- city: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- value: double (nullable = true)
 |-- unit: string (nullable = true)
 |-- timestamp: timestamp_ntz (nullable = true)
 |-- source: string (nullable = true)



In [85]:
df_weather.show(100, truncate=False)

                                                                                

+------------------------+--------+------+-----------+----------------+-----------+-------------+---------+--------+--------+
|city                    |interval|is_day|temperature|time            |weathercode|winddirection|windspeed|lat     |long    |
+------------------------+--------+------+-----------+----------------+-----------+-------------+---------+--------+--------+
|SP-São José do Rio Preto|900     |1     |23.3       |2025-06-03T17:15|95         |173          |9.1      |-20.8114|-49.3759|
|SP-São José do Rio Preto|900     |1     |23.3       |2025-06-03T17:15|95         |173          |9.1      |-20.8114|-49.3759|
|SP-São José do Rio Preto|900     |1     |23.0       |2025-06-03T17:30|95         |162          |9.1      |-20.8114|-49.3759|
|SP-São José do Rio Preto|900     |1     |23.0       |2025-06-03T17:30|95         |162          |9.1      |-20.8114|-49.3759|
|SP-São José do Rio Preto|900     |1     |22.8       |2025-06-03T17:45|95         |149          |8.4      |-20.8114|-4

In [None]:
# Une os dados de clima com as coordenadas das cidades
df_weather = df_weather.join(df_cities, on="city", how="left")
df_weather.write.mode("overwrite").parquet(f"s3a://{bucket_refined}/weather_with_coords")

# Une os dados de qualidade do ar com as coordenadas das cidades
df_airquality = df_airquality.join(df_cities, on="city", how="left")
df_airquality.write.mode("overwrite").parquet(f"s3a://{bucket_refined}/airquality_with_coords")

                                                                                

In [6]:
spark.stop()