In [None]:
# Install the Kubeflow Pipelines SDK (kfp).
# %pip (rather than !pip) installs into the environment of the running kernel,
# so the package is importable from the cells below.
%pip install kfp --upgrade -q

In [None]:
# Install the UCI ML Repository client (ucimlrepo) into the running kernel's environment.
# NOTE(review): no visible cell in this notebook imports ucimlrepo — confirm it is still needed.
%pip install ucimlrepo -q

In [None]:
# Import necessary libraries
import kfp
from kfp import dsl
from kfp.dsl import component

In [None]:
# Define a function to load and preprocess data
@component(base_image='python:3.9', packages_to_install=['pandas', 'numpy'])
def load_and_preprocess_data() -> str:
    """Download the UCI Spambase table, name its columns and write it out as CSV.

    Returns:
        The relative file name ``'preprocessed_data.csv'`` the table was written to.

    NOTE(review): the returned value is a path relative to this component's
    working directory. Whether downstream components can open it depends on the
    steps sharing a filesystem — confirm against the target cluster.
    """
    import pandas as pd
    import numpy as np

    source_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data'
    output_file = 'preprocessed_data.csv'

    # Tokens whose relative frequency is recorded as "word_freq_<token>".
    word_tokens = [
        "make", "address", "all", "3d", "our", "over", "remove", "internet", "order", "mail",
        "receive", "will", "people", "report", "addresses", "free", "business", "email", "you", "credit",
        "your", "font", "000", "money", "hp", "hpl", "george", "650", "lab", "labs",
        "telnet", "857", "data", "415", "85", "technology", "1999", "parts", "pm", "direct",
        "cs", "meeting", "original", "project", "re", "edu", "table", "conference",
    ]
    # Characters whose relative frequency is recorded as "char_freq_<char>".
    char_tokens = [";", "(", "[", "!", "$", "#"]
    # Statistics recorded as "capital_run_length_<stat>".
    run_length_stats = ["average", "longest", "total"]

    header = (
        [f"word_freq_{token}" for token in word_tokens]
        + [f"char_freq_{token}" for token in char_tokens]
        + [f"capital_run_length_{stat}" for stat in run_length_stats]
        + ["spam"]  # last column is the label
    )

    # The raw file has no header row, so names are assigned after loading.
    frame = pd.read_csv(source_url, header=None)
    frame.columns = header

    frame.to_csv(output_file, index=False)
    return output_file

In [None]:
# Define a function for exploratory data analysis
@component(base_image='python:3.9', packages_to_install=['pandas', 'numpy'])
def eda(preprocessed_data_path: str):
    """Print a structural and statistical summary of the dataset and save it to disk.

    Args:
        preprocessed_data_path: Path of the CSV file to summarise.

    Side effects:
        Prints the ``DataFrame.info()`` report and the ``DataFrame.describe()``
        table to stdout and writes the same text to ``eda_output.txt`` in the
        current working directory.
    """
    import io

    import pandas as pd

    data = pd.read_csv(preprocessed_data_path)

    # DataFrame.info() prints its report and returns None, so str(data.info())
    # would store the literal text "None". Capture the report in a buffer instead.
    info_buffer = io.StringIO()
    data.info(buf=info_buffer)
    info_text = info_buffer.getvalue()
    describe_text = str(data.describe())

    # Print data information and description
    print(info_text)
    print(describe_text)

    # Save the info and describe to a text file
    with open('eda_output.txt', 'w') as f:
        f.write(info_text + '\n' + describe_text)

In [None]:
# Define a function for feature engineering
@component(base_image='python:3.9', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def feature_engineering(preprocessed_data_path: str) -> str:
    """Impute missing values, split into train/test sets and standardise the features.

    Args:
        preprocessed_data_path: Path of the CSV file holding the labelled dataset;
            it must contain a ``spam`` column, which is used as the target.

    Returns:
        The relative file name ``'engineered_features.npz'``; the archive holds the
        arrays ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    NOTE(review): imputation statistics are fitted on the full table before the
    train/test split, so the test rows influence them. Confirm this is acceptable.
    """
    import numpy as np
    import pandas as pd
    from sklearn.impute import SimpleImputer
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    data = pd.read_csv(preprocessed_data_path)

    numeric_features = data.select_dtypes(include=[np.number]).columns
    categorical_features = data.select_dtypes(exclude=[np.number]).columns

    # Handle missing values. SimpleImputer rejects an input with zero columns,
    # so each imputer only runs when its column group is non-empty (a purely
    # numeric table has no categorical columns at all).
    if len(numeric_features) > 0:
        imputer_num = SimpleImputer(strategy='median')
        data[numeric_features] = imputer_num.fit_transform(data[numeric_features])
    if len(categorical_features) > 0:
        imputer_cat = SimpleImputer(strategy='most_frequent')
        data[categorical_features] = imputer_cat.fit_transform(data[categorical_features])

    # Splitting the data
    X = data.drop('spam', axis=1)
    y = data['spam']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Standardizing the data: the scaler is fitted on the training split only
    # and then applied unchanged to the test split.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Save engineered features
    np.savez('engineered_features.npz', X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)
    return 'engineered_features.npz'

In [None]:
# Define a function for model training
@component(base_image='python:3.9', packages_to_install=['numpy', 'tensorflow', 'keras'])
def model_training(engineered_features_path: str) -> str:
    """Train a small dense binary classifier on the engineered training split.

    Args:
        engineered_features_path: Path of the ``.npz`` archive containing at least
            the arrays ``X_train`` and ``y_train``.

    Returns:
        The relative file name ``'spam_classification_model.h5'`` the trained model
        was saved to.

    Side effects:
        Also writes ``training_history.npz`` (the Keras ``History.history`` dict)
        to the current working directory.
    """
    import numpy as np
    from tensorflow import keras
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential

    # np.load on an .npz returns a lazily-read archive that keeps the file open;
    # the context manager closes it once the needed arrays have been read.
    # Only the training arrays are used here; the test split is left for evaluation.
    with np.load(engineered_features_path) as data:
        X_train = data['X_train']
        y_train = data['y_train']

    # An explicit Input layer declares the feature width instead of the
    # `input_dim` keyword on the first Dense layer.
    model = Sequential([
        keras.Input(shape=(X_train.shape[1],)),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid'),  # single probability output
    ])

    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # 20% of the training rows are held out by Keras for validation each epoch.
    history = model.fit(X_train, y_train, epochs=50, batch_size=10, validation_split=0.2)

    # Save the trained model
    model.save('spam_classification_model.h5')

    # Save the training history. The dict is stored as a pickled object array,
    # so readers must pass allow_pickle=True to np.load.
    np.savez('training_history.npz', history=history.history)
    return 'spam_classification_model.h5'


In [None]:
# Define a function for evaluating the model
@component(base_image='python:3.9', packages_to_install=['numpy', 'tensorflow', 'keras', 'matplotlib'])
def model_evaluation(trained_model_path: str, engineered_features_path: str):
    """Evaluate the trained model on the test split and plot the training loss curves.

    Args:
        trained_model_path: Path of the saved Keras model to load.
        engineered_features_path: Path of the ``.npz`` archive containing the
            arrays ``X_test`` and ``y_test``.

    Side effects:
        Prints the test loss and accuracy. If a training-history archive can be
        found, writes the loss plot to ``/mnt/data/loss_plot.png``; otherwise the
        plot is skipped with a message.
    """
    import os

    import matplotlib
    # Select a non-interactive backend before pyplot is imported.
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import numpy as np
    from tensorflow import keras

    # Close the archive as soon as the test arrays have been read.
    with np.load(engineered_features_path) as data:
        X_test = data['X_test']
        y_test = data['y_test']

    model = keras.models.load_model(trained_model_path)
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f'Test Loss: {loss}')
    print(f'Test Accuracy: {accuracy}')

    # The training step writes 'training_history.npz' relative to its working
    # directory, while this step historically looked under /mnt/data. Accept
    # either location and do not fail the evaluation when neither exists.
    history_candidates = ('/mnt/data/training_history.npz', 'training_history.npz')
    history_path = next((path for path in history_candidates if os.path.exists(path)), None)
    if history_path is None:
        print('Training history not found; skipping loss plot.')
        return

    # The history dict is stored as a pickled object array, hence allow_pickle.
    # NOTE(review): unpickling executes arbitrary code — only load archives
    # produced by this pipeline.
    with np.load(history_path, allow_pickle=True) as history_data:
        history = history_data['history'].item()

    plot_path = '/mnt/data/loss_plot.png'
    # savefig does not create missing directories.
    os.makedirs(os.path.dirname(plot_path), exist_ok=True)

    fig, ax = plt.subplots()
    ax.plot(history['loss'], label='train_loss')
    ax.plot(history['val_loss'], label='val_loss')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.legend()
    fig.savefig(plot_path)
    plt.close(fig)

In [None]:
# Define the pipeline
@dsl.pipeline(
    name='Spam Classification Pipeline',
    description='A pipeline to train and classify emails as spam or not spam',
)
def spam_classification_pipeline():
    # Step 1: produce the labelled CSV; its output feeds both EDA and feature engineering.
    load_step = load_and_preprocess_data()
    csv_path = load_step.output

    # Step 2: exploratory summary (no output consumed downstream).
    eda(preprocessed_data_path=csv_path)

    # Step 3: impute / split / scale.
    features_step = feature_engineering(preprocessed_data_path=csv_path)
    features_path = features_step.output

    # Step 4: fit the classifier on the engineered features.
    training_step = model_training(engineered_features_path=features_path)

    # Step 5: score the trained model against the held-out split.
    model_evaluation(
        trained_model_path=training_step.output,
        engineered_features_path=features_path,
    )

In [None]:
# Compile the pipeline function into a YAML package that can be submitted to KFP
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=spam_classification_pipeline,
    package_path='spam_classification_pipeline.yaml',
)


In [None]:
# Check contents of the YAML file
!cat spam_classification_pipeline.yaml

# PIPELINE DEFINITION
# Name: spam-classification-pipeline
# Description: A pipeline to train and classify emails as spam or not spam
components:
  comp-eda:
    executorLabel: exec-eda
    inputDefinitions:
      parameters:
        preprocessed_data_path:
          parameterType: STRING
  comp-feature-engineering:
    executorLabel: exec-feature-engineering
    inputDefinitions:
      parameters:
        preprocessed_data_path:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-load-and-preprocess-data:
    executorLabel: exec-load-and-preprocess-data
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-model-evaluation:
    executorLabel: exec-model-evaluation
    inputDefinitions:
      parameters:
        engineered_features_path:
          parameterType: STRING
        trained_model_path:
          parameterType: STRING
  comp-model-training:
    executorLabel: 

In [None]:
# Create the KFP client now that the pipeline package has been compiled.
# With no arguments the client resolves the API endpoint from its environment.
client = kfp.Client()

# Specify experiment name
experiment_name = "spam_classification_experiment"
experiment = client.create_experiment(name=experiment_name)

# The KFP SDK v2 experiment object exposes its identifier as `experiment_id`;
# the v1 SDK used `id`. Prefer the v2 attribute and fall back to the v1 one so
# the cell works with either SDK generation.
experiment_id = getattr(experiment, 'experiment_id', None) or experiment.id

# Submit the pipeline run
run_name = "spam_classification_run"
run_result = client.run_pipeline(
    experiment_id=experiment_id,
    job_name=run_name,
    pipeline_package_path='spam_classification_pipeline.yaml'
)