In [1]:
!pip install google-cloud-pubsub

In [9]:
from google.cloud import bigquery

In [11]:

PROJECT_ID = "qwiklabs-gcp-00-d269cced691e"
DATASET_ID = "flight_data"
TABLE_ID = "transponder_msgs"

client = bigquery.Client(project=PROJECT_ID)

# Create dataset
dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

print("Dataset ready:", f"{PROJECT_ID}.{DATASET_ID}")

Dataset ready: qwiklabs-gcp-00-d269cced691e.flight_data


In [12]:
## Define the Table Schema

schema = [
 bigquery.SchemaField("MT", "STRING", mode="NULLABLE", description="SEL ID AIR STA CLK MSG info http://woodair.net/sbs/Article/Barebones42_Socket_Data.htm"),
 bigquery.SchemaField("TT", "INT64", mode="NULLABLE", description="1 - 8"),
 bigquery.SchemaField("SID", "STRING", mode="NULLABLE", description="Database Session record number"),
 bigquery.SchemaField("AID", "STRING", mode="NULLABLE", description="Database Aircraft record number"),
 bigquery.SchemaField("Hex", "STRING", mode="NULLABLE", description="Aircraft Mode S hexadecimal code https://opensky-network.org/datasets/metadata/"),
 bigquery.SchemaField("FID", "STRING", mode="NULLABLE", description="Database Flight record number"),
 bigquery.SchemaField("DMG", "DATE", mode="NULLABLE", description="Date message generated"),
 bigquery.SchemaField("TMG", "TIME", mode="NULLABLE", description="Time message generated"),
 bigquery.SchemaField("DML", "DATE", mode="NULLABLE", description="Date message logged"),
 bigquery.SchemaField("TML", "TIME", mode="NULLABLE", description="Time message logged"),
 bigquery.SchemaField("CS", "STRING", mode="NULLABLE", description="Callsign (flight number or registration)"),
 bigquery.SchemaField("Alt", "INT64", mode="NULLABLE", description="Mode C altitude (Flight Level)"),
 bigquery.SchemaField("GS", "INT64", mode="NULLABLE", description="Ground Speed"),
 bigquery.SchemaField("Trk", "INT64", mode="NULLABLE", description="Track"),
 bigquery.SchemaField("Lat", "FLOAT64", mode="NULLABLE", description="Latitude (N/E positive, S/W negative)"),
 bigquery.SchemaField("Lng", "FLOAT64", mode="NULLABLE", description="Longitude (N/E positive, S/W negative)"),
 bigquery.SchemaField("VR", "INT64", mode="NULLABLE", description="Vertical Rate"),
 bigquery.SchemaField("Sq", "STRING", mode="NULLABLE", description="Assigned Mode A squawk code"),
 bigquery.SchemaField("Alrt", "INT64", mode="NULLABLE", description="Flag to indicate squawk has changed"),
 bigquery.SchemaField("Emer", "INT64", mode="NULLABLE", description="Flag to indicate emergency code has been set"),
 bigquery.SchemaField("SPI", "INT64", mode="NULLABLE", description="Flag to indicate transponder Ident has been activated"),
 bigquery.SchemaField("Gnd", "INT64", mode="NULLABLE", description="Flag to indicate ground squat switch is active"),
]



In [13]:
## Create the table with defined Schema for Pub/Sub to write

table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table, exists_ok=True)

print("Table ready:", table_ref)


Table ready: qwiklabs-gcp-00-d269cced691e.flight_data.transponder_msgs


In [15]:
## Create a Topic
!gcloud pubsub subscriptions create flight-transponder-sub \
  --topic=projects/paul-leroy/topics/flight-transponder \
  --project=qwiklabs-gcp-00-d269cced691e



Created subscription [projects/qwiklabs-gcp-00-d269cced691e/subscriptions/flight-transponder-sub].


In [19]:
## Try to pull data & Test Run

from google.cloud import pubsub_v1

PROJECT_ID = "qwiklabs-gcp-00-d269cced691e"
SUBSCRIPTION_ID = "flight-transponder-sub"

subscriber = pubsub_v1.SubscriberClient()

subscription_path = subscriber.subscription_path(
    PROJECT_ID,
    SUBSCRIPTION_ID
)

response = subscriber.pull(
    request={
        "subscription": subscription_path,
        "max_messages": 10,
    }
)

for msg in response.received_messages:
    print(msg.message.data.decode("utf-8"))


MSG,5,1,1,4B1806,1,2026/01/15,20:29:04.564,2026/01/15,20:29:04.572,,4075,,,,,,,0,,0,
MSG,7,1,1,406C3A,1,2026/01/15,20:29:04.565,2026/01/15,20:29:04.573,,8600,,,,,,,,,,
MSG,8,1,1,3C5EE2,1,2026/01/15,20:29:06.637,2026/01/15,20:29:06.647,,,,,,,,,,,,0
MSG,8,1,1,A3C68C,1,2026/01/15,20:29:06.860,2026/01/15,20:29:06.866,,,,,,,,,,,,0
MSG,7,1,1,407EA1,1,2026/01/15,20:29:11.170,2026/01/15,20:29:11.180,,29000,,,,,,,,,,
MSG,8,1,1,4075C1,1,2026/01/15,20:29:04.514,2026/01/15,20:29:04.518,,,,,,,,,,,,0
MSG,5,1,1,4CA97A,1,2026/01/15,20:29:04.516,2026/01/15,20:29:04.569,,26300,,,,,,,0,,0,
MSG,5,1,1,4079F7,1,2026/01/15,20:29:04.728,2026/01/15,20:29:04.736,,6975,,,,,,,0,,0,
MSG,5,1,1,486495,1,2026/01/15,20:29:04.728,2026/01/15,20:29:04.736,,1650,,,,,,,0,,0,
MSG,6,1,1,407EA1,1,2026/01/15,20:29:05.168,2026/01/15,20:29:05.173,,,,,,,,1156,0,0,0,


In [22]:
## Data Load, Hard code 1000 records

def clean_date(date_str):
    return date_str.replace("/", "-") if date_str else None

def clean_time(time_str):
    return time_str.split(".")[0] if time_str else None

def parse_csv_message(line: str) -> dict:
    f = line.strip().split(",")

    return {
        "MT": f[0],
        "TT": int(f[1]) if f[1] else None,
        "SID": f[2],
        "AID": f[3],
        "Hex": f[4],
        "FID": f[5],
        "DMG": clean_date(f[6]),
        "TMG": clean_time(f[7]),
        "DML": clean_date(f[8]),
        "TML": clean_time(f[9]),
        "CS": f[10] or None,
        "Alt": int(f[11]) if f[11] else None,
        "GS": int(f[12]) if f[12] else None,
        "Trk": int(f[13]) if f[13] else None,
        "Lat": float(f[14]) if f[14] else None,
        "Lng": float(f[15]) if f[15] else None,
        "VR": int(f[16]) if f[16] else None,
        "Sq": f[17] or None,
        "Alrt": int(f[18]) if f[18] else None,
        "Emer": int(f[19]) if f[19] else None,
        "SPI": int(f[20]) if f[20] else None,
        "Gnd": int(f[21]) if f[21] else None,
    }

def pull_and_load():
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": 1000,
        }
    )

    if not response.received_messages:
        return

    rows = []
    ack_ids = []

    for msg in response.received_messages:
        csv_line = msg.message.data.decode("utf-8")
        rows.append(parse_csv_message(csv_line))
        ack_ids.append(msg.ack_id)

    errors = client.insert_rows_json(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}", rows)

    if errors:
        print("BigQuery errors:", errors)
    else:
        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": ack_ids,
            }
        )
        print(f"Inserted {len(rows)} rows.")


while True:
  pull_and_load()


Inserted 860 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 40 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 317 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 988 rows.
Inserted 15 rows.
Inserted 1000 rows.
Inserted 761 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 148 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 568 rows.
Inserted 860 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 425 rows.
Inserted 698 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 176 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 77 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 928 rows.
Inserted 460 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 760 rows.
Inserted 784 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted 390 rows.
Inserted 430 rows.
Inserted 1000 rows.
Inserted 1000 rows.
Inserted

KeyboardInterrupt: 