In [None]:
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

# Create the Dataproc Serverless session configuration object.
session = Session()

# --- User configuration: replace the bracketed placeholders before running. ---
project_id = "[YOUR_PROJECT]"  # REPLACE
# NOTE(review): `region` is not referenced in this cell; kept in case later
# cells or the environment rely on it — confirm before removing.
region = "us-central1"
subnet_name = "default"
# Location passed to the Iceberg catalog as `gcp_location` below.
location = "us-central1"

warehouse_dir = "gs://[YOUR_DIR]/warehouse"  # REPLACE
catalog = "biglake"
namespace = "biglake_nyc"

# Fail fast with a clear message rather than an opaque error once the session
# or the catalog is first used with placeholder values.
_unreplaced_settings = [
    setting_name
    for setting_name, setting_value in (
        ("project_id", project_id),
        ("warehouse_dir", warehouse_dir),
    )
    if "[YOUR_" in setting_value
]
if _unreplaced_settings:
    raise ValueError(
        "Replace the placeholder value(s) for: " + ", ".join(_unreplaced_settings)
    )

# Networking for the serverless session.
session.environment_config.execution_config.subnetwork_uri = subnet_name
# Left disabled: the Iceberg catalog below is configured through Spark
# properties instead of the session's peripherals config.
#session.environment_config.peripherals_config.bigquery_metastore_config.project_id = project_id
#session.environment_config.peripherals_config.bigquery_metastore_config.location = location

# Register an Iceberg SparkCatalog named `catalog` backed by BigQuery Metastore,
# storing table data under `warehouse_dir`.
catalog_properties = {
    f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{catalog}.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
    f"spark.sql.catalog.{catalog}.gcp_project": project_id,
    f"spark.sql.catalog.{catalog}.gcp_location": location,
    f"spark.sql.catalog.{catalog}.warehouse": warehouse_dir,
}
for property_key, property_value in catalog_properties.items():
    session.runtime_config.properties[property_key] = property_value


# Create (or reuse) the Spark Connect session using the configuration above.
spark = (
    DataprocSparkSession.builder
    .appName("sme_academy")
    .dataprocSessionConfig(session)
    .getOrCreate()
)


In [None]:
# Make the Iceberg catalog current, create the target namespace if it is
# missing (in BigQuery, via the metastore-backed catalog), then make that
# namespace current so later unqualified table names resolve inside it.
for namespace_statement in (
    f"USE `{catalog}`;",
    f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;",
    f"USE `{namespace}`;",
):
    spark.sql(namespace_statement)


In [None]:
# Drop the table if it exists and recreate it, so this cell can be re-run.
# PURGE also deletes the old table's data and metadata files. Without it,
# Iceberg's DROP TABLE only removes the catalog entry and leaves the files
# orphaned in the warehouse on every re-run.
spark.sql("DROP TABLE IF EXISTS `taxis` PURGE")

# Partition with Iceberg's `days()` transform (hidden partitioning: one
# partition per pickup day). Identity-partitioning on the raw TIMESTAMP would
# create a partition, and at least one data file, per distinct timestamp value.
sql_query = """
CREATE TABLE taxis (
  VendorID BIGINT,
  tpep_pickup_datetime    TIMESTAMP,
  tpep_dropoff_datetime   TIMESTAMP,
  passenger_count         DOUBLE,
  trip_distance           DOUBLE,
  PULocationID            BIGINT,
  DOLocationID            BIGINT,
  RatecodeID              DOUBLE,
  store_and_fwd_flag      STRING,
  payment_type            BIGINT,
  fare_amount             DOUBLE,
  extra                   DOUBLE,
  mta_tax                 DOUBLE,
  tip_amount              DOUBLE,
  tolls_amount            DOUBLE,
  improvement_surcharge   DOUBLE,
  total_amount            DOUBLE,
  congestion_surcharge    DOUBLE,
  airport_fee             DOUBLE,
  tip_percentage          DOUBLE
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime));
"""
spark.sql(sql_query)



In [None]:
# Create a temporary view over the raw yellow_tripdata_2025-09 Parquet file.
# Alternatively, a BigLake table in BigQuery could be used as the source.
# REPLACE the directory placeholder in `source_parquet_path` before running.
source_parquet_path = "gs://[YOUR_DIR]/yellow_tripdata_2025-09.parquet"
sql_query = (
    "\n"
    "CREATE OR REPLACE TEMPORARY VIEW temp_taxi_data\n"
    "USING parquet\n"
    "OPTIONS (\n"
    f"  path '{source_parquet_path}');\n"
)
spark.sql(sql_query)

In [None]:
# Insert rows from the temp view into the Iceberg table.
# - INSERT INTO without a column list maps columns by position, so the SELECT
#   order below must match the CREATE TABLE column order of `taxis`.
# - `tip_percentage` is stored as a fraction (tip / fare); later queries
#   multiply by 100 for display.
# - A bare `tip_amount/fare_amount` returns NULL for a zero fare only when ANSI
#   mode is off. With `spark.sql.ansi.enabled=true` it raises DIVIDE_BY_ZERO.
#   The CASE yields NULL for a zero fare in either mode.
# - LIMIT without ORDER BY takes an arbitrary, non-deterministic set of 1000 rows.
sql_query = """
INSERT INTO taxis
SELECT
  CAST(VendorID AS BIGINT),
  tpep_pickup_datetime,
  tpep_dropoff_datetime,
  passenger_count,
  trip_distance,
  CAST(PULocationID AS BIGINT),
  CAST(DOLocationID AS BIGINT),
  RatecodeID,
  store_and_fwd_flag,
  CAST(payment_type AS BIGINT),
  fare_amount,
  extra,
  mta_tax,
  tip_amount,
  tolls_amount,
  improvement_surcharge,
  total_amount,
  congestion_surcharge,
  airport_fee,
  CASE WHEN fare_amount <> 0 THEN tip_amount / fare_amount END AS tip_percentage
FROM
  temp_taxi_data
LIMIT 1000;
  """
spark.sql(sql_query)


In [None]:
# Top 10 pickup locations ranked by average tip as a percentage of the fare,
# considering only rows with a positive fare and a non-negative tip.
# `tip_percentage` is stored as a fraction, hence the `* 100`.
sql_query = """
SELECT
    PULocationID,
    count(*) AS total_trips,
    round(avg(tip_amount), 2) AS avg_tip_amount,
    -- Calculate average tip as a percentage of the fare, rounded to 2 decimal places
    round(avg(tip_percentage) * 100, 2) AS avg_tip_percentage
FROM
    taxis
WHERE
    fare_amount > 0 AND tip_amount >= 0
GROUP BY
    PULocationID
ORDER BY
    avg_tip_percentage DESC
LIMIT 10;
  """
spark.sql(sql_query).show()

In [None]:
# Trips with the largest tips for a single pickup location.
# DataFrame.show() prints only 20 rows by default, so pass the same row count
# as the LIMIT to display every row the query returns.
# NOTE(review): 265 is presumably a TLC taxi-zone ID — confirm its meaning
# against the zone lookup table.
pickup_location_id = 265
max_rows = 100
spark.sql(
    f"select * from taxis where PULocationID = {int(pickup_location_id)} "
    f"order by tip_amount desc limit {int(max_rows)};"
).show(max_rows)
