# Monitor data quality for promotion planning

In [None]:
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
from sagemaker.model import Model, Session
from sagemaker.model_monitor import (BatchTransformInput,
                                     CronExpressionGenerator,
                                     DataCaptureConfig, DefaultModelMonitor,
                                     MonitoringDatasetFormat)
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [None]:
# Shared SageMaker session; its runtime client is used further down to
# invoke the deployed endpoint directly.
sagemaker_session = Session()
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client

In [None]:
# IAM role passed to the monitors and the model below.
role = get_execution_role()
# Region used when looking up the XGBoost container image.
region = 'us-east-1'
# Instance settings for the monitoring jobs.
instance_count = 1
instance_type = 'ml.m4.xlarge'
# S3 buckets: training data, trained model artifacts, and a staging area
# (captured data, baseline results, monitoring reports, batch input/output).
data_bucket = 'adp-rnd-ml-datasets'
model_bucket = 'adp-rnd-ml-models'
stage_bucket = 'adp-rnd-ml-stage'

def _build_promotion_planning_monitor():
    """Return a ``DefaultModelMonitor`` configured with the shared job settings."""
    return DefaultModelMonitor(
        role=role,
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
        base_job_name='promotion-planning',
        sagemaker_session=sagemaker_session,
    )


# Two independent monitor objects with identical settings: one is attached to
# the real-time endpoint schedule, the other to the batch transform schedule.
endpoint_monitor = _build_promotion_planning_monitor()
transform_monitor = _build_promotion_planning_monitor()

## Model

In [None]:
# S3 location of the model artifact and the name to register it under.
bucket_prefix = 'promotion-planning/model/promotion-planning-train-job-2023-01-31-084806/output'
model_file_name = 'model.tar.gz'
model_s3_key = f'{bucket_prefix}/{model_file_name}'
model_url = f's3://{model_bucket}/{model_s3_key}'
model_name = 'promotion-planning-2023-01-31-084806'

# Container image for XGBoost 0.90-1 in the configured region.
image_uri = retrieve(
    framework='xgboost',
    region=region,
    version='0.90-1',
)

model = Model(
    image_uri=image_uri,
    model_data=model_url,
    role=role,
    name=model_name,
)

# Create the SageMaker model resource from the definition above.
model.create()

### Deploy model

In [None]:
# Endpoint sizing and naming.
initial_instance_count = 1
endpoint_instance_type = 'ml.t2.medium'
endpoint_name = 'promotion-planning-endpoint-084806'
# S3 prefix in the staging bucket that receives the endpoint's captured data.
endpont_capture_destination_s3_uri = f's3://{stage_bucket}/captured_data'

# Data capture is enabled with a 100% sampling rate and written to the prefix above.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=endpont_capture_destination_s3_uri
)

In [None]:
# Deploy the model as a real-time endpoint with data capture turned on.
predictor = model.deploy(
    initial_instance_count=initial_instance_count,
    instance_type=endpoint_instance_type,
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

## Suggest baseline

In [None]:
# The training CSV serves as the baseline dataset; baseline results are
# written to the staging bucket.
training_dataset = 'promotion-planning/train/data.csv'
baseline_data_uri = f's3://{data_bucket}/{training_dataset}'
baseline_results_uri = f's3://{stage_bucket}/promotion-planning/baseline_results'

In [None]:
# Run the baselining job over the training CSV (read with a header row) and
# block until it finishes; output goes under ``baseline_results_uri``.
result = endpoint_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

## Create monitoring schedule

In [None]:
# Where monitoring reports are written.
s3_report_path = f's3://{stage_bucket}/monitoring-results'
# Baseline files expected under the baseline results prefix — presumably the
# ones produced by the suggest_baseline job above; verify the file names.
statistics_path = f'{baseline_results_uri}/statistics.json'
constraints_path = f'{baseline_results_uri}/constraints.json'

### Real-time endpoint

In [None]:
# Name of the monitoring schedule attached to the real-time endpoint.
endpoint_mon_schedule_name = 'promotion-planning-endpoint-model-monitor-schedule'

In [None]:
# Schedule an hourly monitoring job for the endpoint, checked against the
# baseline statistics/constraints, with CloudWatch metrics enabled.
endpoint_monitor.create_monitoring_schedule(
    monitor_schedule_name=endpoint_mon_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=statistics_path,
    constraints=constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

### Batch transform

In [None]:
# S3 prefix that receives data captured from batch transform jobs, and the
# name of the monitoring schedule that reads from it.
transform_capture_destination_s3_uri = f's3://{stage_bucket}/captured_data/batch_tranform/promotion-planning'
transform_mon_schedule_name = 'promotion-planning-transform-model-monitor-schedule'

In [None]:
# Schedule an hourly monitoring job over the batch transform capture prefix.
# The captured data is declared as header-less CSV and is checked against the
# same baseline statistics/constraints as the endpoint schedule.
transform_monitor.create_monitoring_schedule(
    monitor_schedule_name=transform_mon_schedule_name,
    batch_transform_input=BatchTransformInput(
        data_captured_destination_s3_uri=transform_capture_destination_s3_uri,
        destination='/opt/ml/processing/input',
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    ),
    output_s3_uri=s3_report_path,
    statistics=statistics_path,
    constraints=constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

## Model monitor information

In [None]:
# Point each monitor object at its schedule by name — presumably so the cells
# below also work when the schedules were created in an earlier session.
endpoint_monitor.monitoring_schedule_name = endpoint_mon_schedule_name
transform_monitor.monitoring_schedule_name = transform_mon_schedule_name

### Endpoint monitor

In [None]:
# Select the real-time endpoint monitor for the inspection cells below.
model_monitor = endpoint_monitor
mon_schedule_name = endpoint_mon_schedule_name

Alternatively, run the next cell instead to inspect the batch transform monitor:

### Batch transform monitor

In [None]:
# Select the batch transform monitor for the inspection cells below.
model_monitor = transform_monitor
mon_schedule_name = transform_mon_schedule_name

### describe schedule

In [None]:
# Fetch the schedule details of the selected monitor and print its name,
# status and endpoint name, one per line.
schedule_description = model_monitor.describe_schedule()
print(
    schedule_description.get('MonitoringScheduleName'),
    schedule_description.get('MonitoringScheduleStatus'),
    schedule_description.get('EndpointName'),
    sep='\n',
)

In [None]:
# Show the full schedule description as the cell output.
schedule_description

### last execution

In [None]:
# Executions of the selected monitor; nothing is printed when there are none.
list_executions = model_monitor.list_executions()
if list_executions:
    last_list_execution = list_executions[-1].describe()
    finished_at = last_list_execution.get('ProcessingEndTime')
    # Status, exit message and end time (blank when missing) of the last entry.
    print(
        last_list_execution.get('ProcessingJobStatus'),
        last_list_execution.get('ExitMessage'),
        finished_at.isoformat() if finished_at else '',
        sep='\n',
    )


In [None]:
# Constraint violations of the last listed execution, shown as a dict.
# Requires ``list_executions`` from the previous cell to be non-empty.
list_executions[-1].constraint_violations().body_dict

### describe monitoring schedule

In [None]:
describe_monitoring_schedule = sagemaker_session.describe_monitoring_schedule(
    monitoring_schedule_name=mon_schedule_name
)
# The summary of the last execution is absent until the schedule has run;
# its two fields are printed as blanks in that case.
last_execution_summary = describe_monitoring_schedule.get('LastMonitoringExecutionSummary')
print(
    describe_monitoring_schedule['MonitoringScheduleStatus'],
    last_execution_summary['MonitoringExecutionStatus'] if last_execution_summary else '',
    last_execution_summary['ScheduledTime'].isoformat() if last_execution_summary else '',
    sep='\n',
)


### list monitoring schedules

In [None]:
# Monitoring schedules attached to the endpoint, reduced to the four fields
# of interest and shown as the cell output.
monitoring_schedules = sagemaker_session.list_monitoring_schedules(endpoint_name=endpoint_name)
[
    {
        field: summary.get(field)
        for field in (
            'MonitoringScheduleName',
            'MonitoringScheduleStatus',
            'EndpointName',
            'MonitoringType',
        )
    }
    for summary in monitoring_schedules['MonitoringScheduleSummaries']
]

### list monitoring executions

In [None]:
# First five execution summaries of the selected schedule: status plus the
# scheduled time in ISO format (blank when missing), shown as the cell output.
monitoring_executions = sagemaker_session.list_monitoring_executions(
    monitoring_schedule_name=mon_schedule_name
)
[
    {
        'MonitoringExecutionStatus': summary.get('MonitoringExecutionStatus'),
        'ScheduledTime': summary['ScheduledTime'].isoformat() if summary.get('ScheduledTime') else '',
    }
    for summary in monitoring_executions['MonitoringExecutionSummaries'][:5]
]

## Model usage

### Real-time endpoint

In [None]:
# CSV file in the staging bucket whose rows are sent to the endpoint below.
stage_file_path = f's3://{stage_bucket}/promotion-planning/input/stage.csv'


In [None]:
!pip install -Uq awswrangler

In [None]:
import time

import awswrangler as wr

def predict_from_file(file_path: str, max_lines=None):
    """Send the rows of a CSV file in S3 to the endpoint, one request per row.

    The file is read with its first line as the header. Each row is turned
    into a comma-joined string and posted to ``endpoint_name`` as
    ``text/csv``; the decoded response is printed, followed by a space.
    There is a 0.1 s pause after every request.

    Args:
        file_path: S3 path of the CSV file to read.
        max_lines: Iteration stops when a row's index label equals this
            value; with ``None`` every row is sent.
    """
    content_type = "text/csv"
    frame = wr.s3.read_csv(file_path, header=0)
    for row_label, record in frame.iterrows():
        if row_label == max_lines:
            break
        csv_line = ','.join(record.astype(str).array)
        result = sagemaker_runtime_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=csv_line,
            ContentType=content_type,
        )
        print(result["Body"].read().decode(), end=' ')
        # Brief pause between requests.
        time.sleep(.1)


def predict_spoiled_data(max_lines=None):
    """Send deliberately distorted CSV records to the endpoint.

    The first record has an empty leading field. Every following record is
    built from a template whose first two fields are both set to
    ``(i - 2) * 10000.0`` for ``i = 0, 1, 2, ...``. Each record is posted to
    ``endpoint_name`` as ``text/csv``; the decoded response is printed,
    followed by a space, with a 0.1 s pause after every request.

    Args:
        max_lines: Total number of records to send, counting the
            empty-field record. ``None`` (the default) sends 100 records;
            previously the default raised ``TypeError`` in ``range(None)``.
    """
    # The declared default used to crash, so map it to a fixed record count.
    limit = 100 if max_lines is None else max_lines

    null_element_row = ',0.05,0.0,0.0,0.0,0.0,0.0,7.0,10.0,9.0,0.0,216.92,216.92,216.92,216.92,0.0,0.0,0.0,0.0,216.92,216.92,216.92,216.92,86.77,2.4996543,2.4996543,2.4996543,2.4996543,0.9999539,0.9999539,0.9999539,0.9999539,2.4996543,2.4996543,2.4996543,2.4996543,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0'
    element_row_template = '{},{},0.0,0.0,0.0,0.0,0.0,7.0,10.0,9.0,0.0,216.92,216.92,216.92,216.92,0.0,0.0,0.0,0.0,216.92,216.92,216.92,216.92,86.77,2.4996543,2.4996543,2.4996543,2.4996543,0.9999539,0.9999539,0.9999539,0.9999539,2.4996543,2.4996543,2.4996543,2.4996543,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0'

    rows = [null_element_row]
    # The empty-field record occupies the first slot, so only ``limit - 1``
    # generated records can ever be sent; do not build one that is discarded.
    for i in range(limit - 1):
        value = (i - 2) * 10000.0
        rows.append(element_row_template.format(value, value))

    dataset_type = "text/csv"
    for index, row in enumerate(rows):
        # Still needed for ``limit == 0``, where nothing must be sent.
        if index == limit:
            break
        response = sagemaker_runtime_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=row,
            ContentType=dataset_type,
        )
        prediction = response["Body"].read().decode()
        print(prediction, end=' ')
        # Brief pause between requests.
        time.sleep(.1)


In [None]:
# Send rows of the stage file to the endpoint, stopping at index label 100.
predict_from_file(file_path=stage_file_path, max_lines=100)

In [None]:
# Send up to 100 deliberately distorted records to the endpoint.
predict_spoiled_data(max_lines=100)

### Batch transform

In [None]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import BatchDataCaptureConfig

# Batch transform job sizing and naming.
transform_instance_count = 1
transform_instance_type = 'ml.m4.xlarge'
base_transform_job_name = 'promotion-planning'
# Input prefix and output location in the staging bucket (f-strings, matching
# how every other S3 URI in this notebook is built).
batch_input = f's3://{stage_bucket}/promotion-planning/input/'
batch_output = f's3://{stage_bucket}/promotion-planning/output'

# Transformer for the model registered above: one record per request, output
# assembled line by line under ``batch_output``.
transformer = Transformer(
    model_name=model_name,
    instance_count=transform_instance_count,
    instance_type=transform_instance_type,
    strategy='SingleRecord',
    assemble_with='Line',
    output_path=batch_output,
    max_concurrent_transforms=16,
    max_payload=6,
    base_transform_job_name=base_transform_job_name,
)

In [None]:
# Run the batch transform over the input prefix, splitting the CSV input by
# line, and capture the job's data to the prefix watched by the batch
# transform monitoring schedule.
# NOTE(review): ``input_filter='$[1:]'`` presumably drops the first column of
# each record before inference — confirm this matches the input file layout.
transformer.transform(
    data=batch_input,
    content_type='text/csv',
    split_type='Line',
    input_filter='$[1:]',
    batch_data_capture_config=BatchDataCaptureConfig(
        destination_s3_uri=transform_capture_destination_s3_uri,
    ),
)

## Clean up

In [None]:
# Remove the monitoring schedule attached to the real-time endpoint.
endpoint_monitor.delete_monitoring_schedule()


In [None]:
# Remove the monitoring schedule for batch transform jobs.
transform_monitor.delete_monitoring_schedule()

In [None]:
sagemaker_session.delete_endpoint_config(endpoint_config_name=endpoint_name)

In [None]:
sagemaker_session.delete_endpoint(endpoint_name=endpoint_name)

In [None]:
# Finally remove the SageMaker model resource.
sagemaker_session.delete_model(model_name=model_name)