In [None]:
import sagemaker

# Look up the execution role ARN for the current SageMaker session and echo it
# so the reader can see which role this notebook resolved.
role = sagemaker.get_execution_role()
print(f"{role}")

In [None]:
# The execution role was already resolved (and printed) by the previous cell;
# reuse it instead of issuing a second lookup. `role_arn` is the name passed
# to `pipeline.upsert` further down.
role_arn = role
role_arn

In [None]:
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.parameters import ParameterInteger
from sagemaker.workflow.parameters import ParameterFloat

In [None]:
# Shared S3 prefix for the bank-marketing CSVs. Change it here to point the
# pipeline's default inputs at a different bucket/folder.
DATA_S3_PREFIX = "s3://srushanth-baride/binary-classification-with-a-bank-dataset"

# Pipeline parameters for the raw input CSVs; the defaults below are used
# unless an execution supplies its own values.
train_data_path = ParameterString(
    name="TrainDataPath",
    default_value=f"{DATA_S3_PREFIX}/train.csv",
)

test_data_path = ParameterString(
    name="TestDataPath",
    default_value=f"{DATA_S3_PREFIX}/test.csv",
)

In [None]:
# Instance type for the ingestion, feature-extraction and preprocessing steps.
instance_type_data_processing = ParameterString(
    name="InstanceType",
    default_value="ml.m5.large",
)

# Instance type for the training and prediction steps.
# Pipeline steps reference a parameter by its *name*, so a second
# ParameterString also named "InstanceType" would silently resolve to the same
# value as the one above (its own default never applying). Alias it explicitly
# so the sharing is visible.
# TODO: to size training independently, give this its own ParameterString with
# a unique name (e.g. "TrainingInstanceType") AND declare it in the Pipeline's
# `parameters` list.
instance_type_model_training = instance_type_data_processing

# Number of instances used by every step.
instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1,
)

In [None]:
# Presumably the train fraction for the train/hold-out split.
# NOTE(review): this parameter is not currently passed to any step nor listed
# in the Pipeline's `parameters` — confirm the intended wiring.
train_test_split_ratio = ParameterFloat(name="TrainTestSplitRatio", default_value=0.8)

---

In [None]:
@step(
    name="IngestTrainingData",
    display_name="Ingest Training Data",
    instance_type=instance_type_data_processing,
    instance_count=instance_count
)
def ingest_train_data(s3_path: ParameterString) -> pd.DataFrame:
    """
    Read the training CSV into a DataFrame.

    Args:
        s3_path (ParameterString): Location of the training CSV; the pipeline
            passes the ``TrainDataPath`` parameter here.

    Returns:
        pd.DataFrame: The training rows exactly as parsed by ``pd.read_csv``.
    """
    train_df = pd.read_csv(s3_path)
    shape = train_df.shape
    print(f"Ingested data with shape: {shape}")
    return train_df

@step(
    name="IngestTestingData",
    display_name="Ingest Testing Data",
    instance_type=instance_type_data_processing,
    instance_count=instance_count
)
def ingest_test_data(s3_path: ParameterString) -> pd.DataFrame:
    """
    Ingest the test (scoring) data from an S3 path.

    Args:
        s3_path (ParameterString): S3 path to the test CSV; the pipeline passes
            the ``TestDataPath`` parameter here.

    Returns:
        pd.DataFrame: Test dataframe exactly as parsed by ``pd.read_csv``.
    """
    df = pd.read_csv(s3_path)
    print(f"Ingested data with shape: {df.shape}")
    return df

In [None]:
@step(
    name="ExtractFeatures",
    display_name="Extract Feature Columns",
    instance_type=instance_type_data_processing,
    instance_count=instance_count,
)
def extract_features_column(df: pd.DataFrame, target_column: str = "y") -> pd.DataFrame:
    """
    Drop the target column from the training data, leaving the feature columns.

    Args:
        df (pd.DataFrame): Training dataframe containing ``target_column``.
        target_column (str): Name of the label column to remove. Defaults to "y".

    Returns:
        pd.DataFrame: ``df`` without ``target_column``; every other column
        (including any ID column) is kept.

    Raises:
        KeyError: If ``target_column`` is not a column of ``df``.
    """
    features_df = df.drop(columns=[target_column])
    # Report the full (rows, columns) shape; it is not a sample count.
    print(f"Extracted feature columns with shape: {features_df.shape}")
    return features_df

@step(
    name="ExtractTarget",
    display_name="Extract Target Column",
    instance_type=instance_type_data_processing,
    instance_count=instance_count,
)
def extract_target_column(df: pd.DataFrame, target_column: str = "y") -> pd.Series:
    """
    Select the target (label) column from the training data.

    Args:
        df (pd.DataFrame): Training dataframe containing ``target_column``.
        target_column (str): Name of the label column. Defaults to "y".

    Returns:
        pd.Series: The target values, indexed like ``df``.

    Raises:
        KeyError: If ``target_column`` is not a column of ``df``.
    """
    target = df[target_column]
    print(f"Extracted target column with {len(target)} samples")
    return target

In [None]:
# Columns the preprocessing steps label-encode. Defined once so the train and
# test transforms cannot drift apart.
_CATEGORICAL_COLUMNS = (
    "job",
    "marital",
    "education",
    "default",
    "balance",
    "housing",
    "loan",
    "contact",
    "month",
    "poutcome",
)


def _encode_and_drop_id(df: pd.DataFrame) -> pd.DataFrame:
    """
    Label-encode the listed non-numeric columns and drop the ``id`` column.

    Operates on a copy, so the caller's frame is not modified.

    Listed columns that are already numeric are passed through unchanged:
    a LabelEncoder fitted separately on each dataset replaces a number with
    its rank among *that dataset's* distinct values, so train and test codes
    would not line up.

    NOTE(review): encoders are still fitted independently per call, so string
    columns only receive matching codes in train and test when both contain
    the same set of categories. Fixing that requires fitting on train and
    handing the fitted encoders to the test step.

    Args:
        df (pd.DataFrame): Frame containing every column in
            ``_CATEGORICAL_COLUMNS`` plus an ``id`` column.

    Returns:
        pd.DataFrame: Encoded copy of ``df`` without ``id``.

    Raises:
        KeyError: If a listed column or ``id`` is missing.
    """
    encoded = df.copy()
    for column_name in _CATEGORICAL_COLUMNS:
        if pd.api.types.is_numeric_dtype(encoded[column_name]):
            continue
        encoded[column_name] = LabelEncoder().fit_transform(encoded[column_name])
    # `id` is a row identifier, not a feature.
    return encoded.drop(columns=["id"])


@step(
    name="Pre-processTrainData",
    display_name="Pre-process Train Data",
    instance_type=instance_type_data_processing,
    instance_count=instance_count
)
def preprocess_train_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Encode the training feature frame for the model.

    Args:
        df (pd.DataFrame): Training features (target already removed) that
            still include the ``id`` column.

    Returns:
        pd.DataFrame: Model-ready training features with ``id`` dropped.
    """
    return _encode_and_drop_id(df)


@step(
    name="Pre-processTestData",
    display_name="Pre-process Test Data",
    instance_type=instance_type_data_processing,
    instance_count=instance_count
)
def preprocess_test_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Encode the test frame with the same column transforms as the training data.

    Args:
        df (pd.DataFrame): Raw test rows including the ``id`` column.

    Returns:
        pd.DataFrame: Model-ready test features with ``id`` dropped.
    """
    return _encode_and_drop_id(df)

In [None]:
@step(
    name="SplitData",
    display_name="Split Data",
    instance_type=instance_type_data_processing,
    instance_count=instance_count
)
def split_data(
    x: pd.DataFrame,
    y: pd.Series,
    train_size: float = 0.8,
    random_state: int = 42,
) -> tuple:
    """
    Split features and target into training and hold-out subsets.

    Args:
        x (pd.DataFrame): Feature matrix.
        y (pd.Series): Target, aligned row-for-row with ``x``.
        train_size (float): Fraction of rows assigned to the training split.
            Defaults to 0.8; a pipeline parameter such as
            ``train_test_split_ratio`` can be passed here instead.
        random_state (int): Seed for the shuffle so the split is reproducible.
            Defaults to 42.

    Returns:
        tuple: ``(x_train, x_test, y_train, y_test)``, in the order returned by
        ``sklearn.model_selection.train_test_split``.
    """
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, train_size=train_size, random_state=random_state
    )

    return x_train, x_test, y_train, y_test

In [None]:
@step(
    name="TrainModel",
    display_name="Train Model",
    instance_type=instance_type_model_training,
    instance_count=instance_count
)
def train_model(
    x_train: pd.DataFrame,
    x_test: pd.DataFrame,
    y_train: pd.DataFrame,
    y_test: pd.DataFrame,
) -> lgb.LGBMClassifier:
    """
    Fit a LightGBM classifier with default hyper-parameters.

    The hold-out split is handed to LightGBM as ``eval_set``; the trees are
    fitted on the training split only.

    Args:
        x_train (pd.DataFrame): Training features.
        x_test (pd.DataFrame): Hold-out features used for evaluation.
        y_train (pd.DataFrame): Training target.
        y_test (pd.DataFrame): Hold-out target used for evaluation.

    Returns:
        lgb.LGBMClassifier: The fitted classifier.
    """
    validation_sets = [(x_test, y_test)]
    # LGBMClassifier.fit returns the estimator itself.
    return lgb.LGBMClassifier().fit(x_train, y_train, eval_set=validation_sets)

@step(
    name="MakePredictions",
    display_name="Make Predictions",
    instance_type=instance_type_model_training,
    instance_count=instance_count
)
def make_predictions(model: lgb.LGBMClassifier, test_data: pd.DataFrame) -> np.ndarray:
    """
    Score preprocessed test rows with a fitted classifier.

    Args:
        model (lgb.LGBMClassifier): Fitted classifier.
        test_data (pd.DataFrame): Features encoded like the training data.

    Returns:
        np.ndarray: One value per row: the predicted probability of the last
        entry in ``model.classes_`` (for 0/1 labels, the probability of 1).
    """
    class_probabilities = model.predict_proba(test_data)
    last_class_probability = class_probabilities[:, -1]
    return last_class_probability

---

In [None]:
def create_pipeline(pipeline_name: str = "BankMarketingPipeline") -> Pipeline:
    """
    Wire the ``@step`` functions into a SageMaker Pipeline.

    Calling a ``@step``-decorated function only records a step and returns a
    delayed reference to its output. Passing that reference to another step
    makes SageMaker run the producer first. Every step below receives its
    upstream outputs as arguments, so no explicit ``add_depends_on`` wiring is
    required.

    Args:
        pipeline_name (str): Name the pipeline is registered under.
            Defaults to "BankMarketingPipeline".

    Returns:
        Pipeline: The pipeline object (not yet upserted or started).
    """
    # Ingest the raw CSVs.
    train_raw = ingest_train_data(train_data_path)
    test_raw = ingest_test_data(test_data_path)

    # Separate the labelled training data into features and target.
    train_features = extract_features_column(train_raw)
    train_target = extract_target_column(train_raw)

    # Encode features for the model.
    train_features_encoded = preprocess_train_data(train_features)
    test_features_encoded = preprocess_test_data(test_raw)

    # Hold out part of the training data, fit, then score the test data.
    split = split_data(train_features_encoded, train_target)
    # split_data returns (x_train, x_test, y_train, y_test).
    model = train_model(split[0], split[1], split[2], split[3])
    predictions = make_predictions(model, test_features_encoded)

    # Steps reference parameters by name, so declare each name exactly once
    # (the first object wins) even when several module-level objects share a
    # name, and make sure the training instance-type parameter is declared.
    # NOTE(review): `train_test_split_ratio` is defined but not passed to any
    # step, so it is not declared here.
    declared_parameters = {}
    for parameter in (
        train_data_path,
        test_data_path,
        instance_type_data_processing,
        instance_type_model_training,
        instance_count,
    ):
        declared_parameters.setdefault(parameter.name, parameter)

    return Pipeline(
        name=pipeline_name,
        parameters=list(declared_parameters.values()),
        steps=[
            train_raw,
            test_raw,
            train_features,
            train_target,
            train_features_encoded,
            test_features_encoded,
            split,
            model,
            predictions,
        ],
        sagemaker_session=sagemaker.Session(),
    )

In [None]:
# Build the pipeline object (nothing runs here; execution is started by
# `pipeline.start()` in a later cell).
pipeline = create_pipeline()

In [None]:
# Register the pipeline definition with SageMaker (create, or update if it
# already exists) using the execution role resolved earlier in the notebook.
pipeline.upsert(role_arn=role_arn)

In [None]:
# Launch an execution with the parameters' default values; `execution` is the
# handle used to monitor it.
execution = pipeline.start()

---