## Data Pipeline using Spark and BigQuery on Dataproc
Let us see how to manage Dataproc Workflow Templates using `gcloud` commands.
* Step 1: Create the Dataproc Workflow Template
* Step 2: Attach an active Dataproc cluster (we can also configure a new managed cluster)
* Step 3: Add Spark SQL or PySpark jobs to the Dataproc Workflow Template with dependencies
* Step 4: Run and validate the Dataproc Workflow Template

We can take care of all the steps using `gcloud` commands.
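
As a rough outline of the four steps, the sketch below only builds and prints the `gcloud` command lines; it does not run them. The helper name `workflow_commands` and the job dictionary layout are assumptions made for illustration, while the subcommands and flags are the ones used later in this notebook.

```python
import shlex

TEMPLATE = "wf-daily-product-revenue-bq"  # template name used in this notebook


def workflow_commands(template, cluster_name, jobs):
    """Return the gcloud commands for steps 1 to 4 as argument lists (dry run)."""
    base = ["gcloud", "dataproc", "workflow-templates"]
    commands = [
        # Step 1: create the template
        base + ["create", template],
        # Step 2: attach an existing cluster through its name label
        base + ["set-cluster-selector", template,
                "--cluster-labels", f"goog-dataproc-cluster-name={cluster_name}"],
    ]
    # Step 3: add Spark SQL jobs, with optional dependencies
    for job in jobs:
        cmd = base + ["add-job", "spark-sql",
                      f"--step-id={job['step_id']}",
                      f"--file={job['file']}",
                      f"--workflow-template={template}"]
        if job.get("start_after"):
            cmd.append("--start-after=" + ",".join(job["start_after"]))
        commands.append(cmd)
    # Step 4: run the template
    commands.append(base + ["instantiate", template])
    return commands


# Hypothetical job list with a single step, for illustration only.
jobs = [{"step_id": "job-cleanup",
         "file": "gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql"}]
for cmd in workflow_commands(TEMPLATE, "cluster-b442", jobs):
    print(shlex.join(cmd))
```

Printing the commands first makes it easy to review them before pasting them into a cell or a terminal.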

In [30]:
!gsutil ls -r gs://airetail_mld/scripts

gs://airetail_mld/scripts/:

gs://airetail_mld/scripts/daily_product_revenue/:
gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql
gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
gs://airetail_mld/scripts/daily_product_revenue/unittest.sql


In [31]:
!gsutil cp -r ../../apps/daily_product_revenue_bq gs://airetail_mld/apps

Copying file://../../apps/daily_product_revenue_bq/app.py [Content-Type=text/x-python]...
/ [1 files][  584.0 B/  584.0 B]                                                
Operation completed over 1 objects/584.0 B.                                      


In [32]:
!gsutil ls -r gs://airetail_mld/apps/

gs://airetail_mld/apps/:

gs://airetail_mld/apps/daily_product_revenue_bq/:
gs://airetail_mld/apps/daily_product_revenue_bq/app.py


In [33]:
!gcloud config set dataproc/region us-central1

Updated property [dataproc/region].


In [34]:
!gcloud dataproc workflow-templates list

ID                           JOBS  UPDATE_TIME                  VERSION
getting-started              4     2023-06-11T23:01:29.613712Z  1
wf-daily-product-revenue     4     2023-06-12T00:18:39.027622Z  6
wf-daily-product-revenue-bq  5     2023-07-24T00:37:03.015491Z  9


Here is the command to delete a Dataproc Workflow Template. The multiline form with `\` continuations does not work on Windows; use a single line there.

```shell
gcloud dataproc workflow-templates \
    delete wf-daily-product-revenue-bq
```

In [35]:
!gcloud dataproc workflow-templates delete wf-daily-product-revenue-bq --quiet
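
Since the multiline commands in this notebook need to be collapsed to a single line on Windows, a small helper can do the conversion. This is a minimal sketch; the function name `to_single_line` is an assumption, not part of any tool.

```python
import re


def to_single_line(command: str) -> str:
    """Join backslash-continued lines into one line separated by single spaces."""
    # Replace "backslash + newline + indentation" with a single space.
    return re.sub(r"\s*\\\r?\n\s*", " ", command.strip())


multiline = """gcloud dataproc workflow-templates \\
    delete wf-daily-product-revenue-bq"""
print(to_single_line(multiline))
# prints: gcloud dataproc workflow-templates delete wf-daily-product-revenue-bq
```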

Here is the command to create a Dataproc Workflow Template.

```shell
gcloud dataproc workflow-templates \
    create wf-daily-product-revenue-bq
```

Create a new workflow template that loads the daily product revenue into BigQuery.

In [36]:
!gcloud dataproc workflow-templates create wf-daily-product-revenue-bq

In [37]:
!gcloud dataproc workflow-templates list

ID                           JOBS  UPDATE_TIME                  VERSION
getting-started              4     2023-06-11T23:01:29.613712Z  1
wf-daily-product-revenue     4     2023-06-12T00:18:39.027622Z  6
wf-daily-product-revenue-bq  0     2023-07-24T00:41:29.127620Z  1


Here is the command to attach a running Dataproc cluster to the Dataproc Workflow Template. We need to specify the cluster label; replace the cluster name (`aidataprocdev` here) with your own. The cell below uses `cluster-b442`.

```shell
gcloud dataproc workflow-templates \
    set-cluster-selector \
    wf-daily-product-revenue-bq \
    --cluster-labels goog-dataproc-cluster-name=aidataprocdev
```

In [38]:
!gcloud dataproc workflow-templates set-cluster-selector wf-daily-product-revenue-bq --cluster-labels goog-dataproc-cluster-name=cluster-b442

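Here are the commands to add Spark SQL jobs to the Dataproc Workflow Template. The `--start-after` flag declares the steps a job depends on. Replace the `gs://airetail` bucket with your own; the cells below use `gs://airetail_mld`.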

```shell
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-cleanup \
    --file=gs://airetail/scripts/daily_product_revenue/cleanup.sql \
    --workflow-template=wf-daily-product-revenue-bq

# File Format Converter jobs with dependency on cleanup
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-convert-orders \
    --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql \
    --params=bucket_name=gs://airetail,table_name=orders \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-cleanup

gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-convert-order-items \
    --file=gs://airetail/scripts/daily_product_revenue/file_format_converter.sql \
    --params=bucket_name=gs://airetail,table_name=order_items \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-cleanup

# Last Job which depends on convert orders and order_items jobs
gcloud dataproc workflow-templates add-job spark-sql \
    --step-id=job-daily-product-revenue \
    --file=gs://airetail/scripts/daily_product_revenue/compute_daily_product_revenue.sql \
    --params=bucket_name=gs://airetail \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-convert-orders,job-convert-order-items
```
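
The `--start-after` values above form a small dependency graph. The sketch below uses Python's standard `graphlib` module (Python 3.9+) to group the steps into waves that could run in parallel. It is only a local illustration of the ordering and makes no claim about how Dataproc schedules jobs internally; the function name `execution_waves` is an assumption.

```python
from graphlib import TopologicalSorter

# step id -> set of prerequisite step ids (the --start-after values above)
dependencies = {
    "job-cleanup": set(),
    "job-convert-orders": {"job-cleanup"},
    "job-convert-order-items": {"job-cleanup"},
    "job-daily-product-revenue": {"job-convert-orders", "job-convert-order-items"},
}


def execution_waves(deps):
    """Group steps into waves; steps within a wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every step whose prerequisites are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


for number, wave in enumerate(execution_waves(dependencies), start=1):
    print(number, wave)
# 1 ['job-cleanup']
# 2 ['job-convert-order-items', 'job-convert-orders']
# 3 ['job-daily-product-revenue']
```

The two converter jobs land in the same wave, which matches the run log later in this notebook where both are `RUNNING` at the same time.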

In [39]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-cleanup --file=gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql --workflow-template=wf-daily-product-revenue-bq

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
name: projects/dataanalytics-347914/regions/us-central1/workflowTemplates/wf-daily-product-revenue-bq
placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-name: cluster-b442
updateTime: '2023-07-24T00:41:36.526100Z'
version: 3


In [40]:

!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-convert-orders --file=gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql --params=bucket_name=gs://airetail_mld,table_name=orders --workflow-template=wf-daily-product-revenue-bq --start-after=job-cleanup

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
name: projects/dataanalytics-347914/regions/us-central1/workflowTemplates/wf-daily-product-revenue-bq
placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-name: cluster-b442
updateTime: '2023-07-24T00:41:39.240440Z'
version: 4


In [41]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-convert-order-items --file=gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql --params=bucket_name=gs://airetail_mld,table_name=order_items --workflow-template=wf-daily-product-revenue-bq --start-after=job-cleanup

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: order_items
  stepId: job-convert-order-items
name: projects/dataanalytics-347914/regions/us-central1/workflowTemplates/wf-daily-product-revenue-bq
placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-name: cluster-b442
updateTime: '2023-07-24T00:41:41.942460Z'
version: 5


In [42]:
!gcloud dataproc workflow-templates add-job spark-sql --step-id=job-daily-product-revenue --file=gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql --params=bucket_name=gs://airetail_mld --workflow-template=wf-daily-product-revenue-bq --start-after=job-convert-orders,job-convert-order-items

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: order_items
  stepId: job-convert-order-items
- prerequisiteStepIds:
  - job-convert-orders
  - job-convert-order-items
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
  stepId: job-daily-product-reve

In [43]:
!gcloud dataproc workflow-templates list

ID                           JOBS  UPDATE_TIME                  VERSION
getting-started              4     2023-06-11T23:01:29.613712Z  1
wf-daily-product-revenue     4     2023-06-12T00:18:39.027622Z  6
wf-daily-product-revenue-bq  4     2023-07-24T00:41:45.387863Z  6


In [44]:
!gcloud dataproc workflow-templates add-job

ERROR: (gcloud.dataproc.workflow-templates.add-job) Command name argument expected.

Available commands for gcloud dataproc workflow-templates add-job:

      hadoop                  Add a hadoop job to the workflow template.
      hive                    Add a Hive job to the workflow template.
      pig                     Add a Pig job to the workflow template.
      presto                  Add a Presto job to the workflow template.
      pyspark                 Add a PySpark job to the workflow template.
      spark                   Add a Spark job to the workflow template.
      spark-r                 Add a SparkR job to the workflow template.
      spark-sql               Add a SparkSql job to the workflow template.

For detailed information on this command and its flags, run:
  gcloud dataproc workflow-templates add-job --help


In [45]:
!gcloud dataproc workflow-templates add-job pyspark

ERROR: (gcloud.dataproc.workflow-templates.add-job.pyspark) argument PY_FILE --step-id (--workflow-template : --region): Must be specified.
Usage: gcloud dataproc workflow-templates add-job pyspark PY_FILE --step-id=STEP_ID (--workflow-template=WORKFLOW_TEMPLATE : --region=REGION) [optional flags] [-- JOB_ARGS ...]
  optional flags may be  --archives | --driver-log-levels | --files | --help |
                         --jars | --labels | --properties | --properties-file |
                         --py-files | --region | --start-after

For detailed information on this command and its flags, run:
  gcloud dataproc workflow-templates add-job pyspark --help


* Make sure to specify the right project ID in the command below. Also reformat it to a single line when running on Windows.

You can use the `gcloud dataproc jobs submit` command as a reference.

```shell
gcloud dataproc jobs submit \
    pyspark --cluster=aidataprocdev \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue",spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue,spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710,spark.yarn.appMasterEnv.DATASET_NAME=retail,spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail \
    gs://airetail/apps/daily_product_revenue_bq/app.py
```
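
The contents of `app.py` are not shown in this notebook. The sketch below is one plausible shape for such a loader, not the actual file: it reads the four `spark.yarn.appMasterEnv.*` values as environment variables and writes the data to BigQuery with the spark-bigquery connector. The Parquet input format, the target table name `daily_product_revenue`, and the function names are all assumptions.

```python
import os

# Names match the spark.yarn.appMasterEnv.* properties in the command above.
REQUIRED_VARS = ("DATA_URI", "PROJECT_ID", "DATASET_NAME", "GCS_TEMP_BUCKET")


def read_settings(env=os.environ):
    """Collect the settings passed in as environment variables."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


def load_to_bigquery(settings, table_name="daily_product_revenue"):
    """Read the computed revenue files and write them to a BigQuery table."""
    # Imported here so read_settings can be used without PySpark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Assumption: the gold table is stored as Parquet files.
    df = spark.read.parquet(settings["DATA_URI"])
    (df.write
        .format("bigquery")
        .option("project", settings["PROJECT_ID"])
        .option("temporaryGcsBucket", settings["GCS_TEMP_BUCKET"])
        .mode("overwrite")
        .save(f"{settings['DATASET_NAME']}.{table_name}"))


# Run the load only when the job environment is actually configured.
if __name__ == "__main__" and os.environ.get("DATA_URI"):
    load_to_bigquery(read_settings())
```

Failing early on a missing variable gives a clearer error in the driver log than a failure deep inside the BigQuery write.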

Add a new PySpark job to the workflow template.

In [46]:
!gcloud dataproc workflow-templates \
    add-job pyspark \
    --step-id=job-load-dpr-bq \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.31.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue",spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DATA_URI=gs://airetail_mld/retail_gold.db/daily_product_revenue,spark.yarn.appMasterEnv.PROJECT_ID=dataanalytics-347914,spark.yarn.appMasterEnv.DATASET_NAME=retail,spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail_mld \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-daily-product-revenue \
    gs://airetail_mld/apps/daily_product_revenue_bq/app.py

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: order_items
  stepId: job-convert-order-items
- prerequisiteStepIds:
  - job-convert-orders
  - job-convert-order-items
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
  stepId: job-daily-product-reve
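
The long `--properties` value is easy to mistype by hand. As a hedged sketch, it can be assembled from a dictionary instead; the helper name `build_properties` is an assumption, and the property keys are the ones used in the commands above.

```python
def build_properties(app_name, env_vars, deploy_mode="cluster"):
    """Build the comma-separated value for the --properties flag."""
    props = {
        "spark.app.name": app_name,
        "spark.submit.deployMode": deploy_mode,
    }
    # Each entry becomes an environment variable of the application master.
    for name, value in env_vars.items():
        props[f"spark.yarn.appMasterEnv.{name}"] = value
    for key, value in props.items():
        if "," in value:
            # A comma would be read as a separator between properties.
            raise ValueError(f"Value of {key} must not contain a comma")
    return ",".join(f"{key}={value}" for key, value in props.items())


print(build_properties(
    "BigQuery Loader - Daily Product Revenue",
    {
        "DATA_URI": "gs://airetail_mld/retail_gold.db/daily_product_revenue",
        "PROJECT_ID": "dataanalytics-347914",
        "DATASET_NAME": "retail",
        "GCS_TEMP_BUCKET": "airetail_mld",
    },
))
```

When pasting the result into a shell command, the application name still needs quoting because it contains spaces.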

In [47]:
!gcloud dataproc workflow-templates describe wf-daily-product-revenue-bq

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: order_items
  stepId: job-convert-order-items
- prerequisiteStepIds:
  - job-convert-orders
  - job-convert-order-items
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
  stepId: job-daily-product-reve
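
Long `describe` output is hard to scan by eye. One option is to request JSON with the global `--format=json` flag and summarize each step locally. The sketch below works on a shortened, hand-written sample that mirrors the fields shown above (`stepId`, `prerequisiteStepIds`, `sparkSqlJob`); the function name `step_summary` is an assumption.

```python
import json


def step_summary(template):
    """Return (step id, job type, prerequisites) for every job in the template."""
    rows = []
    for job in template.get("jobs", []):
        # The job payload sits under a key such as "sparkSqlJob".
        job_type = next((key for key in job if key.endswith("Job")), "unknown")
        rows.append((job["stepId"], job_type, job.get("prerequisiteStepIds", [])))
    return rows


# Shortened sample in the shape of `describe --format=json` output.
sample = json.loads("""
{
  "id": "wf-daily-product-revenue-bq",
  "jobs": [
    {"stepId": "job-cleanup",
     "sparkSqlJob": {"queryFileUri": "gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql"}},
    {"stepId": "job-convert-orders",
     "prerequisiteStepIds": ["job-cleanup"],
     "sparkSqlJob": {"queryFileUri": "gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql"}}
  ]
}
""")

for step_id, job_type, prerequisites in step_summary(sample):
    print(step_id, job_type, prerequisites)
```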

Here is the command to instantiate (run) the Dataproc Workflow Template.

```shell
gcloud dataproc workflow-templates \
    instantiate wf-daily-product-revenue-bq
```

In [48]:
!gcloud dataproc clusters list

NAME          PLATFORM  WORKER_COUNT  PREEMPTIBLE_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-b442  GCE                                               RUNNING  us-central1-c


In [49]:
!gcloud dataproc clusters start cluster-b442

ERROR: (gcloud.dataproc.clusters.start) FAILED_PRECONDITION: Cluster 'cluster-b442' must be stopped before it can be started, current cluster state is 'RUNNING'.


In [50]:
# This will take some time to run

!gcloud dataproc workflow-templates instantiate wf-daily-product-revenue-bq

Waiting on operation [projects/dataanalytics-347914/regions/us-central1/operations/bed725f3-52ce-32de-b0bc-8400dbb83ac5].
WorkflowTemplate [wf-daily-product-revenue-bq] RUNNING
Job ID job-cleanup-o37dkn32rthzg RUNNING
Job ID job-cleanup-o37dkn32rthzg COMPLETED
Job ID job-convert-orders-o37dkn32rthzg RUNNING
Job ID job-convert-order-items-o37dkn32rthzg RUNNING
Job ID job-convert-orders-o37dkn32rthzg COMPLETED
Job ID job-convert-order-items-o37dkn32rthzg COMPLETED
Job ID job-daily-product-revenue-o37dkn32rthzg RUNNING
Job ID job-daily-product-revenue-o37dkn32rthzg COMPLETED
Job ID job-load-dpr-bq-o37dkn32rthzg RUNNING
WorkflowTemplate [wf-daily-product-revenue-bq] DONE
Job ID job-load-dpr-bq-o37dkn32rthzg COMPLETED
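
To validate a run without reading the log line by line, the captured output can be reduced to the last reported state of each job. This is a minimal sketch that assumes the `Job ID <id> <STATE>` line format shown above; the function name `final_job_states` is hypothetical.

```python
import re

# Matches lines such as "Job ID job-cleanup-o37dkn32rthzg COMPLETED".
JOB_LINE = re.compile(r"^Job ID (\S+) (\w+)$")


def final_job_states(log_text):
    """Map each job ID to the last state reported for it in the log."""
    states = {}
    for line in log_text.splitlines():
        match = JOB_LINE.match(line.strip())
        if match:
            states[match.group(1)] = match.group(2)  # later lines overwrite earlier ones
    return states


log = """\
WorkflowTemplate [wf-daily-product-revenue-bq] RUNNING
Job ID job-cleanup-o37dkn32rthzg RUNNING
Job ID job-cleanup-o37dkn32rthzg COMPLETED
Job ID job-load-dpr-bq-o37dkn32rthzg RUNNING
WorkflowTemplate [wf-daily-product-revenue-bq] DONE
Job ID job-load-dpr-bq-o37dkn32rthzg COMPLETED
"""

states = final_job_states(log)
print(all(state == "COMPLETED" for state in states.values()))
# prints: True
```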


In [51]:
!gcloud dataproc workflow-templates remove-job

ERROR: (gcloud.dataproc.workflow-templates.remove-job) argument (TEMPLATE : --region=REGION): Must be specified.
Usage: gcloud dataproc workflow-templates remove-job (TEMPLATE : --region=REGION) [optional flags]
  optional flags may be  --help | --region | --step-id

For detailed information on this command and its flags, run:
  gcloud dataproc workflow-templates remove-job --help


In [52]:
!gcloud dataproc workflow-templates remove-job wf-daily-product-revenue-bq --step-id=job-load-dpr-bq --quiet

In [53]:
!gcloud dataproc workflow-templates \
    add-job pyspark \
    --step-id=job-load-dpr-bq \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.31.0.jar \
    --properties=spark.app.name="BigQuery Loader - Daily Product Revenue",spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DATA_URI=gs://airetail_mld/retail_gold.db/daily_product_revenue,spark.yarn.appMasterEnv.PROJECT_ID=dataanalytics-347914,spark.yarn.appMasterEnv.DATASET_NAME=retail,spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail_mld \
    --workflow-template=wf-daily-product-revenue-bq \
    --start-after=job-daily-product-revenue \
    gs://airetail_mld/apps/daily_product_revenue_bq/app.py

createTime: '2023-07-24T00:41:29.127620Z'
id: wf-daily-product-revenue-bq
jobs:
- sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/cleanup.sql
  stepId: job-cleanup
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: orders
  stepId: job-convert-orders
- prerequisiteStepIds:
  - job-cleanup
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/file_format_converter.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
      table_name: order_items
  stepId: job-convert-order-items
- prerequisiteStepIds:
  - job-convert-orders
  - job-convert-order-items
  sparkSqlJob:
    queryFileUri: gs://airetail_mld/scripts/daily_product_revenue/compute_daily_product_revenue.sql
    scriptVariables:
      bucket_name: gs://airetail_mld
  stepId: job-daily-product-reve

In [54]:
!gcloud dataproc workflow-templates instantiate wf-daily-product-revenue-bq

Waiting on operation [projects/dataanalytics-347914/regions/us-central1/operations/c122da01-c08c-3496-9e23-7b4091a8aa45].
WorkflowTemplate [wf-daily-product-revenue-bq] RUNNING
Job ID job-cleanup-5eewr5sbj5ojw RUNNING
Job ID job-cleanup-5eewr5sbj5ojw COMPLETED
Job ID job-convert-orders-5eewr5sbj5ojw RUNNING
Job ID job-convert-order-items-5eewr5sbj5ojw RUNNING
Job ID job-convert-orders-5eewr5sbj5ojw COMPLETED
Job ID job-convert-order-items-5eewr5sbj5ojw COMPLETED
Job ID job-daily-product-revenue-5eewr5sbj5ojw RUNNING
Job ID job-daily-product-revenue-5eewr5sbj5ojw COMPLETED
Job ID job-load-dpr-bq-5eewr5sbj5ojw RUNNING
WorkflowTemplate [wf-daily-product-revenue-bq] DONE
Job ID job-load-dpr-bq-5eewr5sbj5ojw COMPLETED
