# Código de Desafio Open Weather

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, col, datediff, avg, current_date, date_format
import os

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-spark_2.12:3.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'

In [3]:
spark = SparkSession.builder \
    .appName("Data_Analysis") \
    .config("spark.hadoop.fs.s3a.access.key", "ueVk36Brq2ZX8gCGGj1w") \
    .config("spark.hadoop.fs.s3a.secret.key", "12345678") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://172.29.8.228:9000") \
    .getOrCreate()

In [4]:
minio_bucket_name = "bucketd2d"
minio_path = "transient/historico"

In [5]:
delta_path = f"s3a://{minio_bucket_name}/{minio_path}"

In [6]:
merged_df = spark.read.format("delta").load(delta_path)

In [7]:
merged_df = merged_df.withColumn("data_hora", from_unixtime("dt"))

In [8]:
merged_df = merged_df.withColumn("dias_desde_hoje", datediff(current_date(), col("data_hora")))

In [9]:
filtered_df = merged_df.filter((col("dias_desde_hoje") >= 0) & (col("dias_desde_hoje") <= 30))

# Analisar a temperatura máxima e mínima para cada cidade em um período de 30 dias e exibir os resultados em uma tabela SQL.

In [10]:
result = filtered_df.groupBy("city").agg(avg("tempe_max").alias("media_tempe_max"), avg("tempe_min").alias("media_tempe_min"))

In [11]:
result.show()

+---------+---------------+---------------+
|     city|media_tempe_max|media_tempe_min|
+---------+---------------+---------------+
|Itabaiana|         27.058|         27.058|
|  Lagarto|         27.442|         27.442|
|  Aracaju|          27.97|          27.97|
+---------+---------------+---------------+



In [12]:
spark.stop()

# Identificar a cidade mais quente e a cidade mais fria em um período de 30 dias e exibir os resultados em uma tabela SQ

In [None]:
hottest_city = result.orderBy(col("media_tempe_max").desc()).limit(1)
coldest_city = result.orderBy(col("media_tempe_min")).limit(1)

In [None]:
hottest_city.show()
coldest_city.show()

In [None]:
resultado:

+-------+---------------+---------------+
|   city|media_tempe_max|media_tempe_min|
+-------+---------------+---------------+
|Aracaju|          27.97|          27.97|
+-------+---------------+---------------+

+---------+---------------+---------------+
|     city|media_tempe_max|media_tempe_min|
+---------+---------------+---------------+
|Itabaiana|         27.058|         27.058|
+---------+---------------+---------------+

In [None]:
spark.stop()

# Calcular a média da temperatura para cada dia em um período de 30 dias e exibir os resultados em uma tabela SQL.

In [None]:
filtered_df = filtered_df.withColumn("data", date_format(col("data_hora"), "yyyy-MM-dd"))
result = filtered_df.groupBy("city", "data").agg(avg("tempe_max").alias("media_tempe_max"), avg("tempe_min").alias("media_tempe_min"))

In [None]:
result.show()
+---------+----------+---------------+---------------+
|     city|      data|media_tempe_max|media_tempe_min|
+---------+----------+---------------+---------------+
|Itabaiana|2023-11-04|          34.55|          34.55|
|Itabaiana|2023-11-07|          22.68|          22.68|
|  Aracaju|2023-11-08|          26.97|          26.97|
|  Lagarto|2023-11-08|          23.44|          23.44|
|  Aracaju|2023-11-05|          28.97|          28.97|
|  Lagarto|2023-11-05|          32.03|          32.03|
|Itabaiana|2023-11-05|           31.9|           31.9|
|Itabaiana|2023-11-08|          23.08|          23.08|
|  Lagarto|2023-11-04|          35.33|          35.33|
|  Aracaju|2023-11-07|          26.97|          26.97|
|  Aracaju|2023-11-04|          29.97|          29.97|
|  Lagarto|2023-11-07|          22.97|          22.97|
+---------+----------+---------------+---------------+

In [None]:
spark.stop()

# Identificar as cidades com as maiores e menores variações de temperatura em um período de 30 dias e exibir os resultados em uma tabela SQL.


In [None]:
result = filtered_df.groupBy("city", "data").agg(avg("tempe_max").alias("media_tempe_max"), avg("tempe_min").alias("media_tempe_min"))
result = result.withColumn("var_temp", col("media_tempe_max") - col("media_tempe_min"))

In [None]:
max_var_city = result.orderBy(col("var_temp").desc()).limit(1)
min_var_city = result.orderBy(col("var_temp")).limit(1)

In [None]:
max_var_city.show()
min_var_city.show()

Resultado:

+---------+----------+---------------+---------------+--------+
|     city|      data|media_tempe_max|media_tempe_min|var_temp|
+---------+----------+---------------+---------------+--------+
|Lagarto|2023-11-04|         35.33|          35.33|     0.0|
+---------+----------+---------------+---------------+--------+

+---------+----------+---------------+---------------+--------+
|     city|      data|media_tempe_max|media_tempe_min|var_temp|
+---------+----------+---------------+---------------+--------+
|Itabaiana|2023-11-07|          22.68|          22.68|     0.0|
+---------+----------+---------------+---------------+--------+

In [None]:
spark.stop()

# Obter a previsão do tempo para uma lista de cidades do seu estado nos próximos 7 dias e armazenar os resultados em uma tabela SQL.

In [None]:
filtered_df = merged_df.filter((col("dias_desde_hoje") >= 0) & (col("dias_desde_hoje") <= 7))
filtered_df.show()

Resultado:
+---------+-----+---------+---------+------+------+----------+--------+---------------+--------------------+-------+------+----+----------+-------------------+---------------+----------+
|     city|tempo|tempe_max|tempe_min|nuvens|codígo|        dt|timezone|main_feels_like|     weather_explode|     id|  main|icon|weather_id|          data_hora|dias_desde_hoje|      data|
+---------+-----+---------+---------+------+------+----------+--------+---------------+--------------------+-------+------+----+----------+-------------------+---------------+----------+
|Itabaiana|34.55|    34.55|    34.55|    31|   200|1699107280|  -10800|          35.85|{nuvens dispersas...|3460974|Clouds| 03d|       802|2023-11-04 14:14:40|              4|2023-11-04|
|Itabaiana|22.68|    22.68|    22.68|    49|   200|1699321585|  -10800|          23.33|{nuvens dispersas...|3460974|Clouds| 03n|       802|2023-11-07 01:46:25|              1|2023-11-07|
|  Lagarto|25.03|    25.03|    25.03|    50|   200|1699477366|  -10800|          25.52|{nuvens dispersas...|3459342|Clouds| 03n|       802|2023-11-08 21:02:46|              0|2023-11-08|
|  Aracaju|26.97|    26.97|    26.97|    40|   200|1699477394|  -10800|          29.49|{nuvens dispersas...|3471872|Clouds| 03n|       802|2023-11-08 21:03:14|              0|2023-11-08|
|  Aracaju|28.97|    28.97|    28.97|    40|   200|1699185278|  -10800|          33.42|{nuvens dispersas...|3471872|Clouds| 03d|       802|2023-11-05 11:54:38|              3|2023-11-05|
|  Lagarto|32.03|    32.03|    32.03|    39|   200|1699185545|  -10800|          33.95|{nuvens dispersas...|3459342|Clouds| 03d|       802|2023-11-05 11:59:05|              3|2023-11-05|
|Itabaiana| 31.9|     31.9|     31.9|    23|   200|1699185544|  -10800|           33.5|{algumas nuvens, ...|3460974|Clouds| 02d|       801|2023-11-05 11:59:04|              3|2023-11-05|
|Itabaiana|24.62|    24.62|    24.62|    21|   200|1699477380|  -10800|          25.07|{algumas nuvens, ...|3460974|Clouds| 02n|       801|2023-11-08 21:03:00|              0|2023-11-08|
|  Aracaju|26.97|    26.97|    26.97|    20|   200|1699409224|  -10800|          29.96|{algumas nuvens, ...|3471872|Clouds| 02n|       801|2023-11-08 02:07:04|              0|2023-11-08|
|  Lagarto|35.33|    35.33|    35.33|    24|   200|1699107281|  -10800|          36.93|{algumas nuvens, ...|3459342|Clouds| 02d|       801|2023-11-04 14:14:41|              4|2023-11-04|
|  Aracaju|26.97|    26.97|    26.97|    20|   200|1699321682|  -10800|          29.96|{algumas nuvens, ...|3471872|Clouds| 02n|       801|2023-11-07 01:48:02|              1|2023-11-07|
|Itabaiana|21.54|    21.54|    21.54|     8|   200|1699409210|  -10800|          22.15|{céu limpo, 01n, ...|3460974| Clear| 01n|       800|2023-11-08 02:06:50|              0|2023-11-08|
|  Lagarto|22.97|    22.97|    22.97|    60|   200|1699321693|  -10800|          23.65|{nublado, 04n, 80...|3459342|Clouds| 04n|       803|2023-11-07 01:48:13|              1|2023-11-07|
|  Aracaju|29.97|    29.97|    29.97|    75|   200|1699107189|  -10800|          34.97|{nublado, 04d, 80...|3471872|Clouds| 04d|       803|2023-11-04 14:13:09|              4|2023-11-04|
|  Lagarto|21.85|    21.85|    21.85|    15|   200|1699409236|  -10800|          22.49|{chuva leve, 10n,...|3459342|  Rain| 10n|       500|2023-11-08 02:07:16|              0|2023-11-08|
+---------+-----+---------+---------+------+------+----------+--------+---------------+--------------------+-------+------+----+----------+-------------------+---------------+----------+


# Identificar a cidade com a maior quantidade de dias chuvosos em um período de 30 dias e exibir o resultado em uma tabela SQL.

In [None]:
rainy_days_df = filtered_df.filter(expr("lower(weather_explode.description) like '%chuva%'"))
rainy_days_count = rainy_days_df.groupBy("city").count()
max_rainy_days_city = rainy_days_count.orderBy(col("count").desc()).first()

In [None]:
max_rainy_days_city_name = max_rainy_days_city["city"]
print(f"A cidade com a maior quantidade de dias chuvosos é: {max_rainy_days_city_name}")

Resultado:
A cidade com a maior quantidade de dias chuvosos é: Lagarto

In [None]:
spark.stop()