#### Installing Packages

In [None]:
# pip install 'apache-beam[gcp]' --quiet --user

In [None]:
# pip install google-cloud-dataflow --quiet --user

#### Importing Packages

In [None]:
import time
import requests
import pprint as pp
import google.auth.transport.requests
from google.oauth2 import service_account
from googleapiclient.discovery import build
from datetime import datetime, timedelta

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import SetupOptions, PipelineOptions

#### Setting credentials and scope

In [None]:
SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
SERVICE_ACCOUNT_FILE = 'path_of_service_account_json_key'

# Load service-account credentials from the JSON key file, scoped to cloud-platform.
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)

# Refresh once so `credentials.token` holds an access token usable in raw REST calls.
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
access_token = credentials.token

# The Authorization header value must be "Bearer <token>" -- note the separating space.
headers = {'Authorization': f'Bearer {access_token}',
           'Content-Type': 'application/json; charset=utf-8'}

#### Create a classic template via a Vertex AI Workbench Jupyter notebook
We can create a classic template directly from a Vertex AI Workbench Jupyter notebook if the notebook instance was created with (and runs as) the service account.

In [None]:
class MyPipelineOptions(PipelineOptions):
    """Pipeline options exposing the templatable input and output paths.

    Both options are registered as ValueProvider arguments, so their values can
    be supplied as runtime parameters when the classic template is launched.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        # add_value_provider_argument -> templatable (value supplied at launch);
        # a plain parser.add_argument would make an option non-templatable.
        templatable_options = (
            ('--input', 'Path of the file to read from'),
            ('--output', 'Output file to write results to.'),
        )
        for flag, help_text in templatable_options:
            parser.add_value_provider_argument(flag, type=str, help=help_text)
        
# Options for building the pipeline on Dataflow. `template_location` is set so that
# running the pipeline stages a classic template at that GCS path (see section heading).
pipeline_options = {
    'project': 'project_id',
    'runner': 'DataflowRunner',
    'region': 'us-east4',
    'staging_location': 'gs://your_staging_bucket_location',
    'temp_location': 'gs://your_temp_bucket_location',
    'template_location': 'gs://your_classic_template_location'
}

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
# Typed view giving access to the templatable --input / --output ValueProviders.
beam_options = pipeline_options.view_as(MyPipelineOptions)

pipeline = beam.Pipeline(options=pipeline_options)

# Copy the input text file to a single, un-suffixed output file
# (num_shards=1 and an empty shard_name_template).
lines = (pipeline
         | 'Read' >> ReadFromText(beam_options.input)
         | 'Write' >> WriteToText(beam_options.output, num_shards=1, shard_name_template=''))

# Run the pipeline and block until the runner reports it has finished.
pipeline.run().wait_until_finish()


#### Running Dataflow Template via REST API

In [None]:
project = 'project_id'
# Timestamp suffix (to the second) keeps the job name unique across runs.
job = f'test-dataflow-job-{datetime.now().strftime("%y%m%d%H%M%S")}'
template = 'gs://your_template_location'

# RuntimeEnvironment for the templates.launch request. Keys must be the REST API's
# camelCase field names: a snake_case key such as `temp_location` is not a
# RuntimeEnvironment field and the API rejects unknown field names.
environment = {
    'ipConfiguration': 'WORKER_IP_PRIVATE',
    'numWorkers': 4,
    'maxWorkers': 8,
    'workerRegion': 'us-central1',
    'machineType': 'c2-standard-8',
    'serviceAccountEmail': 'your_service_account_name',
    'subnetwork': 'your_full_subnetwork_url',
    'tempLocation': 'gs://temp_location_for_beam_temp_file_storage',
    'additionalExperiments': ['use_runner_v2']
}

# Runtime values for the template's ValueProvider options (--input / --output).
parameters = {
    'input': 'your_runtime_value_provider input_value',
    'output': 'your_runtime_value_provider output value'
}

body = {
    'jobName': job,
    'parameters': parameters,
    'environment': environment
}

print("Request Body for dataflow job.")
pp.pprint(body)

In [None]:
# Dataflow v1b3 API client authenticated as the service account loaded earlier.
dataflow = build('dataflow',
                 'v1b3',
                 credentials=credentials)

# Launch the classic template stored at `template` using the request body built above.
request = dataflow.projects().templates().launch(projectId=project,
                                                 gcsPath=template,
                                                 body=body)

response = request.execute()

# Check the payload explicitly rather than raising and catching NameError: catching
# NameError would also swallow genuine NameErrors (e.g. a cell run out of order) and
# misreport them as pipeline failures.
# NOTE(review): HTTP-level failures are normally raised by execute() itself
# (googleapiclient HttpError); this branch only covers an 'error' key in the payload.
if 'error' in response:
    print(f'Dataflow Pipeline failed : {response}')
else:
    print('Dataflow Job got triggered : ' + response['job']['name'])


In [None]:
# Inspect the raw launch response returned by the templates.launch call.
pp.PrettyPrinter().pprint(response)

In [None]:
# Poll the launched Dataflow job until it reaches a terminal state.

# Terminal states of the Dataflow JobState enum. Any other state (PENDING, QUEUED,
# RUNNING, STOPPED, CANCELLING, DRAINING, ...) means the job is still in flight.
TERMINAL_JOB_STATES = {
    'JOB_STATE_DONE',
    'JOB_STATE_FAILED',
    'JOB_STATE_CANCELLED',
    'JOB_STATE_UPDATED',
    'JOB_STATE_DRAINED',
}
POLL_INTERVAL_SECONDS = 10
CLIENT_REFRESH_SECONDS = 900

job_execution_completed = False
time_var = 0  # seconds elapsed since the API client was last (re)built

while not job_execution_completed:
    if time_var >= CLIENT_REFRESH_SECONDS:
        # Rebuild the client periodically. Pass the service-account credentials so the
        # rebuilt client does not fall back to a different (default) identity.
        dataflow = build('dataflow', 'v1b3', credentials=credentials)
        time_var = 0

    # The Job resource in the launch response identifies the job by 'id'.
    job = dataflow.projects().locations().jobs().get(projectId=response['job']['projectId'],
                                                     location=response['job']['location'],
                                                     jobId=response['job']['id']).execute()

    if job['currentState'] in TERMINAL_JOB_STATES:
        print('Job id: {} - Job execution status: {} '.format(job['id'], job['currentState']))

        if job['currentState'] != 'JOB_STATE_DONE':
            raise RuntimeError('Dataflow job got failed. Please check logs for more details.')

        job_execution_completed = True
    else:
        time.sleep(POLL_INTERVAL_SECONDS)
        time_var += POLL_INTERVAL_SECONDS

pp.pprint(response)

#### _________________________________________________________________________________________________________

#### Running Dataflow Template via gcloud command

1. To set the active account, run: `gcloud config set account SERVICE_ACCOUNT`

2. Validate if the gcloud config is set or not by running: `gcloud config list`

The output should look like this:
- account = SERVICE_ACCOUNT
- disable_usage_reporting = True
- project = PROJECT_ID

3. If your currently active account `SERVICE_ACCOUNT` does not have valid credentials:
- Run `gcloud auth login` to obtain new credentials.
- For a service account, activate it first: `gcloud auth activate-service-account ACCOUNT --key-file=KEY_FILE`


**POSITIONAL ARGUMENTS**

- `JOB_NAME` : The unique name to assign to the job.

**REQUIRED FLAGS**

`--gcs-location=GCS_LOCATION` : The Google Cloud Storage location of the job template to run. (Must be a URL beginning with 'gs://'.)

**OPTIONAL FLAGS**


`--additional-experiments=[ADDITIONAL_EXPERIMENTS,]` : Additional experiments to pass to the job. These experiments are appended to any experiments already set by the template.

`--disable-public-ips` : The Cloud Dataflow workers must not use public IP addresses. Overrides the default dataflow/disable_public_ips property value for this command invocation.

`--max-workers=MAX_WORKERS` : The maximum number of workers to run.

`--network=NETWORK` : The Compute Engine network for launching instances to run your pipeline.

`--num-workers=NUM_WORKERS` : The initial number of workers to use.

`--parameters=[PARAMETERS,…]` : The parameters to pass to the job. Set parameters to the comma-separated list of parameters (`KEY=VALUE` pairs) to pass to the job. Spaces between commas and values are not allowed.

`--region=REGION_ID` : Region ID of the job's regional endpoint. Defaults to 'us-central1'.

`--service-account-email=SERVICE_ACCOUNT_EMAIL` : The service account to run the workers as.

`--staging-location=STAGING_LOCATION` : The Google Cloud Storage location to stage temporary files. (Must be a URL beginning with 'gs://'.)

`--subnetwork=SUBNETWORK` : The Compute Engine subnetwork for launching instances to run your pipeline.

`--worker-machine-type=WORKER_MACHINE_TYPE` : The type of machine to use for workers. Defaults to server-specified.


**WORKER LOCATION OPTIONS**

At most one of these can be specified:

`--worker-region=WORKER_REGION` : The region to run the workers in.

`--worker-zone=WORKER_ZONE` : The zone to run the workers in.

In [None]:
# Launch the classic template with gcloud; the machine type flag is --worker-machine-type.
! gcloud dataflow jobs run test-classic-job \
--project projectId \
--gcs-location gs://your_classic_template_bucket_file_location/template.json \
--region us-central1 \
--service-account-email your_service_account_name \
--staging-location gs://your_staging_bucket_location \
--subnetwork your_full_subnetwork_url \
--num-workers 4 \
--max-workers 8 \
--disable-public-ips \
--worker-region us-central1 \
--worker-machine-type c2-standard-8 \
--parameters input=gs://your_runtime_value_provider_input_value,output=gs://your_runtime_value_provider_output_value