**Kubeflow Pipeline Building - House Price model training and prediction**
* Create Python functions
    1. Download data
    2. Prepare data (split into train test df)
    3. Preprocess training data
    4. Train model
    5. Predict on test data
    6. Get metrics
    7. Store model to GCS
* Create components from python functions
* Initialise kubeflow pipeline
* Define the pipeline function, wire all the components together, and disable caching
* Create a run from the pipeline function in code

![pipeline](kube_pipeline.png)

In [95]:
import kfp
import requests
import kfp.dsl as dsl
import kfp.components as comp
from dotenv import load_dotenv
import kfp.gcp as gcp

In [96]:
!pip show kfp

Name: kfp
Version: 1.8.20
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors

## Authenticating Kubeflow to GCP
Follow the instructions [here](https://v0-6.kubeflow.org/docs/gke/authentication/) to create a secret in your Kubeflow namespace from your GCP service account JSON key: **kubectl create secret generic user-gcp-sa -n yourkubeflow_namespace --from-file=user-gcp-sa.json=path\sa.json**. Afterwards, make sure that **kubectl get secret -n yourkubeflow_namespace** lists the secret you just created.

## Create python function

### download data

In [97]:
# Reusable "Download" component from the Kubeflow Pipelines repository.
# The pipeline passes it a `url` and consumes its `data` output.
# NOTE(review): the URL tracks the `master` branch, so the component
# definition can change between runs - consider pinning a commit.
web_downloader = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/'
    'components/contrib/web/Download/component.yaml'
)


Author-email: 
License: 
Location: c:\users\deviy\anaconda3\envs\kube_pro\lib\site-packages
Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-core, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate
Required-by: 


In [98]:
# Pinned packages that are pip-installed into the component containers.
import_packages = [
    'pandas==1.3.5',
    'numpy==1.21.6',
    'scikit-learn==1.0.2',
]

### prepare data

In [99]:
def prepare_data(file_path: comp.InputPath(), train_output_csv: comp.OutputPath(), test_output_csv: comp.OutputPath()):
    """Read Housing.csv from the downloaded zip and split it into train and test CSVs.

    Args:
        file_path: Path of the zip archive that contains Housing.csv.
        train_output_csv: Path the 70% training split is written to.
        test_output_csv: Path the 30% test split is written to.
    """
    # Imports live inside the function because it is turned into a
    # self-contained Kubeflow component.
    import pandas as pd
    from zipfile import ZipFile
    from sklearn.model_selection import train_test_split

    def check_null(df):
        """Return the null count and null ratio of every column as a DataFrame."""
        null_counts = df.isnull().sum()
        null_ratios = null_counts / len(df)
        return pd.DataFrame({
            'column_name': null_counts.index,
            'null_count': null_counts.values,
            'null_ratio': null_ratios.values,
        })

    # Read the CSV straight from the archive instead of extracting every
    # member of a downloaded zip into the working directory.
    with ZipFile(file_path, 'r') as archive:
        with archive.open('Housing.csv') as csv_file:
            df = pd.read_csv(csv_file)

    print('shape:', df.shape)
    print(check_null(df))

    # Fixed random_state keeps the split reproducible between runs.
    train_df, test_df = train_test_split(df, test_size=0.3, random_state=0)
    train_df.to_csv(train_output_csv, index=False)
    test_df.to_csv(test_output_csv, index=False)
    print("\n ---- train and test csv is saved")

### preprocess_training_data

In [100]:
def preprocess_training_data(train_path: comp.InputPath(), cleaned_train_output_csv: comp.OutputPath()):
    """Encode the raw training split numerically and write the cleaned CSV.

    Args:
        train_path: CSV with the raw training split, including the 'price' column.
        cleaned_train_output_csv: Path the encoded training data is written to.
    """
    import pandas as pd

    def get_feature_lists(frame, label):
        """Group the non-target columns into categorical, numerical and date lists."""
        categorical, numerical, dates = [], [], []
        for name, dtype in frame.drop(columns=label).dtypes.items():
            if dtype == 'object':
                categorical.append(name)
            elif dtype == 'datetime64[ns]':
                dates.append(name)
            else:
                numerical.append(name)
        return categorical, numerical, dates

    def convert_columns_to_binary(frame, yes_no_columns):
        """Map 'yes' to 1 and every other value to 0 in the given columns (in place)."""
        for name in yes_no_columns:
            frame[name] = frame[name].map(lambda value: int(value == 'yes'))
        return frame

    def convert_furnish_column(frame, split, categorical_column_list):
        """One-hot encode the given columns; the first level is dropped only for 'train'."""
        return pd.get_dummies(
            frame,
            drop_first=(split == 'train'),
            columns=categorical_column_list,
        )

    target = 'price'
    train_df = pd.read_csv(train_path)

    # Report the column grouping in the component log.
    categorical_feature, numerical_feature, date_columns = get_feature_lists(train_df, target)
    print('categorical feature:', categorical_feature)
    print('numerical_feature:', numerical_feature)
    print('date column:', date_columns)

    yes_no_columns = [
        'mainroad',
        'guestroom',
        'basement',
        'hotwaterheating',
        'airconditioning',
        'prefarea',
    ]
    train_df = convert_columns_to_binary(train_df, yes_no_columns)
    train_df = convert_furnish_column(train_df, 'train', ['furnishingstatus'])

    # Remaining missing values are replaced by 0 before saving.
    train_df.fillna(0, inplace=True)
    train_df.to_csv(cleaned_train_output_csv, index=False)
    

### train model

In [101]:
def train_model(cleaned_train_path: comp.InputPath(), model: comp.OutputPath()):
    """Fit a random forest regressor on the cleaned training data and pickle it.

    Args:
        cleaned_train_path: CSV with the encoded training data, including 'price'.
        model: Path the pickled, fitted regressor is written to.
    """
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestRegressor

    train_df = pd.read_csv(cleaned_train_path)
    target = 'price'
    x_train = train_df.drop(columns=[target])
    y_train = train_df[target]

    # Fixed random_state keeps the fitted forest reproducible between runs.
    rf_regressor = RandomForestRegressor(n_estimators=25, random_state=42)
    rf_regressor.fit(x_train, y_train)

    with open(model, 'wb') as f:
        pickle.dump(rf_regressor, f)

    # Report the real output location rather than a hard-coded path.
    print(f"\n rf regressor was trained and saved to {model} ----")

### predict_on_test_data

In [102]:
def predict_on_test_data(cleaned_train_df_path: comp.InputPath(), test_df_path: comp.InputPath(), model: comp.InputPath(), cleaned_test_result_csv: comp.OutputPath()):
    """Encode the test split like the training data, predict prices and save the result.

    Args:
        cleaned_train_df_path: CSV of the cleaned training data; only its column names are used.
        test_df_path: CSV with the raw test split, including the 'price' column.
        model: Path of the pickled regressor produced by the training step.
        cleaned_test_result_csv: Path for the encoded test features plus 'prediction' and 'price'.
    """
    import pandas as pd
    import pickle

    def convert_columns_to_binary(df, columns_to_convert):
        """Map 'yes' to 1 and every other value to 0 in the given columns."""
        for col in columns_to_convert:
            df[col] = df[col].map(lambda x: 1 if x == 'yes' else 0)
        return df

    # NOTE: pickle.load executes arbitrary code from the file; only feed this
    # step a model artifact produced by the pipeline's own training step.
    with open(model, 'rb') as f:
        rf_regressor = pickle.load(f)

    target = 'price'
    # Only the training feature names are needed, so read just the header row.
    feature_columns = pd.read_csv(cleaned_train_df_path, nrows=0).drop(columns=target).columns

    test_df = pd.read_csv(test_df_path)
    x_test = test_df.drop(columns=[target])
    y_test = test_df[target]

    columns_to_convert_bool = ['mainroad', 'guestroom', 'basement', 'hotwaterheating', 'airconditioning', 'prefarea']
    x_test = convert_columns_to_binary(x_test, columns_to_convert_bool)
    # Keep every dummy level here; levels unknown to the training data are
    # removed by the reindex below.
    x_test = pd.get_dummies(x_test, drop_first=False, columns=['furnishingstatus'])

    # Give the test set exactly the training columns, in the training order;
    # columns missing from the test set and any missing values become 0.
    x_test = x_test.reindex(columns=feature_columns).fillna(0)

    x_test['prediction'] = rf_regressor.predict(x_test)
    x_test[target] = y_test

    x_test.to_csv(cleaned_test_result_csv, index=False)
    

### get metrics

In [103]:
def get_metrics(test_result_path: comp.InputPath(), cleaned_train_df_path: comp.InputPath()):
    """Print the standard deviation of all prices and the RMSE on the test set.

    Args:
        test_result_path: CSV with the 'price' and 'prediction' columns of the test set.
        cleaned_train_df_path: CSV of the cleaned training data, including 'price'.
    """
    import pandas as pd
    import numpy as np
    from sklearn.metrics import mean_squared_error

    target = 'price'
    test_df = pd.read_csv(test_result_path)[[target, 'prediction']]
    train_df = pd.read_csv(cleaned_train_df_path)

    # Series.append is deprecated and removed in pandas 2.0; pd.concat yields
    # the same combined series and works on every pandas version.
    all_prices = pd.concat([test_df[target], train_df[target]])
    print('std of full data:', np.std(all_prices))

    mse = mean_squared_error(test_df[target], test_df['prediction'])
    print('rmse of test:', np.sqrt(mse))

### store model to gcs

In [104]:
# Reusable component that uploads data to an explicit GCS URI, loaded from a
# pinned commit of the Kubeflow Pipelines repository.
save_model = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '112de249a2c252f0a636bbfdf469d7ef2456f286/'
    'components/google-cloud/storage/upload_to_explicit_uri/component.yaml'
)

### Kubeflow pipeline creation starts here

In [105]:
# Packages pip-installed into each component container, and the image they run on.
import_packages = ['pandas==1.3.5', 'numpy==1.21.6', 'scikit-learn==1.0.2']
base_image = 'python:3.7'


def _as_component(func):
    """Turn a self-contained python function into a Kubeflow component factory.

    Every component shares the same base image and package list, so they are
    configured in one place instead of being repeated per component.
    """
    return kfp.components.create_component_from_func(
        func=func,
        base_image=base_image,
        packages_to_install=import_packages,
    )


prepare_data_op = _as_component(prepare_data)
preprocess_training_data_op = _as_component(preprocess_training_data)
train_model_op = _as_component(train_model)
predict_on_test_data_op = _as_component(predict_on_test_data)
get_metrics_op = _as_component(get_metrics)

In [106]:

# Define the pipeline; `url` and `gcs_model_path` are fed in as run arguments.
@dsl.pipeline(
    name='House Price Prediction Kubeflow Pipeline',
    description='A pipeline that performs house price prediction',
)
def house_price_pipeline(url, gcs_model_path):
    """Download the data, train and evaluate the model, then upload it to GCS.

    Args:
        url: Address of the zipped dataset passed to the download component.
        gcs_model_path: GCS URI the trained model is uploaded to.
    """
    # Optional: if steps should share files through a persistent volume rather
    # than component input/output paths, something like this might be necessary:
    #   pv = dsl.VolumeOp(name="t-vol", resource_name="t-vol", size="1Gi",
    #                     modes=dsl.VOLUME_MODE_RWO)
    #   some_task.add_pvolumes({'/workspace': pv.volume})

    download_task = web_downloader(url=url)
    split_task = prepare_data_op(download_task.outputs['data'])

    clean_task = preprocess_training_data_op(split_task.outputs['train_output_csv'])
    cleaned_train = clean_task.outputs['cleaned_train_output_csv']

    train_task = train_model_op(cleaned_train)
    trained_model = train_task.outputs['model']

    predict_task = predict_on_test_data_op(
        cleaned_train,
        split_task.outputs['test_output_csv'],
        trained_model,
    ).after(train_task)

    metrics_task = get_metrics_op(
        predict_task.outputs['cleaned_test_result_csv'],
        cleaned_train,
    )

    # The upload runs with the 'user-gcp-sa' secret and only after the metrics step.
    upload_task = (
        save_model(data=trained_model, gcs_path=gcs_model_path)
        .apply(gcp.use_gcp_secret('user-gcp-sa'))
        .after(metrics_task)
    )

    # Disable caching: a maximum cache staleness of "P0D" is set on every step.
    all_tasks = (
        download_task,
        split_task,
        clean_task,
        train_task,
        predict_task,
        metrics_task,
        upload_task,
    )
    for task in all_tasks:
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    
    
    

### set gcs_model_path

In [107]:
import datetime

# The current date is printed and used as the experiment name suffix.
today = datetime.date.today()
print(today)

pipeline_func = house_price_pipeline
experiment_name = f'house_price_exp_{today}'
run_name = f'{pipeline_func.__name__} run'
# NOTE(review): `namespace` is defined but never passed to the client or the
# run below - confirm whether it is needed for your deployment.
namespace = 'kubeflow'

# Replace <your_bucket> with a bucket the 'user-gcp-sa' service account can write to.
gcs_model_path = 'gs://<your_bucket>/house_price_rf.pkl'
arguments = {
    'url': 'https://drive.google.com/uc?export=download&id=1QYduO-649A1ZhHUg46lwQ71dK1n_Au1Y',
    'gcs_model_path': gcs_model_path,
}

client = kfp.Client()  # change arguments accordingly

# Save a compiled copy of the pipeline next to the notebook.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit the pipeline function as a run in the experiment.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)






2024-04-22
