In [1]:
# Library Imports
import kfp
from kfp import dsl
import logging
import os

# Make INFO-level messages visible. Without this, the first module-level
# logging.info() call installs a default root handler at WARNING level, which
# hides the messages below and turns any later basicConfig(level=INFO) call
# (e.g. in a later cell) into a no-op.
logging.basicConfig(level=logging.INFO)


def _mask_secret(value: str, visible: int = 4) -> str:
    """Return ``value`` with all but its first/last ``visible`` characters masked.

    A fixed-width run of 16 asterisks replaces the middle, so the masked form
    does not reveal the secret's length. Values too short to keep any
    characters without exposing most of the secret are masked entirely.
    """
    if len(value) <= 2 * visible:
        return "*" * 16
    return value[:visible] + "*" * 16 + value[-visible:]


# Fetch the AWS keys from environment variables
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Fail fast if either credential is missing or empty.
if not (aws_access_key_id and aws_secret_access_key):
    raise EnvironmentError("AWS Access Key or Secret Key not set properly.")

logging.info("AWS Access Key and Secret Key have been retrieved successfully.")
# Never log credentials in full: notebook outputs are routinely saved and shared.
logging.info("AWS Access Key ID: %s", _mask_secret(aws_access_key_id))
logging.info("AWS Secret Access Key: %s", _mask_secret(aws_secret_access_key))

@dsl.component(packages_to_install=["numpy", "pandas", "scikit-learn", "boto3"])
def build_model(
    aws_access_key_id: str,
    aws_secret_access_key: str,
    test_size: float = 0.20,
    random_state: int = 42,
):
    """Train a rental-price LinearRegression model from S3 data and upload it to S3.

    Downloads ``data/rental_1000.csv`` from the ``iriscloudbt-mlapp`` bucket,
    fits a LinearRegression predicting ``price`` from ``rooms`` and ``sqft``,
    prints the held-out R^2, pickles the fitted model, and uploads it to
    ``model/rental_prediction_model.pkl`` in the same bucket.

    Args:
        aws_access_key_id: AWS access key ID used to build the S3 client.
        aws_secret_access_key: AWS secret access key used to build the S3 client.
        test_size: Fraction of rows held out for evaluation.
        random_state: Seed for the train/test split, so runs are reproducible.
    """
    # KFP lightweight components run in their own container, so every import
    # must live inside the function body.
    import pickle

    import boto3
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split

    bucket = "iriscloudbt-mlapp"
    data_s3_path = "data/rental_1000.csv"
    model_s3_path = "model/rental_prediction_model.pkl"
    local_data_path = "rental_1000.csv"
    local_model_path = "rental_prediction_model.pkl"

    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    # Download data from S3 and load it.
    s3_client.download_file(bucket, data_s3_path, local_data_path)
    rental_df = pd.read_csv(local_data_path)

    # Prepare the features and labels.
    X = rental_df[["rooms", "sqft"]].values
    y = rental_df["price"].values

    # Seeded split so the trained model (and reported score) is reproducible.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # Train the model and report held-out performance (the test split was
    # previously computed but never used).
    model = LinearRegression().fit(X_train, y_train)
    r2 = model.score(X_test, y_test)
    print(f"Held-out R^2 on {len(y_test)} rows: {r2:.4f}")

    # Save the model using pickle, then upload it to S3.
    with open(local_model_path, "wb") as f:
        pickle.dump(model, f)
    s3_client.upload_file(local_model_path, bucket, model_s3_path)


@dsl.pipeline
def rental_prediction_pipeline():
    """Single-step pipeline: train the rental-price model and upload it to S3.

    NOTE(review): ``aws_access_key_id`` / ``aws_secret_access_key`` are the
    module-level strings read from the environment earlier in this cell. They
    are passed as plain Python constants, so the KFP compiler very likely
    records them as constant inputs in the compiled
    ``rental_prediction_pipeline.yaml``. That would put the secret in plaintext
    on disk and in the KFP UI. Consider injecting a Kubernetes Secret
    (``kfp.kubernetes.use_secret_as_env``) or using IRSA instead. Confirm
    before sharing the compiled YAML.
    """
    # The stray, mis-indented `return component_factory...(` fragment that
    # followed this call was a syntax error and has been removed.
    build_model(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )


In [2]:
from kfp import compiler

# Serialize the pipeline definition to a YAML package for upload/submission.
compiler.Compiler().compile(
    pipeline_func=rental_prediction_pipeline,
    package_path='rental_prediction_pipeline.yaml',
)

In [None]:
# Library Imports
import kfp
import logging

import os

# Configure logging. basicConfig does nothing if the root logger already has
# handlers (e.g. installed by an earlier cell).
logging.basicConfig(level=logging.INFO)

# Kubeflow Pipelines endpoint, taken from the KFP_ENDPOINT environment variable
# so it need not be hardcoded in the notebook. When unset it stays None and
# kfp.Client falls back to its default host (presumably the in-cluster
# service; see the KFP SDK docs).
kfp_endpoint = os.getenv("KFP_ENDPOINT")
client = kfp.Client(host=kfp_endpoint)

# Experiment name
experiment_name = "My Experiment"

# Create a new experiment
def create_experiment(client, experiment_name):
    """Create a KFP experiment named ``experiment_name`` and log its name.

    Args:
        client: A ``kfp.Client`` (or any object with ``create_experiment(name=...)``).
        experiment_name: Display name for the experiment.

    Returns:
        The experiment object returned by ``client.create_experiment``.

    NOTE(review): the KFP v2 SDK may return an already-existing experiment with
    the same name instead of creating a new one. Confirm against the SDK version
    in use.
    """
    experiment = client.create_experiment(name=experiment_name)
    # KFP v2 returns V2beta1Experiment, which exposes `display_name` (there is
    # no `name` attribute); KFP v1's ApiExperiment exposes `name`.
    shown_name = (
        getattr(experiment, "display_name", None)
        or getattr(experiment, "name", None)
        or experiment_name
    )
    logging.info(f"Created experiment: {shown_name}")
    return experiment


# List all experiments
def list_experiments(client):
    """Fetch the experiments visible to ``client``, log the response, and return it."""
    listing = client.list_experiments()
    logging.info(f"Experiments: {listing}")
    return listing

# Create a Run from a pipeline function
def create_run_from_pipeline_func(client, pipeline_func, experiment_name, enable_caching=False):
    """Submit ``pipeline_func`` as a new run under ``experiment_name``.

    Args:
        client: A ``kfp.Client`` (or compatible object).
        pipeline_func: The ``@dsl.pipeline``-decorated function to run.
        experiment_name: Name of the experiment to file the run under.
        enable_caching: Forwarded to the client; caching is off by default.

    Returns:
        Whatever ``client.create_run_from_pipeline_func`` returns.
    """
    submit_options = {
        "experiment_name": experiment_name,
        "enable_caching": enable_caching,
    }
    submitted = client.create_run_from_pipeline_func(pipeline_func, **submit_options)
    logging.info("Pipeline run initiated")
    return submitted

# List all runs for a given experiment
def list_runs(client, experiment_id):
    """Return the runs response for ``experiment_id`` after logging it.

    No paging or sorting arguments are passed, so the client's defaults apply.
    """
    response = client.list_runs(experiment_id=experiment_id)
    logging.info(f"Runs: {response}")
    return response

# Delete a specific run by run_id
def delete_run(client, run_id):
    """Ask ``client`` to delete run ``run_id``, then log the deletion. Returns None."""
    target = run_id
    client.delete_run(target)
    logging.info(f"Deleted run: {target}")

# List all runs for a given experiment and delete the first run
def delete_previous_run(client, experiment_id):
    """Delete the first run listed for ``experiment_id``, if any exist.

    Uses ``list_runs`` to fetch the runs and ``delete_run`` to remove the one at
    index 0. Does nothing when the response is empty or has no runs.

    NOTE(review): ``list_runs`` passes no ``sort_by``, so "first" is whatever
    order the server returns. If "previous" is meant to be the most recent run,
    consider an explicit sort (e.g. by creation time). Confirm the intent.
    """
    listing = list_runs(client, experiment_id)
    if not listing or not listing.runs:
        return
    first_run_id = listing.runs[0].run_id
    logging.info(f"Deleting run: {first_run_id}")
    delete_run(client, first_run_id)

# Delete a specific experiment by experiment_id
def delete_experiment(client, experiment_id):
    """Ask ``client`` to delete experiment ``experiment_id``, then log it. Returns None."""
    target = experiment_id
    client.delete_experiment(target)
    logging.info(f"Deleted experiment: {target}")