# Upgrade the Chicago pipeline

In lab session 2 you created the components of a basic pipeline; now we will add a preprocessing step and push the trained model to Minio.

In [49]:
# load dependencies
import kfp as kfp
import kfp.dsl as dsl
from kfp import components
import os
from kfp.components import InputPath, OutputPath, create_component_from_func
from minio import Minio
import urllib3
import datetime as dt
import pandas as pd

## 3.0 Recover a stable state for development

### 3.0.1 Get the data locally to test components functions

In [29]:
# Show the current working directory; the relative paths used in this notebook
# (./chicagodata, ./data, components/) are resolved against it.
!pwd

/home/jovyan/aiengineercourse/Data_Pipeline_2


In [71]:
!mkdir ./chicagodata
!mkdir ./data

mkdir: cannot create directory ‘./chicagodata’: File exists
mkdir: cannot create directory ‘./data’: File exists


### 3.0.1.1 Get data from the open data portal data.cityofchicago.org

In [58]:
# Download the Taxi Trips dataset as CSV from data.cityofchicago.org:
#   - $limit caps the result at 10000 rows,
#   - $where keeps trips whose trip_start_timestamp falls in January 2023,
#   - $select restricts the columns (tips comes first).
# `tr -d '"'` removes every double quote from the response before it is written
# to ./chicagodata/trip.csv.
!curl --get 'https://data.cityofchicago.org/resource/wrvz-psew.csv' \
  --data-urlencode '$limit=10000' \
  --data-urlencode '$where=trip_start_timestamp >= "2023-01-01" AND trip_start_timestamp < "2023-02-01"' \
  --data-urlencode '$select=tips,trip_start_timestamp,trip_seconds,trip_miles,pickup_community_area,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area,fare,tolls,extras,trip_total' \
  | tr -d '"' > "./chicagodata/trip.csv"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1087k    0 1087k    0     0   351k      0 --:--:--  0:00:03 --:--:--  351k


### 3.0.1.2 Ensure parquet dataset is in Minio

In [59]:
# Name of the Minio bucket to work with. Placeholder: set it to your own bucket (firstname-name).
bucket=''#firstname-name

In [60]:
## Build the Minio client from the access key / secret key found in the environment.
# NOTE(review): these variable names use hyphens, whereas save_xgboost_model_bst in this
# notebook reads MINIO_ACCESS_KEY / MINIO_SECRET_KEY (underscores) — confirm which names
# are actually set in the notebook environment.

# Retry up to 5 times, with backoff, on the listed 5xx responses
minio_retry_policy = urllib3.Retry(
    total=5,
    backoff_factor=0.2,
    status_forcelist=[500, 502, 503, 504],
)

client = Minio(
    "storage-api.course.aiengineer.codex-platform.com",
    access_key=os.getenv("MINIO-ACCESS-KEY"),
    secret_key=os.getenv("MINIO-SECRET-KEY"),
    secure=True,
    http_client=urllib3.PoolManager(retries=minio_retry_policy),
)

In [61]:
### Use the API to list every object stored under the "datasets" prefix of the bucket (recursively)
objects = client.list_objects(bucket,prefix="datasets",recursive=True)
for obj in objects:
    # Print the description of each object found
    print(obj)

<Object: bucket_name: guillaume-etevenard object_name: b'datasets/chicago/trips.parquet' version_id: None last_modified: 2023-11-13 09:51:03.094000+00:00 etag: a8bb9b8e7ea668308e0ec481e21435c1 size: 138492 content_type: None is_dir: False metadata: None >


### 3.0.1.3 If not, put it from local data

In [62]:
# Import the dependencies used by this cell
from io import BytesIO
import pyarrow

# Read the CSV extract downloaded earlier
data = pd.read_csv("./chicagodata/trip.csv")

### Persist as "parquet" instead of CSV for encoding/typing purposes.
### Called without a path, to_parquet hands back the serialized file as bytes.
### (If you struggle with the parquet engine picked by pandas, choose pyarrow.)
parquet_bytes = data.to_parquet()

### Wrap the parquet bytes into a bytes stream object
parquet_buffer = BytesIO(parquet_bytes)

# Object key of the dataset inside the bucket
path_minio = "datasets/chicago/trips.parquet"

### Upload the parquet file (parameters follow the put_object documentation)
client.put_object(
    bucket,
    path_minio,
    data=parquet_buffer,
    length=len(parquet_bytes),
    content_type='application/parquet',
)

('a8bb9b8e7ea668308e0ec481e21435c1', None)

## 3.1 From last session pipelines and components

### 3.1.1 Components

You have access to precompiled components to build the initial pipeline; you can find their YAML and Python definitions in ./components.

In [63]:
# Load the three precompiled components from their YAML definitions in ./components
get_data_from_minio_op = components.load_component_from_file(
    'components/get_data_from_minio.yaml'
)
xgboost_predict_on_csv_op = components.load_component_from_file(
    'components/xgb_predict.yaml'
)
xgboost_train_on_csv_op = components.load_component_from_file(
    'components/xgb_train_dbg.yaml'
)

Here is the base code of the Chicago pipeline, using the 3 components loaded above.

In [64]:
### username follows the same convention as in the previous sessions: firstname-lastname
username = ''#firstname-name
# Namespace string built from the username with the 'kubeflow-user-' prefix
namespace=f'kubeflow-user-{username}'

### 3.1.2 Taxi Tips Pipeline

In [65]:
@dsl.pipeline(name='xgboost_chicago_base')
def xgboost_pipeline(namespace=namespace):
    """Base Chicago taxi pipeline: fetch the dataset from Minio, train XGBoost, predict.

    Args:
        namespace: Pipeline parameter defaulting to the notebook-level namespace;
            no step of this pipeline references it.
    """
    from kfp.onprem import use_k8s_secret

    bucket=''#firstname-name

    # Step 1: fetch the dataset from Minio
    data = get_data_from_minio_op(
        minio_path = 'datasets/chicago/trips.parquet',
        bucket = bucket,
    )

    ### This allows using a real Kubernetes secret in a component: the secret keys
    ### are exposed to the step as the MINIO_ACCESS_KEY / MINIO_SECRET_KEY variables
    data.apply(
        use_k8s_secret(
            secret_name='minio-service-account',
            k8s_secret_key_to_env={
                'access_key':'MINIO_ACCESS_KEY',
                'secret_key':'MINIO_SECRET_KEY'
            }
        )
    )

    # Step 2: train on the fetched dataset; column 0 is the label
    model_trained_on_csv = xgboost_train_on_csv_op(
        training_data=data.output,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=200,
    ).set_memory_limit('1Gi').outputs

    # Step 3: predict with the trained model on the same fetched dataset
    xgboost_predict_on_csv_op(
        data=data.output,
        model=model_trained_on_csv['model'],
        label_column=0,
    ).set_memory_limit('1Gi')

In [66]:
### A token is automatically provided in the file referenced by the KF_PIPELINES_SA_TOKEN_PATH variable.
### This token allows access to your own namespace only.
token_file = os.getenv("KF_PIPELINES_SA_TOKEN_PATH")
with open(token_file) as f:
    # Strip surrounding whitespace: readline() would keep a trailing newline,
    # which would then end up inside the token passed to the client.
    token = f.read().strip()

# NOTE: this rebinds `client`, which held the Minio client until now; from this cell on,
# `client` is the Kubeflow Pipelines client.
client = kfp.Client(host='http://ml-pipeline.kubeflow.svc.cluster.local:8888',
               existing_token=token)

In [67]:
### Define a new experiment name, passed as experiment_name when submitting this session's runs
EXPERIMENT_NAME = 'Aiengineer labs session3'

In [44]:
### Submit the base pipeline; the ISO timestamp is appended to the run name
run_id = client.create_run_from_pipeline_func(
    pipeline_func=xgboost_pipeline,
    arguments={},
    run_name=f"XGB_chicago_base{dt.datetime.today().isoformat()}",
    experiment_name=EXPERIMENT_NAME,
    namespace=namespace,
).run_id
print(f"Run ID:  {run_id}")

Run ID:  3ae94be4-5504-4c76-b955-f09a0f8c78d2


## 3.2 Upgrade the pipeline by developing new components

### 3.2.1 Preprocessing component

It will be responsible for:
- imputing missing values on train/test
- getting rid of outliers
- separating train/test from validation data
- returning 2 parquet datasets: train_test_set, which will be used by the train component, and validation_set, which will be used by the predict component

In [68]:
def preprocess_data(
    input_data_path: InputPath('parquet'), 
    preprocess_train_test_data: OutputPath(),
    preprocess_validation_data: OutputPath(),
    label_column: int = 0,
):
    """Clean the raw dataset and split it into train/test and validation sets.

    Steps: keep the numeric columns only, replace missing values with the column
    mean, drop the rows an IsolationForest flags as outliers, then split the
    remaining rows 80/20.

    Args:
        input_data_path: Path of the parquet file to read.
        preprocess_train_test_data: Path where the train/test parquet file
            (80% of the cleaned rows) is written.
        preprocess_validation_data: Path where the validation parquet file
            (20% of the cleaned rows) is written.
        label_column: Index of the label column. Not used by this function;
            kept so the component interface stays unchanged.
    """
    # Imports live inside the function so that it is self-contained when it is
    # packaged with create_component_from_func.
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import IsolationForest
    from sklearn.impute import SimpleImputer
    from sklearn.model_selection import train_test_split

    ### load data ###
    df = pd.read_parquet(input_data_path)

    ### keep only the column types usable as features
    numerics = ['int', 'float']
    df = df.select_dtypes(include=numerics)

    # SimpleImputer discards columns that contain only missing values, which would
    # make the imputed array narrower than df.columns and break the DataFrame
    # rebuild below; drop such columns up front so both stay aligned.
    df = df.dropna(axis=1, how='all')

    # Replace the remaining missing values with the mean of their column
    imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
    df_imp = imputer.fit_transform(df)

    # Isolation forest predicts 1 for inliers and -1 for outliers; keep the inliers
    isolate_forest = IsolationForest(n_jobs=-1, random_state=1)
    inlier_mask = isolate_forest.fit_predict(df_imp) == 1
    df_isolated = df_imp[inlier_mask]

    ### back to dataframe (the imputer returns a bare array)
    dataset = pd.DataFrame(df_isolated, columns=df.columns)

    ### separate train_test from validation
    # Seeded like the isolation forest, so two runs on the same input give the same split
    train_test, validation_set = train_test_split(
        dataset, test_size=0.2, random_state=1
    )

    ### write to parquet
    train_test.to_parquet(preprocess_train_test_data)
    validation_set.to_parquet(preprocess_validation_data)

#### Test the preprocess component locally

In [70]:
### The component needs parquet as input, so convert the local CSV extract first
data = pd.read_csv("./chicagodata/trip.csv")
data.to_parquet("./chicagodata/trip.parquet")

### Local output targets for the two datasets
train = "./data/train_test.parquet"
val = "./data/validation.parquet"

### Execute the preprocessing function directly, outside of any pipeline
preprocess_data("./chicagodata/trip.parquet", train, val)

### Verify the cut: per-column non-null counts of each written dataset
for written_path in (train, val):
    print(pd.read_parquet(written_path).count())

tips                         6704
trip_seconds                 6704
trip_miles                   6704
pickup_community_area        6704
pickup_centroid_latitude     6704
pickup_centroid_longitude    6704
dropoff_community_area       6704
fare                         6704
tolls                        6704
extras                       6704
trip_total                   6704
dtype: int64
tips                         1677
trip_seconds                 1677
trip_miles                   1677
pickup_community_area        1677
pickup_centroid_latitude     1677
pickup_centroid_longitude    1677
dropoff_community_area       1677
fare                         1677
tolls                        1677
extras                       1677
trip_total                   1677
dtype: int64


#### Build the preprocess component

In [72]:
# Package preprocess_data as a component and write its definition to
# components/preprocess_data.yaml.
create_component_from_func(
    preprocess_data,
    output_component_file='components/preprocess_data.yaml',
    base_image='python:3.8',
    # Only what preprocess_data actually imports (numpy, pandas, scikit-learn) plus
    # pyarrow for parquet I/O. xgboost and tensorboardX were listed before but are
    # never imported by the function, so they are no longer installed.
    packages_to_install=[
        'numpy==1.21.6',
        'pandas==1.0.5',
        'scikit-learn==1.0',
        'pyarrow==10.0.1'
    ],
)

<function Preprocess data(input_data: 'parquet', label_column: int = '0')>

### 3.2.2 Model export component

In [88]:
def save_xgboost_model_bst(
    bucket: str,
    input_model_path: InputPath('XGBoostModel'),
    minio_model_path:str
):
    '''Export a trained XGBoost model to Minio as model.bst.

    Args:
        bucket: Bucket name used in Minio to store the model.
        input_model_path: Path of the trained model in binary XGBoost format.
        minio_model_path: Prefix inside the bucket; the model is stored as
            <minio_model_path>/model.bst.
    '''
    # Imports live inside the function so that it is self-contained when it is
    # packaged with create_component_from_func.
    import os
    import tempfile

    import urllib3
    import xgboost
    from minio import Minio

    BST_FILE = "model.bst"

    # Load the model using the input model path
    model = xgboost.Booster(model_file=input_model_path)

    # Credentials are read from the MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables
    client = Minio(
        "storage-api.course.aiengineer.codex-platform.com",
        access_key=os.getenv("MINIO_ACCESS_KEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY"),
        secure=True,
        http_client=urllib3.PoolManager(
            # Retry up to 5 times, with backoff, on the listed 5xx responses
            retries=urllib3.Retry(
                total=5,
                backoff_factor=0.2,
                status_forcelist=[500, 502, 503, 504],
            ),
        ),
    )

    ### Define the path where the object will be stored
    minio_model_name = f'{minio_model_path}/{BST_FILE}'

    # Write the model to a temporary directory instead of the working directory,
    # and upload exactly the file that was written (the upload previously used the
    # bare file name, which only matched because the model directory was ".").
    with tempfile.TemporaryDirectory() as model_dir:
        model_file = os.path.join(model_dir, BST_FILE)
        model.save_model(model_file)
        ### put object
        client.fput_object(bucket, minio_model_name, model_file)

In [89]:
# Package save_xgboost_model_bst as a component and write its definition to
# components/save_xgboost_model_bst.yaml.
# The listed packages cover the function's xgboost and minio imports; urllib3 is not
# listed — presumably it is installed as a dependency of minio (TODO confirm).
create_component_from_func(
    save_xgboost_model_bst,
    output_component_file='components/save_xgboost_model_bst.yaml',
    base_image='python:3.8',
    packages_to_install=[
        'numpy==1.21.6',
        'minio==6.0.2',
        'xgboost==1.1.1',
    ],
)

<function Save xgboost model bst(bucket: str, input_model: 'XGBoostModel', minio_model_path: str)>

### 3.2.3 Upgraded pipeline

![pipeline2](./images/pipeline2.png)

In [90]:
# Load the two components whose YAML definitions were generated by the cells above
preprocess_data_op = components.load_component_from_file(
    'components/preprocess_data.yaml'
)
save_xgboost_model_bst_op = components.load_component_from_file(
    'components/save_xgboost_model_bst.yaml'
)

In [93]:
user=''#firstname-lastname
namespace=f'kubeflow-user-{user}'


# The leading "@" was missing, so the decorator was built and thrown away and the
# function was never registered under a pipeline name. The name is also distinct
# from the base pipeline's ('xgboost_chicago_base'), which had been copy-pasted here.
@dsl.pipeline(name='xgboost_chicago_upgraded')
def xgboost_pipeline_upgraded(namespace=namespace):
    """Upgraded Chicago taxi pipeline.

    Steps: fetch the dataset from Minio, preprocess it into a train/test set and a
    validation set, train XGBoost on the train/test set, predict on the validation
    set, and save the trained model to Minio.

    Args:
        namespace: Pipeline parameter defaulting to the notebook-level namespace;
            no step of this pipeline references it.
    """
    from kfp.onprem import use_k8s_secret

    bucket=''#firstname-lastname

    # Exposes the keys of the 'minio-service-account' secret to a step as the
    # MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables. Defined once and
    # applied to both steps that talk to Minio.
    minio_credentials = use_k8s_secret(
        secret_name='minio-service-account',
        k8s_secret_key_to_env={
            'access_key':'MINIO_ACCESS_KEY',
            'secret_key':'MINIO_SECRET_KEY'
        }
    )

    # Step 1: fetch the dataset from Minio
    data = get_data_from_minio_op(
        minio_path = 'datasets/chicago/trips.parquet',
        bucket = bucket,
    )
    data.apply(minio_credentials)

    # Step 2: clean the data and split it into train/test and validation sets
    preprocessed_data = preprocess_data_op(
        input_data = data.output
    ).set_memory_limit('1Gi')

    # Step 3: train on the train/test set; column 0 is the label
    model_trained_on_csv = xgboost_train_on_csv_op(
        training_data=preprocessed_data.outputs['preprocess_train_test_data'],
        label_column=0,
        objective='reg:squarederror',
        num_iterations=200,
    ).set_memory_limit('1Gi').outputs

    # Step 4: predict on the held-out validation set
    xgboost_predict_on_csv_op(
        data=preprocessed_data.outputs['preprocess_validation_data'],
        model=model_trained_on_csv['model'],
        label_column=0,
    ).set_memory_limit('1Gi')

    # Step 5: push the trained model to Minio
    saved = save_xgboost_model_bst_op(
        bucket=bucket,
        input_model = model_trained_on_csv['model'],
        minio_model_path ='models/frompipeline/xgboost/chicago'
    ).set_memory_limit('1Gi')
    saved.apply(minio_credentials)

In [94]:
### Submit the upgraded pipeline; the ISO timestamp is appended to the run name
run_id = client.create_run_from_pipeline_func(
    pipeline_func=xgboost_pipeline_upgraded,
    arguments={},
    run_name=f"XGB_chicago_upgrade{dt.datetime.today().isoformat()}",
    experiment_name=EXPERIMENT_NAME,
    namespace=namespace,
).run_id
print(f"Run ID:  {run_id}")

Run ID:  f2cd5cb4-8c59-4342-9368-f4d3355923b6
