-
Notifications
You must be signed in to change notification settings - Fork 0
/
extract_static_miway_data.py
119 lines (103 loc) · 3.72 KB
/
extract_static_miway_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import zipfile
import io
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import (
LocalFilesystemToGCSOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDatasetOperator
from config import RAW_DATASET_NAME, BUCKET_NAME, GCP_CONN_ID
# Local scratch directory where the downloaded GTFS feed files are extracted.
STATIC_GTFS_TMP_PATH = "/tmp/static_gtfs"
# MiWay (Mississauga Transit) static GTFS archive published by the city.
MIWAY_URL = "https://www.miapp.ca/GTFS/google_transit.zip"
def download_static_gtfs_data(url: str, output_path: str, ti):
    """Download the static GTFS zip archive and extract its ``.txt`` feeds.

    Args:
        url: HTTP(S) URL of the GTFS zip archive.
        output_path: Local directory to extract the feed files into;
            created if it does not already exist.
        ti: Airflow TaskInstance; the list of extracted feed paths is
            pushed to XCom under the key ``"feeds"`` for downstream tasks.

    Raises:
        requests.HTTPError: if the download returns an error status.
        zipfile.BadZipFile: if the response body is not a valid zip.
    """
    # exist_ok avoids the check-then-create race of exists()+makedirs().
    os.makedirs(output_path, exist_ok=True)
    response = requests.get(url, timeout=60)
    # Fail the task loudly on HTTP errors instead of trying to unzip an
    # error page.
    response.raise_for_status()
    feeds = []
    # Context manager ensures the archive handle is always closed.
    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_data:
        for filename in zip_data.namelist():
            if not filename.endswith(".txt"):
                continue
            # NOTE(review): member names come from the archive itself;
            # this assumes the GTFS zip is flat (no nested directories)
            # — verify against the published feed.
            feed_path = os.path.join(output_path, filename)
            with zip_data.open(filename) as src, open(feed_path, "wb") as dst:
                dst.write(src.read())
            feeds.append(feed_path)
    # Hand the extracted paths to the upload task via XCom.
    ti.xcom_push(key="feeds", value=feeds)
def upload_feeds_to_gcs(ti, **kwargs):
    """Upload each extracted GTFS feed file to GCS under ``static/``.

    Pulls the local feed paths pushed by ``download_static_gtfs_data``,
    uploads each file to the bucket, and pushes the resulting GCS object
    paths to XCom under the key ``"gcs_feeds"``.

    Args:
        ti: Airflow TaskInstance used for XCom pull/push.
        **kwargs: Airflow task context, passed through to the operator's
            ``execute`` call.
    """
    feeds = ti.xcom_pull(task_ids="download_static_gtfs_data", key="feeds")
    gcs_feeds = []
    for feed_path in feeds:
        filename = os.path.basename(feed_path)
        # BUG FIX: the destination previously contained a garbled literal
        # instead of the feed's filename; downstream load_feeds derives
        # the BigQuery table name from this object's basename.
        gcs_path = f"static/{filename}"
        upload_to_gcs = LocalFilesystemToGCSOperator(
            task_id=f'upload_{filename.split(".")[0]}_feed_to_gcs',
            src=feed_path,
            dst=gcs_path,
            bucket=BUCKET_NAME,
            gcp_conn_id=GCP_CONN_ID,
        )
        upload_to_gcs.execute(context=kwargs)
        gcs_feeds.append(gcs_path)
    ti.xcom_push(key="gcs_feeds", value=gcs_feeds)
def load_feeds(**kwargs):
    """Load every uploaded GTFS feed from GCS into its own BigQuery table.

    Each object path pushed by ``upload_feeds_to_gcs`` becomes one
    truncate-and-replace load job; the table name is the feed's filename
    without its extension (e.g. ``static/stops.txt`` -> ``stops``).
    """
    task_instance = kwargs["ti"]
    object_paths = task_instance.xcom_pull(
        task_ids="upload_feeds_to_gcs", key="gcs_feeds"
    )
    for object_path in object_paths:
        # Strip the directory prefix and the ".txt" extension.
        table_name, _, _ = os.path.basename(object_path).partition(".")
        GCSToBigQueryOperator(
            task_id=f"gcs_{table_name}_feed_to_bigquery",
            bucket=BUCKET_NAME,
            source_objects=[object_path],
            destination_project_dataset_table=f"{RAW_DATASET_NAME}.{table_name}",
            autodetect=True,
            skip_leading_rows=1,
            write_disposition="WRITE_TRUNCATE",
            gcp_conn_id=GCP_CONN_ID,
        ).execute(context=kwargs)
# Defaults applied to every task in the DAG below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,  # each daily run is independent of prior runs
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,  # fail fast; no automatic retries
}
# Daily pipeline: verify the raw dataset exists, download + extract the
# static GTFS archive, stage the feeds in GCS, then load them into BigQuery.
with DAG(
    "extract_static_miway_data",
    default_args=default_args,
    description="Download static GTFS data, extract feeds, and upload to GCS and BigQuery",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["miway"],
) as dag:
    # Guard task: fails early if the raw dataset has not been provisioned.
    check_if_dataset_exists = BigQueryGetDatasetOperator(
        task_id="check_if_raw_miway_dataset_exists",
        gcp_conn_id=GCP_CONN_ID,
        dataset_id=RAW_DATASET_NAME,
    )
    download_data = PythonOperator(
        task_id="download_static_gtfs_data",
        python_callable=download_static_gtfs_data,
        op_kwargs={"url": MIWAY_URL, "output_path": STATIC_GTFS_TMP_PATH},
    )
    # BUG FIX: this operator was previously bound to a variable named
    # `upload_feeds_to_gcs`, shadowing the module-level callable of the
    # same name (it only worked because the function was looked up before
    # the assignment). Renamed so the function stays importable; the
    # task_id is unchanged.
    upload_feeds_task = PythonOperator(
        task_id="upload_feeds_to_gcs",
        python_callable=upload_feeds_to_gcs,
    )
    load_feeds_into_bigquery = PythonOperator(
        task_id="load_feeds_into_bigquery",
        python_callable=load_feeds,
    )
    (
        check_if_dataset_exists
        >> download_data
        >> upload_feeds_task
        >> load_feeds_into_bigquery
    )