## Workflow 9: Create an ETL pipeline of financial data from MS-SQL Server to GCP using Dataproc, Deployment Manager, Apache Nifi, and PySpark

### Tools: 

**GCP**: Deployment Manager, Dataproc, Cloud Functions, Compute (VM), BigQuery, GCS

**External**: MSSQL, AWS-RDS, SSMS

**Frameworks**: Apache NiFi, PySpark, SQL, Jinja

#### Below we create a pipeline for a financial dataset involving these broad steps.

a. Ingest dataset from Stock Exchange APIs into an MSSQL server hosted on AWS-RDS and managed locally through SQL Server Management Studio (SSMS)  
b. Deploy cloud infrastructure for the ETL job as code (IaC) using Deployment Manager  
c. Transfer dataset from MSSQL server into a GCS bucket using Apache Nifi running on a VM  
d. Transform dataset using PySpark on a Dataproc cluster auto-created by a cloud function trigger  
e. Load processed dataset into BQ using the same cloud function for query and insights  

![image.png](attachment:1c3ba724-1558-4cb7-a87c-590cd35bee6a.png)

#### 1. Download the dataset by selecting appropriate filters here: https://www.bseindia.com/markets/equity/EQReports/StockPrcHistori.html?flag=0


#### 2. Create an MSSQL server on AWS-RDS with credentials and copy the generated endpoint.

aws rds create-db-instance --db-instance-identifier mssql-etl-gcp --db-instance-class db.m5.large --engine sqlserver-se --master-username admin --master-user-password <*****>

**Endpoint**: mssql-etl-gcp.c1yeyuauigzt.ap-south-1.rds.amazonaws.com, 1433

Add a new inbound rule to the default VPC security group that allows a custom TCP connection on port 1433 from your local public IP, written in CIDR form as xx.xx.xx.xxx/32.
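
In CIDR notation, a `/32` suffix matches exactly one IPv4 address, which is why it suits a rule limited to a single workstation. The sketch below illustrates this with Python's standard `ipaddress` module. The address `203.0.113.7` and the helper name `single_host_cidr` are placeholders for illustration, not values from this workflow.

```python
import ipaddress


def single_host_cidr(ip: str) -> str:
    """Return the /32 CIDR block that matches only the given IPv4 address."""
    network = ipaddress.ip_network(f"{ip}/32")
    return str(network)


# 203.0.113.7 is a placeholder from the documentation address range.
cidr = single_host_cidr("203.0.113.7")
print(cidr, ipaddress.ip_network(cidr).num_addresses)
```

The resulting string is what goes into the source field of the security group rule.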

#### 3. Set up a new connection in SSMS using above endpoint and credentials. 

#### Create a new database mssql-gcp and import the CSV file using the wizard: Tasks > Import Data > Flat File. Specify the MSSQL database credentials and set the destination to SQL Server Native Client.

#### Add an id column to the table:

ALTER TABLE [mssql-gcp].dbo.financial_data ADD id INT NOT NULL IDENTITY(1,1);

#### 4. Customise the jinja templates below for Deployment Manager to create (a) GCS buckets (b) VM instance (c) BQ output tables.

In [None]:
# gcs.jinja

{% set INPUT_BUCKET_NAME = env['project'] + '-etl-input' %}
{% set RESOURCES_BUCKET_NAME = env['project'] + '-etl-resources' %}

resources:
- type: gcp-types/storage-v1:buckets
  name: {{ INPUT_BUCKET_NAME }}
  properties:
    predefinedAcl: projectPrivate
    projection: full
    location: US
    storageClass: STANDARD

- type: gcp-types/storage-v1:buckets
  name: {{ RESOURCES_BUCKET_NAME }}
  properties:
    predefinedAcl: projectPrivate
    projection: full
    location: US
    storageClass: STANDARD   

In [None]:
# vm.jinja

{% set VM_NAME = env['project'] + '-etl-vm' %}

resources:
- type: compute.v1.instance
  name: {{ VM_NAME }}
  properties:
    zone: {{ properties["zone"] }}
    machineType: https://www.googleapis.com/compute/v1/projects/{{ env['project'] }}/zones/{{ properties["zone"] }}/machineTypes/{{ properties["machineType"] }}
    tags:
        items: ["http-server","https-server"]
    disks:
    - deviceName: boot
      type: PERSISTENT
      boot: true
      autoDelete: true
      initializeParams:
        sourceImage: https://www.googleapis.com/compute/v1/projects/ubuntu-os-cloud/global/images/family/ubuntu-2004-lts
    networkInterfaces:
    - network: https://www.googleapis.com/compute/v1/projects/{{ env['project'] }}/global/networks/default
      # Access Config required to give the instance a public IP address
      accessConfigs:
      - name: External NAT
        type: ONE_TO_ONE_NAT
# Firewall rule allowing inbound HTTP (port 80) to instances tagged http-server / https-server
- name: default-allow-http
  type: compute.v1.firewall
  properties:
    network: https://www.googleapis.com/compute/v1/projects/{{ env['project'] }}/global/networks/default
    targetTags: ["http-server","https-server"]
    sourceRanges: ["0.0.0.0/0"]
    allowed:
    - IPProtocol: TCP
      ports: ["80"]

In [None]:
# bigquery.jinja

# Can't use deployment name as it is going to be filled in with a generated
# name which has dashes in it, which are not valid bigquery name characters.
{% set DATASET_NAME = (env['project'] + "-etl-dataset")|replace("-","_") %}
{% set BQ_TABLE_NAME = "financial_data" %}


resources:
# Dataset Resource
- name: {{ DATASET_NAME }}
  type: gcp-types/bigquery-v2:datasets
  properties:
    datasetReference:
      datasetId: {{ DATASET_NAME }}
    description: The {{ DATASET_NAME }} dataset contains all the tables for the BigQuery ETL pipeline.
    location: US

# Financial data table
- name: {{ BQ_TABLE_NAME }}
  type: gcp-types/bigquery-v2:tables
  properties:
    datasetId: $(ref.{{ DATASET_NAME }}.datasetReference.datasetId)
    tableReference:
      tableId: {{ BQ_TABLE_NAME }}
    schema:
          fields:
          - name: id
            type: INTEGER
            mode: NULLABLE
          - name: Symbol
            type: STRING
            mode: NULLABLE
          - name: Date
            type: DATE
            mode: NULLABLE
          - name: Open_Price
            type: FLOAT
            mode: NULLABLE
          - name: High_Price
            type: FLOAT
            mode: NULLABLE
          - name: Low_Price
            type: FLOAT
            mode: NULLABLE
          - name: Close_Price
            type: FLOAT
            mode: NULLABLE
          - name: WAP
            type: FLOAT
            mode: NULLABLE
          - name: No_of_Shares
            type: INTEGER
            mode: NULLABLE
          - name: No_of_Trades
            type: INTEGER
            mode: NULLABLE
          - name: Total_Turnover
            type: FLOAT
            mode: NULLABLE
          - name: Deliverable_Quantity
            type: INTEGER
            mode: NULLABLE
          - name: Percentage_Deli_Qty_to_Traded_Qty
            type: FLOAT
            mode: NULLABLE
          - name: Spread_High_Low
            type: FLOAT
            mode: NULLABLE
          - name: Spread_Close_Open
            type: FLOAT
            mode: NULLABLE
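
The dataset name in the template is derived from the project ID by swapping dashes for underscores. The sketch below mirrors that Jinja expression in Python and adds a conservative character check. The helper names are hypothetical, and the check covers only ASCII letters, digits and underscores, which is an assumption narrower than what BigQuery may accept.

```python
import re


def dataset_name_for(project_id: str) -> str:
    """Mirror the Jinja expression (project + "-etl-dataset")|replace("-", "_")."""
    return (project_id + "-etl-dataset").replace("-", "_")


def is_valid_dataset_id(name: str) -> bool:
    """Conservative check: ASCII letters, digits and underscores, up to 1024 characters."""
    return re.fullmatch(r"[A-Za-z0-9_]{1,1024}", name) is not None


name = dataset_name_for("gcloud-etl-422807")
print(name, is_valid_dataset_id(name))
```

The same derived name appears later as the dataset argument passed to the PySpark job.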

#### 5. Customize the yaml config file below through which Deployment Manager instantiates above templates.

In [None]:
imports:
 - path: vm.jinja
 - path: bigquery.jinja
 - path: gcs.jinja

resources:
- name: vm
  type: vm.jinja
  properties:
    zone: us-central1-a
    machineType: n2-standard-4
- name: gcs
  type: gcs.jinja
- name: bq
  type: bigquery.jinja
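
The `zone` and `machineType` properties set in this config are substituted into the URLs inside vm.jinja. As a rough illustration of that substitution, the sketch below builds the machine type URL in Python. The function name `machine_type_url` is hypothetical and the project ID is the one used elsewhere in this workflow.

```python
def machine_type_url(project: str, zone: str, machine_type: str) -> str:
    """Assemble the machineType URL the same way the vm.jinja template does."""
    return (
        f"https://www.googleapis.com/compute/v1/projects/{project}"
        f"/zones/{zone}/machineTypes/{machine_type}"
    )


# Values taken from the config above and the project ID used in later steps.
url = machine_type_url("gcloud-etl-422807", "us-central1-a", "n2-standard-4")
print(url)
```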

#### 6. Deploy specified resources using Deployment Manager and verify.

gcloud deployment-manager deployments create mssql-bq-etl-deployment --config finance_project_deployment.yaml

gcloud deployment-manager deployments describe mssql-bq-etl-deployment

#### 7. Open an SSH shell on the newly created VM and install NiFi.

sudo su  
apt update  
apt install openjdk-8-jdk  
wget https://archive.apache.org/dist/nifi/1.16.0/nifi-1.16.0-bin.tar.gz  

tar -xzvf nifi-1.16.0-bin.tar.gz  

#### 8. Modify the below parameters in Nifi's configuration file.

nano nifi-1.16.0/conf/nifi.properties

In [None]:
#### nifi.properties configuration #### 
---------------------------------------

nifi.remote.input.http.enabled = false  

#### web properties ####
nifi.web.http.host=  
nifi.web.http.port=8088  

nifi.web.https.host=  
nifi.web.https.port=  

#### security properties ####
nifi.security.keystore=  
nifi.security.keystoreType=  
nifi.security.keystorePasswd=  
nifi.security.keyPasswd=  
nifi.security.truststore=  
nifi.security.truststoreType=  
nifi.security.truststorePasswd=  
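
With these settings NiFi serves its UI over plain HTTP on port 8088, and the HTTPS and keystore entries stay empty. The sketch below is one way to sanity-check an edited file before starting NiFi. The helper names `parse_properties` and `serves_plain_http` are hypothetical, and the parser handles only simple `key=value` lines.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks, comments and separator lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def serves_plain_http(props: dict) -> bool:
    """True when an HTTP port is set and the HTTPS port is left empty."""
    return bool(props.get("nifi.web.http.port")) and not props.get("nifi.web.https.port")


# A shortened copy of the settings above, used as sample input.
sample = """
nifi.remote.input.http.enabled = false
nifi.web.http.host=
nifi.web.http.port=8088
nifi.web.https.host=
nifi.web.https.port=
"""
props = parse_properties(sample)
print(props["nifi.web.http.port"], serves_plain_http(props))
```

To check the real file, read the contents of nifi-1.16.0/conf/nifi.properties and pass them to `parse_properties` in place of `sample`.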

#### 7. Create a new firewall rule in default VPC Network allowing TCP connections from local machine by specifying public IPv4 with port 8088.

gcloud compute firewall-rules create mssql-gcs-firewall --allow=tcp:8088 --source-ranges=xx.xx.xx.xxx

gcloud compute firewall-rules list

#### 8. Launch Nifi, monitor logs, copy external IP of the VM instance and open Nifi UI.

./nifi-1.16.0/bin/nifi.sh start  
tail -n 100 -f nifi-1.16.0/logs/nifi-app.log  
gcloud compute instances list  

**Open the NiFi interface at:** `http://<EXTERNAL_IP>:8088/nifi`, where `<EXTERNAL_IP>` is the external IP of the VM instance  

#### 9. Create a process group MSSQL-GCP-ETL on Nifi. Within this, create 2 processors, linked by a connection.

#### Configure processors and connections as below.

![image.png](attachment:8b613f8b-bfda-4382-892c-9bb50bfe5d9b.png)

#### **Processor 1: QueryDatabaseTableRecord**  

**Database Connection Pooling Service**: DBCP Connection Pool

#### Configure Controller Services for DBCPConnectionPool.

**Database Connection URL**: jdbc:sqlserver://mssql-etl-gcp.c1yeyuauigzt.ap-south-1.rds.amazonaws.com;port=1433;database=mssql_gcp2;authentication=NotSpecified;encrypt=true;trustServerCertificate=true

**Database Driver Class Name**: com.microsoft.sqlserver.jdbc.SQLServerDriver
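
The connection URL is the RDS endpoint from step 2 followed by semicolon-separated properties. The sketch below assembles it from its parts, which may help avoid typos in the host name. The function `build_jdbc_url` is a hypothetical helper. Note that `trustServerCertificate=true` makes the driver skip validation of the server certificate.

```python
def build_jdbc_url(host: str, port: int, database: str, **options: str) -> str:
    """Assemble a SQL Server JDBC URL; extra options become ;key=value pairs."""
    parts = [f"jdbc:sqlserver://{host}", f"port={port}", f"database={database}"]
    parts += [f"{key}={value}" for key, value in options.items()]
    return ";".join(parts)


url = build_jdbc_url(
    "mssql-etl-gcp.c1yeyuauigzt.ap-south-1.rds.amazonaws.com",
    1433,
    "mssql_gcp2",
    authentication="NotSpecified",
    encrypt="true",
    trustServerCertificate="true",
)
print(url)
```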

#### Download the JDBC driver for MS-SQL Server from https://go.microsoft.com/fwlink/?linkid=2272720, upload the archive to the VM's home directory through the SSH terminal, and extract it.

#### Specify the location of the driver's JRE 8 jar file as:

**Database Driver Location(s)**: /home/kumararunachal1/sqljdbc_12.6/enu/jars/mssql-jdbc-12.6.1.jre8.jar

#### Specify Database User and Password

**Database User**: admin  
**Password**: *******

**Database Type**: MS SQL 2012+  
**Table Name**: SampleData1

**Record Writer**: ParquetRecordSetWriter

#### Configure the ParquetRecordSetWriter controller service as:

**Compression Type**: SNAPPY

#### Back in the processor properties, set:

**Maximum-value Columns**: id
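
Setting a maximum-value column makes the processor remember the largest `id` it has seen and fetch only rows above that value on later runs. Clearing the processor state resets this. The toy model below imitates that behaviour in plain Python. The class `IncrementalReader` and the sample rows are invented for illustration and are not NiFi code.

```python
from typing import Dict, List, Optional


class IncrementalReader:
    """Toy model of max-value tracking: remembers the highest id already fetched."""

    def __init__(self) -> None:
        self.max_id: Optional[int] = None  # stands in for the processor's stored state

    def fetch(self, table: List[Dict]) -> List[Dict]:
        """Return rows with id above the stored maximum, then update the maximum."""
        rows = [r for r in table if self.max_id is None or r["id"] > self.max_id]
        if rows:
            self.max_id = max(r["id"] for r in rows)
        return rows

    def clear_state(self) -> None:
        """Forget the stored maximum so the next fetch returns every row again."""
        self.max_id = None


table = [{"id": 1, "Symbol": "A"}, {"id": 2, "Symbol": "B"}]
reader = IncrementalReader()
first = reader.fetch(table)             # both existing rows
table.append({"id": 3, "Symbol": "C"})
second = reader.fetch(table)            # only the newly added row
reader.clear_state()
third = reader.fetch(table)             # all rows again after clearing state
print(len(first), len(second), len(third))
```

This is also why the identity column added in step 3 matters: it gives the processor a steadily increasing value to track.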

#### **Connection 1 (between Processor 1 and Processor 2)** 

**Relationship**: Success

#### **Processor 2: PutGCSObject**

**Project ID**: gcloud-etl-422807  
**GCP Credentials Provider Service**: GCPCredentialsControllerService

#### Configure controller service: GCPCredentialsControllerService

**Service Account JSON**: paste JSON from google service account key file

**Bucket**: gcloud-etl-422807-etl-input

**Key**: insert_dt=${now():format('yyyy-MM-dd')}/${filename}
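
The key expression places each flow file under a date-stamped prefix, which the PySpark job later uses to locate the day's files. A plain Python equivalent of the expression is sketched below. The function name `object_key` and the filename are made up for illustration, since NiFi assigns the real filename to each flow file.

```python
from datetime import datetime


def object_key(filename: str, now: datetime) -> str:
    """Build the key insert_dt=<yyyy-MM-dd>/<filename> for the given timestamp."""
    return f"insert_dt={now.strftime('%Y-%m-%d')}/{filename}"


# A fixed timestamp keeps the example reproducible.
key = object_key("example.parquet", datetime(2024, 5, 1, 9, 30))
print(key)
```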

#### Set Relationships as:

**failure**: terminate  
**success**: terminate

#### 10. Run the process group MSSQL-GCP-ETL. 

The two processors change their states.  
The MSSQL table SampleData1 is written as a Parquet file to the bucket gs://gcloud-etl-422807-etl-input, under a key prefixed with the insert date (in the format specified above).

#### 11. Create a Cloud Function mssql-gcs-trigger-function. This function:

a. is triggered by any (parquet) file placed in the created bucket gs://gcloud-etl-422807-etl-input   
b. creates a temporary Dataproc cluster  
c. uses a PySpark job to transform the parquet file in the input bucket into the final BQ table  
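
A Cloud Storage trigger passes the function an event dictionary that includes the bucket and object name. The sketch below shows how those two fields form the object URI, plus an optional guard that checks for the partition prefix. The guard `is_partitioned_upload` is a hypothetical addition and is not part of the function used in this workflow.

```python
def gcs_uri(event: dict) -> str:
    """Build the gs:// URI of the object that fired the trigger."""
    return f"gs://{event['bucket']}/{event['name']}"


def is_partitioned_upload(event: dict, partition_field: str = "insert_dt") -> bool:
    """True when the object sits under a <partition_field>=... prefix."""
    return event["name"].startswith(f"{partition_field}=")


# A hand-written event holding only the two fields used here.
event = {"bucket": "gcloud-etl-422807-etl-input", "name": "insert_dt=2024-05-01/example"}
print(gcs_uri(event), is_partitioned_upload(event))
```

A guard like this could be used to return early for objects that do not belong to the pipeline, so that stray uploads do not spin up a cluster.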

#### 12. Customize the PySpark job template as below. This requires specifying a jar file. The correct version can be found using the steps shown here: https://github.com/tfayyaz/cloud-dataproc/blob/master/notebooks/python/1.2.%20BigQuery%20Storage%20%26%20Spark%20SQL%20-%20Python.ipynb

In [None]:
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# creating spark session
spark = SparkSession.builder \
.appName("gcs-to-bq") \
.config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar") \
.getOrCreate()

# Reading the command line arguments
data_uri = sys.argv[1]
file_format = sys.argv[2]
partition_field = sys.argv[3]
project_id = sys.argv[4]
dataset_name = sys.argv[5]
table_name = sys.argv[6]
gcs_temp_bucket = sys.argv[7]

# Building the fully qualified BigQuery table name from project_id, dataset_name and table_name
bigquery_table = f"{project_id}.{dataset_name}.{table_name}"


today = datetime.now().strftime("%Y-%m-%d")
print(today)
path = f"{data_uri}/{partition_field}={today}"
print(path)

# Reading the parquet files from GCS bucket, assigning the schema manually and converting the date to proper format

input_df = spark \
.read \
.option("recursiveFileLookup", "true") \
.format(file_format) \
.option("header","True") \
.load(path) \
.withColumn("id", col('id').cast('int')) \
.withColumn("Symbol", col('Symbol').cast('string')) \
.withColumn("Date",to_date("Date", 'dd-MMM-yy')) \
.withColumn("Open_Price", col('Open_Price').cast('float')) \
.withColumn("High_Price", col('High_Price').cast('float')) \
.withColumn('Low_Price',col('Low_Price').cast('float')) \
.withColumn("Close_Price", col('Close_Price').cast('float')) \
.withColumn("WAP", col('WAP').cast('float')) \
.withColumn("No_of_Shares", col('No_of_Shares').cast('int')) \
.withColumn("No_of_Trades", col('No_of_Trades').cast('int')) \
.withColumn("Total_Turnover", col('Total_Turnover').cast('float')) \
.withColumn("Deliverable_Quantity", col('Deliverable_Quantity').cast('int')) \
.withColumn("Percentage_Deli_Qty_to_Traded_Qty", col('Percentage_Deli_Qty_to_Traded_Qty').cast('float')) \
.withColumn("Spread_High_Low", col('Spread_High_Low').cast('float')) \
.withColumn("Spread_Close_Open", col('Spread_Close_Open').cast('float'))


# Load the processed data from GCS location into a BigQuery table
input_df.write \
    .mode("append") \
    .format("com.google.cloud.spark.bigquery") \
    .option("temporaryGcsBucket", gcs_temp_bucket) \
    .option("table", bigquery_table) \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .save()
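
Two details of the job are easy to check outside Spark: the input path it builds for the current day, and the `dd-MMM-yy` date pattern. The sketch below reproduces both in plain Python, assuming an English locale for month abbreviations. The function names and the sample date string are illustrative only.

```python
from datetime import date, datetime


def partition_path(data_uri: str, partition_field: str, day: datetime) -> str:
    """Mirror the job's path logic: <data_uri>/<partition_field>=<YYYY-MM-DD>."""
    return f"{data_uri}/{partition_field}={day.strftime('%Y-%m-%d')}"


def parse_trade_date(text: str) -> date:
    """Parse a date such as 02-Jan-24, the strptime analogue of Spark's dd-MMM-yy."""
    return datetime.strptime(text, "%d-%b-%y").date()


path = partition_path("gs://gcloud-etl-422807-etl-input", "insert_dt", datetime(2024, 5, 1))
print(path)
print(parse_trade_date("02-Jan-24"))
```

Because the path uses the date on which the job runs, files written under an earlier `insert_dt` prefix are not picked up by a later run.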

#### Upload the PySpark job file to the resources bucket.

gsutil cp gsc_to_bq_test_v2.py gs://gcloud-etl-422807-etl-resources/

#### 13. Configure the Cloud Function as below

a. 1st Gen  
b. Trigger: Cloud Storage: gs://gcloud-etl-422807-etl-input  
c. Memory: 512 MB  
d. Timeout: 540 s  
e. Set runtime environment variable  

**main_python_file_uri**: gs://gcloud-etl-422807-etl-resources/gsc_to_bq_test_v2.py

f. Python 3.7 Runtime  
g. Requirements:

google-cloud-dataproc==3.0.0  
google-cloud-storage==1.42.2

#### 14. Customize the Cloud Function template below. 

In [None]:
from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage
import os
import logging
import json


def instantiate_inline_workflow_template(event, context):
    # Initialise clients
    storage_client = storage.Client()

    # Get variables
    project_id = os.environ.get("GCP_PROJECT")  
    region = os.environ.get("FUNCTION_REGION")  
    main_python_file_uri = os.environ.get(
        "main_python_file_uri") 
    input_file = f"gs://{event['bucket']}/{event['name']}"

    # Create a client with the endpoint set to the desired region.
    workflow_template_client = dataproc.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    parent = f"projects/{project_id}/regions/{region}"

    template = {
        "jobs": [
            {
                "pyspark_job": {
                    "main_python_file_uri": main_python_file_uri,
                    "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
                    "args": ["gs://gcloud-etl-422807-etl-input",
                    "parquet",
                    "insert_dt", 
                    "gcloud-etl-422807", 
                    "gcloud_etl_422807_etl_dataset",
                    "financial_data",
                    "gs://gcloud-etl-422807-etl-resources/"],
                },
                "step_id": "pyspark_etl",
            },
        ],
        "placement": {
            "managed_cluster": {
                "cluster_name": "cluster-d02b",
                "config": {
                    "gce_cluster_config": {"zone_uri": ""},
                    "master_config": {"num_instances": 1, "machine_type_uri": "e2-standard-2"},
                    "worker_config": {"num_instances": 0, "machine_type_uri": "e2-standard-2"},
                    "software_config": {"image_version": "2.2-debian"},
                },
            }
        },
    }

    logging.info(
        f"Creating temporary Dataproc cluster to run the PySpark job on {input_file} and load the result into BigQuery")

    operation = workflow_template_client.instantiate_inline_workflow_template(
        request={"parent": parent, "template": template}
    )
    operation.result()
    logging.info("Workflow ran successfully.")

#### Specify Entry Point: instantiate_inline_workflow_template

#### Author and deploy the Cloud Function.
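
The job arguments in the template are hardcoded, but each one follows the naming scheme set up by the Deployment Manager templates. As a sketch of an alternative, the hypothetical helper `build_job_args` below derives the same seven values from the project ID. It assumes the bucket and dataset names were created by the templates from step 4 without changes.

```python
def build_job_args(project_id: str, table: str = "financial_data") -> list:
    """Derive the seven PySpark arguments from the project ID and naming scheme."""
    return [
        f"gs://{project_id}-etl-input",                    # data_uri
        "parquet",                                         # file_format
        "insert_dt",                                       # partition_field
        project_id,                                        # project_id
        (project_id + "-etl-dataset").replace("-", "_"),   # dataset_name
        table,                                             # table_name
        f"gs://{project_id}-etl-resources/",               # gcs_temp_bucket
    ]


args = build_job_args("gcloud-etl-422807")
print(args)
```

Inside the function, the result could be passed as the `args` value of the `pyspark_job` entry, using the `project_id` already read from the environment.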

#### 15. Clear the state of Processor 1 on Nifi. Run the Process Group. 

The dataset from the MSSQL server lands in the input bucket as a Parquet file, with the date in the folder name. 

Cloud Function is triggered which creates a Dataproc cluster to run the PySpark job. 

The PySpark job runs and outputs data into the specified table on BQ, ready for query and visualization.