# In [1]:
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
from kfp.components import load_component_from_file
#from utilities import get_minio_credentials



# In [None]:

# @func_to_container_op
# def show_results(scores_test : float, scores_train: float,results_train:float, results_test:float) -> None:
#     # Given the outputs from decision_tree and logistic regression components
#     # the results are shown.
#     print(f"Test set: {scores_test ,results_test }")
#     print(f"Train set: {scores_train, results_train}")


@dsl.pipeline(
    name='Btap Pipeline',
    description='MLP employed for Total energy consumed regression problem and minio for storage of data.',
)
def btap_pipeline(input_location: str, output_location: str, minio_tenant: str, bucket: str, minio_url,
                  minio_access_key: str, minio_secret_key: str):
    """Define the BTAP pipeline: preprocess, select features, predict per model, collect results.

    The pipeline preprocesses the input object, runs feature selection on the
    preprocessed data, then fans out one ``predict`` task per model name.
    Each prediction is copied to minio under a per-run, per-model path.  Once
    every copy has finished, the stored result files are located, concatenated
    into a JSON list and written back to minio as a single ``result.out``.

    Args:
        input_location: Passed to the preprocessing component as ``object_name``.
        output_location: Base minio location under which this run's results
            are written (the KFP run id is appended to it).
        minio_tenant: Passed to the preprocessing component as ``tenant``.
        bucket: Passed to the preprocessing component as ``bucket``.
        minio_url: Forwarded to the minio copy/find/cat components.  Left
            unannotated exactly as in the original signature.
        minio_access_key: Forwarded to the minio components.
        minio_secret_key: Forwarded to the minio components.
    """
    # Load the yaml manifest for each component.
    preprocess = load_component_from_file('preprocessing/preprocessing.yaml')
    feature_selection = load_component_from_file('feature_selection/feature_selection.yaml')
    predict = load_component_from_file('predict/predict.yaml')
    # Component that takes a file and puts it into minio
    copy_to_minio_op = load_component_from_file('components/copy_to_minio.yaml')
    # Component that does an "mc find" operation, finding files in minio that match a pattern
    mc_find_op = load_component_from_file('components/minio_find.yaml')
    # Component that takes a list of files and concatenates their contents to a JSON list
    mc_cat_files_to_json_op = load_component_from_file('components/minio_cat_files_to_json.yaml')

    preprocess_ = preprocess(
        tenant=minio_tenant,
        bucket=bucket,
        object_name=input_location,
    )

    # Outputs live on the task instance (preprocess_), not on the component
    # factory (preprocess) that created it.
    feature_selection_ = feature_selection(preprocess_.output)

    # We add the KFP RUN_ID here in the output location so that we don't
    # accidentally overwrite another run.  There's lots of ways to manage
    # data, this is just one possibility.
    this_run_output_location = f"{str(output_location).rstrip('/')}" \
                               f"/{kfp.dsl.RUN_ID_PLACEHOLDER}"

    # Every per-model result is stored below this location so that the
    # collection step further down can find all of them in one place.
    sample_output_location = f"{this_run_output_location}/models"

    models = ["large", "wide", "deep"]

    with kfp.dsl.ParallelFor(models) as model:
        # NOTE: A current limitation of the ParallelFor loop in KFP is that it
        # does not give us an easy way to collect the results afterwards.  To
        # get around this problem, we store results in a known place in minio
        # and later glob the result files back out.
        #
        # Each result is saved as {sample_output_location}/{model}/result.out.
        # Having {model} in the path prevents the parallel branches from
        # overwriting each other; this relies on the model names being unique.
        #
        # TODO: Could we do an append-to-file-in-minio and concatenate them
        # on the fly? Would minio have issues with simultaneous writes?
        predict_ = predict(
            input_data=preprocess_.output,
            features=feature_selection_.output,
            input_model=model,
        )

        copy_sample = copy_to_minio_op(
            minio_url,
            minio_access_key,
            minio_secret_key,
            predict_.output,
            f"{sample_output_location}/{model}/result.out",
        )

    # Everything below runs once, outside the loop, after the fan-out.

    # Collect all result.out files in the sample_output_location and concatenate
    # their contents as a json list
    search_pattern = r'/result.out'
    files_to_cat = mc_find_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        sample_output_location,
        search_pattern,
    )

    # The find step has no data dependency on the copy tasks, so without an
    # explicit ordering it could run before the results exist.
    files_to_cat.after(copy_sample)

    all_samples = mc_cat_files_to_json_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        files_to_cat.output,
    )

    # Store the combined results next to the per-model results of this run.
    copy_to_minio_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        all_samples.output,
        f"{this_run_output_location}/result.out",
    )
    
if __name__ == '__main__':
    # Compile the pipeline function into a pipeline definition file when this
    # module is executed as a script.
    compiled_pipeline_path = 'pipeline.yaml'
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(btap_pipeline, compiled_pipeline_path)
    print(f"Exported pipeline definition to {compiled_pipeline_path}")