In [1]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#### References

- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)
- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

This notebook is built to run on a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.  
Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account.  

#### Permissions

Make sure that the service account used to run the notebook has the following roles:

- roles/aiplatform.serviceAgent
- roles/aiplatform.customCodeServiceAgent
- roles/storage.objectCreator
- roles/storage.objectViewer
- roles/dataproc.editor
- roles/dataproc.worker

#### Step 1:
#### Set Google Cloud properties

In [None]:
# User Configuration
# User Inputs

get_project_id = ! gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = get_project_id[0]
REGION = ""  # example"us-west1"
GCS_STAGING_LOCATION = "gs://<bucket_name>" # example "gs://bucket_name"
SUBNET = "" # example "projects/<project-id>/regions/<region-id>/subnetworks/<subnet-name>" 
INPUT_HIVE_DATABASE= ""
INPUT_HIVE_TABLES= "" # example "table1,table2,table3..." or "*"
OUTPUT_BIGQUERY_DATASET= ""
TEMP_BUCKET= "<bucket_name>"
HIVE_OUTPUT_MODE=""
HIVE_METASTORE= "" # example "thrift://hive-cluster-m:9083"
MAX_PARALLELISM=10 # Overwrite the value if you want to increase number of parallel Dataproc Batch Jobs

# Run Dataproc Templates from Vertex AI Pipelines

## Overview

This notebook shows how to build a Vertex AI Pipeline to run a Dataproc Template using the DataprocPySparkBatchOp component.

#### Step 2:
#### Install the required packages

In [None]:
# Google Cloud notebooks requires dependencies to be installed with '--user'
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
# Install latest JDK
! sudo apt-get update
! sudo apt-get install default-jdk

### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.

Uncomment and run this cell if you want to restart the notebook kernel.

In [None]:
# import os

# if not os.getenv("IS_TESTING"):
#    import IPython
#    app = IPython.Application.instance()
#    app.kernel.do_shutdown(True)

#### Step 3:
#### Import dependencies

In [None]:
import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp.v2 import compiler
from datetime import datetime
from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
import time
import os
from pyspark.sql import SparkSession
import pandas as pd


#### Step 4:
#### Change working directory to the Dataproc Templates python folder

In [None]:
WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/python"
%cd /home/jupyter/dataproc-templates/python

#### Step 5:
#### Build Dataproc Templates python package

In [None]:
PACKAGE_EGG_FILE = "dist/dataproc_templates_distribution.egg"
! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

#### Step 6:
#### Copy package to the GCS bucket

For this, make sure that the service account used to run the notebook has the following roles:
 - roles/storage.objectCreator
 - roles/storage.objectViewer

In [None]:
! gsutil cp main.py $GCS_STAGING_LOCATION/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dist/

#### Step 7:
#### Get Hive Tables 
If you want to load every Hive table in the database (`INPUT_HIVE_TABLES = "*"`), the table list has to be read from the Hive Metastore.

The cell below fetches all table names from the Hive database by running a Spark SQL query against the provided Hive Metastore; otherwise it uses the comma-separated list given in `INPUT_HIVE_TABLES`.

In [None]:
# Build TABLE_LIST: either every table of INPUT_HIVE_DATABASE (when the user
# asked for "*") or the explicit comma-separated list from INPUT_HIVE_TABLES.
if INPUT_HIVE_TABLES.strip() == "*":
    if not HIVE_METASTORE:
        # Without a metastore URI the table list would not come from the
        # metastore the user intends to migrate from.
        raise ValueError("HIVE_METASTORE must be set when INPUT_HIVE_TABLES is \"*\"")
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
    os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
    spark = (
        SparkSession.builder
        .master("local")
        .appName("Spark Job to get HIVE table list")
        # Use the metastore configured by the user rather than a fixed host.
        .config("hive.metastore.uris", HIVE_METASTORE)
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        TABLE_LIST_DF = spark.sql("show tables in " + INPUT_HIVE_DATABASE)
        TABLE_LIST = TABLE_LIST_DF.select("tableName").rdd.flatMap(lambda x: x).collect()
    finally:
        # Always release the local Spark session, even if the query fails.
        spark.stop()
else:
    # Tolerate spaces around commas and ignore empty entries, so that
    # "t1, t2," yields ["t1", "t2"] and an empty string yields [].
    TABLE_LIST = [name.strip() for name in INPUT_HIVE_TABLES.split(",") if name.strip()]

print("Table Sets to Migrate: ")
print(TABLE_LIST)

#### Step 8:

Split Hive Tables list based on MAX_PARALLELISM value provided by the user.

In [None]:
import copy

# A non-positive batch size can never consume the table list; reject it up
# front instead of looping forever or dividing by zero.
if MAX_PARALLELISM < 1:
    raise ValueError(
        "MAX_PARALLELISM must be a positive integer, got {!r}".format(MAX_PARALLELISM)
    )

# Independent copy of the table names; TABLE_LIST itself is left untouched.
COMPLETE_LIST = copy.deepcopy(TABLE_LIST)
# Number of completely filled batches (a final, smaller batch is not counted).
PARALLEL_JOBS = len(TABLE_LIST) // MAX_PARALLELISM
# Consecutive slices of at most MAX_PARALLELISM tables; each slice becomes one
# pipeline run. An empty table list produces an empty JOB_LIST.
JOB_LIST = [
    COMPLETE_LIST[start:start + MAX_PARALLELISM]
    for start in range(0, len(COMPLETE_LIST), MAX_PARALLELISM)
]
print("List of tables for execution : ")
print(JOB_LIST)

#### Step 9:

Set Dataproc Template Properties

In [None]:
# Everything below lives under the staging location set in Step 1.
# Root folder for the pipeline's artifacts.
PIPELINE_ROOT = "{}/pipeline_root/dataproc_pyspark".format(GCS_STAGING_LOCATION)
# Entry point of the Dataproc Templates package, as uploaded in Step 6.
MAIN_PYTHON_FILE = "{}/main.py".format(GCS_STAGING_LOCATION)
# Spark BigQuery connector jar supplied to every batch.
JARS = [
    "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
]
# Packaged templates egg, as uploaded in Step 6.
PYTHON_FILE_URIS = [
    "{}/dist/dataproc_templates_distribution.egg".format(GCS_STAGING_LOCATION),
]

#### Step 10:
#### Build pipeline and run Dataproc Template on Vertex AI Pipelines to migrate Hive tables to BigQuery

For this, make sure that the service account used to run the notebook has the following roles:
 - roles/dataproc.editor
 - roles/dataproc.worker

In [None]:
# Spark runtime properties applied to every Dataproc Serverless batch.
runtime_prop = {
    "spark.hadoop.hive.metastore.uris": HIVE_METASTORE,
    "mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
}

# Dataproc batch IDs must be 4-63 characters long and may only contain
# lowercase letters, digits and hyphens.
_BATCH_ID_MAX_LENGTH = 63
_BATCH_ID_ALLOWED_CHARS = frozenset("abcdefghijklmnopqrstuvwxyz0123456789-")


def _build_batch_id(table, suffix):
    """Return a valid Dataproc batch ID of the form ``hive2bq-<table>-<suffix>``.

    The table name is lower-cased, every character outside ``[a-z0-9-]``
    (for example ``_``) is replaced by ``-``, and the result is truncated so
    the whole ID fits in 63 characters.

    Args:
        table: Hive table name the batch migrates.
        suffix: string of digits/hyphens that makes the ID unique.

    Returns:
        The batch ID string.
    """
    prefix = "hive2bq-"
    slug = "".join(
        char if char in _BATCH_ID_ALLOWED_CHARS else "-"
        for char in str(table).lower()
    )
    # Leave room for the prefix, the separating hyphen and the suffix.
    budget = _BATCH_ID_MAX_LENGTH - len(prefix) - len(suffix) - 1
    slug = slug[:budget].strip("-") or "table"
    return "{}{}-{}".format(prefix, slug, suffix)


def migrate_hive(EXECUTION_LIST):
    """Compile and run one Vertex AI pipeline that migrates a set of Hive tables.

    The pipeline contains one DataprocPySparkBatchOp task per table, each
    running the HIVETOBIGQUERY template with the settings from Step 1. The
    compiled definition is written to ``pipeline.json`` in the current
    directory and submitted with caching disabled; the call blocks until
    ``PipelineJob.run()`` returns.

    Args:
        EXECUTION_LIST: Hive table names to migrate in this pipeline run.

    Raises:
        Any exception raised while compiling or running the pipeline is
        propagated to the caller.
    """
    aiplatform.init(project=PROJECT_ID, staging_bucket=GCS_STAGING_LOCATION)

    # One timestamp per pipeline; the table's position is appended so every
    # batch ID stays unique even if long table names are truncated alike.
    run_stamp = str(int(time.time()))

    @dsl.pipeline(
        name="hive-to-bq-pyspark",
        description="Pipeline to migrate tables from hive to bq",
    )
    def pipeline(
        project_id: str = PROJECT_ID,
        location: str = REGION,
        main_python_file_uri: str = MAIN_PYTHON_FILE,
        python_file_uris: list = PYTHON_FILE_URIS,
        jar_file_uris: list = JARS,
        subnetwork_uri: str = SUBNET
    ):
        for index, table in enumerate(EXECUTION_LIST):
            BATCH_ID = _build_batch_id(table, "{}-{}".format(run_stamp, index))
            TEMPLATE_SPARK_ARGS = [
                "--template=HIVETOBIGQUERY",
                "--hive.bigquery.input.database={}".format(INPUT_HIVE_DATABASE),
                "--hive.bigquery.input.table={}".format(table),
                "--hive.bigquery.output.table={}".format(table),
                "--hive.bigquery.output.dataset={}".format(OUTPUT_BIGQUERY_DATASET),
                "--hive.bigquery.output.mode={}".format(HIVE_OUTPUT_MODE),
                "--hive.bigquery.temp.bucket.name={}".format(TEMP_BUCKET),
            ]
            _ = DataprocPySparkBatchOp(
                project=project_id,
                location=location,
                batch_id=BATCH_ID,
                main_python_file_uri=main_python_file_uri,
                python_file_uris=python_file_uris,
                jar_file_uris=jar_file_uris,
                subnetwork_uri=subnetwork_uri,
                runtime_config_properties=runtime_prop,
                args=TEMPLATE_SPARK_ARGS,
            )

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    # Distinct name so the pipeline function above is not shadowed.
    pipeline_job = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
    )
    pipeline_job.run()

#### Step 11:

Run Dataproc Batch Template based on Hive Tables list calculated in Step 8.

The cell below calls `migrate_hive` for each table set to migrate its tables with Dataproc Serverless batch jobs, and records one entry per table set in the audit table, which is then written as a CSV file to `TEMP_BUCKET`.

In [None]:
# Column order of the audit table (also the column order of the CSV file).
AUDIT_COLUMNS = ["Source_DB_Name","Source_Table_Set","Target_DB_Name","Target_Table_Set","Job_Start_Time","Job_End_Time","Job_Status"]
# One dict per table set; the DataFrame is built once after the loop because
# DataFrame.append is not available in pandas 2.x and row-by-row growth is slow.
AUDIT_RECORDS = []

for execution_list in JOB_LIST:
    print("\n\nLoading Table Set: "+str(execution_list))
    table_set = '|'.join(execution_list)
    # Fresh dict per table set so earlier records are never overwritten.
    AUDIT_DICT = {
        "Source_DB_Name": INPUT_HIVE_DATABASE,
        "Source_Table_Set": table_set,
        "Target_DB_Name": OUTPUT_BIGQUERY_DATASET,
        "Target_Table_Set": table_set,
        "Job_Start_Time": str(datetime.now()),
    }
    try:
        migrate_hive(execution_list)
    except Exception as error:
        # Best effort: record the failure and continue with the next table
        # set, but report the cause instead of discarding it.
        AUDIT_DICT["Job_Status"]="FAIL"
        print("\n\nSome Error Occured while loading Table Set: "+str(execution_list))
        print("Error details: {!r}".format(error))
    else:
        AUDIT_DICT["Job_Status"]="PASS"
        print("\n\nLoaded Table Set: "+str(execution_list))

    AUDIT_DICT["Job_End_Time"]=str(datetime.now())
    AUDIT_RECORDS.append(AUDIT_DICT)

AUDIT_DF = pd.DataFrame(AUDIT_RECORDS, columns=AUDIT_COLUMNS)

if AUDIT_DF.empty:
    print("Audit Dataframe is Empty")
else:
    print(AUDIT_DF)
    # Written without index or header row; the file name carries the write time.
    AUDIT_DF.to_csv("gs://"+TEMP_BUCKET+"/audit/audit_file_{}.csv".format(str(datetime.now())),index=False,header = False)