Resolving duplication issues in raw data (#19)
* Load realtime data file-by-file from GCS to avoid duplicates in adjacent files
* Add local storage path env to vehicle ingestor for easier local testing
* Add transaction to MERGE statement to clear stage table
VMois committed May 5, 2024
1 parent 9e6400d commit f1232ea
Showing 5 changed files with 106 additions and 100 deletions.
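The diff view below drops indentation, so the control flow of the new loading logic is easier to see in a condensed sketch. This is a non-authoritative reconstruction: it assumes the MERGE runs once per loaded file (which the per-file loading and the stage-table truncation imply), and every name in it is a placeholder rather than the project's real config value.

```python
# Minimal sketch of the per-file load pattern introduced by this commit.
# All names below are assumed placeholders; the real values come from the DAG's
# config module and the rendered dags/sql/merge_stage_raw_vehicle_position.sql template.
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

GCP_CONN_ID = "google_cloud_default"               # placeholder
BUCKET_NAME = "miwaitway"                          # placeholder
PROJECT_ID = "my-gcp-project"                      # placeholder
STAGE_TABLE = "raw_miway.stage_vehicle_position"   # placeholder dataset.table
MERGE_QUERY = "-- rendered merge_stage_raw_vehicle_position.sql goes here"


def load_realtime_batch_to_bq(**kwargs):
    gcs_hook = GCSHook(gcp_conn_id=GCP_CONN_ID)
    objects = gcs_hook.list(BUCKET_NAME, prefix="realtime/vehicle")

    if len(objects):
        for obj in objects:
            # Load one CSV at a time so a single MERGE never sees duplicate
            # source rows coming from adjacent files.
            GCSToBigQueryOperator(
                task_id="load_csv_to_bq",
                bucket=BUCKET_NAME,
                source_objects=[obj],
                destination_project_dataset_table=STAGE_TABLE,
                skip_leading_rows=1,
                write_disposition="WRITE_TRUNCATE",  # stage holds only the current file
                gcp_conn_id=GCP_CONN_ID,
            ).execute(context=kwargs)

            # Merge the staged file into the raw table; the SQL template also
            # truncates the stage table inside the same transaction.
            BigQueryHook(gcp_conn_id=GCP_CONN_ID, use_legacy_sql=False).insert_job(
                configuration={"query": {"query": MERGE_QUERY, "useLegacySql": False}},
                project_id=PROJECT_ID,
            )

        # Ingested objects are no longer needed, so free the bucket space.
        for obj in objects:
            gcs_hook.delete(BUCKET_NAME, object_name=obj)
```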
11 changes: 0 additions & 11 deletions README.md
@@ -17,14 +17,3 @@ gcloud compute ssh airflow-and-web \
-- -NL 8080:localhost:8080
```


## Development

### Vehicle positions ingestor

Example command that can be used for local testing of ingestor:

```bash
docker run -v /Users/vmois/Projects/miwaitway/service_account.json:/root/creds/service_account.json -e VEHICLE_LOCATION_URL="https://www.miapp.ca/GTFS_RT/Vehicle/VehiclePositions.pb" -e BUCKET_NAME=miwaitway -e LOGLEVEL=debug -e GOOGLE_APPLICATION_CREDENTIALS=/root/creds/service_account.json miwaitway_vehicle_positions_ingestor
```

75 changes: 36 additions & 39 deletions dags/load_realtime_to_bq.py
@@ -5,13 +5,14 @@
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDatasetOperator,
BigQueryCheckOperator,
BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from jinja2 import Environment, FileSystemLoader

from config import (
BUCKET_NAME,
@@ -23,25 +24,50 @@
)


params = {
"dataset_id": RAW_DATASET_NAME,
"project_id": PROJECT_ID,
"stage_vehicle_table_name": STAGE_VEHICLE_TABLE_NAME,
"vehicle_table_name": VEHICLE_TABLE_NAME,
}


jinja_environment = Environment(loader=FileSystemLoader("dags/sql"))
merge_template = jinja_environment.get_template("merge_stage_raw_vehicle_position.sql")
merge_query = merge_template.render({"params": params})


def load_realtime_batch_to_bq(**kwargs):
gcs_hook = GCSHook(gcp_conn_id=GCP_CONN_ID)
# TODO: ideally we want objects to be sorted by creation date
# this is because we overwrite older data in case of a MATCH in MERGE
# and we want the latest data to do the overwriting, not a random file
objects = gcs_hook.list(BUCKET_NAME, prefix="realtime/vehicle")

if len(objects):
load_csv = GCSToBigQueryOperator(
task_id="gcs_realtime_to_bq",
for obj in objects:
load_csv_to_bq = GCSToBigQueryOperator(
task_id="load_csv_to_bq",
bucket=BUCKET_NAME,
source_objects=objects,
source_objects=[obj],
destination_project_dataset_table=f"{RAW_DATASET_NAME}.{STAGE_VEHICLE_TABLE_NAME}",
autodetect=None,
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
gcp_conn_id=GCP_CONN_ID,
)
load_csv.execute(context=kwargs)
load_csv_to_bq.execute(context=kwargs)

bigquery_hook = BigQueryHook(
gcp_conn_id=GCP_CONN_ID, useLegacySql=False, priority="BATCH"
)
configuration = {
"query": {
"query": merge_query,
"useLegacySql": False,
}
}
bigquery_hook.insert_job(configuration=configuration, project_id=PROJECT_ID)

# Delete ingested data to preserve space
for obj in objects:
gcs_hook.delete(BUCKET_NAME, object_name=obj)


Expand All @@ -57,17 +83,12 @@ def load_realtime_batch_to_bq(**kwargs):
"load_realtime_miway_data_to_bq",
default_args=default_args,
description="Loads realtime vehicle location data from GCS to BigQuery",
schedule_interval=timedelta(minutes=90),
schedule_interval=timedelta(minutes=60),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["miway"],
max_active_runs=1,
params={
"dataset_id": RAW_DATASET_NAME,
"project_id": PROJECT_ID,
"stage_vehicle_table_name": STAGE_VEHICLE_TABLE_NAME,
"vehicle_table_name": VEHICLE_TABLE_NAME,
},
params=params,
) as dag:
check_if_dataset_exists = BigQueryGetDatasetOperator(
task_id="check_if_raw_miway_dataset_exists",
@@ -98,33 +119,9 @@ def load_realtime_batch_to_bq(**kwargs):
python_callable=load_realtime_batch_to_bq,
)

append_tmp_to_raw = BigQueryInsertJobOperator(
task_id="append_tmp_to_raw",
gcp_conn_id=GCP_CONN_ID,
configuration={
"query": {
"query": '{% include "sql/merge_stage_raw_vehicle_position.sql" %}',
"useLegacySql": False,
}
},
)

clean_tmp_table = BigQueryInsertJobOperator(
task_id="clean_tmp_table",
gcp_conn_id=GCP_CONN_ID,
configuration={
"query": {
"query": f"TRUNCATE TABLE `{RAW_DATASET_NAME}.{STAGE_VEHICLE_TABLE_NAME}`",
"useLegacySql": False,
}
},
)

(
check_if_dataset_exists
>> check_if_tmp_vehicle_table_has_no_data
>> check_if_vehicle_table_exists
>> load_batch_to_tmp_raw
>> append_tmp_to_raw
>> clean_tmp_table
)
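The MERGE statement is now rendered once at module level with Jinja's FileSystemLoader instead of being pulled in through the operator's `{% include %}` templating. A standalone sketch of that rendering, with assumed placeholder values in place of the real config:

```python
# Standalone sketch of the module-level template rendering used by the DAG above;
# the values here are assumed placeholders, not the project's real config.
from jinja2 import Environment, FileSystemLoader

params = {
    "project_id": "my-gcp-project",
    "dataset_id": "raw_miway",
    "stage_vehicle_table_name": "stage_vehicle_position",
    "vehicle_table_name": "vehicle_position",
}

env = Environment(loader=FileSystemLoader("dags/sql"))
merge_query = env.get_template("merge_stage_raw_vehicle_position.sql").render({"params": params})

# With these placeholders, the rendered SQL would reference
#   `my-gcp-project.raw_miway.vehicle_position`       as the MERGE target
#   `my-gcp-project.raw_miway.stage_vehicle_position` as the source (truncated at the end)
print(merge_query)
```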
91 changes: 49 additions & 42 deletions dags/sql/merge_stage_raw_vehicle_position.sql
@@ -1,53 +1,60 @@
BEGIN TRANSACTION;

MERGE `{{ params.project_id }}.{{ params.dataset_id }}.{{ params.vehicle_table_name }}` AS target
USING `{{ params.project_id }}.{{ params.dataset_id }}.{{ params.stage_vehicle_table_name }}` AS source
ON target.vehicle_id = source.vehicle_id
AND target.timestamp = TIMESTAMP_SECONDS(source.timestamp)
AND target.trip_id = source.trip_id
ON target.vehicle_id = source.vehicle_id
AND target.timestamp = TIMESTAMP_SECONDS(source.timestamp)
AND target.trip_id = source.trip_id
WHEN MATCHED THEN
UPDATE SET
target.id = source.id
,target.trip_id = source.trip_id
,target.route_id = source.route_id
,target.direction_id = source.direction_id
,target.start_date = source.start_date
,target.vehicle_id = source.vehicle_id
,target.vehicle_label = source.vehicle_label
,target.latitude = source.latitude
,target.longitude = source.longitude
,target.bearing = source.bearing
,target.speed = source.speed
,target.timestamp = TIMESTAMP_SECONDS(source.timestamp)
,target.occupancy_status = source.occupancy_status
,target.occupancy_percentage = source.occupancy_percentage
, target.trip_id = source.trip_id
, target.route_id = source.route_id
, target.direction_id = source.direction_id
, target.start_date = source.start_date
, target.vehicle_id = source.vehicle_id
, target.vehicle_label = source.vehicle_label
, target.latitude = source.latitude
, target.longitude = source.longitude
, target.bearing = source.bearing
, target.speed = source.speed
, target.timestamp = TIMESTAMP_SECONDS(source.timestamp)
, target.occupancy_status = source.occupancy_status
, target.occupancy_percentage = source.occupancy_percentage
WHEN NOT MATCHED THEN
INSERT (
id
,trip_id
,route_id
,direction_id
,start_date
,vehicle_id
,vehicle_label
,latitude
,longitude
,bearing
,speed
,timestamp
,occupancy_status
,occupancy_percentage
, trip_id
, route_id
, direction_id
, start_date
, vehicle_id
, vehicle_label
, latitude
, longitude
, bearing
, speed
, timestamp
, occupancy_status
, occupancy_percentage
) VALUES (
source.id
,source.trip_id
,source.route_id
,source.direction_id
,source.start_date
,source.vehicle_id
,source.vehicle_label
,source.latitude
,source.longitude
,source.bearing
,source.speed
,TIMESTAMP_SECONDS(source.timestamp)
,source.occupancy_status
,source.occupancy_percentage
, source.trip_id
, source.route_id
, source.direction_id
, source.start_date
, source.vehicle_id
, source.vehicle_label
, source.latitude
, source.longitude
, source.bearing
, source.speed
, TIMESTAMP_SECONDS(source.timestamp)
, source.occupancy_status
, source.occupancy_percentage
);


TRUNCATE TABLE `{{ params.project_id }}.{{ params.dataset_id }}.{{ params.stage_vehicle_table_name }}`;

COMMIT TRANSACTION;
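Because the TRUNCATE now runs in the same transaction as the MERGE, the stage table should be empty as soon as the transaction commits. A hedged sketch of one way to verify that from Python, using an assumed connection id and assumed table names:

```python
# Hedged sketch: confirm the stage table is cleared after the transactional MERGE commits.
# The connection id and table names below are assumed placeholders.
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook(gcp_conn_id="google_cloud_default", use_legacy_sql=False)
rows = hook.get_records(
    "SELECT COUNT(*) FROM `my-gcp-project.raw_miway.stage_vehicle_position`"
)
assert rows[0][0] == 0, "stage table should be cleared by the MERGE transaction"
```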
9 changes: 5 additions & 4 deletions gtfs_realtime_ingestor/README.md
@@ -1,7 +1,8 @@
# A service to ingest vehicle positions continuously
# A service to ingest vehicle positions from GTFS Realtime endpoint

Useful Docker command for local testing:
Useful Docker command for local testing of the vehicle ingestor:

```bash
docker run -v /Users/vmois/Projects/miwaitway/service_account.json:/root/creds/service_account.json -e VEHICLE_LOCATION_URL="https://www.miapp.ca/GTFS_RT/Vehicle/VehiclePositions.pb" -e BUCKET_NAME=miwaitway -e LOGLEVEL=debug -e GOOGLE_APPLICATION_CREDENTIALS=/root/creds/service_account.json miwaitway_vehicle_positions_ingestor
```
docker run -v /Users/vmois/Projects/miwaitway/service_account.json:/root/creds/service_account.json -v /Users/vmois/Projects/miwaitway/tmp:/root/tmp -e CHUNKS_TO_LOAD=3 -e LOCAL_STORAGE_PATH="/root/tmp" -e VEHICLE_LOCATION_URL="https://www.miapp.ca/GTFS_RT/Vehicle/VehiclePositions.pb" -e BUCKET_NAME=miwaitway -e LOGLEVEL=debug -e GOOGLE_APPLICATION_CREDENTIALS=/root/creds/service_account.json miwaitway_vehicle_positions_ingestor
```

20 changes: 16 additions & 4 deletions gtfs_realtime_ingestor/ingest_vehicle_positions.py
@@ -36,6 +36,12 @@
if BUCKET_NAME is None:
raise ValueError("BUCKET_NAME must be set")

LOCAL_STORAGE_PATH = os.getenv("LOCAL_STORAGE_PATH")
if LOCAL_STORAGE_PATH:
logger.info(
f"Collected files will be saved to a local path {LOCAL_STORAGE_PATH} instead of GCS bucket"
)


def get_field_value(obj, field_name: str, default=None):
try:
@@ -122,10 +128,16 @@ def extract_vehicle_location():
)

flattened_data.clear()
object_path = f"realtime/vehicle_{current_hash}.csv"
logger.debug(f"Uploading chunks to GCS as {object_path}.")
blob = bucket.blob(object_path)
blob.upload_from_string(df.write_csv(include_header=True))

if LOCAL_STORAGE_PATH:
df.write_csv(
file=f"{LOCAL_STORAGE_PATH}/{current_hash}.csv", include_header=True
)
else:
object_path = f"realtime/vehicle_{current_hash}.csv"
logger.info(f"Uploading chunks to GCS as {object_path}.")
blob = bucket.blob(object_path)
blob.upload_from_string(df.write_csv(include_header=True))


if __name__ == "__main__":
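With LOCAL_STORAGE_PATH set, chunks are written as CSV files on disk instead of being uploaded to GCS, which makes local inspection straightforward. A small sketch of reading such a file back with polars (the path and filename are assumed examples; real files are named after the chunk hash):

```python
# Sketch for inspecting a locally saved chunk; replace <chunk_hash> with an actual
# file name written by the ingestor. Path is an assumed example.
import polars as pl

df = pl.read_csv("/Users/vmois/Projects/miwaitway/tmp/<chunk_hash>.csv")
print(df.select(["vehicle_id", "latitude", "longitude", "timestamp"]).head())
```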
