# Sagemaker Feature Engineering using Python code

---

## Background
우편, 이메일, 전화 등을 통한 직접 마케팅은 고객을 확보하는 일반적인 전략입니다. 리소스와 고객의 관심이 제한되어 있기 때문에 목표는 특정 제안에 참여할 가능성이 있는 잠재 고객 하위 집합만을 타겟팅하는 것입니다. 인구 통계, 과거 상호 작용 및 환경 요인과 같은 쉽게 구할 수 있는 정보를 기반으로 잠재 고객을 예측하는 것은 일반적인 머신 러닝 문제입니다.

---

## Preparation

다음을 지정하여 시작합니다.

- training 및 model df에 사용할 S3 bucket 및 prefix. 이는 Notebook Instance, training 및 hosting과 동일한 지역 내에 있어야 합니다.
- df에 대한 training 및 hosting access 권한을 부여하는 데 사용된 IAM role arn. 이를 만드는 방법은 documentation를 참조하세요.



# Learning Journey 1: Feature Engineering with Feature Store

In [None]:
import sagemaker
import numpy as np
import pandas as pd
bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-1'
 
# Define IAM role
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

이제 분석 전반에 사용할 Python 라이브러리를 가져옵니다.

In [None]:
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

feature_group_name = "<feature-group-name>"
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

In [None]:
fs_query = feature_group.athena_query()
fs_table = fs_query.table_name
query_string = 'SELECT * FROM "'+fs_table+'"'
print('Running ' + query_string)

In [None]:
fs_query.run(query_string=query_string, output_location='s3://'+bucket+'/'+prefix+'/fs_query_results/')
fs_query.wait()
model_data = fs_query.as_dataframe()

In [None]:
model_data = model_data.drop(['fs_id', 'fs_time', 'write_time', 'api_invocation_time', 'is_deleted'], axis=1)

In [None]:
model_data

In [None]:
train_data, validation_data, test_data = np.split(model_data.sample(frac=1, random_state=1729), [int(0.7 * len(model_data)), int(0.9 * len(model_data))])   # Randomly sort the data then split out first 70%, second 20%, and last 10%

In [None]:
pd.concat([train_data['y_yes'], train_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)
pd.concat([validation_data['y_yes'], validation_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation.csv', index=False, header=False)

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

In [None]:
#Validate in S3 Bucket

# Learning Journey 2 : Working on Notebook Instance using Python code

In [None]:
import sagemaker_datawrangler           # For interactive data prep widget
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import sagemaker 

In [None]:
pd.__version__

In [None]:
import sagemaker
bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-2'
 
# Define IAM role
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

---

## Data
github에서 dfset을 다운로드하는 것으로 시작합니다.

\[Moro et al., 2014\] S. Moro, P. Cortez and P. Rita. A df-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014


In [None]:
!wget https://raw.githubusercontent.com/brain-hack/MLOps-with-aws/main/Module3-Feature-Engineering/Dataset/bank-additional-full.csv

In [None]:
df= pd.read_csv('./bank-additional-full.csv')
pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20)         # Keep the output on one page
df

In [None]:
for column in df.select_dtypes(include=['object']).columns:
    if column != 'y':
        print(pd.crosstab(index=df[column], columns=df['y'], normalize='columns'))

for column in df.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = df[[column, 'y']].hist(by='y', bins=30)
    plt.show()

In [None]:
print(df.corr())
pd.plotting.scatter_matrix(df, figsize=(12, 12))
plt.show()

In [None]:
# Transformation
df['no_previous_contact'] = np.where(df['pdays'] == 999, 1, 0)                                 # Indicator variable to capture when pdays takes a value of 999
df['not_working'] = np.where(np.in1d(df['job'], ['student', 'retired', 'unemployed']), 1, 0)   # Indicator for individuals not actively employed

In [None]:
model_df= pd.get_dummies(df)                                                                  # Convert categorical variables to sets of indicators

In [None]:
model_df= model_df.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)

In [None]:
train_df, validation_df, test_df= np.split(model_df.sample(frac=1, random_state=1729), [int(0.7 * len(model_df)), int(0.9 * len(model_df))])   # Randomly sort the dfthen split out first 70%, second 20%, and last 10%

In [None]:
pd.concat([train_df['y_yes'], train_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)
pd.concat([validation_df['y_yes'], validation_df.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation.csv', index=False, header=False)

In [None]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

# Learning Journey 3 : Feature Engineering with Sagemaker Processing

In [None]:
# Upload to S3 Bucket
from sagemaker import Session
import sagemaker
bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-3'

sess = Session()
input_source = sess.upload_data('./bank-additional-full.csv', bucket=bucket, key_prefix=f'{prefix}/input_data')
input_source

In [None]:
# Define IAM role
import boto3
import re
from sagemaker import get_execution_role

role = get_execution_role()

Amazon SageMaker Processing을 사용하면 Amazon SageMaker에서 데이터 사전 또는 사후 처리, 기능 엔지니어링, 데이터 검증 또는 모델 평가 워크로드를 위한 단계를 실행할 수 있습니다. 처리 작업은 Amazon S3에서 데이터를 입력으로 받고 데이터를 Amazon S3에 출력으로 저장합니다.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

여기에서 데이터 세트를 가져와 SageMaker Processing으로 변환합니다. SageMaker Processing은 노트북 서버를 실행하는 인스턴스와 별도로 SageMaker에서 관리하는 클러스터에서 테라바이트 규모의 데이터를 처리하는 데 사용할 수 있습니다. 일반적인 SageMaker 워크플로에서 노트북은 프로토타입 제작에만 사용되며 비교적 저렴하고 덜 강력한 인스턴스에서 실행할 수 있는 반면, 처리, 학습 및 모델 호스팅 작업은 별도의 더 강력한 SageMaker에서 관리하는 인스턴스에서 실행됩니다. SageMaker Processing에는 Scikit-learn에 대한 기성품 지원과 Bring Your Own Container 옵션이 포함되어 있어 다양한 데이터 변환 기술 및 작업과 함께 사용할 수 있습니다.  

SageMaker Processing을 사용하려면 아래에 표시된 대로 Python 데이터 전처리 스크립트를 제공하기만 하면 됩니다. 이 예에서는 SageMaker 사전 빌드 Scikit-learn 컨테이너를 사용하고 있으며, 여기에는 데이터 처리를 위한 많은 일반적인 기능이 포함되어 있습니다. 실행할 수 있는 코드와 작업의 종류에 대한 제한은 거의 없으며 최소한의 계약만 있습니다. 입력 및 출력 데이터는 지정된 디렉터리에 배치해야 합니다. 이렇게 하면 SageMaker Processing이 S3에서 입력 데이터를 자동으로 로드하고 작업이 완료되면 변환된 데이터를 다시 S3에 업로드합니다.

In [None]:
!wget https://raw.githubusercontent.com/brain-hack/MLOps-with-aws/main/Module3-Feature-Engineering/feature-engg-script.py

In [None]:
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role


sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=get_execution_role(),
    instance_type="ml.m5.large",
    instance_count=1, 
    base_job_name='mlops-sklearnprocessing'
)

sklearn_processor.run(
    code='feature-engg-script.py',
    # arguments = ['arg1', 'arg2'],
    inputs=[
        ProcessingInput(
            source=input_source, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_path,
        ),
        ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation", destination=validation_path),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test", destination=test_path),
    ]
)

In [None]:
!aws s3 ls $train_path/

In [None]:
!aws s3 ls $test_path/

# Model Building

In [None]:
# Use the previously prepared data
from sagemaker import Session
import sagemaker
import boto3
import re
from sagemaker import get_execution_role
import numpy as np
import pandas as pd
import os

role = get_execution_role()

bucket=sagemaker.Session().default_bucket()
prefix = 'mlops/activity-3'
sess = Session()
train_path = f"s3://{bucket}/{prefix}/train"
validation_path = f"s3://{bucket}/{prefix}/validation"
test_path = f"s3://{bucket}/{prefix}/test"

In [None]:
container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='latest')

In [None]:
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')

In [None]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    instance_count=1, 
                                    instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, prefix),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

In [None]:
xgb_predictor = xgb.deploy(initial_instance_count=1,
                           instance_type='ml.m4.xlarge')

In [None]:
#Evaluation
xgb_predictor.serializer = sagemaker.serializers.CSVSerializer()

In [None]:
!aws s3 ls $test_path

In [None]:
test_data_x = pd.read_csv(os.path.join(test_path, 'test_script_x.csv'),header=None)
test_data_y = pd.read_csv(os.path.join(test_path, 'test_script_y.csv'),header=None)

In [None]:
def predict(data, predictor, rows=500 ):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')
predictions = predict(test_data_x, xgb_predictor)

In [None]:
pd.crosstab(index=test_data_y[0], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])

In [None]:
xgb_predictor.delete_endpoint(delete_endpoint_config=True)

# Model Deployment 

In [None]:
import boto3

client = boto3.client(service_name="sagemaker")
runtime = boto3.client(service_name="sagemaker-runtime")

In [None]:
model_artifacts = xgb.model_data
model_artifacts

In [None]:
from time import gmtime, strftime

model_name = "xgboost-serverless" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Model name: " + model_name)

# dummy environment variables
byo_container_env_vars = {"SAGEMAKER_CONTAINER_LOG_LEVEL": "20", "SOME_ENV_VAR": "myEnvVar"}

create_model_response = client.create_model(
    ModelName=model_name,
    Containers=[
        {
            "Image": container,
            "Mode": "SingleModel",
            "ModelDataUrl": model_artifacts,
            "Environment": byo_container_env_vars,
        }
    ],
    ExecutionRoleArn=role,
)

print("Model Arn: " + create_model_response["ModelArn"])

In [None]:
xgboost_epc_name = "mlops-serverless-epc" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=xgboost_epc_name,
    ProductionVariants=[
        {
            "VariantName": "byoVariant",
            "ModelName": model_name,
            "ServerlessConfig": {
                "MemorySizeInMB": 4096,
                "MaxConcurrency": 1,
            },
        },
    ],
)

print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])

In [None]:
endpoint_name = "xgboost-serverless-ep" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=xgboost_epc_name,
)

print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

In [None]:
# wait for endpoint to reach a terminal state (InService) using describe endpoint
import time

describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)

while describe_endpoint_response["EndpointStatus"] == "Creating":
    describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)
    print(describe_endpoint_response["EndpointStatus"])
    time.sleep(15)

describe_endpoint_response

In [None]:
# Endpoint invocation
payload = b"3., 999.,   0.,   1.,   0.,   0.,   0.,   0.,   0.,   0.,   0., 1.,   0.,   0.,   0.,   0.,   0.,   1.,   0.,   0.,   0.,   0., 0.,   0.,   0.,   0.,   0.,   1.,   0.,   1.,   0.,   0.,   1., 0.,   0.,   1.,   0.,   0.,   1.,   0.,   0.,   0.,   0.,   1., 0.,   0.,   0.,   0.,   0.,   0.,   0.,   1.,   0.,   0.,   0., 0.,   1.,   0."

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=payload,
    ContentType="text/csv",
)

print(response["Body"].read().decode())

In [None]:
client.delete_model(ModelName=model_name)
client.delete_endpoint_config(EndpointConfigName=xgboost_epc_name)
client.delete_endpoint(EndpointName=endpoint_name)

# Automatic Model Tuning

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner
hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),
                            'min_child_weight': ContinuousParameter(1, 10),
                            'alpha': ContinuousParameter(0, 2),
                            'max_depth': IntegerParameter(1, 10)}
objective_metric_name = 'validation:auc'

In [None]:
tuner = HyperparameterTuner(xgb,
                            objective_metric_name,
                            hyperparameter_ranges,
                            max_jobs=20,
                            max_parallel_jobs=3)

In [None]:
tuner.fit({'train': s3_input_train, 'validation': s3_input_validation})

In [None]:
boto3.client('sagemaker').describe_hyper_parameter_tuning_job(
HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']

In [None]:
tuner.best_training_job()

In [None]:
tuner_predictor = tuner.deploy(initial_instance_count=1,
                           instance_type='ml.m4.xlarge')

In [None]:
tuner_predictor.serializer = sagemaker.serializers.CSVSerializer()

In [None]:
test_data_x = pd.read_csv(os.path.join(test_path, 'test_script_x.csv'),header=None)
test_data_y = pd.read_csv(os.path.join(test_path, 'test_script_y.csv'),header=None)

In [None]:
predictions = predict(test_data_x).to_numpy(),tuner_predictor)

In [None]:
pd.crosstab(index=test_data_y[0], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])

### Delete End Point

In [None]:
tuner_predictor.delete_endpoint(delete_endpoint_config=True)