## Taxi Ride Fare Prediction Using Kubeflow, Feast, and TFX

* Predict taxi ride fares using Feast and Kubeflow

Setup the notebook
- Install `feast` with pip.
- Activate user service account with credentials JSON.
- Hacks to retrieve essential information for deployments and serving.

**NOTE**: This code block might hangs for a long time.

In [1]:
import demo_util
demo_util.notebook_setup()

In [2]:
import importlib
importlib.reload(demo_util)
working_dir = "/home/jovyan/LinearModel"

In [3]:
PROJECT, ZONE, APP_NAME = demo_util.get_project_config()
print('PROJECT =', PROJECT)
print('APP_NAME =', APP_NAME)
print('ZONE =', ZONE)

PROJECT = aliz-development
APP_NAME = kubeflow-asia
ZONE = asia-southeast1-a


In [4]:
# fairing:include-cell
from google.cloud import storage
import datetime
import demo_util
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
import sys
import importlib
import uuid
import logging
import os
import json
import requests
import pandas as pd
import numpy as np
from retrying import retry
from feast.sdk.resources.entity import Entity
from feast.sdk.resources.storage import Storage
from feast.sdk.resources.feature import Feature, Datastore, ValueType
from feast.sdk.resources.feature_set import FeatureSet, FileType
import feast.specs.FeatureSpec_pb2 as feature_pb

from feast.sdk.importer import Importer
from feast.sdk.client import Client
import tensorflow as tf
import tensorflow_data_validation as tfdv

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


In [5]:
# Keep this imports separate because we don't want to import in the container built by fairing
import fairing
from fairing.deployers import job

In [6]:
# fairing:include-cell
class TaxiFeast(object):
    """Taxi code."""
    # Connect to the Feast deployment
    FEAST_CORE_URL = '10.148.0.99:6565'
    FEAST_SERVING_URL = '10.148.0.100:6566'
    STAGING_LOCATION = 'gs://kubecon-19-gojek/staging'
    
    STATS_KEY = "training_stats"

    def __init__(self):
        self._fs = None
        self._train_feature_set = None
    @property
    def fs(self):
        if not self._fs:
            self._fs = Client(core_url=self.FEAST_CORE_URL,serving_url=self.FEAST_SERVING_URL, verbose=True)
        return self._fs
    
    def compute_training_stats(self, stats_path=None):
        """compute training stats."""
        dataset = self.fs.create_dataset(self.train_feature_set, "2009-01-01", "2016-01-01")
        training_df = self.fs.download_dataset_to_df(dataset, self.STAGING_LOCATION)
        training_stats = tfdv.generate_statistics_from_dataframe(training_df)
        
        if stats_path:
            logging.info("Saving training stats to %s", stats_path)
            demo_util.save_proto(training_stats, stats_path)
        else:
            logging.info("No stats_path provided; not saving stats")
        return training_stats
    
    @property
    def train_feature_set(self):
        if not self._train_feature_set:
            self._train_feature_set = FeatureSet(entity=ENTITY_ID, 
                                                 features=TRAINING_FEATURES_SET)
        return self._train_feature_set
            

In [7]:
fshelper = TaxiFeast()

## Load raw data

In [8]:
df = pd.read_csv('taxi_ride.csv', index_col=False)
df.head()

Unnamed: 0,ride_id,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,20090615_172621,4.5,2009-06-15T17:26:21Z,-73.844311,40.721319,-73.84161,40.712278,1
1,20100105_165216,16.9,2010-01-05T16:52:16Z,-74.016048,40.711303,-73.979268,40.782004,1
2,20110818_003500,5.7,2011-08-18T00:35:00Z,-73.982738,40.76127,-73.991242,40.750562,2
3,20120421_043042,7.7,2012-04-21T04:30:42Z,-73.98713,40.733143,-73.991567,40.758092,1
4,20100309_075100,5.3,2010-03-09T07:51:00Z,-73.968095,40.768008,-73.956655,40.783762,1


## Extract more features

In [9]:
from distance_utils import haversine_array, dummy_manhattan_distance, bearing_array  

# location features
df.loc[:, 'distance_haversine'] = haversine_array(
    df['pickup_latitude'].values, 
    df['pickup_longitude'].values, 
    df['dropoff_latitude'].values, 
    df['dropoff_longitude'].values)

df.loc[:, 'distance_dummy_manhattan'] =  dummy_manhattan_distance(
    df['pickup_latitude'].values, 
    df['pickup_longitude'].values, 
    df['dropoff_latitude'].values, 
    df['dropoff_longitude'].values)

df.loc[:, 'direction'] = bearing_array(
    df['pickup_latitude'].values, 
    df['pickup_longitude'].values, 
    df['dropoff_latitude'].values, 
    df['dropoff_longitude'].values)

# time features
df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime']).apply(lambda d: d.replace(tzinfo=None))
df['month'] = df['pickup_datetime'].dt.month
df['day_of_month'] = df['pickup_datetime'].dt.day
df['hour'] = df['pickup_datetime'].dt.hour
df['day_of_week'] = df['pickup_datetime'].dt.dayofweek

# drop unused columns
df = df.drop(columns=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'], axis=1)

df.head()

Unnamed: 0,ride_id,fare_amount,pickup_datetime,passenger_count,distance_haversine,distance_dummy_manhattan,direction,month,day_of_month,hour,day_of_week
0,20090615_172621,4.5,2009-06-15 17:26:21,1,1.030764,1.232937,167.240469,6,15,17,0
1,20100105_165216,16.9,2010-01-05 16:52:16,1,8.450134,10.961646,21.498365,1,5,16,1
2,20110818_003500,5.7,2011-08-18 00:35:00,2,1.389525,1.906909,-148.966789,8,18,0,3
3,20120421_043042,7.7,2012-04-21 04:30:42,1,2.79927,3.148058,-7.672164,4,21,4,5
4,20100309_075100,5.3,2010-03-09 07:51:00,1,1.999157,2.71518,28.802783,3,9,7,1


## Register entity and features

In [10]:
# Create importer
importer = Importer.from_df(df, 
                           entity='taxi_ride', 
                           owner='user@website.com',  
                           staging_location=fshelper.STAGING_LOCATION,
                           id_column='ride_id', 
                           timestamp_column='pickup_datetime',
                           serving_store=Datastore(id='SERVING'),
                           warehouse_store=Datastore(id='WAREHOUSE'))

# Update feature and entity metadata. Ideally you want to update these manually
# so that they contain adequate information for the next user
importer.entity.description = 'entity level description' 
for feature_id in importer.features:
    importer.features[feature_id].description = 'feature level description'
    
# Ingest the feature data into the store
fshelper.fs.run(importer, apply_features=True, apply_entity=True)

Successfully applied entity with name: taxi_ride
---
name: taxi_ride
description: entity level description

Successfully applied feature with id: taxi_ride.fare_amount
---
id: taxi_ride.fare_amount
name: fare_amount
owner: user@website.com
description: feature level description
valueType: DOUBLE
entity: taxi_ride
dataStores:
  serving:
    id: SERVING
  warehouse:
    id: WAREHOUSE

Successfully applied feature with id: taxi_ride.passenger_count
---
id: taxi_ride.passenger_count
name: passenger_count
owner: user@website.com
description: feature level description
valueType: INT64
entity: taxi_ride
dataStores:
  serving:
    id: SERVING
  warehouse:
    id: WAREHOUSE

Successfully applied feature with id: taxi_ride.distance_haversine
---
id: taxi_ride.distance_haversine
name: distance_haversine
owner: user@website.com
description: feature level description
valueType: DOUBLE
entity: taxi_ride
dataStores:
  serving:
    id: SERVING
  warehouse:
    id: WAREHOUSE

Successfully applied featu

'feastimport1558543670935'

## Define a Feature Set for this project

In [11]:
# fairing:include-cell
ENTITY_ID = 'taxi_ride'
TRAINING_FEATURES_SET = [
    'taxi_ride.passenger_count',
    'taxi_ride.distance_haversine',
    'taxi_ride.distance_dummy_manhattan',
    'taxi_ride.direction',
    'taxi_ride.month',
    'taxi_ride.day_of_month',
    'taxi_ride.day_of_week',
    'taxi_ride.hour',
    'taxi_ride.fare_amount'
]

In [12]:
feature_set = FeatureSet(entity=ENTITY_ID, 
                         features=TRAINING_FEATURES_SET)

## Retrieve a Training Set from Feast

In [13]:
# Retrieve feature data for training from Feast
dataset = fshelper.fs.create_dataset(feature_set, "2009-01-01", "2016-01-01")
training_df =  fshelper.fs.download_dataset_to_df(dataset, fshelper.STAGING_LOCATION)

training_df.head()

creating training dataset for features: ['taxi_ride.passenger_count', 'taxi_ride.distance_haversine', 'taxi_ride.distance_dummy_manhattan', 'taxi_ride.direction', 'taxi_ride.month', 'taxi_ride.day_of_month', 'taxi_ride.day_of_week', 'taxi_ride.hour', 'taxi_ride.fare_amount']
created dataset taxi_ride_1558543683588_20090101_20160101: aliz-development.fs_taxi_ride.1558543683588_20090101_20160101


Unnamed: 0,id,event_timestamp,fare_amount,passenger_count,distance_haversine,distance_dummy_manhattan,direction,month,day_of_month,hour,day_of_week
0,20090119_103800,2009-01-19 10:38:00 UTC,3.7,1,0.605668,0.823081,-151.064653,1,19,10,0
1,20090119_103800,2009-01-19 10:38:00 UTC,3.7,1,0.605668,0.823081,-151.064653,1,19,10,0
2,20090119_103800,2009-01-19 10:38:00 UTC,3.7,1,0.605668,0.823081,-151.064653,1,19,10,0
3,20090119_103800,2009-01-19 10:38:00 UTC,3.7,1,0.605668,0.823081,-151.064653,1,19,10,0
4,20090119_103800,2009-01-19 10:38:00 UTC,3.7,1,0.605668,0.823081,-151.064653,1,19,10,0


## Visualize statistics with TFDV

In [14]:
now = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
training_stats_path = "gs://kubecon-19-gojek/data/training_stats/{0}/training_stats.hdf5".format(now)

training_stats = fshelper.compute_training_stats(training_stats_path)
tfdv.visualize_statistics(training_stats)

creating training dataset for features: ['taxi_ride.passenger_count', 'taxi_ride.distance_haversine', 'taxi_ride.distance_dummy_manhattan', 'taxi_ride.direction', 'taxi_ride.month', 'taxi_ride.day_of_month', 'taxi_ride.day_of_week', 'taxi_ride.hour', 'taxi_ride.fare_amount']
created dataset taxi_ride_1558543694901_20090101_20160101: aliz-development.fs_taxi_ride.1558543694901_20090101_20160101


INFO:root:Saving training stats to gs://kubecon-19-gojek/data/training_stats/190522_164814/training_stats.hdf5
INFO:root:Saving proto to /tmp/training_stats.hdf5
INFO:root:Uploading proto to gs://kubecon-19-gojek/data/training_stats/190522_164814/training_stats.hdf5
INFO:root:Saved proto to gs://kubecon-19-gojek/data/training_stats/190522_164814/training_stats.hdf5


## Train Linear Model

In [15]:
# fairing:include-cell
class TaxiRideModel(TaxiFeast):
  """Model class."""
  SERVING_FEATURE_SET = [
        'taxi_ride.passenger_count',
        'taxi_ride.distance_haversine',
        'taxi_ride.distance_dummy_manhattan',
        'taxi_ride.direction',
        'taxi_ride.month',
        'taxi_ride.day_of_month',
        'taxi_ride.day_of_week',
        'taxi_ride.hour']

  def __init__(self):
    super(TaxiRideModel, self).__init__()
    self.m = None
    self.b = None
    self.serving_fs = None

    logging.basicConfig(level=logging.INFO,
        format=('%(levelname)s|%(asctime)s'
                '|%(pathname)s|%(lineno)d| %(message)s'),
        datefmt='%Y-%m-%dT%H:%M:%S',
        )
    logging.getLogger().setLevel(logging.INFO)

  # Train model 
  def train(self, training_df, model_path):
    np.set_printoptions(precision=3)
    train_data = training_df[[x.split('.')[1] for x in TRAINING_FEATURES_SET]].to_numpy()
    train_data[:, len(train_data[0]) - 1] = 1
    Y = training_df['fare_amount'].to_numpy()

    x = np.linalg.lstsq(train_data, Y, rcond=0)[0]
    m, b = x[:len(train_data[0])-1], x[len(train_data[0])-1]

    self.m = m
    self.b = b
    
    self.save_model(model_path)
    
    return m,b

  def train_on_time_range(self, start_day, end_day, model_path):
    dataset = self.fs.create_dataset(self.train_feature_set, start_day, end_day)
    training_df =  self.fs.download_dataset_to_df(dataset, self.STAGING_LOCATION)
    return self.train(training_df, model_path)
    
  def predict(self, feature_id, feature_names):
    logging.info('feature_id = %s', feature_id)
    logging.info('feature_names = %s', feature_names)
    if any([i is None for i in [self.m, self.b, self.fs, self.serving_fs]]):      
      with open('simple_model.dat', 'r') as f:
        model = json.load(f)
        self.m = np.array(model.get('m', []))
        self.b = float(model.get('b', 0))

        _FEAST_CORE_URL = model['FEAST_CORE_URL']
        _FEAST_SERVING_URL = model['FEAST_SERVING_URL']
        _ENTITY_ID = model['ENTITY_ID']

        logging.info('FEAST_CORE_URL: %s', _FEAST_CORE_URL)
        logging.info('FEAST_SERVING_URL: %s', _FEAST_SERVING_URL)
        logging.info('ENTITY_ID: %s', _ENTITY_ID)
        logging.info('FEATURES_SET: %s', self.SERVING_FEATURE_SET)

        self.serving_fs = FeatureSet(
            entity=_ENTITY_ID,
            features=self.SERVING_FEATURE_SET)

    features = self.fs.get_serving_data(
        self.serving_fs,
        entity_keys=[feature_id])
    X = features.to_numpy()[0][1:]
    logging.info('X: %s', str(X))

    return [sum(self.m * X) + self.b]

  def save_model(self, model_path):
    """Save the model to a json file."""
    MODEL_FILE = 'simple_model.dat'

    model = {
        'm': self.m.tolist(),
        'b': self.b,
        'FEAST_CORE_URL': self.FEAST_CORE_URL,
        'FEAST_SERVING_URL': self.FEAST_SERVING_URL,
        'ENTITY_ID': ENTITY_ID,
    }
    
    logging.info('Saving model to %s', model_path)

    demo_util.save_as_json(model, model_path)
  
  def preprocess(self):
    pass

  def validate(self):
    pass



## Train Locally

In [41]:
now = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
model_path = 'gs://kubecon-19-gojek/models/linear_model/{0}/model.json'.format(now)

model = TaxiRideModel()
m, b = model.train(training_df, model_path)
print(m, b)

INFO:root:Saving model to gs://kubecon-19-gojek/models/linear_model/190523_052138/model.json
INFO:root:Saving data to: /tmp/model.json
INFO:root:Uploading data to gs://kubecon-19-gojek/models/linear_model/190523_052138/model.json


[ 1.764e-01  1.292e+01 -8.776e+00  4.971e-03  8.376e-02 -3.989e-03
  1.175e-01 -5.744e-03] 5.571301543075601


## Local Prediction

In [42]:
model.predict('20090202_084343', None)


INFO:root:feature_id = 20090202_084343
INFO:root:feature_names = None
INFO:root:FEAST_CORE_URL: 10.148.0.99:6565
INFO:root:FEAST_SERVING_URL: 10.148.0.100:6566
INFO:root:ENTITY_ID: taxi_ride
INFO:root:FEATURES_SET: ['taxi_ride.passenger_count', 'taxi_ride.distance_haversine', 'taxi_ride.distance_dummy_manhattan', 'taxi_ride.direction', 'taxi_ride.month', 'taxi_ride.day_of_month', 'taxi_ride.day_of_week', 'taxi_ride.hour']
INFO:root:X: [1 8.677072903409417 10.460815704762517 13.462847407418641 2 2 0 8]


[26.20793146551558]

## Train and Deploy on Kubernetes

### Use fairing to build the docker image

* This uses the append builder to rapidly build docker images

In [19]:
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
base_image = "gcr.io/aliz-development/kubecon-demo/notebook:v20190521-2a6d5e9-dirty-20b776"

In [43]:
from fairing.builders import append
import fairing_util
preprocessor = fairing_util.ConvertNotebookPreprocessorWithFire("TaxiRideModel")

if not preprocessor.input_files:
    preprocessor.input_files = set()

# Bake the model into the container    
input_files=["simple_model.dat", "demo_util.py"]
preprocessor.input_files =  set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()
builder = append.append.AppendBuilder(registry=DOCKER_REGISTRY,
                                      base_image=base_image, preprocessor=preprocessor)
builder.build()


INFO:root:Creating docker context: /tmp/fairing.context.tar.gz
INFO:root:Adding files to context: [PosixPath('ames-feast-taxi-job.py'), 'simple_model.dat', 'demo_util.py']
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/LinearModel/fairing/fairing/__init__.py at /app/fairing/__init__.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/LinearModel/fairing/fairing/runtime_config.py at /app/fairing/runtime_config.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding ames-feast-taxi-job.py at /app/ames-feast-taxi-job.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding simple_model.dat at /app/simple_model.dat
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding demo_util.py at /app/demo_util.py
INFO:root:Loading Docker credentials for repository 'gcr.io/aliz-development/kubecon-demo/notebook:v20190521-2a6d5e9-dirty-20b776'
INFO:root:Invoking 'docker-credential-gcloud' to obtain Docker credentials.
INFO:root:Successfully obtained Docker cre

### Launch a K8s job to compute the stats

In [44]:
now = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
training_model_path = 'gs://kubecon-19-gojek/model/{0}/model.json'.format(now)
pod_spec = builder.generate_pod_spec()
train_deployer = job.job.Job(namespace="kubeflow", 
                             cleanup=False,
                             pod_spec_mutators=[
                             fairing.cloud.gcp.add_gcp_credentials_if_exists,])

# Add command line arguments
pod_spec.containers[0].command.extend(["train-on-time-range", "2009-01-01", "2016-01-01", training_model_path])
result = train_deployer.deploy(pod_spec)

INFO:fairing.kubernetes.manager:Pod started running True


  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '
Saving model to gs://kubecon-19-gojek/model/190523_052403/model.json
Saving data to: /tmp/model.json
Uploading data to gs://kubecon-19-gojek/model/190523_052403/model.json
creating training dataset for features: ['taxi_ride.passenger_count', 'taxi_ride.distance_haversine', 'taxi_ride.distance_dummy_manhattan', 'taxi_ride.direction', 'taxi_ride.month', 'taxi_ride.day_of_month', 'taxi_ride.day_of_week', 'taxi_ride.hour', 'taxi_ride.fare_amount']
created dataset taxi_ride_1558589052083_20090101_20160101: aliz-development.fs_taxi_ride.1558589052083_20090101_20160101
(array([ 1.764e-01,  1.292e+01, -8.776e+00,  4.971e-03,  8.376e-02,        -3.989e-03,  1.175e-01, -5.744e-03]), 5.571301543075475)


In [22]:
!kubectl get jobs -l fairing-id={train_deployer.job_id} -o yaml

apiVersion: v1
items:
- apiVersion: batch/v1
  kind: Job
  metadata:
    creationTimestamp: "2019-05-22T16:49:01Z"
    generateName: fairing-job-
    labels:
      fairing-deployer: job
      fairing-id: 8076441a-7cb1-11e9-8ecc-a6a881dd7379
    name: fairing-job-smgl8
    namespace: kubeflow
    resourceVersion: "3045599"
    selfLink: /apis/batch/v1/namespaces/kubeflow/jobs/fairing-job-smgl8
    uid: 80804235-7cb1-11e9-852c-42010a9400a1
  spec:
    backoffLimit: 6
    completions: 1
    parallelism: 1
    selector:
      matchLabels:
        controller-uid: 80804235-7cb1-11e9-852c-42010a9400a1
    template:
      metadata:
        creationTimestamp: null
        labels:
          controller-uid: 80804235-7cb1-11e9-852c-42010a9400a1
          fairing-deployer: job
          fairing-id: 8076441a-7cb1-11e9-8ecc-a6a881dd7379
          job-name: fairing-job-smgl8
        name: fairing-deployer
      spec:
        containers:
        - command:
          - 

## Deploy with Kubeflow

In [45]:
from fairing.deployers import serving
import fairing_util
pod_spec = builder.generate_pod_spec()

module_name = os.path.splitext(preprocessor.executable.name)[0]
deployer = serving.serving.Serving(module_name + ".TaxiRideModel",
                                   service_type="ClusterIP",
                                   labels={"app": "ames"})

url = deployer.deploy(pod_spec)

logging.info("Created deployment %s", print(deployer.deployment.metadata.name))

INFO:root:Cluster endpoint: http://fairing-service-t4sb4.kubeflow.svc.cluster.local
INFO:root:Created deployment None


fairing-deployer-474ws


In [24]:
!kubectl get deploy -o yaml {deployer.deployment.metadata.name}

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2019-05-22T16:49:27Z"
  generateName: fairing-deployer-
  generation: 1
  labels:
    app: ames
    fairing-deployer: serving
    fairing-id: 9011d858-7cb1-11e9-8ecc-a6a881dd7379
  name: fairing-deployer-gdd2c
  namespace: kubeflow
  resourceVersion: "3045634"
  selfLink: /apis/extensions/v1beta1/namespaces/kubeflow/deployments/fairing-deployer-gdd2c
  uid: 901475d3-7cb1-11e9-852c-42010a9400a1
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: ames
      fairing-deployer: serving
      fairing-id: 9011d858-7cb1-11e9-8ecc-a6a881dd7379
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: ames
        fairing-deployer: serv

## Call the prediction endpoint

In [25]:
@retry(wait_exponential_multiplier=1000, wait_exponential_max=5000,
       stop_max_delay=2*60*1000)
def predict(url, id):
    pdata={
        'strData': id,
    }
    serialized_data = json.dumps(pdata)
    r = requests.post(url, data={'json':serialized_data}, timeout=5)
    return r

In [26]:
full_url = url + ":5000/predict"
r = predict(full_url, '20090202_084343')
if r.ok:
    logging.info("Response: %s", r.content)
else:
    logging.error("Prediction failed; %s", r.content)

INFO:root:Response: b'{"data":{"tensor":{"shape":[1],"values":[26.20793146551558]}},"meta":{}}\n'


## Pipelines

In [46]:
CICD_EXPERIMENT_NAME = 'Taxi Cab CICD'
@dsl.pipeline(
   name='Taxi CICD pipeline',
   description='A pipeline that trains a linear model for Taxi cab dataset.'
)
def cicd_pipeline(
   model_path = 'gs://kubecon-19-gojek/models/linear_model/model.json',
):  
    command=["python", preprocessor.executable.name, "preprocess"]
    preprocess = dsl.ContainerOp(
            name="preprocess", 
            image=builder.image_tag,        
            command=command,
            ).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    preprocess.container.working_dir = "/app"    
    
    command=["python", preprocessor.executable.name, "train-on-time-range", "2009-01-01", "2016-01-01", model_path]
    train_op = dsl.ContainerOp(
            name="train", 
            image=builder.image_tag,        
            command=command,
            ).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    train_op.container.working_dir = "/app"    

    train_op.after(preprocess)
    
    command=["python", preprocessor.executable.name, "validate"]
    validate = dsl.ContainerOp(
            name="validate",
            image=builder.image_tag,        
            command=command,
            ).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    validate.container.working_dir = "/app"    
    
    validate.after(train_op)
    
pipeline_func = cicd_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

import datetime
gcs_model_file = 'gs://kubecon-19-gojek/models/' + datetime.datetime.now().strftime("%y%m%d_%H%M%S") + 'linear_model/model.json'

#Specify pipeline argument values
arguments = {"model_path": gcs_model_file}

# Get or create an experiment and submit a pipeline run
client = kfp.Client()
experiment = client.create_experiment(CICD_EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)


INFO:root:Creating experiment Taxi Cab CICD.
