In [1]:
%%capture
!pip install -r requirements-components-test.txt

In [2]:
import kfp
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics, ClassificationMetrics, Artifact
import os
from dotenv import load_dotenv

# Load environment variables from env file
load_dotenv('env')

# Github variables
GITHUB_USERNAME = os.getenv("GITHUB_USERNAME")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
GITHUB_REPO_URL = "https://github.com/danilonicioka/mlops-workflow.git"
GITHUB_CLONED_DIR = "mlops-workflow"
GITHUB_DVC_BRANCH = "dvc"
GITHUB_MAIN_BRANCH = "main"

# Kubeflow variables
KUBEFLOW_PIPELINE_NAME = "mlops"
KUBEFLOW_HOST_URL = "http://ml-pipeline.kubeflow:8888"  # KFP host URL
KUBEFLOW_PIPELINE_ID="7451916e-eee8-4c14-ad5f-8dee5aa61e3b"
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as f:
    KUBEFLOW_TOKEN = f.read()

# DVC variables
DVC_REMOTE_DB = "minio_remote"
DVC_REMOTE_DB_URL = "s3://dvc-data"
DVC_FILE_DIR = 'data/external'
DVC_FILE_NAME = 'dataset.csv'

# MinIO variables
MINIO_URL = "minio-service.kubeflow:9000"
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_MODEL_BUCKET_NAME = "model-files"
MINIO_MODEL_OBJECT_NAME = "model-store/youtubegoes5g/model.pt"

# Triggers variables
TRIGGER_TYPE = '1'
PERFORMANCE_FACTOR = 0.05
# Temp dir and files to save accuracy for trigger 3
TEMP_DIR = "tmp"
TEMP_FILE_ACC_IN_LAST_RUN = "accuracy_in_last_run.txt"
LAST_ACC_OBJECT_NAME = "accuracy-score/last_acc.txt"

# Model variables
MODEL_LR = 0.0001
MODEL_EPOCHS = 3500
MODEL_PRINT_FREQUENCY_PER_N_EPOCHS = 500
MODEL_NAME = "youtubegoes5g"

# Kserve variables
#MODEL_FRAMEWORK = "pytorch"
KSERVE_NAMESPACE = "kubeflow-user-example-com"
KSERVE_SVC_ACC = "sa-minio-kserve"
#MODEL_URI = "pvc://model-store-claim"
#MODEL_URI = "minio-service.kubeflow:9000/model-files"

# Model archiver gen vars
MODEL_STORE_POD_NAME = "model-store-pod"
MODEL_STORE_POD_CONTAINER_NAME = "model-store"
MAR_POD_NAME = "margen-pod"
MAR_POD_CONTAINER_NAME = "margen-container"
MAR_OBJECT_NAME = "model-store/youtubegoes5g.mar"
K8S_API_TOKEN = os.getenv("K8S_API_TOKEN")

# Model Serving

In [3]:
@component(base_image="python:3.11.9", packages_to_install=['kserve==0.13.0','kubernetes==30.1.0'])
def model_serving(
    # mar_gen_cond: Input[Artifact],
    # cond_info: Output[Metrics],
    bucket_name: str,
    model_name: str,
    kserve_namespace: str,
    kserve_svc_acc: str
):
    # Create kserve instance
    from kubernetes import client 
    from kserve import KServeClient, constants, V1beta1InferenceService, V1beta1InferenceServiceSpec, V1beta1PredictorSpec, V1beta1TorchServeSpec
    from datetime import datetime
    import time
    
    # # exec if a new model was uploaded
    # with open(mar_gen_cond.path) as f:
    #     up_model = f.read()
        
    # # if up_model == '1':
    # cond_info.log_metric("Inference Service", "Created/Updated")

    #Inference server config
    now = datetime.now()
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    # with open(model_uri.path) as f:
    #     uri = f.read()
    uri = f's3://{bucket_name}'

    isvc = V1beta1InferenceService(api_version=api_version,
                                   kind=constants.KSERVE_KIND,
                                   metadata=client.V1ObjectMeta(
                                       name=model_name, namespace=kserve_namespace, annotations={'sidecar.istio.io/inject':'false'}),
                                   spec=V1beta1InferenceServiceSpec(
                                   predictor=V1beta1PredictorSpec(
                                       service_account_name=kserve_svc_acc,
                                       pytorch=(V1beta1TorchServeSpec(
                                           storage_uri=uri))))
    )

    KServe = KServeClient()

    #replace old inference service with a new one
    try:
        KServe.delete(name=model_name, namespace=kserve_namespace)
        print("Old model deleted")
    except:
        print("Couldn't delete old model")
    time.sleep(10)

    KServe.create(isvc)
    # else:
    #     cond_info.log_metric("Inference Service", "Not Created/Updated")
        
@pipeline
def my_pipeline(
    github_repo_url: str,
    github_cloned_dir: str,
    github_dvc_branch: str,
    github_main_branch: str,
    github_username: str,
    github_token: str,
    dvc_remote_name: str,
    dvc_remote_db_url: str,
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    dvc_file_dir: str,
    dvc_file_name: str,
    model_name: str,
    kserve_namespace: str,
    model_lr: float,
    model_epochs: int,
    model_print_frequency_per_n_epochs: int,
    bucket_name: str,
    minio_model_object_name: str,
    kserve_svc_acc: str,
    trigger_type: str,
    performance_factor: float,
    last_accuracy_object_name: str,
    tmp_dir: str,
    tmp_file_last_acc: str,
    k8s_api_token: str
):
    model_serving_task = model_serving(bucket_name=bucket_name,
                                       model_name=model_name, 
                                       kserve_namespace=kserve_namespace,
                                       kserve_svc_acc=kserve_svc_acc)
    model_serving_task.set_caching_options(False)

# Compile the pipeline
pipeline_filename = f"{KUBEFLOW_PIPELINE_NAME}.yaml"
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path=pipeline_filename)

# Submit the pipeline to the KFP cluster
client = kfp.Client(
    host=KUBEFLOW_HOST_URL,
    existing_token=KUBEFLOW_TOKEN)  

client.create_run_from_pipeline_func(
    my_pipeline,
    enable_caching=False,
    arguments={
        'github_repo_url': GITHUB_REPO_URL,
        'github_cloned_dir': GITHUB_CLONED_DIR,
        'github_dvc_branch': GITHUB_DVC_BRANCH,
        'github_main_branch': GITHUB_MAIN_BRANCH,
        'github_username': GITHUB_USERNAME,
        'github_token': GITHUB_TOKEN,
        'dvc_remote_name': DVC_REMOTE_DB,
        'dvc_remote_db_url': DVC_REMOTE_DB_URL,
        'minio_url': MINIO_URL,
        'minio_access_key': MINIO_ACCESS_KEY,
        'minio_secret_key': MINIO_SECRET_KEY,
        'dvc_file_dir': DVC_FILE_DIR,
        'dvc_file_name': DVC_FILE_NAME,
        'model_name': MODEL_NAME,
        'kserve_namespace': KSERVE_NAMESPACE,
        'model_lr': MODEL_LR,
        'model_epochs': MODEL_EPOCHS,
        'model_print_frequency_per_n_epochs': MODEL_PRINT_FREQUENCY_PER_N_EPOCHS,
        'bucket_name': MINIO_MODEL_BUCKET_NAME,
        'minio_model_object_name': MINIO_MODEL_OBJECT_NAME,
        'kserve_svc_acc': KSERVE_SVC_ACC,
        'trigger_type': TRIGGER_TYPE,
        'performance_factor': PERFORMANCE_FACTOR,
        'last_accuracy_object_name': LAST_ACC_OBJECT_NAME,
        'tmp_dir': TEMP_DIR,
        'tmp_file_last_acc': TEMP_FILE_ACC_IN_LAST_RUN,
        'k8s_api_token': K8S_API_TOKEN
    })



RunPipelineResult(run_id=9280bf87-b0cb-4082-9720-7db3c2319000)

# Mar gen Component

In [22]:
def mar_gen(
    github_repo_url: str,
    github_cloned_dir: str,
    github_main_branch: str,
    github_username: str,
    github_token: str,
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    kserve_namespace: str,
    bucket_name: str,
    k8s_api_token: str
):
    from kubernetes import client, config, utils, watch
    import time
    from kubernetes.client import Configuration
    from kubernetes.client.api import core_v1_api
    from kubernetes.client.rest import ApiException
    from kubernetes.stream import stream
    import io
    import tarfile
    import pathlib
    import os
    from minio import Minio
    from minio.error import S3Error
    from git import Repo
    from subprocess import run
    ## aux functions

    # clone repo to get custom files for model
    def clone_repository_with_token(github_repo_url, 
                                    github_cloned_dir, 
                                    github_branch, 
                                    github_username, 
                                    github_token):
        """Clone a Git repository using a GitHub token in the URL and specifying the branch."""
        try:
            # Construct the URL with the GitHub username and token
            url_with_token = f"https://{github_username}:{github_token}@{github_repo_url.split('//')[1]}"

            # Clone the repository from the specified branch
            repo = Repo.clone_from(url_with_token, github_cloned_dir, branch=github_branch)
            return "Repository cloned successfully"
        except Exception as e:
            return f"Error occurred during repository cloning: {e}"

    # to wait some time while pod isn't running yet
    def wait_pod(core_v1, namespace, label, pod_name, time_in_sec):
        w = watch.Watch()
        for event in w.stream(func=core_v1.list_namespaced_pod,
                                  namespace=namespace,
                                  label_selector=label,
                                  timeout_seconds=time_in_sec):
            if event["object"].status.phase == "Running":
                w.stop()
                end_time = time.time()
                print(f"{pod_name} running ")
                return
            # event.type: ADDED, MODIFIED, DELETED
            if event["type"] == "DELETED":
                # Pod was deleted while we were waiting for it to start.
                print(f"{pod_name} deleted before it started")
                w.stop()
                return

    # kubectl exec
    def exec_commands(api_instance, namespace, pod_name, pod_container_name, command):
        name = pod_name
        resp = None
        try:
            resp = api_instance.read_namespaced_pod(name=name,
                                                    namespace=namespace)
        except ApiException as e:
            if e.status != 404:
                print(f"Unknown error: {e}")
                exit(1)

        if not resp:
            print(f"Pod {name} does not exist.")

        # Calling exec and waiting for response
        exec_command = [
            '/bin/sh',
            '-c',
            command]
        # When calling a pod with multiple containers running the target container
        # has to be specified with a keyword argument container=<name>.
        resp = stream(api_instance.connect_get_namespaced_pod_exec,
              name=pod_name,
              container=pod_container_name,
              namespace=namespace,
              command=exec_command,
              stderr=True, stdin=False,
              stdout=True, tty=False)
        print("Response: " + resp)

    # Copy from or to pod
    def copy_to_tar(source_path, dest_path, tar):
        """
        Adds a file or directory to a tar archive.

        Parameters:
        - source_path: Path to the source file or directory on the local machine.
        - dest_path: Destination directory inside the pod where files should be copied.
        - tar: The tarfile object to which files and directories will be added.
        """
        source_path = pathlib.Path(source_path)

        if source_path.is_file():
            # If it's a file, add to the tarfile with the destination path
            tar.add(source_path, arcname=pathlib.Path(dest_path).joinpath(source_path.name))
        elif source_path.is_dir():
            # If it's a directory, recursively add all its content
            for root, dirs, files in os.walk(source_path):
                root_path = pathlib.Path(root)
                # Compute the relative path within the tar and add to destination path
                for file in files:
                    file_path = root_path / file
                    tar.add(file_path, arcname=pathlib.Path(dest_path).joinpath(file_path.relative_to(source_path)))

    def extract_tar_to_local(tar_stream, dest_path):
        """
        Extracts a tar archive stream to a local directory.

        Parameters:
        - tar_stream: Tar stream from the pod.
        - dest_path: Local directory where the files will be extracted.
        """
        with tarfile.open(fileobj=tar_stream, mode='r:') as tar:
            tar.extractall(path=dest_path)

    def copy_file_or_dir(api_instance, namespace, pod_name, pod_container_name, source_path, dest_path, to_pod=True):
        """
        Copies a file or directory between a Kubernetes pod and the local machine.

        Parameters:
        - api_instance: Kubernetes API client instance.
        - namespace: Namespace of the pod.
        - pod_name: Name of the pod.
        - pod_container_name: Name of the container within the pod.
        - source_path: Path to the source file or directory (local or in the pod).
        - dest_path: Destination directory (local or in the pod).
        - to_pod: If True, copy from local to pod; if False, copy from pod to local.
        """
        try:
            if to_pod:
                # Copying from local to pod
                buf = io.BytesIO()
                with tarfile.open(fileobj=buf, mode='w:tar') as tar:
                    copy_to_tar(source_path, dest_path, tar)

                buf.seek(0)  # Reset buffer position after writing tar

                exec_command = ['tar', 'xvf', '-', '-C', '/']
                resp = stream(api_instance.connect_get_namespaced_pod_exec,
                              pod_name,
                              namespace,
                              container=pod_container_name,
                              command=exec_command,
                              stderr=True, stdin=True, stdout=True, tty=False,
                              _preload_content=False)

                # Send tar file to pod
                while resp.is_open():
                    resp.update(timeout=1)
                    if resp.peek_stdout():
                        print(f"STDOUT: {resp.read_stdout()}")
                    if resp.peek_stderr():
                        print(f"STDERR: {resp.read_stderr()}")
                    if buf.getvalue():
                        resp.write_stdin(buf.read())  # Write tar data to pod
                    else:
                        resp.write_stdin('\n')  # Signal end of input
                        break
                resp.close()

            else:
                # Copying from pod to local
                exec_command = ['tar', 'cvf', '-', source_path]
                resp = stream(api_instance.connect_get_namespaced_pod_exec,
                              pod_name,
                              namespace,
                              container=pod_container_name,
                              command=exec_command,
                              stderr=True, stdin=False, stdout=True, tty=False,
                              _preload_content=False)

                tar_stream = io.BytesIO()
                while resp.is_open():
                    resp.update(timeout=1)
                    if resp.peek_stdout():
                        tar_stream.write(resp.read_stdout().encode('utf-8'))  # Write stdout (tar) to stream
                    if resp.peek_stderr():
                        print(f"STDERR: {resp.read_stderr()}")

                tar_stream.seek(0)  # Reset stream position for extraction
                extract_tar_to_local(tar_stream, dest_path)
                resp.close()

        except Exception as e:
            print(f"Error copying file or directory: {e}")

    # init minio client
    def minio_setup(minio_url, minio_access_key, minio_secret_key):
        # Initialize Minio client with just the base URL (without path)
        client = Minio(
            minio_url,  # Ensure minio_url does not include a path, only the base URL (e.g., http://localhost:9000)
            access_key=minio_access_key,
            secret_key=minio_secret_key,
            secure=False  # Minio is using HTTP on localhost:9000
        )
        return client

    # upload file to minio using a client
    def upload_file(client, bucket_name, object_name, filepath):
        # Create the bucket if it does not exist
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            create_bucket_result = f"Successfully created bucket: {bucket_name}"
        else:
            create_bucket_result = f"Bucket {bucket_name} already exists"

        try:
            # Upload the file to the specified path in the bucket
            client.fput_object(bucket_name, object_name, filepath)
            return (f'Successfully uploaded {filepath} to {bucket_name}/{object_name}')

        except Exception as e:
            # Log and raise any upload errors
            raise Exception(f'Failed to upload model to Minio: {e}')

    # clone the repo
    clone_result = clone_repository_with_token(github_repo_url, 
                                               github_cloned_dir, 
                                               github_main_branch, 
                                               github_username, 
                                               github_token)

    ## init access to cluster when outside or inside the cluster
    # config.load_kube_config("kubeconfig")
    config.load_incluster_config()
    k8s_client = client.ApiClient()

    # apply model-store manifests 
    model_store_yaml_dir = f'{github_cloned_dir}/model-archiver/model-store-manifests/'
    try:
        pv_result = utils.create_from_directory(k8s_client, model_store_yaml_dir, verbose=True)
    except Exception as e:
        print(e)

    # init config to exec commands in pods
    try:
        c = Configuration().get_default_copy()
    except AttributeError:
        c = Configuration()
        c.assert_hostname = False
    Configuration.set_default(c)
    core_v1 = core_v1_api.CoreV1Api()

    # Create folders for model-store, config and scripts in PV
    model_store_pod_name = "model-store-pod"
    model_store_pod_container_name = "model-store"
    model_store_pod_label = "service.istio.io/canonical-name=model-store-pod"

    # Wait for pods to run before exec
    wait_pod(core_v1, kserve_namespace, model_store_pod_label, model_store_pod_name, 120)

    mkdir_ms_command = "mkdir -p /pv/model-store/youtubegoes5g/"
    mkdir_ms_result = exec_commands(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    mkdir_ms_command)
    mkdir_conf_command = "mkdir /pv/config/"
    mkdir_conf_result = exec_commands(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      mkdir_conf_command)
    mkdir_scripts_command = "mkdir /pv/scripts/"
    mkdir_scripts_result = exec_commands(core_v1, 
                                         kserve_namespace, 
                                         model_store_pod_name, 
                                         model_store_pod_container_name, 
                                         mkdir_scripts_command)

    # download pt file from minio
    pt_object_name = 'model-store/youtubegoes5g/model.pt'
    pt_local_file_path = 'model.pt'

    # Initialize the MinIO client
    client = minio_setup(minio_url, minio_access_key, minio_secret_key)

    try:
        # Download the file from MinIO
        client.fget_object(bucket_name, pt_object_name, pt_local_file_path)
        print(f"File {pt_object_name} downloaded successfully to {pt_local_file_path}.")
    except S3Error as exc:
        print(f"Error occurred: {exc}")

    # cp pt file to model store pod
    model_pt_source_path = 'model.pt'
    model_store_dest_path = "/pv/model-store/youtubegoes5g/"

    cp_pt_result = copy_file_or_dir(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    model_pt_source_path, 
                                    model_store_dest_path, 
                                    to_pod=True)

    # cp handler file to model store pod
    model_handler_source_path = f'{github_cloned_dir}/model-archiver/model-store/youtubegoes5g/custom_handler.py'

    cp_handler_result = copy_file_or_dir(core_v1, 
                                         kserve_namespace, 
                                         model_store_pod_name, 
                                         model_store_pod_container_name, 
                                         model_handler_source_path, 
                                         model_store_dest_path, 
                                         to_pod=True)

    # cp model.py file to model store pod
    model_py_source_path =  f'{github_cloned_dir}/model-archiver/model-store/youtubegoes5g/model.py'

    cp_py_result = copy_file_or_dir(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    model_py_source_path, 
                                    model_store_dest_path, 
                                    to_pod=True)

    # cp properties.json file to model store pod
    prop_source_path = f'{github_cloned_dir}/model-archiver/model-store/properties.json'
    model_prop_dest_path = "/pv/model-store/"

    cp_prop_result = copy_file_or_dir(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      prop_source_path, 
                                      model_prop_dest_path, 
                                      to_pod=True)

    # cp config.properties file to model store pod
    config_source_path = f'{github_cloned_dir}/model-archiver/config/config.properties'
#         config_dest_path = "/pv/config/"

#         cp_conf_result = copy_file_or_dir(core_v1, 
#                                           kserve_namespace, 
#                                           model_store_pod_name, 
#                                           model_store_pod_container_name, 
#                                           config_source_path, 
#                                           config_dest_path, 
#                                           to_pod=True)

    # cp margen script file to model store pod
    scripts_source_path = f'{github_cloned_dir}/model-archiver/scripts/margen.sh'
    scripts_dest_path = "/pv/scripts/"

    cp_scripts_result = copy_file_or_dir(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      scripts_source_path, 
                                      scripts_dest_path, 
                                      to_pod=True)

    # Delete model_store_pod
    try:
        api_response = core_v1.delete_namespaced_pod(model_store_pod_name, kserve_namespace)
        print(api_response)
    except ApiException as e:
        print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)

    # Upload config.properties to minio for IS
    config_object_name = "config/config.properties"
    # up_config_result = upload_file(client, bucket_name, config_object_name, config_source_path)

    # Create model archiver pod
    mar_yaml_dir = f'{github_cloned_dir}/model-archiver/manifests/'
    mar_pod_name = "margen-pod"
    mar_pod_container_name = "margen-container"
    mar_pod_label = "service.istio.io/canonical-name=margen-pod"

    try:
        margen_result = utils.create_from_directory(k8s_client, mar_yaml_dir, verbose=True)
    except Exception as e:
        print(e)

    # Wait for pods to run before exec
    wait_pod(core_v1, kserve_namespace, mar_pod_label, mar_pod_name, 120)

    # Exec mar gen in a script
    mar_gen_command = "bash scripts/margen.sh"
    mar_gen_result = exec_commands(core_v1, 
                                   kserve_namespace, 
                                   mar_pod_name, 
                                   mar_pod_container_name, 
                                   mar_gen_command)

#     # Copy mar file to local
#     mar_source_path = "youtubegoes5g.mar"
#     mar_dest_path = "./"

#     mar_cp_result = copy_file_or_dir(core_v1, 
#                                      kserve_namespace, 
#                                      mar_pod_name, 
#                                      mar_pod_container_name, 
#                                      mar_source_path, 
#                                      mar_dest_path, 
#                                      to_pod=False)

#     # Upload mar file to minio
#     mar_object_name = "model-store/youtubegoes5g.mar"
#     mar_filepath = './youtubegoes5g.mar'

#     # up_mar_result = upload_file(client, bucket_name, mar_object_name, mar_filepath)

#     # Delete margen pod
#     try:
#         api_response = core_v1.delete_namespaced_pod(mar_pod_name, kserve_namespace)
#         print(api_response)
#     except ApiException as e:
#         print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)
        
mar_gen(GITHUB_REPO_URL,
        GITHUB_CLONED_DIR,
        GITHUB_MAIN_BRANCH,
        GITHUB_USERNAME,
        GITHUB_TOKEN,
        MINIO_URL,
        MINIO_ACCESS_KEY,
        MINIO_SECRET_KEY,
        KSERVE_NAMESPACE,
        MINIO_MODEL_BUCKET_NAME,
        K8S_API_TOKEN)

pod created. status='{'conditions': None,
 'container_statuses': None,
 'ephemeral_container_statuses': None,
 'host_ip': None,
 'init_container_statuses': None,
 'message': None,
 'nominated_node_name': None,
 'phase': 'Pending',
 'pod_i_ps': None,
 'pod_ip': None,
 'qos_class': 'Burstable',
 'reason': None,
 'start_time': None}'
Error from server (Conflict): {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"persistentvolumeclaims \"model-pv-claim\" already exists","reason":"AlreadyExists","details":{"name":"model-pv-claim","kind":"persistentvolumeclaims"},"code":409}

model-store-pod running 
Response: 
Response: mkdir: cannot create directory '/pv/config/': File exists

Response: mkdir: cannot create directory '/pv/scripts/': File exists

File model-store/youtubegoes5g/model.pt downloaded successfully to model.pt.
STDOUT: pv/model-store/youtubegoes5g/model.pt

STDOUT: pv/model-store/youtubegoes5g/custom_handler.py

STDOUT: pv/model-store/youtubegoes5g/mo

In [133]:
@component(base_image="python:3.11.9", packages_to_install=['gitpython', 'kubernetes==30.1.0','Minio==7.2.5'])
def mar_gen(
    github_repo_url: str,
    github_cloned_dir: str,
    github_main_branch: str,
    github_username: str,
    github_token: str,
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    kserve_namespace: str,
    bucket_name: str,
    k8s_api_token: str
):
    from kubernetes import client, config, utils, watch
    import time
    from kubernetes.client import Configuration
    from kubernetes.client.api import core_v1_api
    from kubernetes.client.rest import ApiException
    from kubernetes.stream import stream
    import io
    import tarfile
    import pathlib
    import os
    from minio import Minio
    from minio.error import S3Error
    from git import Repo
    from subprocess import run

    def clone_repository_with_token(github_repo_url, 
                                    github_cloned_dir, 
                                    github_branch, 
                                    github_username, 
                                    github_token):
        """Clone a Git repository using a GitHub token in the URL and specifying the branch."""
        try:
            # Construct the URL with the GitHub username and token
            url_with_token = f"https://{github_username}:{github_token}@{github_repo_url.split('//')[1]}"

            # Clone the repository from the specified branch
            repo = Repo.clone_from(url_with_token, github_cloned_dir, branch=github_branch)
            return "Repository cloned successfully"
        except Exception as e:
            return f"Error occurred during repository cloning: {e}"

    # github_repo_url = GITHUB_REPO_URL
    # github_cloned_dir = GITHUB_CLONED_DIR
    # github_main_branch = GITHUB_MAIN_BRANCH
    # github_username = GITHUB_USERNAME
    # github_token = GITHUB_TOKEN

    clone_result = clone_repository_with_token(github_repo_url, 
                                               github_cloned_dir, 
                                               github_main_branch, 
                                               github_username, 
                                               github_token)

    # config.load_kube_config("kubeconfig")
    config.load_incluster_config()
    k8s_client = client.ApiClient()
    model_store_yaml_dir = f'{github_cloned_dir}/model-archiver/model-store-manifests/'

    try:
        pv_result = utils.create_from_directory(k8s_client, model_store_yaml_dir, verbose=True)
    except Exception as e:
        print(e)

    def wait_pod(core_v1, namespace, label, pod_name, time_in_sec):
        w = watch.Watch()
        for event in w.stream(func=core_v1.list_namespaced_pod,
                                  namespace=namespace,
                                  label_selector=label,
                                  timeout_seconds=time_in_sec):
            if event["object"].status.phase == "Running":
                w.stop()
                end_time = time.time()
                print(f"{pod_name} running ")
                return
            # event.type: ADDED, MODIFIED, DELETED
            if event["type"] == "DELETED":
                # Pod was deleted while we were waiting for it to start.
                print(f"{pod_name} deleted before it started")
                w.stop()
                return

    def exec_commands(api_instance, namespace, pod_name, pod_container_name, command):
        name = pod_name
        resp = None
        try:
            resp = api_instance.read_namespaced_pod(name=name,
                                                    namespace=namespace)
        except ApiException as e:
            if e.status != 404:
                print(f"Unknown error: {e}")
                exit(1)

        if not resp:
            print(f"Pod {name} does not exist.")

        # Calling exec and waiting for response
        exec_command = [
            '/bin/sh',
            '-c',
            command]
        # When calling a pod with multiple containers running the target container
        # has to be specified with a keyword argument container=<name>.
        resp = stream(api_instance.connect_get_namespaced_pod_exec,
              name=pod_name,
              container=pod_container_name,
              namespace=namespace,
              command=exec_command,
              stderr=True, stdin=False,
              stdout=True, tty=False)
        print("Response: " + resp)

    try:
        c = Configuration().get_default_copy()
    except AttributeError:
        c = Configuration()
        c.assert_hostname = False
    Configuration.set_default(c)
    core_v1 = core_v1_api.CoreV1Api()

    # Create folders for model-store and config in PV
    # kserve_namespace = KSERVE_NAMESPACE
    # model_store_pod_name = MODEL_STORE_POD_NAME
    # model_store_pod_container_name = MODEL_STORE_POD_CONTAINER_NAME
    model_store_pod_name = "model-store-pod"
    model_store_pod_container_name = "model-store"
    model_store_pod_label = "service.istio.io/canonical-name=model-store-pod"

    # kserve_namespace = KSERVE_NAMESPACE

    # Wait for pods to run before exec
    wait_pod(core_v1, kserve_namespace, model_store_pod_label, model_store_pod_name, 120)

    mkdir_ms_command = "mkdir -p /pv/model-store/youtubegoes5g/"
    mkdir_ms_result = exec_commands(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    mkdir_ms_command)
    mkdir_conf_command = "mkdir /pv/config/"
    mkdir_conf_result = exec_commands(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      mkdir_conf_command)
    mkdir_scripts_command = "mkdir /pv/scripts/"
    mkdir_scripts_result = exec_commands(core_v1, 
                                         kserve_namespace, 
                                         model_store_pod_name, 
                                         model_store_pod_container_name, 
                                         mkdir_scripts_command)

    # Copy from or to pod
    def copy_to_tar(source_path, dest_path, tar):
        """
        Adds a file or directory to a tar archive.

        Parameters:
        - source_path: Path to the source file or directory on the local machine.
        - dest_path: Destination directory inside the pod where files should be copied.
        - tar: The tarfile object to which files and directories will be added.
        """
        source_path = pathlib.Path(source_path)

        if source_path.is_file():
            # If it's a file, add to the tarfile with the destination path
            tar.add(source_path, arcname=pathlib.Path(dest_path).joinpath(source_path.name))
        elif source_path.is_dir():
            # If it's a directory, recursively add all its content
            for root, dirs, files in os.walk(source_path):
                root_path = pathlib.Path(root)
                # Compute the relative path within the tar and add to destination path
                for file in files:
                    file_path = root_path / file
                    tar.add(file_path, arcname=pathlib.Path(dest_path).joinpath(file_path.relative_to(source_path)))

    def extract_tar_to_local(tar_stream, dest_path):
        """
        Extracts a tar archive stream to a local directory.

        Parameters:
        - tar_stream: Tar stream from the pod.
        - dest_path: Local directory where the files will be extracted.
        """
        with tarfile.open(fileobj=tar_stream, mode='r:') as tar:
            tar.extractall(path=dest_path)

    def copy_file_or_dir(api_instance, namespace, pod_name, pod_container_name, source_path, dest_path, to_pod=True):
        """
        Copies a file or directory between a Kubernetes pod and the local machine.

        Parameters:
        - api_instance: Kubernetes API client instance.
        - namespace: Namespace of the pod.
        - pod_name: Name of the pod.
        - pod_container_name: Name of the container within the pod.
        - source_path: Path to the source file or directory (local or in the pod).
        - dest_path: Destination directory (local or in the pod).
        - to_pod: If True, copy from local to pod; if False, copy from pod to local.
        """
        try:
            if to_pod:
                # Copying from local to pod
                buf = io.BytesIO()
                with tarfile.open(fileobj=buf, mode='w:tar') as tar:
                    copy_to_tar(source_path, dest_path, tar)

                buf.seek(0)  # Reset buffer position after writing tar

                exec_command = ['tar', 'xvf', '-', '-C', '/']
                resp = stream(api_instance.connect_get_namespaced_pod_exec,
                              pod_name,
                              namespace,
                              container=pod_container_name,
                              command=exec_command,
                              stderr=True, stdin=True, stdout=True, tty=False,
                              _preload_content=False)

                # Send tar file to pod
                while resp.is_open():
                    resp.update(timeout=1)
                    if resp.peek_stdout():
                        print(f"STDOUT: {resp.read_stdout()}")
                    if resp.peek_stderr():
                        print(f"STDERR: {resp.read_stderr()}")
                    if buf.getvalue():
                        resp.write_stdin(buf.read())  # Write tar data to pod
                    else:
                        resp.write_stdin('\n')  # Signal end of input
                        break
                resp.close()

            else:
                # Copying from pod to local
                exec_command = ['tar', 'cvf', '-', source_path]
                resp = stream(api_instance.connect_get_namespaced_pod_exec,
                              pod_name,
                              namespace,
                              container=pod_container_name,
                              command=exec_command,
                              stderr=True, stdin=False, stdout=True, tty=False,
                              _preload_content=False)

                tar_stream = io.BytesIO()
                while resp.is_open():
                    resp.update(timeout=1)
                    if resp.peek_stdout():
                        tar_stream.write(resp.read_stdout().encode('utf-8'))  # Write stdout (tar) to stream
                    if resp.peek_stderr():
                        print(f"STDERR: {resp.read_stderr()}")

                tar_stream.seek(0)  # Reset stream position for extraction
                extract_tar_to_local(tar_stream, dest_path)
                resp.close()

        except Exception as e:
            print(f"Error copying file or directory: {e}")

    def minio_setup(minio_url, minio_access_key, minio_secret_key):
        # Initialize Minio client with just the base URL (without path)
        client = Minio(
            minio_url,  # Ensure minio_url does not include a path, only the base URL (e.g., http://localhost:9000)
            access_key=minio_access_key,
            secret_key=minio_secret_key,
            secure=False  # Minio is using HTTP on localhost:9000
        )
        return client

    # minio_url = MINIO_URL
    # minio_access_key = MINIO_ACCESS_KEY
    # minio_secret_key = MINIO_SECRET_KEY
    # bucket_name = MINIO_MODEL_BUCKET_NAME
    pt_object_name = 'model-store/youtubegoes5g/model.pt'
    model_object_name = 'model-store/youtubegoes5g/model.py'
    pt_local_file_path = 'model.pt'
    model_local_file_path = 'model.py'

    # Initialize the MinIO client
    client = minio_setup(minio_url, minio_access_key, minio_secret_key)

    try:
        # Download the file from MinIO
        client.fget_object(bucket_name, pt_object_name, pt_local_file_path)
        print(f"File {pt_object_name} downloaded successfully to {pt_local_file_path}.")
    except S3Error as exc:
        print(f"Error occurred: {exc}")

    try:
        # Download the file from MinIO
        client.fget_object(bucket_name, model_object_name, model_local_file_path)
        print(f"File {model_object_name} downloaded successfully to {model_local_file_path}.")
    except S3Error as exc:
        print(f"Error occurred: {exc}")

    model_pt_source_path = 'model.pt'
    model_store_dest_path = "/pv/model-store/youtubegoes5g/"

    cp_pt_result = copy_file_or_dir(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    model_pt_source_path, 
                                    model_store_dest_path, 
                                    to_pod=True)

    model_py_source_path = 'model.py'

    cp_py_result = copy_file_or_dir(core_v1, 
                                    kserve_namespace, 
                                    model_store_pod_name, 
                                    model_store_pod_container_name, 
                                    model_py_source_path, 
                                    model_store_dest_path, 
                                    to_pod=True)

    prop_source_path = f'{github_cloned_dir}/model-archiver/model-store/properties.json'
    model_prop_dest_path = "/pv/model-store/"

    cp_prop_result = copy_file_or_dir(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      prop_source_path, 
                                      model_prop_dest_path, 
                                      to_pod=True)

    config_source_path = f'{github_cloned_dir}/model-archiver/config/config.properties'
    config_dest_path = "/pv/config/"

    cp_conf_result = copy_file_or_dir(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      config_source_path, 
                                      config_dest_path, 
                                      to_pod=True)

    scripts_source_path = f'{github_cloned_dir}/model-archiver/scripts/margen.sh'
    scripts_dest_path = "/pv/scripts/"

    cp_scripts_result = copy_file_or_dir(core_v1, 
                                      kserve_namespace, 
                                      model_store_pod_name, 
                                      model_store_pod_container_name, 
                                      scripts_source_path, 
                                      scripts_dest_path, 
                                      to_pod=True)

    # Delete model_store_pod
    try:
        api_response = core_v1.delete_namespaced_pod(model_store_pod_name, kserve_namespace)
        print(api_response)
    except ApiException as e:
        print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)

    # Create model archiver pod
    mar_yaml_dir = f'{github_cloned_dir}/model-archiver/manifests/'
    mar_pod_name = "margen-pod"
    mar_pod_container_name = "margen-container"
    mar_pod_label = "service.istio.io/canonical-name=margen-pod"

    try:
        margen_result = utils.create_from_directory(k8s_client, mar_yaml_dir, verbose=True)
    except Exception as e:
        print(e)

    # Wait for pods to run before exec
    wait_pod(core_v1, kserve_namespace, mar_pod_label, mar_pod_name, 120)

    # Exec mar gen in a script
    mar_gen_command = "bash /home/model-server/scripts/margen.sh"
    mar_gen_result = exec_commands(core_v1, 
                                   kserve_namespace, 
                                   mar_pod_name, 
                                   mar_pod_container_name, 
                                   mar_gen_command)

    # Copy mar file to local
    mar_source_path = "youtubegoes5g.mar"
    mar_dest_path = "./"

    mar_cp_result = copy_file_or_dir(core_v1, 
                                     kserve_namespace, 
                                     mar_pod_name, 
                                     mar_pod_container_name, 
                                     mar_source_path, 
                                     mar_dest_path, 
                                     to_pod=False)

    # Upload mar file to minio

    def upload_file(client, bucket_name, object_name, filepath):
        # Create the bucket if it does not exist
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            create_bucket_result = f"Successfully created bucket: {bucket_name}"
        else:
            create_bucket_result = f"Bucket {bucket_name} already exists"

        try:
            # Upload the file to the specified path in the bucket
            client.fput_object(bucket_name, object_name, filepath)
            return (f'Successfully uploaded {filepath} to {bucket_name}/{object_name}')

        except Exception as e:
            # Log and raise any upload errors
            raise Exception(f'Failed to upload model to Minio: {e}')

    mar_object_name = "model-store/youtubegoes5g.mar"
    mar_filepath = './youtubegoes5g.mar'

    up_mar_result = upload_file(client, bucket_name, mar_object_name, mar_filepath)

    # Delete margen pod
    try:
        api_response = core_v1.delete_namespaced_pod(mar_pod_name, kserve_namespace)
        print(api_response)
    except ApiException as e:
        print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)
        
@pipeline
def my_pipeline(
    github_repo_url: str,
    github_cloned_dir: str,
    github_main_branch: str,
    github_username: str,
    github_token: str,
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    kserve_namespace: str,
    bucket_name: str,
    k8s_api_token: str
):
    margen_task = mar_gen(github_repo_url=github_repo_url,
                          github_cloned_dir=github_cloned_dir,
                          github_main_branch=github_main_branch,
                          github_username=github_username,
                          github_token=github_token,
                          minio_url=minio_url,
                          minio_access_key=minio_access_key,
                          minio_secret_key=minio_secret_key,
                          kserve_namespace=kserve_namespace,
                          bucket_name=bucket_name,
                          k8s_api_token=k8s_api_token)

# Compile the pipeline
pipeline_filename = "margen_pipe.yaml"
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path=pipeline_filename)

# Submit the pipeline to the KFP cluster
client = kfp.Client(
    host=KUBEFLOW_HOST_URL,
    existing_token=KUBEFLOW_TOKEN)  

client.create_run_from_pipeline_func(
    my_pipeline,
    enable_caching=False,
    arguments={
        'github_repo_url': GITHUB_REPO_URL,
        'github_cloned_dir': GITHUB_CLONED_DIR,
        'github_main_branch': GITHUB_MAIN_BRANCH,
        'github_username': GITHUB_USERNAME,
        'github_token': GITHUB_TOKEN,
        'minio_url': MINIO_URL,
        'minio_access_key': MINIO_ACCESS_KEY,
        'minio_secret_key': MINIO_SECRET_KEY,
        'kserve_namespace': KSERVE_NAMESPACE,
        'bucket_name': MINIO_MODEL_BUCKET_NAME,
        'k8s_api_token': K8S_API_TOKEN
    })



RunPipelineResult(run_id=cd9ed8eb-ddb9-49d3-a1c2-2661fe10a3bf)

# Data Ingestion

In [134]:
from git import Repo
from subprocess import run, CalledProcessError
import os
import pandas as pd

github_repo_url = GITHUB_REPO_URL
github_cloned_dir = GITHUB_CLONED_DIR
github_dvc_branch = GITHUB_DVC_BRANCH
github_username = GITHUB_USERNAME
github_token = GITHUB_TOKEN
dvc_remote_name = DVC_REMOTE_DB
dvc_remote_db_url = DVC_REMOTE_DB_URL
minio_url = MINIO_URL
minio_access_key = MINIO_ACCESS_KEY
minio_secret_key = MINIO_SECRET_KEY
dvc_file_dir = DVC_FILE_DIR
dvc_file_name = DVC_FILE_NAME

def clone_repository_with_token(github_repo_url, github_cloned_dir, github_dvc_branch, github_username, github_token):
    """Clone a Git repository using a GitHub token in the URL and specifying the branch."""
    try:
        # Construct the URL with the GitHub username and token
        url_with_token = f"https://{github_username}:{github_token}@{github_repo_url.split('//')[1]}"

        # Clone the repository from the specified branch
        repo = Repo.clone_from(url_with_token, github_cloned_dir, branch=github_dvc_branch)
        return "Repository cloned successfully"
    except Exception as e:
        return f"Error occurred during repository cloning: {e}"

def configure_dvc_remote(github_cloned_dir, dvc_remote_name, dvc_remote_db_url, minio_url, minio_access_key, minio_secret_key):
    http_minio = f'http://{minio_url}'
    """Configure the Minio bucket as the DVC remote repository using the `dvc remote` commands."""
    try:
        # Add the remote
        run(
            ['dvc', 'remote', 'add', '-d', dvc_remote_name, dvc_remote_db_url],
            cwd=github_cloned_dir,
            capture_output=True,
            text=True,
            check=True
        )

        # Configure the endpoint URL
        run(
            ['dvc', 'remote', 'modify', dvc_remote_name, 'endpointurl', http_minio],
            cwd=github_cloned_dir,
            capture_output=True,
            text=True,
            check=True
        )

        # Configure access key ID
        run(
            ['dvc', 'remote', 'modify', dvc_remote_name, 'access_key_id', minio_access_key],
            cwd=github_cloned_dir,
            capture_output=True,
            text=True,
            check=True
        )

        # Configure secret access key
        run(
            ['dvc', 'remote', 'modify', dvc_remote_name, 'secret_access_key', minio_secret_key],
            cwd=github_cloned_dir,
            capture_output=True,
            text=True,
            check=True
        )

        return f'Successfully configured Minio bucket as DVC remote repository: {dvc_remote_name}'
    except CalledProcessError as e:
        # Log and raise any errors
        return f'Failed to configure DVC remote: {e.stderr}'

def perform_dvc_pull(github_cloned_dir, dvc_remote_name):
    """Perform a DVC pull to synchronize local data with the remote repository."""
    try:
        # Run the `dvc pull` command
        result = run(['dvc', 'pull', '-r', dvc_remote_name], cwd=github_cloned_dir, capture_output=True, text=True)

        # Check if the command executed successfully
        if result.returncode != 0:
            # Log and raise an error if the command failed
            error_message = f"dvc pull failed with error: {result.stderr}"
            raise Exception(error_message)

        # Log successful operation
        return "Successfully pulled data from remote DVC repository"

    except Exception as e:
        # Log and handle the error
        return f"Error occurred during dvc pull: {e}"

# Call the functions
clone_result = clone_repository_with_token(github_repo_url, github_cloned_dir, github_dvc_branch, github_username, github_token)
configure_result = configure_dvc_remote(github_cloned_dir, dvc_remote_name, dvc_remote_db_url, minio_url, minio_access_key, minio_secret_key)
dvc_pull_result = perform_dvc_pull(github_cloned_dir, dvc_remote_name)

# Save dataset with pandas in Dataset artifact
pulled_dataset_path = os.path.join(github_cloned_dir, dvc_file_dir, dvc_file_name)
tmp_dataset_path = "/tmp/" + dvc_file_name
dataset = pd.read_csv(pulled_dataset_path)
dataset.to_pickle(tmp_dataset_path)

In [5]:
print("Dataset size", dataset.shape[0])

Dataset size 2693


# Data Preparation

In [6]:
test_size = 0.2
random_state = 42

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
import torch
import os

# Load dataset from Dataset artifact
df = pd.read_pickle(tmp_dataset_path)

# Handle null values and replace specific characters
#df = df.replace([' ', '-',np.nan], 0) # There are null values
df = df.replace([' ', '-', np.nan], np.nan)

# Selective columns for mean calculation
columns_to_convert = [
    'CQI1', 'CQI2', 'CQI3', 'cSTD CQI', 'cMajority', 'c25 P', 'c50 P', 'c75 P', 
    'RSRP1', 'RSRP2', 'RSRP3', 'pMajority', 'p25 P', 'p50 P', 'p75 P', 
    'RSRQ1', 'RSRQ2', 'RSRQ3', 'qMajority', 'q25 P', 'q50 P', 'q75 P', 
    'SNR1', 'SNR2', 'SNR3', 'sMajority', 's25 P', 's50 P', 's75 P'
]
df[columns_to_convert] = df[columns_to_convert].astype(float)

# Replace np.nan with mean values for selective columns
df[columns_to_convert] = df[columns_to_convert].fillna(df[columns_to_convert].mean())

# Convert 'Stall' column to numerical values
df['Stall'].replace({'Yes': 1, 'No': 0}, inplace=True)

X = df[columns_to_convert].values
y = df['Stall'].values

# Apply SMOTE for balancing the dataset
# oversample = SMOTE(random_state=random_state)
oversample = SMOTE()
X, y = oversample.fit_resample(X, y)

# Standardize the features
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Convert to torch tensors
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)

# Split the dataset into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

# Data Training

In [11]:
import os
import torch
from torch import nn
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score, classification_report
from minio import Minio

model_lr = MODEL_LR
model_epochs = MODEL_EPOCHS
model_print_frequency_per_n_epochs = MODEL_PRINT_FREQUENCY_PER_N_EPOCHS
bucket_name = MINIO_MODEL_BUCKET_NAME
minio_model_object_name = MINIO_MODEL_OBJECT_NAME
trigger_type = TRIGGER_TYPE
performance_factor = PERFORMANCE_FACTOR
last_accuracy_object_name = LAST_ACC_OBJECT_NAME
tmp_dir = TEMP_DIR
tmp_file_last_acc = TEMP_FILE_ACC_IN_LAST_RUN

# Build model with non-linear activation function
class InterruptionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = nn.Linear(in_features=29, out_features=200)
        self.layer_2 = nn.Linear(in_features=200, out_features=100)
        self.layer_3 = nn.Linear(in_features=100, out_features=1)
        self.relu = nn.ReLU() # <- add in ReLU activation function
        # Can also put sigmoid in the model
        # This would mean you don't need to use it on the predictions
        # self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Intersperse the ReLU activation function between layers
        return self.layer_3(self.relu(self.layer_2(self.relu(self.layer_1(x)))))

# Helper functions
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item() # torch.eq() calculates where two tensors are equal
    acc = (correct / len(y_pred)) * 100
    return acc

def minio_setup(minio_url, minio_access_key, minio_secret_key):
    # Initialize Minio client with just the base URL (without path)
    client = Minio(
        minio_url,  # Ensure minio_url does not include a path, only the base URL (e.g., http://localhost:9000)
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False  # Minio is using HTTP on localhost:9000
    )
    return client

def upload_file(client, bucket_name, object_name, filepath):
    # Create the bucket if it does not exist
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
        create_bucket_result = f"Successfully created bucket: {bucket_name}"
    else:
        create_bucket_result = f"Bucket {bucket_name} already exists"

    try:
        # Upload the file to the specified path in the bucket
        client.fput_object(bucket_name, object_name, filepath)
        return (f'Successfully uploaded {filepath} to {bucket_name}/{object_name}')

    except Exception as e:
        # Log and raise any upload errors
        raise Exception(f'Failed to upload model to Minio: {e}')

def read_from_minio(client, bucket_name, object_name):
    """
    Function to read a file from a MinIO bucket and convert its single content to a float.
    If the file is not found or is empty, it returns 0.0.

    Args:
        client: minio client
        bucket_name (str): The name of the bucket in MinIO.
        object_name (str): The name of the object (file) in the bucket.

    Returns:
        float: The float value converted from the file content, or 0.0 if the file is not found or empty.
    """
    try:
        # Get the file from the MinIO bucket
        response = client.get_object(bucket_name, object_name)

        # Read the file content into a buffer
        file_data = response.read()

        # Decode file content and strip whitespace
        content = file_data.decode('utf-8').strip()

        # If the content is empty, return 0.0
        if not content:
            print(f"File {object_name} is empty.")
            return 0.0

        # Convert the content to a float
        float_value = float(content)
        return float_value

    except Exception as e:
        # Handle file not found or any other errors
        print(f"Error occurred: {e}")
        return 0.0

def save_float_to_tempfile(float_value, dir_name, file_name):
    """
    Saves a float value to a specified directory and file name.

    Args:
        float_value (float): The float value to save.
        dir_name (str): The name of the directory to save the file in.
        file_name (str): The name of the file.

    Returns:
        str: The path to the file.
    """
    # Ensure the directory exists
    os.makedirs(dir_name, exist_ok=True)
    temp_file_path = os.path.join(dir_name, file_name)

    with open(temp_file_path, 'w') as temp_file:
        # Convert the float to a string, then write to file
        temp_file.write(str(float_value))

    return temp_file_path

def get_accuracy_in_last_run(client, bucket_name, object_name):
    accuracy_in_last_run = read_from_minio(client, bucket_name, object_name)
    return accuracy_in_last_run

def update_accuracy_in_last_run(client, bucket_name, object_name, new_value, tmp_dir, tmp_file):
    filepath = save_float_to_tempfile(new_value, tmp_dir, tmp_file)
    upload_file(client, bucket_name, object_name, filepath)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = InterruptionModel().to(device)

# Setup loss and optimizer
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=model_lr)

# Fit the model
torch.manual_seed(42)

# Put all data on target device
# X_train = torch.load(X_train_artifact.path)
# X_test = torch.load(X_test_artifact.path)
# y_train = torch.load(y_train_artifact.path)
# y_test = torch.load(y_test_artifact.path)
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)

for epoch in range(model_epochs):
    # 1. Forward pass
    y_logits = model(X_train).squeeze()

    y_pred = torch.round(torch.sigmoid(y_logits)) # logits -> prediction probabilities -> prediction labels

    # 2. Calculate loss and accuracy
    loss = loss_fn(y_logits, y_train) # BCEWithLogitsLoss calculates loss using logits
    acc = accuracy_fn(y_true=y_train,
                    y_pred=y_pred)

    # 3. Optimizer zero grad
    optimizer.zero_grad()

    # 4. Loss backward
    loss.backward()

    # 5. Optimizer step
    optimizer.step()

    ### Testing
    model.eval()
    with torch.no_grad():
    # 1. Forward pass
        test_logits = model(X_test).squeeze()
        #print(test_logits.shape)
        test_pred = torch.round(torch.sigmoid(test_logits)) # logits -> prediction probabilities -> prediction labels
        # 2. Calcuate loss and accuracy
        test_loss = loss_fn(test_logits, y_test)
        test_acc = accuracy_fn(y_true=y_test,
                            y_pred=test_pred)


    # Print out what's happening
    if epoch % model_print_frequency_per_n_epochs == 0:
        print(f"Epoch: {epoch} | Loss: {loss:.5f}, Accuracy: {acc:.2f}% | Test Loss: {test_loss:.5f}, Test Accuracy: {test_acc:.2f}%")

model.eval()
with torch.no_grad():
    y_preds = torch.round(torch.sigmoid(model(X_test))).squeeze()

if device == "cuda":
    predictions = y_preds.cpu().numpy() #if it is cuda, then this, otherwise y_pred.numpy()
    true_labels = y_test.cpu().numpy()
else:
    predictions = y_preds.numpy()
    true_labels = y_test.numpy()

# Confusion Matrix
cmatrix = confusion_matrix(true_labels, predictions)
#print("Confusion Matrix:", cmatrix)

# Metrics
accuracy = accuracy_score(true_labels, predictions)
# metrics.log_metric("Accuracy", accuracy)
#print('Accuracy: %f' % accuracy)

# test accuracy
print(type(accuracy))

precision = precision_score(true_labels,  predictions, average='weighted')
# metrics.log_metric("Precision", precision)
print('Precision: %f' % precision)

recall = recall_score(true_labels, predictions, average='weighted')
# metrics.log_metric("Recall", recall)
print('Recall: %f' % recall)

microf1 = f1_score(true_labels, predictions, average='micro')
# metrics.log_metric("Micro F1 score", microf1)
print('Micro F1 score: %f' % microf1)

macrof1 = f1_score(true_labels, predictions, average='macro')
# metrics.log_metric("Macro F1 score", macrof1)
print('Macro F1 score: %f' % macrof1)

target_names = ['No-Stall', 'Stall']
# Print precision-recall report
print(classification_report(true_labels, predictions, target_names=target_names))

# Classification Metrics artifact
cmatrix = cmatrix.tolist()
target_names = ['No-Stall', 'Stall']
# classification_metrics.log_confusion_matrix(target_names, cmatrix)

# Save model
# model_path = "/tmp/model.pt"
# torch.save(model.state_dict(), model_path)
# os.rename(model_path, model_trained_artifact.path)

# Setup minio client to upload and read files
client = minio_setup(minio_url, minio_access_key, minio_secret_key)

previous_accuracy = get_accuracy_in_last_run(client, bucket_name, last_accuracy_object_name)

#metrics.log_metric("current-previous accuracy", accuracy-previous_accuracy)
# metrics.log_metric("current accuracy", accuracy)
# metrics.log_metric("previous accuracy", previous_accuracy)
print(accuracy)
print(previous_accuracy)

if trigger_type == '1' or trigger_type == '2':
    up_model = True
elif trigger_type == '3':
    if accuracy - previous_accuracy > performance_factor:
        up_model = True
        # update_accuracy_in_last_run(client, bucket_name, last_accuracy_object_name, accuracy, tmp_dir, tmp_file_last_acc)
else:
    up_model = False
    print('0')
    # metrics.log_metric("up model", '0')
    # with open(up_model_cond.path, 'w') as f:
        # f.write('0')

if up_model:
    print('1')
    # metrics.log_metric("up model", '1')
    # with open(up_model_cond.path, 'w') as f:
        # f.write('1')
    # upload_model_result = upload_file(client, bucket_name, minio_model_object_name, model_trained_artifact.path)

Epoch: 0 | Loss: 0.69019, Accuracy: 51.72% | Test Loss: 0.68449, Test Accuracy: 55.18%
Epoch: 500 | Loss: 0.47132, Accuracy: 78.29% | Test Loss: 0.48062, Test Accuracy: 77.87%
Epoch: 1000 | Loss: 0.36650, Accuracy: 84.04% | Test Loss: 0.42489, Test Accuracy: 81.99%
Epoch: 1500 | Loss: 0.26057, Accuracy: 89.68% | Test Loss: 0.39684, Test Accuracy: 82.41%
Epoch: 2000 | Loss: 0.17468, Accuracy: 94.36% | Test Loss: 0.40738, Test Accuracy: 84.96%
Epoch: 2500 | Loss: 0.11994, Accuracy: 96.70% | Test Loss: 0.43905, Test Accuracy: 86.24%
Epoch: 3000 | Loss: 0.08511, Accuracy: 97.91% | Test Loss: 0.48346, Test Accuracy: 86.38%
<class 'numpy.float64'>
Precision: 0.859740
Recall: 0.856738
Micro F1 score: 0.856738
Macro F1 score: 0.856727
              precision    recall  f1-score   support

    No-Stall       0.89      0.82      0.86       370
       Stall       0.82      0.89      0.86       335

    accuracy                           0.86       705
   macro avg       0.86      0.86      0.86  

# Compile Pipeline

In [None]:
# @pipeline
# def my_pipeline(
#     github_repo_url: str,
#     github_cloned_dir: str,
#     github_dvc_branch: str,
#     github_username: str,
#     github_token: str,
#     dvc_remote_name: str,
#     dvc_remote_db_url: str,
#     minio_url: str,
#     minio_access_key: str,
#     minio_secret_key: str,
#     dvc_file_dir: str,
#     dvc_file_name: str,
#     model_name: str,
#     kserve_namespae: str,
#     model_lr: float,
#     model_epochs: int,
#     model_print_frequency_per_n_epochs: int,
#     bucket_name: str,
#     minio_model_object_name: str,
#     kserve_svc_acc: str,
#     trigger_type: str,
#     performance_factor: float,
#     last_accuracy_object_name: str,
#     tmp_dir: str,
#     tmp_file_last_acc: str
# ):
#     data_ingestion_task = data_ingestion(
#         github_repo_url=github_repo_url,
#         github_cloned_dir=github_cloned_dir,
#         github_dvc_branch=github_dvc_branch,
#         github_username=github_username,
#         github_token=github_token,
#         dvc_remote_name=dvc_remote_name,
#         dvc_remote_db_url=dvc_remote_db_url,
#         minio_url=minio_url,
#         minio_access_key=minio_access_key,
#         minio_secret_key=minio_secret_key,
#         dvc_file_dir=dvc_file_dir,
#         dvc_file_name=dvc_file_name)
#     dataset_artifact = data_ingestion_task.outputs["dataset_artifact"]
#     data_preparation_task = data_preparation(dataset_artifact=dataset_artifact)
#     X_train_artifact = data_preparation_task.outputs["X_train_artifact"]
#     X_test_artifact = data_preparation_task.outputs["X_test_artifact"]
#     y_train_artifact = data_preparation_task.outputs["y_train_artifact"]
#     y_test_artifact = data_preparation_task.outputs["y_test_artifact"]
#     model_training_task = model_training(X_train_artifact=X_train_artifact, 
#                                          X_test_artifact=X_test_artifact, 
#                                          y_train_artifact=y_train_artifact, 
#                                          y_test_artifact=y_test_artifact,
#                                          model_lr=model_lr,
#                                          model_epochs=model_epochs,
#                                          model_print_frequency_per_n_epochs=model_print_frequency_per_n_epochs,
#                                          minio_url=minio_url,
#                                          minio_access_key=minio_access_key,
#                                          minio_secret_key=minio_secret_key,
#                                          bucket_name=bucket_name,
#                                          minio_model_object_name=minio_model_object_name,
#                                          trigger_type=trigger_type,
#                                          performance_factor=performance_factor,
#                                          last_accuracy_object_name=last_accuracy_object_name,
#                                          tmp_dir=tmp_dir,
#                                          tmp_file_last_acc=tmp_file_last_acc)
#     up_model_cond = model_training_task.outputs["up_model_cond"]
#     model_serving_task = model_serving(up_model_cond=up_model_cond,
#                                        bucket_name=bucket_name,
#                                        model_name=model_name, 
#                                        kserve_namespae=kserve_namespae,
#                                        kserve_svc_acc=kserve_svc_acc)

# # Compile the pipeline
# pipeline_filename = f"{KUBEFLOW_PIPELINE_NAME}.yaml"
# kfp.compiler.Compiler().compile(
#     pipeline_func=my_pipeline,
#     package_path=pipeline_filename)

# # Submit the pipeline to the KFP cluster
# client = kfp.Client(
#     host=KUBEFLOW_HOST_URL,
#     existing_token=KUBEFLOW_TOKEN)  

# client.create_run_from_pipeline_func(
#     my_pipeline,
#     enable_caching=False,
#     arguments={
#         'github_repo_url': GITHUB_REPO_URL,
#         'github_cloned_dir': GITHUB_CLONED_DIR,
#         'github_dvc_branch': GITHUB_DVC_BRANCH,
#         'github_username': GITHUB_USERNAME,
#         'github_token': GITHUB_TOKEN,
#         'dvc_remote_name': DVC_REMOTE_DB,
#         'dvc_remote_db_url': DVC_REMOTE_DB_URL,
#         'minio_url': MINIO_URL,
#         'minio_access_key': MINIO_ACCESS_KEY,
#         'minio_secret_key': MINIO_SECRET_KEY,
#         'dvc_file_dir': DVC_FILE_DIR,
#         'dvc_file_name': DVC_FILE_NAME,
#         'model_name': MODEL_NAME,
#         'kserve_namespae': KSERVE_NAMESPACE,
#         'model_lr': MODEL_LR,
#         'model_epochs': MODEL_EPOCHS,
#         'model_print_frequency_per_n_epochs': MODEL_PRINT_FREQUENCY_PER_N_EPOCHS,
#         'bucket_name': MINIO_MODEL_BUCKET_NAME,
#         'minio_model_object_name': MINIO_MODEL_OBJECT_NAME,
#         'kserve_svc_acc': KSERVE_SVC_ACC,
#         'trigger_type': TRIGGER_TYPE,
#         'performance_factor': PERFORMANCE_FACTOR,
#         'last_accuracy_object_name': LAST_ACC_OBJECT_NAME,
#         'tmp_dir': TEMP_DIR,
#         'tmp_file_last_acc': TEMP_FILE_ACC_IN_LAST_RUN
#     })

# #upload to Kubeflow 
# client.upload_pipeline(pipeline_package_path=pipeline_filename,
#                        pipeline_name=KUBEFLOW_PIPELINE_NAME,
#                        namespace = KSERVE_NAMESPACE)