In [1]:
# Weather medallion pipeline: Open-Meteo hourly forecast → Bronze → Silver → Gold Delta tables.
# ---------------------------
# 1. Imports
# ---------------------------
from datetime import datetime

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, dayofmonth, month, year

spark = SparkSession.builder.getOrCreate()

# ---------------------------
# 2. Load from Public API → Bronze Layer
# ---------------------------
# Example: Open-Meteo API (Free, no key required)
url = "https://api.open-meteo.com/v1/forecast"
params = {
    "latitude": 52.52,    # Berlin
    "longitude": 13.41,
    "hourly": "temperature_2m,relative_humidity_2m",
    "forecast_days": 1,
    "timezone": "UTC"
}

# Without an explicit timeout, requests waits indefinitely and the cell can hang.
response = requests.get(url, params=params, timeout=30)
# Fail here on an HTTP error instead of later with a KeyError on the payload.
response.raise_for_status()
data = response.json()

# Guard against a payload with no hourly series: building a Spark DataFrame
# from an empty pandas frame fails with an unhelpful schema-inference error.
hourly = data.get("hourly")
if not hourly or not hourly.get("time"):
    raise ValueError("Open-Meteo response contains no hourly data; nothing to write to Bronze.")

# Convert to Pandas DataFrame (values are kept as returned by the API;
# type casting happens in the Silver step)
bronze_df = pd.DataFrame({
    "timestamp": hourly["time"],
    "temperature": hourly["temperature_2m"],
    "humidity": hourly["relative_humidity_2m"]
})

# Save Bronze to Lakehouse (overwrite replaces any previous Bronze snapshot)
spark_bronze = spark.createDataFrame(bronze_df)
spark_bronze.write.format("delta").mode("overwrite").save("Tables/bronze_weather")

print("Bronze layer saved.")

StatementMeta(, 9edbe374-5bca-4f58-85a3-258a70e73631, 3, Finished, Available, Finished)

✅ Bronze layer saved.


In [2]:
# Preview the bronze_weather table: fetch at most 1000 rows via Spark SQL and render them.
bronze_preview_query = "SELECT * FROM lh_0001_sample.bronze_weather LIMIT 1000"
df = spark.sql(bronze_preview_query)
display(df)

StatementMeta(, 9edbe374-5bca-4f58-85a3-258a70e73631, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4fcf005e-0a46-42d7-94f2-300ae77e6752)

In [3]:
# ---------------------------
# 3. Transform → Silver Layer
# ---------------------------
# Read the Bronze Delta table back, cast each column to its target type,
# then drop every row that still contains a missing value.
bronze_weather = spark.read.format("delta").load("Tables/bronze_weather")

typed_weather = (
    bronze_weather
    .withColumn("temperature", col("temperature").cast("double"))
    .withColumn("humidity", col("humidity").cast("double"))
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

spark_silver = typed_weather.dropna()

# Overwrite mode replaces the previous Silver snapshot on every run.
silver_writer = spark_silver.write.format("delta").mode("overwrite")
silver_writer.save("Tables/silver_weather")

print("Silver layer saved.")


StatementMeta(, 9edbe374-5bca-4f58-85a3-258a70e73631, 5, Finished, Available, Finished)

Silver layer saved.


In [4]:
# ---------------------------
# 4. Aggregate → Gold Layer
# ---------------------------
# Example: Average daily temperature and humidity
# (avg, dayofmonth, month and year are imported in the notebook's top import cell.)
silver_weather = spark.read.format("delta").load("Tables/silver_weather")

# Derive calendar parts from the timestamp so readings can be grouped per day.
spark_gold = (
    silver_weather
    .withColumn("day", dayofmonth(col("timestamp")))
    .withColumn("month", month(col("timestamp")))
    .withColumn("year", year(col("timestamp")))
    .groupBy("year", "month", "day")
    .agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity"),
    )
)

# Overwrite mode replaces the previous Gold snapshot on every run.
spark_gold.write.format("delta").mode("overwrite").save("Tables/gold_weather")

print("Gold layer saved.")

StatementMeta(, 9edbe374-5bca-4f58-85a3-258a70e73631, 6, Finished, Available, Finished)

Gold layer saved.


In [5]:
# Preview the gold_weather table: fetch at most 1000 rows via Spark SQL and render them.
gold_preview_query = "SELECT * FROM lh_0001_sample.gold_weather LIMIT 1000"
df = spark.sql(gold_preview_query)
display(df)

StatementMeta(, 9edbe374-5bca-4f58-85a3-258a70e73631, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 29521392-41fb-4949-8fd3-427510f27fd3)