In [1]:
# Resolve the active GCP project from the local gcloud configuration.
# The IPython `!` prefix runs the shell command and captures its output lines as a list.
project = !gcloud config get-value project
# NOTE(review): assumes the project ID is the first captured line; if gcloud emits an
# informational line first (e.g. about the active configuration), this picks up the wrong
# text — confirm, or consider project[-1].
PROJECT_ID = project[0]
# Display the resolved ID. The GCS bucket names later in this notebook are hard-coded
# to the same string shown here rather than derived from PROJECT_ID.
PROJECT_ID

'brave-watch-414204'

In [2]:
# !pip install scikit-surprise

In [3]:
import kfp

from kfp.v2 import dsl
from kfp.v2.dsl import pipeline
from kfp.v2.dsl import component
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import InputPath


from kfp.v2.dsl import Output
from kfp.v2.dsl import Metrics

from kfp.v2 import compiler

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component,
                        Markdown)

from kfp.v2 import dsl


## Kubeflow Pipeline (KFP) Definition

##### Kubeflow is an open-source platform designed to make it easy to deploy, manage, and scale machine learning models on Kubernetes.

##### Component-based Architecture: Kubeflow is built using a modular architecture, allowing users to pick and choose components based on their requirements.

##### [Click here to see how to construct a Kubeflow Pipeline:](https://www.youtube.com/watch?v=gtVHw5YCRhE&ab_channel=MLEngineer)

## Kubeflow PySpark Component vs. BigQuery

##### Spark allows us to read a CSV file directly from a bucket, but BigQuery doesn't support this.
##### BigQuery is easier to configure from the UI.

##### Spark has a set of libraries for scalability (Spark ML, etc.).
##### Pandas and scikit-learn may be unavoidable when using BigQuery ("upload file to BigQuery"; there is nothing comparable to Spark ML).

#### Need to redesign the pipeline trigger flow 
##### With the same design (triggered by file upload): 1. upload file -> 2. write data to BigQuery -> 3. execute training pipeline
##### The new way to trigger: when a new data entry arrives in the BigQuery table
##### [Click here to see how to trigger cloud function with Big Query:](https://medium.com/@sidspwc/trigger-event-processing-from-big-query-using-gcp-eventarc-and-cloud-functions-c1c420b3e0df)

##### [Click here to see how to construct a PySpark Serverless component:](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)

### Get Data Component

In [4]:
@component(
    base_image="python:3.9",
    packages_to_install=["numpy","pandas","fsspec","gcsfs","pyspark"]
)
def get_data(
    dataset: Output[Dataset],
    max_rows: int = 0,
):
    """Load user/recipe ratings from GCS and write them as ONE CSV artifact.

    Args:
        dataset: Output artifact. A single CSV file with a header row and the
            columns ``user_id``, ``recipe_id``, ``rating`` is written to
            ``dataset.path``; the training component reads it back with
            ``pd.read_csv(dataset.path)``.
        max_rows: If greater than 0, keep at most this many rows (replaces the
            earlier pandas ``head(10000)`` down-sizing). ``0`` keeps all rows.
    """
    from pyspark.sql import SparkSession

    gcs_path = "gs://raw-interaction-datasets/RAW_interactions.csv"

    # NOTE(review): reading a gs:// URI from Spark needs the GCS Hadoop
    # connector (and a Java runtime) inside the container; a plain
    # python:3.9 image with pip-installed pyspark may not provide them —
    # confirm, or run this step as a Dataproc Serverless component.
    spark = (
        SparkSession.builder
        .appName("Read Data from GCS")
        .getOrCreate()
    )
    try:
        raw_interaction = spark.read.csv(gcs_path, header=True, inferSchema=True)
        data = raw_interaction.select("user_id", "recipe_id", "rating")
        if max_rows > 0:
            data = data.limit(max_rows)

        print("Dataset path >>>>>>" + dataset.path)
        # Spark's DataFrame.write.csv(path) produces a *directory* of part
        # files, which pd.read_csv(dataset.path) downstream cannot open.
        # Collect to pandas (all rows land in driver memory) and write a
        # single file instead.
        data.toPandas().to_csv(dataset.path, index=False)
    finally:
        # Shut down the session's SparkContext even if the read/write fails.
        spark.stop()
    

### Feature Engineering Component

### Training Component

In [5]:
@component(
    base_image="python:3.9", 
    packages_to_install=["numpy","pandas","scikit-surprise","fsspec","gcsfs","google-cloud-storage"]
)
def training_model(
    dataset: Input[Dataset],
    training_status: OutputPath(bool),
    bucket_name: str = "brave-watch-414204",
    model_blob_name: str = "Model/knn_model.pkl",
):
    """Train a user-based KNN collaborative-filtering model and upload it to GCS.

    Args:
        dataset: CSV artifact (from ``get_data``) containing ``user_id``,
            ``recipe_id`` and ``rating`` columns.
        training_status: Output parameter path; ``true`` is written here once
            the pickled model has been uploaded.
        bucket_name: GCS bucket that receives the pickled model.
        model_blob_name: Object name inside ``bucket_name`` for the pickle.
    """
    import os
    import pickle

    import pandas as pd
    from google.cloud import storage
    from surprise import KNNBasic
    from surprise import Reader
    from surprise import accuracy
    # Aliased so it is not confused with the KFP ``Dataset`` artifact type
    # used in this component's signature.
    from surprise import Dataset as SurpriseDataset
    from surprise.model_selection import train_test_split

    print("Dataset path >>>>>>" + dataset.path)
    # load_from_df expects exactly (user, item, rating) columns in that order,
    # so select them explicitly instead of relying on the file's column order.
    ratings_df = pd.read_csv(dataset.path)[["user_id", "recipe_id", "rating"]]

    # NOTE(review): confirm the source ratings really lie in 1..5 (e.g. a 0
    # used as "no rating" would fall outside this scale).
    reader = Reader(rating_scale=(1, 5))
    data = SurpriseDataset.load_from_df(ratings_df, reader)

    # Hold out 10% of rating events; fixed seed so runs are reproducible.
    trainset, testset = train_test_split(data, test_size=0.1, random_state=42)
    # User-based CF with the 'msd' similarity. In earlier experiments 'cosine'
    # raised ZeroDivisionError; 'pearson' and 'pearson_baseline' were also tried.
    # NOTE(review): the earlier pandas loader capped input at 10,000 rows —
    # confirm the full user set fits in memory for user-based KNN.
    algo = KNNBasic(k=40, sim_options={"name": "msd", "user_based": True})
    algo.fit(trainset)

    # Evaluate on the held-out split (prints "RMSE: ...").
    accuracy.rmse(algo.test(testset), verbose=True)

    # Serialize the fitted model and upload it to GCS.
    model_bytes = pickle.dumps(algo)
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(model_blob_name)
    blob.upload_from_string(model_bytes)

    # Publish the output parameter. Assigning to the name (as before) only
    # rebinds a local and never reaches the pipeline. Lower-case JSON-style
    # "true" — NOTE(review): confirm the format your backend expects for bools.
    parent_dir = os.path.dirname(training_status)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(training_status, "w") as status_file:
        status_file.write("true")

### Construct Pipeline

In [6]:
@dsl.pipeline(
    name="training_pipeline",
    # Bucket and folder must be separated by "/": plain concatenation gave
    # "gs://brave-watch-414204training_pipeline", i.e. a different bucket.
    pipeline_root="gs://brave-watch-414204/training_pipeline",
)
def basic_pipeline():
    """Load the ratings with ``get_data``, then train the KNN model on them."""
    data_processing = get_data()
    training_model(dataset=data_processing.outputs["dataset"])

In [7]:
# Compile the pipeline function into the YAML template that the PipelineJob uses.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="training_pipeline.yaml",
    pipeline_func=basic_pipeline,
)

In [8]:
# Describe a Vertex AI pipeline run for the compiled template. Caching is
# disabled so every run re-executes all steps instead of reusing prior outputs.
job = pipeline_jobs.PipelineJob(
    location="asia-southeast1",
    display_name="training_pipeline",
    enable_caching=False,
    template_path="training_pipeline.yaml",
)

In [9]:
# Submit the pipeline run without blocking the notebook (sync=False); state
# updates keep being logged in the cell output as the run progresses.
job.run(sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/564706877122/locations/asia-southeast1/pipelineJobs/training-pipeline-20240308030217
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/564706877122/locations/asia-southeast1/pipelineJobs/training-pipeline-20240308030217')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/training-pipeline-20240308030217?project=564706877122
PipelineJob projects/564706877122/locations/asia-southeast1/pipelineJobs/training-pipeline-20240308030217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/564706877122/locations/asia-southeast1/pipelineJobs/training-pipeline-20240308030217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/564706877122/locations/asia-southeast1/pipelineJobs/training-pipeline-20240308030217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/564706877122/l