### Create BQ dataset for storing the raw data

In [20]:
from google.cloud import bigquery

project_id = "cs378-fa2024"
dataset = "air_travel_raw"
region = "us-central1"

bq_client = bigquery.Client()

# Build the dataset reference as "<project>.<dataset>" and pin it to the region
# before asking BigQuery to create it.
dataset_id = bigquery.Dataset(".".join([project_id, dataset]))
dataset_id.location = region

# exists_ok=True is passed so that re-running this cell against an existing
# dataset does not fail.
resp = bq_client.create_dataset(dataset_id, exists_ok=True)
print(f"Created dataset {bq_client.project}.{resp.dataset_id}")

Created dataset cs378-fa2024.air_travel_raw


### Common functions

In [54]:
from google.cloud import bigquery

# BigQuery destination: tables are addressed as <project_id>.<dataset>.<table>
project_id = "cs378-fa2024"
dataset = "air_travel_raw"
region = "us-central1"

# GCS source: files are read from gs://<bucket>/<parent_folder>/<folder>/<file_name>
bucket = "air-travel-data"
parent_folder = "raw"

# Client shared by the loader functions defined in this cell
bq_client = bigquery.Client()

def create_load_table_from_csv(folder, file_name, table, schema, delimiter=",", quote_character="\""):
  """Create a BigQuery table (if missing) and load delimited files from GCS into it.

  The source is gs://{bucket}/{parent_folder}/{folder}/{file_name} and the
  destination is {project_id}.{dataset}.{table}; bucket, parent_folder,
  project_id, dataset and bq_client are read from the notebook's globals.

  Args:
    folder: path under the parent folder that holds the source file(s).
    file_name: object name or wildcard pattern (e.g. "*.csv").
    table: destination table name (without project/dataset).
    schema: full list of SchemaField objects for the table. The last two
      entries must be the _data_source and _load_time fields; they are used
      when creating the table but left out of the load schema because the
      files do not contain them. The list is NOT modified.
    delimiter: field delimiter of the source files.
    quote_character: quote character of the source files.

  Raises:
    Whatever the BigQuery client raises if table creation or the load job
    fails (load_job.result() re-raises job errors).
  """
  uri = f"gs://{bucket}/{parent_folder}/{folder}/{file_name}"
  table_id = f"{project_id}.{dataset}.{table}"

  created_table = bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
  print("Created table {}".format(created_table.table_id))

  # Leave out the data_source and load_time fields when loading the data,
  # neither one is present in the csv. Slice into a new list instead of
  # deleting in place so the caller's schema survives a second call.
  load_schema = list(schema[:-2])
  print(load_schema)

  # NOTE(review): confirm that WRITE_TRUNCATE with this reduced schema keeps
  # the two default-valued columns on the destination table.
  job_config = bigquery.LoadJobConfig(
        schema=load_schema,
        skip_leading_rows=1,
        source_format=bigquery.SourceFormat.CSV,
        write_disposition="WRITE_TRUNCATE",
        field_delimiter=delimiter,
        quote_character=quote_character,
        allow_jagged_rows=True,
        ignore_unknown_values=True
      )

  load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
  load_job.result()  # block until the load job finishes

  destination_table = bq_client.get_table(table_id)
  print("Loaded {} rows.".format(destination_table.num_rows))



def create_load_table_from_json(folder, file_name, table, schema):
  """Create a BigQuery table (if missing) and load newline-delimited JSON from GCS into it.

  The source is gs://{bucket}/{parent_folder}/{folder}/{file_name} and the
  destination is {project_id}.{dataset}.{table}; bucket, parent_folder,
  project_id, dataset, region and bq_client are read from the notebook's
  globals.

  Args:
    folder: path under the parent folder that holds the source file(s).
    file_name: object name or wildcard pattern.
    table: destination table name (without project/dataset).
    schema: full list of SchemaField objects for the table. The last two
      entries must be the _data_source and _load_time fields; they are used
      when creating the table but left out of the load schema because the
      files do not contain them. The list is NOT modified.

  Raises:
    Whatever the BigQuery client raises if table creation or the load job
    fails (load_job.result() re-raises job errors).
  """
  table_id = f"{project_id}.{dataset}.{table}"
  uri = f"gs://{bucket}/{parent_folder}/{folder}/{file_name}"

  created_table = bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
  print("Created table {}".format(created_table.table_id))

  # Leave out the data_source and load_time fields when loading the data,
  # neither one is present in the json. Slice into a new list instead of
  # deleting in place so the caller's schema survives a second call.
  load_schema = list(schema[:-2])

  job_config = bigquery.LoadJobConfig(schema=load_schema,
      source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
      write_disposition = "WRITE_TRUNCATE"
  )

  load_job = bq_client.load_table_from_uri(
      uri,
      table_id,
      location=region,
      job_config=job_config,
  )

  load_job.result()  # block until the load job finishes

  destination_table = bq_client.get_table(table_id)
  print("Loaded {} rows.".format(destination_table.num_rows))


### Create and load `airport_businesses`

In [13]:
# Source files are tab-delimited
folder = "airport-maps/out"
file_name = "*.csv"
table = "airport_businesses"
delimiter = "\t"

# Columns present in the source files: (name, type, mode)
schema = [
  bigquery.SchemaField(column, column_type, mode=column_mode)
  for column, column_type, column_mode in (
    ("airport_code", "STRING", "REQUIRED"),
    ("terminal", "STRING", "REQUIRED"),
    ("business", "STRING", "REQUIRED"),
    ("category", "STRING", "REQUIRED"),
    ("location", "STRING", "NULLABLE"),
    ("menu_items", "STRING", "NULLABLE"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'airportguide'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table airport_businesses
[SchemaField('airport_code', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('terminal', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('business', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('category', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('location', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('menu_items', 'STRING', 'NULLABLE', None, None, (), None)]
Loaded 1574 rows.


### Create and load `flight_delays`

In [25]:
folder = "on-time-performance"
file_name = "*.csv"
table = "flight_delays"
delimiter = ","

# Required identifying columns: (name, type)
schema = [
  bigquery.SchemaField(column, column_type, mode="REQUIRED")
  for column, column_type in (
    ("year", "INTEGER"),
    ("month", "INTEGER"),
    ("carrier", "STRING"),
    ("carrier_name", "STRING"),
    ("airport", "STRING"),
    ("airport_name", "STRING"),
  )
]
# Every remaining source column is a nullable FLOAT
schema += [
  bigquery.SchemaField(column, "FLOAT", mode="NULLABLE")
  for column in (
    "arr_flights",
    "arr_del15",
    "carrier_ct",
    "weather_ct",
    "nas_ct",
    "security_ct",
    "late_aircraft_ct",
    "arr_cancelled",
    "arr_diverted",
    "arr_delay",
    "carrier_delay",
    "weather_delay",
    "nas_delay",
    "security_delay",
    "late_aircraft_delay",
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'transtats'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table flight_delays
Loaded 381186 rows.


### Create and load `airlines`, `airports`, `countries`, `aircrafts`, and `flight_routes`
#### Note: This dataset comes with 5 tables

In [26]:
folder = "openflights"
file_name = "airlines.csv"
table = "airlines"
delimiter = ","

# Every source column is REQUIRED: (name, type)
schema = [
  bigquery.SchemaField(column, column_type, mode="REQUIRED")
  for column, column_type in (
    ("airline_id", "INTEGER"),
    ("name", "STRING"),
    ("alias", "STRING"),
    ("iata", "STRING"),
    ("icao", "STRING"),
    ("callsign", "STRING"),
    ("country", "STRING"),
    ("active", "BOOL"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openflights'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table airlines
Loaded 6162 rows.


In [28]:
folder = "openflights"
file_name = "airports_ext.csv"
table = "airports"
delimiter = ","

# Every source column is REQUIRED: (name, type)
schema = [
  bigquery.SchemaField(column, column_type, mode="REQUIRED")
  for column, column_type in (
    ("airport_id", "STRING"),
    ("airport_name", "STRING"),
    ("city", "STRING"),
    ("country", "STRING"),
    ("iata", "STRING"),
    ("icao", "STRING"),
    ("latitude", "BIGNUMERIC"),
    ("longitude", "BIGNUMERIC"),
    ("altitude", "INTEGER"),
    ("timezone", "STRING"),
    ("daylight_savings_time", "STRING"),
    ("tz_database_timezone", "STRING"),
    ("type", "STRING"),
    ("source", "STRING"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openflights'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table airports
Loaded 12668 rows.


In [34]:
folder = "openflights"
file_name = "countries.csv"
table = "countries"
delimiter = ","

# All source columns are strings: (name, mode)
schema = [
  bigquery.SchemaField(column, "STRING", mode=column_mode)
  for column, column_mode in (
    ("country_name", "REQUIRED"),
    ("iso_code", "REQUIRED"),
    ("dafif_code", "NULLABLE"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openflights'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table countries
Loaded 261 rows.


In [35]:
folder = "openflights"
file_name = "planes.csv"
table = "aircrafts"
delimiter = ","

# All source columns are required strings
schema = [
  bigquery.SchemaField(column, "STRING", mode="REQUIRED")
  for column in ("aircraft_name", "iata_code", "icao_code")
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openflights'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table aircrafts
Loaded 246 rows.


In [39]:
folder = "openflights"
file_name = "routes.csv"
table = "flight_routes"
delimiter = ","

# Required string columns
schema = [
  bigquery.SchemaField(column, "STRING", mode="REQUIRED")
  for column in (
    "airline_code",
    "airline_id",
    "source_airport",
    "source_airport_id",
    "dest_airport",
    "dest_airport_id",
  )
]
# Nullable columns: (name, type)
schema += [
  bigquery.SchemaField(column, column_type, mode="NULLABLE")
  for column, column_type in (
    ("codeshare", "BOOLEAN"),
    ("stops", "INTEGER"),
    ("equipment", "STRING"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openflights'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter)

Created table flight_routes
Loaded 67663 rows.


### Create and load `airport_reviews`

In [55]:
# Source files are tab-delimited and use a single quote as the quote character
folder = "our-airports"
file_name = "*.tsv"
table = "airport_reviews"
delimiter = "\t"
quote_character = "'"

# Only the id column is REQUIRED
schema = [bigquery.SchemaField("id", "INTEGER", mode="REQUIRED")]
# Remaining source columns are nullable: (name, type)
schema += [
  bigquery.SchemaField(column, column_type, mode="NULLABLE")
  for column, column_type in (
    ("threadRef", "INTEGER"),
    ("airportRef", "INTEGER"),
    ("airportIdent", "STRING"),
    ("date", "DATETIME"),
    ("memberNickname", "STRING"),
    ("subject", "STRING"),
    ("body", "STRING"),
  )
]
# Columns filled from default value expressions; they must stay last because
# the loader leaves the final two fields out of the load schema.
schema += [
  bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ourairports'"),
  bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter=delimiter, quote_character=quote_character)

Created table airport_reviews
[SchemaField('id', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('threadRef', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('airportRef', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('airportIdent', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('date', 'DATETIME', 'NULLABLE', None, None, (), None), SchemaField('memberNickname', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('subject', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('body', 'STRING', 'NULLABLE', None, None, (), None)]
Loaded 15451 rows.


### Create and load `tsa_traffic`

In [81]:
from google.cloud import bigquery

table = "tsa_traffic"

bq_client = bigquery.Client()

# Required string columns of the table
schema = [
    bigquery.SchemaField(column, "STRING", mode="REQUIRED")
    for column in (
        "date",
        "hour",
        "airport_code",
        "airport_name",
        "city",
        "state",
        "checkpoint",
    )
]
# total_count is declared without an explicit mode
schema.append(bigquery.SchemaField("total_count", "INTEGER"))
# Columns filled from default value expressions
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'tsa-foia'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

# create table (project_id and dataset come from the common-functions cell);
# `table` is rebound from the table name to the created Table object
table_id = f"{project_id}.{dataset}.{table}"
table = bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
print(f"Created table {table.table_id}")

Created table tsa_traffic


In [92]:
import os

from google.cloud import storage

bucket = "air-travel-data"
folder = "raw/tsa-traffic/llm-text/"

storage_client = storage.Client()

# NOTE: convert_to_dict and write_to_BQ are defined in the next cell of this
# notebook; that cell must be run before this one.

# read files from GCS
blobs = storage_client.list_blobs(bucket, prefix=folder)
for blob in blobs:
    # Skip "folder" placeholder objects (names ending in "/"): they have no
    # file name and cannot be downloaded to a local file.
    if blob.name.endswith("/"):
        continue

    # Use the object's base name so the local path does not depend on how
    # many path segments the prefix has.
    file_path = os.path.join("/tmp", os.path.basename(blob.name))
    print(f"processing {file_path}")
    blob.download_to_filename(file_path)
    rows_to_insert = convert_to_dict(file_path)
    is_error = write_to_BQ(bq_client, table_id, rows_to_insert)

    if is_error:
        # stop at the first failed load; the local copy is left in place
        break
    else:
        os.remove(file_path)

processing /tmp/april-14-2024-to-april-20-2024_1_500.txt
write to BQ
processing /tmp/april-14-2024-to-april-20-2024_501_998.txt
write to BQ
processing /tmp/april-16-2023-to-april-22-2023_1_500.txt
write to BQ
processing /tmp/april-16-2023-to-april-22-2023_501_987.txt
write to BQ
processing /tmp/april-21-2024-to-april-27-2024_1_500.txt
write to BQ
processing /tmp/april-21-2024-to-april-27-2024_501_997.txt
write to BQ
processing /tmp/april-23-2023-to-april-29-2023_1_500.txt
write to BQ
processing /tmp/april-23-2023-to-april-29-2023_501_986.txt
write to BQ
processing /tmp/april-7-2024-to-april-13-2024_1_500.txt
write to BQ
processing /tmp/april-7-2024-to-april-13-2024_501_986.txt
write to BQ
processing /tmp/april-9-2023-to-april-15-2023_1_500.txt
write to BQ
processing /tmp/april-9-2023-to-april-15-2023_501_985.txt
write to BQ
processing /tmp/august-13-2023-to-august-19-2023_1_500.txt
write to BQ
processing /tmp/august-13-2023-to-august-19-2023_501_990.txt
write to BQ
processing /tmp/augu

In [91]:
import time

def convert_to_dict(filepath):
    """Parse a text file of JSON-like records into a list of row dicts.

    A record starts on a line that is exactly "{" (after stripping) and ends
    on a line that contains "}," or is exactly "}". Each line in between is
    read as `"key": "value"`: commas and double quotes are removed, the text
    before the first ":" is the key and the text between the first and second
    ":" is the value. Known key spellings are normalised to the table's
    column names (hour, airport_code, airport_name, total_count).

    total_count values are converted to int; a non-numeric count is reported
    and that field is left out of the record.

    Args:
        filepath: path of the text file to parse.

    Returns:
        A list with one dict per record found, in file order.
    """
    # Alternative key spellings mapped to the destination column name
    key_aliases = {
        "hour_of_day": "hour",
        "Hour of Day": "hour",
        "hour of day": "hour",
        "Airport Code": "airport_code",
        "airport code": "airport_code",
        "Airport Name": "airport_name",
        "airport name": "airport_name",
        "Customer Traffic": "total_count",
        "customer traffic": "total_count",
        "customer_traffic": "total_count",
    }

    # Read the file once and close it; records are sliced from this list.
    with open(filepath) as source:
        lines = source.readlines()

    rows_to_insert = []
    start_dict = None  # line index of the currently open "{", if any

    for line_num, line in enumerate(lines):
        stripped = line.strip()

        if stripped == "{":
            start_dict = line_num

        # A bare "}" also closes a record, so the last element of a JSON
        # array (which has no trailing comma) is not dropped.
        if "}," in stripped or stripped == "}":
            if start_dict is None:
                # closing brace with no open record: nothing to emit
                continue

            record = {}

            for entry in lines[start_dict + 1:line_num]:
                entry_str = entry.replace("\n", "").replace(",", "")
                parts = entry_str.split(":")

                if len(parts) < 2:
                    # not a key/value line (blank line, stray text)
                    continue

                key = parts[0].replace('"', '').strip()
                key = key_aliases.get(key, key)

                # NOTE(review): only the text up to the next ":" is kept, so a
                # value such as "14:00" is stored as "14" -- confirm this is
                # intended.
                val = parts[1].replace('"', '').strip()

                if key == "total_count":
                    if val.isdigit():
                        val = int(val)
                    else:
                        print("*** Count must be an int, invalid value: ", val)
                        continue

                record[key] = val

            rows_to_insert.append(record)
            start_dict = None

    return rows_to_insert


def write_to_BQ(bq_client, table_id, rows_to_insert):
    """Append rows to a BigQuery table with a JSON load job.

    The load schema is the table's current schema without its last two
    fields (the default-valued _data_source and _load_time columns, which
    the rows do not carry).

    If the first attempt raises an error whose text contains "404" (the table
    may have just been created), the whole attempt is retried once after
    5 seconds. Any other failure, or a failed retry, is reported as an error.

    Args:
        bq_client: BigQuery client used to look up the table and run the job.
        table_id: fully qualified destination table id.
        rows_to_insert: list of JSON-serialisable row dicts.

    Returns:
        True if the rows could not be written, False on success.
    """

    def _load_rows():
        # One complete attempt: look up the table, build the config, run the
        # load job. Returns True if the finished job reports errors.
        table = bq_client.get_table(table_id)
        load_schema = list(table.schema)[:-2]

        job_config = bigquery.LoadJobConfig(schema=load_schema,
                                            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
                                            write_disposition='WRITE_APPEND')

        load_job = bq_client.load_table_from_json(rows_to_insert, destination=table_id, job_config=job_config)
        load_job.result()

        if load_job.errors:
            print('Errors while writing to table:', load_job.errors)
            return True

        return False

    print("write to BQ")

    try:
        return _load_rows()
    except Exception as e:
        print('Error while writing to table:', e)
        if '404' not in str(e):
            # Not a "table not ready" failure: report it so the caller stops
            # instead of treating the file as loaded.
            return True

    # table isn't open for writes (it may have been just created)
    print('Table not ready to be written to. Sleeping for 5 seconds.')
    time.sleep(5)

    try:
        # Retry the full attempt, including the table lookup, so the retry
        # also works when the lookup itself was what failed.
        return _load_rows()
    except Exception as e:
        print('Error occurred while writing to table: {}'.format(e))
        return True