# **Air Temperature**<br>
readingType: DBT 1M F<br>
readingUnit: deg C

Variables

In [0]:
from datetime import date, datetime
from zoneinfo import ZoneInfo

# The stations are in Singapore, so "today" must be the Singapore calendar date.
# date.today() follows the driver's local timezone (commonly UTC on cloud clusters),
# which lags Singapore (UTC+8) by a day between 00:00 and 08:00 SGT.
today_str = datetime.now(ZoneInfo("Asia/Singapore")).strftime("%Y-%m-%d")

Date = today_str      # YYYY-MM-DD, sent as the `date` query parameter by every API cell below
Datamode = 'append'   # Delta write mode used by every saveAsTable call below

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

import requests

# Fetch the air-temperature snapshot for `Date` from the data.gov.sg real-time API.
# timeout: a stalled connection must not hang the job indefinitely.
# raise_for_status(): report HTTP errors directly instead of a KeyError on "data" below.
url = "https://api-open.data.gov.sg/v2/real-time/api/air-temperature"
response = requests.get(url, params={"date": Date}, timeout=30)
response.raise_for_status()
data = response.json()["data"]

stations = data.get("stations") or []
readings = data.get("readings") or []
# Only the first readings entry is ingested.
readings_data = (readings[0].get("data") or []) if readings else []

if not stations or not readings_data:
    # spark.createDataFrame cannot infer a schema from an empty list; skip this
    # measurement instead of aborting the remaining ingestion cells.
    print(f"air_temperature: no stations or readings returned for {Date}; nothing written.")
else:
    stations_df = spark.createDataFrame(stations)

    timestamp = readings[0]["timestamp"]

    # float() + cast keep `value` DoubleType even when every reading is an integer;
    # an inferred LongType could clash with the Delta table's column on append.
    readings_df = (
        spark.createDataFrame(
            [(r["stationId"], float(r["value"])) for r in readings_data],
            schema=["stationId", "value"],
        )
        .withColumn("timestamp", F.lit(timestamp))
        .withColumn("value", F.col("value").cast(DoubleType()))
    )

    # Inner join: readings with no matching station (and vice versa) are dropped.
    final_df = (
        stations_df.join(readings_df, stations_df.deviceId == readings_df.stationId, "inner")
        .select(
            readings_df.stationId.alias("deviceId"),
            stations_df.name,
            stations_df.location["latitude"].alias("latitude"),
            stations_df.location["longitude"].alias("longitude"),
            readings_df.value,
            readings_df.timestamp.cast("timestamp"),
        )
    )

    # Save to Delta table
    final_df.write.format("delta").mode(Datamode).saveAsTable("climaGuard.singapore.air_temperature")


# **Rainfall**<br>
readingType: TB1 Rainfall 5 Minute Total F<br>
readingUnit: mm<br>

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, TimestampType

import requests
import json

# Fetch the rainfall snapshot for `Date` from the data.gov.sg real-time API.
# timeout: a stalled connection must not hang the job indefinitely.
# raise_for_status(): report HTTP errors directly instead of a KeyError on "data" below.
url = "https://api-open.data.gov.sg/v2/real-time/api/rainfall"
response = requests.get(url, params={"date": Date}, timeout=30)
response.raise_for_status()
data = response.json()["data"]

stations = data.get("stations") or []
readings = data.get("readings") or []
# Only the first readings entry is ingested.
readings_data = (readings[0].get("data") or []) if readings else []

if not stations or not readings_data:
    # spark.createDataFrame cannot infer a schema from an empty list; skip this
    # measurement instead of aborting the remaining ingestion cells.
    print(f"rainfall: no stations or readings returned for {Date}; nothing written.")
else:
    stations_df = spark.createDataFrame(stations)

    timestamp = readings[0]["timestamp"]

    # Overwrite `value` itself with the double cast. The previous version wrote the cast
    # to a separate `reading_value` column that was never selected. float() + cast
    # keep `value` DoubleType even when every rainfall total is an integer such as 0.
    readings_df = (
        spark.createDataFrame(
            [(r["stationId"], float(r["value"])) for r in readings_data],
            schema=["stationId", "value"],
        )
        .withColumn("timestamp", F.lit(timestamp))
        .withColumn("value", F.col("value").cast(DoubleType()))
    )

    # Join stations with readings using stationId == deviceId (inner: unmatched rows dropped)
    final_df = (
        stations_df.join(readings_df, stations_df.deviceId == readings_df.stationId, "inner")
        .select(
            readings_df.stationId.alias("deviceId"),
            stations_df.name,
            stations_df.location["latitude"].alias("latitude"),
            stations_df.location["longitude"].alias("longitude"),
            readings_df.value,
            readings_df.timestamp.cast("timestamp"),
        )
    )

    # Save as Delta Table in Databricks
    final_df.write.format("delta").mode(Datamode).saveAsTable("climaGuard.singapore.rainfall")


# **Relative Humidity**<br>
readingType: RH 1M F<br>
readingUnit: percentage<br>

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, TimestampType

import requests
import json

# Fetch the relative-humidity snapshot for `Date` from the data.gov.sg real-time API.
# timeout: a stalled connection must not hang the job indefinitely.
# raise_for_status(): report HTTP errors directly instead of a KeyError on "data" below.
url = "https://api-open.data.gov.sg/v2/real-time/api/relative-humidity"
response = requests.get(url, params={"date": Date}, timeout=30)
response.raise_for_status()
data = response.json()["data"]

stations = data.get("stations") or []
readings = data.get("readings") or []
# Only the first readings entry is ingested.
readings_data = (readings[0].get("data") or []) if readings else []

if not stations or not readings_data:
    # spark.createDataFrame cannot infer a schema from an empty list; skip this
    # measurement instead of aborting the remaining ingestion cells.
    print(f"relative_humidity: no stations or readings returned for {Date}; nothing written.")
else:
    stations_df = spark.createDataFrame(stations)

    timestamp = readings[0]["timestamp"]

    # float() + cast keep `value` DoubleType even when every reading is an integer;
    # an inferred LongType could clash with the Delta table's column on append.
    readings_df = (
        spark.createDataFrame(
            [(r["stationId"], float(r["value"])) for r in readings_data],
            schema=["stationId", "value"],
        )
        .withColumn("timestamp", F.lit(timestamp))
        .withColumn("value", F.col("value").cast(DoubleType()))
    )

    # Join stations with readings using stationId == deviceId (inner: unmatched rows dropped)
    final_df = (
        stations_df.join(readings_df, stations_df.deviceId == readings_df.stationId, "inner")
        .select(
            readings_df.stationId.alias("deviceId"),
            stations_df.name,
            stations_df.location["latitude"].alias("latitude"),
            stations_df.location["longitude"].alias("longitude"),
            readings_df.value,
            readings_df.timestamp.cast("timestamp"),
        )
    )

    # Save as Delta Table in Databricks
    final_df.write.format("delta").mode(Datamode).saveAsTable("climaGuard.singapore.relative_humidity")


# **Wind Direction**<br>
readingType: Wind Dir AVG (S) 10M M1M<br>
readingUnit: degrees

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, TimestampType

import requests
import json

# Fetch the wind-direction snapshot for `Date` from the data.gov.sg real-time API.
# timeout: a stalled connection must not hang the job indefinitely.
# raise_for_status(): report HTTP errors directly instead of a KeyError on "data" below.
url = "https://api-open.data.gov.sg/v2/real-time/api/wind-direction"
response = requests.get(url, params={"date": Date}, timeout=30)
response.raise_for_status()
data = response.json()["data"]

stations = data.get("stations") or []
readings = data.get("readings") or []
# Only the first readings entry is ingested.
readings_data = (readings[0].get("data") or []) if readings else []

if not stations or not readings_data:
    # spark.createDataFrame cannot infer a schema from an empty list; skip this
    # measurement instead of aborting the remaining ingestion cells.
    print(f"wind_direction: no stations or readings returned for {Date}; nothing written.")
else:
    stations_df = spark.createDataFrame(stations)

    timestamp = readings[0]["timestamp"]

    # NOTE(review): unlike the other readings cells, `value` here keeps whatever type
    # Spark infers from the payload (LongType when every direction is an integer).
    # A payload whose inferred type differs from the existing table column could fail
    # the Delta append. Confirm the table's `value` type before pinning it with an
    # explicit cast; forcing DoubleType would break appends to a bigint column.
    readings_df = spark.createDataFrame(readings_data).withColumn("timestamp", F.lit(timestamp))

    # Join stations with readings using stationId == deviceId (inner: unmatched rows dropped)
    final_df = (
        stations_df.join(readings_df, stations_df.deviceId == readings_df.stationId, "inner")
        .select(
            readings_df.stationId.alias("deviceId"),
            stations_df.name,
            stations_df.location["latitude"].alias("latitude"),
            stations_df.location["longitude"].alias("longitude"),
            readings_df.value,
            readings_df.timestamp.cast("timestamp"),
        )
    )

    # Save as Delta Table in Databricks
    final_df.write.format("delta").mode(Datamode).saveAsTable("climaGuard.singapore.wind_direction")


# **Wind Speed**<br>
readingType: Wind Speed AVG(S)10M M1M<br>
readingUnit: knots<br>


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, TimestampType

import requests
import json

# Fetch the wind-speed snapshot for `Date` from the data.gov.sg real-time API.
# timeout: a stalled connection must not hang the job indefinitely.
# raise_for_status(): report HTTP errors directly instead of a KeyError on "data" below.
url = "https://api-open.data.gov.sg/v2/real-time/api/wind-speed"
response = requests.get(url, params={"date": Date}, timeout=30)
response.raise_for_status()
data = response.json()["data"]

stations = data.get("stations") or []
readings = data.get("readings") or []
# Only the first readings entry is ingested.
readings_data = (readings[0].get("data") or []) if readings else []

if not stations or not readings_data:
    # spark.createDataFrame cannot infer a schema from an empty list; skip this
    # measurement instead of aborting the remaining ingestion cells.
    print(f"wind_speed: no stations or readings returned for {Date}; nothing written.")
else:
    stations_df = spark.createDataFrame(stations)

    timestamp = readings[0]["timestamp"]

    # float() + cast keep `value` DoubleType even when every reading is an integer;
    # an inferred LongType could clash with the Delta table's column on append.
    readings_df = (
        spark.createDataFrame(
            [(r["stationId"], float(r["value"])) for r in readings_data],
            schema=["stationId", "value"],
        )
        .withColumn("timestamp", F.lit(timestamp))
        .withColumn("value", F.col("value").cast(DoubleType()))
    )

    # Join stations with readings using stationId == deviceId (inner: unmatched rows dropped)
    final_df = (
        stations_df.join(readings_df, stations_df.deviceId == readings_df.stationId, "inner")
        .select(
            readings_df.stationId.alias("deviceId"),
            stations_df.name,
            stations_df.location["latitude"].alias("latitude"),
            stations_df.location["longitude"].alias("longitude"),
            readings_df.value,
            readings_df.timestamp.cast("timestamp"),
        )
    )

    # Save as Delta Table in Databricks
    final_df.write.format("delta").mode(Datamode).saveAsTable("climaGuard.singapore.wind_speed")
