In [0]:
%py

import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
%py

# Fetch the forecast for grid point MPX/107,66 with a GET request to the weather.gov API.
api_url = 'https://api.weather.gov/gridpoints/MPX/107,66/forecast'

# A timeout keeps the cell from hanging forever if the service does not answer.
response = requests.get(api_url, timeout=30)

# Fail fast on any non-200 answer. Merely printing here would leave
# `forecast_json` undefined (or stale from a previous run of this cell),
# and the next cell would then break or silently ingest old data.
if response.status_code != 200:
    raise RuntimeError(f"Falha ao acessar a API. Status code: {response.status_code}")

data = response.json()
forecast_data = data['properties']['periods']  # list of forecast periods
forecast_json = json.dumps(forecast_data)  # serialized to a JSON string for Spark

In [0]:
%py

# Explicit schema for the forecast periods; every column is nullable.
# Only the fields listed here are read from the JSON payload.
schema = (
    StructType()
    .add("number", IntegerType(), True)
    .add("name", StringType(), True)
    .add("startTime", StringType(), True)
    .add("endTime", StringType(), True)
    .add("temperature", IntegerType(), True)
    .add("temperatureUnit", StringType(), True)
    .add("windSpeed", StringType(), True)
    .add("windDirection", StringType(), True)
    .add("shortForecast", StringType(), True)
    .add("detailedForecast", StringType(), True)
)

# Wrap the JSON string in a single-element RDD and let Spark parse it
# into a DataFrame using the schema above.
rdd = spark.sparkContext.parallelize([forecast_json])
df_bronze = spark.read.json(rdd, schema=schema)

# Print a sample of the parsed rows.
df_bronze.show()

+------+---------------+--------------------+--------------------+-----------+---------------+------------+-------------+--------------------+--------------------+
|number|           name|           startTime|             endTime|temperature|temperatureUnit|   windSpeed|windDirection|       shortForecast|    detailedForecast|
+------+---------------+--------------------+--------------------+-----------+---------------+------------+-------------+--------------------+--------------------+
|     1| This Afternoon|2024-09-12T17:00:...|2024-09-12T18:00:...|         86|              F|      10 mph|          SSE|        Mostly Sunny|Mostly sunny, wit...|
|     2|        Tonight|2024-09-12T18:00:...|2024-09-13T06:00:...|         63|              F| 5 to 10 mph|           SE|        Mostly Clear|Mostly clear, wit...|
|     3|         Friday|2024-09-13T06:00:...|2024-09-13T18:00:...|         84|              F| 5 to 10 mph|           SE|               Sunny|Sunny, with a hig...|
|     4|   Frida

In [0]:
%py

# Persist the raw forecast rows to the Bronze layer as CSV, with a header
# row, overwriting whatever a previous run left at this path.
(
    df_bronze.write
    .mode('overwrite')
    .option('header', True)
    .csv('/mnt/bronze/forecast_bronze.csv')
)

print("Dados salvos no Bronze layer.")

Dados salvos no Bronze layer.


In [0]:
%py

# Reload the Bronze CSV. The first line is treated as the header and the
# column types are inferred by Spark from the file contents.
df_bronze = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/mnt/bronze/forecast_bronze.csv')
)

# Print a sample of the reloaded rows.
df_bronze.show()

+------+---------------+-------------------+-------------------+-----------+---------------+------------+-------------+--------------------+--------------------+
|number|           name|          startTime|            endTime|temperature|temperatureUnit|   windSpeed|windDirection|       shortForecast|    detailedForecast|
+------+---------------+-------------------+-------------------+-----------+---------------+------------+-------------+--------------------+--------------------+
|     1| This Afternoon|2024-09-12 22:00:00|2024-09-12 23:00:00|         86|              F|      10 mph|          SSE|        Mostly Sunny|Mostly sunny, wit...|
|     2|        Tonight|2024-09-12 23:00:00|2024-09-13 11:00:00|         63|              F| 5 to 10 mph|           SE|        Mostly Clear|Mostly clear, wit...|
|     3|         Friday|2024-09-13 11:00:00|2024-09-13 23:00:00|         84|              F| 5 to 10 mph|           SE|               Sunny|Sunny, with a hig...|
|     4|   Friday Night|2024

In [0]:
%py

# Keep only the columns that make up the Silver layer.
silver_columns = ["name", "temperature", "windSpeed", "shortForecast"]
df_silver = df_bronze.select(*silver_columns)

# Print a sample of the projected rows.
df_silver.show()

+---------------+-----------+------------+--------------------+
|           name|temperature|   windSpeed|       shortForecast|
+---------------+-----------+------------+--------------------+
| This Afternoon|         86|      10 mph|        Mostly Sunny|
|        Tonight|         63| 5 to 10 mph|        Mostly Clear|
|         Friday|         84| 5 to 10 mph|               Sunny|
|   Friday Night|         65| 5 to 10 mph|Mostly Cloudy the...|
|       Saturday|         78|       5 mph|Chance Showers An...|
| Saturday Night|         65|       5 mph|Chance Showers An...|
|         Sunday|         85| 5 to 10 mph|        Mostly Sunny|
|   Sunday Night|         66|       5 mph|        Mostly Clear|
|         Monday|         83| 5 to 10 mph|        Mostly Sunny|
|   Monday Night|         66| 5 to 10 mph|       Partly Cloudy|
|        Tuesday|         83| 5 to 15 mph|        Mostly Sunny|
|  Tuesday Night|         67|      10 mph|Mostly Clear then...|
|      Wednesday|         82|10 to 15 mp

In [0]:
%py

# Persist the projected rows to the Silver layer as CSV, with a header
# row, overwriting whatever a previous run left at this path.
(
    df_silver.write
    .mode('overwrite')
    .option('header', True)
    .csv('/mnt/silver/forecast_silver.csv')
)

print("Dados transformados e salvos no Silver layer.")

Dados transformados e salvos no Silver layer.
