# Demo of DataMesh: Data exploration and feature engineering

## Overview
Amazon SageMaker helps data scientists and developers to prepare, build, train, and deploy high-quality machine learning (ML) models quickly by bringing together a broad set of capabilities purpose-built for ML. AWS Lake Formation provides a single place to define, classify, tag, and manage fine-grained permissions for data in Amazon S3.

In this SageMaker Studio notebook, we show how to do data exploration and feature engineering in a data mesh environment. The producer account has provided access to the consumer account, and the database and table are available in the consumer account's AWS Lake Formation (LF). Before running this notebook, grant the SageMaker execution role used by your notebook access to the table in LF. LF is an extra layer of access management that sits on top of IAM. This means that even if IAM allows you to access the table or database, you cannot read the data unless access has also been granted in LF.
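
The grant described above can be made in the Lake Formation console or programmatically. Below is a minimal sketch using the `grant_permissions` call of the boto3 `lakeformation` client. The helper names (`build_table_grant`, `grant_table_access`), the example role ARN, and the choice of `SELECT` and `DESCRIBE` permissions are assumptions for illustration, and the caller needs sufficient Lake Formation rights on the table. If the table is exposed through a resource link, the grant may have to target the shared table instead.

```python
def build_table_grant(role_arn, database, table, permissions=("SELECT", "DESCRIBE")):
    """Build the keyword arguments for lakeformation.grant_permissions."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": list(permissions),
    }


def grant_table_access(role_arn, database, table):
    """Grant the role access to one table (requires AWS credentials)."""
    import boto3  # imported here so the request builder works without AWS access

    lakeformation = boto3.client("lakeformation")
    return lakeformation.grant_permissions(**build_table_grant(role_arn, database, table))


# Hypothetical role ARN; in the notebook, pass the value of get_execution_role().
request = build_table_grant(
    "arn:aws:iam::111122223333:role/ExampleSageMakerRole",
    "rl_credit-card",
    "credit_card",
)
print(request)
```

Separating the request builder from the API call makes it possible to inspect the grant before anything is sent to AWS.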

In a data mesh environment, you use Amazon Athena to access the data. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. In this notebook, we first use Athena to explore a sample of the data within the notebook. Afterwards, we use Athena within a processing job to query the whole dataset and perform the transformations.

This sample notebook walks you through:
1. Querying and exploring the credit risk dataset shared by the producer account - [South German Credit (UPDATE) Data Set](https://archive.ics.uci.edu/ml/datasets/South+German+Credit+%28UPDATE%29)
2. Preprocessing the dataset with scikit-learn and building a model to featurize future data
3. Training an XGBoost model on the preprocessed data

## 1. Initialize and import the necessary libraries

In [None]:
import sys

# Install PyAthena into the kernel's environment before importing it.
!{sys.executable} -m pip install PyAthena

import os
import time
from io import StringIO
from time import gmtime, strftime

import boto3
import IPython
import numpy as np
import pandas as pd
import pyathena
from pyathena import connect

import sagemaker
from sagemaker import Session, get_execution_role
from sagemaker.inputs import TrainingInput
from sagemaker.pipeline import PipelineModel
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn import SKLearnModel
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.xgboost import XGBoost, XGBoostModel

# Athena client used for the boto3-based query in the next section.
client = boto3.client("athena")

# Execution role, default bucket, and S3 prefix used throughout the notebook.
role = get_execution_role()
session = Session()
bucket = session.default_bucket()
prefix = "sagemaker/sagemaker-credit-risk-model-data-mesh"
region = session.boto_region_name


In [None]:
# Install awswrangler (AWS SDK for pandas) into the kernel's environment.
!{sys.executable} -m pip install awswrangler
import awswrangler as wr


## 2. Data exploration
In this part, we explore the data in the notebook itself. We query Athena in three ways (with boto3, with the PyAthena library, and with awswrangler) and load the results into a dataframe for exploration.

In [None]:
# Run the Athena query with boto3.
# start_query_execution is asynchronous and returns only an execution ID.
# Use the CLI in a terminal to fetch the results:
# aws athena get-query-results --query-execution-id <QUERY_EXECUTION_ID>
response = client.start_query_execution(
    QueryString='SELECT * FROM "rl_credit-card"."credit_card" LIMIT 10',
    QueryExecutionContext={
        'Database': 'rl_credit-card',
        'Catalog': 'AwsDataCatalog'
    },
    ResultConfiguration={
        'OutputLocation': 's3://sagemaker-us-east-1-934586227363/athenaqueries'
    }
)
print(response['QueryExecutionId'])

In [None]:
# Replace the ID below with the execution ID printed above, then uncomment the command to fetch the results.
#!aws athena get-query-results --query-execution-id '79a497d7-8240-4d98-b33a-5fefa47a8fb4'
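
Instead of copying the execution ID into the CLI, the same two steps can be done in Python. The sketch below polls `get_query_execution` until the query reaches a terminal state and then flattens the `get_query_results` response into a list of dictionaries. The helper names and the one-second polling interval are assumptions, and only the first page of results is read, which is enough for a `LIMIT 10` query.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(athena_client, query_execution_id, poll_seconds=1.0):
    """Poll Athena until the query finishes and return its final state."""
    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


def rows_to_records(results):
    """Convert a get_query_results response into a list of dicts.

    For SELECT queries the first row holds the column names. NULL cells
    come back without a "VarCharValue" key, so they map to None.
    """
    rows = results["ResultSet"]["Rows"]
    header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
    return [
        dict(zip(header, (cell.get("VarCharValue") for cell in row["Data"])))
        for row in rows[1:]
    ]


def fetch_records(athena_client, query_execution_id, poll_seconds=1.0):
    """Wait for the query, then return the first page of results."""
    state = wait_for_query(athena_client, query_execution_id, poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query ended in state {state}")
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
    return rows_to_records(results)
```

In this notebook the call would be `fetch_records(client, response["QueryExecutionId"])`, and passing the returned list to `pd.DataFrame` gives a dataframe. All values arrive as strings, so numeric columns need an explicit conversion.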

In [None]:
# Alternatively, query through PyAthena, which avoids the two-step approach above.

conn = connect(s3_staging_dir='s3://sagemaker-us-east-1-934586227363/athenaqueries/',
               region_name='us-east-1')

df = pd.read_sql('SELECT * FROM "rl_credit-card"."credit_card" LIMIT 10;', conn)

df.head()


In [None]:
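# Run the same query through awswrangler. With ctas_approach=False, the regular
# query output is read instead of creating a temporary table.
df = wr.athena.read_sql_query(
    "SELECT * FROM credit_card LIMIT 10;",
    database="rl_credit-card",
    ctas_approach=False,
)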

df.head()

In [None]:
# Check for class imbalance (note that df holds only the 10-row sample queried above).

df['credit_risk'].value_counts()
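
Raw counts are easier to interpret as proportions. The sketch below computes the share of each class with the standard library only. The label values are made up for illustration and are not taken from the dataset. On the dataframe above, `df['credit_risk'].value_counts(normalize=True)` gives the same kind of result.

```python
from collections import Counter


def class_proportions(labels):
    """Return the fraction of samples that fall in each class."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in counts.items()}


# Made-up labels for illustration only (not taken from the credit dataset).
example_labels = [1, 1, 1, 0, 1, 0, 1, 1, 0, 1]
proportions = class_proportions(example_labels)
print(proportions)  # {1: 0.7, 0: 0.3}
```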

## 3. Feature engineering

In [None]:
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="sagemaker-clarify-credit-risk-processing-job",
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version="0.20.0",
)

# Display the preprocessing script that the processing job will run.
!pygmentize processing/preprocessor.py

In [None]:

import sagemaker
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

# S3 locations for the raw data and for the outputs of the processing job.
raw_data_path = "s3://{0}/{1}/data/train/".format(bucket, prefix)
train_data_path = "s3://{0}/{1}/data/preprocessed/train/".format(bucket, prefix)
val_data_path = "s3://{0}/{1}/data/preprocessed/val/".format(bucket, prefix)
model_path = "s3://{0}/{1}/sklearn/".format(bucket, prefix)
test_data_path = "s3://{0}/{1}/data/test/".format(bucket, prefix)

# Athena runs this query over the whole table and writes the result as Parquet.
athena_dataset = AthenaDatasetDefinition(
    catalog="AwsDataCatalog",
    database="rl_credit-card",
    query_string='SELECT * FROM "rl_credit-card"."credit_card"',
    output_s3_uri="s3://sagemaker-us-east-1-934586227363/athenaqueries/",
    work_group="primary",
    output_format="PARQUET",
)

# The query result is made available to the processing container at local_path.
dataset = DatasetDefinition(
    athena_dataset_definition=athena_dataset,
    local_path="/opt/ml/processing/input/dataset.parquet",
)

sklearn_processor.run(
    code="processing/preprocessor.py",
    inputs=[
        ProcessingInput(
            input_name="dataset",
            destination="/opt/ml/processing/input",
            dataset_definition=dataset,
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data", source="/opt/ml/processing/train", destination=train_data_path
        ),
        ProcessingOutput(
            output_name="val_data", source="/opt/ml/processing/val", destination=val_data_path
        ),
        ProcessingOutput(
            output_name="model", source="/opt/ml/processing/model", destination=model_path
        ),
        ProcessingOutput(
            output_name="test_data", source="/opt/ml/processing/test", destination=test_data_path
        ),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
    logs=False,
)
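
The processing job passes `--train-test-split-ratio 0.2` to `processing/preprocessor.py`, whose contents are not reproduced in this section. As a rough illustration of how such a script could consume that argument, here is a hypothetical sketch that uses only the standard library. The function names, the fixed seed, and the shuffling strategy are assumptions and do not describe the actual script.

```python
import argparse
import random


def parse_args(argv=None):
    """Parse the arguments the processing job forwards to the script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    return parser.parse_args(argv)


def split_rows(rows, test_ratio, seed=0):
    """Shuffle rows reproducibly and hold out test_ratio of them for testing."""
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * test_ratio)
    return shuffled[n_test:], shuffled[:n_test]


args = parse_args(["--train-test-split-ratio", "0.2"])
train_rows, test_rows = split_rows(range(10), args.train_test_split_ratio)
print(len(train_rows), len(test_rows))  # 8 2
```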





## 4. Training

In [None]:
# Display the training script used as the estimator's entry point.
!pygmentize training/train_xgboost.py

In [None]:
hyperparameters = {
    "max_depth": "5",
    "eta": "0.1",
    "gamma": "4",
    "min_child_weight": "6",
    "silent": "1",
    "objective": "binary:logistic",
    "num_round": "100",
    "subsample": "0.8",
    "eval_metric": "auc",
    "early_stopping_rounds": "20",
}

entry_point = "train_xgboost.py"
source_dir = "training/"
output_path = "s3://{0}/{1}/{2}".format(bucket, prefix, "xgb_model")
code_location = "s3://{0}/{1}/code".format(bucket, prefix)

estimator = XGBoost(
    entry_point=entry_point,
    source_dir=source_dir,
    output_path=output_path,
    code_location=code_location,
    hyperparameters=hyperparameters,
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="0.90-2",
    py_version="py3",
    role=role,
)
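
In script mode, SageMaker forwards each entry in `hyperparameters` to the entry point as a command-line argument (for example `--max_depth 5`) and exposes locations such as the model directory through environment variables like `SM_MODEL_DIR`. The sketch below shows how an entry point could parse a few of them. It is a hypothetical illustration, not the contents of `training/train_xgboost.py`, and the `--model_dir` argument and its fallback path are assumptions.

```python
import argparse
import os


def parse_hyperparameters(argv=None):
    """Parse a subset of the hyperparameters passed on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.1)
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    # SageMaker sets SM_MODEL_DIR inside the training container; the fallback
    # path is only a placeholder so the sketch also runs locally.
    parser.add_argument(
        "--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/tmp/model")
    )
    # Ignore hyperparameters this sketch does not declare (gamma, subsample, ...).
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_hyperparameters(["--max_depth", "5", "--eta", "0.1", "--gamma", "4"])
print(args.max_depth, args.eta, args.objective)  # 5 0.1 binary:logistic
```

Declaring a type for each argument matters because the values in `hyperparameters` are given as strings.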

In [None]:
print(estimator)

In [None]:
job_name = f"credit-risk-xgb-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

train_input = TrainingInput(
    "s3://{0}/{1}/data/preprocessed/train/".format(bucket, prefix), content_type="csv"
)
val_input = TrainingInput(
    "s3://{0}/{1}/data/preprocessed/val/".format(bucket, prefix), content_type="csv"
)

inputs = {"train": train_input, "validation": val_input}

estimator.fit(inputs, job_name=job_name)
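
Training job names must be unique within an account and Region, at most 63 characters long, and limited to alphanumeric characters and hyphens, which is why the name above appends a timestamp. A small sketch that builds and checks such a name follows. The helper names are assumptions, and the pattern is the one documented for `TrainingJobName` in the `CreateTrainingJob` API.

```python
import re
from time import gmtime, strftime

# Pattern documented for TrainingJobName in the CreateTrainingJob API.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")


def is_valid_job_name(name):
    """Check the 63-character limit and the allowed character pattern."""
    return len(name) <= 63 and JOB_NAME_PATTERN.fullmatch(name) is not None


def make_job_name(prefix, now=None):
    """Append a UTC timestamp to the prefix so repeated runs get different names."""
    timestamp = strftime("%Y-%m-%d-%H-%M-%S", now if now is not None else gmtime())
    name = f"{prefix}-{timestamp}"
    if not is_valid_job_name(name):
        raise ValueError(f"invalid SageMaker job name: {name}")
    return name


print(make_job_name("credit-risk-xgb", gmtime(0)))  # credit-risk-xgb-1970-01-01-00-00-00
```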