## Baseball win percentage prediction  

In [None]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

In [None]:
# Restart the kernel so that the packages installed above can be imported.
import os

# The restart is skipped when the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    # do_shutdown(True) asks the kernel to restart rather than just stop.
    app.kernel.do_shutdown(True)

In [None]:
# Print the installed versions of the KFP SDK, the aiplatform package(s)
# reported by pip, and google-cloud-pipeline-components.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

### Set up global variables

In [None]:
# The Google Cloud project that this pipeline runs in.
PROJECT_ID = "data-engineering-435508"
# The region that this pipeline runs in.
REGION = "us-central1"
# A Cloud Storage URI that the pipeline's service account can access.
# The artifacts of the pipeline runs are stored within this pipeline root.
PIPELINE_ROOT = "gs://temp_de2024_trs"


# The model repo, the threshold values and the data URL are not configured here:
# they are pipeline parameters, and their values are supplied when the
# pipeline job is submitted.

### Import libraries

In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

ImportError: cannot import name 'Artifact' from 'kfp.dsl' (c:\Users\thomz\AppData\Local\Programs\Python\Python313\Lib\site-packages\kfp\dsl\__init__.py)

## Create pipeline components
Components include:
* Load data
* Split data into train and test sets
* Train model
* Evaluate model
* Deploy model (upload the approved model to Cloud Storage)

## Create pipeline components

### Data ingestion

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Download a file from a Cloud Storage bucket into the ``dataset`` artifact.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the bucket that holds the file.
        file_name: Name of the blob inside the bucket.
        dataset: Output artifact; the file is written to ``dataset.path + ".csv"``.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # The artifact path has no extension, so ".csv" is appended explicitly.
    destination = dataset.path + ".csv"

    # Download the file from the bucket. The ``bucket`` parameter is left
    # untouched (it is the bucket *name*); the Bucket object gets its own name.
    client = storage.Client(project=project_id)
    source_bucket = client.bucket(bucket)
    source_blob = source_bucket.blob(file_name)
    source_blob.download_to_filename(destination)
    logging.info('Downloaded Data!')
    
    
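# NOTE: the download_data component above is currently not used by the pipeline; the dataset is brought in with dsl.importer instead.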

### Train and test split

In [1]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(
    dataset: Input[Dataset], 
    dataset_train: Output[Dataset], 
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42):
    '''Split the input CSV into a train set and a test set.

    Args:
        dataset: Input artifact; the CSV is read from ``dataset.path`` as-is.
        dataset_train: Output artifact; written to ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; written to ``dataset_test.path + ".csv"``.
        test_size: Fraction of the rows assigned to the test set (default 0.3).
        random_state: Seed for the shuffle, so that re-running the pipeline
            produces the same split (default 42).
    '''
    import pandas as pd
    import logging 
    import sys
    # Imported under an alias so it does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as split_rows
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    # Load data from csv
    alldata = pd.read_csv(dataset.path, index_col=None)
    
    # Seeded split: without a fixed random_state every run would train and
    # evaluate on different rows, making metrics incomparable between runs.
    train, test = split_rows(alldata, test_size=test_size, random_state=random_state)
    
    # Output definitions: ".csv" is appended to the artifact paths.
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')
    logging.info('Split data into %d train rows and %d test rows', len(train), len(test))

NameError: name 'dsl' is not defined

### Train the KNN Regression model

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2'],
    base_image="python:3.10.7-slim"
)
def train_model(
    features: Input[Dataset], 
    model: Output[Model]):
    '''Train a KNN regressor with k = 4 neighbours and pickle it.

    The target column is ``Win_percentage``; every other column of the
    training CSV is used as a feature.

    Args:
        features: Input artifact; the train set is read from ``features.path + ".csv"``.
        model: Output artifact; the fitted model is pickled to ``model.path + ".pkl"``
            and ``model.metadata["framework"]`` is set to ``"KNN"``.
    '''
    import pandas as pd
    from sklearn.neighbors import KNeighborsRegressor
    import logging 
    import sys
    import pickle
       
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Load the train set
    data = pd.read_csv(features.path + ".csv")
  
    # Train the model
    model_knn = KNeighborsRegressor(n_neighbors=4)
    model_knn.fit(data.drop('Win_percentage', axis=1), data['Win_percentage'])
    
    # Save the model
    model.metadata["framework"] = "KNN"
    file_name = model.path + ".pkl"
    with open(file_name, 'wb') as file:
        pickle.dump(model_knn, file)
    logging.info('Saved trained model to %s', file_name)


### Evaluate the model

In [3]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2','numpy'],
    base_image="python:3.10.7-slim"
)
def evaluate_model(
    model_knn: Input[Model], 
    test_set: Input[Dataset], 
    thresholds_dict_str: str, 
    kpi: Output[Metrics]
) -> NamedTuple('outputs', approval=bool):
    '''Evaluate the KNN regressor on the test set and decide on approval.

    Args:
        model_knn: Input artifact; the pickled model is read from
            ``model_knn.path + ".pkl"``.
        test_set: Input artifact; the test CSV is read from
            ``test_set.path + ".csv"`` and must contain ``Win_percentage``.
        thresholds_dict_str: JSON object with an ``"R-squared"`` key holding
            the minimum acceptable R^2, e.g. ``'{"R-squared": 0.7}'``.
        kpi: Output metrics artifact; receives the ``"R-squared"`` metric.

    Returns:
        A named tuple whose ``approval`` field is True when the R^2 on the
        test set is greater than or equal to the threshold.
    '''
    import pandas as pd
    import json
    import logging 
    import sys
    import pickle
       
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    def threshold_check(val1, val2):
        # True when the measured value reaches the required minimum.
        return val1 >= val2
    
    # Load the model.
    # NOTE(review): pickle.load executes code embedded in the file; this is
    # only acceptable because the artifact comes from this pipeline's own
    # training step.
    m_filename = model_knn.path + ".pkl"
    with open(m_filename, 'rb') as model_file:
        model = pickle.load(model_file)
    
    # Load data 
    data = pd.read_csv(test_set.path + ".csv")
    X_test = data.drop(columns=["Win_percentage"])
    y_test = data['Win_percentage']
    
    # Compute R^2
    r_squared = float(model.score(X_test, y_test))

    # Threshold: parsed as float. Casting to int would truncate e.g. 0.7 to 0
    # and approve practically every model.
    thresholds_dict = json.loads(thresholds_dict_str)
    r_squared_threshold = float(thresholds_dict['R-squared'])

    model_knn.metadata["R-squared"] = r_squared
    kpi.log_metric("R-squared", r_squared)
    logging.info('R-squared: %s (threshold: %s)', r_squared, r_squared_threshold)

    outputs = NamedTuple('outputs', approval=bool)
    # A NaN score compares False against any threshold, so it is not approved.
    approval_value = threshold_check(r_squared, r_squared_threshold)
    return outputs(approval_value)

    

TypeError: component() got an unexpected keyword argument 'packages_to_install'

### Deploy the model

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model_name:str, model: Input[Model]):
    '''Upload the pickled model artifact to a Cloud Storage bucket.

    The file ``model.path + ".pkl"`` is uploaded to the bucket ``model_repo``
    under the name ``model_name + ".pkl"``.
    '''
    import logging
    import sys
    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Local pickle written by the training step, and its name in the bucket.
    local_pickle = model.path + '.pkl'
    target_name = model_name + '.pkl'

    # Upload the model to GCS.
    gcs = storage.Client(project=project_id)
    gcs.bucket(model_repo).blob(target_name).upload_from_filename(local_pickle)

    print(f"File {local_pickle} uploaded to {model_repo}.")

## Create the pipeline

In [4]:
@dsl.pipeline(
    # A name for the pipeline. Used to determine the pipeline Context.
    name="pipeline-moneyball",
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, thresholds_dict_str:str, model_repo_uri:str):
    # Steps: import the dataset, split it, train the KNN model, evaluate it,
    # and upload the model only when the evaluation approves it.
    # data_bucket and model_repo_uri are accepted but not used by any step.

    # Register the existing CSV at dataset_uri as a Dataset artifact.
    import_task = dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    # Split the data into a train set and a test set.
    split_task = train_test_split(dataset=import_task.output)

    # Train the model on the train set.
    train_task = train_model(features=split_task.outputs["dataset_train"])

    # Evaluate the trained model on the test set.
    evaluation_task = evaluate_model(
        model_knn=train_task.outputs["model"],
        test_set=split_task.outputs["dataset_test"],
        thresholds_dict_str=thresholds_dict_str,
    )

    # Upload the model only when the evaluation step approved it.
    with dsl.If(
        evaluation_task.outputs["approval"] == True,
        name="approve-model",
    ):
        upload_task = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model_name="moneyball",
            model=train_task.outputs['model'],
        )
    

SyntaxError: parameter without a default follows parameter with a default (3547794537.py, line 13)

### Compile and run the pipeline

In [None]:
from kfp import compiler

# Compile the pipeline function into a YAML package that can be submitted as a job.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='ml_moneyball.yaml',
)

In [None]:

import google.cloud.aiplatform as aip

# Values for the pipeline's runtime parameters.
pipeline_parameters = {
    'project_id': PROJECT_ID,  # make sure to use your project id
    'data_bucket': 'data_de2024_trs',  # make sure to use your data bucket name
    'dataset_uri': 'gs://data_de2024_trs/baseball_clean.csv',
    'model_repo': 'models_de2024_trs',  # make sure to use your model bucket name
    'thresholds_dict_str': '{"R-squared":0.7}',
    'model_repo_uri': 'gs://models_de2024_trs',  # make sure to use your model bucket name
}

aip.init(project=PROJECT_ID, location=REGION)

# Create the job from the compiled pipeline package, with caching disabled.
job = aip.PipelineJob(
    display_name="moneyball-pipeline",
    template_path="ml_moneyball.yaml",
    parameter_values=pipeline_parameters,
    enable_caching=False,
    location=REGION,
)
job.run()