fix: Update and fix city_health_dashboard dataset #285

Merged (4 commits, Feb 9, 2022)
@@ -14,7 +14,8 @@


 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery

 default_args = {
     "owner": "Google",
@@ -33,28 +34,12 @@
 ) as dag:

     # Run CSV transform within kubernetes pod
-    data_city_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+    data_city_transform_csv = kubernetes_pod.KubernetesPodOperator(
         task_id="data_city_transform_csv",
         startup_timeout_seconds=600,
         name="city_health_dashboard_chdb_data_city_all",
-        namespace="default",
-        affinity={
-            "nodeAffinity": {
-                "requiredDuringSchedulingIgnoredDuringExecution": {
-                    "nodeSelectorTerms": [
-                        {
-                            "matchExpressions": [
-                                {
-                                    "key": "cloud.google.com/gke-nodepool",
-                                    "operator": "In",
-                                    "values": ["pool-e2-standard-4"],
-                                }
-                            ]
-                        }
-                    ]
-                }
-            }
-        },
+        namespace="composer",
+        service_account_name="datasets",
         image_pull_policy="Always",
         image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}",
         env_vars={
@@ -66,13 +51,17 @@
             "CSV_HEADERS": '["state_abbr","state_fips","place_fips","stpl_fips","city_name","metric_name","group_name","metric_number","group_number","num","denom","est","lci","uci","county_indicator","multiplier_indicator","data_yr_type","geo_level","date_export"]',
             "RENAME_MAPPINGS": '{"state_abbr": "state_abbr","state_fips": "state_fips","place_fips": "place_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","group_name": "group_name","metric_number": "metric_number","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","county_indicator": "county_indicator","multiplier_indicator": "multiplier_indicator","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}',
             "PIPELINE_NAME": "chdb_data_city_all",
-            "FILE_NAME": "CHDB_data_city_all v13_0.csv",
+            "FILE_NAME": "CHDB_data_city_all_v13.1.csv",
         },
-        resources={"limit_memory": "2G", "limit_cpu": "1"},
+        resources={
+            "limit_memory": "2G",
+            "limit_cpu": "1",
+            "request_ephemeral_storage": "8G",
+        },
     )

     # Task to load CSV data to a BigQuery table
-    load_data_city_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_data_city_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_data_city_to_bq",
         bucket="{{ var.value.composer_bucket }}",
         source_objects=[
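Taken together, the Python-side changes above amount to the following standalone sketch of the migrated transform task. It is minimal and assembled from the diff, assuming Airflow 2 with the apache-airflow-providers-cncf-kubernetes package installed; start_date and schedule_interval are illustrative placeholders, while the remaining values are copied from the PR (env_vars omitted for brevity).

# Minimal sketch of the migrated transform task (values copied from the
# diff above; start_date/schedule_interval are illustrative placeholders).
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod

with DAG(
    dag_id="chdb_data_city_all",
    start_date=datetime(2022, 1, 1),  # placeholder, not the PR's value
    schedule_interval="@daily",  # placeholder, not the PR's value
) as dag:
    data_city_transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="data_city_transform_csv",
        startup_timeout_seconds=600,
        name="city_health_dashboard_chdb_data_city_all",
        # The pod now runs in the Composer namespace under a dedicated
        # service account, replacing the old GKE node-pool affinity block.
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}",
        resources={
            "limit_memory": "2G",
            "limit_cpu": "1",
            "request_ephemeral_storage": "8G",
        },
    )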
datasets/city_health_dashboard/chdb_data_city_all/pipeline.yaml (18 changes: 5 additions & 13 deletions)

@@ -20,7 +20,7 @@ resources:
     description: "City Health Dashboard Data Tract"

 dag:
-  airflow_version: 1
+  airflow_version: 2
   initialize:
     dag_id: chdb_data_city_all
     default_args:
@@ -39,17 +39,8 @@ dag:
         task_id: "data_city_transform_csv"
         startup_timeout_seconds: 600
         name: "city_health_dashboard_chdb_data_city_all"
-        namespace: "default"
-        affinity:
-          nodeAffinity:
-            requiredDuringSchedulingIgnoredDuringExecution:
-              nodeSelectorTerms:
-                - matchExpressions:
-                    - key: cloud.google.com/gke-nodepool
-                      operator: In
-                      values:
-                        - "pool-e2-standard-4"
-
+        namespace: "composer"
+        service_account_name: "datasets"
         image_pull_policy: "Always"
         image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}"

@@ -64,10 +55,11 @@
           RENAME_MAPPINGS: >-
             {"state_abbr": "state_abbr","state_fips": "state_fips","place_fips": "place_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","group_name": "group_name","metric_number": "metric_number","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","county_indicator": "county_indicator","multiplier_indicator": "multiplier_indicator","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}
           PIPELINE_NAME: "chdb_data_city_all"
-          FILE_NAME: "CHDB_data_city_all v13_0.csv"
+          FILE_NAME: "CHDB_data_city_all_v13.1.csv"
         resources:
           limit_memory: "2G"
           limit_cpu: "1"
+          request_ephemeral_storage: "8G"

     - operator: "GoogleCloudStorageToBigQueryOperator"
       description: "Task to load CSV data to a BigQuery table"
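The resources block gains a request_ephemeral_storage entry in both pipelines. As a rough equivalence (an assumption about the resulting pod spec, not code from this PR), the dict corresponds to a V1ResourceRequirements object from the official kubernetes Python client, which newer cncf.kubernetes provider releases accept directly:

# Rough equivalence of the resources dict above, expressed with the
# kubernetes Python client's models. This is an illustrative assumption,
# not code from the PR.
from kubernetes.client import models as k8s

pod_resources = k8s.V1ResourceRequirements(
    # Hard caps: the container is throttled at 1 CPU and OOM-killed
    # above 2G of memory.
    limits={"memory": "2G", "cpu": "1"},
    # Scheduling request: reserves 8G of scratch disk, presumably so the
    # transform can stage the downloaded CSV before rewriting it.
    requests={"ephemeral-storage": "8G"},
)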
@@ -14,7 +14,8 @@


 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery

 default_args = {
     "owner": "Google",
@@ -33,28 +34,12 @@
 ) as dag:

     # Run CSV transform within kubernetes pod
-    data_tract_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+    data_tract_transform_csv = kubernetes_pod.KubernetesPodOperator(
         task_id="data_tract_transform_csv",
         startup_timeout_seconds=600,
         name="city_health_dashboard_chdb_data_tract_all",
-        namespace="default",
-        affinity={
-            "nodeAffinity": {
-                "requiredDuringSchedulingIgnoredDuringExecution": {
-                    "nodeSelectorTerms": [
-                        {
-                            "matchExpressions": [
-                                {
-                                    "key": "cloud.google.com/gke-nodepool",
-                                    "operator": "In",
-                                    "values": ["pool-e2-standard-4"],
-                                }
-                            ]
-                        }
-                    ]
-                }
-            }
-        },
+        namespace="composer",
+        service_account_name="datasets",
         image_pull_policy="Always",
         image="{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}",
         env_vars={
@@ -66,13 +51,17 @@
             "CSV_HEADERS": '["state_abbr","state_fips","county_fips","county_name","tract_code","stcotr_fips","stpl_fips","city_name","metric_name","metric_number","group_name","group_number","num","denom","est","lci","uci","data_yr_type","geo_level","date_export"]',
             "RENAME_MAPPINGS": '{"state_abbr": "state_abbr","state_fips": "state_fips","county_fips": "county_fips","county_name": "county_name","tract_code": "tract_code","stcotr_fips": "stcotr_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","metric_number": "metric_number","group_name": "group_name","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}',
             "PIPELINE_NAME": "chdb_data_tract_all",
-            "FILE_NAME": "CHDB_data_tract_all v13_0.csv",
+            "FILE_NAME": "CHDB_data_tract_all_v13.1.csv",
         },
-        resources={"limit_memory": "2G", "limit_cpu": "1"},
+        resources={
+            "limit_memory": "2G",
+            "limit_cpu": "1",
+            "request_ephemeral_storage": "8G",
+        },
     )

     # Task to load CSV data to a BigQuery table
-    load_data_tract_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_data_tract_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_data_tract_to_bq",
         bucket="{{ var.value.composer_bucket }}",
         source_objects=[
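The load step is the other half of the migration: the Airflow 1 contrib operator gcs_to_bq.GoogleCloudStorageToBigQueryOperator becomes gcs_to_bigquery.GCSToBigQueryOperator from the Google provider package. A minimal sketch of the renamed call follows; the task_id and bucket come from the diff, but the arguments past source_objects are truncated above, so the path, table, and load options shown are illustrative placeholders.

# Minimal sketch of the renamed load task, assuming Airflow 2 with the
# apache-airflow-providers-google package. Everything that is cut off in
# the diff above is an illustrative placeholder here.
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

load_data_tract_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
    task_id="load_data_tract_to_bq",
    bucket="{{ var.value.composer_bucket }}",
    source_objects=["data/city_health_dashboard/chdb_data_tract_all/data_output.csv"],  # placeholder
    source_format="CSV",  # placeholder
    destination_project_dataset_table="city_health_dashboard.chdb_data_tract_all",  # placeholder
    skip_leading_rows=1,  # placeholder
    write_disposition="WRITE_TRUNCATE",  # placeholder
    # Attach to a DAG via `dag=...` or instantiate inside a `with DAG(...)` block.
)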
datasets/city_health_dashboard/chdb_data_tract_all/pipeline.yaml (18 changes: 5 additions & 13 deletions)

@@ -20,7 +20,7 @@ resources:
     description: "City Health Dashboard Data Tract"

 dag:
-  airflow_version: 1
+  airflow_version: 2
   initialize:
     dag_id: chdb_data_tract_all
     default_args:
@@ -39,17 +39,8 @@ dag:
         task_id: "data_tract_transform_csv"
         startup_timeout_seconds: 600
         name: "city_health_dashboard_chdb_data_tract_all"
-        namespace: "default"
-        affinity:
-          nodeAffinity:
-            requiredDuringSchedulingIgnoredDuringExecution:
-              nodeSelectorTerms:
-                - matchExpressions:
-                    - key: cloud.google.com/gke-nodepool
-                      operator: In
-                      values:
-                        - "pool-e2-standard-4"
-
+        namespace: "composer"
+        service_account_name: "datasets"
         image_pull_policy: "Always"
         image: "{{ var.json.city_health_dashboard.container_registry.run_csv_transform_kub }}"

@@ -64,10 +55,11 @@
           RENAME_MAPPINGS: >-
             {"state_abbr": "state_abbr","state_fips": "state_fips","county_fips": "county_fips","county_name": "county_name","tract_code": "tract_code","stcotr_fips": "stcotr_fips","stpl_fips": "stpl_fips","city_name": "city_name","metric_name": "metric_name","metric_number": "metric_number","group_name": "group_name","group_number": "group_number","num": "num","denom": "denom","est": "est","lci": "lci","uci": "uci","data_yr_type": "data_yr_type","geo_level": "geo_level","date_export": "date_export"}
           PIPELINE_NAME: "chdb_data_tract_all"
-          FILE_NAME: "CHDB_data_tract_all v13_0.csv"
+          FILE_NAME: "CHDB_data_tract_all_v13.1.csv"
         resources:
           limit_memory: "2G"
           limit_cpu: "1"
+          request_ephemeral_storage: "8G"

     - operator: "GoogleCloudStorageToBigQueryOperator"
       description: "Task to load CSV data to a BigQuery table"