Skip to content

Commit

Permalink
feat: Onboard America Health Rankings dataset (#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
varshika06 committed Dec 21, 2021
1 parent b95d62c commit 8ecbfda
Show file tree
Hide file tree
Showing 10 changed files with 524 additions and 0 deletions.
@@ -0,0 +1,34 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
FROM python:3.8

# Allow statements and log messages to appear in Cloud logs
ENV PYTHONUNBUFFERED True

# Copy the requirements file into the image
COPY requirements.txt ./

# Install the packages specified in the requirements file
RUN python3 -m pip install --no-cache-dir -r requirements.txt

# The WORKDIR instruction sets the working directory for any RUN, CMD,
# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile.
# If the WORKDIR doesn’t exist, it will be created even if it’s not used in
# any subsequent Dockerfile instruction
WORKDIR /custom

# Copy the specific data processing script/s in the image under /custom/*
COPY ./csv_transform.py .

# Command to run the data processing script when the container is run
CMD ["python3", "csv_transform.py"]
@@ -0,0 +1,115 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import logging
import os
import pathlib
import typing

import pandas as pd
import requests
from google.cloud import storage


def main(
source_url: str,
source_file: pathlib.Path,
target_file: pathlib.Path,
target_gcs_bucket: str,
target_gcs_path: str,
headers: typing.List[str],
rename_mappings: dict,
) -> None:

logging.info(
"America Health Rankings process started at "
+ str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
)

logging.info("Creating 'files' folder")
pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

logging.info(f"Downloading file {source_url}")
download_file(source_url, source_file)

logging.info(f"Opening file {source_file}")
df = pd.read_excel(source_file)

logging.info(f"Transformation Process Starting.. {source_file}")
rename_headers(df, rename_mappings)
df = df[headers]

logging.info(f"Transformation Process complete .. {source_file}")
logging.info(f"Saving to output file.. {target_file}")

try:
save_to_new_file(df, file_path=str(target_file))
except Exception as e:
logging.error(f"Error saving output file: {e}.")

logging.info(
f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
)
upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

logging.info(
"America Health Rankings process completed at "
+ str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
)


def convert_dt_format(date_str: str, time_str: str) -> None:
return str(datetime.datetime.strptime(date_str, "%m/%d/%Y").date()) + " " + time_str


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
df.rename(columns=rename_mappings, inplace=True)


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
df.to_csv(file_path, float_format="%.0f", index=False)


def download_file(source_url: str, source_file: pathlib.Path) -> None:
logging.info(f"Downloading {source_url} into {source_file}")
r = requests.get(source_url, stream=True)
if r.status_code == 200:
with open(source_file, "wb") as f:
for chunk in r:
f.write(chunk)
else:
logging.error(f"Couldn't download {source_url}: {r.text}")


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
storage_client = storage.Client()
bucket = storage_client.bucket(gcs_bucket)
blob = bucket.blob(gcs_path)
blob.upload_from_filename(file_path)


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)

main(
source_url=os.environ["SOURCE_URL"],
source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
target_gcs_path=os.environ["TARGET_GCS_PATH"],
headers=json.loads(os.environ["CSV_HEADERS"]),
rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
)
@@ -0,0 +1,4 @@
requests
google-cloud-storage
pandas
openpyxl
39 changes: 39 additions & 0 deletions datasets/america_health_rankings/_terraform/ahr_pipeline.tf
@@ -0,0 +1,39 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_bigquery_table" "america_health_rankings_america_health_rankings" {
project = var.project_id
dataset_id = "america_health_rankings"
table_id = "america_health_rankings"

description = "America Health Rankings"




depends_on = [
google_bigquery_dataset.america_health_rankings
]
}

output "bigquery_table-america_health_rankings_america_health_rankings-table_id" {
value = google_bigquery_table.america_health_rankings_america_health_rankings.table_id
}

output "bigquery_table-america_health_rankings_america_health_rankings-id" {
value = google_bigquery_table.america_health_rankings_america_health_rankings.id
}
@@ -0,0 +1,37 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_bigquery_dataset" "america_health_rankings" {
dataset_id = "america_health_rankings"
project = var.project_id
description = "America Health Rankings"
}

output "bigquery_dataset-america_health_rankings-dataset_id" {
value = google_bigquery_dataset.america_health_rankings.dataset_id
}

resource "google_storage_bucket" "america-health-rankings" {
name = "${var.bucket_name_prefix}-america-health-rankings"
force_destroy = true
location = "US"
uniform_bucket_level_access = true
}

output "storage_bucket-america-health-rankings-name" {
value = google_storage_bucket.america-health-rankings.name
}
28 changes: 28 additions & 0 deletions datasets/america_health_rankings/_terraform/provider.tf
@@ -0,0 +1,28 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


provider "google" {
project = var.project_id
impersonate_service_account = var.impersonating_acct
region = var.region
}

data "google_client_openid_userinfo" "me" {}

output "impersonating-account" {
value = data.google_client_openid_userinfo.me.email
}
23 changes: 23 additions & 0 deletions datasets/america_health_rankings/_terraform/variables.tf
@@ -0,0 +1,23 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


variable "project_id" {}
variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}

95 changes: 95 additions & 0 deletions datasets/america_health_rankings/ahr/ahr_dag.py
@@ -0,0 +1,95 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-03-01",
}


with DAG(
dag_id="america_health_rankings.ahr",
default_args=default_args,
max_active_runs=1,
schedule_interval="@daily",
catchup=False,
default_view="graph",
) as dag:

# Run CSV transform within kubernetes pod
ahr_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ahr_transform_csv",
startup_timeout_seconds=600,
name="america_health_rankings_ahr",
namespace="default",
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["pool-e2-standard-4"],
}
]
}
]
}
}
},
image_pull_policy="Always",
image="{{ var.json.america_health_rankings.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "https://assets.americashealthrankings.org/app/uploads/ahr_heath-disparities_data.xlsx",
"SOURCE_FILE": "files/data.xlsx",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/america_health_rankings/ahr/data_output.csv",
"CSV_HEADERS": '["edition","report_type","measure_name","state_name","subpopulation","value","lower_ci","upper_ci","source","source_date"]',
"RENAME_MAPPINGS": '{"Edition": "edition","Report Type": "report_type","Measure Name": "measure_name","State Name": "state_name","Subpopulation": "subpopulation","Value": "value","Lower CI": "lower_ci","Upper CI": "upper_ci","Source": "source","Source Date": "source_date"}',
},
resources={"limit_memory": "2G", "limit_cpu": "1"},
)

# Task to load CSV data to a BigQuery table
load_ahr_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id="load_ahr_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/america_health_rankings/ahr/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="america_health_rankings.ahr",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{"name": "edition", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "report_type", "type": "STRING", "mode": "NULLABLE"},
{"name": "measure_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "state_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "subpopulation", "type": "STRING", "mode": "NULLABLE"},
{"name": "value", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "lower_ci", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "upper_ci", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "source", "type": "STRING", "mode": "NULLABLE"},
{"name": "source_date", "type": "STRING", "mode": "NULLABLE"},
],
)

ahr_transform_csv >> load_ahr_to_bq

0 comments on commit 8ecbfda

Please sign in to comment.