## Sagemaker Pipeline

- An Amazon SageMaker pipeline is a series of interconnected steps in directed acyclic graph (DAG) that are defined using the drag-and-drop UI. Before jumping into CICD we need to understand how pipelins works in Sagemaker as well as How you can build your own pipeline in different usecases.

In [1]:
#necessary imports
import sagemaker
import boto3
from sagemaker.workflow.parameters import ParameterInteger,ParameterString,ParameterFloat

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
#defining sagemaker session and getting the region on which we are going to work on
sagemaker_session=sagemaker.session.Session()

region=sagemaker_session.boto_region_name
print("region name",region)

region name ap-south-1


In [3]:
#getting the execution role of my sagemaker
role=sagemaker.get_execution_role()
print("role is",role)

role is arn:aws:iam::730335253621:role/service-role/AmazonSageMaker-ExecutionRole-20241117T225496


In [4]:
#defining the bucket name and the pipeline model group name
bucket_name="pipeline-bucket-777/california-pipeline-case-study"
model_package_group_name="sagemakerppipeline"

### Data Collection
- Collecting the data from  my s3 bucket which We will use in my further implementation.

In [5]:
#defining the path of my s3 bucket and uploading it to s3 bucket
s3_upload_data_path = f"s3://{bucket_name}/"
input_customer_data=s3_upload_data_path+"customer_data.csv"
input_sales_data=s3_upload_data_path+"sales_data.csv"
input_shopping_mall_data=s3_upload_data_path+"shopping_mall_data.csv"

print(input_customer_data)
print(input_sales_data)
print(input_shopping_mall_data)

s3://pipeline-bucket-777/california-pipeline-case-study/customer_data.csv
s3://pipeline-bucket-777/california-pipeline-case-study/sales_data.csv
s3://pipeline-bucket-777/california-pipeline-case-study/shopping_mall_data.csv


### Define Parameters to Parametrize Pipeline Execution
Define Pipeline parameters that you can use to parametrize the pipeline. 

Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.The supported parameter types include:
- ParameterString - represents a str Python type

- ParameterInteger - represents an int Python type

- ParameterFloat - represents a float Python type

These parameters support providing a default value, which can be overridden on pipeline execution.

The default value specified should be an instance of the type of the parameter.

In [6]:
processing_instance_count=ParameterInteger("ProcessingInstanceCount",default_value=1)
input_data_customer=ParameterString("InputDatacustomer",default_value=input_customer_data)
input_data_sales=ParameterString("InputDatasales",default_value=input_sales_data)
input_data_shopping_mall=ParameterString("InputDatasmall",default_value=input_shopping_mall_data)

### Define a Processing Step for Feature Engineering
First, develop a preprocessing script that is specified in the Processing step.This notebook cell writes a file preprocessing_file.py, which contains the preprocessing script. You can update the script, and rerun this cell to overwrite. The preprocessing script uses scikit-learn to do the following:

In [7]:
%%writefile scripts/preprocessing_file.py
"""Feature engineers the dataset."""

# importing all necessary libraries
import argparse
import logging
import os
import pathlib
import requests
import tempfile
import boto3
import numpy as np
import pandas as pd
import joblib
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.model_selection import train_test_split

# defining logging here.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# defining different encoders for category based columns
def category_encoder(input_data):
    return input_data.replace(['Clothing', 'Shoes', 'Books', 'Cosmetics', 'Food & Beverage',
       'Toys', 'Technology', 'Souvenir'], [0, 1, 2, 3, 4,5,6,7])


def mall_encoder(input_data):
    return input_data.replace(['South Coast Plaza', 'Beverly Center', 'Westfield Century City',
       'Stanford Shopping Center', 'Westfield Valley Fair',
       'Del Amo Fashion Center', 'The Grove', 'Glendale Galleria',
       'Fashion Valley'], [0, 1, 2, 3, 4,5,6,7,8])


def gender_encoder(input_data):
    return input_data.replace(["Male","Female"], [0, 1])

def paymethod_encoder(input_data):
    return input_data.replace(['Credit Card', 'Debit Card', 'Cash'], [0, 1, 2])

def location_encoder(input_data):
    return input_data.replace(['Costa Mesa', 'Los Angeles', 'Palo Alto', 'Santa Clara',
       'Torrance', 'Glendale', 'San Diego'], [0, 1, 2,3,4,5,6])

if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument("--input-customer-data", type=str, required=True)
    parser.add_argument("--input-sales-data", type=str, required=True)
    parser.add_argument("--input-mall-data", type=str, required=True)

    args = parser.parse_args()

    # path of the environment variable
    base_dir = "/opt/ml/processing"
    
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    
    input_customer_data = args.input_customer_data
    input_sales_data = args.input_sales_data
    input_data_shopping_mall = args.input_mall_data
    # getting the bucket name
    bucket = input_sales_data.split("/")[2]
    # getting the key name for customer data
    key = "/".join(input_customer_data.split("/")[3:])

    logger.info("Downloading data from key: %s",  key)

    # path of the csv file we did uploaded in container
    customer_fn = f"{base_dir}/data/customer_data.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, customer_fn)


    # getting the key name for sales data
    key = "/".join(input_sales_data.split("/")[3:])

    logger.info("Downloading data from key: %s",  key)

    # path of the csv file we did uploaded in container
    sales_fn = f"{base_dir}/data/sales_data.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, sales_fn)

    # getting the key name for shopping mall data
    key = "/".join(input_data_shopping_mall.split("/")[3:])

    logger.info("Downloading data from key: %s", key)

    # path of the csv file we did uploaded in container
    mall_fn = f"{base_dir}/data/shopping_mall_data.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, mall_fn)


    logger.debug("Reading downloaded customer data.")
    customer_df = pd.read_csv(customer_fn)
    os.unlink(customer_fn)

    logger.debug("Reading downloaded sales data.")
    sales_df = pd.read_csv(sales_fn)
    os.unlink(sales_fn)

    logger.debug("Reading downloaded shooping mall data.")
    mall_df = pd.read_csv(mall_fn)
    os.unlink(mall_fn)

        # --- Step 1: Merge Data ---
    # Merge Customer Data and Sales Data
    merged_data = pd.merge(sales_df, customer_df, on="customer_id", how="inner")
    # Merge with Shopping Mall Data
    final_data = pd.merge(merged_data, mall_df, on="shopping_mall", how="inner")

    final_data=final_data.drop(["invoice_no","customer_id","invoice date",'area (sqm)'],axis=1)


    # Function.columns[0].[0].rmer helps us to convert those encoded functions to transformers which we can use inside sklearn pipeline
    category_transformer = FunctionTransformer(category_encoder)
    mall_transformer = FunctionTransformer(mall_encoder)
    gender_transformer = FunctionTransformer(gender_encoder)
    paymethod_transformer = FunctionTransformer(paymethod_encoder)
    location_transformer = FunctionTransformer(location_encoder)
    
    # defining the numeric pipeline
    numeric_transformer = Pipeline(
        steps=[("scaler", StandardScaler())]
    )
    
    # defining all categorical features
    categorical_features = ["category", "shopping_mall", "gender", "payment_method","location"]
    
    # creating cateorical pipeline
    categorical_transformer = Pipeline(
        steps=[
            ("category_transformer", category_transformer),
            ("mall_transformer", mall_transformer),
            ("gender_transformer", gender_transformer),
            ("paymethod_transformer", paymethod_transformer),
            ("location_transformer", location_transformer),
        
        ]
    )
    numeric_features = ['quantity', 'age',
         'construction_year',
           'store_count']
    # we put all transformation in one place which can affect all the columns in the dataset whatever names we specified
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )
    
    # splitting the data set
    train, test = train_test_split(final_data, test_size=0.20, shuffle=True)
    # split the dataset train and validation
    train, validation = train_test_split(train, test_size=0.15, shuffle=True)

    train_x=train.drop(["price"],axis=1)
    train_y=train["price"]
    
    test_x=test.drop(["price"],axis=1)
    test_y=test["price"]
    
    validation_x=validation.drop(["price"],axis=1)
    validation_y=validation["price"]
    # fitting the preprocessor
    all_columns_names=final_data.drop(["price"],axis=1).columns
    featurized_model = preprocess.fit(train_x)
    
    # convert transformed numpy array as dataframe in one place
    train_x = pd.DataFrame(featurized_model.transform(train_x), columns=all_columns_names)
    train_y = train_y.to_numpy().reshape(len(train_y), 1)
    train_X = np.array(train_x)
    
    # concatenate all the features
    train = np.concatenate((train_y, train_X), axis=1)
    # logger.info("__________________Train Transformation Ended__________________")
    ############################TRAINING TRANSFORMATION ENDED###########################
    
    ############################TESTING TRANSFORMATION STARTED###########################
    # logger.info("__________________Test Transformation Started__________________")
    test_x = pd.DataFrame(featurized_model.transform(test_x), columns=all_columns_names)
    test_y = test_y.to_numpy().reshape(len(test_y), 1)
    test_X = np.array(test_x)
    
    test = np.concatenate((test_y, test_X), axis=1)
    # logger.info("__________________Test Transformation Ended__________________")
    ############################TESTING TRANSFORMATION ENDED###########################
    
    ############################VALIDATION TRANSFORMATION STARTED###########################
    # logger.info("__________________Validation Transformation Started__________________")
    validation_x = pd.DataFrame(featurized_model.transform(validation_x), columns=all_columns_names)
    validation_y = validation_y.to_numpy().reshape(len(validation_y), 1)
    validation_X = np.array(validation_x)
    
    validation = np.concatenate((validation_y, validation_X), axis=1)
    # logger.info("__________________Validation Transformation Ended__________________")
    ############################VALIDATION TRANSFORMATION ENDED###########################

    all_columns_names=all_columns_names.insert(0,"price")
    # converting all the train and test and validation dataset to csv files
    
    train=pd.DataFrame(train,columns=all_columns_names)
    validation=pd.DataFrame(validation,columns=all_columns_names)
    test=pd.DataFrame(test,columns=all_columns_names)




Overwriting scripts/preprocessing_file.py


### SKLearn Processor instance or estimator for processing job
creating an instance of a SKLearnProcessor processor and use that in our ProcessingStep of sagemaker workflow

In [8]:
#let's the define the processor using sklearn
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_preprocessing_framework_version="0.23-1"

sklearn_preprocessor=SKLearnProcessor(framework_version=sklearn_preprocessing_framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-process",
    role=role)

Use the processor instance to construct a ProcessingStep of sagemaker workflow, along with the input and output channels, and the code that runs when the pipeline invokes pipeline execution. This is similar to a sklearn processor instance’s run() method in the Python SDK.Note the input_data parameters passed into ProcessingStep is the input data used in the step. This input data is used by the sklearn processor instance when it is run.


the "train_data" and "test_data" named channels specified in the output configuration for the processing job
Step Properties can be used in subsequent steps and resolve to their runtime values at execution. Specifically, this usage is called out when you define the training step.

In [9]:
# #let' define the steps while processing 
# from sagemaker.processing import ProcessingInput, ProcessingOutput
# from sagemaker.workflow.steps import ProcessingStep

# step_process = ProcessingStep(
#     name = "HotelProcess",
#     processor = sklearn_preprocessor,
#     outputs = [
#         ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=f"s3://{bucket_name}preprocesseddata/train"),
#         ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=f"s3://{bucket_name}preprocesseddata/test"),
#         ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=f"s3://{bucket_name}preprocesseddata/validation"),
#         ProcessingOutput(output_name="model", source="/opt/ml/processing/model", destination=f"s3://{bucket_name}sklearn-preprocessed-model/model-artifact"),
#     ],
#     code = "scripts/preprocessing_file.py",
#     job_arguments=["--input-data", input_data]
# )
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Define the processing step
step_process = ProcessingStep(
    name="Processing",
    processor=sklearn_preprocessor,
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=f"s3://{bucket_name}preprocesseddata/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=f"s3://{bucket_name}preprocesseddata/test"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=f"s3://{bucket_name}preprocesseddata/validation"),
        ProcessingOutput(output_name="model", source="/opt/ml/processing/model", destination=f"s3://{bucket_name}sklearn-preprocessed-model/model-artifact"),
    ],
    code="scripts/preprocessing_file.py",
    job_arguments=["--input-customer-data", input_customer_data,
                   "--input-sales-data", input_data_sales,
                   "--input-mall-data", input_data_shopping_mall]
)


In [10]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"SalesPipeLine"
pipeline = Pipeline(
    name = pipeline_name,
    parameters = [
        processing_instance_count,
        input_customer_data,
        input_data_sales,
        input_data_shopping_mall
    ],
    steps = [step_process]
)

### Examining the pipeline definition
The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [11]:
import json
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'InputDatasales',
   'Type': 'String',
   'DefaultValue': 's3://pipeline-bucket-777/california-pipeline-case-study/sales_data.csv'},
  {'Name': 'InputDatasmall',
   'Type': 'String',
   'DefaultValue': 's3://pipeline-bucket-777/california-pipeline-case-study/shopping_mall_data.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Processing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerArguments': ['--input-customer-da

### Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [12]:
pipeline.upsert(role)



{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:730335253621:pipeline/SalesPipeLine',
 'ResponseMetadata': {'RequestId': '190c9d72-39b4-46af-a246-9aff348592ec',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '190c9d72-39b4-46af-a246-9aff348592ec',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Tue, 19 Nov 2024 05:49:27 GMT'},
  'RetryAttempts': 0}}

In [13]:
execution = pipeline.start()