# Data Scientist Context

This notebook demonstrates a sample of the activities and files a data scientist works with.

In [None]:
import boto3
import pandas as pd
import logging
import json
import requests
import sagemaker
import string

from pathlib import Path
from urllib import parse

In [None]:
# Use the module's real name (the original passed the literal string '__name__').
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Attach the stream handler only once so re-running this cell does not
# duplicate every log line.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

## Use case

In this example, we use the auto insurance domain to detect claims that are possibly fraudulent.
More precisely, we address the use case “what is the likelihood that a given auto claim is fraudulent?” and explore the technical solution.

In [None]:
# Sessions and clients shared by the rest of the notebook.
sagemaker_session = sagemaker.Session()
boto_session = sagemaker_session.boto_session
region = sagemaker_session.boto_region_name
sagemaker_client = boto_session.client("sagemaker")

# IAM role used by the jobs, feature groups and pipelines created below.
role = sagemaker.get_execution_role()

# S3 locations: everything this notebook writes goes under s3://{bucket}/{prefix}.
bucket = sagemaker_session.default_bucket()
prefix = "mlops-demo"
s3_uploader = sagemaker.s3.S3Uploader

## Data
The inputs for building our model and workflow are two tables of insurance data: a claims table and a customers table. This data was synthetically generated and is provided to you in its raw state for pre-processing with SageMaker Data Wrangler.

In [None]:
# Location of the raw sample CSV files and the names of the files to fetch.
base_url = (
    "https://github.com/aws/amazon-sagemaker-examples/raw/master/"
    "end_to_end/fraud_detection/data/"
)
file_list = ["claims.csv", "customers.csv"]

# Local directory holding the flow templates and feature group configs.
feature_eng_base_path = Path("feature_engineering")

In [None]:
# Download every raw data file into the local "data" directory.
local_path = Path("data")
local_path.mkdir(parents=True, exist_ok=True)
for remote_name in file_list:
    file_url = base_url + remote_name
    file_name = Path(parse.urlparse(file_url).path).name
    file_path = local_path / file_name
    with requests.get(file_url, stream=True, timeout=60) as response:
        # Fail loudly on 4xx/5xx instead of saving an error page as a CSV file.
        response.raise_for_status()
        with file_path.open("wb") as f:
            # iter_content() defaults to 1-byte chunks; use a sensible buffer size.
            for chunk in response.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
    logger.info(f"Retrieved {file_url}")

Upload the files to S3, and open them in SageMaker Data Wrangler.

In [None]:
# Upload the local data directory under s3://{bucket}/{prefix}; the returned
# value is used below as the S3 prefix for the individual file URIs.
data_uri_prefix = s3_uploader.upload(local_path.as_posix(), f"s3://{bucket}/{prefix}")

In [None]:
# S3 URIs of the two uploaded raw datasets.
claims_uri = f"{data_uri_prefix}/claims.csv"
customers_uri = f"{data_uri_prefix}/customers.csv"

Edit the template `flow` files to point at the correct dataset in S3.

In [None]:

def _render_flow(template_path, flow_path, data_uri):
    """Render a Data Wrangler flow file from a ``string.Template`` file.

    Args:
        template_path (Path): template containing a ``$data_uri`` placeholder.
        flow_path (Path): destination ``.flow`` file (overwritten).
        data_uri (str): S3 URI substituted for ``$data_uri``.

    Raises:
        KeyError / ValueError: if the template has other unresolved or invalid
            placeholders (raised by ``string.Template.substitute``).
        json.JSONDecodeError: if the rendered template is not valid JSON.
    """
    template = string.Template(template_path.read_text())
    # Render and validate before opening the output, so a failure does not
    # leave a truncated .flow file behind.
    flow = json.loads(template.substitute({"data_uri": data_uri}))
    with flow_path.open("w") as flow_file:
        json.dump(flow, flow_file, indent=2)
    logger.info(f"Created {flow_path.name}")


claims_flow_template_file = feature_eng_base_path / "claims_flow_template"
customers_flow_template_file = feature_eng_base_path / "customers_flow_template"

_render_flow(
    claims_flow_template_file,
    feature_eng_base_path / "claims.flow",
    data_uri_prefix + "/claims.csv",
)
_render_flow(
    customers_flow_template_file,
    feature_eng_base_path / "customers.flow",
    data_uri_prefix + "/customers.csv",
)

We can review the feature engineering:
- Let's look at the feature engineering for the [Claims Dataset](claims.flow)

- Let's look at the feature engineering for the [Customers Dataset](customers.flow)

For each flow file we generate the corresponding Jupyter Notebook for export to Feature Store, to extract the data schema

## Feature Store

In [None]:
from sagemaker.utils import name_from_base
from sagemaker.feature_store.feature_group import FeatureGroup

from utils.feature_store_utils import get_fg_conf

In [None]:
# Runtime client for Feature Store record operations.
featurestore_runtime = boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
)

# SageMaker session wired to the Feature Store runtime client; used by the
# FeatureGroup objects below.
feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [None]:
# S3 URI passed to get_fg_conf for both feature groups
# (presumably the offline store location — confirm against utils.feature_store_utils).
feature_store_offline_s3_uri = f"s3://{bucket}/{prefix}/fs/"

### Claims Feature Group

In [None]:
# Load the claims feature group configuration and unpack the pieces needed
# to create the group.
claims_fg_conf = get_fg_conf(
    feature_eng_base_path / "claims.fg.json",
    feature_store_offline_s3_uri,
)
feature_definitions = claims_fg_conf["feature_definitions"]
# name_from_base derives the actual group name from the configured base name.
feature_group_name = name_from_base(claims_fg_conf["feature_group_name"])
feature_group_properties = claims_fg_conf["feature_group_properties"]

In [None]:
# Build the claims feature group from the configuration loaded above and create it.
claims_feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=feature_store_session,
)

claims_feature_group.create(role_arn=role, **feature_group_properties)

### Customers Feature Group

In [None]:
# Load the customers feature group configuration and unpack the pieces needed
# to create the group.
customers_fg_conf = get_fg_conf(
    feature_eng_base_path / "customers.fg.json", feature_store_offline_s3_uri
)
feature_definitions = customers_fg_conf["feature_definitions"]
# name_from_base derives the actual group name from the configured base name.
feature_group_name = name_from_base(customers_fg_conf["feature_group_name"])
feature_group_properties = customers_fg_conf["feature_group_properties"]

In [None]:
# Build the customers feature group from the configuration loaded above and create it.
customers_feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=feature_store_session,
)

customers_feature_group.create(role_arn=role, **feature_group_properties)

## Data Processing Jobs

In [None]:
from sagemaker.processing import (
    FeatureStoreOutput,
    ProcessingInput,
    ProcessingOutput,
)
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.wrangler.processing import DataWranglerProcessor

from utils.parse_flow import FlowFile

In [None]:
def create_pipeline_fi(
    role: str,
    pipeline_name: str,
    sagemaker_session: sagemaker.Session = None,
    **kwarg
) -> Pipeline:
    """Create a feature-ingestion SageMaker pipeline around a Data Wrangler flow.

    The pipeline has a single processing step that runs the flow file with a
    ``DataWranglerProcessor`` and sends the flow output to a Feature Store
    feature group. It exposes three pipeline parameters: ``InstanceCount``
    (default 1), ``InstanceType`` (default ``ml.m5.4xlarge``) and
    ``InputDataUri`` (no default, must be given when the pipeline is started).

    Args:
        role (str): IAM role that executes the pipeline
        pipeline_name (str): name of the pipeline
        sagemaker_session (sagemaker.Session, optional): session handed to the
            processor and the pipeline. Defaults to None.
        **kwarg: required keys:
            flow_file_path: path of the Data Wrangler ``.flow`` file.
            feature_group_name: name of the target feature group.

    Returns:
        Pipeline: Sagemaker pipeline

    Raises:
        KeyError: if ``flow_file_path`` or ``feature_group_name`` is missing.
    """
    flow_file_path = kwarg["flow_file_path"]
    feature_group_name = kwarg["feature_group_name"]

    # FlowFile provides the input and output node names used below.
    flow_file = FlowFile(flow_file_path)

    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
    input_data_uri = ParameterString(name="InputDataUri")

    output_content_type = "CSV"
    output_config = {flow_file.output_name: {"content_type": output_content_type}}
    # The output configuration is passed to the container as one quoted JSON argument.
    job_argument = [f"--output-config '{json.dumps(output_config)}'"]

    data_sources = [
        ProcessingInput(
            input_name=flow_file.input_name,
            source=input_data_uri,
            destination=f"/opt/ml/processing/{flow_file.input_name}",
        )
    ]

    outputs = [
        ProcessingOutput(
            output_name=flow_file.output_name,
            app_managed=True,
            feature_store_output=FeatureStoreOutput(
                feature_group_name=feature_group_name
            ),
        )
    ]

    data_wrangler_processor = DataWranglerProcessor(
        role=role,
        data_wrangler_flow_source=flow_file_path,
        instance_count=instance_count,
        instance_type=instance_type,
        sagemaker_session=sagemaker_session,
    )

    data_wrangler_step = ProcessingStep(
        name="data-wrangler-step",
        processor=data_wrangler_processor,
        inputs=data_sources,
        outputs=outputs,
        job_arguments=job_argument,
    )

    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[instance_count, instance_type, input_data_uri],
        steps=[data_wrangler_step],
        sagemaker_session=sagemaker_session,
    )

    return pipeline

### Claims feature ingestion pipeline

In [None]:
# Use the flow file generated earlier in this notebook; it is written into the
# feature_engineering directory, not the working directory.
claims_pipeline_conf = {
    "flow_file_path": (feature_eng_base_path / "claims.flow").as_posix(),
    "feature_group_name": claims_feature_group.name,
}
claims_pipeline = create_pipeline_fi(
    role,
    "claims-pipeline",
    sagemaker_session,
    **claims_pipeline_conf
)
# Display the pipeline definition for review.
claims_pipeline.definition()

In [None]:
# upsert creates the pipeline, or updates it when "claims-pipeline" already
# exists, so this cell can be re-run without failing.
claims_pipeline.upsert(
    role_arn=role,
    description="Claims feature ingestion pipeline",
)

#### Run the pipeline 

In [None]:
# Start the claims ingestion, supplying the required InputDataUri parameter.
claims_pipeline_execution = claims_pipeline.start(
    parameters=dict(InputDataUri=claims_uri),
    execution_display_name="initial-load",
)

In [None]:
# Show the current description of the claims pipeline execution.
claims_pipeline_execution.describe()

### Customers feature ingestion pipeline

In [None]:
# Use the flow file generated earlier in this notebook; it is written into the
# feature_engineering directory, not the working directory.
customers_pipeline_conf = {
    "flow_file_path": (feature_eng_base_path / "customers.flow").as_posix(),
    "feature_group_name": customers_feature_group.name,
}
customers_pipeline = create_pipeline_fi(
    role,
    "customers-pipeline",
    sagemaker_session,
    **customers_pipeline_conf
)
# Display the pipeline definition for review.
customers_pipeline.definition()

In [None]:
# upsert creates the pipeline, or updates it when "customers-pipeline" already
# exists, so this cell can be re-run without failing.
customers_pipeline.upsert(
    role_arn=role,
    description="Customers feature ingestion pipeline",
)

#### Run the pipeline 

In [None]:
# Start the customers ingestion, supplying the required InputDataUri parameter.
customers_pipeline_execution = customers_pipeline.start(
    parameters=dict(InputDataUri=customers_uri),
    execution_display_name="initial-load",
)

## Model training

Here we define a SageMaker Pipeline to manage all the tasks necessary for model training:
- data extraction from Feature Store
- data processing specific to model training, e.g., joining datasets, train/test split
- train the model
- evaluate initial dataset bias across features
- record a baseline of the distribution of the training dataset for Model Monitor use

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.dataset_definition.inputs import (
    DatasetDefinition,
    AthenaDatasetDefinition,
)
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    Processor,
)
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep
from sagemaker import clarify
from sagemaker.model_metrics import MetricsSource, ModelMetrics

In [None]:
def create_pipeline_xgboost_training(
    role: str, pipeline_name: str, sagemaker_session: sagemaker.Session = None, **kwargs
) -> Pipeline:
    """Create the SageMaker pipeline that trains and registers the XGBoost model.

    Steps, in the order they are declared:
      * ``CreateDataset``: an Athena query joining the claims and customers
        feature-group tables on ``policy_id`` feeds an SKLearn processing
        script that emits ``train_data``, ``test_data`` and ``baseline`` outputs.
      * ``BaselineJob``: runs the model-monitor container on the ``baseline`` output.
      * ``XgboostTrain``: trains the XGBoost estimator on the ``train_data`` output.
      * ``ClarifyProcessor``: pre-training bias analysis of ``train_data``.
      * ``RegisterModel``: registers the trained model in a model package group.

    Besides its arguments, the function reads the notebook-level globals
    ``sagemaker_client``, ``bucket`` and ``region``, and writes
    ``config/analysis_config.json`` in the current working directory.

    Args:
        role (str): IAM role used by every job in the pipeline.
        pipeline_name (str): name of the pipeline.
        sagemaker_session (sagemaker.Session, optional): session handed to the
            processors, the estimator and the pipeline. Defaults to None.
        **kwargs: required keys:
            customers_fg_name: name of the customers feature group.
            claims_fg_name: name of the claims feature group.
            create_dataset_script_path: script run by the ``CreateDataset`` step.
            prefix: S3 key prefix (also used in job base names).
            model_entry_point: training script for the XGBoost estimator.
            model_package_group_name: target model package group.
            training_columns: column names selected by the Athena query.

    Returns:
        Pipeline: the assembled (not yet created) SageMaker pipeline.

    Raises:
        KeyError: if a required keyword argument is missing.
        ValueError: if ``training_columns`` is empty.
    """
    customers_fg_name = kwargs["customers_fg_name"]
    claims_fg_name = kwargs["claims_fg_name"]
    create_dataset_script_path = kwargs["create_dataset_script_path"]
    prefix = kwargs["prefix"]
    model_entry_point = kwargs["model_entry_point"]
    model_package_group_name = kwargs["model_package_group_name"]
    # Take the columns from the arguments instead of relying on a notebook global.
    training_columns = kwargs["training_columns"]
    if not training_columns:
        raise ValueError("training_columns must contain at least one column name")

    # Look up the data catalog entries backing the two feature groups.
    customer_fg = sagemaker_client.describe_feature_group(FeatureGroupName=customers_fg_name)
    claims_fg = sagemaker_client.describe_feature_group(FeatureGroupName=claims_fg_name)
    database_name = customer_fg['OfflineStoreConfig']['DataCatalogConfig']['Database']
    claims_table = claims_fg['OfflineStoreConfig']['DataCatalogConfig']['TableName']
    customers_table = customer_fg['OfflineStoreConfig']['DataCatalogConfig']['TableName']
    catalog = customer_fg['OfflineStoreConfig']['DataCatalogConfig']['Catalog']

    # Pipeline parameters that can be overridden per execution.
    train_instance_param = ParameterString(
        name="TrainingInstance",
        default_value="ml.m4.xlarge",
    )

    model_approval_status = ParameterString(
        name="ModelApprovalStatus", default_value="PendingManualApproval"
    )
    baseline_instance_type = ParameterString(
        name="BaselineInstanceType", default_value="ml.m5.xlarge"
    )

    # Create dataset step
    create_dataset_processor = SKLearnProcessor(
        framework_version="0.23-1",
        role=role,
        instance_type="ml.m5.xlarge",
        instance_count=1,
        base_job_name=f"{prefix}/fraud-demo-create-dataset",
        sagemaker_session=sagemaker_session,
    )

    # Double-quote every column name for the SQL statement.
    training_columns_string = ", ".join(f'"{c}"' for c in training_columns)

    query_string = f"""
    SELECT DISTINCT {training_columns_string}
    FROM "{claims_table}" claims LEFT JOIN "{customers_table}" customers
    ON claims.policy_id = customers.policy_id
    """
    athena_data_path = "/opt/ml/processing/athena"
    
    data_sources = [
        ProcessingInput(
            input_name="athena_dataset",
            dataset_definition=DatasetDefinition(
                local_path=athena_data_path,
                data_distribution_type="FullyReplicated",
                athena_dataset_definition=AthenaDatasetDefinition(
                    catalog=catalog,
                    database=database_name,
                    query_string=query_string,
                    output_s3_uri=f"s3://{bucket}/{prefix}/athena/data/",
                    output_format="PARQUET"
                )
            )
        )
    ]
    
    create_dataset_step = ProcessingStep(
        name="CreateDataset",
        processor=create_dataset_processor,
        inputs=data_sources,
        outputs=[
            ProcessingOutput(
                output_name="train_data",
                source="/opt/ml/processing/output/train"
            ),
            ProcessingOutput(
                output_name="test_data",
                source="/opt/ml/processing/output/test"
            ),
            ProcessingOutput(
                output_name="baseline",
                source="/opt/ml/processing/output/baseline"
            ),
        ],
        job_arguments=[
            "--athena-data",
            athena_data_path,
        ],
        code=create_dataset_script_path,
    )

    # baseline job step
    # Get the default model monitor container
    model_monitor_container_uri = sagemaker.image_uris.retrieve(
        framework="model-monitor",
        region=region,
        version="latest",
    )

    # Create the baseline job; the container is configured through environment variables.
    dataset_format = DatasetFormat.csv()
    env = {
        "dataset_format": json.dumps(dataset_format),
        "dataset_source": "/opt/ml/processing/input/baseline_dataset_input",
        "output_path": "/opt/ml/processing/output",
        "publish_cloudwatch_metrics": "Disabled",
    }

    monitor_analyzer = Processor(
        image_uri=model_monitor_container_uri,
        role=role,
        instance_count=1,
        instance_type=baseline_instance_type,
        base_job_name=f"{prefix}/monitoring",
        sagemaker_session=sagemaker_session,
        max_runtime_in_seconds=1800,
        env=env,
    )

    baseline_step = ProcessingStep(
        name="BaselineJob",
        processor=monitor_analyzer,
        inputs=[
            ProcessingInput(
                source=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                    "baseline"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/input/baseline_dataset_input",
                input_name="baseline_dataset_input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/output",
                output_name="monitoring_output",
            ),
        ],
    )

    # Model training step
    train_instance_count = 1
    training_job_output_path = f"s3://{bucket}/{prefix}/training_jobs"
    metric_uri = f"{prefix}/training_jobs/metrics_output/metrics.json"

    # "bucket" and "object" are handed to the training script as hyperparameters
    # (presumably the S3 location where it writes its metrics — confirm in the script).
    hyperparameters = {
        "max_depth": "3",
        "eta": "0.2",
        "objective": "binary:logistic",
        "num_round": "100",
        "bucket": f"{bucket}",
        "object": f"{metric_uri}",
    }

    xgb_estimator = XGBoost(
        entry_point=model_entry_point,
        hyperparameters=hyperparameters,
        role=role,
        instance_count=train_instance_count,
        instance_type=train_instance_param,
        framework_version="1.0-1",
        sagemaker_session=sagemaker_session,
    )

    train_step = TrainingStep(
        name="XgboostTrain",
        estimator=xgb_estimator,
        inputs={
            "train": sagemaker.inputs.TrainingInput(
                s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                    "train_data"
                ].S3Output.S3Uri
            )
        },
    )

    # instantiate the Clarify processor
    clarify_processor = clarify.SageMakerClarifyProcessor(role=role,
                                                      instance_count=1,
                                                      instance_type="ml.c5.xlarge",
                                                      sagemaker_session=sagemaker_session)
    
    # Run bias metrics with clarify steps
    pipeline_bias_output_path = f"s3://{bucket}/{prefix}/clarify-output/pipeline/bias"
    
    # clarify configuration
    bias_data_config = clarify.DataConfig(
        s3_data_input_path=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        s3_output_path=pipeline_bias_output_path,
        label="fraud",
        dataset_type="text/csv",
    )

    bias_config = clarify.BiasConfig(
        label_values_or_threshold=[0],
        facet_name="customer_gender_female",
        facet_values_or_threshold=[1],
    )

    # Merge data and bias settings into a single analysis configuration.
    analysis_config = bias_data_config.get_config()
    analysis_config.update(bias_config.get_config())
    analysis_config["methods"] = {"pre_training_bias": {"methods": "all"}}

    # The configuration is written locally and passed to the step as a file input.
    clarify_config_dir = Path("config")
    clarify_config_dir.mkdir(exist_ok=True)
    with open(clarify_config_dir / "analysis_config.json", "w") as f:
        json.dump(analysis_config, f)

    clarify_step = ProcessingStep(
        name="ClarifyProcessor",
        processor=clarify_processor,
        inputs=[
            sagemaker.processing.ProcessingInput(
                input_name="analysis_config",
                source=f"{clarify_config_dir}/analysis_config.json",
                destination="/opt/ml/processing/input/config",
            ),
            sagemaker.processing.ProcessingInput(
                input_name="dataset",
                source=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                    "train_data"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/input/data",
            ),
        ],
        outputs=[
            sagemaker.processing.ProcessingOutput(
                source="/opt/ml/processing/output/analysis.json",
                destination=pipeline_bias_output_path,
                output_name="analysis_result",
            )
        ],
    )

    # Register Model step

    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"s3://{bucket}/{metric_uri}",
            content_type="application/json",
        )
    )
    
    register_step = RegisterModel(
        name="RegisterModel",
        estimator=xgb_estimator,
        model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.t2.large", "ml.m5.large"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )

    # pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            baseline_instance_type,
            train_instance_param,
            model_approval_status,
        ],
        steps=[
            create_dataset_step,
            baseline_step,
            train_step,
            clarify_step,
            register_step,
        ],
        sagemaker_session=sagemaker_session,
    )

    return pipeline


In [None]:
# Columns selected from the joined claims/customers tables for training.
training_columns = [
    "fraud",
    "incident_severity",
    "num_vehicles_involved",
    "num_injuries",
    "num_witnesses",
    "police_report_available",
    "injury_claim",
    "vehicle_claim",
    "total_claim_amount",
    "incident_month",
    "incident_day",
    "incident_dow",
    "incident_hour",
    "driver_relationship_self",
    "driver_relationship_na",
    "driver_relationship_spouse",
    "driver_relationship_child",
    "driver_relationship_other",
    "incident_type_collision",
    "incident_type_breakin",
    "incident_type_theft",
    "collision_type_front",
    "collision_type_rear",
    "collision_type_side",
    "collision_type_na",
    "authorities_contacted_police",
    "authorities_contacted_none",
    "authorities_contacted_fire",
    "authorities_contacted_ambulance",
    "customer_age",
    "customer_education",
    "months_as_customer",
    "policy_deductable",
    "policy_annual_premium",
    "policy_liability",
    "auto_year",
    "num_claims_past_year",
    "num_insurers_past_5_years",
    "customer_gender_male",
    "customer_gender_female",
    "policy_state_ca",
    "policy_state_wa",
    "policy_state_az",
    "policy_state_or",
    "policy_state_nv",
    "policy_state_id",
]

# Keyword arguments for create_pipeline_xgboost_training.
training_configuration = {
    "customers_fg_name": customers_feature_group.name,
    "claims_fg_name": claims_feature_group.name,
    "create_dataset_script_path": "scripts/create_dataset.py",
    "prefix": prefix,
    "model_entry_point": "scripts/xgboost_starter_script.py",
    "model_package_group_name": name_from_base("xgboost"),
    "training_columns": training_columns,
}

In [None]:
# Assemble the training pipeline; name_from_base derives the pipeline name
# from the 'xgboost-develop' base.
model_training_pipeline = create_pipeline_xgboost_training(
    role=role,
    pipeline_name=name_from_base('xgboost-develop'),
    sagemaker_session=sagemaker_session,
    **training_configuration
)

In [None]:
# Display the pipeline definition for review.
model_training_pipeline.definition()

In [None]:
# Register the pipeline with SageMaker (description typo "Traing" fixed).
model_training_pipeline.create(role_arn=role, description="Training XGBoost model")

In [None]:
# Start an execution using the pipeline's default parameter values.
model_training_pipeline_execution = model_training_pipeline.start(execution_display_name="first-build")

In [None]:
# Show the current description of the training pipeline execution.
model_training_pipeline_execution.describe()

In [None]:
# List the steps of the training pipeline execution.
model_training_pipeline_execution.list_steps()

## Clean-up

In [None]:
# S3 resource handles used by the clean-up cells below.
s3 = boto_session.resource('s3')
s3_bucket = s3.Bucket(bucket)

In [None]:
# Delete the offline store data of BOTH feature groups.
# The original loop always described claims_feature_group, and called
# delete_objects with the prefix as a key, which removes nothing beneath it.
for fg in [claims_feature_group, customers_feature_group]:
    offline_store_uri = fg.describe()['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']
    parsed_uri = parse.urlparse(offline_store_uri)
    s3_prefix = parsed_uri.path.strip('/')
    if not s3_prefix:
        # Never issue a delete with an empty prefix: it would match the whole bucket.
        logger.warning(f"Skipping clean-up for {fg.name}: no key prefix in {offline_store_uri}")
        continue
    # Trailing slash keeps the filter from matching sibling prefixes.
    s3.Bucket(parsed_uri.netloc).objects.filter(Prefix=s3_prefix + '/').delete()

In [None]:
# Delete feature groups
# Remove both feature groups created by this notebook.
for feature_group in (claims_feature_group, customers_feature_group):
    feature_group.delete()

In [None]:
# Delete the two feature ingestion pipelines (fixed names) and the training
# pipeline (generated name, read back from the pipeline object).
sagemaker_client.delete_pipeline(PipelineName="claims-pipeline")
sagemaker_client.delete_pipeline(PipelineName="customers-pipeline")
sagemaker_client.delete_pipeline(PipelineName=model_training_pipeline.name)