In [1]:
# ==========================
# 1) SETUP
# ==========================
from pyspark.sql.functions import regexp_replace, col, when, avg, coalesce, concat, round, udf, monotonically_increasing_id
from pyspark.sql.types import StringType, ArrayType, FloatType
import requests, os
import openai
from pyspark.sql import SparkSession
from datetime import datetime
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from openai import OpenAI
import os

# Set OpenAI key before creating Spark session
os.environ["OPENAI_API_KEY"] = "PUT YOUR OPENAI API KEY HERE"

# Create Spark session in Fabric
spark = SparkSession.builder.getOrCreate()

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 5, Finished, Available, Finished)

In [2]:
# ==========================
# 2) EXTRACT DATA FROM API
# ==========================
url = "https://api.data.gov.sg/v1/transport/carpark-availability"
response = requests.get(url)
data = response.json()

# Flatten JSON
carparks = []
for item in data['items'][0]['carpark_data']:
    carparks.append({
        "carpark_number": item["carpark_number"],
        "total_lots": int(item["carpark_info"][0]["total_lots"]),
        "lots_available": int(item["carpark_info"][0]["lots_available"]),
        "update_datetime": item["update_datetime"]
    })

# Convert to Spark DataFrame
df = spark.createDataFrame(carparks)

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 6, Finished, Available, Finished)

In [3]:
# Download file here: https://beta.data.gov.sg/datasets/148/view
# Then upload to /Files
# Purpose is to get the address from the above downloaded data, because carparks does not contain address

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 7, Finished, Available, Finished)

In [4]:
data = spark.read.csv("Files/HDBCarparkInformation.csv", header=True, inferSchema=True)
data = data.withColumnRenamed('car_park_no', 'carpark_number')

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 8, Finished, Available, Finished)

In [5]:
merged_df = df.join(data, on='carpark_number', how='inner')
# Remove 'BLK' and 'BLOCK' from the 'address' column
merged_df = merged_df.withColumn('address', regexp_replace(col('address'), 'BLK', ''))
merged_df = merged_df.withColumn('address', regexp_replace(col('address'), 'BLOCK', ''))

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 9, Finished, Available, Finished)

In [6]:
def remove_text(address):
    separators = ["/", "-", ",", "&", "TO "]
    for separator in separators:
        if separator in address:
            parts = address.split(separator)
            address = parts[-1].strip()                
    return address.strip()

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 10, Finished, Available, Finished)

In [7]:
# Convert it to a PySpark UDF
remove_text_udf = udf(remove_text, StringType())

# Apply it to the 'address' column
merged_df = merged_df.withColumn(
    'address', 
    remove_text_udf(col('address'))
)

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 11, Finished, Available, Finished)

In [8]:
# https://towardsdatascience.com/geocoding-singapore-coordinates-onemap-api-3e1542bf26f7
# https://github.com/shawnthamks/OneMap-API
# https://www.onemap.gov.sg/apidocs/apidocs/#search

def get_coordinates(addresses):
    url = 'https://www.onemap.gov.sg/api/common/elastic/search'
    coordinates_list = []

    for address in addresslist:
        params = {
            'searchVal': address,
            'returnGeom': 'Y',
            'getAddrDetails': 'Y',
            'pageNum': 1
        }
        req = requests.get(url, params=params)
        results_dict = req.json()
        
        if len(results_dict['results']) > 0:
            coordinates_list.append((results_dict['results'][0]['LATITUDE'], results_dict['results'][0]['LONGITUDE']))
        else:
            coordinates_list.append(None)
    
    return coordinates_list

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 12, Finished, Available, Finished)

In [9]:
addresslist = [row['address'] for row in merged_df.select('address').collect()]

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 13, Finished, Available, Finished)

In [10]:
# Set the number of concurrent workers
max_workers = 10
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future = executor.submit(get_coordinates, addresslist)
    coordinates_list = future.result()

# Print the coordinates
for i, coordinates in enumerate(coordinates_list):
    address = addresslist[i]
    if coordinates is not None:
        print(f"Coordinates for {address}: Latitude={coordinates[0]}, Longitude={coordinates[1]}")
    else:
        print(f"No coordinates found for {address}")

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 14, Finished, Available, Finished)

Coordinates for 81 REDHILL LANE: Latitude=1.28775818263836, Longitude=103.819493616875
No coordinates found for 533A HONG LIM MSCP
Coordinates for 88A REDHILL CLOSE: Latitude=1.28648640197045, Longitude=103.818549711555
Coordinates for 163 BUKIT MERAH CENTRAL: Latitude=1.28366139597446, Longitude=103.817072717954
Coordinates for 43 HOLLAND DRIVE: Latitude=1.30743411126247, Longitude=103.792897574712
Coordinates for 451 CLEMENTI AVENUE 3: Latitude=1.31248109025097, Longitude=103.765476495005
Coordinates for 8A EMPRESS ROAD: Latitude=1.31671096509977, Longitude=103.80618154733
Coordinates for 334 CLEMENTI AVENUE 2: Latitude=1.31518502692041, Longitude=103.768432708555
Coordinates for 54 TEBAN GARDENS: Latitude=1.32159846468451, Longitude=103.736923103215
Coordinates for 109 BUKIT PURMEI ROAD: Latitude=1.27392939929265, Longitude=103.825310761145
Coordinates for 25 TEBAN GARDENS: Latitude=1.32307929386097, Longitude=103.737791107104
Coordinates for 61 TEBAN GARDENS ROAD: Latitude=1.322136

In [11]:
# Remove None values
coordinates_list_clean = [coord for coord in coordinates_list if coord is not None]

# Create Spark DataFrame
df_coordinates = spark.createDataFrame(coordinates_list_clean)

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 15, Finished, Available, Finished)

In [12]:
df1 = merged_df.withColumn("row_id", monotonically_increasing_id())
df2 = df_coordinates.withColumn("row_id", monotonically_increasing_id())

carpark_final = df1.join(df2, on="row_id", how="inner").drop("row_id")

carpark_final = carpark_final.withColumnRenamed('_1', 'Latitude')
carpark_final = carpark_final.withColumnRenamed('_2', 'Longitude')
carpark_final = carpark_final.drop("x_coord", "y_coord")

# Replace 'None' string with null in all columns
carpark_final = carpark_final.select(
    [when(col(c) == 'None', None).otherwise(col(c)).alias(c) for c in carpark_final.columns]
)

# Drop rows with any null values
carpark_final = carpark_final.na.drop()

carpark_final = (
    carpark_final
    .withColumn("Latitude", col("Latitude").cast("double"))
    .withColumn("Longitude", col("Longitude").cast("double"))
)

carpark_final = carpark_final.withColumn(
    "Availability",
    round(
        when(col("total_lots") != 0, (col("lots_available") / col("total_lots")) * 100)
        .otherwise(None), 
        1
    )
)

carpark_final.printSchema()
carpark_final.count()

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 16, Finished, Available, Finished)

root
 |-- carpark_number: string (nullable = true)
 |-- lots_available: long (nullable = true)
 |-- total_lots: long (nullable = true)
 |-- update_datetime: string (nullable = true)
 |-- address: string (nullable = true)
 |-- car_park_type: string (nullable = true)
 |-- type_of_parking_system: string (nullable = true)
 |-- short_term_parking: string (nullable = true)
 |-- free_parking: string (nullable = true)
 |-- night_parking: string (nullable = true)
 |-- car_park_decks: integer (nullable = true)
 |-- gantry_height: double (nullable = true)
 |-- car_park_basement: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Availability: double (nullable = true)



1910

In [13]:
# Save to Fabric Lakehouse (Delta format)
carpark_final.write.format("delta").mode("overwrite").save("Tables/CarparkAvailability")

print(f"✅ Ingested {carpark_final.count()} carpark records into Lakehouse.")

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 17, Finished, Available, Finished)

✅ Ingested 1910 carpark records into Lakehouse.


In [14]:
# Check for NULLs in key columns
spark.table("CarparkAvailability").filter(
    col("lots_available").isNull() | 
    col("total_lots").isNull() |
    col("address").isNull()
).count()

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 18, Finished, Available, Finished)

0

#### Historical_Stats

##### The historical_stats table serves as a preprocessed summary of the carpark availability data, designed specifically to power the RAG (Retrieval-Augmented Generation) system. Its core purpose is to Aggregates Raw Data, Creates Semantic Context (generates human-readable text summaries (the context column)) and Cleans & Standardizes Data to ensure consistent formatting for embeddings.

In [15]:
from pyspark.sql.functions import col, avg, expr, lit, coalesce, concat

historical_stats = (
    spark.table("CarparkAvailability")
    # Safely cast to integers (returns NULL if conversion fails)
    .withColumn("lots_available", expr("TRY_CAST(lots_available AS INT)"))
    .withColumn("total_lots", expr("TRY_CAST(total_lots AS INT)"))
    # Replace zeros in total_lots to avoid division errors
    .withColumn("total_lots_safe", 
                expr("CASE WHEN total_lots IS NULL OR total_lots = 0 THEN 1 ELSE total_lots END"))
    # Calculate average occupancy (handle NULLs with coalesce)
    .groupBy("carpark_number", "address")
    .agg(
        coalesce(
            avg(col("lots_available") / col("total_lots_safe")), 
            lit(0.0)
        ).alias("avg_availability")
    )
    # Build context string (handle NULL address)
    .withColumn(
        "context", 
        concat(
            coalesce(col("address"), lit("Unknown")), 
            lit(" averages "), 
            (col("avg_availability") * 100).cast("string"), 
            lit("% availability.")
        )
    )
    .select("carpark_number", "context")
)

# Show results
historical_stats.show(10, truncate=False)

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 19, Finished, Available, Finished)

+--------------+---------------------------------------------------------------------+
|carpark_number|context                                                              |
+--------------+---------------------------------------------------------------------+
|CK47          |787A CHOA CHU KANG DRIVE averages 76.69064748201438% availability.   |
|BJ30          |510A JELAPANG ROAD averages 73.81342062193126% availability.         |
|BJ43          |603A SENJA ROAD averages 78.28685258964143% availability.            |
|AM96          |352A ANG MO KIO STREET 32 averages 63.23809523809524% availability.  |
|U34           |511 BUKIT BATOK STREET 52 averages 51.81518151815182% availability.  |
|A72           |640 ANG MO KIO STREET 61 averages 59.217877094972074% availability.  |
|B35           |140 BEDOK NORTH AVENUE 3 averages 40.10152284263959% availability.   |
|Q84           |32 HOLLAND CLOSE averages 19.53125% availability.                    |
|CK3           |124 TECK WHYE LANE averages

#### Generating embeddings_table

##### The embedding_table is the engine that powers the semantic search capability in the RAG (Retrieval-Augmented Generation) system. It enables Efficient Similarity Calculation (i.e. cosine similarity comparisons)

In [16]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(ArrayType(FloatType()))
def get_embeddings_udf(texts: pd.Series) -> pd.Series:
    client = OpenAI(api_key="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")
    embeddings = []
    for text in texts:
        if not text:
            embeddings.append([0.0]*1536)
            continue
        response = client.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return pd.Series(embeddings)

embeddings_table = historical_stats.withColumn(
    "embedding",
    get_embeddings_udf(col("context"))
)
print(embeddings_table.show())

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 20, Finished, Available, Finished)

+--------------+--------------------+--------------------+
|carpark_number|             context|           embedding|
+--------------+--------------------+--------------------+
|          CK47|787A CHOA CHU KAN...|[-0.051202085, 0....|
|          BJ30|510A JELAPANG ROA...|[-0.04415941, 0.0...|
|          BJ43|603A SENJA ROAD a...|[-0.03051225, 0.0...|
|          AM96|352A ANG MO KIO S...|[-0.005807804, 0....|
|           U34|511 BUKIT BATOK S...|[-0.033087425, -0...|
|           A72|640 ANG MO KIO ST...|[-0.010886977, 0....|
|           B35|140 BEDOK NORTH A...|[-0.055673484, 0....|
|           Q84|32 HOLLAND CLOSE ...|[-0.04707278, 0.0...|
|           CK3|124 TECK WHYE LAN...|[-0.004688993, 0....|
|          J84M|854A JURONG WEST ...|[-0.039459676, 0....|
|          SE33|542 SERANGOON NOR...|[-0.04468306, 0.0...|
|            C9|348 CLEMENTI AVEN...|[-0.05758462, 0.0...|
|           C24|118 CLEMENTI STRE...|[-0.04234992, 0.0...|
|           U46|299 BUKIT BATOK S...|[-0.024999563, -0..

#### Save Embeddings for Future Use

In [17]:
embeddings_table.write.format("delta").mode("overwrite").saveAsTable("CarparkEmbeddings")

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 21, Finished, Available, Finished)

In [18]:
## For faster access later:
spark.sql("OPTIMIZE CarparkEmbeddings ZORDER BY carpark_number")

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 22, Finished, Available, Finished)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,numFilesUpdatedWithoutRewrite:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesUpdatedWithoutRewrite:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemovedBreakdown:array<struct<reason:string,metrics:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>>>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,

#### Create a Query Function

In [19]:
from pyspark.sql.functions import pandas_udf, col, lit
import numpy as np

client = OpenAI(api_key=openai.api_key)

def query_carparks(search_query: str, top_n: int = 5):
    """Search carparks by semantic similarity"""
    # Step 1: Get query embedding (outside Spark)
    query_embedding = client.embeddings.create(
        input=search_query,
        model="text-embedding-3-small"
    ).data[0].embedding
    
    # Convert to numpy array for efficient calculations
    query_np = np.array(query_embedding)
    query_norm = np.linalg.norm(query_np)
    
    # Step 2: Define cosine similarity UDF
    @pandas_udf("double")
    def cosine_sim(embeddings: pd.Series) -> pd.Series:
        def _similarity(x):
            if x is None:
                return 0.0
            x_np = np.array(x)
            dot = np.dot(x_np, query_np)
            x_norm = np.linalg.norm(x_np)
            return float(dot / (x_norm * query_norm + 1e-8))  # Prevent division by zero
            
        return embeddings.apply(_similarity)
    
    # Step 3: Run search
    results = (embeddings_table
        .withColumn("similarity", cosine_sim(col("embedding")))
        .orderBy(col("similarity").desc())
        .limit(top_n)
    )
    
    return results

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 23, Finished, Available, Finished)

##### Execute a Sample Query

In [26]:
# Example search
search_results = query_carparks(
    search_query="What do you think would the carpark availability near 109 Bukit Batok for tomorrow?", 
    top_n=3
)

# Show results
search_results.select("carpark_number", "context", "similarity").show(truncate=False)

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 30, Finished, Available, Finished)

+--------------+------------------------------------------------------------------+------------------+
|carpark_number|context                                                           |similarity        |
+--------------+------------------------------------------------------------------+------------------+
|BBM8          |622A BUKIT BATOK CENTRAL averages 54.7112462006079% availability. |0.6521967706966447|
|U40           |644A BUKIT BATOK CENTRAL averages 85.76388888888889% availability.|0.6512535689026021|
|U32           |514 BUKIT BATOK STREET 52 averages 61.71875% availability.        |0.6452455772964272|
+--------------+------------------------------------------------------------------+------------------+



#### Generate AI Summaries

In [27]:
def generate_summary(results_df):
    """Convert search results to natural language"""
    top_matches = "\n".join(
        [f"{row['carpark_number']}: {row['context']} (Score: {row['similarity']:.2f})" 
         for row in results_df.collect()]
    )
    
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"""Predict these carpark availability for tomorrow:
            {top_matches}
            
            Provide a concise answer to the original query."""
        }],
        max_tokens=200
    )
    return response.choices[0].message.content

print(generate_summary(search_results))

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 31, Finished, Available, Finished)

Based on the average availability, it is predicted that for tomorrow the carpark at BBM8: 622A Bukit Batok Central will have approximately 54.7% availability, U40: 644A Bukit Batok Central will have approximately 85.8% availability, and U32: 514 Bukit Batok Street 52 will have approximately 61.7% availability.


#### Performance Optimization

In [22]:
# Cache frequently used data
embeddings_table.cache()

StatementMeta(, 67ab9028-33b0-45ae-a06e-69ff47ba6cce, 26, Finished, Available, Finished)

DataFrame[carpark_number: string, context: string, embedding: array<float>]