# Import Libraries and Start Spark Session

In [47]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.functions import create_map, lit
from pyspark.sql.window import Window
from itertools import chain




In [48]:
# Initialize Glue and Spark
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session




# 1. Get Combined Dataset of Carparks Availability

In [49]:
# load lta carparks availability dataset
file_path = "s3://athena-results-090325/Unsaved/2025/03/30/1eedf5d9-0ed0-493d-8991-d98b9b26727a.csv"
df1 = spark.read.option("header", "true").csv(file_path)




In [50]:
#filter lot type for cars only
df1 = df1.filter(df1["lot_type"] == "C")




In [51]:
#fill NA for total_lots and available_lots
df1 = df1.fillna({"total_lots": 0})
df1 = df1.fillna({"available_lots": 0})




In [52]:
#change datatype for available lots and total lots to integer
df1 = df1.withColumn("available_lots", F.col("available_lots").cast("integer"))
df1 = df1.withColumn("total_lots", F.col("total_lots").cast("integer"))




In [53]:
df1_agg = df1.groupBy("carpark_id", "development", "location", "agency", "source") \
    .agg(
        F.max("total_lots").alias("total_lots"),
        F.max("available_lots").alias("available_lots")
    )




Check Total Lots Values

In [54]:
#if total_lots = 0, update with max available_lots value
df1_agg = df1_agg.withColumn("total_lots", F.when(df1_agg["total_lots"] == 0, df1_agg["available_lots"]).otherwise(df1_agg["total_lots"]))




In [55]:
#count carparks with total_lots = 0
print(df1_agg.filter(df1_agg["total_lots"] == 0).count())

7


In [56]:
#Check if any carparks > 5000 total lots
df1_agg.filter(df1_agg["total_lots"] > 5000).show()

+----------+--------------------+--------------------+------+------+----------+--------------+
|carpark_id|         development|            location|agency|source|total_lots|available_lots|
+----------+--------------------+--------------------+------+------+----------+--------------+
|       BM4|                null|                null|  null|   hdb|      9999|          9913|
|       BM4|BLK 35A JALAN RUM...|1.287667284247440...|   HDB|   lta|      9907|          9907|
+----------+--------------------+--------------------+------+------+----------+--------------+


In [57]:
#update BM4 total lots to 150
df1_agg = df1_agg.withColumn("total_lots", F.when(df1_agg["carpark_id"] == "BM4", 150).otherwise(df1_agg["total_lots"]))




# 2. Combine Carpark Locations for HDB and LTA Dataset - Lat / Long and XY Coordinates

In [58]:
#split the location into lat and long
df1_agg = df1_agg.withColumn("location_split", F.split(df1_agg["location"], " ")) \
                 .withColumn("latitude", F.col("location_split").getItem(0)) \
                 .withColumn("longitude", F.col("location_split").getItem(1)) \
                 .drop("location_split")




In [59]:
#rearrange columns
df1_agg = df1_agg[["carpark_id", "development", "latitude", "longitude", "total_lots", "agency", "source"]]




In [60]:
#rename columns
df1_agg = df1_agg.withColumnRenamed("development", "address") \
                 .withColumnRenamed("source", "dataset")




Get HDB Carparks Dataset for Long & Lat of Carparks from HDB Dataset

In [68]:
# load HDB carparks info dataset
file_path = "s3://hdb-carpark-info/HDBCarparkInformation.csv"
df2 = spark.read.option("header", "true").csv(file_path)




In [67]:
#check df2 for duplicate car_park_no
duplicate_car_park_no = df2.groupBy("car_park_no").count().filter(F.col("count") > 1)
duplicate_car_park_no.show(5)

+-----------+-----+
|car_park_no|count|
+-----------+-----+
+-----------+-----+


In [None]:
#select columns from df2
df2 = df2[["car_park_no", "address", "x_coord", "y_coord"]]

#rename columns
df2 = df2.withColumnRenamed("car_park_no", "carpark_id") \
                  .withColumnRenamed("address", "address_hdb") \
                 .withColumnRenamed("x_coord", "x_coordinate") \
                 .withColumnRenamed("y_coord", "y_coordinate")

In [None]:
#merge df1_agg and df2
dfs_merge = df1_agg.join(df2, on="carpark_id", how="left")

In [None]:
#if address is null, update from address_hdb
dfs_merge = dfs_merge.withColumn("address", F.when(F.col("address").isNull(), F.col("address_hdb")).otherwise(F.col("address")))

In [None]:
#rearrange columns
dfs_merge = dfs_merge[["carpark_id", "address", "latitude", "longitude", "x_coordinate", "y_coordinate", "total_lots", "agency", "dataset"]]

In [30]:
#if duplicate carpark_id, keep those from dataset = hdb
dfs_merge = dfs_merge.withColumn(
    "priority",
    F.when((F.col("agency") == "HDB") & (F.col("dataset") == "hdb"), 1).otherwise(2)
)

# Step 2: Apply row_number window over carpark_id
window = Window.partitionBy("carpark_id").orderBy("priority")

# Step 3: Keep only the top-priority row per carpark_id
dfs_merge = dfs_merge.withColumn("row_num", F.row_number().over(window)) \
                     .filter(F.col("row_num") == 1) \
                     .drop("priority", "row_num")

# 3. Update Lat / Long, X_Coord / Y_Coord

In [90]:
#import libraries
import pyproj
import geopandas as gpd
from shapely.geometry import Point
from pyproj import CRS, Transformer

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, udf, struct
from pyspark.sql.types import StructType, StructField, DoubleType

ModuleNotFoundError: No module named 'pyproj'


if x_coordinate and y_coordinate are NULL, update by mapping the same location from latitude and longitude. Format accepted by OneMap API

In [32]:
# Cast lat/lon before applying UDF
dfs_merge = dfs_merge.withColumn("latitude", F.col("latitude").cast("double"))
dfs_merge = dfs_merge.withColumn("longitude", F.col("longitude").cast("double"))

# Define UDF function
def transform_coords(lat, lon):
    transformer = pyproj.Transformer.from_crs("EPSG:4326", "EPSG:3414", always_xy=True)
    try:
        x, y = transformer.transform(float(lon), float(lat))
        return x, y
    except:
        return None, None

# Register UDF
schema = StructType([
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True)
])
transform_coords_udf = F.udf(transform_coords, schema)

# Apply UDF to fill in missing x/y
dfs_merge = dfs_merge.withColumn(
    "x_coordinate",
    F.when(
        F.col("x_coordinate").isNull(),
        transform_coords_udf("latitude", "longitude").getItem("x")
    ).otherwise(F.col("x_coordinate"))
)

dfs_merge = dfs_merge.withColumn(
    "y_coordinate",
    F.when(
        F.col("y_coordinate").isNull(),
        transform_coords_udf("latitude", "longitude").getItem("y")
    ).otherwise(F.col("y_coordinate"))
)

If latitude and longitude are NULL, update by mapping the same location from x_coordinate and y_coordinate . Format accepted by OneMap API

In [33]:
# Define the projection systems using pyproj
wgs84 = pyproj.CRS('EPSG:4326')  # WGS84
svy21 = pyproj.CRS('EPSG:3414')  # SVY21
transformer = pyproj.Transformer.from_crs(svy21, wgs84, always_xy=True)

# UDF to convert SVY21 to WGS84 (convert x and y to longitude and latitude)
def svy21_to_wgs84(x, y):
    if x is None or y is None:
        return None, None
    # Ensure x and y are scalar values
    lon, lat = transformer.transform(float(x), float(y))  # Convert to float to ensure scalars
    # Return as a Row (which is equivalent to a struct)
    return Row(longitude=lon, latitude=lat)

# Register the UDF with a StructType return type (longitude, latitude)
svy21_to_wgs84_udf = F.udf(svy21_to_wgs84, StructType([
    StructField("longitude", DoubleType()),
    StructField("latitude", DoubleType())
]))

# Update missing latitude and longitude by applying the reverse transformation on x and y coordinates
dfs_merge = dfs_merge.withColumn(
    'longitude',
    F.when(F.col('longitude').isNull(), svy21_to_wgs84_udf(F.col('x_coordinate'), F.col('y_coordinate'))['longitude'])
    .otherwise(F.col('longitude'))
)

dfs_merge = dfs_merge.withColumn(
    'latitude',
    F.when(F.col('latitude').isNull(), svy21_to_wgs84_udf(F.col('x_coordinate'), F.col('y_coordinate'))['latitude'])
    .otherwise(F.col('latitude'))
)


# Update Values for Lat/Long and X/Y Coordinate

In [34]:
# Create a list of carpark_ids to update
carpark_ids_to_update = ["C32", "CR6", "PL90", "TH3L", "THPP", "W56L"]

# Create a list of corresponding new addresses
new_addresses = ["BLK 514-519 WEST COAST ROAD", "BLK 10 NORTH BRIDGE ROAD", "BLK 445 & 446 PUNGGOL WAY", "BLK 123A -123C,125A-125B,126A -126C TENGAH DRIVE", "BLK 128A & 128B PLANTATION CRESCENT", "891C WOODLANDS DRIVE 50"]

# Create a dictionary for efficient lookup
address_updates = dict(zip(carpark_ids_to_update, new_addresses))

In [35]:
#update address column for selected carpark_id
mapping_expr = create_map([lit(x) for x in chain(*address_updates.items())])

dfs_merge = dfs_merge.withColumn(
    "address",
    when(col("carpark_id").isin(carpark_ids_to_update), mapping_expr.getItem(col("carpark_id"))).otherwise(col("address"))
)




In [36]:
#drop vull values for null address, lat and long
dfs_merge = dfs_merge.filter(F.col("address").isNotNull() & F.col("latitude").isNotNull() & F.col("longitude").isNotNull())

In [37]:
dfs_merge = dfs_merge.withColumn("x_coordinate", col("x_coordinate").cast("double")) \
       .withColumn("y_coordinate", col("y_coordinate").cast("double"))

# Get Postal Code Based on OneMap API

In [38]:
import json
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [39]:
#add postal code column after address
dfs_merge = dfs_merge.withColumn("postal_code", lit(None).cast("string"))

In [40]:
# Authenticate & Get API Token
def get_token():
    url = "https://www.onemap.gov.sg/api/auth/post/getToken"
    data = {
        "email": "e0997996@u.nus.edu",
        "password": "Munchkin1993!"
    }
    response = requests.post(url, json=data)
    return response.json().get("access_token")

token = get_token()
print("Access Token:", token)

Access Token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIyOTU1NmZjN2I2ZWMxYWI1NGU3MWEzNGNlOWU0ZGRlNCIsImlzcyI6Imh0dHA6Ly9pbnRlcm5hbC1hbGItb20tcHJkZXppdC1pdC1uZXctMTYzMzc5OTU0Mi5hcC1zb3V0aGVhc3QtMS5lbGIuYW1hem9uYXdzLmNvbS9hcGkvdjIvdXNlci9zZXNzaW9uIiwiaWF0IjoxNzQ0MjA4OTkzLCJleHAiOjE3NDQ0NjgxOTMsIm5iZiI6MTc0NDIwODk5MywianRpIjoiaDlkaWd6eHRQMjBGc2YzVyIsInVzZXJfaWQiOjYxMzMsImZvcmV2ZXIiOmZhbHNlfQ.gt-leVRPHmSsdGUHHV5xNn2yIx2Jtc0NhZ3nqfYc4_A


In [41]:
#update postal_code using address
def fetch_postal(address):
    try:
        if not address:
            return None

        url = (
            f"https://www.onemap.gov.sg/api/common/elastic/search"
            f"?searchVal={address}&returnGeom=Y&getAddrDetails=Y&pageNum=1"
        )

        headers = {"Authorization": f"Bearer {token}"}
        response = requests.get(url, headers=headers, timeout=5)

        if response.status_code == 200:
            result = response.json()
            results = result.get("results", [])

            # Return the first POSTAL that is not "NIL" or None or empty string
            for item in results:
                postal = item.get("POSTAL")
                if postal and postal.strip().upper() != "NIL":
                    return postal

        else:
            print(f"Request failed with status code {response.status_code}")

    except Exception as e:
        print(f"Error fetching postal for address '{address}': {e}")
    return None

# Register the UDF
fetch_postal_udf = udf(fetch_postal, StringType())

In [42]:
#update postal_code column
df_with_postal = dfs_merge.withColumn("postal_code", fetch_postal_udf(dfs_merge.address))

# Assign Area Based on OneMap API

In [44]:
import json
import requests
from pyspark.sql.functions import explode, udf, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, MapType
from shapely.geometry import shape, MultiPolygon, Point

Get the Planning Areas and Corresponding Longitude and Latitude

In [45]:
# Authenticate & Get API Token
def get_token():
    url = "https://www.onemap.gov.sg/api/auth/post/getToken"
    data = {
        "email": "e0997996@u.nus.edu",
        "password": "Munchkin1993!"
    }
    response = requests.post(url, json=data)
    return response.json().get("access_token")

token = get_token()
print("Access Token:", token)

Access Token: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIyOTU1NmZjN2I2ZWMxYWI1NGU3MWEzNGNlOWU0ZGRlNCIsImlzcyI6Imh0dHA6Ly9pbnRlcm5hbC1hbGItb20tcHJkZXppdC1pdC1uZXctMTYzMzc5OTU0Mi5hcC1zb3V0aGVhc3QtMS5lbGIuYW1hem9uYXdzLmNvbS9hcGkvdjIvdXNlci9zZXNzaW9uIiwiaWF0IjoxNzQ0MjA4OTkzLCJleHAiOjE3NDQ0NjgxOTMsIm5iZiI6MTc0NDIwODk5MywianRpIjoiaDlkaWd6eHRQMjBGc2YzVyIsInVzZXJfaWQiOjYxMzMsImZvcmV2ZXIiOmZhbHNlfQ.gt-leVRPHmSsdGUHHV5xNn2yIx2Jtc0NhZ3nqfYc4_A


In [46]:
#Fetch Planning Area GeoJSON Data
url = "https://www.onemap.gov.sg/api/public/popapi/getAllPlanningarea?year=2019"
headers = {"Authorization": token}
response = requests.get(url, headers=headers)
data = response.json()

#Normalize and parse geojson
search_results = data["SearchResults"]
df = spark.createDataFrame(search_results)

def get_multipolygon_from_geojson(geojson_str):
    try:
        geojson = json.loads(geojson_str)  # Parse string into dict
        geom = shape(geojson)
        if geom.geom_type == "Polygon":
            return MultiPolygon([geom])
        elif geom.geom_type == "MultiPolygon":
            return geom
    except Exception as e:
        print("Error parsing geojson:", e)
    return None

#Create list of (planning_area_name, geometry) tuples
planning_areas = []
for row in df.collect():
    geom = get_multipolygon_from_geojson(row["geojson"])
    if geom:
        planning_areas.append((row["pln_area_n"], geom))

# Broadcast the list to Spark
broadcast_areas = spark.sparkContext.broadcast(planning_areas)

#Define a Spark UDF that finds which planning area a lat/lon point falls in
def find_planning_area(lat, lon):
    try:
        point = Point(lon, lat)
        for name, poly in broadcast_areas.value:
            if poly.contains(point):
                return name
    except:
        return None
    return None

find_planning_area_udf = udf(find_planning_area, StringType())

#Apply the UDF and it contains 'latitude' and 'longitude' columns
cp_info_area = df_with_postal.withColumn(
    "planning_area",
    find_planning_area_udf("latitude", "longitude")
)

In [47]:
#rearrange columns
cp_info_area = cp_info_area[["carpark_id", "planning_area", "address", "postal_code",  "latitude", "longitude", "x_coordinate", "y_coordinate", "total_lots", "agency", "dataset"]]
#rename column
cp_info_area = cp_info_area.withColumnRenamed("planning_area", "area")

In [48]:
#add update_datetime to current datetime in format dd/mm/yyyy hh:mm
cp_info_area = cp_info_area.withColumn("update_datetime", F.current_timestamp())

In [None]:
#add update_datetime to current datetime in format dd/mm/yyyy hh:mm
cp_info_area = cp_info_area.withColumn(
    "update_datetime",
    F.date_format(F.current_timestamp(), "dd/MM/yyyy HH:mm:ss")
)

In [None]:
#change postal code type to string
cp_info_area = cp_info_area.withColumn("postal_code", col("postal_code").cast("string"))

# Output Final Table

In [None]:
cp_info_area.write.mode("overwrite").option("header", True).csv("s3://carpark-information/carpark_information_full.csv")