## Project 3 work

#### Define common constants and import common libraries

In [72]:
# Shared settings used by every section of this notebook.
project_id = "cs378-fa2025"  # Google Cloud project passed to the Gen AI client and to BigQuery reads
region = "us-central1"  # location passed to the Gen AI client
model_name = "gemini-2.5-pro"  # model used for the batch prediction jobs
bucket_name = "air-travel-open-data"  # Cloud Storage bucket holding the inputs, the jsonl request files and the lakehouse output

In [73]:
import pandas, json, time, os
from google import genai
from google.genai.types import CreateBatchJobConfig, JobState
from google.cloud import bigquery
from google.cloud import storage

### Airport Maps

In [71]:
# Locations used by the airport-maps pipeline (GCS paths are relative to bucket_name).
input_folder_prefix = "initial-loads/airport-maps" # where our input map files are located (requests send them as application/pdf)
input_folder = "jsonl/airport-maps" # folder for the request file, both locally and in the bucket
input_data_path = f"{input_folder}/input-data.jsonl" # used for both the local file and GCS file, contains the prediction requests
input_uri = f"gs://{bucket_name}/{input_data_path}" # complete path to file containing the prediction requests
tmp_table = "air_travel_tmp.airport_maps_data" # output dataset and table in BQ
tmp_table_path = f"bq://{project_id}.{tmp_table}" # destination handed to the batch job
tmp_parquet_file = "airport_maps_data.parquet" # local parquet file built from the job results
lakehouse_parquet_file = f"lakehouse/airport-maps/airport_maps_data.parquet" # object name the parquet file is uploaded to

#### Run the batch job prediction

In [13]:
def create_gemini_request(file_list):
    """Build the request body for one Gemini batch-prediction row.

    The request carries the notebook-level ``system_instruction`` and
    ``prompt`` strings, followed by one PDF attachment per entry of
    *file_list*.

    Args:
        file_list: iterable of ``gs://`` URIs to attach to the request.

    Returns:
        A dict with the ``system_instruction``, ``contents`` and
        ``generation_config`` sections of the request.
    """
    # Every file is attached as a PDF reference after the prompt text.
    attachments = [
        {"file_data": {"file_uri": uri, "mime_type": "application/pdf"}}
        for uri in file_list
    ]

    return {
        "system_instruction": {"parts": [{"text": system_instruction}]},
        "contents": {
            "role": "user",
            "parts": [{"text": prompt}, *attachments],
        },
        "generation_config": {"temperature": 1},
    }

In [52]:

# Build one Gemini request per airport from the map files in the bucket, write
# the requests to a jsonl file, upload it, and start the batch prediction job.
storage_client = storage.Client()
genai_client = genai.Client(vertexai=True, project=project_id, location=region)

system_instruction = """
You are a helpful assistant who reads and understands airport terminal maps. Your goal is to ensure that an airport's businesses are properly reported.
You double-check your answers to make sure you are not mislabeling a business and always return your answers in json format.
"""

prompt = """Which businesses appear on the terminal map of this airport? Return their terminal, business name, category, and the nearest landmarks as they are shown on the map.
If the business is a dining establishment, return its menu items that it is known for.
Please make sure that your answer conforms to the output schema: {"terminal": "string", "business": "string", "category": "string", "location": "string", "dining": "boolean", "menu_items": "string"}
"""

# get the file listings for the maps that we're going to process
bucket = storage_client.bucket(bucket_name)

# airport code -> gs:// URIs of its map files. A dict keeps first-seen order and
# still yields one request per airport if the listing is not contiguous.
files_by_airport = {}

for blob in bucket.list_blobs(prefix=input_folder_prefix):

    # Object names ending in "/" are folder placeholders, not map files.
    if blob.name.endswith("/"):
        continue

    # The airport code is the text between "maps/" and the next "-" in the
    # object name (assumes names like <prefix>/<airport>-<rest> — TODO confirm).
    cur_airport = blob.name.split("-")[2].replace("maps/", "")
    files_by_airport.setdefault(cur_airport, []).append(f"gs://{bucket_name}/{blob.name}")

# make a separate dictionary request for each airport; "key" is the row identifier
jsonl_lines = [
    json.dumps({"key": airport, "request": create_gemini_request(file_listing)})
    for airport, file_listing in files_by_airport.items()
]
num_airports = len(jsonl_lines)

print("Number of lines in jsonl file:", len(jsonl_lines))

# write all the lines to the jsonl file
# (makedirs also creates the missing parent folder; os.mkdir would fail on the nested path)
os.makedirs(input_folder, exist_ok=True)

with open(input_data_path, 'w') as f:
    f.writelines(line + '\n' for line in jsonl_lines)

# upload the request file so that the batch job can read it from GCS
blob = bucket.blob(input_data_path)
blob.upload_from_filename(input_data_path)

job = genai_client.batches.create(
    model=model_name,
    src=input_uri,
    config=CreateBatchJobConfig(
        dest=tmp_table_path
    )
)

print(f"Created job: {job.name} with {num_airports} airports")
print(f"Job {job.name} is currently in {job.state} state")

Number of lines in jsonl file: 88
Created job: projects/988876466742/locations/us-central1/batchPredictionJobs/3326614434616442880 with 88 airports
Job projects/988876466742/locations/us-central1/batchPredictionJobs/3326614434616442880 is currently in JobState.JOB_STATE_PENDING state


In [54]:
# Poll the batch job every 45 seconds until it reaches one of the states below.
finished_states = (JobState.JOB_STATE_SUCCEEDED, JobState.JOB_STATE_FAILED,
                   JobState.JOB_STATE_CANCELLED, JobState.JOB_STATE_PAUSED)

job = genai_client.batches.get(name=job.name)

while job.state not in finished_states:
    print(f"Job state: {job.state}")
    # Sleep before re-fetching so that a finished job ends the loop right away
    # instead of costing one more 45 second wait and a duplicate status line.
    time.sleep(45)
    job = genai_client.batches.get(name=job.name)

print(f"Job state: {job.state}")

Job state: JobState.JOB_STATE_SUCCEEDED


#### Check the job results

In [55]:
%%bigquery
-- Preview the batch responses, leaving out rows whose status mentions an error.
SELECT
    key,
    response
FROM air_travel_tmp.airport_maps_data
WHERE status NOT LIKE '%error%'

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,key,response
0,sfb,"{""candidates"":[{""avgLogprobs"":-0.4782986346790..."
1,bhm,"{""candidates"":[{""avgLogprobs"":-0.2436348086257..."
2,gtr,"{""candidates"":[{""avgLogprobs"":-2.979052734375,..."
3,cvg,"{""candidates"":[{""avgLogprobs"":-0.3004618645363..."
4,lch,"{""candidates"":[{""avgLogprobs"":-0.2530112918749..."
...,...,...
83,las,"{""candidates"":[{""avgLogprobs"":-0.1664780419602..."
84,oak,"{""candidates"":[{""avgLogprobs"":-0.3259787659545..."
85,fll,"{""candidates"":[{""avgLogprobs"":-0.1240846142144..."
86,lax,"{""candidates"":[{""avgLogprobs"":-0.0798005497763..."


#### Create the parquet file

In [57]:
import pandas, pandas_gbq
import json
from google.cloud import storage

# Read the batch responses from BigQuery, flatten them into one row per
# business, save the rows as a parquet file and upload it to the lakehouse folder.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)


def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence (``` or ```json)."""
    text = text.strip()
    if text.startswith("```"):
        text = text[3:]
        # Drop only the language tag on the fence, never "json" inside the data.
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


sql = f"""select key, response from {tmp_table} where status not like '%error%'
"""

df = pandas_gbq.read_gbq(
    sql,
    project_id=project_id,
    dialect="standard",
)

prediction_results = [] # one dictionary per business, tagged with its airport

# extract predictions from response
for _, row in df.iterrows():
    airport = row["key"]

    try:
        response_dict = json.loads(row["response"])
        prediction_str = response_dict["candidates"][0]["content"]["parts"][0]["text"]
        prediction_list = json.loads(_strip_code_fence(prediction_str))
    except (json.JSONDecodeError, KeyError, IndexError, TypeError) as err:
        # One unusable response should not abort the whole load; report and move on.
        print(f"skipping {airport}: could not parse response ({err!r})")
        continue

    # A single object answer is treated as a one-element list.
    if isinstance(prediction_list, dict):
        prediction_list = [prediction_list]
    if not isinstance(prediction_list, list):
        print(f"skipping {airport}: unexpected response shape")
        continue

    for prediction_dict in prediction_list:
        if not isinstance(prediction_dict, dict):
            print(f"discarding: {prediction_dict}")
            continue
        prediction_dict.update({"airport": airport})
        prediction_results.append(prediction_dict)

print(f"number of results: {len(prediction_results)}")

# convert to parquet file
df = pandas.DataFrame(prediction_results)
df.to_parquet(tmp_parquet_file)
print("converted to parquet")

# upload to our lakehouse folder in GCS
blob = bucket.blob(lakehouse_parquet_file)
blob.upload_from_filename(tmp_parquet_file)
print(f"wrote parquet file to gs://{bucket_name}/{lakehouse_parquet_file}")

Downloading: 100%|[32m██████████[0m|
number of results: 1465
converted to parquet
wrote parquet file to gs://air-travel-open-data/lakehouse/airport-maps/airport_maps_data.parquet


#### Create and load the Iceberg table

In [60]:
%%bigquery

-- Create (or replace) the airport_maps table as a Parquet-format Iceberg table
-- stored under the lakehouse/airport-maps folder of the bucket.
-- Columns: the six keys requested in the Gemini prompt's output schema, plus
-- the airport code that is added to every row when the parquet file is built.
CREATE OR REPLACE TABLE air_travel_raw.airport_maps
(
    airport STRING,
    terminal STRING,
    business STRING,
    category STRING,
    location STRING,
    dining BOOLEAN,
    menu_items STRING
)
WITH CONNECTION `988876466742.us-central1.cloud-storage-connection`
OPTIONS (
 file_format = 'PARQUET',
 table_format = 'ICEBERG',
 storage_uri = 'gs://air-travel-open-data/lakehouse/airport-maps'
);

Query is running:   0%|          |

In [61]:
%%bigquery

-- Append the rows of the uploaded parquet file to the airport_maps table.
LOAD DATA INTO air_travel_raw.airport_maps
FROM FILES (
    format = 'parquet',
    uris = ['gs://air-travel-open-data/lakehouse/airport-maps/airport_maps_data.parquet']
);

Query is running:   0%|          |

#### Check the output

In [62]:
%%bigquery
-- Show every row that was loaded into airport_maps.
SELECT *
FROM air_travel_raw.airport_maps

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,airport,terminal,business,category,location,dining,menu_items
0,sfb,Level One,Jetway Caffe,Dining,Near the Information Desk and ATM,True,"Coffee, pastries, and light snacks"
1,sfb,Level One,TAB,Dining,Near the Global Entry office,True,"Bar food, beer, and cocktails"
2,sfb,Level Two,Stanford Market,Shopping,Near Gate 12,False,
3,sfb,Level Two,Hudson News,Shopping,Near Gate 11,False,
4,sfb,Level Two,Port Paradise,Dining,Near Gate 11,True,Tropical-themed cocktails and bar food
...,...,...,...,...,...,...,...
1460,sfo,3,"COVID-19 Testing, XpresCheck",Service,"Level 1, Arrivals",False,
1461,sfo,3,United Airline Baggage Service,Service,Level 1,False,
1462,sfo,3,SFO Museum,Things to Do,Pre-security area,False,
1463,sfo,3,Yoga Room,Things to Do,Concourse E,False,


In [63]:
%%bigquery
-- Spot-check the rows for a single airport.
SELECT *
FROM air_travel_raw.airport_maps
WHERE airport = 'aus'

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,airport,terminal,business,category,location,dining,menu_items
0,aus,East Concourse,Mini Market,Shopping,Near Gate 1,False,
1,aus,East Concourse,The Scoreboard,Dining,Near Gate 1,True,"Bar food, drinks"
2,aus,East Concourse,Hut's Hamburgers,Dining,Between Gates 1 and 3,True,Hamburgers
3,aus,East Concourse,Pretzels,Dining,Near Gate 3,True,Pretzels
4,aus,East Concourse,Starbucks,Dining,Near Gate 3,True,"Coffee, pastries, sandwiches"
5,aus,East Concourse,Taste ATX,Dining,Near Gate 5,True,"Local Austin foods, snacks"
6,aus,East Concourse,Second Bar,Dining,Near Gate 5,True,"Cocktails, bar food"
7,aus,East Concourse,Parkside,Dining,Near Gate 5,True,"American cuisine, cocktails"
8,aus,East Concourse,EJE Travel Retail,Shopping,Near Gate 6,False,
9,aus,East Concourse,Jugo,Dining,Near Gate 7,True,"Juice, smoothies"


#### Add the `_load_time` and `_data_source` fields to the table

In [64]:
%%bigquery

-- Add the two audit columns to airport_maps.
ALTER TABLE air_travel_raw.airport_maps
    ADD COLUMN _data_source STRING,
    ADD COLUMN _load_time TIMESTAMP;

Query is running:   0%|          |

In [80]:
%%bigquery

-- Stamp every row with its data source and the current time.
-- The always-true WHERE clause makes the update apply to all rows.
UPDATE air_travel_raw.airport_maps
SET
    _data_source = 'airport.guide',
    _load_time = CURRENT_TIMESTAMP()
WHERE TRUE

Query is running:   0%|          |

#### Check final output

In [68]:
%%bigquery

-- Final check: rows for one airport, now including the audit columns.
SELECT *
FROM air_travel_raw.airport_maps
WHERE airport = 'lax'

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,airport,terminal,business,category,location,dining,menu_items,_data_source,_load_time
0,lax,1,Deli and Co.,Dining,Near Gate 11B,True,"Deli sandwiches, salads, and snacks",airport.guide,2025-09-12 17:51:04.394764+00:00
1,lax,5,Loteria!,Dining,Near Gate 53A,True,Mexican street food,airport.guide,2025-09-12 17:51:04.394764+00:00
2,lax,International,Planet Hollywood,Shopping,Near Gate 131,False,,airport.guide,2025-09-12 17:51:04.394764+00:00
3,lax,4,DFS Duty Free,Shopping,Near Gate 41,False,,airport.guide,2025-09-12 17:51:04.394764+00:00
4,lax,6,Market 8600,Dining,Near Gate 64A,True,Grab-and-go market items,airport.guide,2025-09-12 17:51:04.394764+00:00
...,...,...,...,...,...,...,...,...,...
168,lax,1,Chick-fil-A,Dining,Between Gates 11A and 11B,True,"Chicken sandwiches, nuggets, waffle fries",airport.guide,2025-09-12 17:51:04.394764+00:00
169,lax,4,AA Admirals Club,Airline Lounge,Near Gate 40,False,,airport.guide,2025-09-12 17:51:04.394764+00:00
170,lax,7,DFS Duty Free,Shopping,Near Gate 71A,False,,airport.guide,2025-09-12 17:51:04.394764+00:00
171,lax,International,iStore Boutique,Shopping,Main Hall,False,,airport.guide,2025-09-12 17:51:04.394764+00:00


### TSA Traffic

In [69]:
# Locations used by the TSA-traffic pipeline (GCS paths are relative to bucket_name).
# These reassign the same variable names the airport-maps section used.
input_folder_prefix = "initial-loads/tsa-traffic" # where our input pdfs are located
input_folder = "jsonl/tsa-traffic" # folder for the request file, both locally and in the bucket
input_data_path = f"{input_folder}/input-data.jsonl" # used for both the local file and GCS file, contains the prediction requests
input_uri = f"gs://{bucket_name}/{input_data_path}" # complete path to file containing the prediction requests
tmp_table = "air_travel_tmp.tsa_traffic_data" # output dataset and table in BQ
tmp_table_path = f"bq://{project_id}.{tmp_table}" # destination handed to the batch job
tmp_parquet_file = "tsa_traffic_data.parquet" # local parquet file built from the job results
lakehouse_parquet_file = f"lakehouse/tsa-traffic/tsa_traffic_data.parquet" # object name the parquet file is uploaded to

#### Run the batch job prediction

In [74]:
def create_gemini_request(file_list):
    """Build the request body for one Gemini batch-prediction row.

    The request carries the notebook-level ``system_instruction`` and
    ``prompt`` strings, followed by one PDF attachment per file.

    Args:
        file_list: a ``gs://`` URI, or an iterable of such URIs, to attach.

    Returns:
        A dict with the ``system_instruction``, ``contents`` and
        ``generation_config`` sections of the request.
    """
    # A bare string would be iterated character by character and produce one
    # bogus attachment per character, so treat it as a single URI.
    if isinstance(file_list, str):
        file_list = [file_list]

    parts = [
        {
            "text": prompt
        }
    ]

    for file_data in file_list:
        parts.append({
            "file_data": {"file_uri": file_data, "mime_type": "application/pdf"}
        })

    request_dict = {
        "system_instruction": {
            "parts": [
                {
                    "text": system_instruction
                }
            ]
        },
        "contents": {
            "role": "user",
            "parts": parts
        },
        "generation_config": {
            "temperature": 1,
        },
    }

    return request_dict

In [78]:
# Build one Gemini request per TSA report pdf in the bucket, write the requests
# to a jsonl file, upload it, and start the batch prediction job.
storage_client = storage.Client()
genai_client = genai.Client(vertexai=True, project=project_id, location=region)

system_instruction = """
You are a helpful assistant who reads and understands tsa traffic reports. Your goal is to ensure that an airport's traffic patterns is properly reported.
You double-check your answers to make sure you are not mislabeling a checkpoint or reporting wrong numbers and always return your answers in json format.
"""

# The schema keys must match the column names of air_travel_raw.tsa_traffic.
prompt = """Convert the file to json format. Return the date, hour of day, airport code, airport name, city, state, checkpoint name, and total traffic reported for each time period and location.
Please make sure that your answer conforms to the output schema:
  {"day": "string", "hour": "string", "airport_code": "string", "airport_name": "string", "city": "string", "state": "string", "checkpoint": "string", "total_traffic": "integer"}
"""

jsonl_lines = []

# get the file listings for the reports that we're going to process
bucket = storage_client.bucket(bucket_name)

for blob in bucket.list_blobs(prefix=input_folder_prefix):

    # Object names ending in "/" are folder placeholders, not reports.
    if blob.name.endswith("/"):
        continue

    # make a separate request per pdf
    # Row key = object name without the input folder and the .pdf extension.
    # Blob names do not include the bucket name, so only the folder prefix is stripped.
    file_name = blob.name
    if file_name.startswith(input_folder_prefix):
        file_name = file_name[len(input_folder_prefix):]
    file_name = file_name.lstrip("/").replace(".pdf", "")
    file_path = f"gs://{bucket_name}/{blob.name}"

    # Pass a one-element list so that the whole URI becomes a single attachment.
    request_dict = create_gemini_request([file_path])

    jsonl_line = json.dumps({
        "key": file_name,  # row identifier
        "request": request_dict
    })

    jsonl_lines.append(jsonl_line)

num_reports = len(jsonl_lines)

print("number of lines in jsonl file:", len(jsonl_lines))

# write all the lines to the jsonl file
# (makedirs also creates the missing parent folder; os.mkdir would fail on the nested path)
os.makedirs(input_folder, exist_ok=True)

with open(input_data_path, 'w') as f:
    f.writelines(line + '\n' for line in jsonl_lines)

# upload the request file so that the batch job can read it from GCS
blob = bucket.blob(input_data_path)
blob.upload_from_filename(input_data_path)

job = genai_client.batches.create(
    model=model_name,
    src=input_uri,
    config=CreateBatchJobConfig(
        dest=tmp_table_path
    )
)

print(f"Created job: {job.name} with {num_reports} tsa reports")
print(f"Job {job.name} is currently in {job.state} state")

number of lines in jsonl file: 110
Created job: projects/988876466742/locations/us-central1/batchPredictionJobs/3211702275374317568 with 110 tsa reports
Job projects/988876466742/locations/us-central1/batchPredictionJobs/3211702275374317568 is currently in JobState.JOB_STATE_PENDING state


In [79]:
# Poll the batch job every 45 seconds until it reaches one of the states below.
finished_states = (JobState.JOB_STATE_SUCCEEDED, JobState.JOB_STATE_FAILED,
                   JobState.JOB_STATE_CANCELLED, JobState.JOB_STATE_PAUSED)

job = genai_client.batches.get(name=job.name)

while job.state not in finished_states:
    print(f"Job state: {job.state}")
    # Sleep before re-fetching so that a finished job ends the loop right away
    # instead of costing one more 45 second wait and a duplicate status line.
    time.sleep(45)
    job = genai_client.batches.get(name=job.name)

print(f"Job state: {job.state}")

Job state: JobState.JOB_STATE_PENDING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_SUCCEEDED
Job state: JobState.JOB_STATE_SUCCEEDED


#### Check the job results

In [None]:
%%bigquery
-- Preview the batch responses, leaving out rows whose status mentions an error.
SELECT
    key,
    response
FROM air_travel_tmp.tsa_traffic_data
WHERE status NOT LIKE '%error%'

#### Create the parquet file

In [None]:
import pandas, pandas_gbq
import json
from google.cloud import storage

# Read the batch responses from BigQuery, flatten them into one row per
# reported traffic entry, save the rows as a parquet file and upload it to the
# lakehouse folder.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)


def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence (``` or ```json)."""
    text = text.strip()
    if text.startswith("```"):
        text = text[3:]
        # Drop only the language tag on the fence, never "json" inside the data.
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


sql = f"""select key, response from {tmp_table} where status not like '%error%'
"""

df = pandas_gbq.read_gbq(
    sql,
    project_id=project_id,
    dialect="standard",
)

prediction_results = [] # one dictionary per traffic entry returned by the model

# extract predictions from response
for _, row in df.iterrows():
    report_key = row["key"]  # only used to identify the report in messages

    try:
        response_dict = json.loads(row["response"])
        prediction_str = response_dict["candidates"][0]["content"]["parts"][0]["text"]
        prediction_list = json.loads(_strip_code_fence(prediction_str))
    except (json.JSONDecodeError, KeyError, IndexError, TypeError) as err:
        # One unusable response should not abort the whole load; report and move on.
        print(f"skipping {report_key}: could not parse response ({err!r})")
        continue

    # A single object answer is treated as a one-element list.
    if isinstance(prediction_list, dict):
        prediction_list = [prediction_list]
    if not isinstance(prediction_list, list):
        print(f"skipping {report_key}: unexpected response shape")
        continue

    for prediction_dict in prediction_list:
        if not isinstance(prediction_dict, dict):
            print(f"discarding: {prediction_dict}")
            continue
        prediction_results.append(prediction_dict)

print(f"number of results: {len(prediction_results)}")

# convert to parquet file
df = pandas.DataFrame(prediction_results)
df.to_parquet(tmp_parquet_file)
print("converted to parquet")

# upload to our lakehouse folder in GCS
blob = bucket.blob(lakehouse_parquet_file)
blob.upload_from_filename(tmp_parquet_file)
print(f"wrote parquet file to gs://{bucket_name}/{lakehouse_parquet_file}")

#### Create and load the Iceberg table

In [None]:
%%bigquery

-- Create (or replace) the tsa_traffic table as a Parquet-format Iceberg table
-- stored under the lakehouse/tsa-traffic folder of the bucket.
-- NOTE(review): the column names presumably have to match the keys in the
-- parquet file built from the Gemini responses — confirm that the responses
-- actually contain an airport_code key before loading.
CREATE OR REPLACE TABLE air_travel_raw.tsa_traffic
(
    day STRING,
    hour STRING,
    airport_code STRING,
    airport_name STRING,
    city STRING,
    state STRING,
    checkpoint STRING,
    total_traffic INTEGER
)
WITH CONNECTION `988876466742.us-central1.cloud-storage-connection`
OPTIONS (
 file_format = 'PARQUET',
 table_format = 'ICEBERG',
 storage_uri = 'gs://air-travel-open-data/lakehouse/tsa-traffic'
);

In [None]:
%%bigquery

-- Append the rows of the uploaded parquet file to the tsa_traffic table.
-- The file is uploaded to lakehouse/tsa-traffic/, not lakehouse/airport-maps/.
load data into air_travel_raw.tsa_traffic
 from files (
 format = 'parquet',
 uris = ['gs://air-travel-open-data/lakehouse/tsa-traffic/tsa_traffic_data.parquet']);

#### Check the output

In [None]:
%%bigquery
-- Show every row that was loaded into tsa_traffic.
SELECT *
FROM air_travel_raw.tsa_traffic

#### Add the `_load_time` and `_data_source` fields to the table

In [None]:
%%bigquery

-- Add the two audit columns to tsa_traffic (this section's table;
-- airport_maps already received these columns in its own section).
alter table air_travel_raw.tsa_traffic
    add column _data_source STRING, add column _load_time TIMESTAMP;

In [None]:
%%bigquery

-- Stamp every tsa_traffic row with its data source and the current time.
-- Targets tsa_traffic so the 'airport.guide' source on airport_maps is not overwritten.
-- The always-true WHERE clause makes the update apply to all rows.
update air_travel_raw.tsa_traffic
    set _data_source = 'tsa', _load_time = current_timestamp()
    where 1 = 1

#### Check final output

In [None]:
%%bigquery

-- Final check: tsa_traffic rows for one airport, including the audit columns.
-- tsa_traffic identifies the airport by airport_code; the comparison is
-- case-insensitive because the casing of the extracted codes is not known here.
select * from air_travel_raw.tsa_traffic
where upper(airport_code) = 'LAX'