In [None]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [None]:
# Install or upgrade the Kubeflow Pipelines SDK (kfp) for the current user, without
# pip's version-check notice. NOTE(review): a kernel restart may be needed before
# the upgraded package can be imported - confirm in your environment.
!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install kfp --upgrade --user --quiet

In [None]:
# Name of the Cloud Storage bucket used by this notebook (replace the placeholder).
BUCKET_NAME = '<BUCKET_NAME>'

# Personal suffix that is appended to the experiment name, the output path and
# the run name.
FIRSTNAME_LASTNAME = 'firstname_lastname'
EXPERIMENT_NAME = ''.join(['Hello world! - BigQuery to TFrecords ', FIRSTNAME_LASTNAME])

In [None]:
# Values passed to the pipeline as the 'limit', 'project' and 'region' arguments.
DATASET_SIZE = 10000
PROJECT_ID = 'meetup-kfp'
REGION = 'europe-west1'

# Cloud Storage locations derived from BUCKET_NAME (no ending slash).
GCS_WORKING_DIR = 'gs://{}'.format(BUCKET_NAME)
GCS_OUTPUT_DIR = '{}/bigquery_to_tfrecords/output/{}'.format(
    GCS_WORKING_DIR,
    FIRSTNAME_LASTNAME,
)

## Load component definitions

In [None]:
import kfp.components as comp

https://aihub.cloud.google.com/u/0/p/products%2F28a006d0-c833-4c68-98ff-37358eeb7726

In [None]:
import kfp.components as comp

# Build the bigquery_to_tfrecords op from its published component.yaml, then
# print the generated op's help text to see which arguments it accepts.
bigquery_tfrecords_op = comp.load_component_from_url(
    'https://storage.googleapis.com/kf-pipeline-contrib-public/release-0.1.5/kfp-components/data_converter/bigquery_to_tfrecords/component.yaml'
)
help(bigquery_tfrecords_op)

## Define custom components



### Google Cloud auth methods
To retrieve a GCP secret from Kubernetes (K8s), apply the following to the container operation:
```console
.apply(use_gcp_secret('secret name'))
```
Application Default Credentials (ADC) provide a method to get credentials used in calling Google APIs. However, to add the secret to the user environment so that tools such as gsutil can use it, you need to run the following:
``` console
gcloud auth activate-service-account --key-file /secret/gcp-credentials/user-gcp-sa.json
```
The gcloud auth application-default command group allows you to manage active credentials on your machine that are used for local application development.


Note that you can view or add secrets in your Kubernetes cluster using:

```console
kubectl get secrets
kubectl create secret generic db-user-pass --from-file=./username.txt --from-file=./password.txt
```

In [None]:
def ls(dir_path):
    """Print the `gsutil ls` listing of a Cloud Storage location.

    The service account key at /secret/gcp-credentials/user-gcp-sa.json is
    activated with gcloud first; the exit status of that command is printed,
    followed by the decoded listing.

    Args:
        dir_path: Location handed to `gsutil ls`.

    Raises:
        subprocess.CalledProcessError: If either command exits with a
            non-zero status.
    """
    # Kept inside the function because it is turned into a standalone container op.
    import subprocess

    key_file = '/secret/gcp-credentials/user-gcp-sa.json'
    auth_status = subprocess.check_call(
        ['gcloud', 'auth', 'activate-service-account', '--key-file', key_file]
    )
    print(auth_status)

    listing = subprocess.check_output(['gsutil', 'ls', dir_path])
    print(listing.decode('utf-8'))

In [None]:
# Wrap ls() as a container op. The base image has to provide gsutil (and gcloud),
# since ls() shells out to both.
ls_op = comp.func_to_container_op(
    ls,
    base_image='gcr.io/google.com/cloudsdktool/cloud-sdk',
)

#### Define the pipeline
The pipeline function must be decorated with the `@dsl.pipeline` decorator.

In [None]:
import kfp.dsl as dsl
import kfp.gcp as gcp
import json
# Two-step pipeline: run the bigquery_to_tfrecords component, then list the
# location it reports as its 'output'. Both steps get the 'user-gcp-sa' secret.
@dsl.pipeline(
    name='Bigquery query pipeline',
    description='Bigquery query pipeline'
)
def bigquery_tfrecords_ls(
    input_dir,
    output,
    project,
    region,
    float32='',
    categorical='',
    mode='local',
    limit=-1,
    num_workers=2
):
    gcp_secret_name = 'user-gcp-sa'

    # Step 1: the converter op; arguments are forwarded positionally, in the
    # same order as the pipeline parameters.
    convert_task = bigquery_tfrecords_op(
        input_dir, output, project, region,
        float32, categorical, mode, limit, num_workers
    )
    convert_task = convert_task.apply(gcp.use_gcp_secret(gcp_secret_name))

    # Step 2: list the converter's 'output'; consuming that output is what
    # makes this step run after step 1.
    ls_task = ls_op(convert_task.outputs['output'])
    ls_task = ls_task.apply(gcp.use_gcp_secret(gcp_secret_name))

#### Compile the pipeline

In [None]:
import kfp.compiler as compiler

# Compile the pipeline function into a package file named after the function.
pipeline_func = bigquery_tfrecords_ls
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(
    pipeline_func,
    pipeline_filename,
)

#### Submit the pipeline for execution

In [None]:
import kfp

# Connect with the default client settings, then get or create the experiment
# that the pipeline run is submitted under.
client = kfp.Client()
experiment = client.create_experiment(name=EXPERIMENT_NAME)

Make sure the component files (query, preprocess_row.py, etc.) are stored in the `bigquery_to_tfrecords/` folder in your bucket!

In [None]:
# Specify pipeline argument values. Pipeline parameters that are not listed
# here keep the defaults declared on the pipeline function.
arguments = {
    'input_dir': 'gs://{}/bigquery_to_tfrecords'.format(BUCKET_NAME),
    'output': GCS_OUTPUT_DIR,
    'limit': DATASET_SIZE,
    # mode 'local' = DirectRunner, mode 'dataflow' = DataflowRunner
    'mode': 'local',  # the comma was missing here, which made the cell a SyntaxError
    'project': PROJECT_ID,
    'region': REGION,
}

# Submit a pipeline run of the compiled package under the experiment.
run_name = pipeline_func.__name__ + ' ' + FIRSTNAME_LASTNAME + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)