# Setup and Integrate Data Wrangler with Apache Airflow ML Pipeline

<div class="alert alert-warning"> ⚠️ <strong> PRE-REQUISITE: </strong>

Before proceeding with this notebook, please ensure that you have:

1. Executed the <code>00_setup_data_wrangler.ipynb</code> notebook
2. Created an <a href="https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html" target="_blank">Amazon Managed Workflows for Apache Airflow (MWAA)</a> environment. Please visit the Amazon MWAA <a href="https://docs.aws.amazon.com/mwaa/latest/userguide/get-started.html" target="_blank">Get started</a> documentation to see how you can create an MWAA environment. Alternatively, to quickly get started with MWAA, follow the <a href="https://catalog.us-east-1.prod.workshops.aws/v2/workshops/795e88bb-17e2-498f-82d1-2104f4824168/en-US/workshop-2-0-2/setup/mwaa" target="_blank">step-by-step instructions</a> in the MWAA workshop to set up an MWAA environment.

</div>

This notebook creates the scripts required for the Apache Airflow workflow and uploads them to the corresponding S3 prefixes in the MWAA bucket. We will upload:

1. A `requirements.txt` file, to the MWAA `/requirements` prefix.
2. The `SMDataWranglerOperator.py` Python script, which is the custom Airflow operator for SageMaker Data Wrangler, to the `/dags` prefix.
3. A `config.py` Python script that sets up the configuration for our DAG tasks, to the `/dags` prefix.
4. An `ml_pipeline.py` Python script that sets up the end-to-end Apache Airflow workflow DAG, to the `/dags` prefix.
---
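
The upload layout described above can be summarised as a small mapping from local file to bucket prefix. This is only a sketch: `UPLOAD_PLAN`, `s3_key_for` and `upload_all` are hypothetical helper names that do not appear elsewhere in this notebook, and the `.airflowignore` entry assumes the helper file created later in this notebook. The only client call used is `upload_file`, the same one the cells below rely on.

```python
from pathlib import PurePosixPath

# Local files written by this notebook, mapped to the MWAA bucket prefix
# they are uploaded under (hypothetical helper, for illustration only).
UPLOAD_PLAN = {
    "scripts/requirements.txt": "requirements",
    "scripts/.airflowignore": "dags",
    "scripts/SMDataWranglerOperator.py": "dags",
    "scripts/config.py": "dags",
    "scripts/ml_pipeline.py": "dags",
}


def s3_key_for(local_path: str, prefix: str) -> str:
    """Build the S3 object key as <prefix>/<file name>."""
    return str(PurePosixPath(prefix) / PurePosixPath(local_path).name)


def upload_all(s3_client, bucket: str, plan: dict = UPLOAD_PLAN) -> list:
    """Upload every file in the plan and return the keys that were written."""
    keys = []
    for local_path, prefix in plan.items():
        key = s3_key_for(local_path, prefix)
        s3_client.upload_file(local_path, bucket, key)
        keys.append(key)
    return keys
```

With a real client this would be called as `upload_all(boto3.client("s3"), bucket)` once all five files exist under `scripts/`.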

Import required dependencies and initialize variables

<div class="alert alert-warning"> ⚠️ <strong> NOTE: </strong>
    Replace the <code>bucket</code> name with your MWAA bucket name.
</div>

In [209]:
import time
import uuid
import sagemaker
import boto3
import string

# Sagemaker session
sess = sagemaker.Session()

# MWAA client
mwaa_client = boto3.client('mwaa')

# S3 client used by the upload cells below
s3_client = boto3.client('s3')

# Replace the bucket name with your MWAA Bucket
bucket = 'airflow-data-wrangler'

print(f'SageMaker version: {sagemaker.__version__}')
print(f'S3 bucket: {bucket}')

SageMaker version: 2.59.5
S3 bucket: airflow-data-wrangler


Creating and uploading this `.airflowignore` file prevents Airflow from interpreting the helper Python scripts as DAG files.

In [565]:
%%writefile scripts/.airflowignore
SMDataWranglerOperator
config.py

Writing scripts/.airflowignore


In [566]:
s3_client.upload_file("scripts/.airflowignore", bucket, f"dags/.airflowignore")
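
To make the effect of these two lines concrete, here is a simplified sketch of how such patterns are applied. It assumes Airflow's regular-expression syntax for `.airflowignore` (the default in Airflow 2.x) and approximates the matching with `re.search` on a relative file name. Airflow's own implementation works on paths inside the DAG folder, so treat `is_ignored` as a hypothetical illustration rather than Airflow's code.

```python
import re

# Patterns as written to scripts/.airflowignore, one regular expression per line.
IGNORE_PATTERNS = ["SMDataWranglerOperator", "config.py"]


def is_ignored(relative_path: str, patterns=IGNORE_PATTERNS) -> bool:
    """Return True if any pattern matches anywhere in the path."""
    return any(re.search(pattern, relative_path) for pattern in patterns)


for path in ["SMDataWranglerOperator.py", "config.py", "ml_pipeline.py"]:
    status = "ignored" if is_ignored(path) else "parsed as a DAG file"
    print(f"{path}: {status}")
```

Because the patterns are unanchored regular expressions, `config.py` would also match a file such as `pipeline_config.py`. Keep that in mind when naming additional DAG files.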

---
## Create `requirements.txt` file

Write a `requirements.txt` file and upload it to S3. We need a few dependencies, mainly the SageMaker Python SDK, to run our Data Wrangler Python script with the Apache Airflow Python operator.

In [556]:
%%writefile scripts/requirements.txt
awswrangler
pandas
sagemaker==2.59.5
dag-factory==0.7.2

Writing scripts/requirements.txt


In [557]:
s3_client = boto3.client("s3")
s3_client.upload_file("scripts/requirements.txt", bucket, f"requirements/requirements.txt")
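
Uploading the file may not be enough on its own: the MWAA environment also has to reference the requirements file, and MWAA buckets are versioned, so a specific object version can be selected. The sketch below shows one way this could be done with the `mwaa_client` created earlier. The helper names and the environment name are assumptions, not part of the original notebook; `head_object` and `update_environment` are existing boto3 calls.

```python
def requirements_update_args(env_name: str, key: str, version_id: str) -> dict:
    """Keyword arguments for mwaa_client.update_environment()."""
    return {
        "Name": env_name,
        "RequirementsS3Path": key,  # path relative to the MWAA bucket
        "RequirementsS3ObjectVersion": version_id,
    }


def apply_requirements(s3_client, mwaa_client, env_name: str, bucket: str,
                       key: str = "requirements/requirements.txt"):
    """Point the environment at the latest uploaded version of the file."""
    # head_object only reports a VersionId when bucket versioning is enabled.
    head = s3_client.head_object(Bucket=bucket, Key=key)
    args = requirements_update_args(env_name, key, head["VersionId"])
    return mwaa_client.update_environment(**args)
```

A call might look like `apply_requirements(s3_client, mwaa_client, "my-mwaa-environment", bucket)`, where `my-mwaa-environment` is a placeholder for your environment name. Updating an environment can take several minutes.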

---
## Upload the custom SageMaker Data Wrangler Operator

In this step we will upload the [custom Airflow operator](https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html) for SageMaker Data Wrangler. With this operator, you can pass in any SageMaker Data Wrangler `.flow` file to Airflow to perform data transformations.

In [558]:
s3_client.upload_file("scripts/SMDataWranglerOperator.py", bucket, f"dags/SMDataWranglerOperator.py")

---

## Update MWAA IAM Execution Role

Every MWAA environment has an [Execution Role](https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html) attached to it. This role carries a permissions policy that grants Amazon Managed Workflows for Apache Airflow (MWAA) permission to invoke the resources of other AWS services on your behalf. In our case, we want our MWAA tasks to be able to access SageMaker and S3. Edit the MWAA execution role and attach the following managed policies:

- `AmazonS3FullAccess`
- `AmazonSageMakerFullAccess`
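
If you prefer to attach these policies from code rather than the console, a minimal sketch could look like the following. `attach_managed_policies` is a hypothetical helper and the role name in the usage note is a placeholder; `attach_role_policy` is the boto3 IAM client call, and the ARNs are the standard AWS managed policy ARNs for the two policies listed above.

```python
# AWS managed policies listed above, addressed by their standard ARNs.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
]


def attach_managed_policies(iam_client, role_name: str,
                            policy_arns=MANAGED_POLICY_ARNS) -> list:
    """Attach each managed policy to the role and return the ARNs attached."""
    attached = []
    for arn in policy_arns:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
        attached.append(arn)
    return attached
```

For example: `attach_managed_policies(boto3.client("iam"), "my-mwaa-execution-role")`. The caller needs IAM permissions to modify the role.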

## Setup SageMaker Role for MWAA

Next, we will create a SageMaker service role to be used in the ML pipeline module. To create an IAM role for Amazon SageMaker:

- Go to the IAM Console - Roles
- Choose Create role
- For role type, choose AWS Service, find and choose SageMaker, and choose Next: Permissions
- On the Attach permissions policy page, choose (if not already selected)
  - AWS managed policy `AmazonSageMakerFullAccess`
  - AWS managed policy `AmazonS3FullAccess` for access to Amazon S3 resources
- Then choose Next: Tags and then Next: Review.
- For Role name, enter `AirflowSageMakerExecutionRole` and choose Create role
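
The console steps above can also be approximated in code. The sketch below builds the trust policy that lets the SageMaker service assume the role, creates the role, and attaches the two managed policies. The helper names are assumptions made for illustration; `create_role` and `attach_role_policy` are boto3 IAM client calls.

```python
import json


def sagemaker_trust_policy() -> dict:
    """Trust policy allowing the SageMaker service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sagemaker.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_sagemaker_role(iam_client,
                          role_name: str = "AirflowSageMakerExecutionRole") -> str:
    """Create the role, attach the managed policies, and return the role ARN."""
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sagemaker_trust_policy()),
    )
    for policy in ("AmazonSageMakerFullAccess", "AmazonS3FullAccess"):
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn=f"arn:aws:iam::aws:policy/{policy}",
        )
    return response["Role"]["Arn"]
```

Usage would be `create_sagemaker_role(boto3.client("iam"))`, which fails if a role with that name already exists.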

Alternatively, we can also use the default SageMaker Execution role since it already has these permissions.


In [97]:
iam_role = sagemaker.get_execution_role()
iam_role

'arn:aws:iam::965425568475:role/service-role/AmazonSageMaker-ExecutionRole-20201030T135016'

---
## Setup configuration script

In this step we create a helper script to define the model training and model creation task configurations. This script will be used by the DAG tasks to obtain various configuration information for model training and model creation.

<div class="alert alert-warning"> ⚠️ <strong> NOTE: </strong>
    Replace <code>bucket</code> with the SageMaker default bucket name for your SageMaker Studio domain.
</div>


In [559]:
%%writefile scripts/config.py
#!/usr/bin/env python
import time
import uuid
import sagemaker
import json
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from sagemaker.amazon.amazon_estimator import get_image_uri

def config(**opts):
    
    region_name=opts['region_name'] if 'region_name' in opts else sagemaker.Session().boto_region_name
    
    # Hook
    hook = AwsBaseHook(aws_conn_id='airflow-sagemaker', resource_type="sagemaker")
    boto_session = hook.get_session(region_name=region_name)
    sagemaker_session =  sagemaker.session.Session(boto_session=boto_session)
    

    training_job_name=opts['training_job_name']
    bucket = opts['bucket'] if 'bucket' in opts else sagemaker_session.default_bucket() #"sagemaker-us-east-2-965425568475"
    s3_prefix = opts['s3_prefix']
    # Get the xgboost container uri
    container = get_image_uri(region_name, 'xgboost', repo_version='1.0-1')
    
    ts = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"   
    config = {}
    
    config["data_wrangler_config"] = {        
        "sagemaker_role":             opts['role_name'],
        #"s3_data_type"              :    defaults to "S3Prefix" 
        #"s3_input_mode"             :    defaults to "File", 
        #"s3_data_distribution_type" :    defaults to "FullyReplicated", 
        #"aws_conn_id"               :    defaults to "aws_default",
        #"kms_key"                   :    defaults to None, 
        #"volume_size_in_gb"         :    defaults to 30,
        #"enable_network_isolation"  :    defaults to False, 
        #"wait_for_processing"       :    defaults to True, 
        #"container_uri"             :    defaults to "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.x", 
        #"container_uri_pinned"      :    defaults to "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.12.0",  
        "outputConfig": {
              #"s3_output_upload_mode":     #defaults to EndOfJob
              #"output_content_type":       #defaults to CSV
              #"output_bucket":             #defaults to SageMaker Default bucket
              "output_prefix": s3_prefix     #prefix within bucket where output will be written, default is generated automatically
        }
    }

    config["train_config"]={
        "AlgorithmSpecification": {
            "TrainingImage": container,
            "TrainingInputMode": "File"
        },
        "HyperParameters": {
            "max_depth": "5",
            "num_round": "10",
            "objective": "reg:squarederror"
        },
        "InputDataConfig": [
            {
                "ChannelName": "train",
                "ContentType": "csv",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataDistributionType": "FullyReplicated",
                        "S3DataType": "S3Prefix",
                        "S3Uri": f"s3://{bucket}/{s3_prefix}/train"
                    }
                }
            }
        ],
        "OutputDataConfig": {
            "S3OutputPath": f"s3://{bucket}/{s3_prefix}/xgboost"
        },
        "ResourceConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.2xlarge",
            "VolumeSizeInGB": 5
        },
        "RoleArn": opts['role_name'],
        "StoppingCondition": {
            "MaxRuntimeInSeconds": 86400
        },
        "TrainingJobName": training_job_name
    }
    
    config["model_config"]={
           "ExecutionRoleArn": opts['role_name'],
           "ModelName": f"XGBoost-Fraud-Detector-{ts}",
           "PrimaryContainer": { 
              "Mode": "SingleModel",
              "Image": container,
              "ModelDataUrl": f"s3://{bucket}/{s3_prefix}/xgboost/{training_job_name}/output/model.tar.gz"
           },
        }
    
    return config

Writing scripts/config.py


Upload `config.py` to the `/dags` prefix.

In [560]:
s3_client.upload_file("scripts/config.py", bucket, f"dags/config.py")
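
The S3 locations in `config.py` are all derived from three values: the bucket, the `s3_prefix`, and the training job name. The sketch below isolates that derivation so the relationship is easier to see. `unique_suffix` and `derived_locations` are hypothetical helpers that mirror the f-strings in the script; they are not part of `config.py` itself.

```python
import time
import uuid


def unique_suffix() -> str:
    """Timestamp plus a short random ID, in the same format as `ts` in config.py."""
    return f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"


def derived_locations(bucket: str, s3_prefix: str, training_job_name: str) -> dict:
    """Reproduce the S3 URIs that config.py builds for training and model creation."""
    output_path = f"s3://{bucket}/{s3_prefix}/xgboost"
    return {
        "train_input": f"s3://{bucket}/{s3_prefix}/train",
        "training_output": output_path,
        # SageMaker writes artifacts to <output path>/<job name>/output/model.tar.gz
        "model_artifact": f"{output_path}/{training_job_name}/output/model.tar.gz",
    }


job_name = f"XGBoost-training-{unique_suffix()}"
for name, uri in derived_locations("example-bucket", "data-wrangler-pipeline", job_name).items():
    print(f"{name}: {uri}")
```

The model step can only find `model.tar.gz` if it receives the same training job name as the training step, which is why both configurations are built in a single call to `config()`.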

---
## Setup Apache Airflow DAG (Directed Acyclic Graph)

In this step, we create the Python script that sets up the Apache Airflow DAG. The script defines a `Start` task that builds the configuration, the three main tasks listed below, and a final `End` task, and then chains them together with `>>` to form the DAG.

1. A task that uses the Python operator to run the Data Wrangler flow for data pre-processing
2. A task that uses the SageMaker training operator to train an XGBoost model on the training data
3. A task that uses the SageMaker model operator to create a model from the artifacts produced by the training step
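
For readers new to the `>>` notation, the toy sketch below shows the idea behind it in plain Python. It is not Airflow code: `Task` is a hypothetical stand-in class, used only to illustrate that `a >> b` records `b` as downstream of `a` and returns `b`, so that longer chains read left to right.

```python
class Task:
    """Toy stand-in for an Airflow task; only tracks downstream dependencies."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b means "run b after a"; returning b lets the chain continue.
        self.downstream.append(other)
        return other


def linear_order(first: Task) -> list:
    """Follow a simple linear chain and return the task IDs in run order."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


start, wrangle, train, create, end = (
    Task(name) for name in ["Start", "DataWrangler", "Training", "CreateModel", "End"]
)
start >> wrangle >> train >> create >> end
print(linear_order(start))
```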

In [538]:
%store -r ins_claim_flow_uri
ins_claim_flow_uri

's3://sagemaker-us-east-2-965425568475/data-wrangler-pipeline/flow/flow-21-08-44-46-aea6f365.flow'

In [561]:
%%writefile scripts/ml_pipeline.py
#!/usr/bin/env python
import time
import uuid
import json
import boto3
import sagemaker

# Import config file.
from config import config
from datetime import timedelta
import airflow
from airflow import DAG

# airflow operators
from airflow.operators.python import PythonOperator

# airflow sagemaker operators
from airflow.providers.amazon.aws.operators.sagemaker_training import SageMakerTrainingOperator
from airflow.providers.amazon.aws.operators.sagemaker_model import SageMakerModelOperator

# airflow sagemaker configuration
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.workflow.airflow import training_config

# airflow Data Wrangler operator
from SMDataWranglerOperator import SageMakerDataWranglerOperator

# airflow dummy operator
from airflow.operators.dummy import DummyOperator


  
default_args = {  
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ['airflow@iloveairflow.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
ts                = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
DAG_NAME          = "ml-pipeline"

#-------
### Start creating DAGs
#-------

dag = DAG(  
            DAG_NAME,
            default_args=default_args,
            dagrun_timeout=timedelta(hours=2),
            # Cron expression to auto run workflow on specified interval
            # schedule_interval='0 3 * * *'
            schedule_interval=None
        )

#-------
# Task to create configurations
#-------

config_task = PythonOperator(
        task_id = 'Start',
        python_callable=config,
        op_kwargs={
            'training_job_name': f"XGBoost-training-{ts}",
            's3_prefix': 'data-wrangler-pipeline',
            'role_name': 'AmazonSageMaker-ExecutionRole-20201030T135016'},
        provide_context=True,
        dag=dag
    )

#-------
# Task with SageMakerDataWranglerOperator operator for Data Wrangler Processing Job.
#-------

def datawrangler(**context):
    config = context['ti'].xcom_pull(task_ids='Start',key='return_value')
    preprocess_task = SageMakerDataWranglerOperator(
                            task_id='DataWrangler_Processing_StepNew',
                            dag=dag,
                            flow_file_s3uri="$flow_uri",
                            processing_instance_count=2,
                            instance_type='ml.m5.4xlarge',
                            aws_conn_id="aws_default",
                            config= config["data_wrangler_config"]
                    )
    preprocess_task.execute(context)

datawrangler_task = PythonOperator(
        task_id = 'SageMaker_DataWrangler_step',
        python_callable=datawrangler,
        provide_context=True,
        dag=dag
    )

#-------
# Task with SageMaker training operator to train the xgboost model
#-------

def trainmodel(**context):
    config = context['ti'].xcom_pull(task_ids='Start',key='return_value')
    trainmodel_task = SageMakerTrainingOperator(
                task_id='Training_Step',
                config= config['train_config'],
                aws_conn_id='aws-sagemaker',
                wait_for_completion=True,
                check_interval=30
            )
    trainmodel_task.execute(context)

train_model_task = PythonOperator(
        task_id = 'SageMaker_training_step',
        python_callable=trainmodel,
        provide_context=True,
        dag=dag
    )

#-------
# Task with SageMaker Model operator to create the xgboost model from artifacts
#-------

def createmodel(**context):
    config = context['ti'].xcom_pull(task_ids='Start',key='return_value')
    createmodel_task= SageMakerModelOperator(
            task_id='Create_Model',
            config= config['model_config'],
            aws_conn_id='aws-sagemaker',
        )
    createmodel_task.execute(context)
    
create_model_task = PythonOperator(
        task_id = 'SageMaker_create_model_step',
        python_callable=createmodel,
        provide_context=True,
        dag=dag
    )

#------
# Last step
#------
end_task = DummyOperator(task_id='End', dag=dag)

# Create task dependencies

config_task >> datawrangler_task >> train_model_task >> create_model_task >> end_task

Writing scripts/ml_pipeline.py


Replace the `$flow_uri` placeholder in the `ml_pipeline.py` script with the value of the stored variable `ins_claim_flow_uri` (restored above with the `%store -r` magic), which contains the S3 path of the `.flow` file.

In [562]:
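script_path = "scripts/ml_pipeline.py"

# Read the script written above and fill in the $flow_uri placeholder
with open(script_path, 'r') as f:
    variables   = {'flow_uri': ins_claim_flow_uri}
    template    = string.Template(f.read())
    ml_pipeline = template.substitute(variables)

# Write the rendered DAG script back to the same path
with open(script_path, 'w') as f:
    f.write(ml_pipeline)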
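
As a small standalone illustration of the substitution used here, the sketch below renders a one-line template with `string.Template`. The S3 URI is a made-up example value. It also shows the difference between `substitute`, which raises `KeyError` for a placeholder without a value, and `safe_substitute`, which leaves such placeholders untouched.

```python
import string

# One line of the DAG script, with the same placeholder the notebook uses.
template = string.Template('flow_file_s3uri="$flow_uri"')

# Example value only; the notebook passes ins_claim_flow_uri instead.
rendered = template.substitute({"flow_uri": "s3://example-bucket/flows/example.flow"})
print(rendered)

# substitute() is strict: a placeholder without a value raises KeyError.
try:
    template.substitute({})
    missing_raises = False
except KeyError:
    missing_raises = True
print("strict substitute raised KeyError:", missing_raises)

# safe_substitute() leaves unknown placeholders in place.
untouched = template.safe_substitute({})
print(untouched)
```

Since `$` marks a placeholder, any literal dollar sign added to `ml_pipeline.py` later would need to be written as `$$`, or the strict `substitute` call would fail.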

Upload `ml_pipeline.py` to the `/dags` prefix.

In [563]:
s3_client.upload_file("scripts/ml_pipeline.py", bucket, f"dags/ml_pipeline.py")

---
## View Airflow DAG and run

Once the above steps are complete, you can access the [Apache Airflow UI](https://docs.aws.amazon.com/mwaa/latest/userguide/access-airflow-ui.html) and view the DAG. To access the Apache Airflow UI, go to the Amazon MWAA Console, select the MWAA Environment and click the _Airflow UI_ link.

<img src="images/mwaa_ui.png" width="800"/>


You can run the DAG by clicking the "Play" button. Alternatively, you can:

1. Set up the DAG to run automatically on a schedule using cron expressions
2. Set up the DAG to run based on S3 sensors, so that the pipeline/workflow executes whenever a new file arrives in a bucket/prefix.
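
Besides the options above, the DAG can also be triggered from code through the MWAA CLI endpoint, which is one possible use for the `mwaa_client` created at the top of this notebook. The sketch below only builds the HTTP request: `build_trigger_request` is a hypothetical helper, and it assumes the response shape of `create_cli_token` (`CliToken` and `WebServerHostname`) and the `/aws_mwaa/cli` endpoint described in the MWAA documentation.

```python
import urllib.request


def build_trigger_request(cli_token_response: dict, dag_id: str) -> urllib.request.Request:
    """Build the POST request that asks the Airflow CLI to trigger a DAG."""
    url = f"https://{cli_token_response['WebServerHostname']}/aws_mwaa/cli"
    return urllib.request.Request(
        url,
        # The request body is the Airflow CLI command to run.
        data=f"dags trigger {dag_id}".encode("utf-8"),
        headers={
            "Authorization": f"Bearer {cli_token_response['CliToken']}",
            "Content-Type": "text/plain",
        },
        method="POST",
    )
```

A possible usage, with `my-mwaa-environment` as a placeholder name: obtain a token with `token = mwaa_client.create_cli_token(Name="my-mwaa-environment")`, then send the request with `urllib.request.urlopen(build_trigger_request(token, "ml-pipeline"))`. CLI tokens are short-lived, so request a fresh one for each call.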

<img src="images/mwaa_dag.png" width="800"/>


### Clean Up

1. Delete the MWAA Environment from the Amazon MWAA Console.
2. Delete the MWAA S3 files.
3. Delete the Model training data and model artifact files from S3.
4. Delete the SageMaker Model.
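
The four steps above could be scripted roughly as follows. This is a sketch under stated assumptions: `clean_up` and all of its argument values are hypothetical, the clients are passed in by the caller, and the boto3 calls used are `delete_environment` (MWAA), `Bucket(...).objects.filter(...).delete()` (S3 resource), and `delete_model` (SageMaker). Everything under the given prefixes is deleted, so double-check the values before running anything like this.

```python
def clean_up(s3_resource, sagemaker_client, mwaa_client, *, env_name: str,
             mwaa_bucket: str, data_bucket: str, s3_prefix: str,
             model_name: str) -> list:
    """Run the four clean-up steps and return a log of what was requested."""
    log = []

    # 1. Delete the MWAA environment.
    mwaa_client.delete_environment(Name=env_name)
    log.append(f"mwaa environment: {env_name}")

    # 2. Delete the files this notebook uploaded to the MWAA bucket.
    for prefix in ("dags/", "requirements/"):
        s3_resource.Bucket(mwaa_bucket).objects.filter(Prefix=prefix).delete()
        log.append(f"s3://{mwaa_bucket}/{prefix}")

    # 3. Delete training data and model artifacts. This removes everything
    #    stored under the prefix, not only the files created by the DAG.
    data_prefix = f"{s3_prefix}/"
    s3_resource.Bucket(data_bucket).objects.filter(Prefix=data_prefix).delete()
    log.append(f"s3://{data_bucket}/{data_prefix}")

    # 4. Delete the SageMaker model.
    sagemaker_client.delete_model(ModelName=model_name)
    log.append(f"sagemaker model: {model_name}")

    return log
```

On a versioned bucket, such as the MWAA bucket, deleting objects this way only adds delete markers. Older object versions remain until they are removed separately.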

---
# Conclusion

We created an ML pipeline with Apache Airflow that uses a Data Wrangler flow to pre-process data and generate new training data, trains a model on that data, and then creates a new model in Amazon SageMaker.
