# (Pipeline) Load Libraries

In [9]:
!pip install --upgrade boto3 fsspec s3fs

Collecting boto3
  Downloading boto3-1.34.94-py3-none-any.whl.metadata (6.6 kB)
Collecting fsspec
  Using cached fsspec-2024.3.1-py3-none-any.whl.metadata (6.8 kB)
Collecting botocore<1.35.0,>=1.34.94 (from boto3)
  Downloading botocore-1.34.94-py3-none-any.whl.metadata (5.7 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Using cached s3transfer-0.10.1-py3-none-any.whl.metadata (1.7 kB)
INFO: pip is looking at multiple versions of aiobotocore to determine which version is compatible with other requirements. This could take a while.
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Using cached aiobotocore-2.12.3-py3-none-any.whl.metadata (21 kB)
  Using cached aiobotocore-2.12.2-py3-none-any.whl.metadata (21 kB)
  Using cached aiobotocore-2.12.1-py3-none-any.whl.metadata (21 kB)
  Using cached aiobotocore-2.12.0-py3-none-any.whl.metadata (21 kB)
  Using cached aiobotocore-2.11.2-py3-none-any.whl.metadata (21 kB)
  Using cached aiobotocore-2.11.1-py3-none-any.whl.metadata (21 

# Begin Pipeline Setup

In [10]:
import boto3 
import pandas as pd 
import sagemaker
import os
from sagemaker.workflow.pipeline_context import PipelineSession 
import sagemaker

s3_client = boto3.resource('s3')

pipeline_name = f"sagemaker-mlops-customer-pipeline" 

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name 
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

model_package_group_name = f"CustomerClassModelPackageGroup"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Load data

In [11]:
bucket_name = 'eliezerraj-908671954593-dataset'
prefix_name = 'customer'
file_name = 'customer_profile.csv'

input_dataset = 's3://{}/{}/{}'.format(bucket_name, prefix_name, file_name)
input_requirement = 's3://{}/{}/{}'.format(bucket_name, prefix_name, 'requirements.txt')
data_location = f"s3://{bucket_name}/{prefix_name}/output/train"

store_data = pd.read_csv(input_dataset)

print("---------------------------------")
print("Shape of dataframe : ", store_data.shape)
print("input_dataset :", input_dataset)
print("data_location :", data_location)
print("input_requirement :", input_requirement)

from sagemaker.workflow.parameters import(
                         ParameterInteger,
                         ParameterString,
                         ParameterFloat)

base_job_prefix = "customer-class-model"

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount",
                                             default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType",
                                           default_value="ml.m4.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType",
                                         default_value="ml.m4.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus",
                                        default_value="PendingManualApproval")

---------------------------------
Shape of dataframe :  (10127, 7)
input_dataset : s3://eliezerraj-908671954593-dataset/customer/customer_profile.csv
data_location : s3://eliezerraj-908671954593-dataset/customer/output/train
input_requirement : s3://eliezerraj-908671954593-dataset/customer/requirements.txt


# Step 1 Feature Engineering

In [12]:
# Define Processing Step for Feature Engineering

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m4.xlarge",
    instance_count=processing_instance_count,
    base_job_name="mlops-pipeline-customer-feature-eng",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_dataset,
                        destination="/opt/ml/processing/input"),
        ProcessingInput(source=input_requirement,
                        destination="/opt/ml/processing/input/req/")
    ],
    outputs=[
        ProcessingOutput(output_name="train",
                         source="/opt/ml/processing/train",
                         destination=f"s3://{bucket_name}/{prefix_name}/output/train"
                        )
    ],
    code=f"feature-scaled.py",
)

step_process = ProcessingStep(
    name="FeaturingEngineering-Customer-Model",
    step_args=processor_args
)

print("step_process : ", step_process)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


step_process :  ProcessingStep(name='FeaturingEngineering-Customer-Model', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)




# Step 2 Build the model

In [13]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

instance_count = 1
k_clusters = 4
feature_dim = 4

image_uri = sagemaker.image_uris.retrieve(
                                            framework="kmeans",
                                            region=region,
                                            version="latest",
                                            instance_type=training_instance_type,
)

kmeans = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=instance_count,
    instance_type=training_instance_type,
    hyperparameters={
        "k": k_clusters,
        "feature_dim": feature_dim,
    },
)

step_training = TrainingStep(
    name="Train-Customer-Model",
    estimator=kmeans,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, #data_location,
            content_type="text/csv;label_size=0",
        )
    },
)

print("step_training : ", step_training)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m4.xlarge.


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
step_training :  TrainingStep(name='Train-Customer-Model', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


# Step 3 Register Model

In [14]:
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=image_uri,
    model_data=step_training.properties.ModelArtifacts.S3ModelArtifacts, #step_training.get_top_model_s3_uri(top_k=0,s3_bucket=f"{bucket_name}/{prefix_name}",prefix="output"),
    sagemaker_session=pipeline_session,
    role=role,
)

from time import gmtime, strftime

model_name = "kmeans-customer-v1-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

register_args = model.register(
                                content_types=["text/csv"],
                                response_types=["text/csv"],
                                inference_instances=["ml.t2.medium", "ml.m5.large"],
                                transform_instances=["ml.m4.xlarge"],
                                model_package_group_name=model_package_group_name,
                                approval_status=model_approval_status,
)

step_register = ModelStep(
    name="RegisterModel-Customer-Model",
    step_args=register_args
)

print("step_register : ", step_register)

step_register :  ModelStep(name='RegisterModel-Customer-Model', steps=[_RegisterModelStep(name='RegisterModel-Customer-Model-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])


# Build and trigger the pipeline

In [15]:
import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_dataset,
    ],
    steps=[step_process,
           step_training,
           step_register
          ],
)

definition = json.loads(pipeline.definition())

print(definition)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml




{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}, {'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m4.xlarge'}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m4.xlarge'}, {'Name': 'ModelApprovalStatus', 'Type': 'String', 'DefaultValue': 'PendingManualApproval'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'FeaturingEngineering-Customer-Model', 'Type': 'Processing', 'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m4.xlarge', 'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'}, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3', 'ContainerEntrypoint': ['python3', '/opt/ml/processing/input/code/feature-scaled.py']}, 'Role

In [16]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=role)
# start Pipeline execution
pipeline.start()



_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:908671954593:pipeline/sagemaker-mlops-customer-pipeline/execution/2xh2ynkpondj', sagemaker_session=<sagemaker.session.Session object at 0x7eff8fac8bb0>)