Step 1: Install and Import Libraries

In [1]:
!pip install --quiet google-cloud-bigquery google-cloud-pubsub

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/320.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m317.4/320.1 kB[0m [31m9.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m320.1/320.1 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import time
from http import HTTPStatus

from google.cloud import bigquery
from google.cloud import pubsub_v1


Step 2: Set Project, Dataset, and Table

In [3]:
# Destination for the ingested messages: <PROJECT_ID>.<DATASET_ID>.<TABLE_ID>.
PROJECT_ID = "qwiklabs-gcp-01-79d56652b122"
DATASET_ID = "flight_data"
TABLE_ID = "flight_transponder_msgs"

# A single BigQuery client bound to the lab project, shared by the cells below.
bq_client = bigquery.Client(project=PROJECT_ID)


Step 3: Create Dataset

In [4]:
# Create the dataset in the "US" location. exists_ok=True ignores the
# "already exists" error, so re-running this cell is harmless.
dataset_ref = f"{PROJECT_ID}.{DATASET_ID}"
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "US"
bq_client.create_dataset(dataset=dataset, exists_ok=True)
print("Dataset created.")


Dataset created.


Step 4: Create BigQuery Table with Provided Schema

In [5]:
# (column name, BigQuery type) for every transponder field, in column order.
TABLE_COLUMNS = [
    ("MT", "STRING"),
    ("TT", "INT64"),
    ("SID", "STRING"),
    ("AID", "STRING"),
    ("Hex", "STRING"),
    ("FID", "STRING"),
    ("DMG", "DATE"),
    ("TMG", "TIME"),
    ("DML", "DATE"),
    ("TML", "TIME"),
    ("CS", "STRING"),
    ("Alt", "INT64"),
    ("GS", "INT64"),
    ("Trk", "INT64"),
    ("Lat", "FLOAT64"),
    ("Lng", "FLOAT64"),
    ("VR", "INT64"),
    ("Sq", "STRING"),
    ("Alrt", "INT64"),
    ("Emer", "INT64"),
    ("SPI", "INT64"),
    ("Gnd", "INT64"),
]
schema = [bigquery.SchemaField(name, field_type) for name, field_type in TABLE_COLUMNS]

# Fully qualified table id; reused later for streaming inserts and queries.
table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
table = bigquery.Table(table_ref, schema=schema)

# exists_ok=True makes re-running the cell safe.
bq_client.create_table(table, exists_ok=True)
print("Table created.")


Table created.


Step 5: Configure Pub/Sub Pull Subscription

In [6]:
# The topic belongs to a separate project; our pull subscription to it is
# created under PROJECT_ID.
TOPIC_PROJECT = "paul-leroy"                       # project that owns the topic
TOPIC_NAME = "flight-transponder"                  # topic carrying the messages
SUBSCRIPTION_NAME = "flight-transponder-pull-sub"  # our pull subscription

In [7]:
subscriber = pubsub_v1.SubscriberClient()

# The topic lives in TOPIC_PROJECT; the subscription is created in our own
# project (PROJECT_ID) so this notebook can pull from it.
topic_path = subscriber.topic_path(TOPIC_PROJECT, TOPIC_NAME)
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_NAME)

try:
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    print("Subscription created.")
except Exception as exc:
    # Only "already exists" is expected (e.g. on a re-run). The client signals
    # it with google.api_core.exceptions.AlreadyExists, whose `code` is HTTP 409
    # CONFLICT. Anything else (topic not found, permission denied, invalid
    # name, ...) is a real failure and must not be reported as success.
    if getattr(exc, "code", None) != HTTPStatus.CONFLICT:
        raise
    print("Subscription already exists.")


Subscription created.


Step 6: Pull Messages and Insert into BigQuery

In [8]:
def _bq_date(text: str) -> str:
    """Swap '/' separators for '-' so the value parses as a BigQuery DATE.

    NOTE(review): assumes the feed sends dates as YYYY/MM/DD -- confirm.
    """
    return text.replace("/", "-")


def _bq_time(text: str) -> str:
    """Drop fractional seconds: keep only the text before the first '.'."""
    return text.split(".")[0]


# (column name, converter) for each comma-separated field, in the order the
# fields appear in a message. Must stay aligned with the table schema.
_FIELD_CONVERTERS = (
    ("MT", str),
    ("TT", int),
    ("SID", str),
    ("AID", str),
    ("Hex", str),
    ("FID", str),
    ("DMG", _bq_date),
    ("TMG", _bq_time),
    ("DML", _bq_date),
    ("TML", _bq_time),
    ("CS", str),
    ("Alt", int),
    ("GS", int),
    ("Trk", int),
    ("Lat", float),
    ("Lng", float),
    ("VR", int),
    ("Sq", str),
    ("Alrt", int),
    ("Emer", int),
    ("SPI", int),
    ("Gnd", int),
)


def parse_message(message: str) -> dict:
    """Parse one comma-separated transponder message into a BigQuery row.

    Every field is stripped of surrounding whitespace (so padding, or a
    trailing newline on the last field, cannot break int/float conversion)
    and then converted to its column's type. Empty fields -- and fields that
    are missing because the message is short -- become ``None`` (NULL) for
    every column, string columns included. Fields beyond the known columns
    are ignored.

    Args:
        message: The decoded message text.

    Returns:
        A dict mapping every column name to its converted value or ``None``.

    Raises:
        ValueError: If a non-empty numeric field is not a valid int/float.
    """
    fields = [part.strip() for part in message.split(",")]
    # Pad short messages so every column receives a value ("" -> None below).
    fields += [""] * (len(_FIELD_CONVERTERS) - len(fields))
    return {
        column: convert(raw) if raw else None
        for (column, convert), raw in zip(_FIELD_CONVERTERS, fields)
    }


In [15]:
NUM_ITERATIONS = 12      # e.g. run for ~1 minute
SLEEP_SECONDS = 5        # gap between pulls
MAX_MESSAGES = 100       # upper bound on messages returned by a single pull

for i in range(NUM_ITERATIONS):
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": MAX_MESSAGES,
        }
    )

    rows = []
    ack_ids = []
    for received in response.received_messages:
        rows.append(parse_message(received.message.data.decode("utf-8")))
        ack_ids.append(received.ack_id)

    if rows:
        # insert_rows_json reports failures through its return value instead
        # of raising. Acking regardless would silently drop rejected rows, so
        # stop and leave these messages unacked for Pub/Sub to redeliver.
        errors = bq_client.insert_rows_json(table_ref, rows)
        if errors:
            raise RuntimeError(
                f"Iteration {i+1}: BigQuery rejected the batch; messages left "
                f"unacknowledged for redelivery: {errors}"
            )
        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": ack_ids,
            }
        )

    print(f"Iteration {i+1}: Inserted {len(rows)} rows")

    if i < NUM_ITERATIONS - 1:  # no need to wait after the final pull
        time.sleep(SLEEP_SECONDS)


Iteration 1: Inserted 4 rows
Iteration 2: Inserted 100 rows
Iteration 3: Inserted 100 rows
Iteration 4: Inserted 100 rows
Iteration 5: Inserted 100 rows
Iteration 6: Inserted 100 rows
Iteration 7: Inserted 100 rows
Iteration 8: Inserted 3 rows
Iteration 9: Inserted 100 rows
Iteration 10: Inserted 100 rows
Iteration 11: Inserted 100 rows
Iteration 12: Inserted 100 rows


Step 7: Count Records

In [16]:
# Counts every row in the table, not only those written by the last loop run.
record_count_sql = f"""
select count(*) AS record_count
from `{table_ref}`
"""
bq_client.query(record_count_sql).to_dataframe()


Unnamed: 0,record_count
0,1607


Step 8: Query Locations for GeoViz

In [20]:
%%bigquery
-- Positions of all messages that carry a lat/lng fix, as GEOGRAPHY points
-- suitable for plotting in GeoViz.
-- ST_GEOGPOINT takes longitude first, then latitude.
-- NOTE(review): the table path is written out literally rather than built from
-- PROJECT_ID / DATASET_ID / TABLE_ID (Step 2); keep it in sync if those change.
SELECT
  ST_GEOGPOINT(Lng, Lat) AS Location
FROM `qwiklabs-gcp-01-79d56652b122.flight_data.flight_transponder_msgs`
WHERE
  Lat IS NOT NULL
  AND Lng IS NOT NULL;


Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,Location
0,POINT(-1.57684 51.52155)
1,POINT(-0.56542 51.47177)
2,POINT(0.52383 51.59966)
3,POINT(-3.72238 51.79295)
4,POINT(1.08389 50.31221)
...,...
76,POINT(2.33088 50.91641)
77,POINT(0.7536 51.68994)
78,POINT(-2.0079 51.80934)
79,POINT(-1.26427 51.08978)
