# This is KubeFlow pipeline Auto Generator

Below is the implementation of a pipeline autogenerate based on a config.yaml file. 

In [11]:
 ########### TODO s :
 # TODO: fix-bug : ERROR: Could not find a version that satisfies the requirement gzip (from versions: none)ERROR: No matching distribution found for gzip
 #TODO : Persistent Volume for pipelines
#TODO : creating stages from a docker image
#TODO : creating pipeline chains
#TODO : creating conditional pipelines and pipeline chains
#TODO : kepler and Prometheus queries
#TODO : filtering stage only cover number-base threshold right now


###########TODO : outputs artifacts !!!!!!!!!!!!!!!!
######### 1. for experiment (main track paper)
#########2. as a tool for general use (poster paper)

In [12]:
#https://www.kubeflow.org/docs/components/pipelines/legacy-v1/installation/localcluster-deployment/

In [13]:
import gzip
import kfp
import pandas as pd
from kfp import dsl
from kfp import compiler
import hashlib
from typing import List, Dict, Optional, Any
import yaml
from psutil import users


# @dsl.component(base_image='python:3.12.2')
# def say_hello(input_path:str,output_file:str):
#     data =pd.read_csv(input_path)
#     if data is not None:
#             json_data = data.to_json(orient='records')  # Convert DataFrame to JSON
#             with gzip.open(output_file, 'wt', encoding='utf-8') as f:
#                 f.write(json_data)
#             return output_file 

In [14]:
# stages' function definitions

# filtering
def compare_rows(data: pd.DataFrame, column_name: str, threshold: int, operation: str) -> pd.DataFrame:
    if operation == 'greater_than':
        return data[data[column_name] > threshold]
    elif operation == 'less_than':
        return data[data[column_name] < threshold]
    elif operation == 'equal_to':
        return data[data[column_name] == threshold]
    else:
        raise ValueError("Unsupported operation")
    
@dsl.component(
    base_image="python:3.12.2",
    packages_to_install=['pandas']
)
def filtering(data_path:str, operation: str, column_name: str, threshold: int):
    
    print("inside filtering")
    try:

        data =pd.read_csv(data_path)
        
        if column_name not in data.columns:
            raise ValueError(f"Column '{column_name}' does not exist in the DataFrame.")  
        
        filtered_data=compare_rows(data, column_name, threshold, operation)
        if filtered_data.empty:
            print("filtered dat was Empty !!!")
        filtered_data.to_csv(data_path, index=False)
        print(f"Filtered data has been written to {data_path}")
    except FileNotFoundError:
        print(f"Error: File '{data_path}' not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
        
        
@dsl.component(
    base_image="python:3.12.2",
    packages_to_install=['pandas']
)
def anonymize_columns(data_path: str, columns_to_anonymize: list):
    # Convert the Input dataset to a DataFrame
    data_df = pd.read_csv(data_path)

    for column in columns_to_anonymize:
        if column in data_df.columns:
            # Anonymize the column using SHA-256 hashing
            data_df[column] = data_df[column].apply(lambda x: hashlib.sha256(str(x).encode()).hexdigest())
        else:
            print(f"Warning: Column '{column}' not found in the data.")

        data_df.to_csv(data_path, index=False)
        
# Aggregation
@dsl.component(
    base_image="python:3.12.2",
    packages_to_install=['pandas','typing']
)
def aggregate_columns(
    data_path: str, 
    groupby_column: str, 
    columns_to_aggregate: list,  # list[str]
    aggregation_functions: dict  # dict[str, list[str]]
):
    # Convert Dataset to DataFrame
    data_df = pd.read_csv(data_path)
    
    try:
        # Perform the aggregation
        aggregated_data = data_df.groupby(groupby_column).agg(aggregation_functions)
        
        # Select only the required columns to aggregate
        aggregated_data = aggregated_data[columns_to_aggregate]
        
        # Save the result to the output path
        aggregated_data.to_csv(data_path, index=False)
    except KeyError as e:
        raise f"Error: Column {e} not found in the data."
    except Exception as e:
        raise f"An error occurred: {e}"


# Function to compress JSON to GZIP
@dsl.component(
    base_image="python:3.12.2",
    packages_to_install=['pandas', 'gzip']
)
def compress_json_to_gzip(
    data_path: str
):
    # Convert Dataset to DataFrame
    data_df = pd.read_csv(data_path)
    
    try:
        # Convert DataFrame to JSON
        json_data = data_df.to_json(orient='records')
        
        # Compress and write to GZIP file
        with gzip.open(data_path, 'wt', encoding='utf-8') as f:
            f.write(json_data)
    except Exception as e:
        print(f"An error occurred: {e}")
        # Optionally save an empty file in case of failure
        with open(data_path, 'w') as f:
            f.write('')

# Function to compress CSV to GZIP
@dsl.component(
    base_image="python:3.12.2",
    packages_to_install=['pandas', 'gzip']
)
def compress_csv_to_gzip(
    data_path: str
):
    # Convert Dataset to DataFrame
    data_df = pd.read_csv(data_path)
    
    try:
        # Compress DataFrame to CSV GZIP
        with gzip.open(data_path, 'wt') as f:
            data_df.to_csv(data_path, f)
    except Exception as e:
        raise f"An error occurred: {e}"

In [15]:
# parse the yaml file to create the kubeflow pipeline:

# Deployment of the kubeflow
class Deployment:
    def __init__(self, namespace: str, prometheusURL: str ):
       self.namespace = namespace
       self.prometheusURL = prometheusURL

#
class Stage:
    def __init__(self, name: str, type: str ,parameter: Dict[str, Any] ):
        self.name = name
        self.type = type
        self.parameter = parameter

#type : csv , pipeline
#metadat = filepath or next name of pipeline to call.          
class Datasource:
    def __init__(self, type: str, metadata: str):
        self.type = type
        self.metadata = metadata

# pipeline name must be unique in the whole config file

class Pipeline:
    def __init__(self, name:str, flow:List[str], datasource:Datasource , consumers:List[str]):
        self.flow = flow
        self.datasource = datasource
        self.name = name
        self.consumers = consumers
        
class PipelineConfig:
    def __init__(self,pipelines:List[Pipeline],stages:List[Stage],deployment: Deployment):
        self.pipelines = pipelines
        self.stages = stages
        self.deployment = deployment
        

In [16]:
# Function to read YAML and convert to PipelineConfig
def read_yaml_to_pipeline_config(file_path: str) -> PipelineConfig:
    with open(file_path, 'r') as file:
        data = yaml.safe_load(file)

        # Create Stage objects from the YAML stages
        stages = [Stage(name=stage['name'], type=stage['type'], parameter=stage['parameter']) for stage in data['stages']]

        # Create Pipeline objects from the YAML pipelines
        pipelines = []
        for pipeline in data['pipelines']:
            datasource = Datasource(type=pipeline['datasource']['type'], metadata=pipeline['datasource']['metadata'])
            pipelines.append(Pipeline(name=pipeline['name'], flow=pipeline['flow'], datasource=datasource,consumers=pipeline['consumers']))

        deployment_data = data.get('Deployment', {})
        deployment = Deployment(namespace=deployment_data.get('namespace', ''),
                                prometheusURL=deployment_data.get('prometheusURL', ''))
        # Create PipelineConfig object
        pipeline_config = PipelineConfig(pipelines=pipelines, stages=stages, deployment=deployment)

        return pipeline_config

In [17]:
def create_component_for_stage(stage: 'Stage'):
    if stage.type == 'filtering':
        return filtering
    elif stage.type == 'anonymization':
        return anonymize_columns
    elif stage.type == 'aggregation':
        return aggregate_columns
    elif stage.type == 'compress_json_to_gzip':
        return compress_json_to_gzip
    elif stage.type == 'compress_csv_to_gzip':
        return compress_csv_to_gzip
    else:
        print(f"Error: Unknown stage type '{stage.type}'")
        return None

In [18]:
# creating the pipelines based on the pipelines config file

def dynamic_pipeline(data_path: str, pipeline_config: PipelineConfig, pipeline: Pipeline):
    # Process each pipeline stage dynamically
    for stage_name in pipeline.flow:
        # Find the stage by name
        stage = next((s for s in pipeline_config.stages if s.name == stage_name), None)

        if stage is None:
            print(f"Error: Stage {stage_name} not found in the configuration.")
            continue

        # Create component for the current stage
        component_op = create_component_for_stage(stage)

        # Dynamically handle each stage
        if stage.type == 'filtering':
           
                # Ensure to pass the output as an Output type
                component_op(
                    data_path=data_path, 
                    operation=stage.parameter['operation'], 
                    column_name=stage.parameter['column_name'], 
                    threshold=stage.parameter['threshold']
                )
        

        elif stage.type == 'anonymization':
                 component_op(
                    data_path=data_path, 
                    columns_to_anonymize=stage.parameter['columns_to_anonymize']
        )

        elif stage.type == 'aggregation':
               component_op(
                    data_path=data_path, 
                    groupby_column=stage.parameter['groupby_column'], 
                    columns_to_aggregate=stage.parameter['columns_to_aggregate'], 
                    aggregation_functions=stage.parameter['aggregation_functions']
        )

        elif stage.type == 'compress_json_to_gzip':
               component_op(
                 data_path =data_path
        )

        elif stage.type == 'compress_csv_to_gzip':
                component_op(
                    data_path=data_path
        )
        else:
            raise Exception(f"Component for stage '{stage_name}' could not be created.")

    return data_path




In [19]:
#TODO: (first) pipeline datasource.type must be csv and it's path.

def create_pipeline_for_each_config(client: kfp.Client):
    pipeline_config_file = './data/pipeline_config_sample.yaml'  # Make sure this file exists in your working directory
    pipeline_config = read_yaml_to_pipeline_config(pipeline_config_file)
    # Iterate over every pipeline in the pipeline config
    for pipeline in pipeline_config.pipelines:
        # Generate a unique name for each pipeline
        pipeline_name = f"pipeline_{pipeline.name}"

        # Define a Kubeflow pipeline dynamically for each pipeline
        @dsl.pipeline(
            name=pipeline_name,
            description=f"Pipeline generated from config: {pipeline.name}"
        )
        def kubeflow_pipeline():
            # Call dynamic_pipeline to handle the stages of this specific pipeline
            dynamic_pipeline(pipeline.datasource.metadata, pipeline_config, pipeline)

        # Compile the pipeline
        pipeline_file_name = pipeline_name + '.yaml'
        kfp.compiler.Compiler().compile(pipeline_func=kubeflow_pipeline, package_path=pipeline_file_name)
        print(f"Pipeline '{pipeline_name}' compiled successfully.")
        
        # Upload the compiled pipeline to the specified namespace
        client.upload_pipeline(pipeline_package_path=pipeline_file_name, pipeline_name=pipeline_name)
        print(f"Pipeline '{pipeline_name}' uploaded successfully.")

    return "All pipelines compiled successfully!"

In [20]:
def main():
    
    client = kfp.Client() #namespace=pipeline_config.deployment.namespace
    # Create and compile pipelines based on the configuration
    result = create_pipeline_for_each_config(client)
    print(result)  # Optionally print the result message

# Call the main function
if __name__ == "__main__":
    main()

Pipeline 'pipeline_employee_data_pipeline' compiled successfully.


ApiException: (409)
Reason: Conflict
HTTP response headers: HTTPHeaderDict({'Audit-Id': '7ec23a7f-5db2-4dc9-bb19-fa235a8976eb', 'Cache-Control': 'no-cache, private', 'Content-Length': '579', 'Content-Type': 'application/json', 'Date': 'Mon, 07 Oct 2024 10:58:39 GMT'})
HTTP response body: {"error_message":"Failed to create a pipeline and a pipeline version. The pipeline already exists.: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name pipeline_employee_data_pipeline already exists. Please specify a new name","error_details":"Failed to create a pipeline and a pipeline version. The pipeline already exists.: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name pipeline_employee_data_pipeline already exists. Please specify a new name"}


In [None]:

# @dsl.pipeline(name= 'test', description="Pipeline generated from config")
# def hello_pipeline():
#     print("inside hello_pipeline")
#     filtering(data_path='data/sample.csv', operation='greater_than', column_name='age', threshold='30')
#     anonymize_columns(data_path='data/result.csv' , columns_to_anonymize=['name','email'])
#     aggregate_columns(data_path='data/result.csv',groupby_column='age',columns_to_aggregate=['salary'],aggregation_functions={'salary':'sum','age':'max'})
#     compress_json_to_gzip(data_path='data/result.csv')
#     compress_csv_to_gzip(data_path='data/result.csv')
#     print('after filtering')
# def main():
#     
#     compiler.Compiler().compile(pipeline_func=hello_pipeline, package_path='pipeline.yaml')
#     
# if __name__ == "__main__":
#     main()