# Automated data transformation and ingestion from an Amazon S3 bucket to SageMaker Feature Store

## Architecture Overview
This notebook shows you how to use [AWS Service Catalog](https://aws.amazon.com/servicecatalog), [SageMaker Projects](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-whatis.html), and [Pipelines](https://aws.amazon.com/sagemaker/pipelines/) to create re-usable and portable components in SageMaker Studio.
This project automates feature transformation and ingestion into Feature Store whenever a new data file is uploaded to an S3 bucket. The SageMaker project creates all necessary components and sets up the permissions and links between resources.

<img src="../design/feature-store-ingestion-pipeline.drawio.svg" style="background-color:white;" alt="solution overview" width="1000"/>

## Prerequisites
The following resources must be created before you can deploy the SageMaker project:
- Data Wrangler `.flow` file which contains an output node -> done within module [`02-data-wrangler`](../02-data-wrangler/00-data-wrangler-demo.ipynb)
- Feature group to store features extracted from the data -> done within the notebook [`01-dw-flow-feature-store`](../02-data-wrangler/01-dw-flow-feature-store.ipynb) in the module `02-data-wrangler`
- SageMaker project portfolio -> done with [initial setup](../README.md#deploy-sagemaker-projects)
- S3 bucket where new data files will be uploaded

In [None]:
import sagemaker
import boto3
import time
import json
import os
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session

print(sagemaker.__version__)

In [None]:
# load environment variables from %store
%store -r 

In [None]:
%store

In [None]:
try:
    dw_flow_file_url
    dw_output_name
    feature_group_name
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 02-data-wrangler/01-dw-flow-feature-store.ipynb notebook")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

In [None]:
# Set the string literals
s3_input_data_prefix = f"{data_bucket}/feature-store-ingestion-pipeline/landing-zone/"
pipeline_name_prefix = "s3-fs-ingest-pipeline"

## Create data load project

### Option 1: Create a project in Studio

<img src="../img/studio-create-project.png" alt="studio-create-project" width="500"/>

#### Select a project template for automated feature ingestion and transformation pipeline

<img src="../img/studio-select-project-template.png" alt="studio-create-project" width="800"/>

#### Enter your specific project parameters

<img src="../img/studio-enter-project-parameters.png" alt="studio-create-project" width="800"/>

The parameters are:
- Project name and description
- **Pipeline name prefix**
- **Pipeline description**
- **S3 prefix** to monitor for uploaded files, which trigger a data transformation and ingestion
- **Data Wrangler flow S3 url** with data processing pipeline
- **Data Wrangler output name** which generates the feature store input
- **Feature group name** to ingest the processed and transformed data
- **Lambda execution role**: provide your own IAM role for the Lambda function or have a new one created automatically

Click on **Create project**

Wait until project creation is completed. The banner "Creating project...":

<img src="../img/studio-creating-project-banner.png" alt="studio-creating-project-banner" width="500"/>

will change to the project details page:

![](img/studio-project-created.png)


In [None]:
# Set project_id to the project id of the created project
project_id = "p-c98nadneqmvr"

### Option 2: Create project in code
Alternatively, you can use the [boto3 Python SDK](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_project) to create a new project from the notebook.  
First, get the `ProvisioningArtifactIds` and `ProductId` values from the outputs of the Service Catalog portfolio CloudFormation stack:

In [None]:
!aws cloudformation describe-stacks \
    --stack-name "sm-project-sc-portfolio" \
    --output table \
    --query "Stacks[0].Outputs[*].[OutputKey, OutputValue]"

In [None]:
cf = boto3.client("cloudformation")

r = cf.describe_stacks(StackName="sm-project-sc-portfolio")
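
The stack outputs come back as a list of `OutputKey`/`OutputValue` pairs. As a minimal sketch, the helper below (the name `stack_outputs` is hypothetical, and the example response is a trimmed-down placeholder rather than real output) turns such a response into a dictionary so that individual values can be looked up by key:

```python
def stack_outputs(describe_stacks_response, stack_index=0):
    """Return the outputs of one stack as an {OutputKey: OutputValue} dict."""
    stack = describe_stacks_response["Stacks"][stack_index]
    # A stack without outputs has no "Outputs" key at all
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Trimmed-down example of the response shape; the values are placeholders
example_response = {
    "Stacks": [
        {
            "Outputs": [
                {"OutputKey": "ProductId", "OutputValue": "prod-example"},
                {"OutputKey": "ProvisioningArtifactIds", "OutputValue": "pa-example"},
            ]
        }
    ]
}

outputs = stack_outputs(example_response)
print(outputs["ProductId"])
```

With the real response, `stack_outputs(r)["ProductId"]` would give the same value as filtering the output list by key.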

Set parameters for the SageMaker project:

In [None]:
import boto3
from time import gmtime, strftime

sm = boto3.client("sagemaker")

provisioning_artifact_ids = [v for v in r["Stacks"][0]["Outputs"] if v["OutputKey"] == "ProvisioningArtifactIds"][0]["OutputValue"]
product_id = [v for v in r["Stacks"][0]["Outputs"] if v["OutputKey"] == "ProductId"][0]["OutputValue"]
project_name = f"s3-fs-ingest-{strftime('%d-%H-%M-%S', gmtime())}"
project_parameters = [
    {"Key": "PipelineDescription", "Value": "Feature Store ingestion pipeline"},
    {"Key": "DataWranglerFlowUrl", "Value": dw_flow_file_url},
    {"Key": "DataWranglerOutputName", "Value": dw_output_name},
    {"Key": "S3DataPrefix", "Value": s3_input_data_prefix},
    {"Key": "FeatureGroupName", "Value": feature_group_name},
    {"Key": "PipelineNamePrefix", "Value": pipeline_name_prefix},
]

Finally, create a SageMaker project from the service catalog product template:

In [None]:
# create SageMaker project
r = sm.create_project(
    ProjectName=project_name,
    ProjectDescription="Feature Store ingestion from S3",
    ServiceCatalogProvisioningDetails={
        'ProductId': product_id,
        'ProvisioningArtifactId': provisioning_artifact_ids,
        'ProvisioningParameters': project_parameters
    },
)

print(r)
project_id = r["ProjectId"]
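
`create_project` returns before the project is fully provisioned. A minimal sketch of waiting for the creation to finish is shown below. It assumes polling `describe_project` and reading its `ProjectStatus` field; the function name `wait_for_project` and the default intervals are hypothetical choices, not part of the project template.

```python
import time


def wait_for_project(sm_client, project_name, poll_seconds=10, timeout_seconds=900):
    """Poll describe_project until the project leaves its in-progress statuses."""
    in_progress = {"Pending", "CreateInProgress"}
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = sm_client.describe_project(ProjectName=project_name)["ProjectStatus"]
        if status not in in_progress:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Project {project_name} is still {status} after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)
    # Any status other than CreateCompleted means the creation did not succeed
    if status != "CreateCompleted":
        raise RuntimeError(f"Project {project_name} ended in status {status}")
    return status
```

In this notebook the call would be `wait_for_project(sm, project_name)`, placed right after `create_project`.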

## Working with data ingestion project

### Project resources
The project template creates all necessary resources for an automated data transformation and ingestion:
- S3 rule for launching an AWS Lambda function whenever any new data is uploaded to the specified S3 prefix
- AWS Lambda function which launches the SageMaker pipeline
- SageMaker pipeline which runs a processing job using a Data Wrangler processor
- Data Wrangler processor which uses a stored `.flow` file with the data transformation workflow
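
To make the interplay between the S3 rule and the Lambda function more concrete, here is a minimal sketch of what such a function could look like. It is not the code delivered by the project template. The `PIPELINE_NAME` environment variable, its fallback value, and the helper name `build_start_requests` are assumptions made for illustration; `InputDataUrl` is the pipeline parameter used later in this notebook.

```python
import os
import urllib.parse


def build_start_requests(event, pipeline_name):
    """Translate an S3 event notification into start_pipeline_execution arguments."""
    requests = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        requests.append(
            {
                "PipelineName": pipeline_name,
                "PipelineParameters": [
                    {"Name": "InputDataUrl", "Value": f"s3://{bucket}/{key}"}
                ],
            }
        )
    return requests


def lambda_handler(event, context, sm_client=None):
    # PIPELINE_NAME is a hypothetical environment variable for this sketch
    pipeline_name = os.environ.get("PIPELINE_NAME", "s3-fs-ingest-pipeline-example")
    if sm_client is None:
        import boto3  # imported lazily so the helper above can be tested offline

        sm_client = boto3.client("sagemaker")
    # Start one pipeline execution per uploaded object and return the ARNs
    return [
        sm_client.start_pipeline_execution(**request)["PipelineExecutionArn"]
        for request in build_start_requests(event, pipeline_name)
    ]
```

Keeping the event parsing in a separate function makes it possible to check the generated parameters without calling AWS.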

### CodeCommit repository with seed code
All source code for pipeline creation and pipeline parameter configuration is delivered as a CodeCommit repository. The code is fully functional and works out-of-the-box. You own this code and can change any configuration or parameters of the pipeline according to your requirements.

To start working with the code, clone the repository into the Studio user's home directory:

<img src="../img/studo-project-clone-repo.png" alt="studo-project-clone-repo" width="800"/>

You can change the source code and push your changes to the CodeCommit repository. The project also delivers an [AWS CodePipeline](https://aws.amazon.com/codepipeline/) CI/CD pipeline which launches an [AWS CodeBuild](https://aws.amazon.com/codebuild/) stage whenever there is a new commit in the repository. The build pulls the code from the repository and calls the `create_pipeline` function (file `build.py`). You can change the existing code of `pipeline.create_pipeline` in the file `pipeline.py` or provide your own. The default code configures a SageMaker pipeline with a Data Wrangler processor and upserts the pipeline.

### SageMaker pipeline
The project delivers a SageMaker pipeline consisting of one processing step with a Data Wrangler processor. The pipeline performs the transformations contained in the specified Data Wrangler `.flow` file and stores the transformed features in the specified feature group in Feature Store.
This pipeline is launched by a Lambda function whenever there is a new file uploaded to the specified S3 location. The pipeline is linked to the project and available in the **Pipeline** tab of the project details page:

<img src="../img/studio-project-details-pipelines.png" alt="studio-project-details-pipelines" width="800"/>

From there you can see the pipeline details, parameters, and the execution history:

<img src="../img/studio-pipeline-execution-history.png" alt="studio-pipeline-execution-history" width="800"/>

You can also start a new execution manually from Studio:

![](img/studio-pipeline-start-execution.png)

and provide pipeline parameters:

<img src="../img/studio-pipeline-parameter-input.png" alt="studio-pipeline-parameter-input" width="500"/>


## Test the automation pipeline

To test the deployed data transformation and feature store ingestion pipeline, perform the following steps:
1. Load features from a feature group - via Athena SQL query
1. Optionally change the data in the loaded DataFrame
1. Export data as `.csv` and save to the monitored S3 location - this will launch the data transformation and ingestion via our pipeline
1. Monitor the pipeline execution
1. Check the loaded data in the feature group

### Import packages

In [None]:
sm_client = boto3.client("sagemaker")

### Upload new data to S3

Create a feature group object:

In [None]:
feature_store_session = Session(
    default_bucket=data_bucket
)

feature_group = FeatureGroup(
    name=feature_group_name, 
    sagemaker_session=feature_store_session
)

In [None]:
# Build an SQL query against the Athena table of the feature group
fs_query = feature_group.athena_query()
fs_query_output_prefix = "feature-store-ingestion-pipeline/fs_query_results/"

query_string = f'SELECT * FROM "{fs_query.table_name}"'
print(f'Prepared query {query_string}')
print(fs_query)

In [None]:
# Run Athena query. The output is loaded to a Pandas dataframe.
fs_query.run(
    query_string=query_string, 
    output_location=f"s3://{data_bucket}/{fs_query_output_prefix}"
)

fs_query.wait()
data_df = fs_query.as_dataframe()

In [None]:
# Do some data manipulation - update/insert 
fs_id = "39580"
data_df

In [None]:
# Save the data as .csv
from time import gmtime, strftime  # needed here if the "Option 2" cells were skipped

file_name = f"new-data-{strftime('%d-%H-%M-%S', gmtime())}.csv"
data_df.to_csv(file_name, index=False)

Upload the file to S3 prefix. This will launch the Lambda function which will start a new pipeline execution:

In [None]:
# Upload data to S3 location. This will launch a new pipeline execution
print(f"Uploading the file {file_name} to s3://{s3_input_data_prefix}")

# s3_input_data_prefix has the form "<bucket>/<key prefix>/": drop the bucket name to get the key prefix
key_prefix = s3_input_data_prefix.split("/", 1)[1]
boto3.client("s3").upload_file(file_name, data_bucket, f"{key_prefix}{file_name}")

### Monitor pipeline execution

In [None]:
try:
    project_id
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] Set project_id to the id of the created project ")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

In [None]:
# set the pipeline name
s3_to_fs_pipeline_name = f"{pipeline_name_prefix}-{project_id}"

%store s3_to_fs_pipeline_name

In [None]:
# check pipeline execution 
summaries = sm_client.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')
summaries

In [None]:
latest_execution = sm_client.list_pipeline_executions(PipelineName=s3_to_fs_pipeline_name).get('PipelineExecutionSummaries')[0].get('PipelineExecutionArn')
print (latest_execution)

In [None]:
# Wait until the pipeline execution leaves the non-terminal 'Executing' and 'Stopping' statuses
while sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution)["PipelineExecutionStatus"] in ("Executing", "Stopping"):
    print('Pipeline execution is still in progress...')
    time.sleep(60)

print('Pipeline execution has finished')
print(sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution))

Alternatively, you can monitor the pipeline execution inside the Pipeline widget of Studio:

![](img/studio-pipeline-executing.png)

### Check the loaded data
Once the execution completes, we can check that the data is loaded into the feature group.

In [None]:
# Query the feature group for the record with the given ID
fs_id = '39580'

query_string = f'SELECT * FROM "{fs_query.table_name}" WHERE fs_id={fs_id}'
print(query_string)

In [None]:
# fs_query_output_prefix already ends with "fs_query_results/", so it is used as is
fs_query.run(
    query_string=query_string, 
    output_location=f"s3://{data_bucket}/{fs_query_output_prefix}"
)

fs_query.wait()
data_df = fs_query.as_dataframe()

In [None]:
data_df

### Start pipeline run via SDK
You can start the data transformation and ingestion pipeline on demand using the [SageMaker SDK](https://sagemaker.readthedocs.io/en/v2.57.0/workflows/pipelines/index.html). The `pipeline.start` function lets you provide parameter values that override the defaults for that pipeline execution.

In [None]:
# get Pipeline object
pipeline = Pipeline(name=s3_to_fs_pipeline_name)

In [None]:
# start execution with the specified parameters
execution = pipeline.start(
    parameters=dict(
        InputDataUrl=f"s3://{s3_input_data_prefix}{file_name}",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()

### Change the default values for pipeline parameters
To change the default values of the parameters, edit the pipeline and parameter definition code in the file `pipeline.py`:
```python
    # setup pipeline parameters
    p_processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount",
        default_value=1
    )
    p_processing_instance_type = ParameterString(
        name="ProcessingInstanceType",
        default_value="ml.m5.4xlarge"
    )
    p_processing_volume_size = ParameterInteger(
        name="ProcessingVolumeSize",
        default_value=50
    )
    p_flow_output_name = ParameterString(
        name='FlowOutputName',
        default_value=flow_output_name
    )
    p_input_flow = ParameterString(
        name='InputFlowUrl',
        default_value=data_wrangler_flow_s3_url
    )
    p_input_data = ParameterString(
        name="InputDataUrl",
        default_value=input_data_s3_url
    )
```

The pipeline will be automatically started after you commit and push the changes into the project's source code repository.
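
For a one-off run you may not need to change the defaults at all, because `pipeline.start` accepts overrides per execution. The sketch below is one way to catch typos in parameter names before starting a run. The parameter names and types are taken from the `pipeline.py` excerpt above; the helper `build_overrides` and the example values are hypothetical.

```python
# Parameter names and types as defined in the pipeline.py excerpt
KNOWN_PARAMETERS = {
    "ProcessingInstanceCount": int,
    "ProcessingInstanceType": str,
    "ProcessingVolumeSize": int,
    "FlowOutputName": str,
    "InputFlowUrl": str,
    "InputDataUrl": str,
}


def build_overrides(**overrides):
    """Validate override names and types and return them as a dict."""
    for name, value in overrides.items():
        if name not in KNOWN_PARAMETERS:
            raise KeyError(f"Unknown pipeline parameter: {name}")
        expected = KNOWN_PARAMETERS[name]
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            raise TypeError(f"{name} expects a value of type {expected.__name__}")
    return dict(overrides)


overrides = build_overrides(ProcessingInstanceCount=2, ProcessingVolumeSize=100)
print(overrides)
```

In the notebook the result would be passed on as `pipeline.start(parameters=overrides)`.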

# Clean up
- Delete the project
- Delete the S3 buckets
- Delete the CloudFormation stacks
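
The clean-up steps can be sketched in code as follows. This is an illustration under assumptions, not a tested teardown script: the function name `clean_up` is hypothetical, the clients are passed in explicitly, and buckets with versioning enabled would additionally need their object versions removed. Project deletion is asynchronous, so the stacks may have to be deleted only after the project is gone.

```python
def clean_up(sm_client, cfn_client, s3_resource, project_name,
             bucket_names=(), stack_names=()):
    """Delete the project, empty and delete the buckets, then delete the stacks."""
    sm_client.delete_project(ProjectName=project_name)
    for bucket_name in bucket_names:
        bucket = s3_resource.Bucket(bucket_name)
        # A bucket must be empty before it can be deleted
        bucket.objects.all().delete()
        bucket.delete()
    for stack_name in stack_names:
        cfn_client.delete_stack(StackName=stack_name)


# Example call, left commented out because it is destructive:
# import boto3
# clean_up(
#     boto3.client("sagemaker"),
#     boto3.client("cloudformation"),
#     boto3.resource("s3"),
#     project_name=project_name,
#     stack_names=["sm-project-sc-portfolio"],
# )
```

Pass bucket names only for buckets that were created for this demo, since their content is removed permanently.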

# Release resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>