In [11]:
import pyspark
import pyspark.sql.functions as f
import time, shutil, os, json, requests
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder 
    .master("local[6]")
    .appName("delta")
    .config("spark.driver.memory", "13g")
    .config("spark.jars.packages",
            "io.delta:delta-core_2.12:3.3.2")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [7]:
url = "https://archive-api.open-meteo.com/v1/archive"
params = {
	"latitude": [64.1355, 41.8781, 35.6895, 30.0444, 19.076, 1.3521, -1.2921, -12.0464, -23.5505, -37.8136],
	"longitude": [-21.8954, -87.6298, 139.6917, 31.2357, 72.8777, 103.8198, 36.8219, -77.0428, -46.6333, 144.9631],
	"start_date": "2019-01-01",
	"end_date": "2019-12-31",
	"hourly": ["temperature_2m", "relative_humidity_2m", "rain", "soil_temperature_0_to_7cm", "soil_temperature_7_to_28cm", "soil_temperature_28_to_100cm", "soil_temperature_100_to_255cm"],
    "temporal_resolution": "hourly_6",
}

try:
    api_response = requests.get(url, params=params)
    data = api_response.json()
except Exception as e:
	print(f"Error fetching data from API: {e}")

In [8]:
os.makedirs("../data/json", exist_ok=True)

with open("../data/json/api_response.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=4)

In [9]:
bronze_table_df = spark.read.option("multiline", "true").json("../data/json/api_response.json")
bronze_table_df.show(truncate=False)

+---------+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
ID_COL = "location_id"
LAT_COL  = "latitude"
LON_COL  = "longitude"
HOURLY_COL = "hourly"
UNITS_COL  = "hourly_units"

value_fields = [
    "rain",
    "relative_humidity_2m",
    "soil_temperature_0_to_7cm",
    "soil_temperature_100_to_255cm",
    "soil_temperature_28_to_100cm",
    "soil_temperature_7_to_28cm",
    "temperature_2m"
]

time_field = "time"

zipped = f.arrays_zip(
    *[f.col(f"{HOURLY_COL}.{c}") for c in value_fields],
    f.col(f"{HOURLY_COL}.{time_field}")
)

silver_table_df = (
    bronze_table_df
    .withColumn("obs", f.explode(zipped))
    .withColumn(ID_COL, f.when(f.col(ID_COL).isNull(), f.lit(0)).otherwise(f.col(ID_COL)))
    .select(
        f.col(ID_COL).alias("location_id"),
        f.col(LAT_COL).alias("lat"),
        f.col(LON_COL).alias("lon"),
        f.to_timestamp(f.col("obs.time")).alias("timestamp"),
        *[f.col(f"obs.{c}").alias(c) for c in value_fields]
    )
    .orderBy("location_id", "timestamp")
)

silver_table_df.printSchema()
silver_table_df.show(300, truncate=False)


root
 |-- location_id: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- rain: double (nullable = true)
 |-- relative_humidity_2m: long (nullable = true)
 |-- soil_temperature_0_to_7cm: double (nullable = true)
 |-- soil_temperature_100_to_255cm: double (nullable = true)
 |-- soil_temperature_28_to_100cm: double (nullable = true)
 |-- soil_temperature_7_to_28cm: double (nullable = true)
 |-- temperature_2m: double (nullable = true)

+-----------+--------+--------+-------------------+----+--------------------+-------------------------+-----------------------------+----------------------------+--------------------------+--------------+
|location_id|lat     |lon     |timestamp          |rain|relative_humidity_2m|soil_temperature_0_to_7cm|soil_temperature_100_to_255cm|soil_temperature_28_to_100cm|soil_temperature_7_to_28cm|temperature_2m|
+-----------+--------+--------+-------------------+----+-----