From 7d21778b3968795f4f545c9cada8d0762b4f1a58 Mon Sep 17 00:00:00 2001
From: Nicholas Large <84149918+nlarge-google@users.noreply.github.com>
Date: Wed, 26 Oct 2022 14:03:07 -0500
Subject: [PATCH] Fix: Resolve Failures In New York Pipeline And Merge To One Image (#516)

---
 .../Dockerfile | 0
 .../run_csv_transform_kub/csv_transform.py | 754 ++++++++++++++++++
 .../requirements.txt | 0
 .../csv_transform.py | 404 ----------
 .../Dockerfile | 38 -
 .../csv_transform.py | 492 ------------
 .../requirements.txt | 4 -
 .../Dockerfile | 38 -
 .../csv_transform.py | 494 ------------
 .../requirements.txt | 4 -
 .../Dockerfile | 38 -
 .../csv_transform.py | 368 ---------
 .../requirements.txt | 4 -
 .../pipelines/new_york/new_york_dag.py | 100 +--
 .../new_york/pipelines/new_york/pipeline.yaml | 106 +--
 15 files changed, 857 insertions(+), 1987 deletions(-)
 rename datasets/new_york/pipelines/_images/{run_csv_transform_kub_311_service_requests => run_csv_transform_kub}/Dockerfile (100%)
 create mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub/csv_transform.py
 rename datasets/new_york/pipelines/_images/{run_csv_transform_kub_311_service_requests => run_csv_transform_kub}/requirements.txt (100%)
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/csv_transform.py
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/Dockerfile
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/csv_transform.py
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/requirements.txt
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/Dockerfile
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/csv_transform.py
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/requirements.txt
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/Dockerfile
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/csv_transform.py
 delete mode 100644 datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/requirements.txt

diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/Dockerfile b/datasets/new_york/pipelines/_images/run_csv_transform_kub/Dockerfile
similarity index 100%
rename from datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/Dockerfile
rename to datasets/new_york/pipelines/_images/run_csv_transform_kub/Dockerfile
diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/new_york/pipelines/_images/run_csv_transform_kub/csv_transform.py
new file mode 100644
index 000000000..c929e36d2
--- /dev/null
+++ b/datasets/new_york/pipelines/_images/run_csv_transform_kub/csv_transform.py
@@ -0,0 +1,754 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import logging +import os +import pathlib +import typing + +import pandas as pd +import requests +from google.api_core.exceptions import NotFound +from google.cloud import bigquery, storage + + +def main( + pipeline_name: str, + source_url: str, + source_url_stations_json: str, + source_url_status_json: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + table_id: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + schema_path: str, + transform_list: typing.List[str], + data_dtypes: typing.List[str], + null_rows_list: typing.List[str], + parse_dates_list: dict, + rename_headers_list: dict, + reorder_headers_list: typing.List[str], + output_headers_list: typing.List[str], + datetime_fieldlist: typing.List[str], + resolve_datatypes_list: dict, + normalize_data_list: typing.List[str], + boolean_datapoints_list: typing.List[str], + remove_whitespace_list: typing.List[str], + regex_list: typing.List[typing.List], + crash_field_list: typing.List[typing.List], + date_format_list: typing.List[typing.List], +) -> None: + logging.info(f"{pipeline_name} process started") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + successful_completion = execute_pipeline( + source_url=source_url, + source_url_stations_json=source_url_stations_json, + source_url_status_json=source_url_status_json, + source_file=source_file, + target_file=target_file, + project_id=project_id, + dataset_id=dataset_id, + destination_table=table_id, + chunksize=chunksize, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + schema_path=schema_path, + transform_list=transform_list, + data_dtypes=data_dtypes, + null_rows_list=null_rows_list, + parse_dates_list=parse_dates_list, + rename_headers_list=rename_headers_list, + output_headers_list=output_headers_list, + datetime_fieldlist=datetime_fieldlist, + resolve_datatypes_list=resolve_datatypes_list, + normalize_data_list=normalize_data_list, + boolean_datapoints_list=boolean_datapoints_list, + remove_whitespace_list=remove_whitespace_list, + regex_list=regex_list, + crash_field_list=crash_field_list, + date_format_list=date_format_list, + reorder_headers_list=reorder_headers_list, + ) + if successful_completion: + logging.info(f"{pipeline_name} process completed") + else: + logging.info(f"{pipeline_name} process was unknown and failed") + + +def execute_pipeline( + source_url: str, + source_url_stations_json: str, + source_url_status_json: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + project_id: str, + dataset_id: str, + destination_table: str, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + schema_path: str, + transform_list: typing.List[str], + data_dtypes: typing.List[str], + parse_dates_list: dict, + null_rows_list: typing.List[str], + rename_headers_list: dict, + output_headers_list: typing.List[str], + datetime_fieldlist: typing.List[str], + resolve_datatypes_list: dict, + reorder_headers_list: typing.List[str], + regex_list: typing.List[typing.List], + crash_field_list: typing.List[typing.List], + date_format_list: typing.List[typing.List], + normalize_data_list: typing.List[str], + boolean_datapoints_list: typing.List[str], + remove_whitespace_list: typing.List[str], +) -> bool: + if destination_table not in [ + "311_service_requests", + "citibike_stations", + "nypd_mv_collisions", + 
"tree_census_1995", + ]: + logging.info("Unknown pipeline") + return False + else: + sep = "," + if destination_table == "311_service_requests": + download_file(source_url, source_file) + elif destination_table == "citibike_stations": + download_and_merge_source_files( + source_url_stations_json=source_url_stations_json, + source_url_status_json=source_url_status_json, + source_file=source_file, + resolve_datatypes_list=resolve_datatypes_list, + normalize_data_list=normalize_data_list, + boolean_datapoints_list=boolean_datapoints_list, + ) + sep = "|" + elif destination_table == "nypd_mv_collisions": + download_file(source_url, source_file) + elif destination_table == "tree_census_1995": + download_file(source_url=source_url, source_file=source_file) + process_source_file( + source_file=source_file, + target_file=target_file, + chunksize=chunksize, + data_dtypes=data_dtypes, + parse_dates_list=parse_dates_list, + null_rows_list=null_rows_list, + rename_headers_list=rename_headers_list, + output_headers_list=output_headers_list, + destination_table=destination_table, + transform_list=transform_list, + reorder_headers_list=reorder_headers_list, + datetime_fieldlist=datetime_fieldlist, + resolve_datatypes_list=resolve_datatypes_list, + regex_list=regex_list, + remove_whitespace_list=remove_whitespace_list, + crash_field_list=crash_field_list, + date_format_list=date_format_list, + sep=sep, + ) + if os.path.exists(target_file): + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + file_path=target_file, + truncate_table=True, + ) + else: + error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." + raise ValueError(error_msg) + else: + logging.info( + f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." 
+ ) + return True + + +def download_and_merge_source_files( + source_url_stations_json: str, + source_url_status_json: str, + source_file: str, + resolve_datatypes_list: dict, + normalize_data_list: typing.List[str], + boolean_datapoints_list: typing.List[str], +) -> None: + source_file_stations_csv = str(source_file).replace(".csv", "") + "_stations.csv" + source_file_stations_json = str(source_file).replace(".csv", "") + "_stations" + source_file_status_csv = str(source_file).replace(".csv", "") + "_status.csv" + source_file_status_json = str(source_file).replace(".csv", "") + "_status" + download_file_json( + source_url_stations_json, source_file_stations_json, source_file_stations_csv + ) + download_file_json( + source_url_status_json, source_file_status_json, source_file_status_csv + ) + df_stations = pd.read_csv( + source_file_stations_csv, engine="python", encoding="utf-8", quotechar='"' + ) + df_status = pd.read_csv( + source_file_status_csv, engine="python", encoding="utf-8", quotechar='"' + ) + logging.info("Merging files") + df = df_stations.merge(df_status, left_on="station_id", right_on="station_id") + df = clean_data_points( + df, + resolve_datatypes_list=resolve_datatypes_list, + normalize_data_list=normalize_data_list, + boolean_datapoints_list=boolean_datapoints_list, + ) + save_to_new_file(df, source_file) + + +def download_file_json( + source_url: str, source_file_json: pathlib.Path, source_file_csv: pathlib.Path +) -> None: + logging.info(f"Downloading file {source_url}.json.") + r = requests.get(source_url + ".json", stream=True) + with open(source_file_json + ".json", "wb") as f: + for chunk in r: + f.write(chunk) + df = pd.read_json(source_file_json + ".json")["data"]["stations"] + df = pd.DataFrame(df) + df.to_csv(source_file_csv, index=False) + + +def resolve_datatypes(df: pd.DataFrame, resolve_datatypes_list: dict) -> pd.DataFrame: + for column, datatype in resolve_datatypes_list.items(): + logging.info(f"Resolving datatype for column {column} to {datatype}") + if datatype.lower() in ("int64", "float"): + df[column] = df[column].fillna(0).astype(datatype) + else: + df[column] = df[column].astype(datatype) + return df + + +def normalize_data( + df: pd.DataFrame, normalize_data_list: typing.List[str] +) -> pd.DataFrame: + for column in normalize_data_list: + logging.info(f"Normalizing data in column {column}") + # Data is in list format in this column. 
+ # Therefore remove square brackets and single quotes + df[column] = ( + str(pd.Series(df[column])[0]) + .replace("[", "") + .replace("'", "") + .replace("]", "") + ) + return df + + +def resolve_boolean_datapoints( + df: pd.DataFrame, boolean_datapoints_list: typing.List[str] +) -> pd.DataFrame: + for column in boolean_datapoints_list: + logging.info(f"Resolving boolean datapoints in column {column}") + df[column] = df[column].apply(lambda x: "True" if x == "0" else "False") + return df + + +def process_source_file( + source_file: str, + target_file: str, + destination_table: str, + chunksize: str, + data_dtypes: dict, + parse_dates_list: typing.List[str], + null_rows_list: typing.List[str], + rename_headers_list: dict, + output_headers_list: typing.List[str], + transform_list: typing.List[str], + reorder_headers_list: typing.List[str], + datetime_fieldlist: typing.List[str], + resolve_datatypes_list: dict, + regex_list: typing.List[typing.List], + remove_whitespace_list: typing.List[str], + crash_field_list: typing.List[typing.List], + date_format_list: typing.List[typing.List], + sep: str = ",", +) -> None: + logging.info(f"Processing file {source_file}") + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), + dtype=data_dtypes, + parse_dates=parse_dates_list, + sep=sep, + ) as reader: + for chunk_number, chunk in enumerate(reader): + logging.info(f"Processing batch {chunk_number}") + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + target_file_batch=target_file_batch, + target_file=target_file, + destination_table=destination_table, + skip_header=(not chunk_number == 0), + rename_headers_list=rename_headers_list, + null_rows_list=null_rows_list, + parse_dates_list=parse_dates_list, + reorder_headers_list=reorder_headers_list, + transform_list=transform_list, + output_headers_list=output_headers_list, + datetime_fieldlist=datetime_fieldlist, + resolve_datatypes_list=resolve_datatypes_list, + regex_list=regex_list, + remove_whitespace_list=remove_whitespace_list, + crash_field_list=crash_field_list, + date_format_list=date_format_list, + ) + + +def load_data_to_bq( + project_id: str, + dataset_id: str, + table_id: str, + file_path: str, + truncate_table: bool, +) -> None: + logging.info( + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" + ) + client = bigquery.Client(project=project_id) + table_ref = client.dataset(dataset_id).table(table_id) + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.CSV + job_config.field_delimiter = "|" + if truncate_table: + job_config.write_disposition = "WRITE_TRUNCATE" + else: + job_config.write_disposition = "WRITE_APPEND" + job_config.skip_leading_rows = 1 # ignore the header + job_config.autodetect = False + with open(file_path, "rb") as source_file: + job = client.load_table_from_file(source_file, table_ref, job_config=job_config) + job.result() + logging.info( + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" + ) + + +def create_dest_table( + project_id: str, + dataset_id: str, + table_id: str, + schema_filepath: list, + bucket_name: str, +) -> bool: + table_ref = f"{project_id}.{dataset_id}.{table_id}" + logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") + client = bigquery.Client() + table_exists = False + try: + table = 
client.get_table(table_ref) + table_exists_id = table.table_id + logging.info(f"Table {table_exists_id} currently exists.") + except NotFound: + table = None + if not table: + logging.info( + ( + f"Table {table_ref} currently does not exist. Attempting to create table." + ) + ) + if check_gcs_file_exists(schema_filepath, bucket_name): + schema = create_table_schema([], bucket_name, schema_filepath) + table = bigquery.Table(table_ref, schema=schema) + client.create_table(table) + print(f"Table {table_ref} was created".format(table_id)) + table_exists = True + else: + file_name = os.path.split(schema_filepath)[1] + file_path = os.path.split(schema_filepath)[0] + logging.info( + f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" + ) + table_exists = False + else: + table_exists = True + return table_exists + + +def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) + return exists + + +def create_table_schema( + schema_structure: list, bucket_name: str = "", schema_filepath: str = "" +) -> list: + logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") + schema = [] + if not (schema_filepath): + schema_struct = schema_structure + else: + storage_client = storage.Client() + bucket = storage_client.get_bucket(bucket_name) + blob = bucket.blob(schema_filepath) + schema_struct = json.loads(blob.download_as_string(client=None)) + for schema_field in schema_struct: + fld_name = schema_field["name"] + fld_type = schema_field["type"] + try: + fld_descr = schema_field["description"] + except KeyError: + fld_descr = "" + fld_mode = schema_field["mode"] + schema.append( + bigquery.SchemaField( + name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr + ) + ) + return schema + + +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + with open(batch_file_path, "r") as data_file: + if truncate_file: + target_file = open(target_file_path, "w+").close() + with open(target_file_path, "a+") as target_file: + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path}" + ) + target_file.write(data_file.read()) + if os.path.exists(batch_file_path): + os.remove(batch_file_path) + + +def process_chunk( + df: pd.DataFrame, + target_file_batch: str, + target_file: str, + destination_table: str, + skip_header: bool, + transform_list: typing.List[str], + rename_headers_list: dict, + output_headers_list: typing.List[str], + null_rows_list: typing.List[str], + parse_dates_list: typing.List[str], + reorder_headers_list: typing.List[str], + datetime_fieldlist: typing.List[str], + resolve_datatypes_list: dict, + regex_list: typing.List[typing.List], + remove_whitespace_list: typing.List[str], + crash_field_list: typing.List[typing.List], + date_format_list: typing.List[typing.List], +) -> None: + logging.info(f"Processing batch file {target_file_batch}") + if destination_table == "311_service_requests": + df = parse_date_formats(df, parse_dates_list) + df = rename_headers(df, rename_headers_list) + df = remove_null_rows(df, null_rows_list) + df = reorder_headers(df, reorder_headers_list) + if 
destination_table == "citibike_stations": + df = convert_datetime_from_int(df, datetime_fieldlist) + df = rename_headers(df, rename_headers_list) + df = reorder_headers(df, output_headers_list) + if destination_table == "nypd_mv_collisions": + for transform in transform_list: + if transform == "replace_regex": + df = replace_regex(df, regex_list) + elif transform == "add_crash_timestamp": + for fld in crash_field_list: + new_crash_field = fld[0] + crash_date_field = fld[1] + crash_time_field = fld[2] + df[new_crash_field] = "" + df = add_crash_timestamp( + df, new_crash_field, crash_date_field, crash_time_field + ) + elif transform == "convert_date_format": + df = resolve_date_format(df, date_format_list) + elif transform == "resolve_datatypes": + df = resolve_datatypes(df, resolve_datatypes_list) + elif transform == "rename_headers": + df = rename_headers(df, rename_headers_list) + elif transform == "reorder_headers": + df = reorder_headers(df, reorder_headers_list) + if destination_table == "tree_census_1995": + df = rename_headers(df, rename_headers_list) + df = remove_whitespace(df, remove_whitespace_list) + df = reorder_headers(df, reorder_headers_list) + if not df.empty: + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file( + batch_file_path=target_file_batch, + target_file_path=target_file, + skip_header=skip_header, + truncate_file=not (skip_header), + ) + logging.info(f"Processing batch file {target_file_batch} completed") + + +def add_crash_timestamp( + df: pd.DataFrame, new_crash_field: str, crash_date_field: str, crash_time_field: str +) -> pd.DataFrame: + logging.info( + f"add_crash_timestamp '{new_crash_field}' '{crash_date_field}' '{crash_time_field}'" + ) + df[new_crash_field] = df.apply( + lambda x, crash_date_field, crash_time_field: crash_timestamp( + x["" + crash_date_field], x["" + crash_time_field] + ), + args=[crash_date_field, crash_time_field], + axis=1, + ) + return df + + +def crash_timestamp(crash_date: str, crash_time: str) -> str: + # if crash time format is H:MM then convert to HH:MM:SS + if len(crash_time) == 4: + crash_time = f"0{crash_time}:00" + return f"{crash_date} {crash_time}" + + +def remove_whitespace( + df: pd.DataFrame, remove_whitespace_list: typing.List[str] +) -> pd.DataFrame: + for column in remove_whitespace_list: + logging.info(f"Removing whitespace in column {column}..") + df[column] = df[column].apply(lambda x: str(x).strip()) + return df + + +def replace_regex(df: pd.DataFrame, regex_list: dict) -> pd.DataFrame: + for regex_item in regex_list: + field_name = regex_item[0] + search_expr = regex_item[1] + replace_expr = regex_item[2] + logging.info( + f"Replacing data via regex on field {field_name} '{field_name}' '{search_expr}' '{replace_expr}'" + ) + df[field_name] = df[field_name].replace( + r"" + search_expr, replace_expr, regex=True + ) + return df + + +def convert_datetime_from_int( + df: pd.DataFrame, datetime_columns_list: typing.List[str] +) -> pd.DataFrame: + for column in datetime_columns_list: + logging.info(f"Converting Datetime column {column}") + df[column] = df[column].astype(str).astype(int).apply(datetime_from_int) + return df + + +def datetime_from_int(dt_int: int) -> str: + return datetime.datetime.fromtimestamp(dt_int).strftime("%Y-%m-%d %H:%M:%S") + + +def clean_data_points( + df: pd.DataFrame, + resolve_datatypes_list: dict, + normalize_data_list: typing.List[str], + boolean_datapoints_list: typing.List[str], +) -> pd.DataFrame: + df = resolve_datatypes(df, resolve_datatypes_list) + df = 
normalize_data(df, normalize_data_list) + df = resolve_boolean_datapoints(df, boolean_datapoints_list) + return df + + +def remove_null_rows( + df: pd.DataFrame, null_rows_list: typing.List[str] +) -> pd.DataFrame: + logging.info("Removing rows with empty keys") + for column in null_rows_list: + df = df[df[column] != ""] + return df + + +def reorder_headers(df: pd.DataFrame, output_headers: typing.List[str]) -> pd.DataFrame: + logging.info("Reordering headers..") + return df[output_headers] + + +def resolve_date_format(df: pd.DataFrame, date_fields: list = []) -> pd.DataFrame: + for dt_fld in date_fields: + field_name = dt_fld[0] + logging.info(f"Resolving date format in column {field_name}") + from_format = dt_fld[1] + to_format = dt_fld[2] + df[field_name] = df[field_name].apply( + lambda x: convert_dt_format(str(x), from_format, to_format) + ) + return df + + +def convert_dt_format( + dt_str: str, from_format: str, to_format: str = "%Y-%m-%d %H:%M:%S" +) -> str: + if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": + dt_str = "" + return dt_str + else: + if from_format == "%Y%m%d": + year = dt_str[0:4] + month = dt_str[4:6] + day = dt_str[6:8] + dt_str = f"{year}-{month}-{day} 00:00:00" + from_format = "%Y-%m-%d %H:%M:%S" + elif len(dt_str.strip().split(" ")[1]) == 8: + # if format of time portion is 00:00:00 then use 00:00 format + dt_str = dt_str[:-3] + elif (len(dt_str.strip().split("-")[0]) == 4) and ( + len(from_format.strip().split("/")[0]) == 2 + ): + # if the format of the date portion of the data is in YYYY-MM-DD format + # and from_format is in MM-DD-YYYY then resolve this by modifying the from_format + # to use the YYYY-MM-DD. This resolves mixed date formats in files + from_format = "%Y-%m-%d " + from_format.strip().split(" ")[1] + return datetime.datetime.strptime(dt_str, from_format).strftime(to_format) + + +def parse_date_formats(df: pd.DataFrame, parse_dates: typing.List[str]) -> pd.DataFrame: + for dt_fld in parse_dates: + logging.info(f"Evaluating date format in column {dt_fld}") + df[dt_fld] = df[dt_fld].apply(parse_date_format_value) + return df + + +def parse_date_format_value(dt_str: str) -> str: + if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": + return "" + elif ( + str(dt_str).strip()[2] == "/" + ): # if there is a '/' in 3rd position, then we have a date format mm/dd/yyyy + return datetime.datetime.strptime(dt_str, "%m/%d/%Y %H:%M:%S %p").strftime( + "%Y-%m-%d %H:%M:%S" + ) + else: + return str(dt_str) + + +def rename_headers(df: pd.DataFrame, header_names: dict) -> pd.DataFrame: + logging.info("Renaming Headers") + df = df.rename(columns=header_names) + return df + + +def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: + logging.info(f"Saving data to target file.. 
{file_path} ...") + df.to_csv(file_path, index=False, sep=sep) + + +def download_file(source_url: str, source_file: pathlib.Path) -> None: + logging.info(f"Downloading {source_url} to {source_file}") + r = requests.get(source_url, stream=True) + if r.status_code == 200: + with open(source_file, "wb") as f: + for chunk in r: + f.write(chunk) + else: + logging.error(f"Couldn't download {source_url}: {r.text}") + + +def upload_file_to_gcs( + file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str +) -> None: + if os.path.exists(file_path): + logging.info( + f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" + ) + storage_client = storage.Client() + bucket = storage_client.bucket(target_gcs_bucket) + blob = bucket.blob(target_gcs_path) + blob.upload_from_filename(file_path) + else: + logging.info( + f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." + ) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + pipeline_name=os.environ.get("PIPELINE_NAME", ""), + source_url=os.environ.get("SOURCE_URL", ""), + chunksize=os.environ.get("CHUNKSIZE", ""), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), + target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(), + project_id=os.environ.get("PROJECT_ID", ""), + dataset_id=os.environ.get("DATASET_ID", ""), + table_id=os.environ.get("TABLE_ID", ""), + target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""), + target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""), + schema_path=os.environ.get("SCHEMA_PATH", ""), + data_dtypes=json.loads(os.environ.get("DATA_DTYPES", "{}")), + null_rows_list=json.loads(os.environ.get("NULL_ROWS_LIST", "[]")), + parse_dates_list=json.loads(os.environ.get("PARSE_DATES", "{}")), + rename_headers_list=json.loads(os.environ.get("RENAME_HEADERS_LIST", "{}")), + reorder_headers_list=json.loads(os.environ.get("REORDER_HEADERS_LIST", "[]")), + output_headers_list=json.loads(os.environ.get("OUTPUT_CSV_HEADERS", "[]")), + source_url_stations_json=os.environ.get("SOURCE_URL_STATIONS_JSON", ""), + source_url_status_json=os.environ.get("SOURCE_URL_STATUS_JSON", ""), + transform_list=json.loads(os.environ.get("TRANSFORM_LIST", "[]")), + datetime_fieldlist=json.loads(os.environ.get("DATETIME_FIELDLIST", "[]")), + resolve_datatypes_list=json.loads( + os.environ.get("RESOLVE_DATATYPES_LIST", "{}") + ), + normalize_data_list=json.loads(os.environ.get("NORMALIZE_DATA_LIST", "[]")), + boolean_datapoints_list=json.loads( + os.environ.get("BOOLEAN_DATAPOINTS_LIST", "[]") + ), + remove_whitespace_list=json.loads( + os.environ.get("REMOVE_WHITESPACE_LIST", "[]") + ), + regex_list=json.loads(os.environ.get("REGEX_LIST", "[]")), + crash_field_list=json.loads(os.environ.get("CRASH_FIELD_LIST", "[]")), + date_format_list=json.loads(os.environ.get("DATE_FORMAT_LIST", "[]")), + ) diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/requirements.txt b/datasets/new_york/pipelines/_images/run_csv_transform_kub/requirements.txt similarity index 100% rename from datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/requirements.txt rename to datasets/new_york/pipelines/_images/run_csv_transform_kub/requirements.txt diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/csv_transform.py b/datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/csv_transform.py deleted file mode 100644 
index 18f9ce14a..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_311_service_requests/csv_transform.py +++ /dev/null @@ -1,404 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import datetime -import json -import logging -import os -import pathlib -import typing - -import pandas as pd -import requests -from google.api_core.exceptions import NotFound -from google.cloud import bigquery, storage - - -def main( - pipeline_name: str, - source_url: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - table_id: str, - chunksize: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - data_dtypes: typing.List[str], - null_rows_list: typing.List[str], - parse_dates: dict, - rename_headers_list: dict, - output_headers_list: typing.List[str], -) -> None: - logging.info(f"{pipeline_name} process started") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - execute_pipeline( - source_url=source_url, - source_file=source_file, - target_file=target_file, - project_id=project_id, - dataset_id=dataset_id, - destination_table=table_id, - chunksize=chunksize, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - schema_path=schema_path, - data_dtypes=data_dtypes, - null_rows_list=null_rows_list, - parse_dates=parse_dates, - rename_headers_list=rename_headers_list, - output_headers_list=output_headers_list, - ) - logging.info(f"{pipeline_name} process completed") - - -def execute_pipeline( - source_url: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - destination_table: str, - chunksize: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - data_dtypes: typing.List[str], - parse_dates: dict, - null_rows_list: typing.List[str], - rename_headers_list: dict, - output_headers_list: typing.List[str], -) -> None: - download_file(source_url, source_file) - process_source_file( - source_file, - target_file, - chunksize, - data_dtypes, - parse_dates, - null_rows_list, - rename_headers_list, - output_headers_list, - ) - if os.path.exists(target_file): - upload_file_to_gcs( - file_path=target_file, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - ) - table_exists = create_dest_table( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - schema_filepath=schema_path, - bucket_name=target_gcs_bucket, - ) - if table_exists: - load_data_to_bq( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - file_path=target_file, - truncate_table=True, - ) - else: - error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." - raise ValueError(error_msg) - else: - logging.info( - f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." 
- ) - - -def process_source_file( - source_file: str, - target_file: str, - chunksize: str, - data_dtypes: dict, - parse_dates_list: typing.List[str], - null_rows_list: typing.List[str], - rename_headers_list: dict, - output_headers_list: typing.List[str], -) -> None: - logging.info(f"Processing file {source_file}") - with pd.read_csv( - source_file, - engine="python", - encoding="utf-8", - quotechar='"', - chunksize=int(chunksize), - dtype=data_dtypes, - parse_dates=parse_dates_list, - ) as reader: - for chunk_number, chunk in enumerate(reader): - logging.info(f"Processing batch {chunk_number}") - target_file_batch = str(target_file).replace( - ".csv", "-" + str(chunk_number) + ".csv" - ) - df = pd.DataFrame() - df = pd.concat([df, chunk]) - process_chunk( - df=df, - target_file_batch=target_file_batch, - target_file=target_file, - skip_header=(not chunk_number == 0), - rename_headers_list=rename_headers_list, - null_rows_list=null_rows_list, - parse_dates_list=parse_dates_list, - reorder_headers_list=output_headers_list, - ) - - -def load_data_to_bq( - project_id: str, - dataset_id: str, - table_id: str, - file_path: str, - truncate_table: bool, -) -> None: - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" - ) - client = bigquery.Client(project=project_id) - table_ref = client.dataset(dataset_id).table(table_id) - job_config = bigquery.LoadJobConfig() - job_config.source_format = bigquery.SourceFormat.CSV - job_config.field_delimiter = "|" - if truncate_table: - job_config.write_disposition = "WRITE_TRUNCATE" - else: - job_config.write_disposition = "WRITE_APPEND" - job_config.skip_leading_rows = 1 # ignore the header - job_config.autodetect = False - with open(file_path, "rb") as source_file: - job = client.load_table_from_file(source_file, table_ref, job_config=job_config) - job.result() - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" - ) - - -def create_dest_table( - project_id: str, - dataset_id: str, - table_id: str, - schema_filepath: list, - bucket_name: str, -) -> bool: - table_ref = f"{project_id}.{dataset_id}.{table_id}" - logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") - client = bigquery.Client() - table_exists = False - try: - table = client.get_table(table_ref) - table_exists_id = table.table_id - logging.info(f"Table {table_exists_id} currently exists.") - except NotFound: - table = None - if not table: - logging.info( - ( - f"Table {table_ref} currently does not exist. Attempting to create table." 
- ) - ) - if check_gcs_file_exists(schema_filepath, bucket_name): - schema = create_table_schema([], bucket_name, schema_filepath) - table = bigquery.Table(table_ref, schema=schema) - client.create_table(table) - print(f"Table {table_ref} was created".format(table_id)) - table_exists = True - else: - file_name = os.path.split(schema_filepath)[1] - file_path = os.path.split(schema_filepath)[0] - logging.info( - f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" - ) - table_exists = False - else: - table_exists = True - return table_exists - - -def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) - return exists - - -def create_table_schema( - schema_structure: list, bucket_name: str = "", schema_filepath: str = "" -) -> list: - logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") - schema = [] - if not (schema_filepath): - schema_struct = schema_structure - else: - storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - blob = bucket.blob(schema_filepath) - schema_struct = json.loads(blob.download_as_string(client=None)) - for schema_field in schema_struct: - fld_name = schema_field["name"] - fld_type = schema_field["type"] - try: - fld_descr = schema_field["description"] - except KeyError: - fld_descr = "" - fld_mode = schema_field["mode"] - schema.append( - bigquery.SchemaField( - name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr - ) - ) - return schema - - -def append_batch_file( - batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool -) -> None: - with open(batch_file_path, "r") as data_file: - if truncate_file: - target_file = open(target_file_path, "w+").close() - with open(target_file_path, "a+") as target_file: - if skip_header: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path} with skip header" - ) - next(data_file) - else: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path}" - ) - target_file.write(data_file.read()) - if os.path.exists(batch_file_path): - os.remove(batch_file_path) - - -def process_chunk( - df: pd.DataFrame, - target_file_batch: str, - target_file: str, - skip_header: bool, - rename_headers_list: dict, - null_rows_list: typing.List[str], - parse_dates_list: typing.List[str], - reorder_headers_list: typing.List[str], -) -> None: - df = resolve_date_format(df, parse_dates_list) - df = rename_headers(df, rename_headers_list) - df = remove_null_rows(df, null_rows_list) - df = reorder_headers(df, reorder_headers_list) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) - - -def remove_null_rows( - df: pd.DataFrame, null_rows_list: typing.List[str] -) -> pd.DataFrame: - logging.info("Removing rows with empty keys") - for column in null_rows_list: - df = df[df[column] != ""] - return df - - -def reorder_headers(df: pd.DataFrame, output_headers: typing.List[str]) -> pd.DataFrame: - logging.info("Reordering headers..") - return df[output_headers] - - -def resolve_date_format( - df: pd.DataFrame, parse_dates: typing.List[str] -) -> pd.DataFrame: - for dt_fld in parse_dates: - logging.info(f"Resolving date format in column {dt_fld}") - df[dt_fld] = 
df[dt_fld].apply(convert_dt_format) - return df - - -def convert_dt_format(dt_str: str) -> str: - if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": - return "" - elif ( - str(dt_str).strip()[2] == "/" - ): # if there is a '/' in 3rd position, then we have a date format mm/dd/yyyy - return datetime.datetime.strptime(dt_str, "%m/%d/%Y %H:%M:%S %p").strftime( - "%Y-%m-%d %H:%M:%S" - ) - else: - return str(dt_str) - - -def rename_headers(df: pd.DataFrame, header_names: dict) -> pd.DataFrame: - logging.info("Renaming Headers") - df = df.rename(columns=header_names) - return df - - -def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: - logging.info(f"Saving data to target file.. {file_path} ...") - df.to_csv(file_path, index=False, sep=sep) - - -def download_file(source_url: str, source_file: pathlib.Path) -> None: - logging.info(f"Downloading {source_url} to {source_file}") - r = requests.get(source_url, stream=True) - if r.status_code == 200: - with open(source_file, "wb") as f: - for chunk in r: - f.write(chunk) - else: - logging.error(f"Couldn't download {source_url}: {r.text}") - - -def upload_file_to_gcs( - file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str -) -> None: - if os.path.exists(file_path): - logging.info( - f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" - ) - storage_client = storage.Client() - bucket = storage_client.bucket(target_gcs_bucket) - blob = bucket.blob(target_gcs_path) - blob.upload_from_filename(file_path) - else: - logging.info( - f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." - ) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - pipeline_name=os.environ["PIPELINE_NAME"], - source_url=os.environ["SOURCE_URL"], - chunksize=os.environ["CHUNKSIZE"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - project_id=os.environ["PROJECT_ID"], - dataset_id=os.environ["DATASET_ID"], - table_id=os.environ["TABLE_ID"], - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - schema_path=os.environ["SCHEMA_PATH"], - data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - null_rows_list=json.loads(os.environ["NULL_ROWS_LIST"]), - parse_dates=json.loads(os.environ["PARSE_DATES"]), - rename_headers_list=json.loads(os.environ["RENAME_HEADERS"]), - output_headers_list=json.loads(os.environ["OUTPUT_CSV_HEADERS"]), - ) diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/Dockerfile b/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/Dockerfile deleted file mode 100644 index 85af90570..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/Dockerfile +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim -FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs -ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image -COPY requirements.txt ./ - -# Install the packages specified in the requirements file -RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction -WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* -COPY ./csv_transform.py . - -# Command to run the data processing script when the container is run -CMD ["python3", "csv_transform.py"] diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/csv_transform.py b/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/csv_transform.py deleted file mode 100644 index 46daeb12f..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/csv_transform.py +++ /dev/null @@ -1,492 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import datetime -import json -import logging -import os -import pathlib -import typing - -import pandas as pd -import requests -from google.api_core.exceptions import NotFound -from google.cloud import bigquery, storage - - -def main( - pipeline_name: str, - source_url_stations_json: str, - source_url_status_json: str, - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - table_id: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - data_dtypes: dict, - rename_headers_list: dict, - output_headers_list: typing.List[str], - datetime_fieldlist: typing.List[str], - resolve_datatypes_list: dict, - normalize_data_list: typing.List[str], - boolean_datapoints_list: typing.List[str], -) -> None: - logging.info(f"{pipeline_name} process started") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - execute_pipeline( - source_url_stations_json=source_url_stations_json, - source_url_status_json=source_url_status_json, - chunksize=chunksize, - source_file=source_file, - target_file=target_file, - project_id=project_id, - dataset_id=dataset_id, - destination_table=table_id, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - schema_path=schema_path, - data_dtypes=data_dtypes, - rename_headers_list=rename_headers_list, - output_headers_list=output_headers_list, - datetime_fieldlist=datetime_fieldlist, - resolve_datatypes_list=resolve_datatypes_list, - normalize_data_list=normalize_data_list, - boolean_datapoints_list=boolean_datapoints_list, - ) - logging.info(f"{pipeline_name} process completed") - - -def execute_pipeline( - source_url_stations_json: str, - source_url_status_json: str, - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - destination_table: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - data_dtypes: dict, - rename_headers_list: dict, - output_headers_list: typing.List[str], - datetime_fieldlist: typing.List[str], - resolve_datatypes_list: dict, - normalize_data_list: typing.List[str], - boolean_datapoints_list: typing.List[str], -) -> None: - download_and_merge_source_files( - source_url_stations_json=source_url_stations_json, - source_url_status_json=source_url_status_json, - source_file=source_file, - ) - process_source_file( - source_file=source_file, - target_file=target_file, - chunksize=chunksize, - data_dtypes=data_dtypes, - rename_headers_list=rename_headers_list, - output_headers_list=output_headers_list, - datetime_fieldlist=datetime_fieldlist, - resolve_datatypes_list=resolve_datatypes_list, - normalize_data_list=normalize_data_list, - boolean_datapoints_list=boolean_datapoints_list, - ) - if os.path.exists(target_file): - upload_file_to_gcs( - file_path=target_file, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - ) - table_exists = create_dest_table( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - schema_filepath=schema_path, - bucket_name=target_gcs_bucket, - ) - if table_exists: - load_data_to_bq( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - file_path=target_file, - truncate_table=True, - ) - else: - error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." 
- raise ValueError(error_msg) - else: - logging.info( - f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." - ) - - -def load_data_to_bq( - project_id: str, - dataset_id: str, - table_id: str, - file_path: str, - truncate_table: bool, -) -> None: - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" - ) - client = bigquery.Client(project=project_id) - table_ref = client.dataset(dataset_id).table(table_id) - job_config = bigquery.LoadJobConfig() - job_config.source_format = bigquery.SourceFormat.CSV - job_config.field_delimiter = "|" - if truncate_table: - job_config.write_disposition = "WRITE_TRUNCATE" - else: - job_config.write_disposition = "WRITE_APPEND" - job_config.skip_leading_rows = 1 # ignore the header - job_config.autodetect = False - with open(file_path, "rb") as source_file: - job = client.load_table_from_file(source_file, table_ref, job_config=job_config) - job.result() - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" - ) - - -def create_dest_table( - project_id: str, - dataset_id: str, - table_id: str, - schema_filepath: list, - bucket_name: str, -) -> bool: - table_ref = f"{project_id}.{dataset_id}.{table_id}" - logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") - client = bigquery.Client() - table_exists = False - try: - table = client.get_table(table_ref) - table_exists_id = table.table_id - logging.info(f"Table {table_exists_id} currently exists.") - except NotFound: - table = None - if not table: - logging.info( - ( - f"Table {table_ref} currently does not exist. Attempting to create table." - ) - ) - if check_gcs_file_exists(schema_filepath, bucket_name): - schema = create_table_schema([], bucket_name, schema_filepath) - table = bigquery.Table(table_ref, schema=schema) - client.create_table(table) - print(f"Table {table_ref} was created".format(table_id)) - table_exists = True - else: - file_name = os.path.split(schema_filepath)[1] - file_path = os.path.split(schema_filepath)[0] - logging.info( - f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" - ) - table_exists = False - else: - table_exists = True - return table_exists - - -def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) - return exists - - -def create_table_schema( - schema_structure: list, bucket_name: str = "", schema_filepath: str = "" -) -> list: - logging.info(f"Defining table schema... {bucket_name} ... 
{schema_filepath}") - schema = [] - if not (schema_filepath): - schema_struct = schema_structure - else: - storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - blob = bucket.blob(schema_filepath) - schema_struct = json.loads(blob.download_as_string(client=None)) - for schema_field in schema_struct: - fld_name = schema_field["name"] - fld_type = schema_field["type"] - try: - fld_descr = schema_field["description"] - except KeyError: - fld_descr = "" - fld_mode = schema_field["mode"] - schema.append( - bigquery.SchemaField( - name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr - ) - ) - return schema - - -def process_source_file( - source_file: str, - target_file: str, - chunksize: str, - data_dtypes: dict, - rename_headers_list: dict, - output_headers_list: typing.List[str], - datetime_fieldlist: typing.List[str], - resolve_datatypes_list: dict, - normalize_data_list: typing.List[str], - boolean_datapoints_list: typing.List[str], -) -> None: - logging.info(f"Processing source file {source_file}") - with pd.read_csv( - source_file, - engine="python", - encoding="utf-8", - quotechar='"', - chunksize=int(chunksize), - sep="|", - dtype=data_dtypes, - ) as reader: - for chunk_number, chunk in enumerate(reader): - target_file_batch = str(target_file).replace( - ".csv", "-" + str(chunk_number) + ".csv" - ) - df = pd.DataFrame() - df = pd.concat([df, chunk]) - process_chunk( - df=df, - target_file_batch=target_file_batch, - target_file=target_file, - skip_header=(not chunk_number == 0), - rename_headers_list=rename_headers_list, - output_headers_list=output_headers_list, - datetime_fieldlist=datetime_fieldlist, - resolve_datatypes_list=resolve_datatypes_list, - normalize_data_list=normalize_data_list, - boolean_datapoints_list=boolean_datapoints_list, - ) - - -def download_and_merge_source_files( - source_url_stations_json: str, source_url_status_json: str, source_file: str -) -> None: - source_file_stations_csv = str(source_file).replace(".csv", "") + "_stations.csv" - source_file_stations_json = str(source_file).replace(".csv", "") + "_stations" - source_file_status_csv = str(source_file).replace(".csv", "") + "_status.csv" - source_file_status_json = str(source_file).replace(".csv", "") + "_status" - download_file_json( - source_url_stations_json, source_file_stations_json, source_file_stations_csv - ) - download_file_json( - source_url_status_json, source_file_status_json, source_file_status_csv - ) - df_stations = pd.read_csv( - source_file_stations_csv, engine="python", encoding="utf-8", quotechar='"' - ) - df_status = pd.read_csv( - source_file_status_csv, engine="python", encoding="utf-8", quotechar='"' - ) - logging.info("Merging files") - df = df_stations.merge(df_status, left_on="station_id", right_on="station_id") - save_to_new_file(df, source_file) - - -def download_file_json( - source_url: str, source_file_json: pathlib.Path, source_file_csv: pathlib.Path -) -> None: - logging.info(f"Downloading file {source_url}.json.") - r = requests.get(source_url + ".json", stream=True) - with open(source_file_json + ".json", "wb") as f: - for chunk in r: - f.write(chunk) - df = pd.read_json(source_file_json + ".json")["data"]["stations"] - df = pd.DataFrame(df) - df.to_csv(source_file_csv, index=False) - - -def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: - logging.info(f"Saving data to target file.. 
{file_path} ...") - df.to_csv(file_path, index=False, sep=sep) - - -def process_chunk( - df: pd.DataFrame, - target_file_batch: str, - target_file: str, - skip_header: bool, - rename_headers_list: dict, - output_headers_list: typing.List[str], - datetime_fieldlist: typing.List[str], - resolve_datatypes_list: dict, - normalize_data_list: typing.List[str], - boolean_datapoints_list: typing.List[str], -) -> None: - logging.info(f"Processing batch file {target_file_batch}") - df = convert_datetime_from_int(df, datetime_fieldlist) - df = clean_data_points( - df, - resolve_datatypes_list=resolve_datatypes_list, - normalize_data_list=normalize_data_list, - boolean_datapoints_list=boolean_datapoints_list, - ) - df = rename_headers(df, rename_headers_list) - df = reorder_headers(df, output_headers_list) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file( - batch_file_path=target_file_batch, - target_file_path=target_file, - skip_header=skip_header, - truncate_file=not (skip_header), - ) - logging.info(f"Processing batch file {target_file_batch} completed") - - -def convert_datetime_from_int( - df: pd.DataFrame, datetime_columns_list: typing.List[str] -) -> pd.DataFrame: - for column in datetime_columns_list: - logging.info(f"Converting Datetime column {column}") - df[column] = df[column].astype(str).astype(int).apply(datetime_from_int) - return df - - -def datetime_from_int(dt_int: int) -> str: - return datetime.datetime.fromtimestamp(dt_int).strftime("%Y-%m-%d %H:%M:%S") - - -def clean_data_points( - df: pd.DataFrame, - resolve_datatypes_list: dict, - normalize_data_list: typing.List[str], - boolean_datapoints_list: typing.List[str], -) -> pd.DataFrame: - df = resolve_datatypes(df, resolve_datatypes_list) - df = normalize_data(df, normalize_data_list) - df = resolve_boolean_datapoints(df, boolean_datapoints_list) - return df - - -def resolve_datatypes(df: pd.DataFrame, resolve_datatypes_list: dict) -> pd.DataFrame: - for column, datatype in resolve_datatypes_list.items(): - logging.info(f"Resolving datatype for column {column} to {datatype}") - df[column] = df[column].astype(datatype) - return df - - -def normalize_data( - df: pd.DataFrame, normalize_data_list: typing.List[str] -) -> pd.DataFrame: - for column in normalize_data_list: - logging.info(f"Normalizing data in column {column}") - # Data is in list format in this column. 
- # Therefore remove square brackets and single quotes - df[column] = ( - str(pd.Series(df[column])[0]) - .replace("[", "") - .replace("'", "") - .replace("]", "") - ) - return df - - -def resolve_boolean_datapoints( - df: pd.DataFrame, boolean_datapoints_list: typing.List[str] -) -> pd.DataFrame: - for column in boolean_datapoints_list: - logging.info(f"Resolving boolean datapoints in column {column}") - df[column] = df[column].apply(lambda x: "True" if x == "0" else "False") - return df - - -def rename_headers(df: pd.DataFrame, rename_headers_list: dict) -> pd.DataFrame: - df.rename(columns=rename_headers_list, inplace=True) - return df - - -def reorder_headers( - df: pd.DataFrame, output_headers_list: typing.List[str] -) -> pd.DataFrame: - logging.info("Re-ordering Headers") - return df[output_headers_list] - - -def append_batch_file( - batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool -) -> None: - with open(batch_file_path, "r") as data_file: - if truncate_file: - target_file = open(target_file_path, "w+").close() - with open(target_file_path, "a+") as target_file: - if skip_header: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path} with skip header" - ) - next(data_file) - else: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path}" - ) - target_file.write(data_file.read()) - if os.path.exists(batch_file_path): - os.remove(batch_file_path) - - -def upload_file_to_gcs( - file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str -) -> None: - if os.path.exists(file_path): - logging.info( - f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" - ) - storage_client = storage.Client() - bucket = storage_client.bucket(target_gcs_bucket) - blob = bucket.blob(target_gcs_path) - blob.upload_from_filename(file_path) - else: - logging.info( - f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." 
- ) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - pipeline_name=os.environ["PIPELINE_NAME"], - source_url_stations_json=os.environ["SOURCE_URL_STATIONS_JSON"], - source_url_status_json=os.environ["SOURCE_URL_STATUS_JSON"], - chunksize=os.environ["CHUNKSIZE"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - project_id=os.environ["PROJECT_ID"], - dataset_id=os.environ["DATASET_ID"], - table_id=os.environ["TABLE_ID"], - schema_path=os.environ["SCHEMA_PATH"], - data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - rename_headers_list=json.loads(os.environ["RENAME_HEADERS_LIST"]), - output_headers_list=json.loads(os.environ["OUTPUT_CSV_HEADERS"]), - datetime_fieldlist=json.loads(os.environ["DATETIME_FIELDLIST"]), - resolve_datatypes_list=json.loads(os.environ["RESOLVE_DATATYPES_LIST"]), - normalize_data_list=json.loads(os.environ["NORMALIZE_DATA_LIST"]), - boolean_datapoints_list=json.loads(os.environ["BOOLEAN_DATAPOINTS"]), - ) diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/requirements.txt b/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/requirements.txt deleted file mode 100644 index fa116c33a..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_citibike_stations/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -google-cloud-bigquery -google-cloud-storage -pandas -requests diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/Dockerfile b/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/Dockerfile deleted file mode 100644 index 85af90570..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/Dockerfile +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim -FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs -ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image -COPY requirements.txt ./ - -# Install the packages specified in the requirements file -RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction -WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* -COPY ./csv_transform.py . 
- -# Command to run the data processing script when the container is run -CMD ["python3", "csv_transform.py"] diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/csv_transform.py b/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/csv_transform.py deleted file mode 100644 index 4e449211b..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/csv_transform.py +++ /dev/null @@ -1,494 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import datetime -import json -import logging -import os -import pathlib -import typing - -import pandas as pd -import requests -from google.api_core.exceptions import NotFound -from google.cloud import bigquery, storage - - -def main( - pipeline_name: str, - source_url: str, - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - table_id: str, - target_gcs_bucket: str, - target_gcs_path: str, - data_dtypes: dict, - schema_path: str, - transform_list: typing.List[str], - resolve_datatypes_list: dict, - reorder_headers_list: typing.List[str], - rename_headers_list: dict, - regex_list: typing.List[typing.List], - crash_field_list: typing.List[typing.List], - date_format_list: typing.List[typing.List], -) -> None: - logging.info(f"{pipeline_name} process started") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - execute_pipeline( - source_url=source_url, - source_file=source_file, - target_file=target_file, - chunksize=chunksize, - project_id=project_id, - dataset_id=dataset_id, - destination_table=table_id, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - data_dtypes=data_dtypes, - schema_path=schema_path, - resolve_datatypes_list=resolve_datatypes_list, - transform_list=transform_list, - reorder_headers_list=reorder_headers_list, - rename_headers_list=rename_headers_list, - regex_list=regex_list, - crash_field_list=crash_field_list, - date_format_list=date_format_list, - ) - logging.info(f"{pipeline_name} process completed") - - -def execute_pipeline( - source_url: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - chunksize: str, - project_id: str, - dataset_id: str, - destination_table: str, - target_gcs_bucket: str, - target_gcs_path: str, - data_dtypes: dict, - schema_path: str, - resolve_datatypes_list: dict, - transform_list: typing.List[str], - reorder_headers_list: typing.List[str], - rename_headers_list: dict, - regex_list: typing.List[typing.List], - crash_field_list: typing.List[typing.List], - date_format_list: typing.List[typing.List], -) -> None: - download_file(source_url, source_file) - process_source_file( - source_file=source_file, - target_file=target_file, - chunksize=chunksize, - source_dtypes=data_dtypes, - resolve_datatypes_list=resolve_datatypes_list, - transform_list=transform_list, - reorder_headers_list=reorder_headers_list, - rename_headers_list=rename_headers_list, - 
regex_list=regex_list, - crash_field_list=crash_field_list, - date_format_list=date_format_list, - ) - if os.path.exists(target_file): - upload_file_to_gcs( - file_path=target_file, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - ) - table_exists = create_dest_table( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - schema_filepath=schema_path, - bucket_name=target_gcs_bucket, - ) - if table_exists: - load_data_to_bq( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - file_path=target_file, - truncate_table=True, - ) - else: - error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." - raise ValueError(error_msg) - else: - logging.info( - f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." - ) - - -def load_data_to_bq( - project_id: str, - dataset_id: str, - table_id: str, - file_path: str, - truncate_table: bool, -) -> None: - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" - ) - client = bigquery.Client(project=project_id) - table_ref = client.dataset(dataset_id).table(table_id) - job_config = bigquery.LoadJobConfig() - job_config.source_format = bigquery.SourceFormat.CSV - job_config.field_delimiter = "|" - if truncate_table: - job_config.write_disposition = "WRITE_TRUNCATE" - else: - job_config.write_disposition = "WRITE_APPEND" - job_config.skip_leading_rows = 1 # ignore the header - job_config.autodetect = False - with open(file_path, "rb") as source_file: - job = client.load_table_from_file(source_file, table_ref, job_config=job_config) - job.result() - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" - ) - - -def create_dest_table( - project_id: str, - dataset_id: str, - table_id: str, - schema_filepath: list, - bucket_name: str, -) -> bool: - table_ref = f"{project_id}.{dataset_id}.{table_id}" - logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") - client = bigquery.Client() - table_exists = False - try: - table = client.get_table(table_ref) - table_exists_id = table.table_id - logging.info(f"Table {table_exists_id} currently exists.") - except NotFound: - table = None - if not table: - logging.info( - ( - f"Table {table_ref} currently does not exist. Attempting to create table." 
- ) - ) - if check_gcs_file_exists(schema_filepath, bucket_name): - schema = create_table_schema([], bucket_name, schema_filepath) - table = bigquery.Table(table_ref, schema=schema) - client.create_table(table) - print(f"Table {table_ref} was created".format(table_id)) - table_exists = True - else: - file_name = os.path.split(schema_filepath)[1] - file_path = os.path.split(schema_filepath)[0] - logging.info( - f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" - ) - table_exists = False - else: - table_exists = True - return table_exists - - -def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) - return exists - - -def create_table_schema( - schema_structure: list, bucket_name: str = "", schema_filepath: str = "" -) -> list: - logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") - schema = [] - if not (schema_filepath): - schema_struct = schema_structure - else: - storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - blob = bucket.blob(schema_filepath) - schema_struct = json.loads(blob.download_as_string(client=None)) - for schema_field in schema_struct: - fld_name = schema_field["name"] - fld_type = schema_field["type"] - try: - fld_descr = schema_field["description"] - except KeyError: - fld_descr = "" - fld_mode = schema_field["mode"] - schema.append( - bigquery.SchemaField( - name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr - ) - ) - return schema - - -def process_source_file( - source_file: pathlib.Path, - target_file: pathlib.Path, - chunksize: str, - source_dtypes: dict, - resolve_datatypes_list: dict, - transform_list: typing.List[str], - reorder_headers_list: typing.List[str], - rename_headers_list: dict, - regex_list: typing.List[typing.List], - crash_field_list: typing.List[typing.List], - date_format_list: typing.List[typing.List], -) -> None: - logging.info(f"Opening source file {source_file}") - with pd.read_csv( - source_file, - engine="python", - encoding="utf-8", - quotechar='"', - sep=",", - dtype=source_dtypes, - chunksize=int(chunksize), - ) as reader: - for chunk_number, chunk in enumerate(reader): - target_file_batch = str(target_file).replace( - ".csv", "-" + str(chunk_number) + ".csv" - ) - df = pd.DataFrame() - df = pd.concat([df, chunk]) - process_chunk( - df=df, - target_file_batch=target_file_batch, - target_file=target_file, - skip_header=(not chunk_number == 0), - resolve_datatypes_list=resolve_datatypes_list, - transform_list=transform_list, - reorder_headers_list=reorder_headers_list, - rename_headers_list=rename_headers_list, - regex_list=regex_list, - crash_field_list=crash_field_list, - date_format_list=date_format_list, - ) - - -def process_chunk( - df: pd.DataFrame, - target_file_batch: str, - target_file: str, - skip_header: bool, - resolve_datatypes_list: dict, - transform_list: list, - reorder_headers_list: list, - rename_headers_list: list, - regex_list: list, - crash_field_list: list, - date_format_list: list, -) -> None: - logging.info(f"Processing batch file {target_file_batch}") - for transform in transform_list: - if transform == "replace_regex": - df = replace_regex(df, regex_list) - elif transform == "add_crash_timestamp": - for fld in crash_field_list: - new_crash_field = fld[0] - crash_date_field = 
fld[1] - crash_time_field = fld[2] - df[new_crash_field] = "" - df = add_crash_timestamp( - df, new_crash_field, crash_date_field, crash_time_field - ) - elif transform == "convert_date_format": - df = resolve_date_format(df, date_format_list) - elif transform == "resolve_datatypes": - df = resolve_datatypes(df, resolve_datatypes_list) - elif transform == "rename_headers": - df = rename_headers(df, rename_headers_list) - elif transform == "reorder_headers": - df = reorder_headers(df, reorder_headers_list) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) - logging.info(f"Processing batch file {target_file_batch} completed") - - -def resolve_datatypes(df: pd.DataFrame, resolve_datatypes_list: dict) -> pd.DataFrame: - logging.info("Resolving column datatypes") - return df.astype(resolve_datatypes_list, errors="ignore") - - -def reorder_headers(df: pd.DataFrame, headers_list: list) -> pd.DataFrame: - logging.info("Reordering Headers") - return df[headers_list] - - -def rename_headers(df: pd.DataFrame, header_list: dict) -> pd.DataFrame: - logging.info("Renaming Headers") - df.rename(columns=header_list, inplace=True) - return df - - -def replace_regex(df: pd.DataFrame, regex_list: dict) -> pd.DataFrame: - for regex_item in regex_list: - field_name = regex_item[0] - search_expr = regex_item[1] - replace_expr = regex_item[2] - logging.info( - f"Replacing data via regex on field {field_name} '{field_name}' '{search_expr}' '{replace_expr}'" - ) - df[field_name] = df[field_name].replace( - r"" + search_expr, replace_expr, regex=True - ) - return df - - -def resolve_date_format(df: pd.DataFrame, date_fields: list = []) -> pd.DataFrame: - for dt_fld in date_fields: - field_name = dt_fld[0] - logging.info(f"Resolving date format in column {field_name}") - from_format = dt_fld[1] - to_format = dt_fld[2] - df[field_name] = df[field_name].apply( - lambda x: convert_dt_format(str(x), from_format, to_format) - ) - return df - - -def convert_dt_format( - dt_str: str, from_format: str, to_format: str = "%Y-%m-%d %H:%M:%S" -) -> str: - if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": - dt_str = "" - return dt_str - else: - if from_format == "%Y%m%d": - year = dt_str[0:4] - month = dt_str[4:6] - day = dt_str[6:8] - dt_str = f"{year}-{month}-{day} 00:00:00" - from_format = "%Y-%m-%d %H:%M:%S" - elif len(dt_str.strip().split(" ")[1]) == 8: - # if format of time portion is 00:00:00 then use 00:00 format - dt_str = dt_str[:-3] - elif (len(dt_str.strip().split("-")[0]) == 4) and ( - len(from_format.strip().split("/")[0]) == 2 - ): - # if the format of the date portion of the data is in YYYY-MM-DD format - # and from_format is in MM-DD-YYYY then resolve this by modifying the from_format - # to use the YYYY-MM-DD. 
This resolves mixed date formats in files - from_format = "%Y-%m-%d " + from_format.strip().split(" ")[1] - return datetime.datetime.strptime(dt_str, from_format).strftime(to_format) - - -def add_crash_timestamp( - df: pd.DataFrame, new_crash_field: str, crash_date_field: str, crash_time_field: str -) -> pd.DataFrame: - logging.info( - f"add_crash_timestamp '{new_crash_field}' '{crash_date_field}' '{crash_time_field}'" - ) - df[new_crash_field] = df.apply( - lambda x, crash_date_field, crash_time_field: crash_timestamp( - x["" + crash_date_field], x["" + crash_time_field] - ), - args=[crash_date_field, crash_time_field], - axis=1, - ) - return df - - -def crash_timestamp(crash_date: str, crash_time: str) -> str: - # if crash time format is H:MM then convert to HH:MM:SS - if len(crash_time) == 4: - crash_time = f"0{crash_time}:00" - return f"{crash_date} {crash_time}" - - -def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: - logging.info(f"Saving data to target file.. {file_path} ...") - df.to_csv(file_path, index=False, sep=sep) - - -def append_batch_file( - batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool -) -> None: - with open(batch_file_path, "r") as data_file: - if truncate_file: - target_file = open(target_file_path, "w+").close() - with open(target_file_path, "a+") as target_file: - if skip_header: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path} with skip header" - ) - next(data_file) - else: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path}" - ) - target_file.write(data_file.read()) - if os.path.exists(batch_file_path): - os.remove(batch_file_path) - - -def download_file(source_url: str, source_file: pathlib.Path) -> None: - logging.info(f"Downloading {source_url} to {source_file}") - r = requests.get(source_url, stream=True) - if r.status_code == 200: - with open(source_file, "wb") as f: - for chunk in r: - f.write(chunk) - else: - logging.error(f"Couldn't download {source_url}: {r.text}") - - -def upload_file_to_gcs( - file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str -) -> None: - if os.path.exists(file_path): - logging.info( - f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" - ) - storage_client = storage.Client() - bucket = storage_client.bucket(target_gcs_bucket) - blob = bucket.blob(target_gcs_path) - blob.upload_from_filename(file_path) - else: - logging.info( - f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." 
- ) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - pipeline_name=os.environ["PIPELINE_NAME"], - source_url=os.environ["SOURCE_URL"], - chunksize=os.environ["CHUNKSIZE"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - project_id=os.environ["PROJECT_ID"], - dataset_id=os.environ["DATASET_ID"], - table_id=os.environ["TABLE_ID"], - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - schema_path=os.environ["SCHEMA_PATH"], - resolve_datatypes_list=json.loads(os.environ["RESOLVE_DATATYPES_LIST"]), - transform_list=json.loads(os.environ["TRANSFORM_LIST"]), - reorder_headers_list=json.loads(os.environ["REORDER_HEADERS_LIST"]), - rename_headers_list=json.loads(os.environ["RENAME_HEADERS_LIST"]), - regex_list=json.loads(os.environ["REGEX_LIST"]), - crash_field_list=json.loads(os.environ["CRASH_FIELD_LIST"]), - date_format_list=json.loads(os.environ["DATE_FORMAT_LIST"]), - ) diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/requirements.txt b/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/requirements.txt deleted file mode 100644 index fa116c33a..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_nypd_mv_collisions/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -google-cloud-bigquery -google-cloud-storage -pandas -requests diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/Dockerfile b/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/Dockerfile deleted file mode 100644 index 85af90570..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/Dockerfile +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# The base image for this build -# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim -FROM python:3.8 - -# Allow statements and log messages to appear in Cloud logs -ENV PYTHONUNBUFFERED True - -# Copy the requirements file into the image -COPY requirements.txt ./ - -# Install the packages specified in the requirements file -RUN python3 -m pip install --no-cache-dir -r requirements.txt - -# The WORKDIR instruction sets the working directory for any RUN, CMD, -# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. -# If the WORKDIR doesn’t exist, it will be created even if it’s not used in -# any subsequent Dockerfile instruction -WORKDIR /custom - -# Copy the specific data processing script/s in the image under /custom/* -COPY ./csv_transform.py . 
- -# Command to run the data processing script when the container is run -CMD ["python3", "csv_transform.py"] diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/csv_transform.py b/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/csv_transform.py deleted file mode 100644 index ba8194745..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/csv_transform.py +++ /dev/null @@ -1,368 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import logging -import os -import pathlib -import typing - -import pandas as pd -import requests -from google.api_core.exceptions import NotFound -from google.cloud import bigquery, storage - - -def main( - pipeline_name: str, - source_url: str, - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - table_id: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - rename_headers_list: dict, - remove_whitespace_list: typing.List[str], - reorder_headers_list: typing.List[str], -) -> None: - logging.info(f"{pipeline_name} process started") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - execute_pipeline( - source_url=source_url, - chunksize=chunksize, - source_file=source_file, - target_file=target_file, - project_id=project_id, - dataset_id=dataset_id, - destination_table=table_id, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - schema_path=schema_path, - rename_headers_list=rename_headers_list, - remove_whitespace_list=remove_whitespace_list, - reorder_headers_list=reorder_headers_list, - ) - logging.info(f"{pipeline_name} process completed") - - -def execute_pipeline( - source_url: str, - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - project_id: str, - dataset_id: str, - destination_table: str, - target_gcs_bucket: str, - target_gcs_path: str, - schema_path: str, - rename_headers_list: dict, - remove_whitespace_list: typing.List[str], - reorder_headers_list: typing.List[str], -) -> None: - download_file(source_url=source_url, source_file=source_file) - process_source_file( - source_file=source_file, - target_file=target_file, - chunksize=chunksize, - rename_headers_list=rename_headers_list, - remove_whitespace_list=remove_whitespace_list, - reorder_headers_list=reorder_headers_list, - ) - if os.path.exists(target_file): - upload_file_to_gcs( - file_path=target_file, - target_gcs_bucket=target_gcs_bucket, - target_gcs_path=target_gcs_path, - ) - table_exists = create_dest_table( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - schema_filepath=schema_path, - bucket_name=target_gcs_bucket, - ) - if table_exists: - load_data_to_bq( - project_id=project_id, - dataset_id=dataset_id, - table_id=destination_table, - file_path=target_file, - truncate_table=True, - ) - else: - error_msg = f"Error: Data was not loaded because the destination table 
{project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." - raise ValueError(error_msg) - else: - logging.info( - f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." - ) - - -def process_source_file( - chunksize: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - rename_headers_list: dict, - remove_whitespace_list: typing.List[str], - reorder_headers_list: typing.List[str], -) -> None: - logging.info(f"Processing source file {source_file}") - with pd.read_csv( - source_file, - engine="python", - encoding="utf-8", - quotechar='"', - sep=",", - chunksize=int(chunksize), - ) as reader: - for chunk_number, chunk in enumerate(reader): - target_file_batch = str(target_file).replace( - ".csv", "-" + str(chunk_number) + ".csv" - ) - df = pd.DataFrame() - df = pd.concat([df, chunk]) - process_chunk( - df=df, - target_file_batch=target_file_batch, - target_file=target_file, - skip_header=(not chunk_number == 0), - rename_headers_list=rename_headers_list, - remove_whitespace_list=remove_whitespace_list, - reorder_headers_list=reorder_headers_list, - ) - - -def process_chunk( - df: pd.DataFrame, - target_file_batch: str, - target_file: str, - skip_header: bool, - rename_headers_list: dict, - remove_whitespace_list: typing.List[str], - reorder_headers_list: typing.List[str], -) -> None: - logging.info(f"Processing batch file {target_file_batch}") - df = rename_headers(df, rename_headers_list) - df = remove_whitespace(df, remove_whitespace_list) - df = reorder_headers(df, reorder_headers_list) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) - logging.info(f"Processing batch file {target_file_batch} completed") - - -def load_data_to_bq( - project_id: str, - dataset_id: str, - table_id: str, - file_path: str, - truncate_table: bool, -) -> None: - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" - ) - client = bigquery.Client(project=project_id) - table_ref = client.dataset(dataset_id).table(table_id) - job_config = bigquery.LoadJobConfig() - job_config.source_format = bigquery.SourceFormat.CSV - job_config.field_delimiter = "|" - if truncate_table: - job_config.write_disposition = "WRITE_TRUNCATE" - else: - job_config.write_disposition = "WRITE_APPEND" - job_config.skip_leading_rows = 1 # ignore the header - job_config.autodetect = False - with open(file_path, "rb") as source_file: - job = client.load_table_from_file(source_file, table_ref, job_config=job_config) - job.result() - logging.info( - f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" - ) - - -def create_dest_table( - project_id: str, - dataset_id: str, - table_id: str, - schema_filepath: list, - bucket_name: str, -) -> bool: - table_ref = f"{project_id}.{dataset_id}.{table_id}" - logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") - client = bigquery.Client() - table_exists = False - try: - table = client.get_table(table_ref) - table_exists_id = table.table_id - logging.info(f"Table {table_exists_id} currently exists.") - except NotFound: - table = None - if not table: - logging.info( - ( - f"Table {table_ref} currently does not exist. Attempting to create table." 
- ) - ) - if check_gcs_file_exists(schema_filepath, bucket_name): - schema = create_table_schema([], bucket_name, schema_filepath) - table = bigquery.Table(table_ref, schema=schema) - client.create_table(table) - print(f"Table {table_ref} was created".format(table_id)) - table_exists = True - else: - file_name = os.path.split(schema_filepath)[1] - file_path = os.path.split(schema_filepath)[0] - logging.info( - f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" - ) - table_exists = False - else: - table_exists = True - return table_exists - - -def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: - storage_client = storage.Client() - bucket = storage_client.bucket(bucket_name) - exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) - return exists - - -def create_table_schema( - schema_structure: list, bucket_name: str = "", schema_filepath: str = "" -) -> list: - logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") - schema = [] - if not (schema_filepath): - schema_struct = schema_structure - else: - storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - blob = bucket.blob(schema_filepath) - schema_struct = json.loads(blob.download_as_string(client=None)) - for schema_field in schema_struct: - fld_name = schema_field["name"] - fld_type = schema_field["type"] - try: - fld_descr = schema_field["description"] - except KeyError: - fld_descr = "" - fld_mode = schema_field["mode"] - schema.append( - bigquery.SchemaField( - name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr - ) - ) - return schema - - -def rename_headers(df: pd.DataFrame, rename_headers_list: dict) -> pd.DataFrame: - logging.info("Renaming headers..") - df.rename(columns=rename_headers_list, inplace=True) - return df - - -def remove_whitespace( - df: pd.DataFrame, remove_whitespace_list: typing.List[str] -) -> pd.DataFrame: - for column in remove_whitespace_list: - logging.info(f"Removing whitespace in column {column}..") - df[column] = df[column].apply(lambda x: str(x).strip()) - return df - - -def reorder_headers( - df: pd.DataFrame, reorder_headers_list: typing.List[str] -) -> pd.DataFrame: - logging.info("Reordering headers..") - return df[reorder_headers_list] - - -def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: - logging.info(f"Saving data to target file.. 
{file_path} ...") - df.to_csv(file_path, index=False, sep=sep) - - -def append_batch_file( - batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool -) -> None: - with open(batch_file_path, "r") as data_file: - if truncate_file: - target_file = open(target_file_path, "w+").close() - with open(target_file_path, "a+") as target_file: - if skip_header: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path} with skip header" - ) - next(data_file) - else: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path}" - ) - target_file.write(data_file.read()) - if os.path.exists(batch_file_path): - os.remove(batch_file_path) - - -def download_file(source_url: str, source_file: pathlib.Path) -> None: - logging.info(f"Downloading {source_url} to {source_file}") - r = requests.get(source_url, stream=True) - if r.status_code == 200: - with open(source_file, "wb") as f: - for chunk in r: - f.write(chunk) - else: - logging.error(f"Couldn't download {source_url}: {r.text}") - - -def upload_file_to_gcs( - file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str -) -> None: - if os.path.exists(file_path): - logging.info( - f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" - ) - storage_client = storage.Client() - bucket = storage_client.bucket(target_gcs_bucket) - blob = bucket.blob(target_gcs_path) - blob.upload_from_filename(file_path) - else: - logging.info( - f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." - ) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - pipeline_name=os.environ["PIPELINE_NAME"], - source_url=os.environ["SOURCE_URL"], - chunksize=os.environ["CHUNKSIZE"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - project_id=os.environ["PROJECT_ID"], - dataset_id=os.environ["DATASET_ID"], - table_id=os.environ["TABLE_ID"], - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - schema_path=os.environ["SCHEMA_PATH"], - rename_headers_list=json.loads(os.environ["RENAME_HEADERS_LIST"]), - remove_whitespace_list=json.loads(os.environ["REMOVE_WHITESPACE_LIST"]), - reorder_headers_list=json.loads(os.environ["REORDER_HEADERS_LIST"]), - ) diff --git a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/requirements.txt b/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/requirements.txt deleted file mode 100644 index fa116c33a..000000000 --- a/datasets/new_york/pipelines/_images/run_csv_transform_kub_tree_census_1995/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -google-cloud-bigquery -google-cloud-storage -pandas -requests diff --git a/datasets/new_york/pipelines/new_york/new_york_dag.py b/datasets/new_york/pipelines/new_york/new_york_dag.py index 391fd5086..7f23c57f6 100644 --- a/datasets/new_york/pipelines/new_york/new_york_dag.py +++ b/datasets/new_york/pipelines/new_york/new_york_dag.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -37,11 +37,11 @@ location="us-central1-c", body={ "name": "pubds-new-york", - "initial_node_count": 4, + "initial_node_count": 2, "network": "{{ var.value.vpc_network }}", "ip_allocation_policy": {"cluster_ipv4_cidr_block": "/26"}, "node_config": { - "machine_type": "e2-standard-16", + "machine_type": "e2-standard-8", "oauth_scopes": [ "https://www.googleapis.com/auth/devstorage.read_write", "https://www.googleapis.com/auth/cloud-platform", @@ -59,26 +59,26 @@ cluster_name="pubds-new-york", namespace="default", image_pull_policy="Always", - image="{{ var.json.new_york.container_registry.run_csv_transform_kub_ny_311_service_requests }}", + image="{{ var.json.new_york.container_registry.run_csv_transform_kub }}", env_vars={ - "PIPELINE_NAME": "{{ var.json.new_york.ny_311_service_requests.pipeline_name }}", - "SOURCE_URL": "{{ var.json.new_york.ny_311_service_requests.source_url }}", - "CHUNKSIZE": "{{ var.json.new_york.ny_311_service_requests.chunksize }}", - "SOURCE_FILE": "files/data_{{ var.json.new_york.ny_311_service_requests.dataset_id }}_{{ var.json.new_york.ny_311_service_requests.destination_table }}.csv", - "TARGET_FILE": "files/data_output_{{ var.json.new_york.ny_311_service_requests.dataset_id }}_{{ var.json.new_york.ny_311_service_requests.destination_table }}.csv", + "PIPELINE_NAME": "New York - 311 Service Requests", + "SOURCE_URL": "https://data.cityofnewyork.us/api/views/erm2-nwe9/rows.csv", + "CHUNKSIZE": "500000", + "SOURCE_FILE": "files/data_new_york_311_311_service_requests.csv", + "TARGET_FILE": "files/data_output_new_york_311_311_service_requests.csv", "PROJECT_ID": "{{ var.value.gcp_project }}", - "DATASET_ID": "{{ var.json.new_york.ny_311_service_requests.dataset_id }}", - "TABLE_ID": "{{ var.json.new_york.ny_311_service_requests.destination_table }}", + "DATASET_ID": "new_york_311", + "TABLE_ID": "311_service_requests", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "{{ var.json.new_york.ny_311_service_requests.target_path }}", - "SCHEMA_PATH": "{{ var.json.new_york.ny_311_service_requests.schema_path }}", + "TARGET_GCS_PATH": "data/new_york/311_service_requests/data_output.csv", + "SCHEMA_PATH": "data/new_york/schema/ny_311_service_requests_schema.json", "DATA_DTYPES": '{\n "Unique Key": "int",\n "Created Date": "str",\n "Closed Date": "str",\n "Agency": "str",\n "Agency Name": "str",\n "Complaint Type": "str",\n "Descriptor": "str",\n "Location Type": "str",\n "Incident Zip": "str",\n "Incident Address": "str",\n "Street Name": "str",\n "Cross Street 1": "str",\n "Cross Street 2": "str",\n "Intersection Street 1": "str",\n "Intersection Street 2": "str",\n "Address Type": "str",\n "City": "str",\n "Landmark": "str",\n "Facility Type": "str",\n "Status": "str",\n "Due Date": "str",\n "Resolution Description": "str",\n "Resolution Action Updated Date": "str",\n "Community Board": "str",\n "BBL": "str",\n "Borough": "str",\n "X Coordinate (State Plane)": "str",\n "Y Coordinate (State Plane)": "str",\n "Open Data Channel Type": "str",\n "Park Facility Name": "str",\n "Park Borough": "str",\n "Vehicle Type": "str",\n "Taxi Company Borough": "str",\n "Taxi Pick Up Location": "str",\n "Bridge Highway Name": "str",\n "Bridge Highway Direction": "str",\n "Road Ramp": "str",\n "Bridge Highway Segment": "str",\n "Latitude": "float64",\n "Longitude": "float64",\n "Location": "str"\n}', "PARSE_DATES": '[\n "Created Date",\n "Closed Date",\n "Due Date",\n "Resolution Action Updated Date"\n]', "NULL_ROWS_LIST": '[\n "unique_key"\n]', - 
"RENAME_HEADERS": '{\n "Unique Key": "unique_key",\n "Created Date": "created_date",\n "Closed Date": "closed_date",\n "Agency": "agency",\n "Agency Name": "agency_name",\n "Complaint Type": "complaint_type",\n "Descriptor": "descriptor",\n "Location Type": "location_type",\n "Incident Zip": "incident_zip",\n "Incident Address": "incident_address",\n "Street Name": "street_name",\n "Cross Street 1": "cross_street_1",\n "Cross Street 2": "cross_street_2",\n "Intersection Street 1": "intersection_street_1",\n "Intersection Street 2": "intersection_street_2",\n "Address Type": "address_type",\n "City": "city",\n "Landmark": "landmark",\n "Facility Type": "facility_type",\n "Status": "status",\n "Due Date": "due_date",\n "Resolution Description": "resolution_description",\n "Resolution Action Updated Date": "resolution_action_updated_date",\n "Community Board": "community_board",\n "Open Data Channel Type": "open_data_channel_type",\n "Borough": "borough",\n "X Coordinate (State Plane)": "x_coordinate",\n "Y Coordinate (State Plane)": "y_coordinate",\n "Park Facility Name": "park_facility_name",\n "Park Borough": "park_borough",\n "Vehicle Type": "vehicle_type",\n "Taxi Company Borough": "taxi_company_borough",\n "Taxi Pick Up Location": "taxi_pickup_location",\n "Bridge Highway Name": "bridge_highway_name",\n "Bridge Highway Direction": "bridge_highway_direction",\n "Road Ramp": "road_ramp",\n "Bridge Highway Segment": "bridge_highway_segment",\n "Latitude": "latitude",\n "Longitude": "longitude",\n "Location": "location",\n "BBL": "bbl"\n}', + "RENAME_HEADERS_LIST": '{\n "Unique Key": "unique_key",\n "Created Date": "created_date",\n "Closed Date": "closed_date",\n "Agency": "agency",\n "Agency Name": "agency_name",\n "Complaint Type": "complaint_type",\n "Descriptor": "descriptor",\n "Location Type": "location_type",\n "Incident Zip": "incident_zip",\n "Incident Address": "incident_address",\n "Street Name": "street_name",\n "Cross Street 1": "cross_street_1",\n "Cross Street 2": "cross_street_2",\n "Intersection Street 1": "intersection_street_1",\n "Intersection Street 2": "intersection_street_2",\n "Address Type": "address_type",\n "City": "city",\n "Landmark": "landmark",\n "Facility Type": "facility_type",\n "Status": "status",\n "Due Date": "due_date",\n "Resolution Description": "resolution_description",\n "Resolution Action Updated Date": "resolution_action_updated_date",\n "Community Board": "community_board",\n "Open Data Channel Type": "open_data_channel_type",\n "Borough": "borough",\n "X Coordinate (State Plane)": "x_coordinate",\n "Y Coordinate (State Plane)": "y_coordinate",\n "Park Facility Name": "park_facility_name",\n "Park Borough": "park_borough",\n "Vehicle Type": "vehicle_type",\n "Taxi Company Borough": "taxi_company_borough",\n "Taxi Pick Up Location": "taxi_pickup_location",\n "Bridge Highway Name": "bridge_highway_name",\n "Bridge Highway Direction": "bridge_highway_direction",\n "Road Ramp": "road_ramp",\n "Bridge Highway Segment": "bridge_highway_segment",\n "Latitude": "latitude",\n "Longitude": "longitude",\n "Location": "location",\n "BBL": "bbl"\n}', "OUTPUT_CSV_HEADERS": '[\n "unique_key",\n "created_date",\n "closed_date",\n "agency",\n "agency_name",\n "complaint_type",\n "descriptor",\n "location_type",\n "incident_zip",\n "incident_address",\n "street_name",\n "cross_street_1",\n "cross_street_2",\n "intersection_street_1",\n "intersection_street_2",\n "address_type",\n "city",\n "landmark",\n "facility_type",\n "status",\n "due_date",\n 
"resolution_description",\n "resolution_action_updated_date",\n "community_board",\n "borough",\n "x_coordinate",\n "y_coordinate",\n "park_facility_name",\n "park_borough",\n "bbl",\n "open_data_channel_type",\n "vehicle_type",\n "taxi_company_borough",\n "taxi_pickup_location",\n "bridge_highway_name",\n "bridge_highway_direction",\n "road_ramp",\n "bridge_highway_segment",\n "latitude",\n "longitude",\n "location"\n]', }, - resources={"request_ephemeral_storage": "32G", "limit_cpu": "3"}, + resources={"request_ephemeral_storage": "10G", "limit_cpu": "2"}, ) # Run New York Citibike Stations Pipeline @@ -90,20 +90,20 @@ cluster_name="pubds-new-york", namespace="default", image_pull_policy="Always", - image="{{ var.json.new_york.container_registry.run_csv_transform_kub_citibike_stations }}", + image="{{ var.json.new_york.container_registry.run_csv_transform_kub }}", env_vars={ - "PIPELINE_NAME": "{{ var.json.new_york.citibike_stations.pipeline_name }}", - "SOURCE_URL_STATIONS_JSON": "{{ var.json.new_york.citibike_stations.source_url_stations }}", - "SOURCE_URL_STATUS_JSON": "{{ var.json.new_york.citibike_stations.source_url_status }}", - "CHUNKSIZE": "{{ var.json.new_york.citibike_stations.chunksize }}", - "SOURCE_FILE": "files/data_{{ var.json.new_york.citibike_stations.dataset_id }}_{{ var.json.new_york.citibike_stations.destination_table }}.csv", - "TARGET_FILE": "files/data_output_{{ var.json.new_york.citibike_stations.dataset_id }}_{{ var.json.new_york.citibike_stations.destination_table }}.csv", + "PIPELINE_NAME": "New York - Citibike Stations", + "SOURCE_URL_STATIONS_JSON": "https://gbfs.citibikenyc.com/gbfs/en/station_information", + "SOURCE_URL_STATUS_JSON": "https://gbfs.citibikenyc.com/gbfs/en/station_status", + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_new_york_citibike_citibike_stations.csv", + "TARGET_FILE": "files/data_output_new_york_citibike_citibike_stations.csv", "PROJECT_ID": "{{ var.value.gcp_project }}", - "DATASET_ID": "{{ var.json.new_york.citibike_stations.dataset_id }}", - "TABLE_ID": "{{ var.json.new_york.citibike_stations.destination_table }}", + "DATASET_ID": "new_york_citibike", + "TABLE_ID": "citibike_stations", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "{{ var.json.new_york.citibike_stations.target_path }}", - "SCHEMA_PATH": "{{ var.json.new_york.citibike_stations.schema_path }}", + "TARGET_GCS_PATH": "data/new_york/citibike_stations/data_output.csv", + "SCHEMA_PATH": "data/new_york/schema/citibike_stations_schema.json", "DATA_DTYPES": '{\n "rental_uris": "str",\n "lat": "float64",\n "eightd_station_services": "str",\n "legacy_id_x": "int64",\n "short_name": "str",\n "external_id": "str",\n "station_id": "int64",\n "rental_methods": "str",\n "station_type": "str",\n "short_name": "str",\n "eightd_has_key_dispenser": "bool",\n "electric_bike_surcharge_waiver": "bool",\n "lon": "float64",\n "has_kiosk": "bool",\n "capacity": "int64",\n "region_id": "int64",\n "is_installed": "bool",\n "num_docks_available": "int64",\n "num_docks_disabled": "int64",\n "num_bikes_disabled": "int64",\n "num_bikes_available": "int64",\n "station_status": "str",\n "last_reported": "int64",\n "eightd_has_available_keys": "bool",\n "num_ebikes_available": "int64",\n "is_returning": "int64",\n "is_renting": "int64",\n "legacy_id_y": "int64",\n "valet": "str",\n "eightd_active_station_services": "str"\n}', "RENAME_HEADERS_LIST": '{\n "lat": "latitude",\n "lon": "longitude"\n}', "BOOLEAN_DATAPOINTS": '[\n "eightd_has_key_dispenser",\n 
"is_installed",\n "is_renting",\n "is_returning"\n]', @@ -113,7 +113,7 @@ "INPUT_CSV_HEADERS": '[\n "rental_uris",\n "lat",\n "eightd_station_services",\n "legacy_id_x",\n "short_name",\n "external_id",\n "station_id",\n "rental_methods",\n "station_type",\n "short_name",\n "eightd_has_key_dispenser",\n "electric_bike_surcharge_waiver",\n "lon",\n "has_kiosk",\n "capacity",\n "region_id",\n "is_installed",\n "num_docks_available",\n "num_docks_disabled",\n "num_bikes_disabled",\n "num_bikes_available",\n "station_status",\n "last_reported",\n "eightd_has_available_keys",\n "num_ebikes_available",\n "is_returning",\n "is_renting",\n "legacy_id_y",\n "valet",\n "eightd_active_station_services"\n]', "OUTPUT_CSV_HEADERS": '[\n "station_id",\n "name",\n "short_name",\n "latitude",\n "longitude",\n "region_id",\n "rental_methods",\n "capacity",\n "eightd_has_key_dispenser",\n "num_bikes_available",\n "num_bikes_disabled",\n "num_docks_available",\n "num_docks_disabled",\n "is_installed",\n "is_renting",\n "is_returning",\n "eightd_has_available_keys",\n "last_reported"\n]', }, - resources={"request_ephemeral_storage": "16G", "limit_cpu": "3"}, + resources={"request_ephemeral_storage": "10G", "limit_cpu": "2"}, ) # Run New York NYPD MV Collisions Pipeline @@ -125,19 +125,19 @@ cluster_name="pubds-new-york", namespace="default", image_pull_policy="Always", - image="{{ var.json.new_york.container_registry.run_csv_transform_kub_nypd_mv_collisions }}", + image="{{ var.json.new_york.container_registry.run_csv_transform_kub }}", env_vars={ - "PIPELINE_NAME": "{{ var.json.new_york.nypd_mv_collisions.pipeline_name }}", - "SOURCE_URL": "{{ var.json.new_york.nypd_mv_collisions.source_url }}", - "CHUNKSIZE": "{{ var.json.new_york.nypd_mv_collisions.chunksize }}", - "SOURCE_FILE": "files/data_{{ var.json.new_york.nypd_mv_collisions.dataset_id }}_{{ var.json.new_york.nypd_mv_collisions.destination_table }}.csv", - "TARGET_FILE": "files/data_output_{{ var.json.new_york.nypd_mv_collisions.dataset_id }}_{{ var.json.new_york.nypd_mv_collisions.destination_table }}.csv", + "PIPELINE_NAME": "New York - NYPD Motor Vehicle Collisions", + "SOURCE_URL": "https://nycopendata.socrata.com/api/views/h9gi-nx95/rows.csv", + "CHUNKSIZE": "150000", + "SOURCE_FILE": "files/data_new_york_mv_collisions_nypd_mv_collisions.csv", + "TARGET_FILE": "files/data_output_new_york_mv_collisions_nypd_mv_collisions.csv", "PROJECT_ID": "{{ var.value.gcp_project }}", - "DATASET_ID": "{{ var.json.new_york.nypd_mv_collisions.dataset_id }}", - "TABLE_ID": "{{ var.json.new_york.nypd_mv_collisions.destination_table }}", + "DATASET_ID": "new_york_mv_collisions", + "TABLE_ID": "nypd_mv_collisions", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "{{ var.json.new_york.nypd_mv_collisions.target_path }}", - "SCHEMA_PATH": "{{ var.json.new_york.nypd_mv_collisions.schema_path }}", + "TARGET_GCS_PATH": "data/new_york/nypd_mv_collisions/data_output.csv", + "SCHEMA_PATH": "data/new_york/schema/nypd_mv_collisions_schema.json", "DATA_DTYPES": '{\n "CRASH DATE": "str",\n "CRASH TIME": "str",\n "BOROUGH": "str",\n "ZIP CODE": "str",\n "LATITUDE": "float64",\n "LONGITUDE": "float64",\n "LOCATION": "str",\n "ON STREET NAME": "str",\n "CROSS STREET NAME": "str",\n "OFF STREET NAME": "str",\n "NUMBER OF PERSONS INJURED": "str",\n "NUMBER OF PERSONS KILLED" : "str",\n "NUMBER OF PEDESTRIANS INJURED" : "str",\n "NUMBER OF PEDESTRIANS KILLED" : "str",\n "NUMBER OF CYCLIST INJURED" : "str",\n "NUMBER OF CYCLIST KILLED" : "str",\n "NUMBER OF 
MOTORIST INJURED" : "str",\n "NUMBER OF MOTORIST KILLED" : "str",\n "CONTRIBUTING FACTOR VEHICLE 1" : "str",\n "CONTRIBUTING FACTOR VEHICLE 2" : "str",\n "CONTRIBUTING FACTOR VEHICLE 3" : "str",\n "CONTRIBUTING FACTOR VEHICLE 4" : "str",\n "CONTRIBUTING FACTOR VEHICLE 5" : "str",\n "COLLISION_ID": "int64",\n "VEHICLE TYPE CODE 1" : "str",\n "VEHICLE TYPE CODE 2" : "str",\n "VEHICLE TYPE CODE 3" : "str",\n "VEHICLE TYPE CODE 4" : "str",\n "VEHICLE TYPE CODE 5": "str"\n}', "RESOLVE_DATATYPES_LIST": '{\n "latitude": "float64",\n "longitude": "float64",\n "number_of_cyclist_injured": "int64",\n "number_of_cyclist_killed": "int64",\n "number_of_motorist_injured": "int64",\n "number_of_motorist_killed": "int64",\n "number_of_pedestrians_injured": "int64",\n "number_of_pedestrians_killed": "int64",\n "number_of_persons_injured": "int64",\n "number_of_persons_killed": "int64"\n}', "TRANSFORM_LIST": '[ "replace_regex", "add_crash_timestamp", "convert_date_format", "rename_headers", "resolve_datatypes", "reorder_headers" ]', @@ -147,7 +147,7 @@ "RENAME_HEADERS_LIST": '{\n "BOROUGH": "borough",\n "CONTRIBUTING FACTOR VEHICLE 1": "contributing_factor_vehicle_1",\n "CONTRIBUTING FACTOR VEHICLE 2": "contributing_factor_vehicle_2",\n "CONTRIBUTING FACTOR VEHICLE 3": "contributing_factor_vehicle_3",\n "CONTRIBUTING FACTOR VEHICLE 4": "contributing_factor_vehicle_4",\n "CONTRIBUTING FACTOR VEHICLE 5": "contributing_factor_vehicle_5",\n "CROSS STREET NAME": "cross_street_name",\n "LATITUDE": "latitude",\n "LONGITUDE": "longitude",\n "LOCATION": "location",\n "NUMBER OF CYCLIST INJURED": "number_of_cyclist_injured",\n "NUMBER OF CYCLIST KILLED": "number_of_cyclist_killed",\n "NUMBER OF MOTORIST INJURED": "number_of_motorist_injured",\n "NUMBER OF MOTORIST KILLED": "number_of_motorist_killed",\n "NUMBER OF PEDESTRIANS INJURED": "number_of_pedestrians_injured",\n "NUMBER OF PEDESTRIANS KILLED": "number_of_pedestrians_killed",\n "NUMBER OF PERSONS INJURED": "number_of_persons_injured",\n "NUMBER OF PERSONS KILLED": "number_of_persons_killed",\n "OFF STREET NAME": "off_street_name",\n "ON STREET NAME": "on_street_name",\n "COLLISION_ID": "unique_key",\n "VEHICLE TYPE CODE 1": "vehicle_type_code1",\n "VEHICLE TYPE CODE 2": "vehicle_type_code2",\n "VEHICLE TYPE CODE 3": "vehicle_type_code_3",\n "VEHICLE TYPE CODE 4": "vehicle_type_code_4",\n "VEHICLE TYPE CODE 5": "vehicle_type_code_5",\n "ZIP CODE": "zip_code"\n}', "REORDER_HEADERS_LIST": '[\n "borough",\n "contributing_factor_vehicle_1",\n "contributing_factor_vehicle_2",\n "contributing_factor_vehicle_3",\n "contributing_factor_vehicle_4",\n "contributing_factor_vehicle_5",\n "cross_street_name",\n "timestamp",\n "latitude",\n "longitude",\n "location",\n "number_of_cyclist_injured",\n "number_of_cyclist_killed",\n "number_of_motorist_injured",\n "number_of_motorist_killed",\n "number_of_pedestrians_injured",\n "number_of_pedestrians_killed",\n "number_of_persons_injured",\n "number_of_persons_killed",\n "off_street_name",\n "on_street_name",\n "unique_key",\n "vehicle_type_code1",\n "vehicle_type_code2",\n "vehicle_type_code_3",\n "vehicle_type_code_4",\n "vehicle_type_code_5",\n "zip_code"\n]', }, - resources={"request_ephemeral_storage": "16G", "limit_cpu": "3"}, + resources={"request_ephemeral_storage": "10G", "limit_cpu": "2"}, ) # Run New York Tree Census 1995 Pipeline @@ -159,24 +159,24 @@ cluster_name="pubds-new-york", namespace="default", image_pull_policy="Always", - image="{{ var.json.new_york.container_registry.run_csv_transform_kub_tree_census_1995 
}}", + image="{{ var.json.new_york.container_registry.run_csv_transform_kub }}", env_vars={ - "PIPELINE_NAME": "{{ var.json.new_york.tree_census_1995.pipeline_name }}", - "SOURCE_URL": "{{ var.json.new_york.tree_census_1995.source_url }}", - "CHUNKSIZE": "{{ var.json.new_york.tree_census_1995.chunksize }}", - "SOURCE_FILE": "files/data_{{ var.json.new_york.tree_census_1995.dataset_id }}_{{ var.json.new_york.tree_census_1995.destination_table }}.csv", - "TARGET_FILE": "files/data_output_{{ var.json.new_york.tree_census_1995.dataset_id }}_{{ var.json.new_york.tree_census_1995.destination_table }}.csv", + "PIPELINE_NAME": "New York - Tree Census 1995", + "SOURCE_URL": "https://data.cityofnewyork.us/api/views/kyad-zm4j/rows.csv", + "CHUNKSIZE": "750000", + "SOURCE_FILE": "files/data_new_york_trees_tree_census_1995.csv", + "TARGET_FILE": "files/data_output_new_york_trees_tree_census_1995.csv", "PROJECT_ID": "{{ var.value.gcp_project }}", - "DATASET_ID": "{{ var.json.new_york.tree_census_1995.dataset_id }}", - "TABLE_ID": "{{ var.json.new_york.tree_census_1995.destination_table }}", + "DATASET_ID": "new_york_trees", + "TABLE_ID": "tree_census_1995", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "{{ var.json.new_york.tree_census_1995.target_path }}", - "SCHEMA_PATH": "{{ var.json.new_york.tree_census_1995.schema_path }}", + "TARGET_GCS_PATH": "data/new_york/tree_census_1995/data_output.csv", + "SCHEMA_PATH": "data/new_york/schema/tree_census_1995_schema.json", "RENAME_HEADERS_LIST": '{\n "RecordId": "recordid",\n "Address": "address",\n "House_Number": "house_number",\n "Street": "street",\n "Postcode_Original": "zip_original",\n "Community Board_Original": "cb_original",\n "Site": "site",\n "Species": "species",\n "Diameter": "diameter",\n "Condition": "status",\n "Wires": "wires",\n "Sidewalk_Condition": "sidewalk_condition",\n "Support_Structure": "support_structure",\n "Borough": "borough",\n "X": "x",\n "Y": "y",\n "Longitude": "longitude",\n "Latitude": "latitude",\n "CB_New": "cb_new",\n "Zip_New": "zip_new",\n "CensusTract_2010": "censustract_2010",\n "CensusBlock_2010": "censusblock_2010",\n "NTA_2010": "nta_2010",\n "SegmentID": "segmentid",\n "Spc_Common": "spc_common",\n "Spc_Latin": "spc_latin",\n "Location": "location"\n}', "REMOVE_WHITESPACE_LIST": '[\n "spc_latin"\n]', "REORDER_HEADERS_LIST": '[\n "recordid",\n "address",\n "house_number",\n "street",\n "zip_original",\n "cb_original",\n "site",\n "species",\n "diameter",\n "status",\n "wires",\n "sidewalk_condition",\n "support_structure",\n "borough",\n "x",\n "y",\n "longitude",\n "latitude",\n "cb_new",\n "zip_new",\n "censustract_2010",\n "censusblock_2010",\n "nta_2010",\n "segmentid",\n "spc_common",\n "spc_latin",\n "location"\n]', }, - resources={"request_ephemeral_storage": "16G", "limit_cpu": "3"}, + resources={"request_ephemeral_storage": "10G", "limit_cpu": "2"}, ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", @@ -191,7 +191,7 @@ transform_csv_nypd_mv_collisions, transform_csv_ny_citibike_stations, transform_csv_ny_tree_census_1995, - transform_csv_ny_311_service_requests, ] + >> transform_csv_ny_311_service_requests >> delete_cluster ) diff --git a/datasets/new_york/pipelines/new_york/pipeline.yaml b/datasets/new_york/pipelines/new_york/pipeline.yaml index b17ef3395..669c85685 100644 --- a/datasets/new_york/pipelines/new_york/pipeline.yaml +++ b/datasets/new_york/pipelines/new_york/pipeline.yaml @@ -40,12 +40,12 @@ dag: location: "us-central1-c" 
body: name: pubds-new-york - initial_node_count: 4 + initial_node_count: 2 network: "{{ var.value.vpc_network }}" ip_allocation_policy: cluster_ipv4_cidr_block: "/26" node_config: - machine_type: e2-standard-16 + machine_type: e2-standard-8 oauth_scopes: - https://www.googleapis.com/auth/devstorage.read_write - https://www.googleapis.com/auth/cloud-platform @@ -60,19 +60,19 @@ dag: cluster_name: pubds-new-york namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.new_york.container_registry.run_csv_transform_kub_ny_311_service_requests }}" + image: "{{ var.json.new_york.container_registry.run_csv_transform_kub }}" env_vars: - PIPELINE_NAME: "{{ var.json.new_york.ny_311_service_requests.pipeline_name }}" - SOURCE_URL: "{{ var.json.new_york.ny_311_service_requests.source_url }}" - CHUNKSIZE: "{{ var.json.new_york.ny_311_service_requests.chunksize }}" - SOURCE_FILE: "files/data_{{ var.json.new_york.ny_311_service_requests.dataset_id }}_{{ var.json.new_york.ny_311_service_requests.destination_table }}.csv" - TARGET_FILE: "files/data_output_{{ var.json.new_york.ny_311_service_requests.dataset_id }}_{{ var.json.new_york.ny_311_service_requests.destination_table }}.csv" + PIPELINE_NAME: "New York - 311 Service Requests" + SOURCE_URL: "https://data.cityofnewyork.us/api/views/erm2-nwe9/rows.csv" + CHUNKSIZE: "500000" + SOURCE_FILE: "files/data_new_york_311_311_service_requests.csv" + TARGET_FILE: "files/data_output_new_york_311_311_service_requests.csv" PROJECT_ID: "{{ var.value.gcp_project }}" - DATASET_ID: "{{ var.json.new_york.ny_311_service_requests.dataset_id }}" - TABLE_ID: "{{ var.json.new_york.ny_311_service_requests.destination_table }}" + DATASET_ID: "new_york_311" + TABLE_ID: "311_service_requests" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "{{ var.json.new_york.ny_311_service_requests.target_path }}" - SCHEMA_PATH: "{{ var.json.new_york.ny_311_service_requests.schema_path }}" + TARGET_GCS_PATH: "data/new_york/311_service_requests/data_output.csv" + SCHEMA_PATH: "data/new_york/schema/ny_311_service_requests_schema.json" DATA_DTYPES: >- { "Unique Key": "int", @@ -128,7 +128,7 @@ dag: [ "unique_key" ] - RENAME_HEADERS: >- + RENAME_HEADERS_LIST: >- { "Unique Key": "unique_key", "Created Date": "created_date", @@ -217,8 +217,8 @@ dag: "location" ] resources: - request_ephemeral_storage: "32G" - limit_cpu: "3" + request_ephemeral_storage: "10G" + limit_cpu: "2" - operator: "GKEStartPodOperator" description: "Run New York Citibike Stations Pipeline" @@ -230,20 +230,20 @@ dag: cluster_name: pubds-new-york namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.new_york.container_registry.run_csv_transform_kub_citibike_stations }}" + image: "{{ var.json.new_york.container_registry.run_csv_transform_kub }}" env_vars: - PIPELINE_NAME: "{{ var.json.new_york.citibike_stations.pipeline_name }}" - SOURCE_URL_STATIONS_JSON: "{{ var.json.new_york.citibike_stations.source_url_stations }}" - SOURCE_URL_STATUS_JSON: "{{ var.json.new_york.citibike_stations.source_url_status }}" - CHUNKSIZE: "{{ var.json.new_york.citibike_stations.chunksize }}" - SOURCE_FILE: "files/data_{{ var.json.new_york.citibike_stations.dataset_id }}_{{ var.json.new_york.citibike_stations.destination_table }}.csv" - TARGET_FILE: "files/data_output_{{ var.json.new_york.citibike_stations.dataset_id }}_{{ var.json.new_york.citibike_stations.destination_table }}.csv" + PIPELINE_NAME: "New York - Citibike Stations" + SOURCE_URL_STATIONS_JSON: 
"https://gbfs.citibikenyc.com/gbfs/en/station_information" + SOURCE_URL_STATUS_JSON: "https://gbfs.citibikenyc.com/gbfs/en/station_status" + CHUNKSIZE: "750000" + SOURCE_FILE: "files/data_new_york_citibike_citibike_stations.csv" + TARGET_FILE: "files/data_output_new_york_citibike_citibike_stations.csv" PROJECT_ID: "{{ var.value.gcp_project }}" - DATASET_ID: "{{ var.json.new_york.citibike_stations.dataset_id }}" - TABLE_ID: "{{ var.json.new_york.citibike_stations.destination_table }}" + DATASET_ID: "new_york_citibike" + TABLE_ID: "citibike_stations" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "{{ var.json.new_york.citibike_stations.target_path }}" - SCHEMA_PATH: "{{ var.json.new_york.citibike_stations.schema_path }}" + TARGET_GCS_PATH: "data/new_york/citibike_stations/data_output.csv" + SCHEMA_PATH: "data/new_york/schema/citibike_stations_schema.json" DATA_DTYPES: >- { "rental_uris": "str", @@ -358,8 +358,8 @@ dag: "last_reported" ] resources: - request_ephemeral_storage: "16G" - limit_cpu: "3" + request_ephemeral_storage: "10G" + limit_cpu: "2" - operator: "GKEStartPodOperator" description: "Run New York NYPD MV Collisions Pipeline" @@ -371,19 +371,19 @@ dag: cluster_name: pubds-new-york namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.new_york.container_registry.run_csv_transform_kub_nypd_mv_collisions }}" + image: "{{ var.json.new_york.container_registry.run_csv_transform_kub }}" env_vars: - PIPELINE_NAME: "{{ var.json.new_york.nypd_mv_collisions.pipeline_name }}" - SOURCE_URL: "{{ var.json.new_york.nypd_mv_collisions.source_url }}" - CHUNKSIZE: "{{ var.json.new_york.nypd_mv_collisions.chunksize }}" - SOURCE_FILE: "files/data_{{ var.json.new_york.nypd_mv_collisions.dataset_id }}_{{ var.json.new_york.nypd_mv_collisions.destination_table }}.csv" - TARGET_FILE: "files/data_output_{{ var.json.new_york.nypd_mv_collisions.dataset_id }}_{{ var.json.new_york.nypd_mv_collisions.destination_table }}.csv" + PIPELINE_NAME: "New York - NYPD Motor Vehicle Collisions" + SOURCE_URL: "https://nycopendata.socrata.com/api/views/h9gi-nx95/rows.csv" + CHUNKSIZE: "150000" + SOURCE_FILE: "files/data_new_york_mv_collisions_nypd_mv_collisions.csv" + TARGET_FILE: "files/data_output_new_york_mv_collisions_nypd_mv_collisions.csv" PROJECT_ID: "{{ var.value.gcp_project }}" - DATASET_ID: "{{ var.json.new_york.nypd_mv_collisions.dataset_id }}" - TABLE_ID: "{{ var.json.new_york.nypd_mv_collisions.destination_table }}" + DATASET_ID: "new_york_mv_collisions" + TABLE_ID: "nypd_mv_collisions" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "{{ var.json.new_york.nypd_mv_collisions.target_path }}" - SCHEMA_PATH: "{{ var.json.new_york.nypd_mv_collisions.schema_path }}" + TARGET_GCS_PATH: "data/new_york/nypd_mv_collisions/data_output.csv" + SCHEMA_PATH: "data/new_york/schema/nypd_mv_collisions_schema.json" DATA_DTYPES: >- { "CRASH DATE": "str", @@ -503,8 +503,8 @@ dag: "zip_code" ] resources: - request_ephemeral_storage: "16G" - limit_cpu: "3" + request_ephemeral_storage: "10G" + limit_cpu: "2" - operator: "GKEStartPodOperator" description: "Run New York Tree Census 1995 Pipeline" @@ -516,19 +516,19 @@ dag: cluster_name: pubds-new-york namespace: "default" image_pull_policy: "Always" - image: "{{ var.json.new_york.container_registry.run_csv_transform_kub_tree_census_1995 }}" + image: "{{ var.json.new_york.container_registry.run_csv_transform_kub }}" env_vars: - PIPELINE_NAME: "{{ var.json.new_york.tree_census_1995.pipeline_name }}" - SOURCE_URL: "{{ 
-          CHUNKSIZE: "{{ var.json.new_york.tree_census_1995.chunksize }}"
-          SOURCE_FILE: "files/data_{{ var.json.new_york.tree_census_1995.dataset_id }}_{{ var.json.new_york.tree_census_1995.destination_table }}.csv"
-          TARGET_FILE: "files/data_output_{{ var.json.new_york.tree_census_1995.dataset_id }}_{{ var.json.new_york.tree_census_1995.destination_table }}.csv"
+          PIPELINE_NAME: "New York - Tree Census 1995"
+          SOURCE_URL: "https://data.cityofnewyork.us/api/views/kyad-zm4j/rows.csv"
+          CHUNKSIZE: "750000"
+          SOURCE_FILE: "files/data_new_york_trees_tree_census_1995.csv"
+          TARGET_FILE: "files/data_output_new_york_trees_tree_census_1995.csv"
           PROJECT_ID: "{{ var.value.gcp_project }}"
-          DATASET_ID: "{{ var.json.new_york.tree_census_1995.dataset_id }}"
-          TABLE_ID: "{{ var.json.new_york.tree_census_1995.destination_table }}"
+          DATASET_ID: "new_york_trees"
+          TABLE_ID: "tree_census_1995"
           TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
-          TARGET_GCS_PATH: "{{ var.json.new_york.tree_census_1995.target_path }}"
-          SCHEMA_PATH: "{{ var.json.new_york.tree_census_1995.schema_path }}"
+          TARGET_GCS_PATH: "data/new_york/tree_census_1995/data_output.csv"
+          SCHEMA_PATH: "data/new_york/schema/tree_census_1995_schema.json"
           RENAME_HEADERS_LIST: >-
             {
               "RecordId": "recordid",
@@ -594,8 +594,8 @@ dag:
              "location"
            ]
         resources:
-          request_ephemeral_storage: "16G"
-          limit_cpu: "3"
+          request_ephemeral_storage: "10G"
+          limit_cpu: "2"

    - operator: "GKEDeleteClusterOperator"
      args:
@@ -605,4 +605,4 @@ dag:
         name: pubds-new-york

   graph_paths:
-    - "create_cluster >> [transform_csv_nypd_mv_collisions, transform_csv_ny_citibike_stations, transform_csv_ny_tree_census_1995, transform_csv_ny_311_service_requests ] >> delete_cluster"
+    - "create_cluster >> [transform_csv_nypd_mv_collisions, transform_csv_ny_citibike_stations, transform_csv_ny_tree_census_1995 ] >> transform_csv_ny_311_service_requests >> delete_cluster"
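With all four GKEStartPodOperator tasks pointing at the same run_csv_transform_kub image, everything pipeline-specific travels through env vars, and list- or dict-valued settings (DATA_DTYPES, RENAME_HEADERS_LIST, REORDER_HEADERS_LIST, and so on) ride along as JSON strings. A short sketch of how values shaped like these can be decoded inside such a container; the env_json helper and its defaults are illustrative assumptions, not the repository's exact transform code:

# Illustrative only: decode JSON-valued env vars of the kind the operators
# above pass to the shared image.
import json
import os
import typing


def env_json(name: str, default: str = "") -> typing.Any:
    # A missing or empty variable falls back to the supplied default string,
    # so pipelines that do not set e.g. RENAME_HEADERS_LIST still run.
    raw = os.environ.get(name, default)
    return json.loads(raw) if raw else None


if __name__ == "__main__":
    rename_headers = env_json("RENAME_HEADERS_LIST", "{}")    # dict: source -> output column names
    reorder_headers = env_json("REORDER_HEADERS_LIST", "[]")  # list: final column order
    chunksize = int(os.environ.get("CHUNKSIZE", "750000"))    # plain scalar, no JSON needed
    print(rename_headers, reorder_headers, chunksize)

Keeping the whole contract in env vars like this is what allows one container image to serve the 311, Citibike, NYPD collisions, and tree-census transforms configured above.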