In [34]:
import kfp
import kfp.dsl as dsl
from kfp.v2.dsl import (
    component,
    Input, Output, 
    Dataset, Model, Metrics, HTML, Markdown,
    InputPath, OutputPath)

from typing import Dict, Optional

In [35]:
@component(
  base_image="capoolebugchat/kws-training:v0.19.0",
)
def train(
    model_save_bucket: str,
    model_save_path: str,
    config: Dict,
    dataset: Input[Dataset],
    model: Output[Model],
    # results_metrics: Output[Metrics],
    # results_html: Output[HTML],
    model_summary: Output[Markdown]
):
  """Training component for the KWS project.

  Runs in the KWS training docker image (``base_image``). It:
  1. merges ``config`` over the image's default hyperparameters and writes the
     result to ``/workspace/hparams.env``,
  2. runs the ``kws_streaming`` training module as a child process,
  3. uploads the trained model directory to Minio,
  4. copies ``<train_res>/model_summary.txt`` into the ``model_summary`` artifact.

  - Inputs:
    + model_save_bucket: Minio bucket the trained model is uploaded to
    + model_save_path: object prefix inside ``model_save_bucket``
    + config: dictionary of hyperparameter overrides. Keys absent from the
      image's ``/workspace/h_param.yaml`` are logged and ignored; ``None`` is
      treated as "no overrides".
    + dataset: dataset Artifact whose ``metadata['PVC']`` names the datashim
      created PVC that is mounted into this container
  - Outputs:
    + model: model Artifact; its metadata records the S3/Minio location of the model
    + model_summary: Markdown artifact with the training run's model summary
  - Raises:
    + subprocess.CalledProcessError: if the training process exits non-zero
    + FileNotFoundError: if the model output directory to upload is missing
  """
  # KFP lightweight components ship only this function's source, so every
  # import and helper must live inside the function body.
  import glob
  import logging
  import os
  import subprocess

  import yaml
  from minio import Minio

  # The root logger defaults to WARNING, which would hide every logging.info below.
  logging.basicConfig(level=logging.INFO)

  HPARAM_DEFAULTS_FILE = "/workspace/h_param.yaml"
  HPARAM_ENV_FILE = "/workspace/hparams.env"
  DATASET_MOUNT_DIR = "/workspace/dataset"
  MODEL_OUTPUT_DIR = "./train_res/ds_tc_resnet/non_stream"
  TRAIN_COMMAND = [
    "python3", "-m", "kws_streaming.train.model_train_eval",
    "ds_tc_resnet", "--alsologtostderr",
  ]

  # create empty dataset dir for PVC mounting (no error if it already exists)
  os.makedirs(DATASET_MOUNT_DIR, exist_ok=True)

  ### Connect this process to Minio for later writing model into Minio
  MINIO_SERVICE_HOST = "minio-service.kubeflow.svc.cluster.local"
  MINIO_SERVICE_PORT = "9000"
  #TODO: change these to using Kubeflow's Minio Secrets
  # Credentials may be injected through the environment; the previous
  # hard-coded values stay as fallbacks so existing deployments keep working.
  MINIO_SERVICE_ACCESS_KEY = os.environ.get("MINIO_SERVICE_ACCESS_KEY", "minio")
  MINIO_SERVICE_SECRET_KEY = os.environ.get("MINIO_SERVICE_SECRET_KEY", "minio123")
  MINIO_SERVICE_SECURITY_OPTION = False
  minio_client = Minio(
    f"{MINIO_SERVICE_HOST}:{MINIO_SERVICE_PORT}",
    access_key=MINIO_SERVICE_ACCESS_KEY,
    secret_key=MINIO_SERVICE_SECRET_KEY,
    secure=MINIO_SERVICE_SECURITY_OPTION,
  )
  logging.info(f"Connected to Minio Server at {MINIO_SERVICE_HOST}:{MINIO_SERVICE_PORT}")

  def _write_param_to_env(param, value, env_f):
    """Write one ``param = value`` line to ``env_f``; string values are single-quoted."""
    logging.info(f"{param} = {value}")
    if isinstance(value, str):
      env_f.write(f"{param} = '{value}'\n")
    else:
      env_f.write(f"{param} = {value}\n")

  def _dict_to_env(config_dict):
    """Write hyperparams into the training environment file.

    - Inputs: config_dict: Python dictionary of overrides (``None`` means none)
    - Returns: the default hyperparameters with the overrides applied
    """
    config_dict = config_dict or {}

    with open(HPARAM_DEFAULTS_FILE, 'r') as yaml_f:
      default_hparams = yaml.safe_load(yaml_f)

    print("Loading hyperparams:")

    # `with` guarantees the env file is flushed and closed before training reads it.
    with open(HPARAM_ENV_FILE, 'w') as env_f:
      # loading known configurations
      for key in default_hparams:
        hyperparam = default_hparams[key]
        if key in config_dict:
          logging.info(f"Overiding default run parameter: {key}")
          hyperparam = config_dict[key]
        _write_param_to_env(key, hyperparam, env_f)
        default_hparams[key] = hyperparam

    # doesn't accept unknown configurations
    for key in config_dict:
      if key not in default_hparams:
        logging.warning(f"Unknown configuration: {key}, unaccepted to training env")

    return default_hparams

  def _train():
    """Run the KWS training module as a child process; raise if it exits non-zero."""
    subprocess.run(TRAIN_COMMAND, check=True)

  def _upload_local_directory_to_minio(local_path, bucket_name, minio_path):
    """Recursively upload every file under ``local_path`` to ``bucket_name``/``minio_path``."""
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not os.path.isdir(local_path):
      raise FileNotFoundError(f"Model directory not found: {local_path}")
    for local_file in glob.glob(local_path + '/**'):
      local_file = local_file.replace(os.sep, "/")  # Replace \ with / on Windows
      if not os.path.isfile(local_file):
        _upload_local_directory_to_minio(
          local_file, bucket_name, minio_path + "/" + os.path.basename(local_file))
      else:
        remote_path = os.path.join(minio_path, local_file[1 + len(local_path):])
        remote_path = remote_path.replace(os.sep, "/")  # Replace \ with / on Windows
        minio_client.fput_object(bucket_name, remote_path, local_file)

  # Initialize artifacts, save basic infos
  with open(model.path, 'w') as model_f:
    model_f.write("Placeholder")
  model.framework = "tensorflow"
  model.metadata = {
    "version": "Undesigned, Unimplemented",
    "trained_dataset": dataset.metadata['PVC'],
    "S3_BUCKET": model_save_bucket,
    "S3_path": f"minio://{model_save_bucket}/{model_save_path}"
  }

  # Training sequence
  logging.info("Writing parameters into environment")
  # config["data_path"] = "/workspace/dataset"
  train_config = _dict_to_env(config)
  logging.info("Hyperparameters written")

  logging.info("Training commenced. Read logs.")
  _train()
  logging.info("Training completed.")

  logging.info("Uploading model")
  _upload_local_directory_to_minio(
    local_path=MODEL_OUTPUT_DIR,
    bucket_name=model_save_bucket,
    minio_path=model_save_path)
  logging.info("Model uploaded to minio bucket.")

  # Copy the whole summary in one pass; opening the output once also creates
  # the artifact file even when the summary is empty.
  summary_src = os.path.join(train_config["train_res"], "model_summary.txt")
  with open(summary_src, 'r') as local_summary_f, open(model_summary.path, 'a') as summary_f:
    summary_f.write(local_summary_f.read())

  logging.info(f"Training finished, check storage at minio://{model_save_bucket}/{model_save_path}")

In [36]:
@component
def ingest_data(
    dataset_pvc: str,
    input_path: OutputPath(""),
    # dataset_uri: str,
    dataset: Output[Dataset]
):
    """Publish a placeholder Dataset artifact that records which PVC holds the data.

    No data is copied here. The artifact file only contains a placeholder
    string, and the PVC name is stored in ``dataset.metadata["PVC"]``.

    Args:
        dataset_pvc: name of the PersistentVolumeClaim holding the KWS dataset.
        input_path: declared output path; this component does not write it
            (kept so the component interface is unchanged).
        dataset: output Dataset artifact.
    """
    import os

    # Creates "<dataset.path minus its last 4 chars>model", as before.
    # NOTE(review): purpose not evident from this file; TODO confirm it is still needed.
    os.makedirs(f"{dataset.path[:-4]}model", exist_ok=True)

    # Deliberately no environment dump / `ls` debugging here: printing the
    # environment would leak any injected credentials into the pod logs.
    with open(dataset.path, 'w') as dataset_f:
        dataset_f.write("Placeholder")
    dataset.name = "KWSDataset"
    dataset.metadata["PVC"] = dataset_pvc
    # dataset.metadata["URI"] = dataset_uri

In [37]:
from kfp.onprem import mount_pvc

@dsl.pipeline(
    name="KWS Auto Train Pipeline",
    description="ultilized for KFP running, in development"
)
def pipeline(
    train_config: Optional[Dict],
    model_bucket: str,
    model_path: str
):
    """Two-step KWS pipeline: register the dataset PVC, then train on it.

    Args:
        train_config: hyperparameter overrides forwarded to ``train`` as ``config``.
        model_bucket: Minio bucket the trained model is uploaded to.
        model_path: object prefix for the model inside ``model_bucket``.
    """
    # Single source of truth for the dataset PVC: the name recorded in the
    # dataset artifact and the volume mounted into the training pod must match.
    dataset_pvc_name = "kws-dataset"
    # Must match the dataset directory hard-coded in the train component.
    dataset_mount_path = "/workspace/dataset"

    #TODO: add validation of data dir
    data_ingest_task = ingest_data(dataset_pvc=dataset_pvc_name)
    #TODO: do smt with dataset path, since datashim is only able to mount buckets, not path
    #TODO: add completed run metrics visualization
    train_task = train(
        model_save_bucket=model_bucket,
        model_save_path=model_path,
        config=train_config,
        dataset=data_ingest_task.outputs["dataset"],
    )

    train_task.apply(mount_pvc(
        pvc_name=dataset_pvc_name,
        volume_name="dataset",
        volume_mount_path=dataset_mount_path,
    ))

from kfp.compiler import Compiler

# Compile the pipeline in v2-compatible mode to a YAML package for upload.
pipeline_compiler = Compiler(mode=dsl.PipelineExecutionMode.V2_COMPATIBLE)
pipeline_compiler.compile(pipeline_func=pipeline, package_path='pipeline.yaml')

In [38]:

# import logging
# ### Connect this process to Minio for later writing model into Minio
# MINIO_SERVICE_HOST="192.168.1.22"
# MINIO_SERVICE_PORT="9000"
# #TODO: change these to using Kubeflow's Minio Secrets
# MINIO_SERVICE_ACCESS_KEY="minio"
# MINIO_SERVICE_SECRET_KEY="minio123"
# MINIO_SERVICE_SECURITY_OPTION=False
# from minio import Minio
# minio_client = Minio(
# f"{MINIO_SERVICE_HOST+':'+MINIO_SERVICE_PORT}",
# access_key = MINIO_SERVICE_ACCESS_KEY,
# secret_key = MINIO_SERVICE_SECRET_KEY,
# secure     = MINIO_SERVICE_SECURITY_OPTION
# )
# logging.info(f"Connected to Minio Server at {MINIO_SERVICE_HOST}:{MINIO_SERVICE_PORT}")

# import os
# import glob
# def _upload_local_directory_to_minio(local_path, bucket_name, minio_path):

#     assert os.path.isdir(local_path)
#     for local_file in glob.glob(local_path + '/**'):
#         local_file = local_file.replace(os.sep, "/") # Replace \ with / on Windows
#         if not os.path.isfile(local_file):
#             _upload_local_directory_to_minio(
#                 local_file, bucket_name, minio_path + "/" + os.path.basename(local_file))
#         else:
#             remote_path = os.path.join(
#             minio_path, local_file[1 + len(local_path):])
#             remote_path = remote_path.replace(os.sep, "/")  # Replace \ with / on Windows
#             minio_client.fput_object(bucket_name, remote_path, local_file)

# _upload_local_directory_to_minio(
#     "components/2_train/local_dataset",
#     "ftech-ai",
#     "KWSdataset"
# )