In [0]:
# Install required library (requests)
# %pip install requests

In [0]:
# %restart_python

In [0]:
import requests
import json

# Define API URL
# Open-Meteo hourly forecast endpoint (no API key required).
url = "https://api.open-meteo.com/v1/forecast"
params = {
    "latitude": 28.6139,  # New Delhi
    "longitude": 77.2090,
    "hourly": "temperature_2m,relative_humidity_2m,precipitation",
    # Without an explicit timezone Open-Meteo returns GMT wall-clock times, so
    # grouping by "date" later in the notebook would bucket UTC days rather
    # than New Delhi calendar days.
    "timezone": "Asia/Kolkata",
}
# requests has no default timeout; without one a stalled connection hangs the cell forever.
REQUEST_TIMEOUT_S = 30

# Fetch JSON data
response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT_S)
# Fail here on HTTP errors (e.g. a 400 for bad parameters) instead of letting an
# error payload flow into `data` and surface as a KeyError on data['hourly'] later.
response.raise_for_status()
data = response.json()

# Check structure
display(data)


In [0]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Extract arrays: the response holds one list per hourly variable, and this
# cell pairs them position-by-position with `time`.
hourly = data['hourly']
times = hourly['time']
temps = hourly['temperature_2m']
humidity = hourly['relative_humidity_2m']
precip = hourly['precipitation']

# Fail fast on an empty payload: spark.createDataFrame([]) cannot infer a schema
# and would raise a far less helpful error.
if not times:
    raise ValueError("Open-Meteo response contains no hourly timestamps")

# The arrays must line up one-to-one. Indexing by range(len(times)) would silently
# drop trailing values if `times` were the shortest list, so check explicitly.
lengths = {
    "time": len(times),
    "temperature_2m": len(temps),
    "relative_humidity_2m": len(humidity),
    "precipitation": len(precip),
}
if len(set(lengths.values())) != 1:
    raise ValueError(f"Open-Meteo hourly arrays have mismatched lengths: {lengths}")

# Combine into one Row per hour
records = [
    Row(time=t, temperature=temp, humidity=rh, precipitation=p)
    for t, temp, rh, p in zip(times, temps, humidity, precip)
]

df = spark.createDataFrame(records)
display(df)


In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date, hour, avg, when

# Convert and enrich: parse the `time` strings into a timestamp, then derive the
# calendar date, the hour of day and a rain flag. Each withColumn resolves
# col("time") against the frame built so far, so `date` and `hour` are taken
# from the parsed timestamp, not from the raw string.
rain_flag = when(col("precipitation") > 0, True).otherwise(False)  # null precipitation -> False

df_clean = df.withColumn("time", to_timestamp(col("time")))
df_clean = df_clean.withColumn("date", to_date(col("time")))
df_clean = df_clean.withColumn("hour", hour(col("time")))
df_clean = df_clean.withColumn("is_raining", rain_flag)

display(df_clean)


In [0]:
# Daily means of temperature and humidity, one row per calendar `date`.
# Import `avg` here so this cell does not silently rely on an import made in
# another cell.
from pyspark.sql.functions import avg

daily_avg = (
    df_clean
    .groupBy("date")
    .agg(
        avg("temperature").alias("avg_temp"),
        avg("humidity").alias("avg_humidity"),
    )
    # groupBy output order is undefined; sort so the summary reads chronologically.
    .orderBy("date")
)

display(daily_avg)


In [0]:
# Save as managed Delta tables, replacing whatever a previous run wrote.
# Written in order: hourly detail first, then the daily summary.
# NOTE(review): Delta rejects an overwrite whose schema differs from the existing
# table unless the "overwriteSchema" option is set; keep column types stable.
tables_to_save = [
    (df_clean, "weather_hourly"),
    (daily_avg, "weather_daily_summary"),
]
for frame, table_name in tables_to_save:
    (frame.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(table_name))


In [0]:
%sql
-- First 10 hours of the forecast; without ORDER BY, LIMIT returns an arbitrary 10 rows.
SELECT * FROM weather_hourly ORDER BY `time` LIMIT 10;


In [0]:
%sql
-- Daily summary in chronological order (row order of a table scan is not guaranteed).
SELECT date, avg_temp, avg_humidity FROM weather_daily_summary ORDER BY date;


In [0]:
%sql
-- List tables in the current schema; weather_hourly and weather_daily_summary should appear here.
SHOW TABLES;

In [0]:
%sql
-- Delta table metadata for the hourly table (format, storage location, file count, size).
DESCRIBE DETAIL weather_hourly;