diff --git a/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py
index 0df600f6a..4cae79fa9 100644
--- a/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py
+++ b/datasets/san_francisco/pipelines/_images/run_csv_transform_kub/csv_transform.py
@@ -23,6 +23,7 @@ import zipfile as zip
 from datetime import datetime
 
+import numpy as np
 import pandas as pd
 import requests
 from google.api_core.exceptions import NotFound
@@ -53,6 +54,7 @@ def main(
     tripdata_dtypes: dict,
     rename_headers_tripdata: dict,
     rename_headers_list: dict,
+    starts_with_pattern_list: typing.List[str],
     empty_key_list: typing.List[str],
     gen_location_list: dict,
     resolve_datatypes_list: dict,
@@ -60,9 +62,9 @@
     strip_newlines_list: typing.List[str],
     strip_whitespace_list: typing.List[str],
     date_format_list: dict,
+    filter_headers_list: typing.List[str],
     reorder_headers_list: typing.List[str],
 ) -> None:
-    logging.info(f"{pipeline_name} process started")
     pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
     execute_pipeline(
@@ -88,6 +90,7 @@ def main(
         tripdata_dtypes=tripdata_dtypes,
         rename_headers_tripdata=rename_headers_tripdata,
         rename_headers_list=rename_headers_list,
+        starts_with_pattern_list=starts_with_pattern_list,
         empty_key_list=empty_key_list,
         gen_location_list=gen_location_list,
         resolve_datatypes_list=resolve_datatypes_list,
@@ -95,6 +98,7 @@
         strip_newlines_list=strip_newlines_list,
         strip_whitespace_list=strip_whitespace_list,
         date_format_list=date_format_list,
+        filter_headers_list=filter_headers_list,
         reorder_headers_list=reorder_headers_list,
     )
     logging.info(f"{pipeline_name} process completed")
@@ -123,6 +127,7 @@ def execute_pipeline(
     tripdata_dtypes: dict,
     rename_headers_tripdata: dict,
     rename_headers_list: typing.List[str],
+    starts_with_pattern_list: typing.List[str],
     empty_key_list: typing.List[str],
     gen_location_list: dict,
     resolve_datatypes_list: dict,
@@ -130,6 +135,7 @@
     strip_newlines_list: typing.List[str],
     strip_whitespace_list: typing.List[str],
     date_format_list: dict,
+    filter_headers_list: typing.List[str],
     reorder_headers_list: typing.List[str],
 ) -> None:
     if (
@@ -150,6 +156,7 @@ def execute_pipeline(
             schema_path=schema_path,
             target_gcs_bucket=target_gcs_bucket,
             target_gcs_path=target_gcs_path,
+            filter_headers_list=filter_headers_list,
             rename_headers_list=rename_headers_list,
             reorder_headers_list=reorder_headers_list,
         )
@@ -197,6 +204,54 @@ def execute_pipeline(
             reorder_headers_list=reorder_headers_list,
         )
         return None
+    elif destination_table == "stop_times":
+        process_sf_muni_stop_times(
+            source_url_dict=source_url_dict,
+            target_file=target_file,
+            project_id=project_id,
+            dataset_id=dataset_id,
+            destination_table=destination_table,
+            drop_dest_table=drop_dest_table,
+            schema_path=schema_path,
+            target_gcs_bucket=target_gcs_bucket,
+            target_gcs_path=target_gcs_path,
+            rename_headers_list=rename_headers_list,
+            starts_with_pattern_list=starts_with_pattern_list,
+            reorder_headers_list=reorder_headers_list,
+        )
+        return None
+    elif destination_table == "fares":
+        process_sf_muni_fares(
+            source_url_dict=source_url_dict,
+            target_file=target_file,
+            project_id=project_id,
+            dataset_id=dataset_id,
+            destination_table=destination_table,
+            drop_dest_table=drop_dest_table,
+            schema_path=schema_path,
+            target_gcs_bucket=target_gcs_bucket,
+            target_gcs_path=target_gcs_path,
+            rename_headers_list=rename_headers_list,
+            starts_with_pattern_list=starts_with_pattern_list,
+            reorder_headers_list=reorder_headers_list,
+        )
+        return None
+    elif destination_table == "trips":
+        process_sf_muni_trips(
+            source_url_dict=source_url_dict,
+            target_file=target_file,
+            project_id=project_id,
+            dataset_id=dataset_id,
+            destination_table=destination_table,
+            drop_dest_table=drop_dest_table,
+            schema_path=schema_path,
+            target_gcs_bucket=target_gcs_bucket,
+            target_gcs_path=target_gcs_path,
+            rename_headers_list=rename_headers_list,
+            starts_with_pattern_list=starts_with_pattern_list,
+            reorder_headers_list=reorder_headers_list,
+        )
+        return None
     elif destination_table == "sfpd_incidents":
         download_file_http(
             source_url=source_url_dict["sfpd_incidents"], source_file=source_file
         )
@@ -322,6 +377,7 @@ def process_sf_calendar(
     schema_path: str,
     target_gcs_bucket: str,
     target_gcs_path: str,
+    filter_headers_list: typing.List[str],
     rename_headers_list: typing.List[str],
     reorder_headers_list: typing.List[str],
 ) -> None:
@@ -374,23 +430,7 @@ def process_sf_calendar(
     df["exception_type_str"] = df["exception_type"].apply(
         lambda x: "True" if x == 1 else "False"
     )
-    df = df[
-        [
-            "service_id",
-            "start_date",
-            "end_date",
-            "service_description",
-            "date",
-            "exception_type_str",
-            "monday_str",
-            "tuesday_str",
-            "wednesday_str",
-            "thursday_str",
-            "friday_str",
-            "saturday_str",
-            "sunday_str",
-        ]
-    ]
+    df = df[filter_headers_list]
     df = rename_headers(df=df, rename_headers_list=rename_headers_list)
     df["exceptions"] = df["exceptions"].apply(
         lambda x: f"{str(x).strip()[:4]}-{str(x).strip()[4:6]}-{str(x).strip()[6:8]}"
@@ -563,6 +603,234 @@ def process_sf_muni_stops(
     )
 
+
+def process_sf_muni_stop_times(
+    source_url_dict: dict,
+    target_file: pathlib.Path,
+    project_id: str,
+    dataset_id: str,
+    destination_table: str,
+    drop_dest_table: str,
+    schema_path: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    rename_headers_list: typing.List[str],
+    starts_with_pattern_list: typing.List[str],
+    reorder_headers_list: typing.List[str],
+) -> None:
+    df_stop_times = gcs_to_df(
+        project_id=project_id,
+        source_file_gcs_path=source_url_dict["stop_times"],
+        target_file_path=str(target_file),
+    )
+    df_stop_times = rename_headers(
+        df=df_stop_times, rename_headers_list=rename_headers_list
+    )
+    df_stop_times = df_replace_values(
+        df=df_stop_times, starts_with_pattern_list=starts_with_pattern_list
+    )
+    df_stop_times.loc[
+        df_stop_times["arrives_next_day"] == "", "arrives_next_day"
+    ] = "FALSE"
+    df_stop_times.loc[
+        df_stop_times["departs_next_day"] == "", "departs_next_day"
+    ] = "FALSE"
+    df_stop_times = reorder_headers(
+        df=df_stop_times, output_headers_list=reorder_headers_list
+    )
+    save_to_new_file(df=df_stop_times, file_path=target_file, sep="|")
+    upload_file_to_gcs(
+        file_path=target_file,
+        target_gcs_bucket=target_gcs_bucket,
+        target_gcs_path=target_gcs_path,
+    )
+    drop_table = drop_dest_table == "Y"
+    table_exists = create_dest_table(
+        project_id=project_id,
+        dataset_id=dataset_id,
+        table_id=destination_table,
+        schema_filepath=schema_path,
+        bucket_name=target_gcs_bucket,
+        drop_table=drop_table,
+    )
+    if table_exists:
+        load_data_to_bq(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=destination_table,
+            file_path=target_file,
+            truncate_table=True,
+            field_delimiter="|",
+        )
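The stop_times patterns defined later in pipeline.yaml rely on the GTFS convention that post-midnight service keeps counting hours upward (24:05:00, 25:15:00, ...), and they fold such values back into a civil time plus a next-day flag. A minimal standalone sketch of the same arithmetic, using a hypothetical helper that is not part of this patch:

    # Illustrative only: mirrors the 24..31 -> 00..07 hour wrap encoded in
    # STARTS_WITH_PATTERN_LIST for the stop_times pipeline.
    def normalize_gtfs_time(hhmmss: str) -> tuple:
        hours, minutes, seconds = (int(part) for part in hhmmss.split(":"))
        next_day = hours >= 24  # GTFS allows hour values of 24+ for overnight trips
        return f"{hours % 24:02d}:{minutes:02d}:{seconds:02d}", next_day

    assert normalize_gtfs_time("25:15:00") == ("01:15:00", True)
    assert normalize_gtfs_time("09:24:00") == ("09:24:00", False)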
+
+
+def process_sf_muni_fares(
+    source_url_dict: dict,
+    target_file: pathlib.Path,
+    project_id: str,
+    dataset_id: str,
+    destination_table: str,
+    drop_dest_table: str,
+    schema_path: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    rename_headers_list: typing.List[str],
+    starts_with_pattern_list: typing.List[str],
+    reorder_headers_list: typing.List[str],
+) -> None:
+    df_fare_rider_categories = gcs_to_df(
+        project_id=project_id,
+        source_file_gcs_path=source_url_dict["fare_rider_categories"],
+        target_file_path=str(target_file),
+    )
+    df_rider_categories = gcs_to_df(
+        project_id=project_id,
+        source_file_gcs_path=source_url_dict["rider_categories"],
+        target_file_path=str(target_file),
+    )
+    df_fare_attributes = gcs_to_df(
+        project_id=project_id,
+        source_file_gcs_path=source_url_dict["fare_attributes"],
+        target_file_path=str(target_file),
+    )
+    df_categories = pd.merge(
+        df_fare_rider_categories,
+        df_rider_categories,
+        left_on="rider_category_id",
+        right_on="rider_category_id",
+        how="left",
+    )
+    df_fares = pd.merge(
+        df_fare_attributes,
+        df_categories,
+        left_on="fare_id",
+        right_on="fare_id",
+        how="left",
+    )
+    df_fares = rename_headers(df=df_fares, rename_headers_list=rename_headers_list)
+    # Assign the cleaned Series back; .apply() returns a copy and does not modify in place.
+    df_fares["rider_desc"] = df_fares["rider_desc"].apply(
+        lambda x: str(x).replace('"', "")
+    )
+    df_fares.loc[df_fares["transfers_permitted"] == "", "transfers_permitted"] = "NULL"
+    df_fares = df_replace_values(
+        df=df_fares, starts_with_pattern_list=starts_with_pattern_list
+    )
+    df_fares["transfer_duration"] = (
+        df_fares["transfer_duration"].fillna(0.0).apply(np.int64)
+    )
+    df_fares = reorder_headers(df=df_fares, output_headers_list=reorder_headers_list)
+    save_to_new_file(df=df_fares, file_path=target_file, sep="|")
+    upload_file_to_gcs(
+        file_path=target_file,
+        target_gcs_bucket=target_gcs_bucket,
+        target_gcs_path=target_gcs_path,
+    )
+    drop_table = drop_dest_table == "Y"
+    table_exists = create_dest_table(
+        project_id=project_id,
+        dataset_id=dataset_id,
+        table_id=destination_table,
+        schema_filepath=schema_path,
+        bucket_name=target_gcs_bucket,
+        drop_table=drop_table,
+    )
+    if table_exists:
+        load_data_to_bq(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=destination_table,
+            file_path=target_file,
+            truncate_table=True,
+            field_delimiter="|",
+        )
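One note on the fares merges for readers puzzled by the "price_x" rename configured in pipeline.yaml below: both fare_attributes and fare_rider_categories carry a price column, so after the second merge pandas disambiguates the pair with its default _x/_y suffixes, and price_x is the fare_attributes value. A toy illustration with made-up rows:

    import pandas as pd

    fare_attributes = pd.DataFrame({"fare_id": ["a"], "price": [2.50]})
    categories = pd.DataFrame({"fare_id": ["a"], "price": [1.25], "rider_category_id": [2]})
    merged = pd.merge(fare_attributes, categories, on="fare_id", how="left")
    print(list(merged.columns))  # ['fare_id', 'price_x', 'price_y', 'rider_category_id']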
+
+
+def process_sf_muni_trips(
+    source_url_dict: dict,
+    target_file: pathlib.Path,
+    project_id: str,
+    dataset_id: str,
+    destination_table: str,
+    drop_dest_table: str,
+    schema_path: str,
+    target_gcs_bucket: str,
+    target_gcs_path: str,
+    rename_headers_list: typing.List[str],
+    starts_with_pattern_list: typing.List[str],
+    reorder_headers_list: typing.List[str],
+) -> None:
+    df_trips = gcs_to_df(
+        project_id=project_id,
+        source_file_gcs_path=source_url_dict["trips"],
+        target_file_path=str(target_file),
+    )
+    df_simple_trips = http_to_df(
+        source_url=source_url_dict["simple_trips"],
+        target_file_path=str(target_file),
+    )
+    df = pd.merge(
+        df_trips,
+        df_simple_trips,
+        left_on="route_id",
+        right_on="DIRECTION",
+        how="left",
+    )
+    # Rename before applying the starts-with patterns so the patterns match the
+    # final column names ("direction", "service_category"), mirroring the order
+    # used by the stop_times and fares flows above.
+    df = rename_headers(df=df, rename_headers_list=rename_headers_list)
+    df = df_replace_values(df=df, starts_with_pattern_list=starts_with_pattern_list)
+    df = reorder_headers(df=df, output_headers_list=reorder_headers_list)
+    save_to_new_file(df=df, file_path=target_file, sep="|")
+    upload_file_to_gcs(
+        file_path=target_file,
+        target_gcs_bucket=target_gcs_bucket,
+        target_gcs_path=target_gcs_path,
+    )
+    drop_table = drop_dest_table == "Y"
+    table_exists = create_dest_table(
+        project_id=project_id,
+        dataset_id=dataset_id,
+        table_id=destination_table,
+        schema_filepath=schema_path,
+        bucket_name=target_gcs_bucket,
+        drop_table=drop_table,
+    )
+    if table_exists:
+        load_data_to_bq(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=destination_table,
+            file_path=target_file,
+            truncate_table=True,
+            field_delimiter="|",
+        )
+
+
+def df_replace_values(
+    df: pd.DataFrame,
+    starts_with_pattern_list: typing.List[str],
+) -> pd.DataFrame:
+    # Each entry has the form [[target_field, source_field], [pattern, replacement]].
+    for lst in starts_with_pattern_list:
+        target_fieldname = lst[0][0]
+        source_fieldname = lst[0][1]
+        reg_exp = lst[1][0]
+        replace_val = lst[1][1]
+        logging.info(
+            f"Replacing values '{reg_exp}' with '{replace_val}' in field {target_fieldname} from {source_fieldname}"
+        )
+        if target_fieldname not in df.columns:
+            df[target_fieldname] = ""
+        df[source_fieldname] = df[source_fieldname].astype("str")
+        if "(" in reg_exp and ("\\(" not in reg_exp):
+            # The pattern has a capture group: rewrite the matched prefix and splice
+            # the captured remainder in wherever "$2" appears in the replacement.
+            df.loc[
+                df[source_fieldname].str.match(rf"^{reg_exp}", case=False),
+                target_fieldname,
+            ] = replace_val.replace("$2", "") + df[source_fieldname].str.extract(
+                rf"^{reg_exp}", expand=False
+            )
+        else:
+            # Plain pattern: a regex search that is anchored only if the pattern itself is.
+            df.loc[
+                df[source_fieldname].str.contains(reg_exp, regex=True, na=False),
+                target_fieldname,
+            ] = replace_val
+    return df
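A small usage sketch of df_replace_values with toy data; note that "$2" is this pipeline's own placeholder for the single captured group (it is not standard regex replacement syntax), and patterns without a capture group are applied with Series.str.contains:

    import pandas as pd

    df = pd.DataFrame({"arrival_time": ["24:05:00", "09:10:00"]})
    patterns = [
        [["arrives_next_day", "arrival_time"], ["^24", "TRUE"]],
        [["arrival_time", "arrival_time"], ["^24(.*)", "00$2"]],
    ]
    df = df_replace_values(df=df, starts_with_pattern_list=patterns)
    # arrival_time -> ["00:05:00", "09:10:00"]; arrives_next_day -> ["TRUE", ""]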
+
+
 def create_geometry_columns(long: float, lat: float) -> pd.DataFrame:
     return f"POINT({str(long)} {str(lat)})".replace("POINT( )", "")
 
 
@@ -1320,6 +1588,9 @@ def upload_file_to_gcs(
         rename_headers_tripdata=json.loads(
             os.environ.get("RENAME_HEADERS_TRIPDATA", r"{}")
         ),
+        starts_with_pattern_list=json.loads(
+            os.environ.get("STARTS_WITH_PATTERN_LIST", r"[]")
+        ),
         empty_key_list=json.loads(os.environ.get("EMPTY_KEY_LIST", r"[]")),
         gen_location_list=json.loads(os.environ.get("GEN_LOCATION_LIST", r"{}")),
         resolve_datatypes_list=json.loads(
@@ -1331,5 +1602,6 @@
             os.environ.get("STRIP_WHITESPACE_LIST", r"[]")
         ),
         date_format_list=json.loads(os.environ.get("DATE_FORMAT_LIST", r"[]")),
+        filter_headers_list=json.loads(os.environ.get("FILTER_HEADERS_LIST", r"[]")),
         reorder_headers_list=json.loads(os.environ.get("REORDER_HEADERS_LIST", r"[]")),
     )
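Every knob above reaches the container as a JSON-encoded environment variable (populated from the ">-" blocks in pipeline.yaml below) and is decoded with json.loads, falling back to an empty list or object when unset. A minimal sketch of the pattern with an illustrative value:

    import json
    import os

    os.environ.setdefault("STARTS_WITH_PATTERN_LIST", '[ [ ["a", "b"], ["x", "y"] ] ]')
    starts_with_pattern_list = json.loads(os.environ.get("STARTS_WITH_PATTERN_LIST", r"[]"))
    filter_headers_list = json.loads(os.environ.get("FILTER_HEADERS_LIST", r"[]"))
    print(starts_with_pattern_list[0][0])  # ['a', 'b']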
diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_fares_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_fares_schema.json
new file mode 100644
index 000000000..e4a13aec6
--- /dev/null
+++ b/datasets/san_francisco/pipelines/_images/sf_muni_fares_schema.json
@@ -0,0 +1,44 @@
+[
+  {
+    "name": "fare_id",
+    "type": "STRING",
+    "description": "Unique identifier for a fare class.",
+    "mode": "nullable"
+  },
+  {
+    "name": "rider_id",
+    "type": "STRING",
+    "description": "Unique identifier for a rider category. Rider categories are used to assign different pricing to different groups of individuals.",
+    "mode": "nullable"
+  },
+  {
+    "name": "rider_desc",
+    "type": "STRING",
+    "description": "Text description of the rider category.",
+    "mode": "nullable"
+  },
+  {
+    "name": "price",
+    "type": "NUMERIC",
+    "description": "Contains the fare price, in USD.",
+    "mode": "nullable"
+  },
+  {
+    "name": "payment_method",
+    "type": "STRING",
+    "description": "Indicates when the fare must be paid. The following are valid values for this field: - during: Indicates the fare is paid when riders board - before: Indicates fare must be paid before riders board",
+    "mode": "nullable"
+  },
+  {
+    "name": "transfers_permitted",
+    "type": "STRING",
+    "description": "Specifies the number of transfers permitted on this fare. - 0: No transfers are permitted on this fare. - 1: One transfer is permitted on this fare. - 2: Two transfers are permitted on this fare. - NULL: Unlimited transfers are permitted on this fare.",
+    "mode": "nullable"
+  },
+  {
+    "name": "transfer_duration",
+    "type": "INTEGER",
+    "description": "Specifies the length of time, in seconds, before a transfer expires. When used with a transfers_permitted value of 0, the transfer_duration field indicates how long a ticket is valid for a fare where no transfers are allowed.",
+    "mode": "nullable"
+  }
+]
diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_stop_times_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_stop_times_schema.json
new file mode 100644
index 000000000..b299c06e0
--- /dev/null
+++ b/datasets/san_francisco/pipelines/_images/sf_muni_stop_times_schema.json
@@ -0,0 +1,56 @@
+[
+  {
+    "name": "stop_id",
+    "type": "INTEGER",
+    "description": "Identifies the serviced stop. Multiple routes can use the same stop.",
+    "mode": "nullable"
+  },
+  {
+    "name": "trip_id",
+    "type": "INTEGER",
+    "description": "Identifies a trip. Multiple trips can have the same stop.",
+    "mode": "nullable"
+  },
+  {
+    "name": "stop_sequence",
+    "type": "INTEGER",
+    "description": "Identifies the order of the stops for a particular trip. The values for stop_sequence must increase throughout the trip but do not need to be consecutive.",
+    "mode": "nullable"
+  },
+  {
+    "name": "arrival_time",
+    "type": "TIME",
+    "description": "Specifies the scheduled arrival time at a specific stop for a specific trip on a route.",
+    "mode": "nullable"
+  },
+  {
+    "name": "arrives_next_day",
+    "type": "BOOLEAN",
+    "description": "Several routes begin before midnight, with service continuing throughout the night. This variable specifies whether arrival_time occurs after midnight on the day after the scheduled day. This is useful for identifying routes that begin one day and end the next day.",
+    "mode": "nullable"
+  },
+  {
+    "name": "departure_time",
+    "type": "TIME",
+    "description": "Specifies the scheduled departure time at a specific stop for a specific trip on a route.",
+    "mode": "nullable"
+  },
+  {
+    "name": "departs_next_day",
+    "type": "BOOLEAN",
+    "description": "Several routes begin before midnight, with service continuing throughout the night. This variable specifies whether departure_time occurs after midnight on the day after the scheduled day. This is useful for identifying routes that begin one day and end the next day.",
+    "mode": "nullable"
+  },
+  {
+    "name": "dropoff_type",
+    "type": "STRING",
+    "description": "Indicates whether riders are dropped off at a stop as part of the normal schedule or whether a drop off at the stop isn't available. Available options: - regular - none - phone (indicates must phone agency to arrange drop off) - driver (indicates must coordinate with driver to arrange drop off)",
+    "mode": "nullable"
+  },
+  {
+    "name": "exact_timepoint",
+    "type": "BOOLEAN",
+    "description": "Indicates if the specified arrival and departure times for a stop are strictly adhered to by the transit vehicle, or if they're instead approximate or interpolated times.",
+    "mode": "nullable"
+  }
+]
diff --git a/datasets/san_francisco/pipelines/_images/sf_muni_trips_schema.json b/datasets/san_francisco/pipelines/_images/sf_muni_trips_schema.json
new file mode 100644
index 000000000..11c6b6b21
--- /dev/null
+++ b/datasets/san_francisco/pipelines/_images/sf_muni_trips_schema.json
@@ -0,0 +1,50 @@
+[
+  {
+    "name": "trip_id",
+    "type": "STRING",
+    "description": "Unique identifier for each trip.",
+    "mode": "nullable"
+  },
+  {
+    "name": "route_id",
+    "type": "STRING",
+    "description": "Unique identifier for each route. A single route will have multiple trips.",
+    "mode": "nullable"
+  },
+  {
+    "name": "direction",
+    "type": "STRING",
+    "description": "Indicates the direction of travel for a trip. The following values are valid: - O: Outbound travel - I: Inbound travel. This field is not used in routing, but instead provides a way to separate trips by direction.",
+    "mode": "nullable"
+  },
+  {
+    "name": "block_id",
+    "type": "STRING",
+    "description": "Identifies the block to which the trip belongs. A block consists of a single trip or many sequential trips made with the same vehicle. The trips are grouped into a block by the use of a shared service day and block_id. A block_id can include trips with different service days, which then makes distinct blocks. For more details, see: https://developers.google.com/transit/gtfs/reference/#example-showing-blocks-and-service-day",
+    "mode": "nullable"
+  },
+  {
+    "name": "service_category",
+    "type": "STRING",
+    "description": "Indicates the type of service for this trip.",
+    "mode": "nullable"
+  },
+  {
+    "name": "trip_headsign",
+    "type": "STRING",
+    "description": "Contains the text that appears on signage that identifies the trip's destination to riders. Use this field to distinguish between different patterns of service on the same route.",
+    "mode": "nullable"
+  },
+  {
+    "name": "shape_id",
+    "type": "STRING",
+    "description": "Unique identifier for the geospatial shape that describes the vehicle travel for a trip along individual points. Use this field to JOIN with the shapes available in the shapes table.",
+    "mode": "nullable"
+  },
+  {
+    "name": "trip_shape",
+    "type": "GEOGRAPHY",
+    "description": "Geographical representation of the trip's entire route.",
+    "mode": "nullable"
+  }
+]
"arrival_time"], ["^27(.*)", "03$2"] ], + [ ["arrival_time", "arrival_time"], ["^28(.*)", "04$2"] ], + [ ["arrival_time", "arrival_time"], ["^29(.*)", "05$2"] ], + [ ["arrival_time", "arrival_time"], ["^30(.*)", "06$2"] ], + [ ["arrival_time", "arrival_time"], ["^31(.*)", "07$2"] ], + [ ["departs_next_day", "departure_time"], ["24", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["25", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["26", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["27", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["28", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["29", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["30", "TRUE"] ], + [ ["departs_next_day", "departure_time"], ["31", "TRUE"] ], + [ ["departure_time", "departure_time"], ["^24(.*)", "00$2"] ], + [ ["departure_time", "departure_time"], ["^25(.*)", "01$2"] ], + [ ["departure_time", "departure_time"], ["^26(.*)", "02$2"] ], + [ ["departure_time", "departure_time"], ["^27(.*)", "03$2"] ], + [ ["departure_time", "departure_time"], ["^28(.*)", "04$2"] ], + [ ["departure_time", "departure_time"], ["^29(.*)", "05$2"] ], + [ ["departure_time", "departure_time"], ["^30(.*)", "06$2"] ], + [ ["departure_time", "departure_time"], ["^31(.*)", "07$2"] ] + ] + REORDER_HEADERS_LIST: >- + [ + "stop_id", + "trip_id", + "stop_sequence", + "arrival_time", + "arrives_next_day", + "departure_time", + "departs_next_day", + "dropoff_type", + "exact_timepoint" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Municipal Fares Pipeline" + args: + task_id: "sf_muni_fares" + name: "muni_fares" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Fares" + SOURCE_URL_DICT: >- + { + "fare_rider_categories": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/fare_rider_categories.txt", + "rider_categories": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/rider_categories.txt", + "fare_attributes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/fare_attributes.txt" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_muni_fares.csv" + TARGET_FILE: "files/data_output_municipal_muni_fares.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "fares" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_fares/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_muni_fares_schema.json" + RENAME_HEADERS_LIST: >- + { + "rider_category_description": "rider_desc", + "rider_category_id": "rider_id", + "transfers": "transfers_permitted", + "price_x": "price" + } + STARTS_WITH_PATTERN_LIST: >- + [ + [ ["payment_type", "payment_type"], [ "0", "during" ] ], + [ ["payment_type", "payment_type"], [ "1", "after" ] ] + ] + REORDER_HEADERS_LIST: >- + [ + "fare_id", + "rider_id", + "rider_desc", + "price", + "payment_method", + "transfers_permitted", + "transfer_duration" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" + - operator: "KubernetesPodOperator" + description: "Run San Francisco Municipal Trips Pipeline" + args: + task_id: "sf_muni_trips" + name: "muni_trips" + namespace: "composer" + service_account_name: "datasets" + 
image_pull_policy: "Always" + image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" + env_vars: + PIPELINE_NAME: "San Francisco Municipal Trips" + SOURCE_URL_DICT: >- + { + "trips": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/trips.txt", + "simple_trips": "https://data.sfgov.org/api/views/9exe-acju/rows.csv" + } + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_municipal_muni_trips.csv" + TARGET_FILE: "files/data_output_municipal_muni_trips.csv" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "san_francisco_transit_muni" + TABLE_ID: "trips" + DROP_DEST_TABLE: "N" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/san_francisco/transit_municipal_trips/data_output.csv" + SCHEMA_PATH: "data/san_francisco/schema/sf_muni_trips_schema.json" + RENAME_HEADERS_LIST: >- + { + "DIRECTION": "direction_old", + "direction_id": "direction", + "SERVICE_CA": "service_category", + "shape": "trip_shape" + } + STARTS_WITH_PATTERN_LIST: >- + [ + [ ["direction", "direction"], [ "0", "O" ] ], + [ ["direction", "direction"], [ "1", "I" ] ], + [ ["SERVICE_CA", "SERVICE_CA"], [ "nan", "" ] ] + ] + REORDER_HEADERS_LIST: >- + [ + "trip_id", + "route_id", + "direction", + "block_id", + "service_category", + "trip_headsign", + "shape_id", + "trip_shape" + ] + resources: + limit_memory: "8G" + limit_cpu: "3" + request_ephemeral_storage: "10G" - operator: "KubernetesPodOperator" description: "Run San Francisco Police Department Incidents Pipeline" args: @@ -1106,4 +1319,4 @@ dag: request_ephemeral_storage: "10G" graph_paths: - - "[ sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees ] >> sf_bikeshare_trips >> [ sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests" + - "[ sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees ] >> sf_bikeshare_trips >> [ sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops, sf_muni_stop_times, sf_muni_fares, sf_muni_trips ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests" diff --git a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py index 7c4623f46..72da5840b 100644 --- a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py +++ b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py @@ -93,6 +93,7 @@ "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_calendar/data_output.csv", "SCHEMA_PATH": "data/san_francisco/schema/sf_calendar_schema.json", + "FILTER_HEADERS_LIST": '[\n "service_id",\n "start_date",\n "end_date",\n "service_description",\n "date",\n "exception_type_str",\n "monday_str",\n "tuesday_str",\n "wednesday_str",\n "thursday_str",\n "friday_str",\n "saturday_str",\n "sunday_str"\n]', "REORDER_HEADERS_LIST": '[\n "service_id", "service_desc",\n "monday", "tuesday", "wednesday",\n "thursday", "friday", "saturday", "sunday",\n "exceptions", "exception_type"\n]', "RENAME_HEADERS_LIST": '{\n "monday_str": "monday",\n "tuesday_str": "tuesday",\n "wednesday_str": "wednesday",\n "thursday_str": "thursday",\n "friday_str": "friday",\n "saturday_str": "saturday",\n "sunday_str": "sunday",\n "service_description": "service_desc",\n "date": "exceptions",\n "exception_type_str": "exception_type"\n}', }, @@ -194,6 +195,102 @@ }, ) + # Run San Francisco Municipal Stop Times Pipeline + 
diff --git a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py
index 7c4623f46..72da5840b 100644
--- a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py
+++ b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py
@@ -93,6 +93,7 @@
             "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
             "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_calendar/data_output.csv",
             "SCHEMA_PATH": "data/san_francisco/schema/sf_calendar_schema.json",
+            "FILTER_HEADERS_LIST": '[\n "service_id",\n "start_date",\n "end_date",\n "service_description",\n "date",\n "exception_type_str",\n "monday_str",\n "tuesday_str",\n "wednesday_str",\n "thursday_str",\n "friday_str",\n "saturday_str",\n "sunday_str"\n]',
             "REORDER_HEADERS_LIST": '[\n "service_id", "service_desc",\n "monday", "tuesday", "wednesday",\n "thursday", "friday", "saturday", "sunday",\n "exceptions", "exception_type"\n]',
             "RENAME_HEADERS_LIST": '{\n "monday_str": "monday",\n "tuesday_str": "tuesday",\n "wednesday_str": "wednesday",\n "thursday_str": "thursday",\n "friday_str": "friday",\n "saturday_str": "saturday",\n "sunday_str": "sunday",\n "service_description": "service_desc",\n "date": "exceptions",\n "exception_type_str": "exception_type"\n}',
         },
     )
@@ -194,6 +195,102 @@
 
+    # Run San Francisco Municipal Stop Times Pipeline
+    sf_muni_stop_times = kubernetes_pod.KubernetesPodOperator(
+        task_id="sf_muni_stop_times",
+        name="muni_stop_times",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "PIPELINE_NAME": "San Francisco Municipal Stop Times",
+            "SOURCE_URL_DICT": '{\n "stop_times": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/stop_times.txt"\n}',
+            "CHUNKSIZE": "750000",
+            "SOURCE_FILE": "files/data_municipal_stop_times.csv",
+            "TARGET_FILE": "files/data_output_municipal_stop_times.csv",
+            "PROJECT_ID": "{{ var.value.gcp_project }}",
+            "DATASET_ID": "san_francisco_transit_muni",
+            "TABLE_ID": "stop_times",
+            "DROP_DEST_TABLE": "N",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_stop_times/data_output.csv",
+            "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_stop_times_schema.json",
+            "RENAME_HEADERS_LIST": '{\n "drop_off_type": "dropoff_type"\n}',
+            "STARTS_WITH_PATTERN_LIST": '[\n [ ["dropoff_type", "dropoff_type"], [ "0", "regular" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "1", "none" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "2", "phone" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "3", "driver" ] ],\n [ ["pickup_type", "pickup_type"], [ "0", "regular" ] ],\n [ ["pickup_type", "pickup_type"], [ "1", "none" ] ],\n [ ["pickup_type", "pickup_type"], [ "2", "phone" ] ],\n [ ["pickup_type", "pickup_type"], [ "3", "driver" ] ],\n [ ["exact_timepoint", "timepoint"], [ "0", "FALSE" ] ],\n [ ["exact_timepoint", "timepoint"], [ "1", "TRUE" ] ],\n [ ["arrives_next_day", "arrival_time"], ["^24", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^25", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^26", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^27", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^28", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^29", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^30", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["^31", "TRUE"] ],\n [ ["arrival_time", "arrival_time"], ["^24(.*)", "00$2"] ],\n [ ["arrival_time", "arrival_time"], ["^25(.*)", "01$2"] ],\n [ ["arrival_time", "arrival_time"], ["^26(.*)", "02$2"] ],\n [ ["arrival_time", "arrival_time"], ["^27(.*)", "03$2"] ],\n [ ["arrival_time", "arrival_time"], ["^28(.*)", "04$2"] ],\n [ ["arrival_time", "arrival_time"], ["^29(.*)", "05$2"] ],\n [ ["arrival_time", "arrival_time"], ["^30(.*)", "06$2"] ],\n [ ["arrival_time", "arrival_time"], ["^31(.*)", "07$2"] ],\n [ ["departs_next_day", "departure_time"], ["^24", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^25", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^26", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^27", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^28", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^29", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^30", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["^31", "TRUE"] ],\n [ ["departure_time", "departure_time"], ["^24(.*)", "00$2"] ],\n [ ["departure_time", "departure_time"], ["^25(.*)", "01$2"] ],\n [ ["departure_time", "departure_time"], ["^26(.*)", "02$2"] ],\n [ ["departure_time", "departure_time"], ["^27(.*)", "03$2"] ],\n [ ["departure_time", "departure_time"], ["^28(.*)", "04$2"] ],\n [ ["departure_time", "departure_time"], ["^29(.*)", "05$2"] ],\n [ ["departure_time", "departure_time"], ["^30(.*)", "06$2"] ],\n [ ["departure_time", "departure_time"], ["^31(.*)", "07$2"] ]\n]',
+            "REORDER_HEADERS_LIST": '[\n "stop_id",\n "trip_id",\n "stop_sequence",\n "arrival_time",\n "arrives_next_day",\n "departure_time",\n "departs_next_day",\n "dropoff_type",\n "exact_timepoint"\n]',
+        },
+        resources={
+            "limit_memory": "8G",
+            "limit_cpu": "3",
+            "request_ephemeral_storage": "10G",
+        },
+    )
["^30(.*)", "06$2"] ],\n [ ["departure_time", "departure_time"], ["^31(.*)", "07$2"] ]\n]', + "REORDER_HEADERS_LIST": '[\n "stop_id",\n "trip_id",\n "stop_sequence",\n "arrival_time",\n "arrives_next_day",\n "departure_time",\n "departs_next_day",\n "dropoff_type",\n "exact_timepoint"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Municipal Fares Pipeline + sf_muni_fares = kubernetes_pod.KubernetesPodOperator( + task_id="sf_muni_fares", + name="muni_fares", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Fares", + "SOURCE_URL_DICT": '{\n "fare_rider_categories": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/fare_rider_categories.txt",\n "rider_categories": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/rider_categories.txt",\n "fare_attributes": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/fare_attributes.txt"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_muni_fares.csv", + "TARGET_FILE": "files/data_output_municipal_muni_fares.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_transit_muni", + "TABLE_ID": "fares", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_fares/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_fares_schema.json", + "RENAME_HEADERS_LIST": '{\n "rider_category_description": "rider_desc",\n "rider_category_id": "rider_id",\n "transfers": "transfers_permitted",\n "price_x": "price"\n}', + "STARTS_WITH_PATTERN_LIST": '[\n [ ["payment_type", "payment_type"], [ "0", "during" ] ],\n [ ["payment_type", "payment_type"], [ "1", "after" ] ]\n]', + "REORDER_HEADERS_LIST": '[\n "fare_id",\n "rider_id",\n "rider_desc",\n "price",\n "payment_method",\n "transfers_permitted",\n "transfer_duration"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Run San Francisco Municipal Trips Pipeline + sf_muni_trips = kubernetes_pod.KubernetesPodOperator( + task_id="sf_muni_trips", + name="muni_trips", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", + env_vars={ + "PIPELINE_NAME": "San Francisco Municipal Trips", + "SOURCE_URL_DICT": '{\n "trips": "gs://pdp-feeds-staging/SF_Muni/GTFSTransitData_SF/trips.txt",\n "simple_trips": "https://data.sfgov.org/api/views/9exe-acju/rows.csv"\n}', + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_municipal_muni_trips.csv", + "TARGET_FILE": "files/data_output_municipal_muni_trips.csv", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "san_francisco_transit_muni", + "TABLE_ID": "trips", + "DROP_DEST_TABLE": "N", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/san_francisco/transit_municipal_trips/data_output.csv", + "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_trips_schema.json", + "RENAME_HEADERS_LIST": '{\n "DIRECTION": "direction_old",\n "direction_id": "direction",\n "SERVICE_CA": "service_category",\n "shape": "trip_shape"\n}', + "STARTS_WITH_PATTERN_LIST": '[\n [ ["direction", "direction"], [ "0", "O" ] ],\n [ ["direction", "direction"], [ "1", "I" ] ],\n [ 
["SERVICE_CA", "SERVICE_CA"], [ "nan", "" ] ]\n]', + "REORDER_HEADERS_LIST": '[\n "trip_id",\n "route_id",\n "direction",\n "block_id",\n "service_category",\n "trip_headsign",\n "shape_id",\n "trip_shape"\n]', + }, + resources={ + "limit_memory": "8G", + "limit_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + # Run San Francisco Police Department Incidents Pipeline sfpd_incidents = kubernetes_pod.KubernetesPodOperator( task_id="sfpd_incidents", @@ -447,7 +544,15 @@ ( [sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees] >> sf_bikeshare_trips - >> [sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops] + >> [ + sf_calendar, + sf_muni_routes, + sf_muni_shapes, + sf_muni_stops, + sf_muni_stop_times, + sf_muni_fares, + sf_muni_trips, + ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests