In [None]:
!pip install google-cloud-pubsub



In [None]:
from google.cloud import bigquery
from google.cloud import pubsub_v1
import csv
from datetime import datetime, timezone
from google.api_core.exceptions import DeadlineExceeded


In [None]:
# Default BigQuery client; with no explicit project, the client infers one
# from the environment, and that project is reused by every later cell.
client = bigquery.Client()
PROJECT_ID = client.project
print("Using project:", PROJECT_ID)

Using project: qwiklabs-gcp-01-a82571ac193f


In [None]:
# BigQuery destination for the parsed transponder messages.
DATASET_ID, TABLE_ID = "flight_data_dar", "flight_transponder_msgs"

# Pub/Sub topic carrying the raw CSV messages, and the pull subscription on it.
TOPIC_ID, SUBSCRIPTION_ID = "flight-transponder", "flight-transponder-sub"


In [None]:
# Client pinned explicitly to the project resolved above.
bq = bigquery.Client(project=PROJECT_ID)

# Dataset in the "US" location; exists_ok=True makes this cell safe to re-run.
dataset = bigquery.Dataset(".".join((PROJECT_ID, DATASET_ID)))
dataset.location = "US"
bq.create_dataset(dataset, exists_ok=True)


Dataset(DatasetReference('qwiklabs-gcp-01-a82571ac193f', 'flight_data_dar'))

In [None]:
# One column per comma-separated field of a transponder message, in the same
# positional order parse_csv_line maps them. Every column uses the SchemaField
# default mode (NULLABLE): many messages populate only a subset of fields.
schema = [
    bigquery.SchemaField(column, bq_type)
    for column, bq_type in (
        ("MT", "STRING"),
        ("TT", "INT64"),
        ("STD", "STRING"),
        ("AID", "STRING"),
        ("Hex", "STRING"),
        ("FID", "STRING"),
        ("DMG", "DATE"),
        ("TMG", "TIME"),
        ("DML", "DATE"),
        ("TML", "TIME"),
        ("CS", "STRING"),
        ("Alt", "INT64"),
        ("GS", "INT64"),
        ("Trk", "INT64"),
        ("Lat", "FLOAT64"),
        ("Lng", "FLOAT64"),
        ("VR", "INT64"),
        ("Sq", "STRING"),
        ("Alert", "INT64"),
        ("Emer", "INT64"),
        ("SPI", "INT64"),
        ("Gnd", "INT64"),
    )
]

table = bigquery.Table(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}", schema=schema)

# exists_ok=True keeps the cell re-runnable; an already-existing table is left
# as-is (its current schema is not changed).
bq.create_table(table, exists_ok=True)


Table(TableReference(DatasetReference('qwiklabs-gcp-01-a82571ac193f', 'flight_data_dar'), 'flight_transponder_msgs'))

In [None]:
!gcloud pubsub subscriptions create flight-transponder-sub \
  --topic=projects/paul-leroy/topics/flight-transponder \
  --project=qwiklabs-gcp-01-a82571ac193f


Created subscription [projects/qwiklabs-gcp-01-a82571ac193f/subscriptions/flight-transponder-sub].


In [None]:
# Peek at up to 10 raw messages to confirm the subscription is receiving data.
# SUBSCRIPTION_ID comes from the configuration cell (no duplicate definition).
# The pulled messages are deliberately NOT acknowledged: unacknowledged Pub/Sub
# messages are redelivered after their ack deadline, so the ingestion loop
# below still stores them.
subscriber = pubsub_v1.SubscriberClient()

subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

try:
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": 10,
        },
        timeout=15,
    )
except DeadlineExceeded:
    # The pull can time out when no data is flowing; don't abort Run-All.
    print("No messages received yet")
else:
    for msg in response.received_messages:
        print(msg.message.data.decode("utf-8"))

MSG,8,1,1,70605D,1,2026/01/16,08:35:19.645,2026/01/16,08:35:19.656,,,,,,,,,,,,0
MSG,8,1,1,4CAADA,1,2026/01/16,08:35:19.645,2026/01/16,08:35:19.656,,,,,,,,,,,,0
MSG,8,1,1,406673,1,2026/01/16,08:35:19.646,2026/01/16,08:35:19.656,,,,,,,,,,,,
MSG,8,1,1,4BB274,1,2026/01/16,08:35:19.646,2026/01/16,08:35:19.656,,,,,,,,,,,,0
MSG,5,1,1,4D21F0,1,2026/01/16,08:35:19.647,2026/01/16,08:35:19.656,,37000,,,,,192,,0,,0,
MSG,8,1,1,A4CD23,1,2026/01/16,08:35:19.650,2026/01/16,08:35:19.656,,,,,,,,,,,,0
MSG,8,1,1,471F86,1,2026/01/16,08:35:19.704,2026/01/16,08:35:19.710,,,,,,,,,,,,0
MSG,8,1,1,A4CD23,1,2026/01/16,08:35:19.705,2026/01/16,08:35:19.711,,,,,,,,,,,,0
MSG,8,1,1,A4D483,1,2026/01/16,08:35:19.705,2026/01/16,08:35:19.711,,,,,,,,,,,,0
MSG,3,1,1,407CD7,1,2026/01/16,08:35:19.706,2026/01/16,08:35:19.711,,7000,,,50.91995,-1.28845,,,0,,0,0


In [None]:
import csv

def parse_csv_line(line):
    """Convert one raw transponder CSV message into a BigQuery row dict.

    The line is split with the ``csv`` module and mapped positionally onto the
    22 columns of the ``flight_transponder_msgs`` table. (The layout appears to
    be the SBS-1 / BaseStation ``MSG`` format -- TODO confirm with the feed's
    publisher.) Lines with fewer than 22 fields are padded; extra fields are
    ignored.

    Missing values -- empty or absent fields -- become ``None`` for every
    column, text and numeric alike, so they load as SQL NULL rather than "".

    Args:
        line: One decoded message, e.g.
            ``"MSG,3,1,1,407CD7,1,2026/01/16,08:35:19.706,..."``.

    Returns:
        A dict keyed by column name, or ``None`` when the line is blank, is not
        valid CSV, or a numeric field holds non-numeric text. The ingestion
        loop skips falsy results, so one malformed message no longer aborts it.
    """
    if not line or not line.strip():
        return None

    try:
        fields = next(csv.reader([line]), [])
    except csv.Error:
        return None
    fields += [None] * (22 - len(fields))

    def text(i):
        # Both "" and padding None map to NULL.
        return fields[i] or None

    def as_int(i):
        return int(fields[i]) if fields[i] else None

    def as_float(i):
        return float(fields[i]) if fields[i] else None

    def as_date(i):
        # Feed dates are YYYY/MM/DD; BigQuery DATE values use YYYY-MM-DD.
        return fields[i].replace("/", "-") if fields[i] else None

    try:
        return {
            "MT": text(0),
            "TT": as_int(1),
            "STD": text(2),
            "AID": text(3),
            "Hex": text(4),
            "FID": text(5),
            # Dates/times are sent as strings that BigQuery parses into
            # DATE/TIME columns.
            "DMG": as_date(6),
            "TMG": text(7),
            "DML": as_date(8),
            "TML": text(9),
            "CS": text(10),
            "Alt": as_int(11),
            "GS": as_int(12),
            "Trk": as_int(13),
            "Lat": as_float(14),
            "Lng": as_float(15),
            "VR": as_int(16),
            "Sq": text(17),
            "Alert": as_int(18),
            "Emer": as_int(19),
            "SPI": as_int(20),
            "Gnd": as_int(21),
        }
    except ValueError:
        # A numeric column held non-numeric text: treat the message as malformed.
        return None


In [None]:
# Fully-qualified "project.dataset.table" target for the streaming inserts.
table_ref = ".".join([PROJECT_ID, DATASET_ID, TABLE_ID])
print(table_ref)

qwiklabs-gcp-01-a82571ac193f.flight_data_dar.flight_transponder_msgs


In [None]:
from google.api_core.exceptions import DeadlineExceeded

def collect_stream_data(polls=30, max_messages=1000, timeout=15):
    """Pull transponder messages from Pub/Sub and stream them into BigQuery.

    Makes ``polls`` synchronous pulls on the module-level ``subscriber`` /
    ``subscription_path``. Each message is decoded as UTF-8 and parsed with
    ``parse_csv_line``; the resulting rows are streamed into ``table_ref`` via
    ``bq.insert_rows_json``.

    Acknowledgement policy (unacknowledged messages are redelivered by Pub/Sub
    once their ack deadline expires):

    * rows BigQuery accepted -> acknowledged;
    * messages that could not be parsed -> acknowledged and dropped, so a
      malformed message is not redelivered forever;
    * rows BigQuery rejected -> left unacknowledged, so they are retried
      instead of silently lost.

    Each row is inserted with its Pub/Sub ``message_id`` as the BigQuery insert
    ID, so redelivered messages are de-duplicated on a best-effort basis.

    Args:
        polls: Number of pull attempts.
        max_messages: Maximum number of messages requested per pull.
        timeout: Per-pull RPC timeout in seconds.

    Returns:
        Total number of rows successfully inserted into BigQuery.
    """
    total_inserted = 0

    for _ in range(polls):
        try:
            response = subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages,
                },
                timeout=timeout,
            )
        except DeadlineExceeded:
            print("Waiting for data...")
            continue

        if not response.received_messages:
            print("No messages in this pull")
            continue

        rows, row_ids, row_ack_ids = [], [], []  # index-aligned per message
        dropped_ack_ids = []

        for msg in response.received_messages:
            try:
                # UnicodeDecodeError is a ValueError subclass.
                parsed = parse_csv_line(msg.message.data.decode("utf-8"))
            except (ValueError, csv.Error):
                parsed = None
            if parsed:
                rows.append(parsed)
                row_ids.append(msg.message.message_id)
                row_ack_ids.append(msg.ack_id)
            else:
                dropped_ack_ids.append(msg.ack_id)

        if dropped_ack_ids:
            print(f"Dropped {len(dropped_ack_ids)} unparseable messages")

        ack_ids = list(dropped_ack_ids)
        if rows:
            errors = bq.insert_rows_json(table_ref, rows, row_ids=row_ids)
            if errors:
                # skip_invalid_rows is left at its default (False), so BigQuery
                # rejects the whole request when any row is invalid: keep every
                # row of this batch unacknowledged so it is redelivered.
                print("BigQuery insert errors:", errors)
            else:
                print(f"Inserted {len(rows)} rows")
                total_inserted += len(rows)
                ack_ids.extend(row_ack_ids)

        # Nothing to acknowledge -> skip the RPC entirely.
        if ack_ids:
            subscriber.acknowledge(
                request={
                    "subscription": subscription_path,
                    "ack_ids": ack_ids,
                }
            )

    return total_inserted


In [None]:
# Number of pull -> insert -> acknowledge rounds for this ingestion run.
N_POLLS = 40
collect_stream_data(polls=N_POLLS)

Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 429 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 231 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 231 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 231 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 1000 rows
Inserted 231 rows
Inserted 1000 rows
Inserted 1000 rows


In [None]:
%%bigquery
-- Total rows streamed so far; the alias replaces the anonymous f0_ column.
SELECT COUNT(*) AS message_count
FROM `flight_data_dar.flight_transponder_msgs`


Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,f0_
0,72663


In [None]:
%%bigquery
-- One GEOGRAPHY point per message that carries a position.
-- ST_GEOGPOINT takes (longitude, latitude), hence Lng before Lat.
SELECT ST_GEOGPOINT(Lng, Lat) AS location
FROM `flight_data_dar.flight_transponder_msgs`
WHERE Lng IS NOT NULL AND Lat IS NOT NULL


Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,location
0,POINT(-1.72399 50.58929)
1,POINT(2.92652 51.30899)
2,POINT(0.31448 51.14936)
3,POINT(-1.38885 52.24384)
4,POINT(-0.12489 51.4783)
...,...
3784,POINT(-0.2486 52.14451)
3785,POINT(-0.85909 51.07516)
3786,POINT(0.18142 51.56767)
3787,POINT(-0.99449 51.38836)
