
Improve Minimal Data Platform blueprint #1451

Merged 15 commits on Jun 20, 2023
@@ -22,11 +22,10 @@ locals {
   CURATED_PRJ = module.cur-project.project_id
   DP_KMS_KEY = var.service_encryption_keys.compute
   DP_REGION = var.region
-  GCP_REGION = var.region
   LAND_PRJ = module.land-project.project_id
-  LAND_GCS = module.land-cs-0.name
-  PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, null)
-  PROCESSING_GCS = module.processing-cs-0.name
+  LAND_GCS = module.land-cs-0.url
+  PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, "")
+  PROCESSING_GCS = module.processing-cs-0.url
   PROCESSING_PRJ = module.processing-project.project_id
   PROCESSING_SA = module.processing-sa-0.email
   PROCESSING_SUBNET = local.processing_subnet
@@ -16,7 +16,13 @@

 locals {
   iam_processing = {
+    "roles/bigquery.jobUser" = [
+      module.processing-sa-cmp-0.iam_email,
+      module.processing-sa-0.iam_email
+    ]
     "roles/composer.admin" = [local.groups_iam.data-engineers]
+    "roles/dataflow.admin" = [module.processing-sa-cmp-0.iam_email]
+    "roles/dataflow.worker" = [module.processing-sa-0.iam_email]
     "roles/composer.environmentAndStorageObjectAdmin" = [local.groups_iam.data-engineers]
     "roles/composer.ServiceAgentV2Ext" = [
       "serviceAccount:${module.processing-project.service_accounts.robots.composer}"
@@ -78,6 +84,7 @@ module "processing-project" {
     "composer.googleapis.com",
     "compute.googleapis.com",
     "container.googleapis.com",
+    "dataflow.googleapis.com",
     "dataproc.googleapis.com",
     "iam.googleapis.com",
     "servicenetworking.googleapis.com",
@@ -96,7 +103,7 @@ module "processing-project" {
     host_project = var.network_config.host_project
     service_identity_iam = {
       "roles/compute.networkUser" = [
-        "cloudservices", "compute", "container-engine"
+        "cloudservices", "compute", "container-engine", "dataflow"
       ]
       "roles/composer.sharedVpcAgent" = [
         "composer"
18 changes: 10 additions & 8 deletions blueprints/data-solutions/data-platform-minimal/README.md
@@ -10,7 +10,7 @@ The following diagram is a high-level reference of the resources created and man

![Data Platform architecture overview](./images/diagram.png "Data Platform architecture overview")

-A demo [Airflow pipeline](demo/orchestrate_pyspark.py) is also part of this blueprint: it can be built and run on top of the foundational infrastructure to verify or test the setup quickly.
+A set of demo [Airflow pipelines](./demo/) is also part of this blueprint: they can be run on top of the foundational infrastructure to verify and test the setup.

## Design overview and choices

@@ -203,7 +203,7 @@ module "data-platform" {
prefix = "myprefix"
}

-# tftest modules=21 resources=112
+# tftest modules=21 resources=116
```

## Customizations
@@ -299,11 +299,13 @@ The application layer is out of scope of this script. As a demo purpose only, on
| name | description | sensitive |
|---|---|:---:|
| [bigquery-datasets](outputs.tf#L17) | BigQuery datasets. | |
-| [dataproc-history-server](outputs.tf#L24) | List of bucket names which have been assigned to the cluster. | |
-| [gcs-buckets](outputs.tf#L29) | GCS buckets. | ✓ |
-| [kms_keys](outputs.tf#L39) | Cloud MKS keys. | |
-| [projects](outputs.tf#L44) | GCP Projects informations. | |
-| [vpc_network](outputs.tf#L62) | VPC network. | |
-| [vpc_subnet](outputs.tf#L70) | VPC subnetworks. | |
+| [composer](outputs.tf#L24) | Composer variables. | |
+| [dataproc-history-server](outputs.tf#L31) | List of bucket names which have been assigned to the cluster. | |
+| [gcs_buckets](outputs.tf#L36) | GCS buckets. | |
+| [kms_keys](outputs.tf#L46) | Cloud MKS keys. | |
+| [projects](outputs.tf#L51) | GCP Projects informations. | |
+| [service_accounts](outputs.tf#L69) | Service account created. | |
+| [vpc_network](outputs.tf#L78) | VPC network. | |
+| [vpc_subnet](outputs.tf#L86) | VPC subnetworks. | |

<!-- END TFDOC -->
58 changes: 58 additions & 0 deletions blueprints/data-solutions/data-platform-minimal/demo/README.md
@@ -0,0 +1,58 @@
# Data ingestion Demo

In this folder you can find Airflow DAG examples that process data on the `minimal data platform` instantiated [here](../). The examples focus on importing data from the `landing` to the `curated` resources.

The examples are not intended to be production-ready code, but a boilerplate to verify and test the setup.

## Demo use case

The demo imports CSV customer data from the `landing` GCS bucket to the `curated` BigQuery dataset.

## Input files

Data is uploaded to the `landing` GCS bucket. File structure:

- [`customers.csv`](./data/customers.csv): comma-separated values file with customer information in the following format: Customer ID, Name, Surname, Registration Timestamp (a hypothetical sample follows).
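
For illustration only, the rows might look like this; the actual file in [`data`](./data/) may differ:

```csv
1,Alice,Smith,2023-05-01T10:00:00
2,Bob,Rossi,2023-05-02T11:30:00
```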

## Configuration files

The demo relies on the following configuration files:

- [`customers_schema.json`](./data/customers_schema.json): customer BigQuery table schema definition.
- [`customers_udf.js`](./data/customers_udf.js): Dataflow user-defined function to transform CSV rows into the BigQuery table schema.
- [`customers.json`](./data/customers.json): customer CSV file schema definition.

## Data processing pipelines

Different data pipelines are provided to highlight different ways to import data.

Below you can find a description of each example:

- `bq_import.py`: Importing data using the BigQuery import capability (a hypothetical sketch of this approach follows the list).
- `dataflow_import.py`: Importing data using Cloud Dataflow.
- `dataproc_import.py`: Importing data using Cloud Dataproc.
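
The sketch below is a purely illustrative take on the BigQuery-based approach, not the contents of `bq_import.py`: it loads the customer CSV with the `GCSToBigQueryOperator`, assuming the same Composer environment variables used by the other DAGs and hypothetical schema field names.

```python
# Hypothetical sketch only: not the actual bq_import.py shipped with this demo.
import datetime
import os

from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

CURATED_PRJ = os.environ.get("CURATED_PRJ")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")  # a gs:// URL, as set in the Composer environment

default_args = {
    "owner": "airflow",
    "start_date": datetime.datetime.now() - datetime.timedelta(days=1),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

with models.DAG("bq_gcs2bq_sketch", default_args=default_args,
                schedule_interval=None) as dag:
    customers_import = GCSToBigQueryOperator(
        task_id="bq_customers_import",
        bucket=LAND_GCS[5:],  # strip the gs:// scheme, the operator expects a bucket name
        source_objects=["customers.csv"],
        destination_project_dataset_table=f"{CURATED_PRJ}.{CURATED_BQ_DATASET}.customers",
        # Illustrative field names; the real schema lives in data/customers_schema.json.
        schema_fields=[
            {"name": "id", "type": "INTEGER"},
            {"name": "name", "type": "STRING"},
            {"name": "surname", "type": "STRING"},
            {"name": "registration_date", "type": "TIMESTAMP"},
        ],
        source_format="CSV",
        write_disposition="WRITE_TRUNCATE",
    )
```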

## Running the demo

To run the demo examples, follow these steps:

1. Copy the sample data to the `landing` Cloud Storage bucket impersonating the `landing` service account.
1. Copy the sample data structure definitions to the `processing` Cloud Storage bucket impersonating the `orchestration` service account.
1. Copy the Cloud Composer DAGs to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
1. Open the Cloud Composer Airflow UI and run the imported DAG.
1. Run a BigQuery query to see the results (a hypothetical example is shown after the commands below).

Below you can find the commands to perform these steps.

```bash
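# Export the landing, processing and composer service accounts and buckets
# from the Terraform outputs into env.sh, then source it.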
terraform output -json | jq -r '@sh "export LND_SA=\(.service_accounts.value.landing)\nexport PRC_SA=\(.service_accounts.value.processing)\nexport CMP_SA=\(.service_accounts.value.composer)"' > env.sh

terraform output -json | jq -r '@sh "export LND_GCS=\(.gcs_buckets.value.landing_cs_0)\nexport PRC_GCS=\(.gcs_buckets.value.processing_cs_0)\nexport CMP_GCS=\(.gcs_buckets.value.composer)"' >> env.sh

source ./env.sh

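# Copy sample data, configuration files, the PySpark job and the DAGs,
# impersonating the landing and composer service accounts.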
gsutil -i $LND_SA cp demo/data/*.csv gs://$LND_GCS
gsutil -i $CMP_SA cp demo/data/*.j* gs://$PRC_GCS
gsutil -i $CMP_SA cp demo/data/pyspark_* gs://$PRC_GCS
gsutil -i $CMP_SA cp demo/dag_*.py $CMP_GCS
```
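
Once the DAG has completed, you can inspect the imported data. A minimal, hypothetical check with the BigQuery Python client (replace the placeholders with the values from your Terraform outputs):

```python
# Hypothetical result check: substitute your curated project and dataset names.
from google.cloud import bigquery

client = bigquery.Client(project="<curated-project-id>")
query = "SELECT * FROM `<curated-project-id>.<curated-dataset>.customers` LIMIT 10"
for row in client.query(query).result():
    print(dict(row))
```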
@@ -0,0 +1,116 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------

import csv
import datetime
import io
import json
import logging
import os

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_GCS = os.environ.get("LAND_GCS")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
DP_ZONE = os.environ.get("DP_REGION") + "-b"

# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

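# Default arguments applied to every task. The dataflow_default_options block configures
# the Dataflow jobs launched by this DAG: workers run as the processing service account,
# on the shared VPC subnet, with private IPs only and an optional CMEK key.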
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'dataflow_default_options': {
'location': DP_REGION,
'zone': DP_ZONE,
'stagingLocation': PROCESSING_GCS + "/staging",
'tempLocation': PROCESSING_GCS + "/tmp",
'serviceAccountEmail': PROCESSING_SA,
'subnetwork': PROCESSING_SUBNET,
'ipConfiguration': "WORKER_IP_PRIVATE",
'kmsKeyName' : DP_KMS_KEY
},
}

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG(
'dataflow_gcs2bq',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)

end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)

    # BigQuery tables are created automatically for demo purposes.
    # Consider a dedicated pipeline or tool for a real-life scenario.
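    # The Google-provided GCS_Text_to_BigQuery template reads the CSV from the landing
    # bucket, applies the JavaScript UDF and the JSON schema stored in the processing
    # bucket, and writes the result to the customers table in the curated dataset.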
customers_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_customers_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=PROCESSING_PRJ,
location=DP_REGION,
parameters={
"javascriptTextTransformFunctionName": "transform",
"JSONPath": PROCESSING_GCS + "/customers_schema.json",
"javascriptTextTransformGcsPath": PROCESSING_GCS + "/customers_udf.js",
"inputFilePattern": LAND_GCS + "/customers.csv",
"outputTable": CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
"bigQueryLoadingTemporaryDirectory": PROCESSING_GCS + "/tmp/bq/",
},
)

start >> customers_import >> end

@@ -0,0 +1,101 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import time
import os

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator, DataprocDeleteBatchOperator,
    DataprocGetBatchOperator, DataprocListBatchesOperator
)
from airflow.utils.dates import days_ago

# --------------------------------------------------------------------------------
# Get variables
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
GCP_REGION = os.environ.get("GCP_REGION")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")
PHS_CLUSTER_NAME = os.environ.get("PHS_CLUSTER_NAME")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")

PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_gcs2bq.py"
PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar"
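# Dataproc batch IDs must be unique, so a timestamp suffix allows re-running the DAG.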
BATCH_ID = "batch-create-phs-"+str(int(time.time()))

default_args = {
# Tell airflow to start one day ago, so that it runs as soon as you upload it
"start_date": days_ago(1),
"region": DP_REGION,
}
with models.DAG(
"dataproc_batch_gcs2bq", # The id you will see in the DAG airflow page
    default_args=default_args,  # Default arguments applied to all tasks
schedule_interval=None, # Override to match your needs
) as dag:

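    # Run the PySpark job as a Dataproc Serverless batch: it executes as the processing
    # service account on the shared subnet, and publishes Spark history to the persistent
    # history server cluster configured in the blueprint.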
create_batch = DataprocCreateBatchOperator(
task_id="batch_create",
project_id=PROCESSING_PRJ,
batch_id=BATCH_ID,
batch={
"environment_config": {
"execution_config": {
"service_account": PROCESSING_SA,
"subnetwork_uri": PROCESSING_SUBNET
},
"peripherals_config": {
"spark_history_server_config":{
"dataproc_cluster": PHS_CLUSTER_PATH
}
}
},
"pyspark_batch": {
"args": [
LAND_GCS + "/customers.csv",
CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
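            # PROCESSING_GCS is a gs:// URL: [5:] strips the 'gs://' scheme so the job
            # receives a bare bucket name.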
PROCESSING_GCS[5:]
],
"main_python_file_uri": PYTHON_FILE_LOCATION,
"jar_file_uris": [SPARK_BIGQUERY_JAR_FILE]
}
}
)

list_batches = DataprocListBatchesOperator(
task_id="list-all-batches",
)

get_batch = DataprocGetBatchOperator(
task_id="get_batch",
batch_id=BATCH_ID,
)

create_batch >> list_batches >> get_batch