diff --git a/datasets/san_francisco/dataset.yaml b/datasets/san_francisco/dataset.yaml new file mode 100644 index 000000000..454e09f69 --- /dev/null +++ b/datasets/san_francisco/dataset.yaml @@ -0,0 +1,24 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: san_francisco + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ +resources: + - type: bigquery_dataset + dataset_id: san_francisco + description: "san francisco dataset" diff --git a/datasets/san_francisco/infra/provider.tf b/datasets/san_francisco/infra/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/san_francisco/infra/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/san_francisco/infra/san_francisco_dataset.tf b/datasets/san_francisco/infra/san_francisco_dataset.tf new file mode 100644 index 000000000..fac313b71 --- /dev/null +++ b/datasets/san_francisco/infra/san_francisco_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "san_francisco_311_service_requests" { + dataset_id = "san_francisco_311_service_requests" + project = var.project_id + description = "san_francisco_311_service_requests" +} + +output "bigquery_dataset-san_francisco_311_service_requests-dataset_id" { + value = google_bigquery_dataset.san_francisco_311_service_requests.dataset_id +} diff --git a/datasets/san_francisco/infra/san_francisco_pipeline.tf b/datasets/san_francisco/infra/san_francisco_pipeline.tf new file mode 100644 index 000000000..ee11bfc02 --- /dev/null +++ b/datasets/san_francisco/infra/san_francisco_pipeline.tf @@ -0,0 +1,34 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "bqt_san_francisco_311_service_requests" { + project = var.project_id + dataset_id = "san_francisco" + table_id = "311_service_requests" + description = "san_francisco_etl_process" + depends_on = [ + google_bigquery_dataset.san_francisco + ] +} + +output "bigquery_table-bqt_san_francisco_311_service_requests-table_id" { + value = google_bigquery_table.bqt_san_francisco_311_service_requests.table_id +} + +output "bigquery_table-bqt_san_francisco_311_service_requests-id" { + value = google_bigquery_table.bqt_san_francisco_311_service_requests.id +} diff --git a/datasets/san_francisco/infra/variables.tf b/datasets/san_francisco/infra/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/san_francisco/infra/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/Dockerfile b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..748bc3bec --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8 +ENV PYTHONUNBUFFERED True +COPY requirements.txt ./ +RUN python3 -m pip install --no-cache-dir -r requirements.txt +WORKDIR /custom +COPY ./csv_transform.py . +CMD ["python3", "csv_transform.py"] diff --git a/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..0df600f6a --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,1335 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import fnmatch +import json +import logging +import os +import pathlib +import re +import shutil +import typing +import zipfile as zip +from datetime import datetime + +import pandas as pd +import requests +from google.api_core.exceptions import NotFound +from google.cloud import bigquery, storage + + +def main( + source_url: str, + source_url_dict: dict, + source_url_list: typing.List[str], + pipeline_name: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + table_id: str, + drop_dest_table: str, + schema_path: str, + header_row_ordinal: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + input_headers: typing.List[str], + data_dtypes: dict, + trip_data_names: typing.List[str], + trip_data_dtypes: dict, + tripdata_names: typing.List[str], + tripdata_dtypes: dict, + rename_headers_tripdata: dict, + rename_headers_list: dict, + empty_key_list: typing.List[str], + gen_location_list: dict, + resolve_datatypes_list: dict, + remove_paren_list: typing.List[str], + strip_newlines_list: typing.List[str], + strip_whitespace_list: typing.List[str], + date_format_list: dict, + reorder_headers_list: typing.List[str], +) -> None: + + logging.info(f"{pipeline_name} process started") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + execute_pipeline( + source_url=source_url, + source_url_dict=source_url_dict, + source_url_list=source_url_list, + source_file=source_file, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=table_id, + drop_dest_table=drop_dest_table, + schema_path=schema_path, + chunksize=chunksize, + header_row_ordinal=header_row_ordinal, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + input_headers=input_headers, + data_dtypes=data_dtypes, + trip_data_names=trip_data_names, + trip_data_dtypes=trip_data_dtypes, + tripdata_names=tripdata_names, + tripdata_dtypes=tripdata_dtypes, + 
rename_headers_tripdata=rename_headers_tripdata, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + logging.info(f"{pipeline_name} process completed") + + +def execute_pipeline( + source_url: str, + source_url_dict: dict, + source_url_list: typing.List[str], + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + drop_dest_table: str, + schema_path: str, + chunksize: str, + header_row_ordinal: str, + target_gcs_bucket: str, + target_gcs_path: str, + input_headers: typing.List[str], + data_dtypes: dict, + trip_data_names: typing.List[str], + trip_data_dtypes: dict, + tripdata_names: typing.List[str], + tripdata_dtypes: dict, + rename_headers_tripdata: dict, + rename_headers_list: typing.List[str], + empty_key_list: typing.List[str], + gen_location_list: dict, + resolve_datatypes_list: dict, + remove_paren_list: typing.List[str], + strip_newlines_list: typing.List[str], + strip_whitespace_list: typing.List[str], + date_format_list: dict, + reorder_headers_list: typing.List[str], +) -> None: + if ( + destination_table == "311_service_requests" + or destination_table == "film_locations" + or destination_table == "sffd_service_calls" + or destination_table == "street_trees" + ): + download_file(source_url, source_file) + elif destination_table == "calendar": + process_sf_calendar( + source_url_dict=source_url_dict, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + drop_dest_table=drop_dest_table, + schema_path=schema_path, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + rename_headers_list=rename_headers_list, + reorder_headers_list=reorder_headers_list, + ) + return None + elif destination_table == "routes": + process_sf_muni_routes( + source_url_dict=source_url_dict, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + drop_dest_table=drop_dest_table, + schema_path=schema_path, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + reorder_headers_list=reorder_headers_list, + ) + return None + elif destination_table == "shapes": + process_sf_muni_shapes( + source_url_dict=source_url_dict, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + drop_dest_table=drop_dest_table, + schema_path=schema_path, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + rename_headers_list=rename_headers_list, + reorder_headers_list=reorder_headers_list, + ) + return None + elif destination_table == "stops": + process_sf_muni_stops( + source_url_dict=source_url_dict, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=destination_table, + drop_dest_table=drop_dest_table, + schema_path=schema_path, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + reorder_headers_list=reorder_headers_list, + ) + return None + elif destination_table == "sfpd_incidents": + download_file_http( + source_url=source_url_dict["sfpd_incidents"], source_file=source_file + ) + elif destination_table == 
"bikeshare_station_info": + source_url_json = f"{source_url}.json" + source_file_json = str(source_file).replace(".csv", "") + "_stations.json" + download_file_json(source_url_json, source_file_json, source_file, "stations") + elif destination_table == "bikeshare_station_status": + source_url_json = f"{source_url}.json" + source_file_json = str(source_file).replace(".csv", "") + "_status.json" + download_file_json(source_url_json, source_file_json, source_file, "stations") + if destination_table == "bikeshare_trips": + dest_path = os.path.split(source_file)[0] + download_url_files_from_list(source_url_list, dest_path) + stage_input_files(dest_path, source_file) + process_source_file( + source_file=str(source_file).replace(".csv", "_trip_data.csv"), + target_file=str(target_file).replace(".csv", "_trip_data.csv"), + chunksize=int(chunksize), + input_headers=trip_data_names, + data_dtypes=trip_data_dtypes, + destination_table=destination_table, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + header_row_ordinal=None, + ) + process_source_file( + source_file=str(source_file).replace(".csv", "_tripdata.csv"), + target_file=str(target_file).replace(".csv", "_tripdata.csv"), + chunksize=int(chunksize), + input_headers=tripdata_names, + data_dtypes=tripdata_dtypes, + destination_table=destination_table, + rename_headers_list=rename_headers_tripdata, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + header_row_ordinal=None, + ) + handle_tripdata( + target_file=target_file, + resolve_datatypes_list=resolve_datatypes_list, + rename_headers_list=rename_headers_list, + reorder_headers_list=reorder_headers_list, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + else: + process_source_file( + source_file=source_file, + target_file=target_file, + chunksize=chunksize, + header_row_ordinal=header_row_ordinal, + input_headers=input_headers, + data_dtypes=data_dtypes, + destination_table=destination_table, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + if os.path.exists(target_file): + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + if drop_dest_table == "Y": + drop_table = True + else: + drop_table = False + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, 
+ field_delimiter="|", + ) + else: + error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." + raise ValueError(error_msg) + else: + logging.info( + f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." + ) + + +def process_sf_calendar( + source_url_dict: dict, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + drop_dest_table: str, + schema_path: str, + target_gcs_bucket: str, + target_gcs_path: str, + rename_headers_list: typing.List[str], + reorder_headers_list: typing.List[str], +) -> None: + df_calendar = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["calendar"], + target_file_path=str(target_file), + ) + df_calendar_attributes = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["calendar_attributes"], + target_file_path=str(target_file), + ) + df_calendar_dates = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["calendar_dates"], + target_file_path=str(target_file), + ) + df = df_calendar.merge( + df_calendar_attributes, + how="inner", + on=None, + left_on="service_id", + right_on="service_id", + sort=True, + suffixes=("_calendar", "_attributes"), + copy=True, + indicator=False, + validate=None, + ) + df = df.merge( + df_calendar_dates, + how="inner", + on=None, + left_on="service_id", + right_on="service_id", + sort=True, + suffixes=("_calendar", "_dates"), + copy=True, + indicator=False, + validate=None, + ) + df["monday_str"] = df["monday"].apply(lambda x: "False" if x == 0 else "True") + df["tuesday_str"] = df["tuesday"].apply(lambda x: "False" if x == 0 else "True") + df["wednesday_str"] = df["wednesday"].apply(lambda x: "False" if x == 0 else "True") + df["thursday_str"] = df["thursday"].apply(lambda x: "False" if x == 0 else "True") + df["friday_str"] = df["friday"].apply(lambda x: "False" if x == 0 else "True") + df["saturday_str"] = df["saturday"].apply(lambda x: "False" if x == 0 else "True") + df["sunday_str"] = df["sunday"].apply(lambda x: "False" if x == 0 else "True") + df["exception_type_str"] = df["exception_type"].apply( + lambda x: "True" if x == 1 else "False" + ) + df = df[ + [ + "service_id", + "start_date", + "end_date", + "service_description", + "date", + "exception_type_str", + "monday_str", + "tuesday_str", + "wednesday_str", + "thursday_str", + "friday_str", + "saturday_str", + "sunday_str", + ] + ] + df = rename_headers(df=df, rename_headers_list=rename_headers_list) + df["exceptions"] = df["exceptions"].apply( + lambda x: f"{str(x).strip()[:4]}-{str(x).strip()[4:6]}-{str(x).strip()[6:8]}" + ) + df = reorder_headers(df=df, output_headers_list=reorder_headers_list) + save_to_new_file(df=df, file_path=target_file, sep="|") + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + drop_table = drop_dest_table == "Y" + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, + field_delimiter="|", + ) + + +def process_sf_muni_routes( + source_url_dict: dict, + target_file: pathlib.Path, + project_id: 
str, + dataset_id: str, + destination_table: str, + drop_dest_table: str, + schema_path: str, + target_gcs_bucket: str, + target_gcs_path: str, + reorder_headers_list: typing.List[str], +) -> None: + df_routes = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["routes"], + target_file_path=str(target_file), + ) + df_routes = reorder_headers(df=df_routes, output_headers_list=reorder_headers_list) + save_to_new_file(df=df_routes, file_path=target_file, sep="|") + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + drop_table = drop_dest_table == "Y" + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, + field_delimiter="|", + ) + + +def process_sf_muni_shapes( + source_url_dict: dict, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + drop_dest_table: str, + schema_path: str, + target_gcs_bucket: str, + target_gcs_path: str, + rename_headers_list: typing.List[str], + reorder_headers_list: typing.List[str], +) -> None: + df_shapes = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["shapes"], + target_file_path=str(target_file), + ) + df_shapes = rename_headers(df=df_shapes, rename_headers_list=rename_headers_list) + df_shapes["shape_point_geom"] = df_shapes.apply( + lambda x: create_geometry_columns(x["shape_point_lon"], x["shape_point_lat"]), + axis=1, + ) + df_shapes = reorder_headers(df=df_shapes, output_headers_list=reorder_headers_list) + save_to_new_file(df=df_shapes, file_path=target_file, sep="|") + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + drop_table = drop_dest_table == "Y" + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, + field_delimiter="|", + ) + + +def process_sf_muni_stops( + source_url_dict: dict, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + drop_dest_table: str, + schema_path: str, + target_gcs_bucket: str, + target_gcs_path: str, + reorder_headers_list: typing.List[str], +) -> None: + df_stops = gcs_to_df( + project_id=project_id, + source_file_gcs_path=source_url_dict["stops"], + target_file_path=str(target_file), + ) + df_stops["stop_geom"] = df_stops.apply( + lambda x: create_geometry_columns(x["stop_lon"], x["stop_lat"]), axis=1 + ) + df_stops = reorder_headers(df=df_stops, output_headers_list=reorder_headers_list) + save_to_new_file(df=df_stops, file_path=target_file, sep="|") + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + drop_table = drop_dest_table == "Y" + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=drop_table, 
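+ # create_dest_table builds the table from the schema JSON stored in the target GCS bucket when the table does not already exist (or was just dropped)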
+ ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, + field_delimiter="|", + ) + + +def create_geometry_columns(long: float, lat: float) -> pd.DataFrame: + return f"POINT({str(long)} {str(lat)})".replace("POINT( )", "") + + +def rename_headers(df: pd.DataFrame, rename_headers_list: dict) -> pd.DataFrame: + logging.info("Renaming Headers") + df = df.rename(columns=rename_headers_list) + return df + + +def gcs_to_df( + project_id: str, + source_file_gcs_path: str, + target_file_path: str, + source_file_type: str = "csv", +) -> pd.DataFrame: + filename = os.path.basename(source_file_gcs_path) + destination_folder = os.path.split(target_file_path)[0] + download_file_gcs( + project_id=project_id, + source_location=source_file_gcs_path, + destination_folder=destination_folder, + ) + if source_file_type == "csv": + df = pd.read_csv(f"{destination_folder}/{filename}") + elif source_file_type == "txt": + df = pd.read_fwf(f"{destination_folder}/{filename}") + return df + + +def http_to_df( + source_url: str, target_file_path: str, source_file_type: str = "csv" +) -> pd.DataFrame: + filename = os.path.basename(source_url) + destination_folder = os.path.split(target_file_path)[0] + download_file_http( + source_url=source_url, source_file=f"{destination_folder}/{filename}" + ) + if source_file_type == "csv": + df = pd.read_csv(f"{destination_folder}/{filename}") + elif source_file_type == "txt": + df = pd.read_fwf(f"{destination_folder}/{filename}") + return df + + +def download_file_gcs( + project_id: str, source_location: str, destination_folder: str +) -> None: + object_name = os.path.basename(source_location) + dest_object = f"{destination_folder}/{object_name}" + storage_client = storage.Client(project_id) + bucket_name = str.split(source_location, "gs://")[1].split("/")[0] + bucket = storage_client.bucket(bucket_name) + source_object_path = str.split(source_location, f"gs://{bucket_name}/")[1] + blob = bucket.blob(source_object_path) + blob.download_to_filename(dest_object) + + +def handle_tripdata( + target_file: str, + resolve_datatypes_list: dict, + rename_headers_list: dict, + reorder_headers_list: typing.List[str], + target_gcs_bucket: str, + target_gcs_path: str, +) -> pd.DataFrame: + logging.info("Compiling target file by merging source data") + trip_data_filepath = str(target_file).replace(".csv", "_trip_data.csv") + logging.info(f"Opening {trip_data_filepath}") + df_trip_data = pd.read_csv( + trip_data_filepath, + engine="python", + encoding="utf-8", + quotechar='"', # string separator, typically double-quotes + sep="|", # data column separator, typically "," + ) + tripdata_filepath = str(target_file).replace(".csv", "_tripdata.csv") + logging.info(f"Opening {tripdata_filepath}") + df_tripdata = pd.read_csv( + tripdata_filepath, + engine="python", + encoding="utf-8", + quotechar='"', # string separator, typically double-quotes + sep="|", # data column separator, typically "," + ) + df_tripdata.drop_duplicates( + subset=["key_val"], keep="last", inplace=True, ignore_index=False + ) + df_tripdata["trip_id"] = df_tripdata["key_val"].str.replace("-", "") + df_tripdata.set_index("key", inplace=True) + df_trip_data.drop_duplicates( + subset=["key_val"], keep="last", inplace=True, ignore_index=False + ) + df_trip_data.set_index("key", inplace=True) + df = df_trip_data.append(df_tripdata, sort=True) + df["subscriber_type_new"] = df.apply( + lambda x: str(x.subscription_type) + if 
not str(x.subscriber_type) + else str(x.subscriber_type), + axis=1, + ) + df = df.drop(columns=["subscriber_type"]) + df = resolve_datatypes(df=df, resolve_datatypes_list=resolve_datatypes_list) + df = rename_headers(df=df, rename_headers_list=rename_headers_list) + df = reorder_headers(df=df, output_headers_list=reorder_headers_list) + save_to_new_file(df=df, file_path=target_file, sep="|") + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + +def listdirs(rootdir: str) -> list: + rtn_list = [] + for file in os.listdir(rootdir): + d = os.path.join(rootdir, file) + if os.path.isdir(d): + rtn_list.append(d) + for elem in listdirs(d): + rtn_list.append(elem) + return rtn_list + + +def download_url_files_from_list(url_list: typing.List[str], dest_path: str) -> None: + for url in url_list: + dest_file = dest_path + "/" + os.path.split(url)[1] + download_file_http(url, dest_file) + if ( + url.find(".zip") > -1 or url.find(".gz") > -1 or url.find(".tar") > -1 + ) and (url.find("http") == 0 or url.find("gs:") == 0): + unpack_file(dest_file, dest_path) + else: + logging.info(f"Parsing {dest_file} for decompression") + + +def download_file_http(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading {source_url} to {source_file}") + src_file = requests.get(source_url, stream=True) + with open(source_file, "wb") as f: + for chunk in src_file: + f.write(chunk) + + +def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None: + if compression_type == "zip": + logging.info(f"Unpacking {infile} to {dest_path}") + zip_decompress(infile=infile, dest_path=dest_path) + else: + logging.info( + f"{infile} ignored as it is not compressed or is of unknown compression" + ) + + +def zip_decompress(infile: str, dest_path: str) -> None: + logging.info(f"Unpacking {infile} to {dest_path}") + with zip.ZipFile(infile, mode="r") as zipf: + zipf.extractall(dest_path) + zipf.close() + + +def stage_input_files(dest_dir: str, target_file_path: str) -> None: + logging.info("Staging input files") + for src_dir in listdirs(dest_dir): + logging.info("-------------------------------------------------------------") + pool_files(src_dir, dest_dir, "tripdata.csv") + pool_files(src_dir, dest_dir, "trip_data.csv") + concatenate_files(target_file_path, dest_dir, "tripdata.csv") + concatenate_files(target_file_path, dest_dir, "trip_data.csv") + + +def pool_files(src_dir: str, dest_dir: str, file_group_wildcard: str): + logging.info(f"Pooling files *{file_group_wildcard}") + if len(fnmatch.filter(os.listdir(src_dir), "*" + file_group_wildcard)) > 0: + for src_file in fnmatch.filter(os.listdir(src_dir), "*" + file_group_wildcard): + logging.info(f"Copying {src_dir}/{src_file} -> {dest_dir}/{src_file}") + shutil.copyfile(f"{src_dir}/{src_file}", f"{dest_dir}/{src_file}") + + +def concatenate_files( + target_file_path: str, dest_path: str, file_group_wildcard: str +) -> None: + target_file_dir = os.path.split(str(target_file_path))[0] + target_file_path = str(target_file_path).replace(".csv", "_" + file_group_wildcard) + logging.info(f"Concatenating files {target_file_dir}/*{file_group_wildcard}") + if os.path.isfile(target_file_path): + os.unlink(target_file_path) + for src_file_path in sorted( + fnmatch.filter(os.listdir(dest_path), "*" + file_group_wildcard) + ): + src_file_path = dest_path + "/" + src_file_path + with open(src_file_path, "r") as src_file: + with open(target_file_path, "a+") as target_file: + next(src_file) + logging.info( + f"Reading from file 
{src_file_path}, writing to file {target_file_path}" + ) + for line in src_file: + line = ( + os.path.split(src_file_path)[1] + "," + line + ) # include the file source + target_file.write(line) + + +def process_source_file( + source_file: str, + target_file: str, + chunksize: str, + input_headers: typing.List[str], + data_dtypes: dict, + destination_table: str, + rename_headers_list: typing.List[str], + empty_key_list: typing.List[str], + gen_location_list: dict, + resolve_datatypes_list: dict, + remove_paren_list: typing.List[str], + strip_newlines_list: typing.List[str], + strip_whitespace_list: typing.List[str], + date_format_list: dict, + reorder_headers_list: typing.List[str], + header_row_ordinal: str = "0", + field_separator: str = ",", +) -> None: + logging.info(f"Opening source file {source_file}") + if header_row_ordinal is None or header_row_ordinal == "None": + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. of records + sep=field_separator, # data column separator, typically "," + names=input_headers, + dtype=data_dtypes, + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + destination_table=destination_table, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + else: + header = int(header_row_ordinal) + if data_dtypes != "[]": + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. of records + sep=field_separator, # data column separator, typically "," + header=header, # use when the data file does not contain a header + dtype=data_dtypes, + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + destination_table=destination_table, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + else: + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. 
of records + sep=field_separator, # data column separator, typically "," + header=header, # use when the data file does not contain a header + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + destination_table=destination_table, + rename_headers_list=rename_headers_list, + empty_key_list=empty_key_list, + gen_location_list=gen_location_list, + resolve_datatypes_list=resolve_datatypes_list, + remove_paren_list=remove_paren_list, + strip_newlines_list=strip_newlines_list, + strip_whitespace_list=strip_whitespace_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + + +def process_chunk( + df: pd.DataFrame, + target_file_batch: str, + target_file: str, + skip_header: bool, + destination_table: str, + rename_headers_list: typing.List[str], + empty_key_list: typing.List[str], + gen_location_list: dict, + resolve_datatypes_list: dict, + remove_paren_list: typing.List[str], + strip_whitespace_list: typing.List[str], + strip_newlines_list: typing.List[str], + date_format_list: dict, + reorder_headers_list: typing.List[str], +) -> None: + logging.info(f"Processing batch file {target_file_batch}") + if destination_table == "311_service_requests": + df = rename_headers(df, rename_headers_list) + df = remove_empty_key_rows(df, empty_key_list) + df = resolve_datatypes(df, resolve_datatypes_list) + df = remove_parenthesis_long_lat(df, remove_paren_list) + df = strip_whitespace(df, strip_whitespace_list) + df = strip_newlines(df, strip_newlines_list) + df = resolve_date_format(df, date_format_list) + df = reorder_headers(df, reorder_headers_list) + elif destination_table == "sffd_service_calls": + df = rename_headers(df, rename_headers_list) + df = strip_whitespace(df, strip_whitespace_list) + df = strip_newlines(df, strip_newlines_list) + df = extract_latitude_from_geom(df, "location_geom", "latitude") + df = extract_longitude_from_geom(df, "location_geom", "longitude") + df = resolve_date_format(df, date_format_list) + df = reorder_headers(df, reorder_headers_list) + elif destination_table == "street_trees": + df = rename_headers(df, rename_headers_list) + df = remove_empty_key_rows(df, empty_key_list) + df = resolve_date_format(df, date_format_list) + df = reorder_headers(df, reorder_headers_list) + elif destination_table == "sfpd_incidents": + df = rename_headers(df=df, rename_headers_list=rename_headers_list) + df = remove_empty_key_rows(df, empty_key_list) + df = resolve_date_format(df, date_format_list) + df["timestamp"] = df.apply( + lambda x: datetime.strftime( + datetime.strptime( + (x["Date"][:10] + " " + x["Time"] + ":00"), "%Y-%m-%d %H:%M:%S" + ), + "%Y-%m-%d %H:%M:%S", + ), + axis=1, + ) + df = reorder_headers(df, reorder_headers_list) + elif destination_table == "bikeshare_station_info": + df = rename_headers(df, rename_headers_list) + df = remove_empty_key_rows(df, empty_key_list) + df = generate_location(df, gen_location_list) + df = resolve_datatypes(df, resolve_datatypes_list) + df = reorder_headers(df, reorder_headers_list) + elif destination_table == "bikeshare_station_status": + df = rename_headers(df, rename_headers_list) + df = remove_empty_key_rows(df, empty_key_list) + df = reorder_headers(df, 
reorder_headers_list) + elif destination_table == "bikeshare_trips": + if str(target_file).find("_trip_data.csv") > -1: + df = resolve_date_format(df, date_format_list) + if str(target_file).find("_tripdata.csv") > -1: + df = resolve_date_format(df, date_format_list) + df = generate_location(df, gen_location_list) + df = add_key(df) + elif destination_table == "film_locations": + df = rename_headers(df, rename_headers_list) + df = strip_whitespace(df, strip_whitespace_list) + df = strip_newlines(df, strip_newlines_list) + df = reorder_headers(df, reorder_headers_list) + else: + pass + save_to_new_file(df, file_path=str(target_file_batch), sep="|") + append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + logging.info(f"Processing batch file {target_file_batch} completed") + + +def add_key(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Adding key column") + df["start_date_str"] = df["start_date"].apply( + lambda x: re.sub("[^0-9.]", "", str(x)) + ) + df["key"] = df.apply( + lambda x: str(x.start_date_str) + "-" + str(x.bike_number), axis=1 + ) + df["key_val"] = df["key"].replace("-", "") + return df + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading file {source_file} from {source_url}") + r = requests.get(source_url, stream=True) + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + + +def download_file_json( + source_url: str, + source_file_json: pathlib.Path, + source_file_csv: pathlib.Path, + subnode_name: str, +) -> None: + logging.info(f"Downloading file {source_url}.json.") + r = requests.get(source_url, stream=True) + with open(f"{source_file_json}.json", "wb") as f: + for chunk in r: + f.write(chunk) + df = pd.read_json(f"{source_file_json}.json")["data"][subnode_name] + df = pd.DataFrame(df) + df.to_csv(source_file_csv, index=False) + + +def load_data_to_bq( + project_id: str, + dataset_id: str, + table_id: str, + file_path: str, + truncate_table: bool, + field_delimiter: str = "|", +) -> None: + logging.info( + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" + ) + client = bigquery.Client(project=project_id) + table_ref = client.dataset(dataset_id).table(table_id) + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.CSV + job_config.field_delimiter = field_delimiter + if truncate_table: + job_config.write_disposition = "WRITE_TRUNCATE" + else: + job_config.write_disposition = "WRITE_APPEND" + job_config.skip_leading_rows = 1 # ignore the header + job_config.autodetect = False + with open(file_path, "rb") as source_file: + job = client.load_table_from_file(source_file, table_ref, job_config=job_config) + job.result() + logging.info( + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" + ) + + +def create_dest_table( + project_id: str, + dataset_id: str, + table_id: str, + schema_filepath: list, + bucket_name: str, + drop_table: bool = False, +) -> bool: + table_ref = f"{project_id}.{dataset_id}.{table_id}" + logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") + client = bigquery.Client() + table_exists = False + try: + table = client.get_table(table_ref) + table_exists_id = table.table_id + logging.info(f"Table {table_exists_id} currently exists.") + if drop_table: + logging.info("Dropping existing table") + client.delete_table(table) + table = None + except NotFound: + table = None + if not table: + logging.info( + ( + f"Table {table_ref} 
currently does not exist. Attempting to create table." + ) + ) + if check_gcs_file_exists(schema_filepath, bucket_name): + schema = create_table_schema([], bucket_name, schema_filepath) + table = bigquery.Table(table_ref, schema=schema) + client.create_table(table) + print(f"Table {table_ref} was created".format(table_id)) + table_exists = True + else: + file_name = os.path.split(schema_filepath)[1] + file_path = os.path.split(schema_filepath)[0] + logging.info( + f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" + ) + table_exists = False + else: + table_exists = True + return table_exists + + +def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) + return exists + + +def create_table_schema( + schema_structure: list, bucket_name: str = "", schema_filepath: str = "" +) -> list: + logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") + schema = [] + if not (schema_filepath): + schema_struct = schema_structure + else: + storage_client = storage.Client() + bucket = storage_client.get_bucket(bucket_name) + blob = bucket.blob(schema_filepath) + schema_struct = json.loads(blob.download_as_string(client=None)) + for schema_field in schema_struct: + fld_name = schema_field["name"] + fld_type = schema_field["type"] + try: + fld_descr = schema_field["description"] + except KeyError: + fld_descr = "" + fld_mode = schema_field["mode"] + schema.append( + bigquery.SchemaField( + name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr + ) + ) + return schema + + +def remove_empty_key_rows( + df: pd.DataFrame, empty_key_list: typing.List[str] +) -> pd.DataFrame: + logging.info("Removing rows with empty keys") + for key_field in empty_key_list: + df = df[df[key_field] != ""] + return df + + +def resolve_datatypes(df: pd.DataFrame, resolve_datatypes_list: dict) -> pd.DataFrame: + logging.info("Resolving datatypes") + for key, value in resolve_datatypes_list.items(): + if str.lower(value[0:2]) == "int": + df[key] = df[key].fillna(0).astype(value) + else: + df[key] = df[key].astype(value) + return df + + +def remove_parenthesis_long_lat( + df: pd.DataFrame, remove_paren_list: typing.List[str] +) -> pd.DataFrame: + logging.info("Removing parenthesis from geographic fields") + for paren_fld in remove_paren_list: + df[paren_fld].replace("(", "", regex=False, inplace=True) + df[paren_fld].replace(")", "", regex=False, inplace=True) + return df + + +def extract_longitude_from_geom( + df: pd.DataFrame, + geom_field_name: str, + lon_field_name: str, +) -> str: + logging.info(f"Extracting longitude field {lon_field_name} from {geom_field_name}") + df[lon_field_name] = df[geom_field_name].apply( + lambda x: str(x).replace("POINT (", "").replace(")", "").split(" ", 1)[0] + if str(x) != "" + else "POINT ( )" + ) + return df + + +def extract_latitude_from_geom( + df: pd.DataFrame, + geom_field_name: str, + lat_field_name: str, +) -> str: + logging.info(f"Extracting latitude field {lat_field_name} from {geom_field_name}") + df[lat_field_name] = df[geom_field_name].apply( + lambda x: str(x).replace("POINT (", "").replace(")", "").split(" ", 1)[-1] + if str(x) != "" + else "POINT ( )" + ) + return df + + +def generate_location(df: pd.DataFrame, gen_location_list: dict) -> pd.DataFrame: + logging.info("Generating location 
data") + for key, values in gen_location_list.items(): + logging.info(f"Generating location data for field {key}") + df[key] = ( + "POINT(" + + df[values[0]][:].astype("string") + + " " + + df[values[1]][:].astype("string") + + ")" + ) + return df + + +def strip_whitespace( + df: pd.DataFrame, strip_whitespace_list: typing.List[str] +) -> pd.DataFrame: + for ws_fld in strip_whitespace_list: + logging.info(f"Stripping whitespaces in column {ws_fld}") + df[ws_fld] = df[ws_fld].apply(lambda x: str(x).strip()) + return df + + +def strip_newlines( + df: pd.DataFrame, strip_newlines_list: typing.List[str] +) -> pd.DataFrame: + for ws_fld in strip_newlines_list: + logging.info(f"Stripping newlines in column {ws_fld}") + df[ws_fld] = df[ws_fld].str.replace(r"\n", "", regex=True) + df[ws_fld] = df[ws_fld].str.replace(r"\r", "", regex=True) + return df + + +def resolve_date_format( + df: pd.DataFrame, + date_format_list: dict, +) -> pd.DataFrame: + logging.info("Resolving date formats") + for dt_fld in date_format_list.items(): + logging.info(f"Resolving date formats in field {dt_fld}") + df[dt_fld[0]] = df[dt_fld[0]].apply(convert_dt_format, to_format=dt_fld[1]) + return df + + +def convert_dt_format(dt_str: str, to_format: str = '"%Y-%m-%d %H:%M:%S"') -> str: + if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": + return "" + else: + if to_format.find(" ") > 0: + # Date and Time + return str( + pd.to_datetime( + dt_str, format=f"{to_format}", infer_datetime_format=True + ) + ) + else: + # Date Only + return str( + pd.to_datetime( + dt_str, format=f"{to_format}", infer_datetime_format=True + ).date() + ) + + +def reorder_headers( + df: pd.DataFrame, output_headers_list: typing.List[str] +) -> pd.DataFrame: + logging.info("Re-ordering Headers") + return df[output_headers_list] + + +def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: + logging.info(f"Saving data to target file.. {file_path} ...") + df.to_csv(file_path, index=False, sep=sep) + + +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + with open(batch_file_path, "r") as data_file: + if truncate_file: + target_file = open(target_file_path, "w+").close() + with open(target_file_path, "a+") as target_file: + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path}" + ) + target_file.write(data_file.read()) + if os.path.exists(batch_file_path): + os.remove(batch_file_path) + + +def upload_file_to_gcs( + file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str +) -> None: + if os.path.exists(file_path): + logging.info( + f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" + ) + storage_client = storage.Client() + bucket = storage_client.bucket(target_gcs_bucket) + blob = bucket.blob(target_gcs_path) + blob.upload_from_filename(file_path) + else: + logging.info( + f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." 
+ ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ.get("SOURCE_URL", ""), + source_url_dict=json.loads(os.environ.get("SOURCE_URL_DICT", r"{}")), + source_url_list=json.loads(os.environ.get("SOURCE_URL_LIST", r"[]")), + pipeline_name=os.environ.get("PIPELINE_NAME", ""), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), + target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(), + chunksize=os.environ.get("CHUNKSIZE", "100000"), + project_id=os.environ.get("PROJECT_ID", ""), + dataset_id=os.environ.get("DATASET_ID", ""), + table_id=os.environ.get("TABLE_ID", ""), + drop_dest_table=os.environ.get("DROP_DEST_TABLE", "N"), + schema_path=os.environ.get("SCHEMA_PATH", ""), + header_row_ordinal=os.environ.get("HEADER_ROW_ORDINAL", "None"), + target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""), + target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""), + input_headers=json.loads(os.environ.get("INPUT_CSV_HEADERS", r"[]")), + data_dtypes=json.loads(os.environ.get("DATA_DTYPES", r"{}")), + rename_headers_list=json.loads(os.environ.get("RENAME_HEADERS_LIST", r"{}")), + trip_data_names=json.loads(os.environ.get("TRIP_DATA_NAMES", r"[]")), + trip_data_dtypes=json.loads(os.environ.get("TRIP_DATA_DTYPES", r"{}")), + tripdata_names=json.loads(os.environ.get("TRIPDATA_NAMES", r"[]")), + tripdata_dtypes=json.loads(os.environ.get("TRIPDATA_DTYPES", r"{}")), + rename_headers_tripdata=json.loads( + os.environ.get("RENAME_HEADERS_TRIPDATA", r"{}") + ), + empty_key_list=json.loads(os.environ.get("EMPTY_KEY_LIST", r"[]")), + gen_location_list=json.loads(os.environ.get("GEN_LOCATION_LIST", r"{}")), + resolve_datatypes_list=json.loads( + os.environ.get("RESOLVE_DATATYPES_LIST", r"{}") + ), + remove_paren_list=json.loads(os.environ.get("REMOVE_PAREN_LIST", r"[]")), + strip_newlines_list=json.loads(os.environ.get("STRIP_NEWLINES_LIST", r"[]")), + strip_whitespace_list=json.loads( + os.environ.get("STRIP_WHITESPACE_LIST", r"[]") + ), + date_format_list=json.loads(os.environ.get("DATE_FORMAT_LIST", r"[]")), + reorder_headers_list=json.loads(os.environ.get("REORDER_HEADERS_LIST", r"[]")), + ) diff --git a/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..33eca2697 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,4 @@ +pandas +requests +google-cloud-bigquery +google-cloud-storage diff --git a/datasets/san_francisco/pipelines/_images/sf_311_service_requests_schema.json b/datasets/san_francisco/pipelines/_images/sf_311_service_requests_schema.json new file mode 100644 index 000000000..8852696bf --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_311_service_requests_schema.json @@ -0,0 +1,116 @@ +[ + { + "name": "unique_key", + "type": "INTEGER", + "description": "Unique case id", + "mode": "REQUIRED" + }, + { + "name": "created_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was made", + "mode": "NULLABLE" + }, + { + "name": "closed_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was closed", + "mode": "NULLABLE" + }, + { + "name": "resolution_action_updated_date", + "type": "TIMESTAMP", + "description": "The date and time when the service request was last modified. 
For requests with status=closed, this will be the date the request was closed", + "mode": "NULLABLE" + }, + { + "name": "status", + "type": "STRING", + "description": "The current status of the service request.", + "mode": "NULLABLE" + }, + { + "name": "status_notes", + "type": "STRING", + "description": "Explanation of why status was changed to current state or more details on current status than conveyed with status alone", + "mode": "NULLABLE" + }, + { + "name": "agency_name", + "type": "STRING", + "description": "The agency responsible for fulfilling or otherwise addressing the service request.", + "mode": "NULLABLE" + }, + { + "name": "category", + "type": "STRING", + "description": "The Human readable name of the specific service request type (service_name)", + "mode": "NULLABLE" + }, + { + "name": "complaint_type", + "type": "STRING", + "description": "More specific description of the problem related to the Category", + "mode": "NULLABLE" + }, + { + "name": "descriptor", + "type": "STRING", + "description": "More specific description of the problem related to the Request Type", + "mode": "NULLABLE" + }, + { + "name": "incident_address", + "type": "STRING", + "description": "Human readable address or description of location", + "mode": "NULLABLE" + }, + { + "name": "supervisor_district", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "neighborhood", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "location", + "type": "STRING", + "description": "Latitude and longitude using the (WGS84) projection.", + "mode": "NULLABLE" + }, + { + "name": "source", + "type": "STRING", + "description": "How the service request was made", + "mode": "NULLABLE" + }, + { + "name": "media_url", + "type": "STRING", + "description": "Website URL", + "mode": "NULLABLE" + }, + { + "name": "latitude", + "type": "FLOAT", + "description": "Latitude using the (WGS84) projection.", + "mode": "NULLABLE" + }, + { + "name": "longitude", + "type": "FLOAT", + "description": "Longitude using the (WGS84) projection.", + "mode": "NULLABLE" + }, + { + "name": "police_district", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_info_schema.json b/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_info_schema.json new file mode 100644 index 000000000..da4163d7b --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_info_schema.json @@ -0,0 +1,74 @@ +[ + { + "name": "station_id", + "type": "STRING", + "description": "Unique identifier of a station.", + "mode": "REQUIRED" + }, + { + "name": "name", + "type": "STRING", + "description": "Public name of the station", + "mode": "REQUIRED" + }, + { + "name": "short_name", + "type": "STRING", + "description": "Short name or other type of identifier, as used by the data publisher", + "mode": "NULLABLE" + }, + { + "name": "lat", + "type": "FLOAT", + "description": "The latitude of station. The field value must be a valid WGS 84 latitude in decimal degrees format. See: http://en.wikipedia.org/wiki/World_Geodetic_System, https://en.wikipedia.org/wiki/Decimal_degrees", + "mode": "REQUIRED" + }, + { + "name": "lon", + "type": "FLOAT", + "description": "The longitude of station. The field value must be a valid WGS 84 longitude in decimal degrees format. 
See: http://en.wikipedia.org/wiki/World_Geodetic_System, https://en.wikipedia.org/wiki/Decimal_degrees", + "mode": "REQUIRED" + }, + { + "name": "region_id", + "type": "INTEGER", + "description": "ID of the region where station is located", + "mode": "NULLABLE" + }, + { + "name": "rental_methods", + "type": "STRING", + "description": "Array of enumerables containing the payment methods accepted at this station. Current valid values (in CAPS) are: KEY (i.e. operator issued bike key / fob / card) CREDITCARD PAYPASS APPLEPAY ANDROIDPAY TRANSITCARD ACCOUNTNUMBER PHONE This list is intended to be as comprehensive at the time of publication as possible but is subject to change, as defined in File Requirements above", + "mode": "NULLABLE" + }, + { + "name": "capacity", + "type": "INTEGER", + "description": "Number of total docking points installed at this station, both available and unavailable", + "mode": "NULLABLE" + }, + { + "name": "external_id", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "eightd_has_key_dispenser", + "type": "BOOLEAN", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "has_kiosk", + "type": "BOOLEAN", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "station_geom", + "type": "GEOGRAPHY", + "description": "", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_status_schema.json b/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_status_schema.json new file mode 100644 index 000000000..f6efc9388 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_bikeshare_station_status_schema.json @@ -0,0 +1,68 @@ +[ + { + "name": "station_id", + "type": "STRING", + "description": "Unique identifier of a station", + "mode": "REQUIRED" + }, + { + "name": "num_bikes_available", + "type": "INTEGER", + "description": "Number of bikes available for rental", + "mode": "REQUIRED" + }, + { + "name": "num_bikes_disabled", + "type": "INTEGER", + "description": "Number of disabled bikes at the station. Vendors who do not want to publicize the number of disabled bikes or docks in their system can opt to omit station capacity (in station_information), num_bikes_disabled and num_docks_disabled. If station capacity is published then broken docks/bikes can be inferred (though not specifically whether the decreased capacity is a broken bike or dock)", + "mode": "NULLABLE" + }, + { + "name": "num_docks_available", + "type": "INTEGER", + "description": "Number of docks accepting bike returns", + "mode": "REQUIRED" + }, + { + "name": "num_docks_disabled", + "type": "INTEGER", + "description": "Number of empty but disabled dock points at the station. 
This value remains as part of the spec as it is possibly useful during development", + "mode": "NULLABLE" + }, + { + "name": "is_installed", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station currently on the street", + "mode": "REQUIRED" + }, + { + "name": "is_renting", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station currently renting bikes (even if the station is empty, if it is set to allow rentals this value should be 1)", + "mode": "REQUIRED" + }, + { + "name": "is_returning", + "type": "BOOLEAN", + "description": "1/0 boolean - is the station accepting bike returns (if a station is full but would allow a return if it was not full then this value should be 1)", + "mode": "REQUIRED" + }, + { + "name": "last_reported", + "type": "INTEGER", + "description": "Integer POSIX timestamp indicating the last time this station reported its status to the backend", + "mode": "REQUIRED" + }, + { + "name": "num_ebikes_available", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "eightd_has_available_keys", + "type": "BOOLEAN", + "description": "", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_bikeshare_trips_schema.json b/datasets/san_francisco/pipelines/_images/sf_bikeshare_trips_schema.json new file mode 100644 index 000000000..0ace76fb2 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_bikeshare_trips_schema.json @@ -0,0 +1,128 @@ +[ + { + "name": "trip_id", + "type": "STRING", + "mode": "REQUIRED", + "description": "Numeric ID of bike trip" + }, + { + "name": "duration_sec", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "Time of trip in seconds" + }, + { + "name": "start_date", + "type": "TIMESTAMP", + "mode": "NULLABLE", + "description": "Start date of trip with date and time, in PST" + }, + { + "name": "start_station_name", + "type": "STRING", + "mode": "NULLABLE", + "description": "Station name of start station" + }, + { + "name": "start_station_id", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "Numeric reference for start station" + }, + { + "name": "end_date", + "type": "TIMESTAMP", + "mode": "NULLABLE", + "description": "End date of trip with date and time, in PST" + }, + { + "name": "end_station_name", + "type": "STRING", + "mode": "NULLABLE", + "description": "Station name for end station" + }, + { + "name": "end_station_id", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "Numeric reference for end station" + }, + { + "name": "bike_number", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "ID of bike used" + }, + { + "name": "zip_code", + "type": "STRING", + "mode": "NULLABLE", + "description": "Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)" + }, + { + "name": "subscriber_type", + "type": "STRING", + "mode": "NULLABLE", + "description": "Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member" + }, + { + "name": "subscription_type", + "type": "STRING", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "start_station_latitude", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "start_station_longitude", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "end_station_latitude", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "end_station_longitude", + "type": "FLOAT", + "mode": "NULLABLE", + "description": "" + }, + { + "name": 
"member_birth_year", + "type": "INTEGER", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "member_gender", + "type": "STRING", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "bike_share_for_all_trip", + "type": "STRING", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "start_station_geom", + "type": "GEOGRAPHY", + "mode": "NULLABLE", + "description": "" + }, + { + "name": "end_station_geom", + "type": "GEOGRAPHY", + "mode": "NULLABLE", + "description": "" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_calendar_schema.json b/datasets/san_francisco/pipelines/_images/sf_calendar_schema.json new file mode 100644 index 000000000..c3dab1752 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_calendar_schema.json @@ -0,0 +1,68 @@ +[ + { + "name": "service_id", + "type": "STRING", + "description": "Unique identifier for a set of dates when service is available for one or more routes", + "mode": "nullable" + }, + { + "name": "service_desc", + "type": "STRING", + "description": "Text description of service category", + "mode": "nullable" + }, + { + "name": "sunday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "monday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "tuesday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "wednesday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "thursday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "friday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "saturday", + "type": "BOOLEAN", + "description": "Indicates whether the service is valid for all Sundays operating on the agency is using normal schedules and fare categories", + "mode": "nullable" + }, + { + "name": "exceptions", + "type": "DATE", + "description": "Specifies a particular date when this service is not available. This is typically during holidays, special events, etc. If applicable, the operating service for the day is indicated in replacement_service. Format for the date is YYYY-MM-DD", + "mode": "nullable" + }, + { + "name": "exception_type", + "type": "BOOLEAN", + "description": "Indicates whether service is available on the date specified in the exceptions field. 
The following are valid values for this field: - TRUE: Service will be operating during the date listed in the column exceptions - FALSE: Service will not be operating during the date listed in the column exceptions", + "mode": "nullable" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_film_locations_schema.json b/datasets/san_francisco/pipelines/_images/sf_film_locations_schema.json new file mode 100644 index 000000000..8ab182a36 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_film_locations_schema.json @@ -0,0 +1,68 @@ +[ + { + "name": "title", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "release_year", + "type": "INTEGER", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "locations", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "fun_facts", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "production_company", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "distributor", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "director", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "writer", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "actor_1", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "actor_2", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + }, + { + "name": "actor_3", + "type": "STRING", + "description": "", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_routes_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_routes_schema.json new file mode 100644 index 000000000..6ffbd1c12 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_muni_routes_schema.json @@ -0,0 +1,26 @@ +[ + { + "name": "route_id", + "type": "STRING", + "description": "Unique identifier for each route. A single route will have multiple trips", + "mode": "nullable" + }, + { + "name": "route_short_name", + "type": "STRING", + "description": "Contains the short name of a route. This is a short, abstract identifier like 32, 100X, or Green that riders use to identify a route, but which doesn't give any indication of what places the route serves.", + "mode": "nullable" + }, + { + "name": "route_long_name", + "type": "STRING", + "description": "Contains the full name of a route. This name is generally more descriptive than the name from route_short_name and often includes the route's destination or stop.", + "mode": "nullable" + }, + { + "name": "route_type", + "type": "STRING", + "description": "Describes the type of transportation used on a route. The following are valid values for this field: - light_rail: Streetcar or light rail. Used for any light rail or street-level system within a metropolitan area. - subway: Subway or metro. Used for any underground rail system within a metropolitan area. - rail: Used for intercity or long-distance travel. - bus: Used for short- and long-distance bus routes. - ferry: Used for short- and long-distance boat service. - cable_car: Used for street-level cable cars where the cable runs beneath the car. - tram: Gondola or suspended cable car. Typically used for aerial cable cars where the car is suspended from the cable. 
- funicular: Used for any rail system that moves on steep inclines with a cable traction system.", + "mode": "nullable" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_shapes_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_shapes_schema.json new file mode 100644 index 000000000..d51adb3ab --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_muni_shapes_schema.json @@ -0,0 +1,38 @@ +[ + { + "name": "shape_id", + "type": "STRING", + "description": "Uniquely identifies each shape", + "mode": "nullable" + }, + { + "name": "shape_point_sequence", + "type": "INTEGER", + "description": "Associates the latitude and longitude of a shape point with its sequence order along the shape. The values for shape_pt_sequence must increase throughout the trip but don't need to be consecutive.", + "mode": "nullable" + }, + { + "name": "shape_point_lat", + "type": "FLOAT", + "description": "Associates a shape point's latitude with a shape ID.", + "mode": "nullable" + }, + { + "name": "shape_point_lon", + "type": "FLOAT", + "description": "Associates a shape point's longitude with a shape ID.", + "mode": "nullable" + }, + { + "name": "shape_point_geom", + "type": "GEOGRAPHY", + "description": "Geographic representation of the points latitude and longitude", + "mode": "nullable" + }, + { + "name": "shape_distance_traveled", + "type": "FLOAT", + "description": "Provides the actual distance traveled along the shape from the first shape point to the point specified in this record. This information allows the trip planner to determine how much of the shape to draw when they show part of a trip on the map. The values used for shape_dist_traveled must increase along with shape_pt_sequence: they can't be used to show reverse travel along a route.", + "mode": "nullable" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_stops_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_stops_schema.json new file mode 100644 index 000000000..b4c916374 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_muni_stops_schema.json @@ -0,0 +1,32 @@ +[ + { + "name": "stop_id", + "type": "STRING", + "description": "Uniquely identifies each stop", + "mode": "nullable" + }, + { + "name": "stop_name", + "type": "STRING", + "description": "Full text name of the transit stop", + "mode": "nullable" + }, + { + "name": "stop_lat", + "type": "FLOAT", + "description": "Latitude of the stop", + "mode": "nullable" + }, + { + "name": "stop_lon", + "type": "FLOAT", + "description": "Longitude of the stop", + "mode": "nullable" + }, + { + "name": "stop_geom", + "type": "GEOGRAPHY", + "description": "Geographic representation of the stop's position", + "mode": "nullable" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_sffd_service_calls_schema.json b/datasets/san_francisco/pipelines/_images/sf_sffd_service_calls_schema.json new file mode 100644 index 000000000..c6bbcf817 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_sffd_service_calls_schema.json @@ -0,0 +1,218 @@ +[ + { + "name": "call_number", + "type": "INTEGER", + "description": "A unique 9-digit number assigned by the 911 Dispatch Center (DEM) to this call. These number are used for both Police and Fire calls", + "mode": "REQUIRED" + }, + { + "name": "unit_id", + "type": "STRING", + "description": "Unit Identifier. 
For example E01 for Engine 1 or T01 for Truck 1.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "incident_number",
+    "type": "INTEGER",
+    "description": "A unique 8-digit number assigned by DEM to this Fire incident.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "call_type",
+    "type": "STRING",
+    "description": "Type of call the incident falls into. See the list below.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "call_date",
+    "type": "DATE",
+    "description": "Date the call is received at the 911 Dispatch Center. Used for reporting purposes.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "watch_date",
+    "type": "DATE",
+    "description": "Watch date when the call is received. Watch date starts at 0800 each morning and ends at 0800 the next day.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "received_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time the call is received at the 911 Dispatch Center.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "entry_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time the 911 operator submits the entry of the initial call information into the CAD system",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "dispatch_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time the 911 operator dispatches this unit to the call.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "response_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time this unit acknowledges the dispatch and records that the unit is en route to the location of the call.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "on_scene_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time the unit records arriving to the location of the incident",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "transport_timestamp",
+    "type": "DATETIME",
+    "description": "If this unit is an ambulance, date and time the unit begins the transport to the hospital",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "hospital_timestamp",
+    "type": "DATETIME",
+    "description": "If this unit is an ambulance, date and time the unit arrives to the hospital.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "call_final_disposition",
+    "type": "STRING",
+    "description": "Disposition of the call (Code). For example TH2: Transport to Hospital - Code 2, FIR: Resolved by Fire Department",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "available_timestamp",
+    "type": "DATETIME",
+    "description": "Date and time this unit is no longer assigned to this call and is available for another dispatch.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "address",
+    "type": "STRING",
+    "description": "Address of midblock point associated with incident (obfuscated address to protect caller privacy)",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "city",
+    "type": "STRING",
+    "description": "City of incident",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "zipcode_of_incident",
+    "type": "STRING",
+    "description": "Zipcode of incident",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "battalion",
+    "type": "STRING",
+    "description": "Emergency Response District (There are 9 Fire Emergency Response Districts)",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "station_area",
+    "type": "STRING",
+    "description": "Fire Station First Response Area associated with the address of the incident",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "box",
+    "type": "STRING",
+    "description": "Fire box associated with the address of the incident. A box is the smallest area used to divide the City. Each box is associated with a unique unit dispatch order. The City is divided into more than 2,400 boxes.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "original_priority",
+    "type": "STRING",
+    "description": "Initial call priority (Code 2: Non-Emergency or Code 3: Emergency).",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "priority",
+    "type": "STRING",
+    "description": "Call priority (Code 2: Non-Emergency or Code 3: Emergency).",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "final_priority",
+    "type": "INTEGER",
+    "description": "Final call priority (Code 2: Non-Emergency or Code 3: Emergency).",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "als_unit",
+    "type": "BOOLEAN",
+    "description": "Does this unit include ALS (Advanced Life Support) resources? Is there a paramedic in this unit?",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "call_type_group",
+    "type": "STRING",
+    "description": "Call types are divided into four main groups: Fire, Alarm, Potential Life Threatening and Non Life Threatening.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "number_of_alarms",
+    "type": "INTEGER",
+    "description": "Number of alarms associated with the incident. This is a number between 1 and 5.",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "unit_type",
+    "type": "STRING",
+    "description": "Unit type",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "unit_sequence_in_call_dispatch",
+    "type": "INTEGER",
+    "description": "A number that indicates the order this unit was assigned to this call",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "fire_prevention_district",
+    "type": "STRING",
+    "description": "Bureau of Fire Prevention District associated with this address",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "supervisor_district",
+    "type": "STRING",
+    "description": "Supervisor District associated with this address",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "row_id",
+    "type": "STRING",
+    "description": "Unique identifier used for managing data updates. 
It is the concatenation of Call Number and Unit ID separated by a dash", + "mode": "NULLABLE" + }, + { + "name": "latitude", + "type": "FLOAT", + "description": "Latitude of the address", + "mode": "NULLABLE" + }, + { + "name": "longitude", + "type": "FLOAT", + "description": "Longitude of the address", + "mode": "NULLABLE" + }, + { + "name": "neighborhood_name", + "type": "STRING", + "description": "Text name of the neighborhood in which the incident occurred", + "mode": "NULLABLE" + }, + { + "name": "location_geom", + "type": "GEOGRAPHY", + "description": "Latitude and longitude of address obfuscated either to the midblock, intersection or call box", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_sfpd_incidents_schema.json b/datasets/san_francisco/pipelines/_images/sf_sfpd_incidents_schema.json new file mode 100644 index 000000000..6581538c1 --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_sfpd_incidents_schema.json @@ -0,0 +1,74 @@ +[ + { + "name": "unique_key", + "type": "INTEGER", + "description": "The unique incident number for this accident", + "mode": "nullable" + }, + { + "name": "category", + "type": "STRING", + "description": "", + "mode": "nullable" + }, + { + "name": "descript", + "type": "STRING", + "description": "", + "mode": "nullable" + }, + { + "name": "daysofweek", + "type": "STRING", + "description": "", + "mode": "nullable" + }, + { + "name": "pddistrict", + "type": "STRING", + "description": "Police Department district", + "mode": "nullable" + }, + { + "name": "resolution", + "type": "STRING", + "description": "Whether there was an arrest, citation, booking, etc.", + "mode": "nullable" + }, + { + "name": "address", + "type": "STRING", + "description": "", + "mode": "nullable" + }, + { + "name": "longitude", + "type": "FLOAT", + "description": "", + "mode": "nullable" + }, + { + "name": "latitude", + "type": "FLOAT", + "description": "", + "mode": "nullable" + }, + { + "name": "location", + "type": "STRING", + "description": "Longitude/Latitude", + "mode": "nullable" + }, + { + "name": "pdid", + "type": "INTEGER", + "description": "Unique Identifier for use in update and insert operations", + "mode": "nullable" + }, + { + "name": "timestamp", + "type": "DATETIME", + "description": "", + "mode": "nullable" + } +] diff --git a/datasets/san_francisco/pipelines/_images/sf_street_trees_schema.json b/datasets/san_francisco/pipelines/_images/sf_street_trees_schema.json new file mode 100644 index 000000000..6052f003a --- /dev/null +++ b/datasets/san_francisco/pipelines/_images/sf_street_trees_schema.json @@ -0,0 +1,110 @@ +[ + { + "name": "tree_id", + "type": "INTEGER", + "description": "Unique ID for Tree", + "mode": "REQUIRED" + }, + { + "name": "legal_status", + "type": "STRING", + "description": "Legal staus: Permitted or DPW maintained", + "mode": "NULLABLE" + }, + { + "name": "species", + "type": "STRING", + "description": "Species of tree", + "mode": "NULLABLE" + }, + { + "name": "address", + "type": "STRING", + "description": "Address of Tree", + "mode": "NULLABLE" + }, + { + "name": "site_order", + "type": "INTEGER", + "description": "Order of tree at address where multiple trees are at same address. 
Trees are ordered in ascending address order", + "mode": "NULLABLE" + }, + { + "name": "site_info", + "type": "STRING", + "description": "Description of location of tree", + "mode": "NULLABLE" + }, + { + "name": "plant_type", + "type": "STRING", + "description": "Landscaping or Tree", + "mode": "NULLABLE" + }, + { + "name": "care_taker", + "type": "STRING", + "description": "Agency or person that is primary caregiver to tree. Owner of Tree", + "mode": "NULLABLE" + }, + { + "name": "care_assistant", + "type": "STRING", + "description": "Agency or person that is secondary caregiver to tree", + "mode": "NULLABLE" + }, + { + "name": "plant_date", + "type": "TIMESTAMP", + "description": "Date tree was planted", + "mode": "NULLABLE" + }, + { + "name": "dbh", + "type": "STRING", + "description": "depth height", + "mode": "NULLABLE" + }, + { + "name": "plot_size", + "type": "STRING", + "description": "dimension of tree plot", + "mode": "NULLABLE" + }, + { + "name": "permit_notes", + "type": "STRING", + "description": "Tree permit number reference", + "mode": "NULLABLE" + }, + { + "name": "x_coordinate", + "type": "FLOAT", + "description": "CA State Plane III", + "mode": "NULLABLE" + }, + { + "name": "y_coordinate", + "type": "FLOAT", + "description": "CA State Plane III", + "mode": "NULLABLE" + }, + { + "name": "latitude", + "type": "FLOAT", + "description": "WGS84", + "mode": "NULLABLE" + }, + { + "name": "longitude", + "type": "FLOAT", + "description": "WGS84", + "mode": "NULLABLE" + }, + { + "name": "location", + "type": "STRING", + "description": "Location formatted for mapping", + "mode": "NULLABLE" + } +] diff --git a/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml b/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml new file mode 100644 index 000000000..cd93686e7 --- /dev/null +++ b/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml @@ -0,0 +1,1109 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+---
+resources:
+  - type: bigquery_table
+    table_id: "311_service_requests"
+    description: "san_francisco_etl_process"
+dag:
+  airflow_version: 2
+  initialize:
+    dag_id: san_francisco
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-03-01'
+    max_active_runs: 1
+    schedule_interval: "@daily"
+    catchup: False
+    default_view: graph
+  tasks:
+    - operator: "KubernetesPodOperator"
+      description: "Run San Francisco 311 Service Requests Pipeline"
+      args:
+        task_id: "sf_311_service_requests"
+        name: "sf_311_service_requests"
+        startup_timeout_seconds: 600
+        namespace: "composer"
+        service_account_name: "datasets"
+        image_pull_policy: "Always"
+        image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}"
+        env_vars:
+          PIPELINE_NAME: "San Francisco 311 Service Requests"
+          SOURCE_URL: "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv"
+          CHUNKSIZE: "1000000"
+          SOURCE_FILE: "files/data_311_service_requests.csv"
+          TARGET_FILE: "files/data_output_311_service_requests.csv"
+          PROJECT_ID: "{{ var.value.gcp_project }}"
+          DATASET_ID: "san_francisco_311"
+          TABLE_ID: "311_service_requests"
+          DROP_DEST_TABLE: "N"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          TARGET_GCS_PATH: "data/san_francisco/311_service_requests/data_output.csv"
+          SCHEMA_PATH: "data/san_francisco/schema/sf_311_service_requests_schema.json"
+          HEADER_ROW_ORDINAL: "0"
+          INPUT_CSV_HEADERS: >-
+            [
+              "CaseID",
+              "Opened",
+              "Closed",
+              "Updated",
+              "Status",
+              "Status Notes",
+              "Responsible Agency",
+              "Category",
+              "Request Type",
+              "Request Details",
+              "Address",
+              "Supervisor District",
+              "Neighborhood",
+              "Point",
+              "Source",
+              "Media URL",
+              "Latitude",
+              "Longitude",
+              "Police District"
+            ]
+          RENAME_HEADERS_LIST: >-
+            {
+              "CaseID": "unique_key",
+              "Opened": "created_date",
+              "Closed": "closed_date",
+              "Updated": "resolution_action_updated_date",
+              "Status": "status",
+              "Status Notes": "status_notes",
+              "Responsible Agency": "agency_name",
+              "Category": "category",
+              "Request Type": "complaint_type",
+              "Request Details": "descriptor",
+              "Address": "incident_address",
+              "Supervisor District": "supervisor_district",
+              "Neighborhood": "neighborhood",
+              "Point": "location",
+              "Source": "source",
+              "Media URL": "media_url",
+              "Latitude": "latitude",
+              "Longitude": "longitude",
+              "Police District": "police_district"
+            }
+          EMPTY_KEY_LIST: >-
+            [
+              "unique_key"
+            ]
+          RESOLVE_DATATYPES_LIST: >-
+            {
+              "supervisor_district": "Int64"
+            }
+          REMOVE_PAREN_LIST: >-
+            [
+              "latitude",
+              "longitude"
+            ]
+          STRIP_NEWLINES_LIST: >-
+            [
+              "status_notes",
+              "descriptor"
+            ]
+          STRIP_WHITESPACE_LIST: >-
+            [
+              "incident_address"
+            ]
+          DATE_FORMAT_LIST: >-
+            {
+              "created_date": "%Y-%m-%d %H:%M:%S",
+              "closed_date": "%Y-%m-%d %H:%M:%S",
+              "resolution_action_updated_date": "%Y-%m-%d %H:%M:%S"
+            }
+          REORDER_HEADERS_LIST: >-
+            [
+              "unique_key",
+              "created_date",
+              "closed_date",
+              "resolution_action_updated_date",
+              "status",
+              "status_notes",
+              "agency_name",
+              "category",
+              "complaint_type",
+              "descriptor",
+              "incident_address",
+              "supervisor_district",
+              "neighborhood",
+              "location",
+              "source",
+              "media_url",
+              "latitude",
+              "longitude",
+              "police_district"
+            ]
+        resources:
+          limit_memory: "8G"
+          limit_cpu: "3"
+          request_ephemeral_storage: "10G"
+    - operator: "KubernetesPodOperator"
+      description: "Run San Francisco Municipal Calendar Pipeline"
+      args:
+        task_id: "sf_calendar"
+        name: "calendar"
+        namespace: "composer"
+        service_account_name: "datasets"
+        image_pull_policy: "Always"
+        image: "{{ 
var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Calendar" + SOURCE_URL_DICT: >- + { + "calendar": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar.csv", + "calendar_attributes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar_attributes.csv", + "calendar_dates": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar_dates.csv" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_calendar.csv" + TARGET_FILE: "files/data_output_municipal_calendar.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "calendar" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_calendar/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_calendar_schema.json" + REORDER_HEADERS_LIST: >- + [ + "service_id", "service_desc", + "monday", "tuesday", "wednesday", + "thursday", "friday", "saturday", "sunday", + "exceptions", "exception_type" + ] + RENAME_HEADERS_LIST: >- + { + "monday_str": "monday", + "tuesday_str": "tuesday", + "wednesday_str": "wednesday", + "thursday_str": "thursday", + "friday_str": "friday", + "saturday_str": "saturday", + "sunday_str": "sunday", + "service_description": "service_desc", + "date": "exceptions", + "exception_type_str": "exception_type" + } + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Municipal Routes Pipeline" + args: + task_id: "sf_muni_routes" + name: "muni_routes" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Routes" + SOURCE_URL_DICT: >- + { + "routes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/routes.txt" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_routes.csv" + TARGET_FILE: "files/data_output_municipal_routes.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "routes" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_routes/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_muni_routes_schema.json" + REORDER_HEADERS_LIST: >- + [ + "route_id", + "route_short_name", + "route_long_name", + "route_type" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Municipal Shapes Pipeline" + args: + task_id: "sf_muni_shapes" + name: "muni_shapes" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Shapes" + SOURCE_URL_DICT: >- + { + "shapes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/shapes.txt" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_shapes.csv" + TARGET_FILE: "files/data_output_municipal_shapes.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "shapes" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_shapes/data_output.csv" + 
SCHEMA_PATH: "data/san_francisco/schema/sf_muni_shapes_schema.json" + RENAME_HEADERS_LIST: >- + { + "shape_pt_lon": "shape_point_lon", + "shape_pt_lat": "shape_point_lat", + "shape_pt_sequence": "shape_point_sequence", + "shape_dist_traveled": "shape_distance_traveled" + } + REORDER_HEADERS_LIST: >- + [ + "shape_id", + "shape_point_sequence", + "shape_point_lat", + "shape_point_lon", + "shape_point_geom", + "shape_distance_traveled" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Municipal Stops Pipeline" + args: + task_id: "sf_muni_stops" + name: "muni_stops" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Stops" + SOURCE_URL_DICT: >- + { + "stops": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/stops.txt" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_stops.csv" + TARGET_FILE: "files/data_output_municipal_stops.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "stops" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_stops/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_muni_stops_schema.json" + REORDER_HEADERS_LIST: >- + [ + "stop_id", + "stop_name", + "stop_lat", + "stop_lon", + "stop_geom" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Police Department Incidents Pipeline" + args: + task_id: "sfpd_incidents" + name: "sfpd_incidents" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Police Department Incidents" + SOURCE_URL_DICT: >- + { + "sfpd_incidents": "https://data.sfgov.org/api/views/tmnf-yvry/rows.csv" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_sfpd_incidents.csv" + TARGET_FILE: "files/data_output_sfpd_incidents.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_sfpd_incidents" + TABLE_ID: "sfpd_incidents" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/sfpd_incidents/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_sfpd_incidents_schema.json" + HEADER_ROW_ORDINAL: "0" + EMPTY_KEY_LIST: >- + [ + "unique_key" + ] + DATE_FORMAT_LIST: >- + { + "Date": "%Y-%m-%d %H:%M:%S" + } + RENAME_HEADERS_LIST: >- + { + "IncidntNum": "unique_key", + "Category": "category", + "Descript": "descript", + "DayOfWeek": "dayofweek", + "PdDistrict": "pddistrict", + "Resolution": "resolution", + "Address": "address", + "X": "longitude", + "Y": "latitude", + "Location": "location", + "PdId": "pdid", + "Date": "Date", + "Time": "Time" + } + REORDER_HEADERS_LIST: >- + [ + "unique_key", + "category", + "descript", + "dayofweek", + "pddistrict", + "resolution", + "address", + "longitude", + "latitude", + "location", + "pdid", + "timestamp" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Bikeshare Stations Pipeline" + args: + task_id: 
"sf_bikeshare_stations" + name: "bikeshare_stations" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Bikeshare Stations" + SOURCE_URL: "https://gbfs.baywheels.com/gbfs/fr/station_information" + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_bikeshare_station_info.csv" + TARGET_FILE: "files/data_output_bikeshare_station_info.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_bikeshare" + TABLE_ID: "bikeshare_station_info" + DROP_DEST_TABLE: "Y" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/bikeshare_stations/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_bikeshare_station_info_schema.json" + HEADER_ROW_ORDINAL: "0" + RENAME_HEADERS_LIST: >- + { + "data.stations.station_id": "station_id", + "data.stations.name": "name", + "data.stations.short_name": "short_name", + "data.stations.lat": "lat", + "data.stations.lon": "lon", + "data.stations.region_id": "region_id", + "data.stations.rental_methods": "rental_methods", + "data.stations.capacity": "capacity", + "data.stations.eightd_has_key_dispenser": "eightd_has_key_dispenser", + "data.stations.has_kiosk": "has_kiosk", + "data.stations.external_id": "external_id" + } + EMPTY_KEY_LIST: >- + [ + "station_id", + "name", + "lat", + "lon" + ] + GEN_LOCATION_LIST: >- + { + "station_geom": [ "lon", "lat" ] + } + RESOLVE_DATATYPES_LIST: >- + { + "region_id": "Int64" + } + REMOVE_PAREN_LIST: >- + [ + "latitude", + "longitude" + ] + STRIP_WHITESPACE_LIST: >- + [ + "incident_address" + ] + DATE_FORMAT_LIST: >- + { + "created_date": "%Y-%m-%d %H:%M:%S", + "closed_date": "%Y-%m-%d %H:%M:%S", + "resolution_action_updated_date": "%Y-%m-%d %H:%M:%S" + } + REORDER_HEADERS_LIST: >- + [ + "station_id", + "name", + "short_name", + "lat", + "lon", + "region_id", + "rental_methods", + "capacity", + "external_id", + "eightd_has_key_dispenser", + "has_kiosk", + "station_geom" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Bikeshare Status Pipeline" + args: + task_id: "sf_bikeshare_status" + name: "bikeshare_status" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Bikeshare Station Status" + SOURCE_URL: "https://gbfs.baywheels.com/gbfs/en/station_status" + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_bikeshare_status.csv" + TARGET_FILE: "files/data_output_bikeshare_status.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_bikeshare" + TABLE_ID: "bikeshare_station_status" + DROP_DEST_TABLE: "Y" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/bikeshare_status/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_bikeshare_station_status_schema.json" + HEADER_ROW_ORDINAL: "0" + RENAME_HEADERS_LIST: >- + { + "data.stations.eightd_has_available_keys": "eightd_has_available_keys", + "data.stations.is_installed": "is_installed", + "data.stations.is_renting": "is_renting", + "data.stations.is_returning": "is_returning", + "data.stations.last_reported": "last_reported", + "data.stations.num_bikes_available": "num_bikes_available", + "data.stations.num_bikes_disabled": 
"num_bikes_disabled", + "data.stations.num_docks_available": "num_docks_available", + "data.stations.num_docks_disabled": "num_docks_disabled", + "data.stations.num_ebikes_available": "num_ebikes_available", + "data.stations.station_id": "station_id" + } + EMPTY_KEY_LIST: >- + [ + "station_id", + "num_bikes_available", + "num_docks_available", + "is_installed", + "is_renting", + "is_returning", + "last_reported" + ] + REORDER_HEADERS_LIST: >- + [ + "station_id", + "num_bikes_available", + "num_bikes_disabled", + "num_docks_available", + "num_docks_disabled", + "is_installed", + "is_renting", + "is_returning", + "last_reported", + "num_ebikes_available", + "eightd_has_available_keys" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Bikeshare Trips Pipeline" + args: + task_id: "sf_bikeshare_trips" + name: "sf_bikeshare_trips" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Bikeshare Trips" + SOURCE_URL_LIST: >- + [ + "https://s3.amazonaws.com/fordgobike-data/201803-fordgobike-tripdata.csv.zip", + "https://s3.amazonaws.com/fordgobike-data/201804-fordgobike-tripdata.csv.zip", + "https://s3.amazonaws.com/fordgobike-data/201802-fordgobike-tripdata.csv.zip", + "https://s3.amazonaws.com/fordgobike-data/201801-fordgobike-tripdata.csv.zip", + "https://s3.amazonaws.com/fordgobike-data/2017-fordgobike-tripdata.csv", + "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip", + "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_2.zip", + "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_3.zip" + ] + CHUNKSIZE: "1000000" + SOURCE_FILE: "files/data_bikeshare_trips.csv" + TARGET_FILE: "files/data_output_bikeshare_trips.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_bikeshare" + TABLE_ID: "bikeshare_trips" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/bikeshare_trips/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_bikeshare_trips_schema.json" + HEADER_ROW_ORDINAL: "0" + TRIP_DATA_NAMES: >- + [ + "source_file", + "trip_id", + "duration_sec", + "start_date", + "start_station_name", + "start_station_terminal", + "end_date", + "end_station_name", + "end_station_terminal", + "bike_number", + "subscription_type", + "zip_code" + ] + TRIP_DATA_DTYPES: >- + { + "source_file": "str", + "trip_id": "str", + "duration_sec": "str", + "start_date": "str", + "start_station_name": "str", + "start_station_terminal": "str", + "end_date": "str", + "end_station_name": "str", + "end_station_terminal": "str", + "bike_number": "str", + "subscription_type": "str", + "zip_code": "str" + } + TRIPDATA_NAMES: >- + [ + "source_file", + "duration_sec", + "start_date", + "end_date", + "start_station_terminal", + "start_station_name", + "start_station_latitude", + "start_station_longitude", + "end_station_terminal", + "end_station_name", + "end_station_latitude", + "end_station_longitude", + "bike_number", + "subscriber_type", + "member_birth_year", + "member_gender", + "bike_share_for_all_trip" + ] + TRIPDATA_DTYPES: >- + { + "source_file": "str", + "duration_sec": "int", + "start_date": "str", + "end_date": "str", + "start_station_terminal": "int", + "start_station_name": "str", + "start_station_latitude": "float", + 
"start_station_longitude": "float", + "end_station_terminal": "int", + "end_station_name": "str", + "end_station_latitude": "float", + "end_station_longitude": "float", + "bike_number": "int", + "subscriber_type": "str", + "member_birth_year": "str", + "member_gender": "str", + "bike_share_for_all_trip": "str" + } + RENAME_HEADERS_TRIPDATA: >- + { + "duration_sec": "Duration", + "start_time": "Start Date", + "start_station_name": "Start Station", + "start_station_id": "Start Terminal", + "end_time": "End Date", + "end_station_name": "End Station", + "end_station_id": "End Terminal", + "bike_id": "Bike #", + "user_type": "Subscription Type", + "start_station_latitude": "start_station_latitude", + "start_station_longitude": "start_station_longitude", + "end_station_latitude": "end_station_latitude", + "end_station_longitude": "end_station_longitude", + "member_birth_year": "member_birth_year", + "member_gender": "member_gender", + "bike_share_for_all_trip": "bike_share_for_all_trip" + } + RENAME_HEADERS_LIST: >- + { + "trip_id": "trip_id", + "duration_sec": "duration_sec", + "start_date": "start_date", + "start_station_name": "start_station_name", + "start_station_terminal": "start_station_id", + "end_date": "end_date", + "end_station_name": "end_station_name", + "end_station_terminal": "end_station_id", + "bike_number": "bike_number", + "zip_code": "zip_code", + "subscriber_type_new": "subscriber_type", + "subscription_type": "subscription_type", + "start_station_latitude": "start_station_latitude", + "start_station_longitude": "start_station_longitude", + "end_station_latitude": "end_station_latitude", + "end_station_longitude": "end_station_longitude", + "member_birth_year": "member_birth_year", + "member_gender": "member_gender", + "bike_share_for_all_trip": "bike_share_for_all_trip", + "start_station_geom": "start_station_geom", + "end_station_geom": "end_station_geom" + } + RESOLVE_DATATYPES_LIST: >- + { + "member_birth_year": "Int64" + } + DATE_FORMAT_LIST: >- + { + "start_date": "%Y-%m-%d %H:%M:%S", + "end_date": "%Y-%m-%d %H:%M:%S" + } + GEN_LOCATION_LIST: >- + { + "start_station_geom": [ "start_station_longitude", "start_station_latitude" ], + "end_station_geom": [ "end_station_longitude", "end_station_latitude" ] + } + REORDER_HEADERS_LIST: >- + [ + "trip_id", + "duration_sec", + "start_date", + "start_station_name", + "start_station_id", + "end_date", + "end_station_name", + "end_station_id", + "bike_number", + "zip_code", + "subscriber_type", + "subscription_type", + "start_station_latitude", + "start_station_longitude", + "end_station_latitude", + "end_station_longitude", + "member_birth_year", + "member_gender", + "bike_share_for_all_trip", + "start_station_geom", + "end_station_geom" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Film Locations Pipeline" + args: + task_id: "sf_film_locations" + name: "sf_film_locations" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Film Locations" + SOURCE_URL: "https://data.sfgov.org/api/views/yitu-d5am/rows.csv" + CHUNKSIZE: "1000000" + SOURCE_FILE: "files/data_film_locations.csv" + TARGET_FILE: "files/data_output_film_locations.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_film_locations" + TABLE_ID: "film_locations" + 
DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/film_locations/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_film_locations_schema.json" + HEADER_ROW_ORDINAL: "0" + RENAME_HEADERS_LIST: >- + { + "Title": "title", + "Release Year": "release_year", + "Locations": "locations", + "Fun Facts": "fun_facts", + "Production Company": "production_company", + "Distributor": "distributor", + "Director": "director", + "Writer": "writer", + "Actor 1": "actor_1", + "Actor 2": "actor_2", + "Actor 3": "actor_3" + } + STRIP_WHITESPACE_LIST: >- + [ + "distributor", + "director", + "actor_2" + ] + STRIP_NEWLINES_LIST: >- + [ + "production_company", + "fun_facts" + ] + REORDER_HEADERS_LIST: >- + [ + "title", + "release_year", + "locations", + "fun_facts", + "production_company", + "distributor", + "director", + "writer", + "actor_1", + "actor_2", + "actor_3" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Fire Department Service Calls Pipeline" + args: + task_id: "sffd_service_calls" + name: "sffd_service_calls" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Fire Department Service Calls" + SOURCE_URL: "https://data.sfgov.org/api/views/nuek-vuh3/rows.csv" + CHUNKSIZE: "1000000" + SOURCE_FILE: "files/data_sffd_service_calls.csv" + TARGET_FILE: "files/data_output_sffd_service_calls.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_sffd_service_calls" + TABLE_ID: "sffd_service_calls" + DROP_DEST_TABLE: "Y" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/sffd_service_calls/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_sffd_service_calls_schema.json" + HEADER_ROW_ORDINAL: "0" + INPUT_CSV_HEADERS: >- + [ + "Call Number", + "Unit ID", + "Incident Number", + "Call Type", + "Call Date", + "Watch Date", + "Received DtTm", + "Entry DtTm", + "Dispatch DtTm", + "Response DtTm", + "On Scene DtTm", + "Transport DtTm", + "Hospital DtTm", + "Call Final Disposition", + "Available DtTm", + "Address", + "City", + "Zipcode of Incident", + "Battalion", + "Station Area", + "Box", + "Original Priority", + "Priority", + "Final Priority", + "ALS Unit", + "Call Type Group", + "Number of Alarms", + "Unit Type", + "Unit sequence in call dispatch", + "Fire Prevention District", + "Supervisor District", + "Neighborhooods - Analysis Boundaries", + "RowID", + "case_location" + ] + DATA_DTYPES: >- + { + "Call Number": "int64", + "Unit ID": "str", + "Incident Number": "int64", + "Call Type": "str", + "Call Date": "str", + "Watch Date": "str", + "Received DtTm": "str", + "Entry DtTm": "str", + "Dispatch DtTm": "str", + "Response DtTm": "str", + "On Scene DtTm": "str", + "Transport DtTm": "str", + "Hospital DtTm": "str", + "Call Final Disposition": "str", + "Available DtTm": "str", + "Address": "str", + "City": "str", + "Zipcode of Incident": "str", + "Battalion": "str", + "Station Area": "str", + "Box": "str", + "Original Priority": "str", + "Priority": "str", + "Final Priority": "int64", + "ALS Unit": "str", + "Call Type Group": "str", + "Number of Alarms": "str", + "Unit Type": "str", + "Unit sequence in call dispatch": "str", + "Fire Prevention District": "str", + "Supervisor District": "str", + 
"Neighborhooods - Analysis Boundaries": "str", + "RowID": "str", + "case_location": "str" + } + RENAME_HEADERS_LIST: >- + { + "Call Number": "call_number", + "Unit ID": "unit_id", + "Incident Number": "incident_number", + "Call Type": "call_type", + "Call Date": "call_date", + "Watch Date": "watch_date", + "Received DtTm": "received_timestamp", + "Entry DtTm": "entry_timestamp", + "Dispatch DtTm": "dispatch_timestamp", + "Response DtTm": "response_timestamp", + "On Scene DtTm": "on_scene_timestamp", + "Transport DtTm": "transport_timestamp", + "Hospital DtTm": "hospital_timestamp", + "Call Final Disposition": "call_final_disposition", + "Available DtTm": "available_timestamp", + "Address": "address", + "City": "city", + "Zipcode of Incident": "zipcode_of_incident", + "Battalion": "battalion", + "Station Area": "station_area", + "Box": "box", + "Original Priority": "original_priority", + "Priority": "priority", + "Final Priority": "final_priority", + "ALS Unit": "als_unit", + "Call Type Group": "call_type_group", + "Number of Alarms": "number_of_alarms", + "Unit Type": "unit_type", + "Unit sequence in call dispatch": "unit_sequence_in_call_dispatch", + "Fire Prevention District": "fire_prevention_district", + "Supervisor District": "supervisor_district", + "Neighborhooods - Analysis Boundaries": "neighborhood_name", + "RowID": "row_id", + "case_location": "location_geom" + } + DATE_FORMAT_LIST: >- + { + "call_date": "%Y-%m-%d", + "watch_date": "%Y-%m-%d", + "available_timestamp": "%Y-%m-%d %H:%M:%S", + "dispatch_timestamp": "%Y-%m-%d %H:%M:%S", + "entry_timestamp": "%Y-%m-%d %H:%M:%S", + "on_scene_timestamp": "%Y-%m-%d %H:%M:%S", + "received_timestamp": "%Y-%m-%d %H:%M:%S", + "response_timestamp": "%Y-%m-%d %H:%M:%S", + "transport_timestamp": "%Y-%m-%d %H:%M:%S", + "hospital_timestamp": "%Y-%m-%d %H:%M:%S" + } + REORDER_HEADERS_LIST: >- + [ + "call_number", + "unit_id", + "incident_number", + "call_type", + "call_date", + "watch_date", + "received_timestamp", + "entry_timestamp", + "dispatch_timestamp", + "response_timestamp", + "on_scene_timestamp", + "transport_timestamp", + "hospital_timestamp", + "call_final_disposition", + "available_timestamp", + "address", + "city", + "zipcode_of_incident", + "battalion", + "station_area", + "box", + "original_priority", + "priority", + "final_priority", + "als_unit", + "call_type_group", + "number_of_alarms", + "unit_type", + "unit_sequence_in_call_dispatch", + "fire_prevention_district", + "supervisor_district", + "row_id", + "latitude", + "longitude", + "neighborhood_name", + "location_geom" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Street Trees Pipeline" + args: + task_id: "sf_street_trees" + name: "sf_street_trees" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Street Trees" + SOURCE_URL: "https://data.sfgov.org/api/views/tkzw-k3nq/rows.csv" + CHUNKSIZE: "1000000" + SOURCE_FILE: "files/data_sf_street_trees.csv" + TARGET_FILE: "files/data_output_sf_street_trees.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_trees" + TABLE_ID: "street_trees" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/sf_street_trees/data_output.csv" + SCHEMA_PATH: 
"data/san_francisco/schema/sf_street_trees_schema.json" + HEADER_ROW_ORDINAL: "0" + INPUT_CSV_HEADERS: >- + [ + "TreeID", + "qLegalStatus", + "qSpecies", + "qAddress", + "SiteOrder", + "qSiteInfo", + "PlantType", + "qCaretaker", + "qCareAssistant", + "PlantDate", + "DBH", + "PlotSize", + "PermitNotes", + "XCoord", + "YCoord", + "Latitude", + "Longitude", + "Location" + ] + DATA_DTYPES: >- + { + "TreeID": "int64", + "qLegalStatus": "str", + "qSpecies": "str", + "qAddress": "str", + "SiteOrder": "str", + "qSiteInfo": "str", + "PlantType": "str", + "qCaretaker": "str", + "qCareAssistant": "str", + "PlantDate": "str", + "DBH": "str", + "PlotSize": "str", + "PermitNotes": "str", + "XCoord": "float64", + "YCoord": "float64", + "Latitude": "float64", + "Longitude": "float64", + "Location": "str" + } + RENAME_HEADERS_LIST: >- + { + "TreeID" : "tree_id", + "qLegalStatus" : "legal_status", + "qSpecies" : "species", + "qAddress" : "address", + "SiteOrder" : "site_order", + "qSiteInfo" : "site_info", + "PlantType" : "plant_type", + "qCaretaker" : "care_taker", + "qCareAssistant" : "care_assistant", + "PlantDate" : "plant_date", + "DBH" : "dbh", + "PlotSize" : "plot_size", + "PermitNotes" : "permit_notes", + "XCoord" : "x_coordinate", + "YCoord" : "y_coordinate", + "Latitude" : "latitude", + "Longitude" : "longitude", + "Location" : "location" + } + DATE_FORMAT_LIST: >- + { + "plant_date": "%Y-%m-%d %H:%M:%S" + } + EMPTY_KEY_LIST: >- + [ + "tree_id" + ] + REORDER_HEADERS_LIST: >- + [ + "tree_id", + "legal_status", + "species", + "address", + "site_order", + "site_info", + "plant_type", + "care_taker", + "care_assistant", + "plant_date", + "dbh", + "plot_size", + "permit_notes", + "x_coordinate", + "y_coordinate", + "latitude", + "longitude", + "location" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + + graph_paths: + - "[ sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees ] >> sf_bikeshare_trips >> [ sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests" diff --git a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py new file mode 100644 index 000000000..7c4623f46 --- /dev/null +++ b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py @@ -0,0 +1,454 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="san_francisco.san_francisco",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@daily",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run San Francisco 311 Service Requests Pipeline
+    sf_311_service_requests = kubernetes_pod.KubernetesPodOperator(
+        task_id="sf_311_service_requests",
+        name="sf_311_service_requests",
+        startup_timeout_seconds=600,
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "PIPELINE_NAME": "San Francisco 311 Service Requests",
+            "SOURCE_URL": "https://data.sfgov.org/api/views/vw6y-z8j6/rows.csv",
+            "CHUNKSIZE": "1000000",
+            "SOURCE_FILE": "files/data_311_service_requests.csv",
+            "TARGET_FILE": "files/data_output_311_service_requests.csv",
+            "PROJECT_ID": "{{ var.value.gcp_project }}",
+            "DATASET_ID": "san_francisco_311",
+            "TABLE_ID": "311_service_requests",
+            "DROP_DEST_TABLE": "N",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/san_francisco/311_service_requests/data_output.csv",
+            "SCHEMA_PATH": "data/san_francisco/schema/sf_311_service_requests_schema.json",
+            "HEADER_ROW_ORDINAL": "0",
+            "INPUT_CSV_HEADERS": '[\n "CaseID",\n "Opened",\n "Closed",\n "Updated",\n "Status",\n "Status Notes",\n "Responsible Agency",\n "Category",\n "Request Type",\n "Request Details",\n "Address",\n "Supervisor District",\n "Neighborhood",\n "Point",\n "Source",\n "Media URL",\n "Latitude",\n "Longitude",\n "Police District"\n]',
+            "RENAME_HEADERS_LIST": '{\n "CaseID": "unique_key",\n "Opened": "created_date",\n "Closed": "closed_date",\n "Updated": "resolution_action_updated_date",\n "Status": "status",\n "Status Notes": "status_notes",\n "Responsible Agency": "agency_name",\n "Category": "category",\n "Request Type": "complaint_type",\n "Request Details": "descriptor",\n "Address": "incident_address",\n "Supervisor District": "supervisor_district",\n "Neighborhood": "neighborhood",\n "Point": "location",\n "Source": "source",\n "Media URL": "media_url",\n "Latitude": "latitude",\n "Longitude": "longitude",\n "Police District": "police_district"\n}',
+            "EMPTY_KEY_LIST": '[\n "unique_key"\n]',
+            "RESOLVE_DATATYPES_LIST": '{\n "supervisor_district": "Int64"\n}',
+            "REMOVE_PAREN_LIST": '[\n "latitude",\n "longitude"\n]',
+            "STRIP_NEWLINES_LIST": '[\n "status_notes",\n "descriptor"\n]',
+            "STRIP_WHITESPACE_LIST": '[\n "incident_address"\n]',
+            "DATE_FORMAT_LIST": '{\n "created_date": "%Y-%m-%d %H:%M:%S",\n "closed_date": "%Y-%m-%d %H:%M:%S",\n "resolution_action_updated_date": "%Y-%m-%d %H:%M:%S"\n}',
+            "REORDER_HEADERS_LIST": '[\n "unique_key",\n "created_date",\n "closed_date",\n "resolution_action_updated_date",\n "status",\n "status_notes",\n "agency_name",\n "category",\n "complaint_type",\n "descriptor",\n "incident_address",\n "supervisor_district",\n "neighborhood",\n "location",\n "source",\n "media_url",\n "latitude",\n "longitude",\n "police_district"\n]',
+        },
+        resources={
+            "limit_memory": "8G",
+            "limit_cpu": "3",
+            "request_ephemeral_storage": "10G",
+        },
+    )
+
+    # Run San Francisco Municipal Calendar Pipeline
+    sf_calendar = kubernetes_pod.KubernetesPodOperator(
+        task_id="sf_calendar",
+        name="calendar",
+        namespace="composer",
+        
service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Calendar", + "SOURCE_URL_DICT": '{\n "calendar": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar.csv",\n "calendar_attributes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar_attributes.csv",\n "calendar_dates": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/calendar_dates.csv"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_calendar.csv", + "TARGET_FILE": "files/data_output_municipal_calendar.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_transit_muni", + "TABLE_ID": "calendar", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_calendar/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_calendar_schema.json", + "REORDER_HEADERS_LIST": '[\n "service_id", "service_desc",\n "monday", "tuesday", "wednesday",\n "thursday", "friday", "saturday", "sunday",\n "exceptions", "exception_type"\n]', + "RENAME_HEADERS_LIST": '{\n "monday_str": "monday",\n "tuesday_str": "tuesday",\n "wednesday_str": "wednesday",\n "thursday_str": "thursday",\n "friday_str": "friday",\n "saturday_str": "saturday",\n "sunday_str": "sunday",\n "service_description": "service_desc",\n "date": "exceptions",\n "exception_type_str": "exception_type"\n}', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Municipal Routes Pipeline + sf_muni_routes = kubernetes_pod.KubernetesPodOperator( + task_id="sf_muni_routes", + name="muni_routes", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Routes", + "SOURCE_URL_DICT": '{\n "routes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/routes.txt"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_routes.csv", + "TARGET_FILE": "files/data_output_municipal_routes.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_transit_muni", + "TABLE_ID": "routes", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_routes/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_routes_schema.json", + "REORDER_HEADERS_LIST": '[\n "route_id",\n "route_short_name",\n "route_long_name",\n "route_type"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Municipal Shapes Pipeline + sf_muni_shapes = kubernetes_pod.KubernetesPodOperator( + task_id="sf_muni_shapes", + name="muni_shapes", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Shapes", + "SOURCE_URL_DICT": '{\n "shapes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/shapes.txt"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_shapes.csv", + "TARGET_FILE": "files/data_output_municipal_shapes.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": 
"san_francisco_transit_muni", + "TABLE_ID": "shapes", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_shapes/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_shapes_schema.json", + "RENAME_HEADERS_LIST": '{\n "shape_pt_lon": "shape_point_lon",\n "shape_pt_lat": "shape_point_lat",\n "shape_pt_sequence": "shape_point_sequence",\n "shape_dist_traveled": "shape_distance_traveled"\n}', + "REORDER_HEADERS_LIST": '[\n "shape_id",\n "shape_point_sequence",\n "shape_point_lat",\n "shape_point_lon",\n "shape_point_geom",\n "shape_distance_traveled"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Municipal Stops Pipeline + sf_muni_stops = kubernetes_pod.KubernetesPodOperator( + task_id="sf_muni_stops", + name="muni_stops", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Stops", + "SOURCE_URL_DICT": '{\n "stops": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/stops.txt"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_stops.csv", + "TARGET_FILE": "files/data_output_municipal_stops.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_transit_muni", + "TABLE_ID": "stops", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_stops/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_stops_schema.json", + "REORDER_HEADERS_LIST": '[\n "stop_id",\n "stop_name",\n "stop_lat",\n "stop_lon",\n "stop_geom"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Police Department Incidents Pipeline + sfpd_incidents = kubernetes_pod.KubernetesPodOperator( + task_id="sfpd_incidents", + name="sfpd_incidents", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Police Department Incidents", + "SOURCE_URL_DICT": '{\n "sfpd_incidents": "https://data.sfgov.org/api/views/tmnf-yvry/rows.csv"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_sfpd_incidents.csv", + "TARGET_FILE": "files/data_output_sfpd_incidents.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_sfpd_incidents", + "TABLE_ID": "sfpd_incidents", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/sfpd_incidents/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_sfpd_incidents_schema.json", + "HEADER_ROW_ORDINAL": "0", + "EMPTY_KEY_LIST": '[\n "unique_key"\n]', + "DATE_FORMAT_LIST": '{\n "Date": "%Y-%m-%d %H:%M:%S"\n}', + "RENAME_HEADERS_LIST": '{\n "IncidntNum": "unique_key",\n "Category": "category",\n "Descript": "descript",\n "DayOfWeek": "dayofweek",\n "PdDistrict": "pddistrict",\n "Resolution": "resolution",\n "Address": "address",\n "X": "longitude",\n "Y": "latitude",\n "Location": "location",\n "PdId": "pdid",\n "Date": "Date",\n "Time": "Time"\n}', + "REORDER_HEADERS_LIST": '[\n "unique_key",\n "category",\n "descript",\n "dayofweek",\n 
"pddistrict",\n "resolution",\n "address",\n "longitude",\n "latitude",\n "location",\n "pdid",\n "timestamp"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Bikeshare Stations Pipeline + sf_bikeshare_stations = kubernetes_pod.KubernetesPodOperator( + task_id="sf_bikeshare_stations", + name="bikeshare_stations", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Bikeshare Stations", + "SOURCE_URL": "https://gbfs.baywheels.com/gbfs/fr/station_information", + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_bikeshare_station_info.csv", + "TARGET_FILE": "files/data_output_bikeshare_station_info.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_bikeshare", + "TABLE_ID": "bikeshare_station_info", + "DROP_DEST_TABLE": "Y", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/bikeshare_stations/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_bikeshare_station_info_schema.json", + "HEADER_ROW_ORDINAL": "0", + "RENAME_HEADERS_LIST": '{\n "data.stations.station_id": "station_id",\n "data.stations.name": "name",\n "data.stations.short_name": "short_name",\n "data.stations.lat": "lat",\n "data.stations.lon": "lon",\n "data.stations.region_id": "region_id",\n "data.stations.rental_methods": "rental_methods",\n "data.stations.capacity": "capacity",\n "data.stations.eightd_has_key_dispenser": "eightd_has_key_dispenser",\n "data.stations.has_kiosk": "has_kiosk",\n "data.stations.external_id": "external_id"\n}', + "EMPTY_KEY_LIST": '[\n "station_id",\n "name",\n "lat",\n "lon"\n]', + "GEN_LOCATION_LIST": '{\n "station_geom": [ "lon", "lat" ]\n}', + "RESOLVE_DATATYPES_LIST": '{\n "region_id": "Int64"\n}', + "REMOVE_PAREN_LIST": '[\n "latitude",\n "longitude"\n]', + "STRIP_WHITESPACE_LIST": '[\n "incident_address"\n]', + "DATE_FORMAT_LIST": '{\n "created_date": "%Y-%m-%d %H:%M:%S",\n "closed_date": "%Y-%m-%d %H:%M:%S",\n "resolution_action_updated_date": "%Y-%m-%d %H:%M:%S"\n}', + "REORDER_HEADERS_LIST": '[\n "station_id",\n "name",\n "short_name",\n "lat",\n "lon",\n "region_id",\n "rental_methods",\n "capacity",\n "external_id",\n "eightd_has_key_dispenser",\n "has_kiosk",\n "station_geom"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Bikeshare Status Pipeline + sf_bikeshare_status = kubernetes_pod.KubernetesPodOperator( + task_id="sf_bikeshare_status", + name="bikeshare_status", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Bikeshare Station Status", + "SOURCE_URL": "https://gbfs.baywheels.com/gbfs/en/station_status", + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_bikeshare_status.csv", + "TARGET_FILE": "files/data_output_bikeshare_status.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_bikeshare", + "TABLE_ID": "bikeshare_station_status", + "DROP_DEST_TABLE": "Y", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/bikeshare_status/data_output.csv", + "SCHEMA_PATH": 
"data/san_francisco/schema/sf_bikeshare_station_status_schema.json", + "HEADER_ROW_ORDINAL": "0", + "RENAME_HEADERS_LIST": '{\n "data.stations.eightd_has_available_keys": "eightd_has_available_keys",\n "data.stations.is_installed": "is_installed",\n "data.stations.is_renting": "is_renting",\n "data.stations.is_returning": "is_returning",\n "data.stations.last_reported": "last_reported",\n "data.stations.num_bikes_available": "num_bikes_available",\n "data.stations.num_bikes_disabled": "num_bikes_disabled",\n "data.stations.num_docks_available": "num_docks_available",\n "data.stations.num_docks_disabled": "num_docks_disabled",\n "data.stations.num_ebikes_available": "num_ebikes_available",\n "data.stations.station_id": "station_id"\n}', + "EMPTY_KEY_LIST": '[\n "station_id",\n "num_bikes_available",\n "num_docks_available",\n "is_installed",\n "is_renting",\n "is_returning",\n "last_reported"\n]', + "REORDER_HEADERS_LIST": '[\n "station_id",\n "num_bikes_available",\n "num_bikes_disabled",\n "num_docks_available",\n "num_docks_disabled",\n "is_installed",\n "is_renting",\n "is_returning",\n "last_reported",\n "num_ebikes_available",\n "eightd_has_available_keys"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Bikeshare Trips Pipeline + sf_bikeshare_trips = kubernetes_pod.KubernetesPodOperator( + task_id="sf_bikeshare_trips", + name="sf_bikeshare_trips", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Bikeshare Trips", + "SOURCE_URL_LIST": '[\n "https://s3.amazonaws.com/fordgobike-data/201803-fordgobike-tripdata.csv.zip",\n "https://s3.amazonaws.com/fordgobike-data/201804-fordgobike-tripdata.csv.zip",\n "https://s3.amazonaws.com/fordgobike-data/201802-fordgobike-tripdata.csv.zip",\n "https://s3.amazonaws.com/fordgobike-data/201801-fordgobike-tripdata.csv.zip",\n "https://s3.amazonaws.com/fordgobike-data/2017-fordgobike-tripdata.csv",\n "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_1.zip",\n "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_2.zip",\n "https://s3.amazonaws.com/babs-open-data/babs_open_data_year_3.zip"\n]', + "CHUNKSIZE": "1000000", + "SOURCE_FILE": "files/data_bikeshare_trips.csv", + "TARGET_FILE": "files/data_output_bikeshare_trips.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_bikeshare", + "TABLE_ID": "bikeshare_trips", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/bikeshare_trips/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_bikeshare_trips_schema.json", + "HEADER_ROW_ORDINAL": "0", + "TRIP_DATA_NAMES": '[\n "source_file",\n "trip_id",\n "duration_sec",\n "start_date",\n "start_station_name",\n "start_station_terminal",\n "end_date",\n "end_station_name",\n "end_station_terminal",\n "bike_number",\n "subscription_type",\n "zip_code"\n]', + "TRIP_DATA_DTYPES": '{\n "source_file": "str",\n "trip_id": "str",\n "duration_sec": "str",\n "start_date": "str",\n "start_station_name": "str",\n "start_station_terminal": "str",\n "end_date": "str",\n "end_station_name": "str",\n "end_station_terminal": "str",\n "bike_number": "str",\n "subscription_type": "str",\n "zip_code": "str"\n}', + "TRIPDATA_NAMES": '[\n "source_file",\n "duration_sec",\n "start_date",\n 
"end_date",\n "start_station_terminal",\n "start_station_name",\n "start_station_latitude",\n "start_station_longitude",\n "end_station_terminal",\n "end_station_name",\n "end_station_latitude",\n "end_station_longitude",\n "bike_number",\n "subscriber_type",\n "member_birth_year",\n "member_gender",\n "bike_share_for_all_trip"\n]', + "TRIPDATA_DTYPES": '{\n "source_file": "str",\n "duration_sec": "int",\n "start_date": "str",\n "end_date": "str",\n "start_station_terminal": "int",\n "start_station_name": "str",\n "start_station_latitude": "float",\n "start_station_longitude": "float",\n "end_station_terminal": "int",\n "end_station_name": "str",\n "end_station_latitude": "float",\n "end_station_longitude": "float",\n "bike_number": "int",\n "subscriber_type": "str",\n "member_birth_year": "str",\n "member_gender": "str",\n "bike_share_for_all_trip": "str"\n}', + "RENAME_HEADERS_TRIPDATA": '{\n "duration_sec": "Duration",\n "start_time": "Start Date",\n "start_station_name": "Start Station",\n "start_station_id": "Start Terminal",\n "end_time": "End Date",\n "end_station_name": "End Station",\n "end_station_id": "End Terminal",\n "bike_id": "Bike #",\n "user_type": "Subscription Type",\n "start_station_latitude": "start_station_latitude",\n "start_station_longitude": "start_station_longitude",\n "end_station_latitude": "end_station_latitude",\n "end_station_longitude": "end_station_longitude",\n "member_birth_year": "member_birth_year",\n "member_gender": "member_gender",\n "bike_share_for_all_trip": "bike_share_for_all_trip"\n}', + "RENAME_HEADERS_LIST": '{\n "trip_id": "trip_id",\n "duration_sec": "duration_sec",\n "start_date": "start_date",\n "start_station_name": "start_station_name",\n "start_station_terminal": "start_station_id",\n "end_date": "end_date",\n "end_station_name": "end_station_name",\n "end_station_terminal": "end_station_id",\n "bike_number": "bike_number",\n "zip_code": "zip_code",\n "subscriber_type_new": "subscriber_type",\n "subscription_type": "subscription_type",\n "start_station_latitude": "start_station_latitude",\n "start_station_longitude": "start_station_longitude",\n "end_station_latitude": "end_station_latitude",\n "end_station_longitude": "end_station_longitude",\n "member_birth_year": "member_birth_year",\n "member_gender": "member_gender",\n "bike_share_for_all_trip": "bike_share_for_all_trip",\n "start_station_geom": "start_station_geom",\n "end_station_geom": "end_station_geom"\n}', + "RESOLVE_DATATYPES_LIST": '{\n "member_birth_year": "Int64"\n}', + "DATE_FORMAT_LIST": '{\n "start_date": "%Y-%m-%d %H:%M:%S",\n "end_date": "%Y-%m-%d %H:%M:%S"\n}', + "GEN_LOCATION_LIST": '{\n "start_station_geom": [ "start_station_longitude", "start_station_latitude" ],\n "end_station_geom": [ "end_station_longitude", "end_station_latitude" ]\n}', + "REORDER_HEADERS_LIST": '[\n "trip_id",\n "duration_sec",\n "start_date",\n "start_station_name",\n "start_station_id",\n "end_date",\n "end_station_name",\n "end_station_id",\n "bike_number",\n "zip_code",\n "subscriber_type",\n "subscription_type",\n "start_station_latitude",\n "start_station_longitude",\n "end_station_latitude",\n "end_station_longitude",\n "member_birth_year",\n "member_gender",\n "bike_share_for_all_trip",\n "start_station_geom",\n "end_station_geom"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Film Locations Pipeline + sf_film_locations = kubernetes_pod.KubernetesPodOperator( + task_id="sf_film_locations", + 
name="sf_film_locations", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Film Locations", + "SOURCE_URL": "https://data.sfgov.org/api/views/yitu-d5am/rows.csv", + "CHUNKSIZE": "1000000", + "SOURCE_FILE": "files/data_film_locations.csv", + "TARGET_FILE": "files/data_output_film_locations.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_film_locations", + "TABLE_ID": "film_locations", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/film_locations/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_film_locations_schema.json", + "HEADER_ROW_ORDINAL": "0", + "RENAME_HEADERS_LIST": '{\n "Title": "title",\n "Release Year": "release_year",\n "Locations": "locations",\n "Fun Facts": "fun_facts",\n "Production Company": "production_company",\n "Distributor": "distributor",\n "Director": "director",\n "Writer": "writer",\n "Actor 1": "actor_1",\n "Actor 2": "actor_2",\n "Actor 3": "actor_3"\n}', + "STRIP_WHITESPACE_LIST": '[\n "distributor",\n "director",\n "actor_2"\n]', + "STRIP_NEWLINES_LIST": '[\n "production_company",\n "fun_facts"\n]', + "REORDER_HEADERS_LIST": '[\n "title",\n "release_year",\n "locations",\n "fun_facts",\n "production_company",\n "distributor",\n "director",\n "writer",\n "actor_1",\n "actor_2",\n "actor_3"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Fire Department Service Calls Pipeline + sffd_service_calls = kubernetes_pod.KubernetesPodOperator( + task_id="sffd_service_calls", + name="sffd_service_calls", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Fire Department Service Calls", + "SOURCE_URL": "https://data.sfgov.org/api/views/nuek-vuh3/rows.csv", + "CHUNKSIZE": "1000000", + "SOURCE_FILE": "files/data_sffd_service_calls.csv", + "TARGET_FILE": "files/data_output_sffd_service_calls.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_sffd_service_calls", + "TABLE_ID": "sffd_service_calls", + "DROP_DEST_TABLE": "Y", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/sffd_service_calls/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_sffd_service_calls_schema.json", + "HEADER_ROW_ORDINAL": "0", + "INPUT_CSV_HEADERS": '[\n "Call Number",\n "Unit ID",\n "Incident Number",\n "Call Type",\n "Call Date",\n "Watch Date",\n "Received DtTm",\n "Entry DtTm",\n "Dispatch DtTm",\n "Response DtTm",\n "On Scene DtTm",\n "Transport DtTm",\n "Hospital DtTm",\n "Call Final Disposition",\n "Available DtTm",\n "Address",\n "City",\n "Zipcode of Incident",\n "Battalion",\n "Station Area",\n "Box",\n "Original Priority",\n "Priority",\n "Final Priority",\n "ALS Unit",\n "Call Type Group",\n "Number of Alarms",\n "Unit Type",\n "Unit sequence in call dispatch",\n "Fire Prevention District",\n "Supervisor District",\n "Neighborhooods - Analysis Boundaries",\n "RowID",\n "case_location"\n]', + "DATA_DTYPES": '{\n "Call Number": "int64",\n "Unit ID": "str",\n "Incident Number": "int64",\n "Call Type": "str",\n "Call Date": "str",\n "Watch Date": 
"str",\n "Received DtTm": "str",\n "Entry DtTm": "str",\n "Dispatch DtTm": "str",\n "Response DtTm": "str",\n "On Scene DtTm": "str",\n "Transport DtTm": "str",\n "Hospital DtTm": "str",\n "Call Final Disposition": "str",\n "Available DtTm": "str",\n "Address": "str",\n "City": "str",\n "Zipcode of Incident": "str",\n "Battalion": "str",\n "Station Area": "str",\n "Box": "str",\n "Original Priority": "str",\n "Priority": "str",\n "Final Priority": "int64",\n "ALS Unit": "str",\n "Call Type Group": "str",\n "Number of Alarms": "str",\n "Unit Type": "str",\n "Unit sequence in call dispatch": "str",\n "Fire Prevention District": "str",\n "Supervisor District": "str",\n "Neighborhooods - Analysis Boundaries": "str",\n "RowID": "str",\n "case_location": "str"\n}', + "RENAME_HEADERS_LIST": '{\n "Call Number": "call_number",\n "Unit ID": "unit_id",\n "Incident Number": "incident_number",\n "Call Type": "call_type",\n "Call Date": "call_date",\n "Watch Date": "watch_date",\n "Received DtTm": "received_timestamp",\n "Entry DtTm": "entry_timestamp",\n "Dispatch DtTm": "dispatch_timestamp",\n "Response DtTm": "response_timestamp",\n "On Scene DtTm": "on_scene_timestamp",\n "Transport DtTm": "transport_timestamp",\n "Hospital DtTm": "hospital_timestamp",\n "Call Final Disposition": "call_final_disposition",\n "Available DtTm": "available_timestamp",\n "Address": "address",\n "City": "city",\n "Zipcode of Incident": "zipcode_of_incident",\n "Battalion": "battalion",\n "Station Area": "station_area",\n "Box": "box",\n "Original Priority": "original_priority",\n "Priority": "priority",\n "Final Priority": "final_priority",\n "ALS Unit": "als_unit",\n "Call Type Group": "call_type_group",\n "Number of Alarms": "number_of_alarms",\n "Unit Type": "unit_type",\n "Unit sequence in call dispatch": "unit_sequence_in_call_dispatch",\n "Fire Prevention District": "fire_prevention_district",\n "Supervisor District": "supervisor_district",\n "Neighborhooods - Analysis Boundaries": "neighborhood_name",\n "RowID": "row_id",\n "case_location": "location_geom"\n}', + "DATE_FORMAT_LIST": '{\n "call_date": "%Y-%m-%d",\n "watch_date": "%Y-%m-%d",\n "available_timestamp": "%Y-%m-%d %H:%M:%S",\n "dispatch_timestamp": "%Y-%m-%d %H:%M:%S",\n "entry_timestamp": "%Y-%m-%d %H:%M:%S",\n "on_scene_timestamp": "%Y-%m-%d %H:%M:%S",\n "received_timestamp": "%Y-%m-%d %H:%M:%S",\n "response_timestamp": "%Y-%m-%d %H:%M:%S",\n "transport_timestamp": "%Y-%m-%d %H:%M:%S",\n "hospital_timestamp": "%Y-%m-%d %H:%M:%S"\n}', + "REORDER_HEADERS_LIST": '[\n "call_number",\n "unit_id",\n "incident_number",\n "call_type",\n "call_date",\n "watch_date",\n "received_timestamp",\n "entry_timestamp",\n "dispatch_timestamp",\n "response_timestamp",\n "on_scene_timestamp",\n "transport_timestamp",\n "hospital_timestamp",\n "call_final_disposition",\n "available_timestamp",\n "address",\n "city",\n "zipcode_of_incident",\n "battalion",\n "station_area",\n "box",\n "original_priority",\n "priority",\n "final_priority",\n "als_unit",\n "call_type_group",\n "number_of_alarms",\n "unit_type",\n "unit_sequence_in_call_dispatch",\n "fire_prevention_district",\n "supervisor_district",\n "row_id",\n "latitude",\n "longitude",\n "neighborhood_name",\n "location_geom"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Street Trees Pipeline + sf_street_trees = kubernetes_pod.KubernetesPodOperator( + task_id="sf_street_trees", + name="sf_street_trees", + namespace="composer", + 
service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Street Trees", + "SOURCE_URL": "https://data.sfgov.org/api/views/tkzw-k3nq/rows.csv", + "CHUNKSIZE": "1000000", + "SOURCE_FILE": "files/data_sf_street_trees.csv", + "TARGET_FILE": "files/data_output_sf_street_trees.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_trees", + "TABLE_ID": "street_trees", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/sf_street_trees/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_street_trees_schema.json", + "HEADER_ROW_ORDINAL": "0", + "INPUT_CSV_HEADERS": '[\n "TreeID",\n "qLegalStatus",\n "qSpecies",\n "qAddress",\n "SiteOrder",\n "qSiteInfo",\n "PlantType",\n "qCaretaker",\n "qCareAssistant",\n "PlantDate",\n "DBH",\n "PlotSize",\n "PermitNotes",\n "XCoord",\n "YCoord",\n "Latitude",\n "Longitude",\n "Location"\n]', + "DATA_DTYPES": '{\n "TreeID": "int64",\n "qLegalStatus": "str",\n "qSpecies": "str",\n "qAddress": "str",\n "SiteOrder": "str",\n "qSiteInfo": "str",\n "PlantType": "str",\n "qCaretaker": "str",\n "qCareAssistant": "str",\n "PlantDate": "str",\n "DBH": "str",\n "PlotSize": "str",\n "PermitNotes": "str",\n "XCoord": "float64",\n "YCoord": "float64",\n "Latitude": "float64",\n "Longitude": "float64",\n "Location": "str"\n}', + "RENAME_HEADERS_LIST": '{\n "TreeID" : "tree_id",\n "qLegalStatus" : "legal_status",\n "qSpecies" : "species",\n "qAddress" : "address",\n "SiteOrder" : "site_order",\n "qSiteInfo" : "site_info",\n "PlantType" : "plant_type",\n "qCaretaker" : "care_taker",\n "qCareAssistant" : "care_assistant",\n "PlantDate" : "plant_date",\n "DBH" : "dbh",\n "PlotSize" : "plot_size",\n "PermitNotes" : "permit_notes",\n "XCoord" : "x_coordinate",\n "YCoord" : "y_coordinate",\n "Latitude" : "latitude",\n "Longitude" : "longitude",\n "Location" : "location"\n}', + "DATE_FORMAT_LIST": '{\n "plant_date": "%Y-%m-%d %H:%M:%S"\n}', + "EMPTY_KEY_LIST": '[\n "tree_id"\n]', + "REORDER_HEADERS_LIST": '[\n "tree_id",\n "legal_status",\n "species",\n "address",\n "site_order",\n "site_info",\n "plant_type",\n "care_taker",\n "care_assistant",\n "plant_date",\n "dbh",\n "plot_size",\n "permit_notes",\n "x_coordinate",\n "y_coordinate",\n "latitude",\n "longitude",\n "location"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + ( + [sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees] + >> sf_bikeshare_trips + >> [sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops] + >> sffd_service_calls + >> sfpd_incidents + >> sf_311_service_requests + )