# Tempo - Spark

## Preparando o ambiente

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Build the Spark session for this notebook; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Clima Porto Alegre")
    .getOrCreate()
)

# Read the raw weather extraction as JSON (schema is inferred by Spark)
# and preview up to five rows.
df = spark.read.json("../data/weather/extraction")
df.show(5)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/31 13:24:51 WARN Utils: Your hostname, Salatiels-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.190 instead (on interface en0)
25/12/31 13:24:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/31 13:24:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+------------+--------------------+--------+---------+---------+---------------+--------------------+-----------------+--------+
|     address|                days|latitude|longitude|queryCost|resolvedAddress|            stations|         timezone|tzoffset|
+------------+--------------------+--------+---------+---------+---------------+--------------------+-----------------+--------+
|Porto Alegre|[{20.5, Partially...|-30.0328|   -51.23|        1|   Porto Alegre|{{0.0, 12019.0, S...|America/Sao_Paulo|    -3.0|
+------------+--------------------+--------+---------+---------+---------------+--------------------+-----------------+--------+



In [47]:
# Print the schema Spark inferred for the raw weather DataFrame as a tree,
# including the nested fields of each element of the `days` array.
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- days: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cloudcover: double (nullable = true)
 |    |    |-- conditions: string (nullable = true)
 |    |    |-- datetime: string (nullable = true)
 |    |    |-- datetimeEpoch: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- dew: double (nullable = true)
 |    |    |-- feelslike: double (nullable = true)
 |    |    |-- feelslikemax: double (nullable = true)
 |    |    |-- feelslikemin: double (nullable = true)
 |    |    |-- humidity: double (nullable = true)
 |    |    |-- icon: string (nullable = true)
 |    |    |-- moonphase: double (nullable = true)
 |    |    |-- precip: double (nullable = true)
 |    |    |-- precipcover: double (nullable = true)
 |    |    |-- precipprob: double (nullable = true)
 |    |    |-- preciptype: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)


## Transformando dados

### Stations

In [31]:
# `stations` is a struct with one nested struct per field. Collect the field
# names so every nested struct can be addressed as `stations.<field>`.
station_names = df.select("stations.*").columns

# Pack the nested structs into a single array column so they can be exploded.
df_with_array = df.withColumn(
    "stations_array",
    F.array(*(F.col(f"stations.{field}") for field in station_names)),
)

# One row per array element, with the element's own fields promoted to columns.
df_used_stations = (
    df_with_array
    .select(F.explode("stations_array").alias("station"))
    .select("station.*")
)

# Keep only the identifying and positional attributes of each station.
df_stations = df_used_stations.select("id", "latitude", "longitude", "name")
df_stations.show()

+----+--------+---------+----+
|  id|latitude|longitude|name|
+----+--------+---------+----+
|SBCO|  -29.95|   -51.15|SBCO|
|SBPA|   -30.0|   -51.18|SBPA|
+----+--------+---------+----+



### Station Quality

In [60]:
# Smallest `days.datetime` value found across all rows: array_min picks the
# minimum inside each row's array, F.min then reduces over the rows.
processed_date = df.select(F.min(F.array_min("days.datetime"))).first()[0]

# Coordinates and resolved address of the weather query, read from the first
# row and exposed under `weather_`-prefixed names.
location_data = df.select(
    F.col("latitude").alias("weather_latitude"),
    F.col("longitude").alias("weather_longitude"),
    F.col("resolvedAddress").alias("weather_address"),
).first()

# Per-station distance and quality, with the processed date and the query
# location attached to every row as literal columns.
df_station_quality = (
    df_used_stations
    .select(F.col("id").alias("station_id"), "distance", "quality")
    .withColumns({
        "processed_date": F.lit(processed_date),
        "weather_latitude": F.lit(location_data["weather_latitude"]),
        "weather_longitude": F.lit(location_data["weather_longitude"]),
        "weather_address": F.lit(location_data["weather_address"]),
    })
)
df_station_quality.show()


+----------+--------+-------+--------------+----------------+-----------------+---------------+
|station_id|distance|quality|processed_date|weather_latitude|weather_longitude|weather_address|
+----------+--------+-------+--------------+----------------+-----------------+---------------+
|      SBCO| 12019.0|     39|    2025-12-31|        -30.0328|           -51.23|   Porto Alegre|
|      SBPA|  6046.0|     50|    2025-12-31|        -30.0328|           -51.23|   Porto Alegre|
+----------+--------+-------+--------------+----------------+-----------------+---------------+



### Days

In [66]:
# Flatten `days` into one row per array element, keep the selected weather
# fields, rename `datetime` to `date`, and attach the processed date and the
# query location to every row as literal columns before displaying the result.
(
    df
    .select(F.explode("days").alias("day"))
    .select("day.*")
    .select(
        "cloudcover", "conditions", "datetime", "description",
        "feelslike", "feelslikemax", "feelslikemin",
        "humidity", "icon",
        "precip", "precipcover", "precipprob", "preciptype",
        "severerisk", "solarenergy", "solarradiation",
        "temp", "tempmax", "tempmin",
        "uvindex", "winddir", "windspeed", "windgust",
    )
    .withColumnRenamed("datetime", "date")
    .withColumns({
        "processed_date": F.lit(processed_date),
        "weather_latitude": F.lit(location_data["weather_latitude"]),
        "weather_longitude": F.lit(location_data["weather_longitude"]),
        "weather_address": F.lit(location_data["weather_address"]),
    })
    .show()
)

+----------+--------------------+----------+--------------------+---------+------------+------------+--------+-----------------+------+-----------+----------+----------+----------+-----------+--------------+----+-------+-------+-------+-------+---------+--------+--------------+----------------+-----------------+---------------+
|cloudcover|          conditions|      date|         description|feelslike|feelslikemax|feelslikemin|humidity|             icon|precip|precipcover|precipprob|preciptype|severerisk|solarenergy|solarradiation|temp|tempmax|tempmin|uvindex|winddir|windspeed|windgust|processed_date|weather_latitude|weather_longitude|weather_address|
+----------+--------------------+----------+--------------------+---------+------------+------------+--------+-----------------+------+-----------+----------+----------+----------+-----------+--------------+----+-------+-------+-------+-------+---------+--------+--------------+----------------+-----------------+---------------+
|      20.