feat: Onboard ML dataset (#276)
gkodukula committed Mar 9, 2022
1 parent db04c54 commit 48e51af
Showing 9 changed files with 414 additions and 12 deletions.
34 changes: 34 additions & 0 deletions datasets/ml_datasets/infra/iris_pipeline.tf
@@ -0,0 +1,34 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


resource "google_bigquery_table" "ml_datasets_iris" {
project = var.project_id
dataset_id = "ml_datasets"
table_id = "iris"
description = "iris table"
depends_on = [
google_bigquery_dataset.ml_datasets
]
}

output "bigquery_table-ml_datasets_iris-table_id" {
value = google_bigquery_table.ml_datasets_iris.table_id
}

output "bigquery_table-ml_datasets_iris-id" {
value = google_bigquery_table.ml_datasets_iris.id
}
16 changes: 16 additions & 0 deletions datasets/ml_datasets/infra/ml_datasets_dataset.tf
@@ -33,3 +33,19 @@ resource "google_bigquery_dataset" "ml_datasets_uscentral1" {
output "bigquery_dataset-ml_datasets_uscentral1-dataset_id" {
value = google_bigquery_dataset.ml_datasets_uscentral1.dataset_id
}

resource "google_storage_bucket" "ml-datasets" {
name = "${var.bucket_name_prefix}-ml-datasets"
force_destroy = true
location = "US"
uniform_bucket_level_access = true
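  # ignore_changes below keeps Terraform from reverting the bucket's logging
  # settings if they are adjusted outside this configuration.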
  lifecycle {
    ignore_changes = [
      logging,
    ]
  }
}

output "storage_bucket-ml-datasets-name" {
value = google_storage_bucket.ml-datasets.name
}
12 changes: 0 additions & 12 deletions datasets/ml_datasets/infra/penguins_pipeline.tf
@@ -19,12 +19,6 @@ resource "google_bigquery_table" "ml_datasets_penguins" {
  project    = var.project_id
  dataset_id = "ml_datasets"
  table_id   = "penguins"






  depends_on = [
    google_bigquery_dataset.ml_datasets
  ]
@@ -42,12 +36,6 @@ resource "google_bigquery_table" "ml_datasets_uscentral1_penguins" {
  project    = var.project_id
  dataset_id = "ml_datasets_uscentral1"
  table_id   = "penguins"






  depends_on = [
    google_bigquery_dataset.ml_datasets_uscentral1
  ]
@@ -0,0 +1,37 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The base image for this build
FROM python:3.8

# Allow statements and log messages to appear in Cloud logs
ENV PYTHONUNBUFFERED True

# Copy the requirements file into the image
COPY requirements.txt ./

# Install the packages specified in the requirements file
RUN python3 -m pip install --no-cache-dir -r requirements.txt

# The WORKDIR instruction sets the working directory for any RUN, CMD,
# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile.
# If the WORKDIR doesn’t exist, it will be created even if it’s not used in
# any subsequent Dockerfile instruction
WORKDIR /custom

# Copy the specific data processing script/s in the image under /custom/*
COPY ./csv_transform.py .

# Command to run the data processing script when the container is run
CMD ["python3", "csv_transform.py"]
@@ -0,0 +1,119 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
import json
import logging
import os
import pathlib
import typing

import pandas as pd
import requests
from google.cloud import storage


def main(
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
    headers: typing.List[str],
    rename_mappings: dict,
    pipeline_name: str,
) -> None:

    logging.info(
        f"ML datasets {pipeline_name} process started at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )

    logging.info("Creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file from {source_url}...")
    download_file(source_url, source_file)

    logging.info(f"Opening file {source_file}...")
    df = pd.read_csv(str(source_file))

    logging.info(f"Transforming {source_file}...")

    logging.info("Transform: Renaming columns...")
    rename_headers(df, rename_mappings)

    logging.info("Transform: Replacing values...")
    # regex=True is required so the "Iris-" prefix is stripped from values such
    # as "Iris-setosa"; without it, replace only matches whole cell values.
    df = df.replace(to_replace={"species": {"Iris-": ""}}, regex=True)

    logging.info("Transform: Reordering headers...")
    df = df[headers]

    logging.info(f"Saving to output file {target_file}...")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        logging.error(f"Error saving output file: {e}.")

    logging.info(
        f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}..."
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info(
        f"ML datasets {pipeline_name} process completed at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
    df.rename(columns=rename_mappings, inplace=True)


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    df.to_csv(file_path, index=False)


def download_file(source_url: str, source_file: pathlib.Path) -> None:
logging.info(f"Downloading {source_url} into {source_file}")
r = requests.get(source_url, stream=True)
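    # stream=True defers the download; iterating the response below writes the
    # body to disk in small chunks instead of holding the whole file in memory.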
    if r.status_code == 200:
        with open(source_file, "wb") as f:
            for chunk in r:
                f.write(chunk)
    else:
        logging.error(f"Couldn't download {source_url}: {r.text}")


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(file_path)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        headers=json.loads(os.environ["CSV_HEADERS"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
        pipeline_name=os.environ["PIPELINE_NAME"],
    )
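
For quick local testing outside the container, a minimal driver sketch (hypothetical, not part of this commit: the bucket name is a placeholder, and the upload step needs application-default credentials with write access to it):

# run_local.py - hypothetical local driver; values mirror the DAG's env_vars
import os
import runpy

os.environ.update(
    {
        "SOURCE_URL": "https://www.openml.org/data/get_csv/61/dataset_61_iris.csv",
        "SOURCE_FILE": "files/data.csv",
        "TARGET_FILE": "files/data_output.csv",
        "TARGET_GCS_BUCKET": "my-test-bucket",  # placeholder bucket
        "TARGET_GCS_PATH": "data/ml_datasets/iris/data_output.csv",
        "CSV_HEADERS": '["sepal_length","sepal_width","petal_length","petal_width","species"]',
        "RENAME_MAPPINGS": '{"sepallength": "sepal_length","sepalwidth": "sepal_width","petallength": "petal_length","petalwidth": "petal_width","class": "species"}',
        "PIPELINE_NAME": "iris",
    }
)

# Executes csv_transform.py as if run with `python3 csv_transform.py`
runpy.run_path("csv_transform.py", run_name="__main__")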
@@ -0,0 +1,3 @@
google-cloud-storage
pandas
requests
4 changes: 4 additions & 0 deletions datasets/ml_datasets/pipelines/dataset.yaml
@@ -68,3 +68,7 @@ resources:
    dataset_id: ml_datasets_uscentral1
    location: us-central1
    description: ~
  - type: storage_bucket
    name: ml-datasets
    uniform_bucket_level_access: True
    location: US
105 changes: 105 additions & 0 deletions datasets/ml_datasets/pipelines/iris/iris_dag.py
@@ -0,0 +1,105 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-03-01",
}


with DAG(
    dag_id="ml_datasets.iris",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    iris_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="iris_transform_csv",
        startup_timeout_seconds=600,
        name="iris",
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.ml_datasets.container_registry.run_csv_transform_kub }}",
        env_vars={
            "SOURCE_URL": "https://www.openml.org/data/get_csv/61/dataset_61_iris.csv",
            "SOURCE_FILE": "files/data.csv",
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": "data/ml_datasets/iris/data_output.csv",
            "PIPELINE_NAME": "iris",
            "CSV_HEADERS": '["sepal_length","sepal_width","petal_length","petal_width","species"]',
            "RENAME_MAPPINGS": '{"sepallength": "sepal_length","sepalwidth": "sepal_width","petallength": "petal_length","petalwidth": "petal_width","class": "species"}',
        },
        resources={
            "request_memory": "2G",
            "request_cpu": "1",
            "request_ephemeral_storage": "5G",
        },
    )

    # Task to load CSV data to a BigQuery table
    load_iris_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="load_iris_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=["data/ml_datasets/iris/data_output.csv"],
        source_format="CSV",
        destination_project_dataset_table="ml_datasets.iris",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
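        # WRITE_TRUNCATE replaces the table contents on each run, keeping the
        # daily load idempotent.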
        schema_fields=[
            {
                "name": "sepal_length",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "sepal_width",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "petal_length",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "petal_width",
                "type": "float",
                "description": "",
                "mode": "nullable",
            },
            {
                "name": "species",
                "type": "string",
                "description": "",
                "mode": "nullable",
            },
        ],
    )

    iris_transform_csv >> load_iris_to_bq
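
After a successful run, the loaded table can be spot-checked with the google-cloud-bigquery client. A hedged sketch (assumes application-default credentials and that the default project contains the ml_datasets dataset):

# check_iris.py - hypothetical verification snippet, not part of this commit
from google.cloud import bigquery

client = bigquery.Client()  # uses application-default credentials
query = """
    SELECT species, COUNT(*) AS n
    FROM ml_datasets.iris
    GROUP BY species
    ORDER BY species
"""
for row in client.query(query).result():
    # The classic iris dataset has 50 rows per species.
    print(row.species, row.n)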
