From c7cd9dd0fd715f04d14f9d183d3b0c7facfcdd14 Mon Sep 17 00:00:00 2001 From: Nicholas Large <84149918+nlarge-google@users.noreply.github.com> Date: Fri, 10 Jun 2022 12:00:28 -0500 Subject: [PATCH] feat: Onboard City Health Dashboard dataset (#374) --- .../run_csv_transform_kub/csv_transform.py | 482 ++++++++++++++++-- .../run_csv_transform_kub/requirements.txt | 3 +- .../chdb_data_city_all_dag.py | 97 ---- .../chdb_data_city_all/pipeline.yaml | 135 ----- .../chdb_data_tract_all_dag.py | 98 ---- .../chdb_data_tract_all/pipeline.yaml | 138 ----- .../city_health_dashboard_dag.py | 107 ++++ .../data/chdb_data_city_schema.json | 107 ++++ .../data/chdb_data_tract_schema.json | 112 ++++ .../city_health_dashboard/pipeline.yaml | 222 ++++++++ .../city_health_dashboard_full_load_dag.py | 107 ++++ .../data/chdb_data_city_schema.json | 107 ++++ .../data/chdb_data_tract_schema.json | 112 ++++ .../pipeline.yaml | 222 ++++++++ 14 files changed, 1537 insertions(+), 512 deletions(-) delete mode 100644 datasets/city_health_dashboard/pipelines/chdb_data_city_all/chdb_data_city_all_dag.py delete mode 100644 datasets/city_health_dashboard/pipelines/chdb_data_city_all/pipeline.yaml delete mode 100644 datasets/city_health_dashboard/pipelines/chdb_data_tract_all/chdb_data_tract_all_dag.py delete mode 100644 datasets/city_health_dashboard/pipelines/chdb_data_tract_all/pipeline.yaml create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard/city_health_dashboard_dag.py create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_city_schema.json create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_tract_schema.json create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard/pipeline.yaml create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/city_health_dashboard_full_load_dag.py create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_city_schema.json create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_tract_schema.json create mode 100644 datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/pipeline.yaml diff --git a/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/csv_transform.py index e2e8779ec..328e39104 100644 --- a/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -13,6 +13,7 @@ # limitations under the License. 
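For orientation before the detailed hunks: the reworked csv_transform.py below is driven entirely by environment variables and follows a download, unpack, chunked transform, GCS upload, BigQuery load flow. The first hunk adds the imports that flow relies on: fnmatch (to locate the extracted CSV by its file-name prefix) and google.api_core.exceptions.NotFound plus google.cloud.bigquery (to check for and, if needed, create the destination table). A minimal sketch of that NotFound-based existence check, with an illustrative helper name and table reference rather than the file's exact code:

# Sketch only: check whether a BigQuery table exists by catching NotFound.
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

def table_exists(project_id: str, dataset_id: str, table_id: str) -> bool:
    client = bigquery.Client(project=project_id)
    try:
        client.get_table(f"{project_id}.{dataset_id}.{table_id}")  # raises NotFound if absent
        return True
    except NotFound:
        return False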
import datetime +import fnmatch import json import logging import os @@ -22,61 +23,433 @@ import pandas as pd import requests -from google.cloud import storage +from google.api_core.exceptions import NotFound +from google.cloud import bigquery, storage def main( source_url: str, source_file: pathlib.Path, target_file: pathlib.Path, + chunksize: str, + project_id: str, + dataset_id: str, + table_id: str, target_gcs_bucket: str, target_gcs_path: str, - headers: typing.List[str], - rename_mappings: dict, + schema_path: str, + drop_dest_table: str, + truncate_table: str, + input_field_delimiter: str, + remove_source_file: str, + delete_target_file: str, + input_csv_headers: typing.List[str], + data_dtypes: dict, + rename_headers_list: dict, + output_csv_headers: typing.List[str], + table_description: str, pipeline_name: str, - file_name: str, + file_name_prefix: str, ) -> None: - - logging.info("Creating 'files' folder") + logging.info(f"{pipeline_name} process started") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + execute_pipeline( + source_url=source_url, + source_file=source_file, + target_file=target_file, + chunksize=chunksize, + project_id=project_id, + dataset_id=dataset_id, + destination_table=table_id, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + schema_path=schema_path, + drop_dest_table=drop_dest_table, + truncate_table=truncate_table, + input_field_delimiter=input_field_delimiter, + remove_source_file=remove_source_file, + delete_target_file=delete_target_file, + input_csv_headers=input_csv_headers, + data_dtypes=data_dtypes, + rename_headers_list=rename_headers_list, + output_csv_headers=output_csv_headers, + table_description=table_description, + file_name_prefix=file_name_prefix, + ) + logging.info(f"{pipeline_name} process completed") + - logging.info(f"Downloading file {source_url}") +def execute_pipeline( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + chunksize: str, + project_id: str, + dataset_id: str, + destination_table: str, + target_gcs_bucket: str, + target_gcs_path: str, + schema_path: str, + drop_dest_table: str, + truncate_table: bool, + input_field_delimiter: str, + remove_source_file: str, + delete_target_file: str, + input_csv_headers: typing.List[str], + data_dtypes: dict, + output_csv_headers: typing.List[str], + rename_headers_list: dict, + table_description: str, + file_name_prefix: str, +) -> None: download_file(source_url, source_file) + dest_path = os.path.split(source_file)[0] + unpack_file(infile=source_file, dest_path=dest_path) + datafile = find_file_in_path(dest_path, f"*{file_name_prefix}*.csv")[0] + process_source_file( + source_url=source_url, + source_file=datafile, + target_file=target_file, + chunksize=chunksize, + input_csv_headers=input_csv_headers, + data_dtypes=data_dtypes, + output_csv_headers=output_csv_headers, + rename_headers_list=rename_headers_list, + header_row_ordinal="0", + field_separator=input_field_delimiter, + ) + if os.path.exists(target_file): + upload_file_to_gcs( + file_path=target_file, + target_gcs_bucket=target_gcs_bucket, + target_gcs_path=target_gcs_path, + ) + table_exists = create_dest_table( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + schema_filepath=schema_path, + bucket_name=target_gcs_bucket, + drop_table=(drop_dest_table == "Y"), + table_description=table_description, + ) + if table_exists: + load_data_to_bq( + project_id=project_id, + dataset_id=dataset_id, + table_id=destination_table, + 
file_path=target_file, + truncate_table=truncate_table, + field_delimiter="|", + ) + if remove_source_file == "Y": + os.remove(datafile) + else: + pass + if delete_target_file == "Y": + os.remove(target_file) + else: + pass + else: + error_msg = f"Error: Data was not loaded because the destination table {project_id}.{dataset_id}.{destination_table} does not exist and/or could not be created." + raise ValueError(error_msg) + else: + logging.info( + f"Informational: The data file {target_file} was not generated because no data file was available. Continuing." + ) - logging.info(f"Opening file {source_file}") - with ZipFile(source_file) as myzip: - data = myzip.open(file_name) - df = pd.read_csv(data) - logging.info(f"Transformation Process Starting.. {source_file}") - rename_headers(df, rename_mappings) - df = df[headers] +def process_source_file( + source_url: str, + source_file: str, + target_file: str, + chunksize: str, + input_csv_headers: typing.List[str], + data_dtypes: dict, + output_csv_headers: typing.List[str], + rename_headers_list: typing.List[str], + header_row_ordinal: str = "0", + field_separator: str = ",", +) -> None: + logging.info(f"Opening source file {source_file}") + if header_row_ordinal is None or header_row_ordinal == "None": + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. of records + sep=field_separator, # data column separator, typically "," + names=input_csv_headers, + dtype=data_dtypes, + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + source_url=source_url, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + rename_headers_list=rename_headers_list, + output_csv_headers_list=output_csv_headers, + ) + else: + header = int(header_row_ordinal) + if data_dtypes != "[]": + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. of records + sep=field_separator, # data column separator, typically "," + header=header, # use when the data file does not contain a header + dtype=data_dtypes, + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + source_url=source_url, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + rename_headers_list=rename_headers_list, + output_csv_headers_list=output_csv_headers, + ) + else: + with pd.read_csv( + source_file, + engine="python", + encoding="utf-8", + quotechar='"', + chunksize=int(chunksize), # size of batch data, in no. 
of records + sep=field_separator, # data column separator, typically "," + header=header, # use when the data file does not contain a header + keep_default_na=True, + na_values=[" "], + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + source_url=source_url, + target_file_batch=target_file_batch, + target_file=target_file, + skip_header=(not chunk_number == 0), + rename_headers_list=rename_headers_list, + output_csv_headers_list=output_csv_headers, + ) - logging.info(f"Transformation Process complete .. {source_file}") - logging.info(f"Saving to output file.. {target_file}") - try: - save_to_new_file(df, file_path=str(target_file)) - except Exception as e: - logging.error(f"Error saving output file: {e}.") +def process_chunk( + df: pd.DataFrame, + source_url: str, + target_file_batch: str, + target_file: str, + skip_header: bool, + rename_headers_list: dict, + output_csv_headers_list: typing.List[str], +) -> None: + logging.info(f"Processing batch file {target_file_batch}") + df = rename_headers(df, rename_headers_list) + df = add_metadata_cols(df, source_url) + df = df[output_csv_headers_list] + save_to_new_file(df, file_path=str(target_file_batch), sep="|") + append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + logging.info(f"Processing batch file {target_file_batch} completed") + +def load_data_to_bq( + project_id: str, + dataset_id: str, + table_id: str, + file_path: str, + truncate_table: bool, + field_delimiter: str = "|", +) -> None: logging.info( - f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} started" ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - + client = bigquery.Client(project=project_id) + table_ref = client.dataset(dataset_id).table(table_id) + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.CSV + job_config.field_delimiter = field_delimiter + if truncate_table: + job_config.write_disposition = "WRITE_TRUNCATE" + else: + job_config.write_disposition = "WRITE_APPEND" + job_config.skip_leading_rows = 1 # ignore the header + job_config.autodetect = False + with open(file_path, "rb") as source_file: + job = client.load_table_from_file(source_file, table_ref, job_config=job_config) + job.result() logging.info( - f"City Health Dashboard {pipeline_name} process completed at " - + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + f"Loading data from {file_path} into {project_id}.{dataset_id}.{table_id} completed" ) -def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: - df.rename(columns=rename_mappings, inplace=True) +def create_dest_table( + project_id: str, + dataset_id: str, + table_id: str, + schema_filepath: list, + bucket_name: str, + drop_table: bool = False, + table_description="", +) -> bool: + table_ref = f"{project_id}.{dataset_id}.{table_id}" + logging.info(f"Attempting to create table {table_ref} if it doesn't already exist") + client = bigquery.Client() + table_exists = False + try: + table = client.get_table(table_ref) + table_exists_id = table.table_id + logging.info(f"Table {table_exists_id} currently exists.") + if drop_table: + logging.info("Dropping existing table") + client.delete_table(table) + table = None + except NotFound: + table = None + if not 
table: + logging.info( + ( + f"Table {table_ref} currently does not exist. Attempting to create table." + ) + ) + if check_gcs_file_exists(schema_filepath, bucket_name): + schema = create_table_schema([], bucket_name, schema_filepath) + table = bigquery.Table(table_ref, schema=schema) + table.description = table_description + client.create_table(table) + print(f"Table {table_ref} was created".format(table_id)) + table_exists = True + else: + file_name = os.path.split(schema_filepath)[1] + file_path = os.path.split(schema_filepath)[0] + logging.info( + f"Error: Unable to create table {table_ref} because schema file {file_name} does not exist in location {file_path} in bucket {bucket_name}" + ) + table_exists = False + else: + table_exists = True + return table_exists -def save_to_new_file(df: pd.DataFrame, file_path: str) -> None: - df.to_csv(file_path, float_format="%.0f", index=False) +def check_gcs_file_exists(file_path: str, bucket_name: str) -> bool: + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + exists = storage.Blob(bucket=bucket, name=file_path).exists(storage_client) + return exists + + +def create_table_schema( + schema_structure: list, bucket_name: str = "", schema_filepath: str = "" +) -> list: + logging.info(f"Defining table schema... {bucket_name} ... {schema_filepath}") + schema = [] + if not (schema_filepath): + schema_struct = schema_structure + else: + storage_client = storage.Client() + bucket = storage_client.get_bucket(bucket_name) + blob = bucket.blob(schema_filepath) + schema_struct = json.loads(blob.download_as_string(client=None)) + for schema_field in schema_struct: + fld_name = schema_field["name"] + fld_type = schema_field["type"] + try: + fld_descr = schema_field["description"] + except KeyError: + fld_descr = "" + fld_mode = schema_field["mode"] + schema.append( + bigquery.SchemaField( + name=fld_name, field_type=fld_type, mode=fld_mode, description=fld_descr + ) + ) + return schema + + +def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> pd.DataFrame: + logging.info("Renaming Headers") + return df.rename(columns=rename_mappings) + + +def add_metadata_cols(df: pd.DataFrame, source_url: str) -> pd.DataFrame: + logging.info("Adding metadata columns") + df["source_url"] = source_url + df["etl_timestamp"] = pd.to_datetime( + datetime.datetime.now(), format="%Y-%m-%d %H:%M:%S", infer_datetime_format=True + ) + return df + + +def save_to_new_file(df: pd.DataFrame, file_path: str, sep: str = "|") -> None: + logging.info(f"Saving data to target file.. 
{file_path} ...") + df.to_csv(file_path, index=False, sep=sep) + + +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + with open(batch_file_path, "r") as data_file: + if truncate_file: + target_file = open(target_file_path, "w+").close() + with open(target_file_path, "a+") as target_file: + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path}" + ) + target_file.write(data_file.read()) + if os.path.exists(batch_file_path): + os.remove(batch_file_path) + + +def find_file_in_path(root_path: str, pattern: str = "*") -> typing.List[str]: + logging.info(f"Searching for files ({pattern}) in {root_path}") + result = [] + for root, dirs, files in os.walk(root_path): + for name in files: + if fnmatch.fnmatch(name, pattern): + result.append(os.path.join(root, name)) + else: + pass + return result + + +def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None: + if os.path.exists(infile): + logging.info(f"Unpacking {infile} to {dest_path}") + with ZipFile(infile, mode="r") as zipf: + zipf.extractall(dest_path) + zipf.close() + else: + logging.info(f"{infile} not unpacked because it does not exist.") def download_file(source_url: str, source_file: pathlib.Path) -> None: @@ -90,24 +463,47 @@ def download_file(source_url: str, source_file: pathlib.Path) -> None: logging.error(f"Couldn't download {source_url}: {r.text}") -def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: - storage_client = storage.Client() - bucket = storage_client.bucket(gcs_bucket) - blob = bucket.blob(gcs_path) - blob.upload_from_filename(file_path) +def upload_file_to_gcs( + file_path: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str +) -> None: + if os.path.exists(file_path): + logging.info( + f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}" + ) + storage_client = storage.Client() + bucket = storage_client.bucket(target_gcs_bucket) + blob = bucket.blob(target_gcs_path) + blob.upload_from_filename(file_path) + else: + logging.info( + f"Cannot upload file to gs://{target_gcs_bucket}/{target_gcs_path} as it does not exist." 
+ ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) main( - source_url=os.environ["SOURCE_URL"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - headers=json.loads(os.environ["CSV_HEADERS"]), - rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), - pipeline_name=os.environ["PIPELINE_NAME"], - file_name=os.environ["FILE_NAME"], + source_url=os.environ.get("SOURCE_URL", ""), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), + target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(), + chunksize=os.environ.get("CHUNKSIZE", "1500000"), + project_id=os.environ.get("PROJECT_ID", ""), + dataset_id=os.environ.get("DATASET_ID", ""), + table_id=os.environ.get("TABLE_ID", ""), + target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""), + target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""), + schema_path=os.environ.get("SCHEMA_PATH", ""), + drop_dest_table=os.environ.get("DROP_DEST_TABLE", "N"), + truncate_table=os.environ.get("TRUNCATE_TABLE", "N") == "Y", + input_field_delimiter=os.environ.get("INPUT_FIELD_DELIMITER", ","), + remove_source_file=os.environ.get("REMOVE_SOURCE_FILE", "N"), + delete_target_file=os.environ.get("DELETE_TARGET_FILE", "N"), + input_csv_headers=json.loads(os.environ.get("INPUT_CSV_HEADERS", r"[]")), + data_dtypes=json.loads(os.environ.get("DATA_DTYPES", r"{}")), + rename_headers_list=json.loads(os.environ.get("RENAME_HEADERS_LIST", r"{}")), + output_csv_headers=json.loads(os.environ.get("OUTPUT_CSV_HEADERS", r"[]")), + table_description=os.environ.get("TABLE_DESCRIPTION", ""), + pipeline_name=os.environ.get("PIPELINE_NAME", ""), + file_name_prefix=os.environ.get("FILE_NAME_PREFIX", ""), ) diff --git a/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/requirements.txt b/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/requirements.txt index 1c45cdfc3..fa116c33a 100644 --- a/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/requirements.txt +++ b/datasets/city_health_dashboard/pipelines/_images/run_csv_transform_kub/requirements.txt @@ -1,3 +1,4 @@ -requests +google-cloud-bigquery google-cloud-storage pandas +requests diff --git a/datasets/city_health_dashboard/pipelines/chdb_data_city_all/chdb_data_city_all_dag.py b/datasets/city_health_dashboard/pipelines/chdb_data_city_all/chdb_data_city_all_dag.py deleted file mode 100644 index b0c944755..000000000 --- a/datasets/city_health_dashboard/pipelines/chdb_data_city_all/chdb_data_city_all_dag.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
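Aside on the chunked processing added above: process_source_file reads the extracted CSV in fixed-size chunks, process_chunk renames headers, adds the metadata columns, and reorders the output columns, and append_batch_file stitches the per-chunk files into one pipe-delimited output. A condensed, illustrative sketch of that read-and-append pattern (the helper name and the bare pass-through transform are assumptions, not the file's exact code):

# Sketch only: chunked CSV read producing a single pipe-delimited output file.
import pandas as pd

def transform_in_chunks(source_file: str, target_file: str, chunksize: int = 500000) -> None:
    with pd.read_csv(source_file, chunksize=chunksize, dtype=str) as reader:
        for chunk_number, chunk in enumerate(reader):
            chunk.to_csv(
                target_file,
                sep="|",                     # matches the pipeline's pipe-delimited output
                index=False,
                header=(chunk_number == 0),  # write the header only for the first chunk
                mode="w" if chunk_number == 0 else "a",
            )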
- - -from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod -from airflow.providers.google.cloud.transfers import gcs_to_bigquery - -default_args = { - "owner": "Google", - "depends_on_past": False, - "start_date": "2021-03-01", -} - - -with DAG( - dag_id="city_health_dashboard.chdb_data_city_all", - default_args=default_args, - max_active_runs=1, - schedule_interval="@daily", - catchup=False, - default_view="graph", -) as dag: - - # Run CSV transform within kubernetes pod - data_city_transform_csv = kubernetes_pod.KubernetesPodOperator( - task_id="data_city_transform_csv", - startup_timeout_seconds=600, - name="city_health_dashboard_chdb_data_city_all", - namespace="composer", - service_account_name="datasets", - image_pull_policy="Always", - image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}", - env_vars={ - "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download", - "SOURCE_FILE": "files/data.zip", - "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_city_all/data_output.csv", - "CSV_HEADERS": '["state_abbr","state_fips","place_fips","stpl_fips","city_name","metric_name","group_name","metric_number","group_number","num","denom","est","lci","uci","county_indicator","multiplier_indicator","data_yr_type","geo_level","date_export"]', - "RENAME_MAPPINGS": '{"state_abbr": "state_abbr","state_fips": "state_fips","place_fips": "place_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","group_name": "group_name","metric_number": "metric_number","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","county_indicator": "county_indicator","multiplier_indicator": "multiplier_indicator","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}', - "PIPELINE_NAME": "chdb_data_city_all", - "FILE_NAME": "CHDB_data_city_all_v13.1.csv", - }, - resources={ - "limit_memory": "2G", - "limit_cpu": "1", - "request_ephemeral_storage": "8G", - }, - ) - - # Task to load CSV data to a BigQuery table - load_data_city_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( - task_id="load_data_city_to_bq", - bucket="{{ var.value.composer_bucket }}", - source_objects=[ - "data/city_health_dashboard/chdb_data_city_all/data_output.csv" - ], - source_format="CSV", - destination_project_dataset_table="city_health_dashboard.chdb_data_city_all", - skip_leading_rows=1, - write_disposition="WRITE_TRUNCATE", - schema_fields=[ - {"name": "state_abbr", "type": "STRING", "mode": "NULLABLE"}, - {"name": "state_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "place_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "stpl_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "city_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "metric_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "group_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "group_number", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "metric_number", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "num", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "denom", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "est", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "lci", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "uci", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "county_indicator", "type": "FLOAT", "mode": 
"NULLABLE"}, - {"name": "multiplier_indicator", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "data_yr_type", "type": "STRING", "mode": "NULLABLE"}, - {"name": "geo_level", "type": "STRING", "mode": "NULLABLE"}, - {"name": "date_export", "type": "DATE", "mode": "NULLABLE"}, - ], - ) - - data_city_transform_csv >> load_data_city_to_bq diff --git a/datasets/city_health_dashboard/pipelines/chdb_data_city_all/pipeline.yaml b/datasets/city_health_dashboard/pipelines/chdb_data_city_all/pipeline.yaml deleted file mode 100644 index 03ed01a40..000000000 --- a/datasets/city_health_dashboard/pipelines/chdb_data_city_all/pipeline.yaml +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- -resources: - - - type: bigquery_table - table_id: chdb_data_city_all - description: "City Health Dashboard Data Tract" - -dag: - airflow_version: 2 - initialize: - dag_id: chdb_data_city_all - default_args: - owner: "Google" - depends_on_past: False - start_date: '2021-03-01' - max_active_runs: 1 - schedule_interval: "@daily" - catchup: False - default_view: graph - - tasks: - - operator: "KubernetesPodOperator" - description: "Run CSV transform within kubernetes pod" - args: - task_id: "data_city_transform_csv" - startup_timeout_seconds: 600 - name: "city_health_dashboard_chdb_data_city_all" - namespace: "composer" - service_account_name: "datasets" - image_pull_policy: "Always" - image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" - - env_vars: - SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" - SOURCE_FILE: "files/data.zip" - TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_city_all/data_output.csv" - CSV_HEADERS: >- - ["state_abbr","state_fips","place_fips","stpl_fips","city_name","metric_name","group_name","metric_number","group_number","num","denom","est","lci","uci","county_indicator","multiplier_indicator","data_yr_type","geo_level","date_export"] - RENAME_MAPPINGS: >- - {"state_abbr": "state_abbr","state_fips": "state_fips","place_fips": "place_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","group_name": "group_name","metric_number": "metric_number","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","county_indicator": "county_indicator","multiplier_indicator": "multiplier_indicator","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"} - PIPELINE_NAME: "chdb_data_city_all" - FILE_NAME: "CHDB_data_city_all_v13.1.csv" - resources: - limit_memory: "2G" - limit_cpu: "1" - request_ephemeral_storage: "8G" - - - operator: "GoogleCloudStorageToBigQueryOperator" - description: "Task to load CSV data to a BigQuery table" - args: - task_id: "load_data_city_to_bq" - bucket: "{{ var.value.composer_bucket }}" - source_objects: 
["data/city_health_dashboard/chdb_data_city_all/data_output.csv"] - source_format: "CSV" - destination_project_dataset_table: "city_health_dashboard.chdb_data_city_all" - skip_leading_rows: 1 - write_disposition: "WRITE_TRUNCATE" - - schema_fields: - - name: "state_abbr" - type: "STRING" - mode: "NULLABLE" - - name: "state_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "place_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "stpl_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "city_name" - type: "STRING" - mode: "NULLABLE" - - name: "metric_name" - type: "STRING" - mode: "NULLABLE" - - name: "group_name" - type: "STRING" - mode: "NULLABLE" - - name: "group_number" - type: "INTEGER" - mode: "NULLABLE" - - name: "metric_number" - type: "INTEGER" - mode: "NULLABLE" - - name: "num" - type: "FLOAT" - mode: "NULLABLE" - - name: "denom" - type: "FLOAT" - mode: "NULLABLE" - - name: "est" - type: "FLOAT" - mode: "NULLABLE" - - name: "lci" - type: "FLOAT" - mode: "NULLABLE" - - name: "uci" - type: "FLOAT" - mode: "NULLABLE" - - name: "county_indicator" - type: "FLOAT" - mode: "NULLABLE" - - name: "multiplier_indicator" - type: "FLOAT" - mode: "NULLABLE" - - name: "data_yr_type" - type: "STRING" - mode: "NULLABLE" - - name: "geo_level" - type: "STRING" - mode: "NULLABLE" - - name: "date_export" - type: "DATE" - mode: "NULLABLE" - - graph_paths: - - "data_city_transform_csv >> load_data_city_to_bq" diff --git a/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/chdb_data_tract_all_dag.py b/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/chdb_data_tract_all_dag.py deleted file mode 100644 index 40c3e86b8..000000000 --- a/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/chdb_data_tract_all_dag.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod -from airflow.providers.google.cloud.transfers import gcs_to_bigquery - -default_args = { - "owner": "Google", - "depends_on_past": False, - "start_date": "2021-03-01", -} - - -with DAG( - dag_id="city_health_dashboard.chdb_data_tract_all", - default_args=default_args, - max_active_runs=1, - schedule_interval="@daily", - catchup=False, - default_view="graph", -) as dag: - - # Run CSV transform within kubernetes pod - data_tract_transform_csv = kubernetes_pod.KubernetesPodOperator( - task_id="data_tract_transform_csv", - startup_timeout_seconds=600, - name="city_health_dashboard_chdb_data_tract_all", - namespace="composer", - service_account_name="datasets", - image_pull_policy="Always", - image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}", - env_vars={ - "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download", - "SOURCE_FILE": "files/data.zip", - "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_tract_all/data_output.csv", - "CSV_HEADERS": '["state_abbr","state_fips","county_fips","county_name","tract_code","stcotr_fips","stpl_fips","city_name","metric_name","metric_number","group_name","group_number","num","denom","est","lci","uci","data_yr_type","geo_level","date_export"]', - "RENAME_MAPPINGS": '{"state_abbr": "state_abbr","state_fips": "state_fips","county_fips": "county_fips","county_name": "county_name","tract_code": "tract_code","stcotr_fips": "stcotr_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","metric_number": "metric_number","group_name": "group_name","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}', - "PIPELINE_NAME": "chdb_data_tract_all", - "FILE_NAME": "CHDB_data_tract_all_v13.1.csv", - }, - resources={ - "limit_memory": "2G", - "limit_cpu": "1", - "request_ephemeral_storage": "8G", - }, - ) - - # Task to load CSV data to a BigQuery table - load_data_tract_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( - task_id="load_data_tract_to_bq", - bucket="{{ var.value.composer_bucket }}", - source_objects=[ - "data/city_health_dashboard/chdb_data_tract_all/data_output.csv" - ], - source_format="CSV", - destination_project_dataset_table="city_health_dashboard.chdb_data_tract_all", - skip_leading_rows=1, - write_disposition="WRITE_TRUNCATE", - schema_fields=[ - {"name": "state_abbr", "type": "STRING", "mode": "NULLABLE"}, - {"name": "state_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "county_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "county_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "tract_code", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "stcotr_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "stpl_fips", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "city_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "metric_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "metric_number", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "group_name", "type": "STRING", "mode": "NULLABLE"}, - {"name": "group_number", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "num", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "denom", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": 
"est", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "lci", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "uci", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "data_yr_type", "type": "STRING", "mode": "NULLABLE"}, - {"name": "geo_level", "type": "STRING", "mode": "NULLABLE"}, - {"name": "date_export", "type": "DATE", "mode": "NULLABLE"}, - ], - ) - - data_tract_transform_csv >> load_data_tract_to_bq diff --git a/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/pipeline.yaml b/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/pipeline.yaml deleted file mode 100644 index 2c8ac3805..000000000 --- a/datasets/city_health_dashboard/pipelines/chdb_data_tract_all/pipeline.yaml +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- -resources: - - - type: bigquery_table - table_id: chdb_data_tract_all - description: "City Health Dashboard Data Tract" - -dag: - airflow_version: 2 - initialize: - dag_id: chdb_data_tract_all - default_args: - owner: "Google" - depends_on_past: False - start_date: '2021-03-01' - max_active_runs: 1 - schedule_interval: "@daily" - catchup: False - default_view: graph - - tasks: - - operator: "KubernetesPodOperator" - description: "Run CSV transform within kubernetes pod" - args: - task_id: "data_tract_transform_csv" - startup_timeout_seconds: 600 - name: "city_health_dashboard_chdb_data_tract_all" - namespace: "composer" - service_account_name: "datasets" - image_pull_policy: "Always" - image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" - - env_vars: - SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" - SOURCE_FILE: "files/data.zip" - TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_tract_all/data_output.csv" - CSV_HEADERS: >- - ["state_abbr","state_fips","county_fips","county_name","tract_code","stcotr_fips","stpl_fips","city_name","metric_name","metric_number","group_name","group_number","num","denom","est","lci","uci","data_yr_type","geo_level","date_export"] - RENAME_MAPPINGS: >- - {"state_abbr": "state_abbr","state_fips": "state_fips","county_fips": "county_fips","county_name": "county_name","tract_code": "tract_code","stcotr_fips": "stcotr_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","metric_number": "metric_number","group_name": "group_name","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"} - PIPELINE_NAME: "chdb_data_tract_all" - FILE_NAME: "CHDB_data_tract_all_v13.1.csv" - resources: - limit_memory: "2G" - limit_cpu: "1" - request_ephemeral_storage: "8G" - - - operator: "GoogleCloudStorageToBigQueryOperator" - description: "Task to load CSV data to a BigQuery table" - args: - task_id: "load_data_tract_to_bq" - bucket: "{{ 
var.value.composer_bucket }}" - source_objects: ["data/city_health_dashboard/chdb_data_tract_all/data_output.csv"] - source_format: "CSV" - destination_project_dataset_table: "city_health_dashboard.chdb_data_tract_all" - skip_leading_rows: 1 - write_disposition: "WRITE_TRUNCATE" - - schema_fields: - - name: "state_abbr" - type: "STRING" - mode: "NULLABLE" - - name: "state_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "county_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "county_name" - type: "STRING" - mode: "NULLABLE" - - name: "tract_code" - type: "INTEGER" - mode: "NULLABLE" - - name: "stcotr_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "stpl_fips" - type: "INTEGER" - mode: "NULLABLE" - - name: "city_name" - type: "STRING" - mode: "NULLABLE" - - name: "metric_name" - type: "STRING" - mode: "NULLABLE" - - name: "metric_number" - type: "INTEGER" - mode: "NULLABLE" - - name: "group_name" - type: "STRING" - mode: "NULLABLE" - - name: "group_number" - type: "INTEGER" - mode: "NULLABLE" - - name: "num" - type: "FLOAT" - mode: "NULLABLE" - - name: "denom" - type: "FLOAT" - mode: "NULLABLE" - - name: "est" - type: "FLOAT" - mode: "NULLABLE" - - name: "lci" - type: "FLOAT" - mode: "NULLABLE" - - name: "uci" - type: "FLOAT" - mode: "NULLABLE" - - name: "data_yr_type" - type: "STRING" - mode: "NULLABLE" - - name: "geo_level" - type: "STRING" - mode: "NULLABLE" - - name: "date_export" - type: "DATE" - mode: "NULLABLE" - - graph_paths: - - "data_tract_transform_csv >> load_data_tract_to_bq" diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard/city_health_dashboard_dag.py b/datasets/city_health_dashboard/pipelines/city_health_dashboard/city_health_dashboard_dag.py new file mode 100644 index 000000000..3dc9bfb3c --- /dev/null +++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard/city_health_dashboard_dag.py @@ -0,0 +1,107 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
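The new consolidated DAG below configures each KubernetesPodOperator task entirely through env_vars whose values are JSON strings; the container's csv_transform.py decodes them with json.loads, keeps every input column as str, and leaves typing to the BigQuery schema file referenced by SCHEMA_PATH. A small sketch of that decoding step (the sample values are shortened from the DAG, not complete):

# Sketch only: how the container turns JSON-valued env vars back into Python objects.
import json
import os

os.environ.setdefault("INPUT_CSV_HEADERS", '["state_abbr", "state_fips", "city_name"]')
os.environ.setdefault("RENAME_HEADERS_LIST", '{"state_abbr": "state_abbr"}')

input_csv_headers = json.loads(os.environ.get("INPUT_CSV_HEADERS", "[]"))      # list of column names
rename_headers_list = json.loads(os.environ.get("RENAME_HEADERS_LIST", "{}"))  # old -> new name mapping
truncate_table = os.environ.get("TRUNCATE_TABLE", "N") == "Y"                  # Y/N flags become booleans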
+ + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="city_health_dashboard.city_health_dashboard", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + chdb_data_city = kubernetes_pod.KubernetesPodOperator( + task_id="chdb_data_city", + startup_timeout_seconds=600, + name="city_health_dashboard_chdb_data_city_all", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download", + "SOURCE_FILE": "files/chdb_data_city_data.zip", + "TARGET_FILE": "files/chdb_data_city_data_output.csv", + "CHUNKSIZE": "500000", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "city_health_dashboard", + "TABLE_ID": "chdb_data_city_all", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_city_all/data_output.csv", + "SCHEMA_PATH": "data/city_health_dashboard/schema/chdb_data_city_schema.json", + "DROP_DEST_TABLE": "N", + "TRUNCATE_TABLE": "Y", + "INPUT_FIELD_DELIMITER": ",", + "REMOVE_SOURCE_FILE": "Y", + "DELETE_TARGET_FILE": "Y", + "INPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name",\n "metric_name", "group_name", "metric_number", "group_number", "num",\n "denom", "est", "lci", "uci", "county_indicator",\n "multiplier_indicator", "data_yr_type", "geo_level", "date_export"\n]', + "DATA_DTYPES": '{\n "state_abbr": "str",\n "state_fips": "str",\n "place_fips": "str",\n "stpl_fips": "str",\n "city_name": "str",\n "metric_name": "str",\n "group_name": "str",\n "metric_number": "str",\n "group_number": "str",\n "num": "str",\n "denom": "str",\n "est": "str",\n "lci": "str",\n "uci": "str",\n "county_indicator": "str",\n "multiplier_indicator": "str",\n "data_yr_type": "str",\n "geo_level": "str",\n "date_export": "str"\n}', + "OUTPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name",\n "metric_name", "group_name", "metric_number", "group_number", "num",\n "denom", "est", "lci", "uci", "county_indicator",\n "multiplier_indicator", "data_yr_type", "geo_level", "date_export", "source_url",\n "etl_timestamp"\n]', + "RENAME_HEADERS_LIST": '{\n "state_abbr": "state_abbr",\n "state_fips": "state_fips",\n "place_fips": "place_fips",\n "stpl_fips": "stpl_fips",\n "city_name": "city_name",\n "metric_name": "metric_name",\n "group_name": "group_name",\n "metric_number": "metric_number",\n "group_number": "group_number",\n "num": "num",\n "denom": "denom",\n "est": "est",\n "lci": "lci",\n "uci": "uci",\n "county_indicator": "county_indicator",\n "multiplier_indicator": "multiplier_indicator",\n "data_yr_type": "data_yr_type",\n "geo_level": "geo_level",\n "date_export": "date_export"\n}', + "TABLE_DESCRIPTION": "City Health Dashboard Data Tract", + "PIPELINE_NAME": "chdb_data_city_all", + "FILE_NAME_PREFIX": "CHDB_data_city_all_", + }, + resources={"limit_memory": "8G", "limit_cpu": "1"}, + ) + + # Run CSV transform within kubernetes pod + chdb_data_tract = kubernetes_pod.KubernetesPodOperator( + task_id="chdb_data_tract", + startup_timeout_seconds=600, + 
name="city_health_dashboard_chdb_data_tract_all", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download", + "SOURCE_FILE": "files/chdb_data_tract_data.zip", + "TARGET_FILE": "files/chdb_data_tract_data_output.csv", + "CHUNKSIZE": "500000", + "PROJECT_ID": "{{ var.value.gcp_project }}", + "DATASET_ID": "city_health_dashboard", + "TABLE_ID": "chdb_data_tract_all", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_tract_all/data_output.csv", + "SCHEMA_PATH": "data/city_health_dashboard/schema/chdb_data_tract_schema.json", + "DROP_DEST_TABLE": "N", + "TRUNCATE_TABLE": "Y", + "INPUT_FIELD_DELIMITER": ",", + "REMOVE_SOURCE_FILE": "Y", + "DELETE_TARGET_FILE": "Y", + "INPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "county_fips", "county_name", "tract_code",\n "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number",\n "group_name", "group_number", "num", "denom", "est",\n "lci", "uci", "data_yr_type", "geo_level", "date_export"\n]', + "DATA_DTYPES": '{\n "state_abbr": "str",\n "state_fips": "str",\n "county_fips": "str",\n "county_name": "str",\n "tract_code": "str",\n "stcotr_fips": "str",\n "stpl_fips": "str",\n "city_name": "str",\n "metric_name": "str",\n "metric_number": "str",\n "group_name": "str",\n "group_number": "str",\n "num": "str",\n "denom": "str",\n "est": "str",\n "lci": "str",\n "uci": "str",\n "data_yr_type": "str",\n "geo_level": "str",\n "date_export": "str"\n}', + "OUTPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "county_fips", "county_name", "tract_code",\n "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number",\n "group_name", "group_number", "num", "denom", "est",\n "lci", "uci", "data_yr_type", "geo_level", "date_export", "source_url",\n "etl_timestamp"\n]', + "RENAME_HEADERS_LIST": '{\n "state_abbr": "state_abbr",\n "state_fips": "state_fips",\n "county_fips": "county_fips",\n "county_name": "county_name",\n "tract_code": "tract_code",\n "stcotr_fips": "stcotr_fips",\n "stpl_fips": "stpl_fips",\n "city_name": "city_name",\n "metric_name": "metric_name",\n "metric_number": "metric_number",\n "group_name": "group_name",\n "group_number": "group_number",\n "num": "num",\n "denom": "denom",\n "est": "est",\n "lci": "lci",\n "uci": "uci",\n "data_yr_type": "data_yr_type",\n "geo_level": "geo_level",\n "date_export": "date_export"\n}', + "TABLE_DESCRIPTION": "City Health Dashboard Data Tract", + "PIPELINE_NAME": "chdb_data_tract_all", + "FILE_NAME_PREFIX": "CHDB_data_tract_all_", + }, + resources={"limit_memory": "8G", "limit_cpu": "1"}, + ) + + [chdb_data_tract, chdb_data_city] diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_city_schema.json b/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_city_schema.json new file mode 100644 index 000000000..2eab66d33 --- /dev/null +++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_city_schema.json @@ -0,0 +1,107 @@ +[ + { + "name": "state_abbr", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "state_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "place_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "stpl_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + 
"name": "city_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "metric_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "group_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "group_number", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "metric_number", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "num", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "denom", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "est", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "lci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "uci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "county_indicator", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "multiplier_indicator", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "data_yr_type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "geo_level", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "date_export", + "type": "DATE", + "mode": "NULLABLE" + }, + { + "name": "source_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "etl_timestamp", + "type": "TIMESTAMP", + "mode": "NULLABLE" + } +] diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_tract_schema.json b/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_tract_schema.json new file mode 100644 index 000000000..5eacd9354 --- /dev/null +++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard/data/chdb_data_tract_schema.json @@ -0,0 +1,112 @@ +[ + { + "name": "state_abbr", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "state_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "county_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "county_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "tract_code", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "stcotr_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "stpl_fips", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "city_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "metric_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "metric_number", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "group_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "group_number", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "num", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "denom", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "est", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "lci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "uci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "data_yr_type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "geo_level", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "date_export", + "type": "DATE", + "mode": "NULLABLE" + }, + { + "name": "source_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "etl_timestamp", + "type": "TIMESTAMP", + "mode": "NULLABLE" + } +] diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard/pipeline.yaml b/datasets/city_health_dashboard/pipelines/city_health_dashboard/pipeline.yaml new file mode 100644 index 000000000..99a3aa11f --- /dev/null +++ 
b/datasets/city_health_dashboard/pipelines/city_health_dashboard/pipeline.yaml @@ -0,0 +1,222 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + - type: bigquery_table + table_id: chdb_data_city_all + description: "City Health Dashboard Data Tract" + - type: bigquery_table + table_id: chdb_data_tract_all + description: "City Health Dashboard Data Tract" +dag: + airflow_version: 2 + initialize: + dag_id: city_health_dashboard + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "chdb_data_city" + startup_timeout_seconds: 600 + name: "city_health_dashboard_chdb_data_city_all" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" + SOURCE_FILE: "files/chdb_data_city_data.zip" + TARGET_FILE: "files/chdb_data_city_data_output.csv" + CHUNKSIZE: "500000" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "city_health_dashboard" + TABLE_ID: "chdb_data_city_all" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_city_all/data_output.csv" + SCHEMA_PATH: "data/city_health_dashboard/schema/chdb_data_city_schema.json" + DROP_DEST_TABLE: "N" + TRUNCATE_TABLE: "Y" + INPUT_FIELD_DELIMITER: "," + REMOVE_SOURCE_FILE: "Y" + DELETE_TARGET_FILE: "Y" + INPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name", + "metric_name", "group_name", "metric_number", "group_number", "num", + "denom", "est", "lci", "uci", "county_indicator", + "multiplier_indicator", "data_yr_type", "geo_level", "date_export" + ] + DATA_DTYPES: >- + { + "state_abbr": "str", + "state_fips": "str", + "place_fips": "str", + "stpl_fips": "str", + "city_name": "str", + "metric_name": "str", + "group_name": "str", + "metric_number": "str", + "group_number": "str", + "num": "str", + "denom": "str", + "est": "str", + "lci": "str", + "uci": "str", + "county_indicator": "str", + "multiplier_indicator": "str", + "data_yr_type": "str", + "geo_level": "str", + "date_export": "str" + } + OUTPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name", + "metric_name", "group_name", "metric_number", "group_number", "num", + "denom", "est", "lci", "uci", "county_indicator", + "multiplier_indicator", "data_yr_type", "geo_level", "date_export", "source_url", + "etl_timestamp" + ] + RENAME_HEADERS_LIST: >- + { + "state_abbr": "state_abbr", + "state_fips": "state_fips", + "place_fips": "place_fips", + "stpl_fips": "stpl_fips", + "city_name": "city_name", + "metric_name": "metric_name", + "group_name": "group_name", + 
"metric_number": "metric_number", + "group_number": "group_number", + "num": "num", + "denom": "denom", + "est": "est", + "lci": "lci", + "uci": "uci", + "county_indicator": "county_indicator", + "multiplier_indicator": "multiplier_indicator", + "data_yr_type": "data_yr_type", + "geo_level": "geo_level", + "date_export": "date_export" + } + TABLE_DESCRIPTION: "City Health Dashboard Data Tract" + PIPELINE_NAME: "chdb_data_city_all" + FILE_NAME_PREFIX: "CHDB_data_city_all_" + resources: + limit_memory: "8G" + limit_cpu: "1" + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "chdb_data_tract" + startup_timeout_seconds: 600 + name: "city_health_dashboard_chdb_data_tract_all" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" + SOURCE_FILE: "files/chdb_data_tract_data.zip" + TARGET_FILE: "files/chdb_data_tract_data_output.csv" + CHUNKSIZE: "500000" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "city_health_dashboard" + TABLE_ID: "chdb_data_tract_all" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_tract_all/data_output.csv" + SCHEMA_PATH: "data/city_health_dashboard/schema/chdb_data_tract_schema.json" + DROP_DEST_TABLE: "N" + TRUNCATE_TABLE: "Y" + INPUT_FIELD_DELIMITER: "," + REMOVE_SOURCE_FILE: "Y" + DELETE_TARGET_FILE: "Y" + INPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "county_fips", "county_name", "tract_code", + "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number", + "group_name", "group_number", "num", "denom", "est", + "lci", "uci", "data_yr_type", "geo_level", "date_export" + ] + DATA_DTYPES: >- + { + "state_abbr": "str", + "state_fips": "str", + "county_fips": "str", + "county_name": "str", + "tract_code": "str", + "stcotr_fips": "str", + "stpl_fips": "str", + "city_name": "str", + "metric_name": "str", + "metric_number": "str", + "group_name": "str", + "group_number": "str", + "num": "str", + "denom": "str", + "est": "str", + "lci": "str", + "uci": "str", + "data_yr_type": "str", + "geo_level": "str", + "date_export": "str" + } + OUTPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "county_fips", "county_name", "tract_code", + "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number", + "group_name", "group_number", "num", "denom", "est", + "lci", "uci", "data_yr_type", "geo_level", "date_export", "source_url", + "etl_timestamp" + ] + RENAME_HEADERS_LIST: >- + { + "state_abbr": "state_abbr", + "state_fips": "state_fips", + "county_fips": "county_fips", + "county_name": "county_name", + "tract_code": "tract_code", + "stcotr_fips": "stcotr_fips", + "stpl_fips": "stpl_fips", + "city_name": "city_name", + "metric_name": "metric_name", + "metric_number": "metric_number", + "group_name": "group_name", + "group_number": "group_number", + "num": "num", + "denom": "denom", + "est": "est", + "lci": "lci", + "uci": "uci", + "data_yr_type": "data_yr_type", + "geo_level": "geo_level", + "date_export": "date_export" + } + TABLE_DESCRIPTION: "City Health Dashboard Data Tract" + PIPELINE_NAME: "chdb_data_tract_all" + FILE_NAME_PREFIX: "CHDB_data_tract_all_" + resources: + limit_memory: "8G" + limit_cpu: "1" + + graph_paths: + - "[ chdb_data_tract, chdb_data_city ]" diff --git 
new file mode 100644
index 000000000..9bf009116
--- /dev/null
+++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/city_health_dashboard_full_load_dag.py
@@ -0,0 +1,107 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="city_health_dashboard.city_health_dashboard_full_load",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@once",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run CSV transform within kubernetes pod
+    chdb_data_city = kubernetes_pod.KubernetesPodOperator(
+        task_id="chdb_data_city",
+        startup_timeout_seconds=600,
+        name="city_health_dashboard_chdb_data_city_all",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download",
+            "SOURCE_FILE": "files/chdb_data_city_data.zip",
+            "TARGET_FILE": "files/chdb_data_city_data_output.csv",
+            "CHUNKSIZE": "500000",
+            "PROJECT_ID": "{{ var.value.gcp_project }}",
+            "DATASET_ID": "city_health_dashboard",
+            "TABLE_ID": "chdb_data_city_all",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_city_all/data_output.csv",
+            "SCHEMA_PATH": "data/city_health_dashboard/schema/chdb_data_city_schema.json",
+            "DROP_DEST_TABLE": "Y",
+            "TRUNCATE_TABLE": "Y",
+            "INPUT_FIELD_DELIMITER": ",",
+            "REMOVE_SOURCE_FILE": "Y",
+            "DELETE_TARGET_FILE": "Y",
+            "INPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name",\n "metric_name", "group_name", "metric_number", "group_number", "num",\n "denom", "est", "lci", "uci", "county_indicator",\n "multiplier_indicator", "data_yr_type", "geo_level", "date_export"\n]',
+            "DATA_DTYPES": '{\n "state_abbr": "str",\n "state_fips": "str",\n "place_fips": "str",\n "stpl_fips": "str",\n "city_name": "str",\n "metric_name": "str",\n "group_name": "str",\n "metric_number": "str",\n "group_number": "str",\n "num": "str",\n "denom": "str",\n "est": "str",\n "lci": "str",\n "uci": "str",\n "county_indicator": "str",\n "multiplier_indicator": "str",\n "data_yr_type": "str",\n "geo_level": "str",\n "date_export": "str"\n}',
+            "OUTPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name",\n "metric_name", "group_name", "metric_number", "group_number", "num",\n "denom", "est", "lci", "uci", "county_indicator",\n "multiplier_indicator", "data_yr_type", "geo_level", "date_export", "source_url",\n "etl_timestamp"\n]',
+            "RENAME_HEADERS_LIST": '{\n "state_abbr": "state_abbr",\n "state_fips": "state_fips",\n "place_fips": "place_fips",\n "stpl_fips": "stpl_fips",\n "city_name": "city_name",\n "metric_name": "metric_name",\n "group_name": "group_name",\n "metric_number": "metric_number",\n "group_number": "group_number",\n "num": "num",\n "denom": "denom",\n "est": "est",\n "lci": "lci",\n "uci": "uci",\n "county_indicator": "county_indicator",\n "multiplier_indicator": "multiplier_indicator",\n "data_yr_type": "data_yr_type",\n "geo_level": "geo_level",\n "date_export": "date_export"\n}',
+            "TABLE_DESCRIPTION": "City Health Dashboard Data City",
+            "PIPELINE_NAME": "chdb_data_city_all",
+            "FILE_NAME_PREFIX": "CHDB_data_city_all_",
+        },
+        resources={"limit_memory": "8G", "limit_cpu": "1"},
+    )
+
+    # Run CSV transform within kubernetes pod
+    chdb_data_tract = kubernetes_pod.KubernetesPodOperator(
+        task_id="chdb_data_tract",
+        startup_timeout_seconds=600,
+        name="city_health_dashboard_chdb_data_tract_all",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cityhealthdashboard.com/drupal/media/23/download",
+            "SOURCE_FILE": "files/chdb_data_tract_data.zip",
+            "TARGET_FILE": "files/chdb_data_tract_data_output.csv",
+            "CHUNKSIZE": "500000",
+            "PROJECT_ID": "{{ var.value.gcp_project }}",
+            "DATASET_ID": "city_health_dashboard",
+            "TABLE_ID": "chdb_data_tract_all",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/city_health_dashboard/chdb_data_tract_all/data_output.csv",
+            "SCHEMA_PATH": "data/city_health_dashboard/schema/chdb_data_tract_schema.json",
+            "DROP_DEST_TABLE": "Y",
+            "TRUNCATE_TABLE": "Y",
+            "INPUT_FIELD_DELIMITER": ",",
+            "REMOVE_SOURCE_FILE": "Y",
+            "DELETE_TARGET_FILE": "Y",
+            "INPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "county_fips", "county_name", "tract_code",\n "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number",\n "group_name", "group_number", "num", "denom", "est",\n "lci", "uci", "data_yr_type", "geo_level", "date_export"\n]',
+            "DATA_DTYPES": '{\n "state_abbr": "str",\n "state_fips": "str",\n "county_fips": "str",\n "county_name": "str",\n "tract_code": "str",\n "stcotr_fips": "str",\n "stpl_fips": "str",\n "city_name": "str",\n "metric_name": "str",\n "metric_number": "str",\n "group_name": "str",\n "group_number": "str",\n "num": "str",\n "denom": "str",\n "est": "str",\n "lci": "str",\n "uci": "str",\n "data_yr_type": "str",\n "geo_level": "str",\n "date_export": "str"\n}',
+            "OUTPUT_CSV_HEADERS": '[\n "state_abbr", "state_fips", "county_fips", "county_name", "tract_code",\n "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number",\n "group_name", "group_number", "num", "denom", "est",\n "lci", "uci", "data_yr_type", "geo_level", "date_export", "source_url",\n "etl_timestamp"\n]',
+            "RENAME_HEADERS_LIST": '{\n "state_abbr": "state_abbr",\n "state_fips": "state_fips",\n "county_fips": "county_fips",\n "county_name": "county_name",\n "tract_code": "tract_code",\n "stcotr_fips": "stcotr_fips",\n "stpl_fips": "stpl_fips",\n "city_name": "city_name",\n "metric_name": "metric_name",\n "metric_number": "metric_number",\n "group_name": "group_name",\n "group_number": "group_number",\n "num": "num",\n "denom": "denom",\n "est": "est",\n "lci": "lci",\n "uci": "uci",\n "data_yr_type": "data_yr_type",\n "geo_level": "geo_level",\n "date_export": "date_export"\n}',
+            "TABLE_DESCRIPTION": "City Health Dashboard Data Tract",
+            "PIPELINE_NAME": "chdb_data_tract_all",
+            "FILE_NAME_PREFIX": "CHDB_data_tract_all_",
+        },
+        resources={"limit_memory": "8G", "limit_cpu": "1"},
+    )
+
+    [chdb_data_tract, chdb_data_city]
diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_city_schema.json b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_city_schema.json
new file mode 100644
index 000000000..2eab66d33
--- /dev/null
+++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_city_schema.json
@@ -0,0 +1,107 @@
+[
+  {
+    "name": "state_abbr",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "state_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "place_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "stpl_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "city_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "metric_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "group_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "group_number",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "metric_number",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "num",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "denom",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "est",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "lci",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "uci",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "county_indicator",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "multiplier_indicator",
+    "type": "FLOAT",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "data_yr_type",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "geo_level",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "date_export",
+    "type": "DATE",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "source_url",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "etl_timestamp",
+    "type": "TIMESTAMP",
+    "mode": "NULLABLE"
+  }
+]
diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_tract_schema.json b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_tract_schema.json
new file mode 100644
index 000000000..5eacd9354
--- /dev/null
+++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/data/chdb_data_tract_schema.json
@@ -0,0 +1,112 @@
+[
+  {
+    "name": "state_abbr",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "state_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "county_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "county_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "tract_code",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "stcotr_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "stpl_fips",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "city_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "metric_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "metric_number",
+    "type": "INTEGER",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "group_name",
+    "type": "STRING",
+    "mode": "NULLABLE"
+  },
+  {
"group_number", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "num", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "denom", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "est", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "lci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "uci", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "data_yr_type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "geo_level", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "date_export", + "type": "DATE", + "mode": "NULLABLE" + }, + { + "name": "source_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "etl_timestamp", + "type": "TIMESTAMP", + "mode": "NULLABLE" + } +] diff --git a/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/pipeline.yaml b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/pipeline.yaml new file mode 100644 index 000000000..d9926d93a --- /dev/null +++ b/datasets/city_health_dashboard/pipelines/city_health_dashboard_full_load/pipeline.yaml @@ -0,0 +1,222 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + - type: bigquery_table + table_id: chdb_data_city_all + description: "City Health Dashboard Data Tract" + - type: bigquery_table + table_id: chdb_data_tract_all + description: "City Health Dashboard Data Tract" +dag: + airflow_version: 2 + initialize: + dag_id: city_health_dashboard_full_load + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "chdb_data_city" + startup_timeout_seconds: 600 + name: "city_health_dashboard_chdb_data_city_all" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" + SOURCE_FILE: "files/chdb_data_city_data.zip" + TARGET_FILE: "files/chdb_data_city_data_output.csv" + CHUNKSIZE: "500000" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "city_health_dashboard" + TABLE_ID: "chdb_data_city_all" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_city_all/data_output.csv" + SCHEMA_PATH: "data/city_health_dashboard/schema/chdb_data_city_schema.json" + DROP_DEST_TABLE: "Y" + TRUNCATE_TABLE: "Y" + INPUT_FIELD_DELIMITER: "," + REMOVE_SOURCE_FILE: "Y" + DELETE_TARGET_FILE: "Y" + INPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name", + "metric_name", "group_name", "metric_number", "group_number", "num", + "denom", "est", "lci", "uci", "county_indicator", + "multiplier_indicator", "data_yr_type", "geo_level", 
"date_export" + ] + DATA_DTYPES: >- + { + "state_abbr": "str", + "state_fips": "str", + "place_fips": "str", + "stpl_fips": "str", + "city_name": "str", + "metric_name": "str", + "group_name": "str", + "metric_number": "str", + "group_number": "str", + "num": "str", + "denom": "str", + "est": "str", + "lci": "str", + "uci": "str", + "county_indicator": "str", + "multiplier_indicator": "str", + "data_yr_type": "str", + "geo_level": "str", + "date_export": "str" + } + OUTPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "place_fips", "stpl_fips", "city_name", + "metric_name", "group_name", "metric_number", "group_number", "num", + "denom", "est", "lci", "uci", "county_indicator", + "multiplier_indicator", "data_yr_type", "geo_level", "date_export", "source_url", + "etl_timestamp" + ] + RENAME_HEADERS_LIST: >- + { + "state_abbr": "state_abbr", + "state_fips": "state_fips", + "place_fips": "place_fips", + "stpl_fips": "stpl_fips", + "city_name": "city_name", + "metric_name": "metric_name", + "group_name": "group_name", + "metric_number": "metric_number", + "group_number": "group_number", + "num": "num", + "denom": "denom", + "est": "est", + "lci": "lci", + "uci": "uci", + "county_indicator": "county_indicator", + "multiplier_indicator": "multiplier_indicator", + "data_yr_type": "data_yr_type", + "geo_level": "geo_level", + "date_export": "date_export" + } + TABLE_DESCRIPTION: "City Health Dashboard Data Tract" + PIPELINE_NAME: "chdb_data_city_all" + FILE_NAME_PREFIX: "CHDB_data_city_all_" + resources: + limit_memory: "8G" + limit_cpu: "1" + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "chdb_data_tract" + startup_timeout_seconds: 600 + name: "city_health_dashboard_chdb_data_tract_all" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "https://www.cityhealthdashboard.com/drupal/media/23/download" + SOURCE_FILE: "files/chdb_data_tract_data.zip" + TARGET_FILE: "files/chdb_data_tract_data_output.csv" + CHUNKSIZE: "500000" + PROJECT_ID: "{{ var.value.gcp_project }}" + DATASET_ID: "city_health_dashboard" + TABLE_ID: "chdb_data_tract_all" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/city_health_dashboard/chdb_data_tract_all/data_output.csv" + SCHEMA_PATH: "data/city_health_dashboard/schema/chdb_data_tract_schema.json" + DROP_DEST_TABLE: "Y" + TRUNCATE_TABLE: "Y" + INPUT_FIELD_DELIMITER: "," + REMOVE_SOURCE_FILE: "Y" + DELETE_TARGET_FILE: "Y" + INPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "county_fips", "county_name", "tract_code", + "stcotr_fips", "stpl_fips", "city_name", "metric_name", "metric_number", + "group_name", "group_number", "num", "denom", "est", + "lci", "uci", "data_yr_type", "geo_level", "date_export" + ] + DATA_DTYPES: >- + { + "state_abbr": "str", + "state_fips": "str", + "county_fips": "str", + "county_name": "str", + "tract_code": "str", + "stcotr_fips": "str", + "stpl_fips": "str", + "city_name": "str", + "metric_name": "str", + "metric_number": "str", + "group_name": "str", + "group_number": "str", + "num": "str", + "denom": "str", + "est": "str", + "lci": "str", + "uci": "str", + "data_yr_type": "str", + "geo_level": "str", + "date_export": "str" + } + OUTPUT_CSV_HEADERS: >- + [ + "state_abbr", "state_fips", "county_fips", "county_name", "tract_code", + "stcotr_fips", "stpl_fips", "city_name", "metric_name", 
"metric_number", + "group_name", "group_number", "num", "denom", "est", + "lci", "uci", "data_yr_type", "geo_level", "date_export", "source_url", + "etl_timestamp" + ] + RENAME_HEADERS_LIST: >- + { + "state_abbr": "state_abbr", + "state_fips": "state_fips", + "county_fips": "county_fips", + "county_name": "county_name", + "tract_code": "tract_code", + "stcotr_fips": "stcotr_fips", + "stpl_fips": "stpl_fips", + "city_name": "city_name", + "metric_name": "metric_name", + "metric_number": "metric_number", + "group_name": "group_name", + "group_number": "group_number", + "num": "num", + "denom": "denom", + "est": "est", + "lci": "lci", + "uci": "uci", + "data_yr_type": "data_yr_type", + "geo_level": "geo_level", + "date_export": "date_export" + } + TABLE_DESCRIPTION: "City Health Dashboard Data Tract" + PIPELINE_NAME: "chdb_data_tract_all" + FILE_NAME_PREFIX: "CHDB_data_tract_all_" + resources: + limit_memory: "8G" + limit_cpu: "1" + + graph_paths: + - "[ chdb_data_tract, chdb_data_city ]"