# Chicago Taxi Demo

In [1]:
USER_FLAG = "--user"

In [2]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0



In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [105]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


In [78]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  niveustraining


In [79]:
BUCKET_NAME="gs://" + "black-friday-demo" + "-bucket"

In [80]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics, Dataset

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [81]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/taxi-root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://black-friday-demo-bucket/taxi-root/'

In [82]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/get-data/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs"]
)
def get_data(
    dataset: Output[Dataset]
):
    from pandas import read_csv

    df = read_csv('gs://black-friday-demo-bucket/chicago-taxi-dataset/data.csv')

    df.to_csv(dataset.uri, index=False)

In [83]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/prepare-data/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs"]
)
def prepare_data(
    dataset: Input[Dataset],
    X: Output[Artifact],
    Y: Output[Artifact],
):
    import pandas as pd

    df = pd.read_csv(dataset.uri)

    df.drop(['unique_key', 'taxi_id', 'trip_start_timestamp', 'trip_end_timestamp', 'pickup_census_tract',
           'dropoff_census_tract', 'pickup_community_area',
           'dropoff_community_area', 'pickup_latitude',
           'pickup_longitude', 'pickup_location', 'dropoff_latitude',
           'dropoff_longitude', 'dropoff_location'], axis=1, inplace=True)

    df.dropna(inplace=True, axis=0)

    x = df.drop('trip_total', axis=1)
    y = df.trip_total

    x.to_csv(X.uri, index=False)
    y.to_csv(Y.uri, index=False)

In [84]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/preprocess/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs", "sklearn"]
)
def preprocess(
    X: Input[Artifact],
    Y: Input[Artifact],
    X_train: Output[Artifact],
    Y_train: Output[Artifact],
    X_test: Output[Artifact],
    Y_test: Output[Artifact],
    column_transformer: Output[Artifact],
):
    import pandas as pd
    from pickle import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.model_selection import train_test_split

    x = pd.read_csv(X.uri)
    y = pd.read_csv(Y.uri)

    ct = ColumnTransformer(transformers=[('encoder', OneHotEncoder(), [6, 7])], remainder='passthrough')
    x = ct.fit_transform(x.values)


    x_train, x_test, y_train, y_test = train_test_split(x, y.values, test_size=0.2)

    pd.DataFrame(x_train.todense()).to_csv(X_train.uri, index=False)

    pd.DataFrame(y_train).to_csv(Y_train.uri, index=False)

    pd.DataFrame(x_test.todense()).to_csv(X_test.uri, index=False)

    pd.DataFrame(y_test).to_csv(Y_test.uri, index=False)

    with open(column_transformer.path, 'wb') as f:
        dump(ct, f)

In [85]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/feature-scale/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs", "sklearn"]
)
def feature_scale(
    X_train: Input[Artifact],
    X_test: Input[Artifact],
    X_train_feature_scaled: Output[Artifact],
    X_test_feature_scaled: Output[Artifact],
    Standard_scaler: Output[Artifact],
):
    import pickle
    from pandas import read_csv, DataFrame
    from json import dumps, dump
    from sklearn.preprocessing import StandardScaler

    x_train = read_csv(X_train.uri)
    x_test = read_csv(X_test.uri)

    standard_scaler = StandardScaler()
    x_train = standard_scaler.fit_transform(x_train)
    x_test = standard_scaler.transform(x_test)
    
    DataFrame(x_train).to_csv(X_train_feature_scaled.uri, index=False)
    DataFrame(x_test).to_csv(X_test_feature_scaled.uri, index=False)
    # with open(X_train_feature_scaled.path, 'w') as f:
    #     dump(dumps(x_train.tolist()), f)

    # with open (X_test_feature_scaled.path, 'w') as f:
    #     dump(dumps(x_test.tolist()), f)
    
    print(X_test_feature_scaled.path)
    print(X_test_feature_scaled.uri)
    
    with open(Standard_scaler.path, 'wb') as f:
        pickle.dump(standard_scaler, f)

In [86]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/neural-network/train/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs", "numpy", "tensorflow"]
)
def train(
    X_train_feature_scaled: Input[Artifact],
    Y_train: Input[Artifact],
    Model: Output[Model]
):
    from numpy import array
    from pickle import dump
    from pandas import read_csv
    from tensorflow.keras import layers
    from tensorflow.keras import Sequential
    from tensorflow.keras.optimizers import SGD
    from tensorflow.keras.metrics import MeanAbsoluteError
    from json import load, loads

    # print(Y_train.uri, Y_train.path)
    print(X_train_feature_scaled.uri)
    # with open(X_train.path, 'r') as f:
    #     x_train = array(loads(load(f)))
    x_train = read_csv(X_train_feature_scaled.uri).values
    y_train = read_csv(Y_train.uri).values

    model = Sequential()

    model.add(layers.Dense(10, activation='relu'))
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(1, activation='linear'))

    model.compile(
          loss='mse',
          optimizer=SGD(learning_rate=0.001),
          metrics=[MeanAbsoluteError()]
    )

    model.fit(x_train, y_train, batch_size=64, epochs=200)

    with open (Model.path, 'wb') as f:
        dump(model, f)

In [87]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/neural-network/test/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs", "numpy", "tensorflow", "sklearn"]
)
def test(
    X_test_feature_scaled: Input[Artifact],
    Y_test: Input[Artifact],
    Model: Input[Model]
):
    from pickle import load
    from numpy import array
    from pandas import read_csv
    from argparse import ArgumentParser
    from json import load as json_load, loads as json_loads
    from sklearn.metrics import mean_absolute_error


    # with open(args.x_test, 'r') as f:
    #     x_test = array(loads(json_load(f)))

    # with open(args.x_test, 'r') as f:
    #     x = array(json_loads(json_load(f)))

    x_test = read_csv(X_test_feature_scaled.uri)
    y_test = read_csv(Y_test.uri).values
    
    with open(Model.path, 'rb') as f:
        model = load(f)


    y_pred = model.predict(x_test)

    mse = mean_absolute_error(y_test, y_pred)

    print(mse)

In [88]:
@component()
def deploy(
    Model: Input[Model]
):
    pass

In [89]:
@pipeline(
    name="chicago-taxi",
    pipeline_root=PIPELINE_ROOT
)
def pipeline():
    get_data_task = get_data()
    prepare_data_task = prepare_data(get_data_task.outputs["dataset"])
    preprocessing_task = preprocess(prepare_data_task.outputs["X"], prepare_data_task.outputs["Y"])
    feature_scaling_task = feature_scale(preprocessing_task.outputs["X_train"], preprocessing_task.outputs["X_test"])
    training_task = train(feature_scaling_task.outputs["X_train_feature_scaled"], preprocessing_task.outputs["Y_train"])
    testing_task = test(feature_scaling_task.outputs["X_test_feature_scaled"], preprocessing_task.outputs["Y_test"], training_task.outputs["Model"])
    

In [90]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="pipeline.json"
)

In [91]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [92]:
job = aiplatform.PipelineJob(
    display_name="chicago-taxi-pipeline",
    template_path="pipeline.json",
    job_id="chicago-taxi-{0}".format(TIMESTAMP),
    enable_caching=True
)

In [93]:
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/866354246469/locations/us-central1/pipelineJobs/chicago-taxi-20220112065523
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/866354246469/locations/us-central1/pipelineJobs/chicago-taxi-20220112065523')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chicago-taxi-20220112065523?project=866354246469
