# Ridership Open Lakehouse Demo

This notebook demonstrates a strategy for implementing an open lakehouse on GCP, using Apache Iceberg
as an open source standard for managing data while still leveraging GCP-native capabilities. The demo uses
BigQuery Managed Iceberg Tables, Managed Apache Kafka and Apache Kafka Connect to ingest streaming data, Vertex AI for generative AI queries on top of the data, and Dataplex to govern tables.

This notebook loads data into BigQuery tables that are backed by Parquet files and follow the Apache Iceberg specification.

After loading, we will demonstrate processing the data using PySpark.

The processing simulates `bus ridership` data based on `bus station ridership` data. The `bus station ridership` data shows the passengers waiting at a given station at a given timestamp. Our PySpark processing pipelines create time windows that simulate a bus picking up those passengers while driving its route. The routes for the buses are taken from the pre-made `bus_lines` table.

All data in this notebook was prepared in the previous `part0` notebook.

## Setup the environment

In [4]:
import os
USER_AGENT = "cloud-solutions/data-to-ai-nb-v3"

PROJECT_ID = !gcloud config get-value project
PROJECT_ID = PROJECT_ID[0]
BQ_DATASET = "ridership_lakehouse"
BUCKET_NAME = f"{PROJECT_ID}-ridership-lakehouse"
LOCATION = "us-central1"
BQ_CONNECTION_NAME = "cloud-resources-connection"

print(PROJECT_ID)
print(BUCKET_NAME)

lakehouse-demo-1000
lakehouse-demo-1000-ridership-lakehouse


In [2]:
from google.cloud import bigquery, storage
from google.api_core.client_info import ClientInfo

bigquery_client = bigquery.Client(
    project=PROJECT_ID,
    location=LOCATION,
    client_info=ClientInfo(user_agent=USER_AGENT)
)
storage_client = storage.Client(
    project=PROJECT_ID,
    client_info=ClientInfo(user_agent=USER_AGENT)
)

## Create the tables and load data

In [5]:
bus_stops_uri = f"gs://{BUCKET_NAME}/iceberg_data/bus_stations/"
bus_lines_uri = f"gs://{BUCKET_NAME}/iceberg_data/bus_lines/"
ridership_uri = f"gs://{BUCKET_NAME}/iceberg_data/ridership/"

bigquery_client.query(f"DROP TABLE IF EXISTS {BQ_DATASET}.bus_stations;").result()
query = f"""
CREATE TABLE {BQ_DATASET}.bus_stations
(
  bus_stop_id INTEGER,
  address STRING,
  school_zone BOOLEAN,
  seating BOOLEAN,
  latitude FLOAT64,
  longtitude FLOAT64
)
WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_CONNECTION_NAME}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = '{bus_stops_uri}');
"""
bigquery_client.query(query).result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7ea77634d590>

In [11]:
bigquery_client.query(
    f'DROP TABLE IF EXISTS {BQ_DATASET}.bus_lines;'
).result()
_create_table_stmt = f"""
    CREATE TABLE {BQ_DATASET}.bus_lines (
        bus_line_id INTEGER,
        bus_line STRING,
        number_of_stops INTEGER,
        stops ARRAY<INTEGER>,
        frequency_minutes INTEGER
    )
    WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_CONNECTION_NAME}`
    OPTIONS (
        file_format = 'PARQUET',
        table_format = 'ICEBERG',
        storage_uri = '{bus_lines_uri}'
    );
"""
bigquery_client.query(_create_table_stmt).result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7ea764d8b990>

In [7]:
bigquery_client.query(
    f'DROP TABLE IF EXISTS {BQ_DATASET}.ridership;'
).result()
_create_table_stmt = f"""
    CREATE TABLE {BQ_DATASET}.ridership (
        transit_timestamp TIMESTAMP,
        station_id INTEGER,
        ridership INTEGER
    )
    WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{BQ_CONNECTION_NAME}`
    OPTIONS (
        file_format = 'PARQUET',
        table_format = 'ICEBERG',
        storage_uri = '{ridership_uri}'
    );
"""
bigquery_client.query(_create_table_stmt).result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7ea72c854350>
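
The three `CREATE TABLE` statements above differ only in table name, columns, and storage URI. A minimal sketch of a helper that builds such a statement is shown below. `build_iceberg_ddl` is a hypothetical name, and the project and bucket in the usage example are placeholders, not values from this demo.

```python
def build_iceberg_ddl(dataset, table, columns, connection, storage_uri):
    """Build a CREATE TABLE statement for an Iceberg table stored as Parquet.

    `columns` maps column names to BigQuery types, in declaration order.
    This is an illustrative helper, not part of the original notebook.
    """
    column_defs = ",\n".join(
        f"  {name} {bq_type}" for name, bq_type in columns.items()
    )
    return (
        f"CREATE TABLE {dataset}.{table} (\n"
        f"{column_defs}\n"
        f")\n"
        f"WITH CONNECTION `{connection}`\n"
        f"OPTIONS (\n"
        f"  file_format = 'PARQUET',\n"
        f"  table_format = 'ICEBERG',\n"
        f"  storage_uri = '{storage_uri}'\n"
        f");"
    )


# Placeholder project and bucket names, for illustration only.
ddl = build_iceberg_ddl(
    dataset="ridership_lakehouse",
    table="ridership",
    columns={
        "transit_timestamp": "TIMESTAMP",
        "station_id": "INTEGER",
        "ridership": "INTEGER",
    },
    connection="my-project.us-central1.cloud-resources-connection",
    storage_uri="gs://my-bucket/iceberg_data/ridership/",
)
print(ddl)
```

The resulting string could be passed to `bigquery_client.query(ddl).result()` in the same way as the statements above.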

In [12]:
dataset_ref = bigquery_client.dataset(BQ_DATASET)
table_ref = dataset_ref.table("bus_lines")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.bus_lines WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{BUCKET_NAME}/mta_staging_data/bus_lines.json",
    table_ref,
    job_config=job_config,
)

job.result()

LoadJob<project=lakehouse-demo-1000, location=us-central1, id=6a96e8e1-7c43-4f3c-9005-a0528afc0ca6>

In [13]:
table_ref = dataset_ref.table("bus_stations")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.bus_stations WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{BUCKET_NAME}/mta_staging_data/bus_stations.csv",
    table_ref,
    job_config=job_config,
)

job.result()

LoadJob<project=lakehouse-demo-1000, location=us-central1, id=1f529f45-11d1-4644-9ce8-abd1990156b0>

In [14]:
table_ref = dataset_ref.table("ridership")

# BQ tables for Apache Iceberg do not support load with truncating, so we will truncate manually, and then load
truncate = bigquery_client.query(f"DELETE FROM {BQ_DATASET}.ridership WHERE TRUE")
truncate.result()

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
)

job = bigquery_client.load_table_from_uri(
    f"gs://{BUCKET_NAME}/mta_staging_data/ridership/*.csv",
    table_ref,
    job_config=job_config,
)

job.result()

LoadJob<project=lakehouse-demo-1000, location=us-central1, id=7f3dd344-6687-4fcd-819c-289b51ed9594>
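
The three load cells above follow the same delete-then-append pattern. One possible way to factor it out is sketched below. `truncate_and_load` is a hypothetical helper; it assumes `client` behaves like a `google.cloud.bigquery.Client` and that `job_config` is a `LoadJobConfig` with `WRITE_APPEND`, like those used above.

```python
def truncate_and_load(client, dataset, table, source_uri, job_config):
    """Empty `dataset.table`, then append the files found at `source_uri`.

    Hypothetical helper: `client` is expected to offer the same `query`,
    `dataset`, and `load_table_from_uri` methods used in the cells above.
    """
    # Step 1: delete all rows manually, because loading with truncation
    # is not supported for these tables.
    client.query(f"DELETE FROM {dataset}.{table} WHERE TRUE").result()

    # Step 2: append the staged files to the now-empty table.
    table_ref = client.dataset(dataset).table(table)
    load_job = client.load_table_from_uri(
        source_uri,
        table_ref,
        job_config=job_config,
    )

    # Block until the load job finishes and return its result.
    return load_job.result()
```

With the names defined in this notebook, a call could look like `truncate_and_load(bigquery_client, BQ_DATASET, "bus_lines", f"gs://{BUCKET_NAME}/mta_staging_data/bus_lines.json", job_config)`.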

## Data Processing
After loading the data into our open data lakehouse, we process the raw data with Dataproc. We simulate bus trips, with arrivals at each stop and passengers boarding and leaving the bus.


In [None]:
import os

PROJECT_ID = !gcloud config get-value project
PROJECT_ID = PROJECT_ID[0]

BQ_DATASET = "ridership_lakehouse"
LOCATION = "us-central1"
APP_NAME = "open-data-lakehouse-demo"

print(PROJECT_ID)
print(LOCATION)
print(BQ_DATASET)
print(APP_NAME)


### Data Processing Function Definitions
Define the functions that execute the data processing:
- Simulate bus stops and arrivals
- Join the corresponding ridership data and simulate the passenger flow
- Save the completed trips to BQ
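
As a rough illustration of the first step, the sketch below computes per-stop arrival times from a trip start time and a list of per-stop delays, using a running total in the same spirit as the cumulative delay in the Spark SQL query that follows. The function name `simulate_arrivals` and the sample delays are assumptions made for this example; the notebook derives its delays from a hash of the bus line and stop index rather than from a fixed list.

```python
from datetime import datetime, timedelta, timezone
from itertools import accumulate


def simulate_arrivals(start_time, delays_min):
    """Return one arrival time per stop, given per-stop delays in minutes."""
    # Running total of delays, comparable to a SUM(...) OVER (ORDER BY stop_index)
    # window that includes the current row.
    cumulative = accumulate(delays_min)
    return [start_time + timedelta(minutes=total) for total in cumulative]


# Illustrative start time and delays, not taken from the demo data.
start = datetime(2020, 7, 1, 6, 0, tzinfo=timezone.utc)
arrivals = simulate_arrivals(start, delays_min=[2, 1, 3])

for stop_index, arrival in enumerate(arrivals):
    print(stop_index, arrival.isoformat())
```

Because the running total includes the current stop, the first stop is reached one delay after the trip start rather than at the start time itself.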

In [None]:
%pip install dataproc-spark-connect

from google.cloud.dataproc_v1 import Session
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from datetime import datetime, timedelta
from pyspark.sql.connect.functions import when, col, lag, unix_timestamp, window, floor, rand, sum as Fsum, expr
from pyspark.sql.connect.window import Window
from pyspark.sql.connect.types import StructType, StructField, LongType


session_config = Session()

# Create the Spark session.
spark = (
    DataprocSparkSession.builder
    .projectId(PROJECT_ID)
    .appName(APP_NAME)
    .location(LOCATION)
    .dataprocSessionConfig(session_config)
    .getOrCreate()
)

ridership = spark.read.format('bigquery') \
    .option("project", PROJECT_ID) \
    .option("table", f"{PROJECT_ID}:{BQ_DATASET}.ridership") \
    .load()

ridership = ridership.withColumn("transit_timestamp_unix_ts", unix_timestamp("transit_timestamp"))
ridership.createOrReplaceTempView("ridership_view")


def get_bus_lines_stations():
    """
    Returns a DataFrame of the ordered stations for all bus lines.
    """

    df_table = spark.read.format("bigquery") \
        .option("project", PROJECT_ID) \
        .option("table", f"{PROJECT_ID}:{BQ_DATASET}.bus_lines") \
        .load()


    unnested_stations = df_table.selectExpr("bus_line_id", "posexplode(stops) as (stop_index, stop_value)")
    bus_lines_stations = unnested_stations.selectExpr("bus_line_id", "stop_value AS stop_id", "stop_index AS stop_index")
    bus_lines_stations.createOrReplaceTempView("bus_stations")

    return bus_lines_stations


def get_bus_trips_simulation(start_time_str, min_trip_duration_minutes=1, max_trip_duration_minutes=3, seating_capacity_list=(10, 20, 25), standing_capacity_list=(20, 30, 35)):
    """
    Simulates trips with arrival times for each station based on the bus line schedules and a starting time.
    """

    start_time = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
    unix_ts = int(start_time.timestamp())
    duration_range = max_trip_duration_minutes - min_trip_duration_minutes + 1
    start_ts_str = start_time.strftime('%Y-%m-%d %H:%M:%S')

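    # Use only as many capacity options as both lists provide, so the modulo
    # in the query below never yields an index without a matching capacities row.
    num_options = min(len(seating_capacity_list), len(standing_capacity_list))
    capacity_mapping = ",\n    ".join([
        f"({i}, {seating_capacity_list[i]}, {standing_capacity_list[i]})"
        for i in range(num_options)
    ])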

    # Query
    query = f"""
    WITH capacities_list AS (
      SELECT * FROM VALUES
        {capacity_mapping}
      AS t(idx, seating_capacity, standing_capacity)
    ),

    bus_line_assignments AS (
      SELECT
        bus_line_id,
        ROW_NUMBER() OVER (ORDER BY bus_line_id) % {num_options} AS idx
      FROM (
        SELECT DISTINCT bus_line_id FROM bus_stations
      )
    ),

    bus_capacities AS (
      SELECT
        a.bus_line_id,
        c.seating_capacity,
        c.standing_capacity
      FROM bus_line_assignments a
      JOIN capacities_list c ON a.idx = c.idx
    ),

    delays AS (
      SELECT
        bus_line_id,
        stop_id,
        stop_index,
        FLOOR((ABS(HASH(bus_line_id, stop_index)) % {duration_range})) + {min_trip_duration_minutes} AS random_delay_min,
        SUM(FLOOR((ABS(HASH(bus_line_id, stop_index)) % {duration_range})) + {min_trip_duration_minutes}) OVER (
          PARTITION BY bus_line_id
          ORDER BY stop_index
          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS cumulative_delay_min,
        MAX(stop_index) OVER (PARTITION BY bus_line_id) AS max_stop_index
      FROM bus_stations
    ),

    base AS (
      SELECT UNIX_SECONDS(TIMESTAMP('{start_ts_str}')) AS start_ts_unix
    )

    SELECT
      trip_id,
      d.bus_line_id,
      d.stop_id,
      d.stop_index,
      d.random_delay_min,
      d.cumulative_delay_min,
      arrival_time,
      CAST(arrival_time AS LONG) AS arrival_time_unix_ts,
      c.seating_capacity,
      c.standing_capacity,
      (d.stop_index = d.max_stop_index) AS is_last_stop
    FROM (
      SELECT
        {unix_ts} as trip_id,
        d.bus_line_id,
        d.stop_id,
        d.stop_index,
        d.random_delay_min,
        d.cumulative_delay_min,
        d.max_stop_index,
        TIMESTAMP_SECONDS(b.start_ts_unix + d.cumulative_delay_min * 60) AS arrival_time
      FROM delays d
      CROSS JOIN base b
    ) d
    JOIN bus_capacities c ON d.bus_line_id = c.bus_line_id
    ORDER BY d.bus_line_id, d.stop_index
    """

    return spark.sql(query)


def get_ridership_window_intervals(step_in_minutes=10, window_size_in_minutes=180):
    """
    Returns a DataFrame that constructs the timestamp window intervals based on the ridership_view
    """

    step = step_in_minutes * 60
    window_size = window_size_in_minutes * 60

    # TODO: can this be improved?
    timestamps_row = spark.sql("""
        SELECT
            unix_timestamp(MIN(transit_timestamp)) AS start_ts,
            unix_timestamp(MAX(transit_timestamp)) AS end_ts
        FROM ridership_view
    """).first()

    start_ts = timestamps_row["start_ts"]
    end_ts = timestamps_row["end_ts"]

    window_bounds = []
    ts = start_ts
    while ts + window_size <= end_ts:
        window_bounds.append((ts, ts + window_size))
        ts += step

    # Create a DataFrame from the list of windows
    window_schema = StructType([
        StructField("window_start", LongType(), False),
        StructField("window_end", LongType(), False),
    ])

    window_df = spark.createDataFrame(window_bounds, schema=window_schema)

    return window_df


def get_ridership_window(ridership_df, window_start, window_end):
    """
    Returns a DataFrame that represents the ridership_df window given by the window start and end
    """

    ridership_window_data = ridership_df.filter(
      (col("transit_timestamp_unix_ts") >= window_start) &
      (col("transit_timestamp_unix_ts") < window_end)
    )

    return ridership_window_data


def get_ridership_simulation_data_filtered(simulations_df, ridership_df, window_start, window_end):
    """
    Returns a DataFrame that represents the station arrivals simulation populated with the ridership data, filtered
    to include only the time span between the window start and window end of the data.
    """

    # Alias both DataFrames to disambiguate columns
    sim_df = simulations_df.alias("sim")
    rid_df = ridership_df.alias("rid")

    # Perform join with aliased references
    simulations_with_ridership = sim_df.join(
        rid_df,
        on=(
            (col("sim.arrival_time_unix_ts") == col("rid.transit_timestamp_unix_ts")) &
            (col("sim.stop_id") == col("rid.station_id"))
        ),
        how="left"
    ).select(
        col("sim.trip_id"),
        col("sim.bus_line_id"),
        col("sim.stop_id"),
        col("sim.stop_index"),
        col("sim.is_last_stop"),
        col("sim.random_delay_min"),
        col("sim.cumulative_delay_min"),
        col("rid.ridership"),
        col("sim.arrival_time"),
        col("sim.arrival_time_unix_ts"),
        col("sim.seating_capacity"),
        col("sim.standing_capacity")
    )

    # Filter based on time window
    filtered_simulations_with_ridership = simulations_with_ridership.filter(
        (col("arrival_time_unix_ts") >= window_start) &
        (col("arrival_time_unix_ts") < window_end)
    )

    return filtered_simulations_with_ridership


def aggregate_ridership_and_simulate_capacity_data(simulations_with_ridership_df):
    """
    Returns a DataFrame that:
      - simulates a random number of passengers leaving the bus, based on the previous station's ridership
      - calculates the passengers onboard at departure
      - flags whether the bus exceeds its seating capacity or its total capacity
    """


    w = Window.partitionBy("trip_id", "bus_line_id").orderBy("stop_index")

    cumulative_sum = Fsum(when(col("ridership").isNotNull(), col("ridership"))).over(w)

    simulations_with_ridership_df = simulations_with_ridership_df.withColumn(
        "cumulative_ridership",
        when(col("ridership").isNotNull(), cumulative_sum)
    )

    simulations_with_ridership_df = simulations_with_ridership_df.withColumn("prev_ridership", lag(col("ridership")).over(w))

    # TODO: passengers_out could be simulated more realistically. For now it is a random integer
    # between 0 and the previous stop's ridership (or the current ridership if there is no previous value).
    simulations_with_ridership_df = simulations_with_ridership_df.withColumn(
      "passengers_out",
      when(
          col("stop_index") == 0,  # if first stop
          0
      ).otherwise(
          when(
              col("ridership").isNotNull(),
              floor(
                  rand() * (when(col("prev_ridership").isNotNull(), col("prev_ridership")).otherwise(col("ridership")) + 1)
              )
          ).otherwise(None)
        )
    )

    w_currentRow = Window.partitionBy("trip_id", "bus_line_id").orderBy("stop_index").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    simulations_with_ridership_df = simulations_with_ridership_df.withColumn(
        "passengers_onboard_at_departure",
        when(
            col("ridership").isNull(),
            None
        ).otherwise(
            Fsum(col("ridership") - col("passengers_out")).over(w_currentRow)
        )
    )

    simulations_with_ridership_df = simulations_with_ridership_df.withColumn(
        "seating_overcapacity",
        when(
            col("passengers_onboard_at_departure").isNotNull() &
            (col("passengers_onboard_at_departure") > col("seating_capacity")),
            True
        ).otherwise(False)
    ).withColumn(
        "standing_overcapacity",
        when(
            col("passengers_onboard_at_departure").isNotNull() &
            (col("passengers_onboard_at_departure") > (col("seating_capacity") + col("standing_capacity"))),
            True
        ).otherwise(False)
    )

    return simulations_with_ridership_df


def handle_complete_arrival_simulation_data(simulations_with_ridership_df, window_end):
    """
    Decides which trips from the simulations_with_ridership_df had the chance to be populated with ridership data and saves them to BQ.
    """

    # Select the qualifying trips, i.e. those that reached their last stop before window_end
    qualifying_trip_ids_df = simulations_with_ridership_df.filter(
        (col("is_last_stop") == True) &
        (col("arrival_time_unix_ts") < window_end)
    ).select("trip_id", "bus_line_id").distinct()

    # Reconstruct full rows for those qualifying trips
    qualifying_trips_df = simulations_with_ridership_df.alias("sim").join(
        qualifying_trip_ids_df.alias("qual_ids"),
        on=["trip_id", "bus_line_id"],
        how="inner"
    ).select(
        col("sim.trip_id"),
        col("sim.bus_line_id"),
        col("sim.stop_id"),
        col("sim.stop_index"),
        col("sim.is_last_stop"),
        col("sim.random_delay_min"),
        col("sim.cumulative_delay_min"),
        col("sim.ridership"),
        col("sim.arrival_time"),
        col("sim.arrival_time_unix_ts"),
        col("sim.seating_capacity"),
        col("sim.standing_capacity")
    )

    # Enrich with ridership aggregation
    qualifying_trips_df = aggregate_ridership_and_simulate_capacity_data(qualifying_trips_df)
    
    # Write the qualifying trips to BigQuery
    qualifying_trips_df.write \
        .format("bigquery") \
        .option("table", f"{PROJECT_ID}:{BQ_DATASET}.simulation") \
        .option("writeMethod", "direct") \
        .mode("append") \
        .save()

    return qualifying_trips_df
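
The capacity logic in `aggregate_ridership_and_simulate_capacity_data` can be illustrated without Spark. The following is a minimal sketch for a single trip, assuming fixed counts of passengers leaving the bus instead of the random `passengers_out` values, and ignoring stops with missing ridership. `simulate_capacity` and its sample numbers are hypothetical.

```python
from itertools import accumulate


def simulate_capacity(boarding, alighting, seating_capacity, standing_capacity):
    """Return (onboard, seating_over, standing_over) per stop for one trip."""
    # Running total of net boardings, comparable to
    # SUM(ridership - passengers_out) OVER (... up to the current row).
    onboard = accumulate(b - a for b, a in zip(boarding, alighting))
    total_capacity = seating_capacity + standing_capacity
    return [
        (count, count > seating_capacity, count > total_capacity)
        for count in onboard
    ]


# Illustrative numbers, not taken from the demo data.
rows = simulate_capacity(
    boarding=[8, 7, 22, 5],
    alighting=[0, 3, 2, 10],
    seating_capacity=10,
    standing_capacity=20,
)

for stop_index, (onboard, seating_over, standing_over) in enumerate(rows):
    print(stop_index, onboard, seating_over, standing_over)
```

In this sample the bus exceeds its seating capacity from the second stop onward and its total capacity only at the third stop, where 32 passengers are onboard against a combined capacity of 30.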

### Data Processing Run
Run the data processing functions defined above, one time window at a time:
- Simulate bus stops and arrivals
- Join the corresponding ridership data and simulate the passenger flow
- Save the completed trips to BQ
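
The run iterates over overlapping time windows. As a sketch of how such windows are laid out, the plain-Python function below mirrors the loop in `get_ridership_window_intervals`. The name `sliding_windows` and the sample range are assumptions for illustration only.

```python
def sliding_windows(start_ts, end_ts, step_s, size_s):
    """Return (window_start, window_end) pairs that end at or before end_ts."""
    windows = []
    ts = start_ts
    # Advance the window start by one step until the window no longer fits.
    while ts + size_s <= end_ts:
        windows.append((ts, ts + size_s))
        ts += step_s
    return windows


# A 10-minute step and a 3-hour window (the defaults used in this notebook),
# applied to an illustrative 4-hour range expressed in seconds.
windows = sliding_windows(start_ts=0, end_ts=4 * 3600, step_s=600, size_s=3 * 3600)
print(len(windows), windows[0], windows[-1])
```

Because the step is shorter than the window size, consecutive windows overlap, so the same ridership rows are visited by several windows.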


In [None]:
from datetime import datetime, timezone

bus_lines_stations = get_bus_lines_stations()
bus_lines_stations.cache()

intervals_df = get_ridership_window_intervals()
# Limit the demo run to windows ending at or before 2020-07-01 06:10:00 UTC
intervals_df_f = intervals_df.filter(intervals_df.window_end <= 1593583800)

for row in intervals_df_f.sort("window_start").toLocalIterator():
    window_start = int(row['window_start'])
    window_end = int(row['window_end'])

    window_start_str = datetime.fromtimestamp(window_start, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

    # 1. Select the ridership rows that fall inside the window
    ridership_window_data = get_ridership_window(ridership, window_start, window_end)

    # 2. Simulate arrivals for trips starting at the lower end of the interval
    simulations = get_bus_trips_simulation(window_start_str)

    # 3. Join the simulation with the ridership data and filter to the window
    simulations_with_ridership = get_ridership_simulation_data_filtered(simulations, ridership_window_data, window_start, window_end)

    # 4. Keep the trips that completed within the window and save them to BigQuery
    complete_simulations_with_ridership = handle_complete_arrival_simulation_data(simulations_with_ridership, window_end)

## Basic Analytics
After loading the data into our open data lakehouse, we demonstrate some basic analytics, repeating the process with several different engines:
- BigQuery
- Spark (serverless?)