# Pipeline - Payment Anomaly (RCF)  v-03.05.24

In [1]:
#!pip install --upgrade boto3 fsspec s3fs

# Pipeline Begin

In [2]:
import boto3 
import pandas as pd 
import sagemaker
import os
from sagemaker.workflow.pipeline_context import PipelineSession 

# Fixed names for the pipeline and for the model package group it registers into.
pipeline_name = "sagemaker-mlops-payment-anomaly-pipeline"
model_package_group_name = "PaymentAnomalyModelPackageGroup"

# Regular SDK session; its boto region is reused by the clients and image lookup below.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# IAM role resolved from the environment this notebook runs in.
role = sagemaker.get_execution_role()

# Separate session object, passed to the Model that the register step is built from.
pipeline_session = PipelineSession()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Load Data

In [3]:
from sagemaker.workflow.parameters import(ParameterInteger,
                                         ParameterString,
                                         ParameterFloat)

# Pipeline parameters. They are pipeline variables, resolved at execution time;
# the default applies when no override is supplied.
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m4.xlarge",
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m4.xlarge",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

base_job_prefix = "payment-anomaly-model"

# S3 layout: bucket, key prefixes (used for cleanup) and full s3:// URIs.
bucket_name = "eliezerraj-908671954593-dataset"
prefix_name = "payment-rcf"
feat_prefix_name = "payment-rcf/feature/opt/ml/input/data/train"
tmp_prefix_name = "payment-rcf/tmp"

temp_location = f"s3://{bucket_name}/{prefix_name}/tmp"
train_data_location = f"s3://{bucket_name}/{prefix_name}/feature/opt/ml/input/data/train"
model_location = f"s3://{bucket_name}/{prefix_name}/output"
input_requirement = f"s3://{bucket_name}/{prefix_name}/requirements.txt"

# Echo the resolved configuration so it is visible in the cell output.
print("---------------------------------")
for label, value in (
    ("bucket_name: ", bucket_name),
    ("prefix_name: ", prefix_name),
    ("feat_prefix_name: ", feat_prefix_name),
    ("tmp_prefix_name: ", tmp_prefix_name),
    ("temp_location: ", temp_location),
    ("train_data_location: ", train_data_location),
    ("model_location: ", model_location),
    ("input_requirement: ", input_requirement),
):
    print(label, value)

---------------------------------
bucket_name:  eliezerraj-908671954593-dataset
prefix_name:  payment-rcf
feat_prefix_name:  payment-rcf/feature/opt/ml/input/data/train
tmp_prefix_name:  payment-rcf/tmp
temp_location:  s3://eliezerraj-908671954593-dataset/payment-rcf/tmp
train_data_location:  s3://eliezerraj-908671954593-dataset/payment-rcf/feature/opt/ml/input/data/train
model_location:  s3://eliezerraj-908671954593-dataset/payment-rcf/output
input_requirement:  s3://eliezerraj-908671954593-dataset/payment-rcf/requirements.txt


# Step 1 Feature Engineering

In [4]:
from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session

# One boto3 session backs both low-level clients created below.
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session()

data_location = f"s3://{bucket_name}/{prefix_name}"

# SageMaker API client and Feature Store runtime client, both pinned to `region`.
sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime",
    region_name=region,
)

# SDK session wired to the two clients above; shared by the Feature Store objects.
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)
feature_store = FeatureStore(sagemaker_session=feature_store_session)

# Handle to the feature group that the Athena query cell reads from.
payment_feature_group_name = "payment-fraud-feature-group"
payment_feature_group = FeatureGroup(
    name=payment_feature_group_name,
    sagemaker_session=feature_store_session,
)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Clean the S3

In [5]:
# Clean S3 feature

s3_client = boto3.client('s3')


def _purge_s3_prefix(client, bucket, prefix):
    """Delete every object stored under ``prefix`` in ``bucket``.

    The listing is paginated, so prefixes holding more keys than a single
    ``list_objects_v2`` response returns are fully emptied. Each page is
    deleted with one ``delete_objects`` call.

    Args:
        client: boto3 S3 client used for listing and deleting.
        bucket: Name of the bucket to clean.
        prefix: Key prefix whose objects are removed.

    Returns:
        Number of objects submitted for deletion (0 when the prefix is empty).

    Raises:
        RuntimeError: If S3 reports per-object failures for a delete request.
        Any client error from listing or deleting is propagated unchanged
        rather than being reported as "nothing to clean".
    """
    removed = 0
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when a page has no matching keys.
        batch = [{"Key": entry["Key"]} for entry in page.get("Contents", [])]
        if not batch:
            continue
        outcome = client.delete_objects(Bucket=bucket, Delete={"Objects": batch})
        failures = outcome.get("Errors", [])
        if failures:
            # Leftover files would otherwise silently remain under the prefix.
            raise RuntimeError(
                "Failed to delete {} object(s) under s3://{}/{}: {}".format(
                    len(failures), bucket, prefix, failures
                )
            )
        removed += len(batch)
    return removed


# Empty the training-data prefix and the temporary query-output prefix.
for stale_prefix in (feat_prefix_name, tmp_prefix_name):
    if _purge_s3_prefix(s3_client, bucket_name, stale_prefix) == 0:
        print("No files to clean :", stale_prefix)

No files to clean : payment-rcf/feature/opt/ml/input/data/train
No files to clean : payment-rcf/tmp


# Load dataset using Feature Store query

In [6]:
#result_df, query = feature_store.create_dataset(base=payment_feature_group,output_path=temp_location).to_dataframe()

#df_payment = result_df.filter(['fraud',
#                               'amount',
#                               'tx_1d',
#                               'avg_1d',
#                               'tx_7d',
#                               'avg_7d',
#                               'tx_30d',
#                               'avg_30d',
#                               'time_btw_cc_tx'], axis=1)

In [7]:
#df_payment

# Load dataset using Athena query

In [8]:
# Athena query handle for the feature group, plus the table it queries.
payment_query = payment_feature_group.athena_query()
payment_table = payment_query.table_name

# Select the eight feature columns from the feature group table.
query_string = f"SELECT amount, tx_1d, avg_1d, tx_7d, avg_7d, tx_30d, avg_30d, time_btw_cc_tx FROM {payment_table}"

separator = "------------------------------"
print(separator)
print("query_string: ", query_string)

# Run the query with results written under the temp location, then block until done.
payment_query.run(query_string=query_string, output_location=temp_location)
payment_query.wait()

print(separator)
print(payment_query.as_dataframe())

------------------------------
query_string:  SELECT amount, tx_1d, avg_1d, tx_7d, avg_7d, tx_30d, avg_30d, time_btw_cc_tx FROM payment_fraud_feature_group_1714865432
------------------------------
       amount  tx_1d   avg_1d  tx_7d   avg_7d  tx_30d  avg_30d  time_btw_cc_tx
0       102.0      4   219.00      8   394.88      15   463.07              64
1       372.0      2   531.00      5   955.20      29   484.14             212
2       346.0      4   315.25      8   292.25      11   241.36               0
3       470.0      2   574.00      3   416.67       7   291.57             286
4       128.0      1   128.00      5   250.80      18   300.67               0
...       ...    ...      ...    ...      ...     ...      ...             ...
71396   149.0      2   198.00      4   212.25       9   242.22             197
71397   818.0      2   520.00      2   520.00      11   202.09             399
71398   644.0      2   707.50      5   408.00      10   350.70               0
71399   516.

In [9]:
# Write the query result as a header-less, index-free CSV under the training prefix.
# NOTE(review): writing to an s3:// path presumably relies on s3fs/fsspec being installed.
train_csv_uri = '{}/train.csv'.format(train_data_location)
payment_features = payment_query.as_dataframe()
payment_features.to_csv(train_csv_uri, header=False, index=False)

# Step 2 Build Model

In [10]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker import RandomCutForest

# Resolve the Random Cut Forest container image for this region.
# training_instance_type is a pipeline parameter, so the SDK uses its default
# value for this lookup when the definition is built (see the warning in the
# cell output). "latest" is ignored in favour of the only supported version.
image_uri = sagemaker.image_uris.retrieve(framework="randomcutforest",
                                          region=region,
                                          version="latest",
                                          instance_type=training_instance_type)

fixed_hyperparameters = {
    "num_samples_per_tree": 200,
    "feature_dim": 8,  # number of columns selected by the Athena query / written to train.csv
    "num_trees": 50,
}

# The training job runs on the *training* instance type. Previously the estimator
# was sized with processing_instance_type, so overriding TrainingInstanceType on a
# pipeline execution had no effect on the job. No dedicated training-count parameter
# exists, so the instance count still comes from processing_instance_count.
rcf = sagemaker.estimator.Estimator(image_uri=image_uri,
                                    role=role,
                                    output_path=model_location,
                                    instance_count=processing_instance_count,
                                    instance_type=training_instance_type,
                                    hyperparameters=fixed_hyperparameters)

# Single "train" channel pointing at the CSV prefix; label_size=0 declares that
# the CSV rows carry no label column.
step_training = TrainingStep(
    name="Train-Payment-Anomaly-Model",
    estimator=rcf,
    inputs={
        "train": TrainingInput(
            s3_data=train_data_location,
            content_type="text/csv;label_size=0",
            distribution='ShardedByS3Key',
        )
    },
)

print("step_training : ", step_training)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.
Defaulting to the only supported framework/algorithm version: 1. Ignoring framework/algorithm version: latest.


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
step_training :  TrainingStep(name='Train-Payment-Anomaly-Model', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


# Step 3 Register Model

In [11]:
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

from time import gmtime, strftime

# Model definition whose artifact location is taken from the training step's
# properties, so it is only known once the training step has run.
model = Model(
    image_uri=image_uri,
    role=role,
    sagemaker_session=pipeline_session,
    model_data=step_training.properties.ModelArtifacts.S3ModelArtifacts,
)

# Timestamped name (UTC). It is not passed to the registration call below.
creation_stamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_name = "rfc-payment-v1-" + creation_stamp

# Arguments for registering the model into the package group; the approval
# status comes from the ModelApprovalStatus pipeline parameter.
register_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m4.xlarge"],
)

step_register = ModelStep(name="RegisterModel-Payment-Anomaly-Model", step_args=register_args)

print("step_register : ", step_register)

step_register :  ModelStep(name='RegisterModel-Payment-Anomaly-Model', steps=[_RegisterModelStep(name='RegisterModel-Payment-Anomaly-Model-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])




# Build and trigger the pipeline

In [12]:
import json
from sagemaker.workflow.pipeline import Pipeline

# Parameters exposed by the pipeline, and the steps it contains.
pipeline_parameters = [
    processing_instance_count,
    processing_instance_type,
    training_instance_type,
    model_approval_status,
]
pipeline_steps = [step_training, step_register]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

# Render the pipeline definition (JSON text) and parse it into a dict for inspection.
definition_json = pipeline.definition()
definition = json.loads(definition_json)

print(definition)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}, {'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m4.xlarge'}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m4.xlarge'}, {'Name': 'ModelApprovalStatus', 'Type': 'String', 'DefaultValue': 'PendingManualApproval'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'Train-Payment-Anomaly-Model', 'Type': 'Training', 'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File', 'TrainingImage': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:1'}, 'OutputDataConfig': {'S3OutputPath': 's3://eliezerraj-908671954593-dataset/payment-rcf/output'}, 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}, 'ResourceConfig': {'VolumeSizeInGB': 30, 'InstanceCount': {'Get': 'Parameters.ProcessingInsta

In [13]:
# Create the pipeline if it does not exist yet, otherwise update the existing
# one, using the role resolved at the top of the notebook.
pipeline.upsert(role_arn=role)
# Start a pipeline execution; no parameter overrides are passed here.
# Left as the cell's last expression so the returned execution object is
# displayed as the cell output.
pipeline.start()

Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:908671954593:pipeline/sagemaker-mlops-payment-anomaly-pipeline/execution/mwsbiuoztm3i', sagemaker_session=<sagemaker.session.Session object at 0x7f58c3b76560>)