<a href="https://colab.research.google.com/github/ShaswataJash/kfpcomponent/blob/main/TabularDataPreparationUsingPycaret.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook is the development workflow for the Kubeflow Pipelines component that has the same name as this notebook. Refer to https://github.com/ShaswataJash/kfpcomponent

# Install required software

In [None]:
#print all system information reported by uname for the notebook runtime
!uname -a

In [None]:
#print the Linux distribution details reported by lsb_release
!lsb_release -a

In [None]:
#print the version of the python executable found on PATH
!python --version

In [None]:
!pip install pycaret==2.3.10

In [None]:
#Ref: https://github.com/pycaret/pycaret/issues/2490
!pip install Jinja2==3.1.2

In [None]:
!apt-get install ca-certificates fuse tzdata curl unzip && \
  echo "user_allow_other" >> /etc/fuse.conf

In [None]:
#download the installer script published on rclone.org and run it with bash
!curl https://rclone.org/install.sh | bash

In [None]:
#confirm that rclone is callable by printing its version
!rclone --version

# Develop source code files

In [None]:
%%writefile data_preparation.py
#!/usr/bin/env python3
import os
import sys
def redact_cli_args(argv, secret_options=('--rclone-environment-var',)):
    """Return a copy of ``argv`` in which the values of secret-bearing options are masked.

    The json passed through --rclone-environment-var becomes rclone configuration
    environment variables and may therefore contain remote credentials; echoing it
    verbatim would copy those into the container log.

    Both spellings are handled: ``--option value`` and ``--option=value``. Unambiguous
    abbreviations of the option name are masked as well.

    :param argv: sequence of command line tokens (typically ``sys.argv``)
    :param secret_options: full names of the options whose value must not be printed
    :return: new list of tokens, safe to print
    """
    def _is_secret(option_name):
        #len > 2 so that a bare '-' or '--' is never treated as an abbreviation
        return len(option_name) > 2 and any(opt.startswith(option_name) for opt in secret_options)

    redacted = []
    mask_next = False
    for token in argv:
        if mask_next:
            redacted.append('<redacted>')
            mask_next = False
            continue
        option_name, separator, _ = token.partition('=')
        if _is_secret(option_name):
            if separator:
                redacted.append(option_name + '=<redacted>')
            else:
                redacted.append(token)
                mask_next = True
            continue
        redacted.append(token)
    return redacted

#echo the received command line (one token per line) to ease debugging of the pipeline step
for arg in redact_cli_args(sys.argv):
    print(arg)
sys.stdout.flush()

import argparse
import logging

def _str_to_bool(value):
    """Convert a command line token such as 'True', 'false', '1' or 'no' into a bool.

    Used for the *-directory-mountable options so that they accept an explicit value
    (the component yaml files pass the Boolean input right after the flag) in addition
    to the bare-flag form.

    :raises argparse.ArgumentTypeError: when the token is not a recognisable boolean
    """
    normalized = str(value).strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' % (value,))

#command line contract of the component; every json-valued option defaults to an empty json object
parser = argparse.ArgumentParser(description='kubeflow pipeline component to read csv file and prepare the data')
parser.add_argument('--log-level', default='INFO', choices=['ERROR', 'INFO', 'DEBUG'])
parser.add_argument('--bypass-rclone-for-input-data', default=False, action="store_true", help='whether input csv file should be read like local file - rclone is completely bypassed')
parser.add_argument('--bypass-rclone-for-output-data', default=False, action="store_true", help='whether output csv file should be written like local file - rclone is completely bypassed')
parser.add_argument('--rclone-environment-var', type=str, default= '{}', help='json formatted key-value pairs of strings which will be set as environment variables before executing rclone commands')
#nargs='?' + const=True: a bare flag still means True, while "--flag True" / "--flag False" is accepted too
parser.add_argument('--input-datasource-directory-mountable', nargs='?', const=True, default=False, type=_str_to_bool, help='whether input csv file is present in mountable remote location when rclone is used')
parser.add_argument('--input-datasource-file-name', type=str, default='', help='name of the csv file including file extension and the directory/bucket path holding the specific file(if any) when rclone is used')
parser.add_argument('--additional-options-csv-parsing', type=str, default= '{}', help='json formatted key-value pairs of strings which will be passed to pandas.read_csv()')
#required: without a task type no pycaret module can be selected, so fail early with a clear usage error
parser.add_argument('--type-of-data-analysis-task', required=True, choices=['classification', 'regression', 'clustering', 'anomaly_detection'])
parser.add_argument('--target-variable-name', type=str, help='for classification and regression, specify the column name holding target variable')
parser.add_argument('--target-emptyindicator', type=str, default='', help='if target variable column holds null or na, those rows will be dropped. Sometime empty can be indicated by other representative string like - or *** etc')
parser.add_argument('--data-preparations-options', type=str, default= '{}', help='json formatted key-value pairs of strings which will be passed to pycaret setup() function')
parser.add_argument('--additional-options-csv-writing', type=str, default= '{}', help='json formatted key-value pairs of strings which will be passed to pandas.to_csv()')
#same optional-value handling as for the input side
parser.add_argument('--output-datasource-directory-mountable', nargs='?', const=True, default=False, type=_str_to_bool, help='whether output csv file will be written in mountable remote location when rclone is used')
parser.add_argument('--output-datasource-file-name', type=str, default='', help='filename of the prepared data including the directory/bucket path holding the specific file(if any) when rclone is used')
parser.add_argument('--input-datasource-local-file-path-when-rclone-bypassed', type=str, default='', help='absolute local path of the input csv file when rclone is NOT used i.e. when bypass-rclone-for-input-data is enabled')
parser.add_argument('--output-datasource-local-file-path-when-rclone-bypassed', type=str, default= '', help='absolute local path of the output csv file when rclone is NOT used i.e. when bypass-rclone-for-output-data is enabled')
args = parser.parse_args()

#log lines use the same layout as pycaret's own logger so that both outputs look alike
#(refer: https://github.com/pycaret/pycaret/blob/master/pycaret/internal/logging.py)
_LOG_FORMAT = '%(asctime)s:%(levelname)s:%(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=args.log_level)
#the chosen level is also exported through the environment - presumably read by pycaret's logger (confirm)
os.environ["PYCARET_CUSTOM_LOGGING_LEVEL"] = args.log_level

#sanity check of arguments
#for each direction only one family of options is meaningful: the rclone related ones,
#or the local file path used when rclone is bypassed. The irrelevant family is neutralised here.
rclone_fully_bypassed = True
for direction in ('input', 'output'):
    if getattr(args, 'bypass_rclone_for_%s_data' % direction):
        setattr(args, '%s_datasource_directory_mountable' % direction, False)
        setattr(args, '%s_datasource_file_name' % direction, None)
    else:
        rclone_fully_bypassed = False
        setattr(args, '%s_datasource_local_file_path_when_rclone_bypassed' % direction, None)

#no rclone command will be executed at all, hence its environment configuration is ignored
if rclone_fully_bypassed:
    args.rclone_environment_var = '{}'

#setting rclone related env
#every key of the json object given in --rclone-environment-var is exported as an environment variable
import json
try:
    rclone_config = json.loads(args.rclone_environment_var)
    if not isinstance(rclone_config, dict):
        raise ValueError("rclone-environment-var must be a json object holding key-value pairs")
    #only the keys are logged: the values may hold credentials of the remote storage
    logging.info("rclone_config: type=%s keys=%s", type(rclone_config), sorted(rclone_config))
    for env_name, env_value in rclone_config.items():
        if isinstance(env_value, (dict, list)):
            #nested json: serialise it back to real json (str() would give python repr, e.g. True/None and single quotes)
            os.environ[env_name] = json.dumps(env_value)
        else:
            #scalars keep the historical conversion: quote is replaced with double quote so that a value
            #written as single-quoted pseudo json becomes json compatible (no effect on strings without ')
            #NOTE(review): a plain string containing an apostrophe is altered by this - confirm that is acceptable
            os.environ[env_name] = str(env_value).replace('\'', '"')
        logging.debug('%s => <value hidden>', env_name)
except Exception as err:
    #Exception (not BaseException) so that KeyboardInterrupt/SystemExit are not masked
    logging.error("rclone configuration loading related error", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while loading rclone_config")

#temporary directory creation
#one scratch directory per direction, created only when rclone handles that direction;
#rclone later copies into / mounts onto these directories
import tempfile
try:
    if not args.bypass_rclone_for_input_data:
        local_datastore_read_dir = tempfile.mkdtemp(prefix="my_local_read-")
        logging.debug('local_datastore_read_dir:%s', local_datastore_read_dir)

    if not args.bypass_rclone_for_output_data:
        local_datastore_write_dir = tempfile.mkdtemp(prefix="my_local_write-")
        logging.debug('local_datastore_write_dir:%s', local_datastore_write_dir)
except BaseException:
    #log the traceback, flush what was printed so far and stop the component with a non-zero exit
    logging.error("temporary directory creation related error", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while creating temporary directories")

#input file handling
#when rclone is used for input, the csv is made available under local_datastore_read_dir either by
#mounting the remote directory holding it (mount mode) or by copying the file (copy mode)
import subprocess
import ntpath
#nothing to fetch when rclone is bypassed: the file name is None and no read directory exists in that case
if not args.bypass_rclone_for_input_data:
    #the command is built as an argument list (not split from a string) so that paths containing whitespace stay intact
    if args.input_datasource_directory_mountable:
        input_data_read_cmd = ["rclone", "-v", "mount", "remoteread:" + ntpath.dirname(args.input_datasource_file_name), local_datastore_read_dir, "--daemon"]
    else:
        input_data_read_cmd = ["rclone", "-v", "copy", "remoteread:" + args.input_datasource_file_name, local_datastore_read_dir]
    logging.info(' '.join(input_data_read_cmd))
    #stderr is merged into stdout so that the complete rclone output ends up in the log
    input_data_read_call = subprocess.run(input_data_read_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logging.info(input_data_read_call.stdout)
    if input_data_read_call.returncode != 0:
        logging.error("Error in rclone, errorcode= %s", input_data_read_call.returncode)
        sys.stdout.flush()
        sys.exit("Forceful exit as rclone returned error in context of reading")

#output file handling
#in mount mode the remote output directory is mounted before the data is produced, so that the
#csv written later into local_datastore_write_dir goes to the remote location
if args.output_datasource_directory_mountable:
    #argument list instead of a split string: remote paths containing whitespace stay a single argument
    output_data_write_cmd = ["rclone", "-v", "mount", "remotewrite:" + ntpath.dirname(args.output_datasource_file_name), local_datastore_write_dir, "--daemon"]
    logging.info(' '.join(output_data_write_cmd))
    output_data_write_call = subprocess.run(output_data_write_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logging.info(output_data_write_call.stdout)
    if output_data_write_call.returncode != 0:
        logging.error("Error in rclone, errorcode=%s", output_data_write_call.returncode)
        sys.stdout.flush()
        sys.exit("Forceful exit as rclone returned error in context of mounted writing")

#handling input csv file reading
#user supplied read_csv() options are taken from the command line; the file location is always
#decided here (local path when rclone is bypassed, otherwise the file inside the read directory)
import pandas
try:
    parse_config = json.loads(args.additional_options_csv_parsing)
    if args.bypass_rclone_for_input_data:
        input_csv_location = args.input_datasource_local_file_path_when_rclone_bypassed
    else:
        input_csv_location = os.path.join(local_datastore_read_dir, ntpath.basename(args.input_datasource_file_name))
    parse_config['filepath_or_buffer'] = input_csv_location
    logging.info("parse_config: type=%s content=%s", type(parse_config), parse_config)
    my_data = pandas.read_csv(**parse_config)
    logging.debug('%s', my_data)

except BaseException:
    logging.error("csv file reading related error", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while parsing input csv file")

#handling data preprocessing
#the pycaret module matching the requested task is selected, its setup() is run on the dataframe and
#the transformed data is read back through get_config()
import pycaret
try:
    if os.path.exists("logs.log"):
        os.remove("logs.log") #removing any content from log which pycaret will internally use for its own logging
    logging.info('pycaret version = %s ', pycaret.utils.version())
    setup_config = json.loads(args.data_preparations_options)

    analysis_task = args.type_of_data_analysis_task
    task_needs_target = analysis_task in ('classification', 'regression')
    if task_needs_target and not args.target_variable_name:
        raise ValueError("target-variable-name is mandatory for " + analysis_task)

    if analysis_task == 'classification':
        import pycaret.classification
        setup_fn = pycaret.classification.setup
        get_config_fn = pycaret.classification.get_config
        setup_config['target'] = args.target_variable_name

    elif analysis_task == 'regression':
        import pycaret.regression
        setup_fn = pycaret.regression.setup
        get_config_fn = pycaret.regression.get_config
        setup_config['target'] = args.target_variable_name

    elif analysis_task == 'clustering':
        import pycaret.clustering
        setup_fn = pycaret.clustering.setup
        get_config_fn = pycaret.clustering.get_config

    elif analysis_task == 'anomaly_detection':
        #the command line choice is 'anomaly_detection' (the pycaret module itself is named pycaret.anomaly)
        import pycaret.anomaly
        setup_fn = pycaret.anomaly.setup
        get_config_fn = pycaret.anomaly.get_config

    else:
        raise ValueError("unsupported type-of-data-analysis-task: %s" % analysis_task)

    #as part of pycaret's data cleaning the rows with target column = nan are not being cleaned up. Thus, cleaning those rows explicitely
    #(skipped when no target column was named, e.g. for clustering / anomaly detection)
    if len(args.target_emptyindicator) > 0 and args.target_variable_name:
        #ref: https://stackoverflow.com/questions/49291740/delete-rows-if-there-are-null-values-in-a-specific-column-in-pandas-dataframe
        import numpy as np
        my_data[args.target_variable_name] = my_data[args.target_variable_name].replace(args.target_emptyindicator, np.nan)
        my_data = my_data.dropna(axis=0, subset=[args.target_variable_name])

    #options forced by this component irrespective of what the user passed in data-preparations-options
    setup_config['log_experiment'] = False
    setup_config['data_split_shuffle'] = False
    setup_config['html'] = False
    setup_config['silent'] = True
    logging.info("setup_config: type=%s content=%s", type(setup_config), setup_config)
    setup_config['data'] = my_data #adding dataframe after logging, or else a big dataframe print happens as part of logging
    setup_fn(**setup_config)
    #ref: https://www.kdnuggets.com/2020/11/5-things-doing-wrong-pycaret.html
    X_transformed = get_config_fn('X')
    my_transformed_data = X_transformed
    if task_needs_target:
        #for supervised tasks the target is fetched separately and joined back on the row index
        y_transformed = get_config_fn('y')
        my_transformed_data = X_transformed.merge(y_transformed,left_index=True, right_index=True)

    logging.debug("====== PREPARED DATA ====")
    logging.debug('%s', my_transformed_data)
    logging.debug("=========================")

    #pycaret.utils.get_system_logs() #this will print the pycaret's own log into console

except Exception as err:
    #Exception (not BaseException) so that KeyboardInterrupt/SystemExit are not masked
    #pycaret.utils.get_system_logs()
    logging.error("exception encountered while transforming input dataframe", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while transforming input dataframe")

#handling output csv file writing
#user supplied to_csv() options are taken from the command line; the destination is always decided
#here (local path when rclone is bypassed, otherwise the file inside the write directory)
try:
    to_csv_config = json.loads(args.additional_options_csv_writing)
    if args.bypass_rclone_for_output_data:
        output_csv_location = args.output_datasource_local_file_path_when_rclone_bypassed
    else:
        output_csv_location = os.path.join(local_datastore_write_dir, ntpath.basename(args.output_datasource_file_name))
    to_csv_config['path_or_buf'] = output_csv_location
    logging.info("to_csv_config: type=%s content=%s", type(to_csv_config), to_csv_config)
    my_transformed_data.to_csv(**to_csv_config)
except BaseException:
    logging.error("exception encountered while trying to write prepared data", exc_info=True)
    sys.stdout.flush()
    sys.exit("Forceful exit as exception encountered while trying to write prepared data")

#the csv was written straight to its final local path, nothing has to be uploaded
if args.bypass_rclone_for_output_data:
    sys.stdout.flush()
    sys.exit(0)

#copy mode: upload the csv from the local write directory to the remote directory
#(in mount mode the file was already written into the mounted remote directory)
if not args.output_datasource_directory_mountable:
    #argument list instead of a split string: paths containing whitespace stay a single argument
    output_data_write_cmd = ["rclone", "-v", "copy",
        os.path.join(local_datastore_write_dir,ntpath.basename(args.output_datasource_file_name)),
        "remotewrite:" + ntpath.dirname(args.output_datasource_file_name)]
    logging.info(' '.join(output_data_write_cmd))
    output_data_write_call = subprocess.run(output_data_write_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logging.info(output_data_write_call.stdout)
    if output_data_write_call.returncode != 0:
        logging.error("Error in rclone, errorcode=%s", output_data_write_call.returncode)
        sys.stdout.flush()
        sys.exit("Forceful exit as rclone returned error in context of writing final csv file (copy mode)")

Docker size reduction tips:


*   https://devopscube.com/reduce-docker-image-size/
*   https://www.ecloudcontrol.com/best-practices-to-reduce-docker-images-size/



In [None]:
%%writefile Dockerfile
FROM python:3.7.13-slim

#--no-cache-dir keeps pip's download cache out of the image layers (image size reduction)
RUN python3 -m pip install --no-cache-dir pycaret==2.3.10
#installing jinja2 additionally due to Ref: https://github.com/pycaret/pycaret/issues/2490
RUN python3 -m pip install --no-cache-dir Jinja2==3.1.2

#install fuse as dependency for rclone. Additionally, install curl, unzip for rclone installer to work
#libgomp1 installation for pycaret in python-slim
#curl and unzip are purged again in the same layer once rclone is installed; the final
#"rclone --version" makes the build fail if the installation did not produce a working binary
RUN apt-get update \
    && apt-get install --no-install-recommends -y curl fuse libgomp1 unzip \
    && echo "user_allow_other" >> /etc/fuse.conf \
    && curl https://rclone.org/install.sh | bash \
    && apt-get -y remove --purge curl unzip \
    && apt-get -y autoremove \
    && rm -rf /var/lib/apt/lists/* \
    && rclone --version

#component script, its validation script and the test driver all live under /tmp inside the image
COPY src/data_preparation.py /tmp
COPY tests/test_validation.py /tmp
COPY run_tests.sh /tmp
RUN chmod 544 /tmp/run_tests.sh

In [None]:
%%writefile run_tests.sh
#!/bin/bash
#Runs data_preparation.py end to end against two public datasets and then validates the produced csv files.
#Stop at the first failing command so that the failing scenario is the one reported
set -euo pipefail

#-p: do not fail when the directory is left over from a previous run
mkdir -p /tmp/my_local_dir_for_test

#Test: csv reading source from http, rclone read in copy
python /tmp/data_preparation.py --rclone-environment-var '{"RCLONE_CONFIG_REMOTEREAD_TYPE":"http", "RCLONE_CONFIG_REMOTEREAD_URL":"https://raw.githubusercontent.com/pycaret/datasets/main/data/common/"}' \
    --input-datasource-file-name 'CTG.csv' --additional-options-csv-parsing '{"sep":"," , "header":0}' \
    --type-of-data-analysis-task 'classification' --target-variable-name 'NSP' \
    --data-preparations-options '{"ignore_low_variance":true, "remove_outliers":true, "remove_multicollinearity":true, "multicollinearity_threshold":0.7}' \
    --bypass-rclone-for-output-data --output-datasource-local-file-path-when-rclone-bypassed '/tmp/my_local_dir_for_test/CTG_data-prep.csv' \
    --additional-options-csv-writing '{"index":false}' --log-level 'DEBUG'

#https://registry.opendata.aws/humor-detection/
#Test: csv reading source from s3(AWS provider), rclone read in mount
python /tmp/data_preparation.py --rclone-environment-var '{"RCLONE_CONFIG_REMOTEREAD_TYPE":"s3", "RCLONE_CONFIG_REMOTEREAD_PROVIDER":"AWS", "RCLONE_CONFIG_REMOTEREAD_REGION":"us-west-2"}' \
    --input-datasource-directory-mountable --input-datasource-file-name 'humor-detection-pds/Non-humours-biased.csv' \
    --type-of-data-analysis-task 'classification' --target-variable-name 'label' \
    --data-preparations-options '{"preprocess":false, "ignore_features":["image_url"]}' \
    --bypass-rclone-for-output-data --output-datasource-local-file-path-when-rclone-bypassed '/tmp/my_local_dir_for_test/Non-humours-biased_data-prep.csv' \
    --additional-options-csv-writing '{"index":false}' --log-level 'DEBUG'

#checks the two csv files written above
python /tmp/test_validation.py

In [None]:
%%writefile test_validation.py
#!/usr/bin/env python3
#Validates the csv files produced by the scenarios of run_tests.sh
import pandas

#CTG scenario: original data had 2129 rows, amongst that 3 rows have no target
ctg_prepared = pandas.read_csv('/tmp/my_local_dir_for_test/CTG_data-prep.csv')
assert ctg_prepared.shape[0] == 2126
#pycaret will remove all missing values
assert not ctg_prepared.isnull().values.any()

#humor-detection scenario: original data had 4 columns, amongst that one will be dropped
humor_prepared = pandas.read_csv('/tmp/my_local_dir_for_test/Non-humours-biased_data-prep.csv')
assert humor_prepared.shape[1] == 3
print ('test-validation done successfully')

Refer: https://github.com/RealOrangeOne/docker-rclone-mount/blob/master/docker-compose.yml

The mount feature of rclone needs FUSE support in the underlying Linux kernel, which is why the SYS_ADMIN capability is added below. Note that testing is also possible without the mount feature; in that case, rclone uses only its copy feature.

In [None]:
%%writefile docker-compose.test.yml
# Test setup: builds the image from the Dockerfile in this directory and runs the test script in it.
services:
  sut:
    build: .
    command: /tmp/run_tests.sh
    # The settings below exist for rclone's mount feature, which relies on FUSE;
    # they are not needed when rclone is used in copy mode only.
    cap_add:
      - SYS_ADMIN
    security_opt:
      - apparmor:unconfined
    # expose the host's FUSE device inside the container
    devices:
      - "/dev/fuse:/dev/fuse"


*   https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#designing-a-pipeline-component
*   https://github.com/kubeflow/pipelines/blob/sdk/release-1.8/sdk/python/kfp/dsl/types.py
*   https://kubeflow-pipelines.readthedocs.io/en/stable/_modules/kfp/components/_structures.html



In [None]:
%%writefile component_both_input_output_as_artifact.yaml
# Component variant in which rclone is bypassed in both directions: the input csv is taken from an
# input artifact path and the prepared csv is written to an output artifact path.
name: TabularDataPreparationUsingPycaretWhereBothInputOutputAsArtifact
description: |
    Prepare tabular data (csv file) using pycaret library. (For pycaret's data pre-processing capabilities, refer https://pycaret.gitbook.io/docs/get-started/preprocessing)
    Refer data-preparations-options in command line arguments. 
    pycaret internally uses pandas dataframe to read and write csv file. You can utilize options exposed by panda's read_csv() and to_csv(). 
    Refer additional-options-csv-parsing and additional-options-csv-writing in command line arguments
    Input and output csv files are stored in input and output artifacts. Thus the csv files are read or written like locally mounted POSIX files.
metadata:
  annotations:
    author: Shaswata Jash <29448766+ShaswataJash@users.noreply.github.com>
    canonical_location: https://raw.githubusercontent.com/ShaswataJash/kfpcomponent/main/TabularDataPreparationUsingPycaret/component_both_input_output_as_artifact.yaml
# Input names mirror the command line options of /tmp/data_preparation.py (underscores instead of dashes).
inputs:
- name: log_level
  type: String
  description: 'choice amongst ERROR, INFO, DEBUG'
  optional: true
- name: additional_options_csv_parsing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.read_csv()'
  optional: true
- name: type_of_data_analysis_task
  type: String
  description: 'choice amongst classification, regression, clustering, anomaly_detection'
- name: target_variable_name 
  type: String
  description: 'for classification and regression, specify the column name holding target variable'
  optional: true
- name: target_emptyindicator
  type: String
  description: 'if target variable column holds null or na, those rows will be dropped. Sometime empty can be indicated by other representative string like - or *** etc'
  optional: true
- name: data_preparations_options
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pycaret setup() function'
  optional: true
- name: additional_options_csv_writing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.to_csv()'
  optional: true
# No type is declared for this input; it is consumed below through inputPath (as a file), not as a value.
- name: input_datasource_local_file_path_when_rclone_bypassed
  description: 'absolute local path of the input csv file when rclone is NOT used i.e. when input csv file is stored in input artifact of pipeline engine (e.g. argo)'

outputs:
# No type is declared for this output; it is produced below through outputPath.
- name: output_datasource_local_file_path_when_rclone_bypassed
  description: 'absolute local path of the output csv file when rclone is NOT used i.e. when output csv file is stored in output artifact of pipeline engine (e.g. argo)'

implementation:
  container:
    image: shasjash/kfpcomponents:TabularDataPreparationUsingPycaret_devlatest
    command:
    - python3 
    - /tmp/data_preparation.py
    args:
    # both bypass flags are always passed for this variant
    - --bypass-rclone-for-input-data
    - --bypass-rclone-for-output-data
    # each optional input adds its option to the command line only when a value was supplied
    - if:
        cond: {isPresent: log_level}
        then:
        - --log-level
        - {inputValue: log_level}
    - if:
        cond: {isPresent: additional_options_csv_parsing}
        then:
        - --additional-options-csv-parsing
        - {inputValue: additional_options_csv_parsing}
    - --type-of-data-analysis-task 
    - {inputValue: type_of_data_analysis_task}
    - if:
        cond: {isPresent: target_variable_name}
        then:
        - --target-variable-name
        - {inputValue: target_variable_name}
    - if:
        cond: {isPresent: target_emptyindicator}
        then:
        - --target-emptyindicator
        - {inputValue: target_emptyindicator}
    - if:
        cond: {isPresent: data_preparations_options}
        then:
        - --data-preparations-options
        - {inputValue: data_preparations_options}
    - if:
        cond: {isPresent: additional_options_csv_writing}
        then:
        - --additional-options-csv-writing
        - {inputValue: additional_options_csv_writing}
    # the script receives local file paths for the input and the output artifact
    - --input-datasource-local-file-path-when-rclone-bypassed
    - {inputPath: input_datasource_local_file_path_when_rclone_bypassed}
    - --output-datasource-local-file-path-when-rclone-bypassed
    - {outputPath: output_datasource_local_file_path_when_rclone_bypassed}

In [None]:
%%writefile component_input_using_rclone_output_as_artifact.yaml
# Component variant in which the input csv is fetched through rclone, while the prepared csv is
# written to an output artifact path (rclone is bypassed for the output only).
name: TabularDataPreparationUsingPycaretWhereInputUsingRcloneOutputAsArtifact
description: |
    Prepare tabular data (csv file) using pycaret library. (For pycaret's data pre-processing capabilities, refer https://pycaret.gitbook.io/docs/get-started/preprocessing)
    Refer data-preparations-options in command line arguments. 
    pycaret internally uses pandas dataframe to read and write csv file. You can utilize options exposed by panda's read_csv() and to_csv(). 
    Refer additional-options-csv-parsing and additional-options-csv-writing in command line arguments
    Input csv files can be stored in rclone compatible storage. Both mount and copy mode are supported. (refer: https://rclone.org/)
    rclone configurations have to be shared through environment variables (refer: https://rclone.org/docs/#environment-variables). 
    Create rclone read configuration file name as 'REMOTEREAD' . Because the same is used within code.
    So convention for creating any environment variables related to rclone-read should start with 'RCLONE_CONFIG_REMOTEREAD'.
    Output csv files are stored in output artifacts. Thus the csv files are written like locally mounted POSIX files.
metadata:
  annotations:
    author: Shaswata Jash <29448766+ShaswataJash@users.noreply.github.com>
    canonical_location: https://raw.githubusercontent.com/ShaswataJash/kfpcomponent/main/TabularDataPreparationUsingPycaret/component_input_using_rclone_output_as_artifact.yaml
# Input names mirror the command line options of /tmp/data_preparation.py (underscores instead of dashes).
inputs:
- name: log_level
  type: String
  description: 'choice amongst ERROR, INFO, DEBUG'
  optional: true
- name: rclone_environment_var
  type: String
  description: 'json formatted key-value pairs of strings which will be set as environment variables before executing rclone commands'
- name: input_datasource_directory_mountable
  type: Boolean 
  description: 'whether input csv file is present in mountable remote location  when rclone is used'
  optional: true
- name: input_datasource_file_name
  type: String
  description: 'name of the csv file including file extension and the directory/bucket path holding the specific file(if any)  when rclone is used'
- name: additional_options_csv_parsing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.read_csv()'
  optional: true
- name: type_of_data_analysis_task
  type: String
  description: 'choice amongst classification, regression, clustering, anomaly_detection'
- name: target_variable_name 
  type: String
  description: 'for classification and regression, specify the column name holding target variable'
  optional: true
- name: target_emptyindicator
  type: String
  description: 'if target variable column holds null or na, those rows will be dropped. Sometime empty can be indicated by other representative string like - or *** etc'
  optional: true
- name: data_preparations_options
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pycaret setup() function'
  optional: true
- name: additional_options_csv_writing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.to_csv()'
  optional: true

outputs:
# No type is declared for this output; it is produced below through outputPath.
- name: output_datasource_local_file_path_when_rclone_bypassed
  description: 'absolute local path of the output csv file when rclone is NOT used i.e. when output csv file is stored in output artifact of pipeline engine (e.g. argo)'

implementation:
  container:
    image: shasjash/kfpcomponents:TabularDataPreparationUsingPycaret_devlatest
    command:
    - python3 
    - /tmp/data_preparation.py
    args:
    # only the output side bypasses rclone in this variant
    - --bypass-rclone-for-output-data
    # each optional input adds its option to the command line only when a value was supplied
    - if:
        cond: {isPresent: log_level}
        then:
        - --log-level
        - {inputValue: log_level}
    - --rclone-environment-var
    - {inputValue: rclone_environment_var}
    # the Boolean input is forwarded as an explicit value right after the flag, so the option
    # parser of data_preparation.py has to accept a value for this option
    - if:
        cond: {isPresent: input_datasource_directory_mountable}
        then:
        - --input-datasource-directory-mountable
        - {inputValue: input_datasource_directory_mountable}
    - --input-datasource-file-name
    - {inputValue: input_datasource_file_name}
    - if:
        cond: {isPresent: additional_options_csv_parsing}
        then:
        - --additional-options-csv-parsing
        - {inputValue: additional_options_csv_parsing}
    - --type-of-data-analysis-task 
    - {inputValue: type_of_data_analysis_task}
    - if:
        cond: {isPresent: target_variable_name}
        then:
        - --target-variable-name
        - {inputValue: target_variable_name}
    - if:
        cond: {isPresent: target_emptyindicator}
        then:
        - --target-emptyindicator
        - {inputValue: target_emptyindicator}
    - if:
        cond: {isPresent: data_preparations_options}
        then:
        - --data-preparations-options
        - {inputValue: data_preparations_options}
    - if:
        cond: {isPresent: additional_options_csv_writing}
        then:
        - --additional-options-csv-writing
        - {inputValue: additional_options_csv_writing}
    # the script receives a local file path for the output artifact
    - --output-datasource-local-file-path-when-rclone-bypassed
    - {outputPath: output_datasource_local_file_path_when_rclone_bypassed}

In [None]:
%%writefile component_input_as_artifact_output_using_rclone.yaml
# Component variant in which the input csv is taken from an input artifact path (rclone is bypassed
# for the input only), while the prepared csv is delivered through rclone. No outputs are declared.
name: TabularDataPreparationUsingPycaretWhereInputAsArtifactOutputUsingRclone
description: |
    Prepare tabular data (csv file) using pycaret library. (For pycaret's data pre-processing capabilities, refer https://pycaret.gitbook.io/docs/get-started/preprocessing)
    Refer data-preparations-options in command line arguments. 
    pycaret internally uses pandas dataframe to read and write csv file. You can utilize options exposed by panda's read_csv() and to_csv(). 
    Refer additional-options-csv-parsing and additional-options-csv-writing in command line arguments
    Output csv files can be stored in rclone compatible storage. Both mount and copy mode are supported. (refer: https://rclone.org/)
    rclone configurations have to be shared through environment variables (refer: https://rclone.org/docs/#environment-variables). 
    Create rclone write configuration file name as 'REMOTEWRITE'. Because the same is used within code.
    So convention for creating any environment variables related to rclone should start either with 'RCLONE_CONFIG_REMOTEWRITE'.
    Intput csv files are stored in intput artifacts. Thus the csv files are read like locally mounted POSIX files.
metadata:
  annotations:
    author: Shaswata Jash <29448766+ShaswataJash@users.noreply.github.com>
    canonical_location: https://raw.githubusercontent.com/ShaswataJash/kfpcomponent/main/TabularDataPreparationUsingPycaret/component_input_as_artifact_output_using_rclone.yaml
# Input names mirror the command line options of /tmp/data_preparation.py (underscores instead of dashes).
inputs:
- name: log_level
  type: String
  description: 'choice amongst ERROR, INFO, DEBUG'
  optional: true
- name: rclone_environment_var
  type: String
  description: 'json formatted key-value pairs of strings which will be set as environment variables before executing rclone commands'
- name: additional_options_csv_parsing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.read_csv()'
  optional: true
- name: type_of_data_analysis_task
  type: String
  description: 'choice amongst classification, regression, clustering, anomaly_detection'
- name: target_variable_name 
  type: String
  description: 'for classification and regression, specify the column name holding target variable'
  optional: true
- name: target_emptyindicator
  type: String
  description: 'if target variable column holds null or na, those rows will be dropped. Sometime empty can be indicated by other representative string like - or *** etc'
  optional: true
- name: data_preparations_options
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pycaret setup() function'
  optional: true
- name: additional_options_csv_writing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.to_csv()'
  optional: true
- name: output_datasource_directory_mountable
  type: Boolean
  description: 'whether output csv file will be written in mountable remote location  when rclone is used'
  optional: true
# NOTE(review): not marked optional here, yet wrapped in an isPresent condition in the args below - confirm which is intended
- name: output_datasource_file_name
  type: String 
  description: 'filename of the prepared data including the directory/bucket path holding the specific file(if any) when rclone is used'
# No type is declared for this input; it is consumed below through inputPath (as a file), not as a value.
- name: input_datasource_local_file_path_when_rclone_bypassed
  description: 'absolute local path of the input csv file when rclone is NOT used i.e. when input csv file is stored in input artifact of pipeline engine (e.g. argo)'

implementation:
  container:
    image: shasjash/kfpcomponents:TabularDataPreparationUsingPycaret_devlatest
    command:
    - python3 
    - /tmp/data_preparation.py
    args:
    # only the input side bypasses rclone in this variant
    - --bypass-rclone-for-input-data
    # each optional input adds its option to the command line only when a value was supplied
    - if:
        cond: {isPresent: log_level}
        then:
        - --log-level
        - {inputValue: log_level}
    - --rclone-environment-var
    - {inputValue: rclone_environment_var}
    - if:
        cond: {isPresent: additional_options_csv_parsing}
        then:
        - --additional-options-csv-parsing
        - {inputValue: additional_options_csv_parsing}
    - --type-of-data-analysis-task 
    - {inputValue: type_of_data_analysis_task}
    - if:
        cond: {isPresent: target_variable_name}
        then:
        - --target-variable-name
        - {inputValue: target_variable_name}
    - if:
        cond: {isPresent: target_emptyindicator}
        then:
        - --target-emptyindicator
        - {inputValue: target_emptyindicator}
    - if:
        cond: {isPresent: data_preparations_options}
        then:
        - --data-preparations-options
        - {inputValue: data_preparations_options}
    - if:
        cond: {isPresent: additional_options_csv_writing}
        then:
        - --additional-options-csv-writing
        - {inputValue: additional_options_csv_writing}
    # the Boolean input is forwarded as an explicit value right after the flag, so the option
    # parser of data_preparation.py has to accept a value for this option
    - if:
        cond: {isPresent: output_datasource_directory_mountable}
        then:
        - --output-datasource-directory-mountable
        - {inputValue: output_datasource_directory_mountable}
    - if:
        cond: {isPresent: output_datasource_file_name}
        then:
        - --output-datasource-file-name
        - {inputValue: output_datasource_file_name}
    # the script receives a local file path for the input artifact
    - --input-datasource-local-file-path-when-rclone-bypassed
    - {inputPath: input_datasource_local_file_path_when_rclone_bypassed}

In [None]:
%%writefile component_both_input_output_using_rclone.yaml
# Kubeflow Pipelines component spec for the variant in which BOTH the input csv and the
# prepared output csv are transferred through rclone (no pipeline artifacts involved).
name: TabularDataPreparationUsingPycaretWhereBothInputOutputUsingRclone
description: |
    Prepare tabular data (csv file) using pycaret library. (For pycaret's data pre-processing capabilities, refer https://pycaret.gitbook.io/docs/get-started/preprocessing)
    Refer data-preparations-options in command line arguments. 
    pycaret internally uses pandas dataframe to read and write csv file. You can utilize options exposed by panda's read_csv() and to_csv(). 
    Refer additional-options-csv-parsing and additional-options-csv-writing in command line arguments
    Input and output csv files can be stored in rclone compatible storage. Both mount and copy mode are supported. (refer: https://rclone.org/)
    rclone configurations have to be shared through environment variables (refer: https://rclone.org/docs/#environment-variables). 
    Create rclone read and write configuration file name as 'REMOTEREAD' and 'REMOTEWRITE'. Because the same are used within code.
    So convention for creating any environment variables related to rclone should start either with 'RCLONE_CONFIG_REMOTEREAD' or 'RCLONE_CONFIG_REMOTEWRITE'.
metadata:
  annotations:
    author: Shaswata Jash <29448766+ShaswataJash@users.noreply.github.com>
    canonical_location: https://raw.githubusercontent.com/ShaswataJash/kfpcomponent/main/TabularDataPreparationUsingPycaret/component_both_input_output_using_rclone.yaml
# Inputs marked `optional: true` are forwarded to the script only when supplied (see the
# `if`/`isPresent` entries under args); the remaining inputs are always forwarded.
inputs:
- name: log_level
  type: String
  description: 'choice amongst ERROR, INFO, DEBUG'
  optional: true
- name: rclone_environment_var
  type: String
  description: 'json formatted key-value pairs of strings which will be set as environment variables before executing rclone commands'
# NOTE(review): the matching switch below is emitted whenever this input is supplied,
# whatever its value - so passing False would still add the switch. Leave it unset to disable.
- name: input_datasource_directory_mountable
  type: Boolean 
  description: 'whether input csv file is present in mountable remote location  when rclone is used'
  optional: true
- name: input_datasource_file_name
  type: String
  description: 'name of the csv file including file extension and the directory/bucket path holding the specific file(if any)  when rclone is used'
- name: additional_options_csv_parsing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.read_csv()'
  optional: true
- name: type_of_data_analysis_task
  type: String
  description: 'choice amongst classification, regression, clustering, anomaly_detection'
- name: target_variable_name 
  type: String
  description: 'for classification and regression, specify the column name holding target variable'
  optional: true
- name: target_emptyindicator
  type: String
  description: 'if target variable column holds null or na, those rows will be dropped. Sometime empty can be indicated by other representative string like - or *** etc'
  optional: true
- name: data_preparations_options
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pycaret setup() function'
  optional: true
- name: additional_options_csv_writing
  type: String
  description: 'json formatted key-value pairs of strings which will be passed to pandas.to_csv()'
  optional: true
# NOTE(review): same presence-based behaviour as input_datasource_directory_mountable above.
- name: output_datasource_directory_mountable
  type: Boolean
  description: 'whether output csv file will be written in mountable remote location  when rclone is used'
  optional: true
- name: output_datasource_file_name
  type: String 
  description: 'filename of the prepared data including the directory/bucket path holding the specific file(if any) when rclone is used'

implementation:
  container:
    image: shasjash/kfpcomponents:TabularDataPreparationUsingPycaret_devlatest
    # The script is invoked from /tmp - presumably where the image places it; confirm against the Dockerfile.
    command:
    - python3 
    - /tmp/data_preparation.py
    # Each entry below maps one component input onto the script's command line option of the same name.
    args:
    - if:
        cond: {isPresent: log_level}
        then:
        - --log-level
        - {inputValue: log_level}
    - --rclone-environment-var
    - {inputValue: rclone_environment_var}
    # bare switch: no value follows the option
    - if:
        cond: {isPresent: input_datasource_directory_mountable}
        then:
        - --input-datasource-directory-mountable
    - --input-datasource-file-name
    - {inputValue: input_datasource_file_name}
    - if:
        cond: {isPresent: additional_options_csv_parsing}
        then:
        - --additional-options-csv-parsing
        - {inputValue: additional_options_csv_parsing}
    - --type-of-data-analysis-task 
    - {inputValue: type_of_data_analysis_task}
    - if:
        cond: {isPresent: target_variable_name}
        then:
        - --target-variable-name
        - {inputValue: target_variable_name}
    - if:
        cond: {isPresent: target_emptyindicator}
        then:
        - --target-emptyindicator
        - {inputValue: target_emptyindicator}
    - if:
        cond: {isPresent: data_preparations_options}
        then:
        - --data-preparations-options
        - {inputValue: data_preparations_options}
    - if:
        cond: {isPresent: additional_options_csv_writing}
        then:
        - --additional-options-csv-writing
        - {inputValue: additional_options_csv_writing}
    # bare switch: no value follows the option
    - if:
        cond: {isPresent: output_datasource_directory_mountable}
        then:
        - --output-datasource-directory-mountable
    - --output-datasource-file-name
    - {inputValue: output_datasource_file_name}

#Software testing

Now let us simulate the testing that the Docker Hub infrastructure performs as part of its auto-testing, which uses the docker-compose.test.yml file present in the GitHub source repository.

In [None]:
# Remove the scratch directory left behind by any earlier run
# (presumably used by the tests as their local working directory - TODO confirm in run_tests.sh).
!rm -rf /tmp/my_local_dir_for_test
# Make the test driver readable and executable for the owner (mode r-xr--r--).
!chmod 544 run_tests.sh
# Stage both scripts under /tmp, the location the component image runs data_preparation.py from.
!cp data_preparation.py /tmp
!cp test_validation.py /tmp
!./run_tests.sh

The following test uses Google Cloud Storage (GCS). To share the GCS credentials, we create a service account that can access GCS on the end user's behalf. For that, refer to
https://cloud.google.com/iam/docs/service-accounts (go to the 'User-managed service accounts' section).
Create a new user-managed service account for accessing the GCS bucket. Under the 'Grant this service account access to project' option, filter the roles by Google product, select 'Cloud Storage', and grant the roles needed for object creation in a bucket. Once the service account is created, download the key file and keep it secret. Refer to https://cloud.google.com/iam/docs/creating-managing-service-account-keys for how to generate the service-account JSON file. Rename that file to 'sa_gcs_service_account.json' and then upload it to Google Colab using the 'upload to session storage' icon.

Before running these tests, two buckets must already exist: (a) kfpcomponent and (b) shastest. If you want to run the tests below against other bucket names, change the test commands accordingly. These tests cannot be made part of the docker unit tests, because that would require exposing the confidential GCS service-account file inside the docker image.

In [None]:
import json

# Load the service-account key that was uploaded to the Colab session storage.
with open('/content/sa_gcs_service_account.json', 'r') as file:
    gcs_sa_content = json.load(file)

# rclone settings for the read remote (REMOTEREAD) and the write remote (REMOTEWRITE).
# Building a dict and serialising it once guarantees well-formed JSON; the service-account
# content is embedded as a nested JSON object, exactly as the hand-built string did.
rclone_env = {
    "RCLONE_CONFIG_REMOTEREAD_TYPE": "gcs",
    "RCLONE_CONFIG_REMOTEREAD_SERVICE_ACCOUNT_CREDENTIALS": gcs_sa_content,
    "RCLONE_CONFIG_REMOTEREAD_LOCATION": "us-east1",
    "RCLONE_CONFIG_REMOTEWRITE_TYPE": "gcs",
    "RCLONE_CONFIG_REMOTEWRITE_SERVICE_ACCOUNT_CREDENTIALS": gcs_sa_content,
    "RCLONE_CONFIG_REMOTEWRITE_LOCATION": "us-east1",
    "RCLONE_CONFIG_REMOTEWRITE_BUCKET_POLICY_ONLY": "true",
}
rclone_env_str = json.dumps(rclone_env)

# Do NOT print rclone_env_str: it embeds the service-account private key, and cell output is
# saved with the notebook (and pushed to GitHub later on). Show only the variable names.
print(sorted(rclone_env))

#to remain compatible with bash command line argument (refer below how $rclone_env_str is passed in %%bash)
#Read: handling of double quotes in bash - https://stackoverflow.com/questions/19579546/can-i-access-python-variables-within-a-bash-or-script-ipython-notebook-c
rclone_env_str = rclone_env_str.replace('"', '\\"')

# Safe sanity check: report the size of the escaped string instead of its secret content.
print('escaped rclone environment string length:', len(rclone_env_str))

In [None]:
%%bash -s "$rclone_env_str"
# $1 holds the escaped rclone JSON string built in the preceding Python cell
# (IPython expands $rclone_env_str on the magic line above before bash starts).
# Recreate the scratch directory and refresh the script copy under /tmp.
rm -rf /tmp/my_local_dir_for_test
cp data_preparation.py /tmp
mkdir /tmp/my_local_dir_for_test
#note the difference in command line handling between double quoted and single quoted strings: a single quoted string is passed into the program as it is.
#a double quoted string is evaluated - as part of that, all double quotes inside it are removed unless they are escaped.
python /tmp/data_preparation.py --rclone-environment-var "$1" \
    --input-datasource-file-name 'kfpcomponent/CTG.csv' --additional-options-csv-parsing '{"sep":"," , "header":0}' \
    --type-of-data-analysis-task 'classification' --target-variable-name 'NSP' \
    --data-preparations-options '{"ignore_low_variance":true, "remove_outliers":true, "remove_multicollinearity":true, "multicollinearity_threshold":0.7}' \
    --output-datasource-file-name 'shastest/CTG_data-prep.csv' \
    --additional-options-csv-writing '{"index":false}' --log-level 'DEBUG'

In [None]:
# Validation test: download the prepared csv back from GCS and check its content.
import os

import pandas
from google.cloud import storage

# Point the GCS client library at the uploaded service-account key.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/sa_gcs_service_account.json"

downloaded_csv_path = '/content/CTG_data-prep_downloaded_for_test.csv'

client = storage.Client()
with open(downloaded_csv_path, mode='wb') as local_copy:
    client.download_blob_to_file('gs://shastest/CTG_data-prep.csv', local_copy)

df = pandas.read_csv(filepath_or_buffer=downloaded_csv_path)

# original data had 2129 rows, amongst that 3 rows have no target
row_count = len(df.index)
assert row_count == 2126

# pycaret will remove all missing values
total_missing = df.isnull().sum().sum()
assert total_missing == 0

In [None]:
! pip3 install kfp==1.8.12

First validate the component.yaml files at http://www.yamllint.com/. Once the component.yaml files are corrected, execute the cell below as a final check.

In [None]:
import kfp

# Component factories loaded from the spec files in the current working directory.
csv_data_prepare_op_out_to_artifact = kfp.components.load_component_from_file(
    'component_input_using_rclone_output_as_artifact.yaml')
csv_data_prepare_op_both_in_out_artifact = kfp.components.load_component_from_file(
    'component_both_input_output_as_artifact.yaml')


# Two-step sample pipeline:
#   step 1 fetches CTG.csv through rclone (http remote) and exposes the prepared csv as an artifact;
#   step 2 consumes that artifact and applies a second round of preparation.
@kfp.dsl.pipeline(name="testpipeline1")
def my_sample_pipeline_first_step_input_using_rclone_rest_from_artifacts():
    first_step = csv_data_prepare_op_out_to_artifact(
        rclone_environment_var='{"RCLONE_CONFIG_REMOTEREAD_TYPE":"http", "RCLONE_CONFIG_REMOTEREAD_URL":"https://raw.githubusercontent.com/pycaret/datasets/main/data/common/"}',
        input_datasource_file_name='CTG.csv',
        type_of_data_analysis_task='classification',
        target_variable_name='NSP',
        data_preparations_options='{"ignore_low_variance":true, "remove_outliers":true}',
        additional_options_csv_writing='{"index":false}')
    first_step_csv = first_step.outputs['output_datasource_local_file_path_when_rclone_bypassed']

    csv_data_prepare_op_both_in_out_artifact(
        input_datasource_local_file_path_when_rclone_bypassed=first_step_csv,
        type_of_data_analysis_task='classification',
        target_variable_name='NSP',
        data_preparations_options='{"remove_multicollinearity":true, "multicollinearity_threshold":0.7}',
        additional_options_csv_writing='{"index":false}')


# Compile with the v1 compiler (yaml package) ...
v1_compiler = kfp.compiler.Compiler()
v1_compiler.compile(
    pipeline_func=my_sample_pipeline_first_step_input_using_rclone_rest_from_artifacts,
    package_path='my_sample_pipeline_first_step_input_using_rclone_rest_from_artifacts.yaml')

# ... and with the v2 compiler (json package), so a spec problem surfaces in either mode.
v2_compiler = kfp.v2.compiler.Compiler()
v2_compiler.compile(
    pipeline_func=my_sample_pipeline_first_step_input_using_rclone_rest_from_artifacts,
    package_path='my_sample_pipeline_first_step_input_using_rclone_rest_from_artifacts_v2.json')


In [None]:
import kfp
import json

csv_data_prepare_op_both_in_out_using_rclone = kfp.components.load_component_from_file(
    'component_both_input_output_using_rclone.yaml')


# Two-step sample pipeline in which every step reads and writes its csv through rclone.
# Step 2 reads the file written by step 1, so it is explicitly ordered after step 1.
@kfp.dsl.pipeline(name="testpipeline2")
def my_sample_pipeline_all_steps_using_rclone(rclone_env_val:str):
    # Settings common to both steps.
    # The optional switches input_datasource_directory_mountable=True and
    # output_datasource_directory_mountable=True are deliberately not passed here.
    shared_args = dict(
        rclone_environment_var=rclone_env_val,
        type_of_data_analysis_task='classification',
        target_variable_name='NSP',
        additional_options_csv_writing='{"index":false}',
        log_level='DEBUG')

    first_task = csv_data_prepare_op_both_in_out_using_rclone(
        input_datasource_file_name='kfpcomponent/CTG.csv',
        data_preparations_options='{"ignore_low_variance":true, "remove_outliers":true}',
        output_datasource_file_name='shastest/CTG_data-prep1.csv',
        **shared_args)
    step1_op = first_task.set_cpu_limit('1')

    second_task = csv_data_prepare_op_both_in_out_using_rclone(
        input_datasource_file_name='shastest/CTG_data-prep1.csv',
        data_preparations_options='{"remove_multicollinearity":true, "multicollinearity_threshold":0.7}',
        output_datasource_file_name='shastest/CTG_data-prep2.csv',
        **shared_args)
    second_task.set_cpu_limit('1').after(step1_op)


# Compile with the v1 compiler (yaml package) ...
v1_compiler = kfp.compiler.Compiler()
v1_compiler.compile(
    pipeline_func=my_sample_pipeline_all_steps_using_rclone,
    package_path='my_sample_pipeline_all_steps_using_rclone.yaml')

# ... and with the v2 compiler (json package).
v2_compiler = kfp.v2.compiler.Compiler()
v2_compiler.compile(
    pipeline_func=my_sample_pipeline_all_steps_using_rclone,
    package_path='my_sample_pipeline_all_steps_using_rclone_v2.json')

#Push the code to github

Before committing code to GitHub, install the GitHub client (gh) by following the instructions at https://github.com/cli/cli/blob/trunk/docs/install_linux.md (choose the Debian/Ubuntu Linux installation method).

Use Colab's 'Terminal' icon in the left vertical pane to open a Linux terminal for typing commands. Once 'gh' is installed, type **gh auth login** (refer https://docs.github.com/en/get-started/getting-started-with-git/caching-your-github-credentials-in-git) and follow the on-screen prompts. For Colab, use the **Paste an authentication token** option. Personal tokens can be generated at https://github.com/settings/tokens

You can use Shift+Ctrl+v shortcut to paste any string in colab console

In [None]:
# Show the current working directory; the clone below is created relative to it.
!pwd

In [None]:
# Delete any checkout left from an earlier run so the clone below starts from a clean state.
!rm -Rf kfpcomponent

In [None]:
# Fetch a fresh copy of the repository into ./kfpcomponent
!git clone https://github.com/ShaswataJash/kfpcomponent.git

Follow directory structure according to https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#organizing-the-component-files

In [None]:
# -p creates the missing parent directory as well, and does not fail when the directories
# already exist in the freshly cloned repository - so this cell is safe to re-run.
!mkdir -p kfpcomponent/TabularDataPreparationUsingPycaret/src
!mkdir -p kfpcomponent/TabularDataPreparationUsingPycaret/tests

In [None]:
# rsync -c compares file checksums, so a file is copied into the git repo only when its content has changed.
# Layout: source under src/, test code under tests/, component specs and docker files at the component root.
!rsync -c data_preparation.py kfpcomponent/TabularDataPreparationUsingPycaret/src
!rsync -c component_both_input_output_as_artifact.yaml kfpcomponent/TabularDataPreparationUsingPycaret/component_both_input_output_as_artifact.yaml
!rsync -c component_both_input_output_using_rclone.yaml kfpcomponent/TabularDataPreparationUsingPycaret/component_both_input_output_using_rclone.yaml
!rsync -c component_input_as_artifact_output_using_rclone.yaml kfpcomponent/TabularDataPreparationUsingPycaret/component_input_as_artifact_output_using_rclone.yaml
!rsync -c component_input_using_rclone_output_as_artifact.yaml kfpcomponent/TabularDataPreparationUsingPycaret/component_input_using_rclone_output_as_artifact.yaml
!rsync -c test_validation.py kfpcomponent/TabularDataPreparationUsingPycaret/tests
!rsync -c Dockerfile kfpcomponent/TabularDataPreparationUsingPycaret/
!rsync -c run_tests.sh kfpcomponent/TabularDataPreparationUsingPycaret/
!rsync -c docker-compose.test.yml kfpcomponent/TabularDataPreparationUsingPycaret/

In [None]:
# Enter the cloned repository; %cd changes the directory for all following cells.
%cd kfpcomponent

In [None]:
# Stage every change in the working tree (new, modified and deleted files).
!git add -A

In [None]:
# Review what is staged before committing.
!git status

For git users who have set their email visibility to private, GitHub provides an alternate (noreply) email address to use in web-based Git operations, e.g., edits and merges. The alias email can be viewed at https://github.com/settings/emails

In [None]:
# Commit under the noreply alias so the private email address is not recorded in the history.
!git config --global user.email "29448766+ShaswataJash@users.noreply.github.com"

In [None]:
# Adjust the commit message to describe the change actually being committed.
!git commit -a -m "introduced autoremove in docker"

In [None]:
# Publish the commit to the main branch of the origin remote.
!git push origin main

In [None]:
# Return to the directory that contains the checkout.
%cd ..