In [1]:
# Install pinned versions of the Vertex AI SDK, the KFP SDK and the Google Cloud
# pipeline components. USER_FLAG is interpolated into each pip command below.
USER_FLAG = "--user"
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0



In [2]:
import os

# NOTE: the trailing `and False` makes this condition always false, so the
# kernel restart below never runs, whatever the value of IS_TESTING.
# Remove `and False` to re-enable the automatic restart.
if not os.getenv("IS_TESTING") and False:
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [3]:
# Print the versions of kfp and google_cloud_pipeline_components that a fresh
# python3 process imports, to confirm the pinned installs are picked up.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


In [63]:
# Project passed to aip.init and used as the pipeline's default `project`
# (the value is redacted in this copy and must be filled in before running).
PROJECT_ID: str = "" #redacted
# Cloud Storage bucket URI used as the staging bucket and as the base of PIPELINE_ROOT.
BUCKET_NAME: str = "gs://dsa-ross"

In [146]:
from datetime import datetime

# Wall-clock timestamp (YYYYmmddHHMMSS) used to build unique display names.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [None]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics, Dataset, OutputPath, importer, Condition
from kfp.v2.google import experimental
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform as aip
from google.cloud import storage
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [6]:
# Append the user-level bin directory to PATH for this kernel session.
PATH = %env PATH
%env PATH = {PATH}:/home/jupyter/.local/bin
# NOTE(review): REGION is not referenced by the other visible cells, which
# spell out "us-central1" directly.
REGION = "us-central1"

# Root location under the bucket that is handed to the pipeline as pipeline_root.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://dsa-ross/pipeline_root/'

In [10]:
# Set the active project for the gcloud CLI (project id redacted in this copy).
! gcloud config set project REDACTED

Updated property [core/project].


In [12]:
# List the objects currently stored in the bucket, with sizes and generations.
! gsutil ls -al $BUCKET_NAME

     45010  2021-11-22T03:23:36Z  gs://dsa-ross/store.csv#1637551416983830  metageneration=1
     34508  2021-11-22T03:23:23Z  gs://dsa-ross/train_sample.csv#1637551403833928  metageneration=1
TOTAL: 2 objects, 79518 bytes (77.65 KiB)


In [402]:
# Initialise the Vertex AI SDK with the project and staging bucket defined above.
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

In [403]:
# @component( packages_to_install=['pandas==1.1.4'], output_component_file='mergedat.yaml' )
@component(packages_to_install=['pandas==1.1.4'])
def merge_dat(store: Input[Dataset], train: Input[Dataset], merged_out_csv: Output[Dataset]):
    """Join the training rows with the store attributes and write one sorted CSV.

    Args:
        store: Dataset artifact holding the store CSV; must contain ``Store``.
        train: Dataset artifact holding the training CSV; must contain
            ``Store``, ``Date`` and ``StateHoliday``.
        merged_out_csv: Output artifact that receives the merged CSV
            (written without the index).
    """
    import pandas as pd

    stores = pd.read_csv(store.path)
    sales = pd.read_csv(train.path, parse_dates=['Date'])

    combined = sales.merge(stores, left_on='Store', right_on='Store')
    # Normalise the integer 0 to the string '0' so StateHoliday holds one type.
    combined['StateHoliday'] = combined['StateHoliday'].replace(0, '0')

    # Order rows by date, then store, before persisting.
    combined.sort_values(by=['Date', 'Store']).to_csv(merged_out_csv.path, index=False)

In [404]:
@component(packages_to_install=['pandas==1.1.4'])
def feature_eng(merged: Input[Dataset], engineer_out_csv: Output[Dataset]):
    """Add integer year/month/day columns derived from ``Date`` and drop unused columns.

    Args:
        merged: Dataset artifact holding the merged CSV; must contain
            ``Date``, ``StateHoliday`` and ``Customers``.
        engineer_out_csv: Output artifact that receives the engineered CSV
            (written without the index).

    Raises:
        ValueError: if a ``Date`` value is missing, because the calendar
            columns cannot then be cast to ``int``.
    """
    import pandas as pd

    merged_df = pd.read_csv(merged.path, parse_dates=['Date'])
    # Normalise the integer 0 to the string '0' so StateHoliday holds one type.
    merged_df['StateHoliday'] = merged_df['StateHoliday'].replace(0, '0')

    # Read the calendar parts through the datetime accessor rather than by
    # splitting the string form on '-': this is vectorised and keeps working
    # when the values carry a time component.
    dates = pd.to_datetime(merged_df['Date'])
    merged_df['year'] = dates.dt.year.astype(int)
    merged_df['month'] = dates.dt.month.astype(int)
    merged_df['day'] = dates.dt.day.astype(int)

    # Customers must be dropped along with Date (presumably because it is not
    # available at prediction time -- confirm).
    merged_df = merged_df.drop(['Date', 'Customers'], axis=1)
    merged_df.to_csv(engineer_out_csv.path, index=False)

In [405]:
# 'sklearn' and 'google-cloud' are deprecated placeholder distributions on PyPI;
# install the real packages that the imports below need.
@component(packages_to_install=['pandas==1.1.4', 'scikit-learn', 'joblib', 'google-cloud-storage'])
def prep( df: Input[Dataset], train_csv: Output[Dataset], test_csv: Output[Dataset], ordinalEnc: Output[Model], ord_name: str ):
    """Ordinal-encode the categorical columns, split into train/test and save the encoder.

    Args:
        df: Dataset artifact holding the engineered CSV; must contain
            ``StateHoliday``.
        train_csv: Output artifact that receives the training split.
        test_csv: Output artifact that receives the test split.
        ordinalEnc: Output model artifact that receives the fitted
            ``OrdinalEncoder`` (joblib dump).
        ord_name: Blob name under which the encoder file is also uploaded to
            the ``dsa-ross`` bucket.
    """
    import joblib
    import pandas as pd
    from google.cloud import storage
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OrdinalEncoder

    merged_df = pd.read_csv(df.path)
    # Normalise the integer 0 to the string '0' so StateHoliday holds one type.
    merged_df['StateHoliday'] = merged_df['StateHoliday'].replace(0, '0')

    # The encoder is fitted on every object-dtype column of the full frame,
    # i.e. before the train/test split.
    categoricals = merged_df.select_dtypes(object).columns
    ord_enc = OrdinalEncoder()
    merged_df[categoricals] = ord_enc.fit_transform(merged_df[categoricals])

    # shuffle=False keeps the incoming row order: the first rows become the
    # training split and the trailing rows the test split. random_state has
    # no effect without shuffling, so it is not passed.
    train, test = train_test_split(merged_df, shuffle=False)

    train.to_csv(train_csv.path, index=False)
    test.to_csv(test_csv.path, index=False)

    ordinalEnc.metadata["framework"] = "scikit-learn"
    joblib.dump(ord_enc, ordinalEnc.path)

    # Publish a copy of the encoder under a caller-chosen name in the bucket.
    storage_client = storage.Client()
    bucket = storage_client.bucket("dsa-ross")
    blob = bucket.blob(ord_name)
    blob.upload_from_filename(ordinalEnc.path)

In [456]:
# 'google-cloud' is a deprecated placeholder distribution on PyPI; install the
# storage client that is actually imported below.
@component(packages_to_install=['pandas==1.1.4', 'xgboost', 'sktime', 'google-cloud-storage'])
def train_xgbreg(train_df: Input[Dataset], xgbreg_model: Output[Model])-> NamedTuple("Outputs", [("xgb_path", str)]):
    """Fit an XGBoost regressor on ``Sales`` and publish the saved model.

    Args:
        train_df: Dataset artifact holding the training CSV; must contain a
            ``Sales`` column, which is the target. All other columns are
            used as features.
        xgbreg_model: Output model artifact that receives the saved model and
            the ``train_score`` / ``framework`` metadata.

    Returns:
        A one-element tuple with the local path the model was saved to.
    """
    import pandas as pd
    from google.cloud import storage
    from sktime.performance_metrics.forecasting import MeanSquaredPercentageError
    from xgboost import XGBRegressor

    train = pd.read_csv(train_df.path)
    X = train.drop('Sales', axis=1)
    y = train[['Sales']]

    # TODO: GPU training (tree_method='gpu_hist').
    model = XGBRegressor(random_state=42, n_jobs=-1, n_estimators=1500, max_depth=5, eval_metric=MeanSquaredPercentageError(square_root=True))
    model.fit(X, y)

    # Score only rows with positive sales, since a percentage error is undefined
    # for a zero target. A 1-D boolean Series selects rows unambiguously in
    # both frames (the previous mask was a 2-D (n, 1) array).
    has_sales = train['Sales'] > 0
    y_tr_nonzero = y[has_sales]
    X_tr_nonzero = X[has_sales]
    preds_nz = model.predict(X_tr_nonzero)
    rmspe = MeanSquaredPercentageError(square_root=True)
    train_rmspe = rmspe(y_tr_nonzero, preds_nz)

    xgbreg_model.metadata["train_score"] = float(train_rmspe)
    xgbreg_model.metadata["framework"] = "XGBoost"
    model.save_model(xgbreg_model.path)

    # Also copy the model to a fixed blob name in the bucket.
    storage_client = storage.Client()
    bucket = storage_client.bucket("dsa-ross")
    blob = bucket.blob("xgb/model.bst")
    blob.upload_from_filename(xgbreg_model.path)
    return (xgbreg_model.path,)


In [457]:
# 'sklearn' is a deprecated placeholder distribution on PyPI; install scikit-learn.
@component(packages_to_install=['pandas==1.1.4', 'xgboost', 'sktime', 'scikit-learn'])
def eval_model(test_df: Input[Dataset], trained_xgb: Input[Model], metrics: Output[ClassificationMetrics], smetrics: Output[Metrics]
              ) -> NamedTuple("Outputs", [("dep_decision", str)]):
    """Score the trained model on the test split and decide whether to deploy.

    Args:
        test_df: Dataset artifact holding the test CSV; must contain a
            ``Sales`` column, which is the target.
        trained_xgb: Model artifact whose path holds the saved XGBoost model.
        metrics: Classification-metrics output; currently nothing is written
            to it (see the TODO below).
        smetrics: Scalar-metrics output that receives the test RMSPE as
            ``score``.

    Returns:
        A one-element tuple: ``"no"`` when the test RMSPE is >= 0.5,
        otherwise ``"yes"``.
    """
    import logging

    import pandas as pd
    from sktime.performance_metrics.forecasting import MeanSquaredPercentageError
    from xgboost import XGBRegressor

    test = pd.read_csv(test_df.path)
    X = test.drop('Sales', axis=1)
    y = test[['Sales']]

    model = XGBRegressor()
    model.load_model(trained_xgb.path)

    # Score only rows with positive sales, since a percentage error is undefined
    # for a zero target. A 1-D boolean Series selects rows unambiguously in
    # both frames (the previous mask was a 2-D (n, 1) array).
    has_sales = test['Sales'] > 0
    y_te_nonzero = y[has_sales]
    X_te_nonzero = X[has_sales]
    preds_nz = model.predict(X_te_nonzero)
    rmspe = MeanSquaredPercentageError(square_root=True)
    test_rmspe = rmspe(y_te_nonzero, preds_nz)

    # TODO: find a regression equivalent of an ROC plot to log to `metrics`.

    # Deployment gate: an RMSPE of 0.5 or more blocks deployment.
    decision = "no" if float(test_rmspe) >= 0.5 else "yes"
    logging.getLogger().setLevel(logging.INFO)
    logging.info(f"deployment decision is {decision}")

    trained_xgb.metadata["test_score"] = float(test_rmspe)
    smetrics.log_metric("score", float(test_rmspe))

    return (decision,)

In [458]:
# NOTE(review): the only call to this component in the visible cells is commented out.
@component
def get_xgb_path(xgb:Input[Model]) -> str:
    """Return the ``path`` attribute of the given model artifact."""
    return xgb.path

In [460]:
# NOTE(review): WORKING_DIR is not used by the pipeline below; it was only an
# alternative artifact_uri that had been tried for the model upload.
WORKING_DIR = f"{PIPELINE_ROOT}977584873066/ross-xgb-{TIMESTAMP}"


# Pipeline stages: import the two CSVs -> merge -> feature engineering ->
# encode/split -> train -> evaluate -> (only if the evaluation returns "yes")
# upload the model, create an endpoint and deploy to it.
@dsl.pipeline(
    name="ross-xgb",
    description="Rossman XGB Regression Pipe",
    pipeline_root=PIPELINE_ROOT,
)
def pipe1(
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    model_display_name: str = f"trained{TIMESTAMP}",
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest",
):
    # Register the raw CSVs that already live in the bucket as Dataset artifacts.
    # (gs://dsa-ross/train_sample.csv is a smaller alternative for the training data.)
    store_import = importer(
        artifact_uri="gs://dsa-ross/store.csv",
        artifact_class=Dataset,
        reimport=False,
    )
    train_import = importer(
        artifact_uri="gs://dsa-ross/train.csv",
        artifact_class=Dataset,
        reimport=False,
    )

    # Data preparation chain.
    merge_task = merge_dat(store=store_import.output, train=train_import.output)
    feature_task = feature_eng(merged=merge_task.outputs["merged_out_csv"])
    prep_task = prep(
        df=feature_task.outputs["engineer_out_csv"],
        ord_name=f"ord{model_display_name}.joblib",
    )

    # Training and evaluation.
    # GPU variant (not enabled): chain
    # .add_node_selector_constraint(label_name="cloud.google.com/gke-accelerator", value="NVIDIA_TESLA_T4").set_gpu_limit(1)
    # onto the training task.
    train_task = train_xgbreg(train_df=prep_task.outputs["train_csv"])
    eval_task = eval_model(
        test_df=prep_task.outputs["test_csv"],
        trained_xgb=train_task.outputs["xgbreg_model"],
    )

    # Everything below runs only when the evaluation step approves deployment.
    with dsl.Condition(
        eval_task.outputs["dep_decision"] == "yes",
        name="deploy_decision",
    ):
        # The upload reads from a fixed bucket folder rather than from a task
        # output, so the dependency on the training task is declared explicitly.
        upload_task = gcc_aip.ModelUploadOp(
            project=project,
            display_name=model_display_name,
            artifact_uri="gs://dsa-ross/xgb",
            serving_container_image_uri=serving_container_image_uri,
        )
        upload_task.after(train_task)

        endpoint_task = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="Rossman_XGBRegressor",
        )
        gcc_aip.ModelDeployOp(
            model=upload_task.outputs["model"],
            endpoint=endpoint_task.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )
        

In [461]:
# Refresh the timestamp before compiling and submitting.
# NOTE(review): default arguments are evaluated when a function is defined, so
# pipe1's model_display_name default keeps the earlier TIMESTAMP value; this new
# value only affects names built after this point.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [462]:
# Compile pipe1 into a job spec written to pipe1.json in the working directory.
compiler.Compiler().compile(pipeline_func=pipe1, package_path="pipe1.json")

In [464]:
# Upload the compiled pipe1.json to the bucket under the blob name "dsa-rossman-pipe".
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket("dsa-ross")
blob = bucket.blob("dsa-rossman-pipe")
blob.upload_from_filename("pipe1.json")

In [None]:
# Submit the compiled spec as a Vertex AI pipeline job; the display name is
# made unique with the current TIMESTAMP.
DISPLAY_NAME = f"dsa-rossman-{TIMESTAMP}"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="pipe1.json",
    pipeline_root=PIPELINE_ROOT,
    # parameter_values={},
)

# Start the job from this notebook.
job.run()

In [None]:
# api_client = AIPlatformClient(project_id=PROJECT_ID, region="us-central1")

# api_client.create_schedule_from_job_spec(
#     job_spec_path="pipe1.json",
#     schedule="0 0 * * 0",
#     # time_zone=TIME_ZONE,
#     # parameter_values=PIPELINE_PARAMETERS
# )