<a id='06-nb'></a>

# Music Recommender
## Part 6: SageMaker Pipelines
----
In this final notebook, we combine the steps from the previous notebooks into a single [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html) object that automates the entire modeling process, from data ingestion through model monitoring. SageMaker Pipelines is a tool for building machine learning pipelines with direct SageMaker integration. Because of this integration, you can create a pipeline and set up SageMaker Projects for orchestration with a tool that handles much of the step creation and management for you.

----
### Contents
- [Overview](00_overview_arch_data.ipynb)
- [Part 1: Data Prep using Data Wrangler](01_music_dataprep.flow)
- [Part 2a: Feature Store Creation - Tracks](02a_export_fg_tracks.ipynb)
- [Part 2b: Feature Store Creation - User Preferences](02b_export_fg_5star_features.ipynb)
- [Part 2c: Feature Store Creation - Ratings](02c_fg_create_ratings.ipynb)
- [Part 3: Train Model with Debugger Hooks. Set Artifacts and Register Model.](03_train_model_lineage_registry_debugger.ipynb)
- [Part 4: Deploy Model & Inference using Online Feature Store](04_deploy_inference_explainability.ipynb)
- [Part 5: Model Monitor](05_model_monitor.ipynb)
- [Part 6: SageMaker Pipelines](06_pipeline.ipynb)
    - [Architecture](#06-arch)
    - [Pipelines Overview](#pipelines)
    - [Cleanup](#06-cleanup)

### Load stored variables
If you ran this notebook before, you may want to reuse the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a print-out of the existing variables. If you don't see anything, you may need to create them again, or this may be your first time running this notebook.

In [1]:
%store -r
%store

Stored variables and their in-db values:
dw_ecrlist                           -> {'region': {'us-west-2': '174368400705', 'us-east-
fg_name_ratings                      -> 'ratings-feature-group-13-03-22-53-fcdc820e'
fg_name_tracks                       -> 'track-features-13-03-22-53-fcdc820e'
fg_name_user_preferences             -> 'user-5star-track-features-13-03-22-53-fcdc820e'
flow_export_id                       -> '13-03-22-53-fcdc820e'
flow_s3_uri                          -> 's3://sagemaker-us-west-1-738335684114/music-recom
model_path                           -> 's://sagemaker-us-west-1-738335684114/music-recomm
prefix                               -> 'music-recommendation'
ratings_data_source                  -> 's3://sagemaker-us-west-1-738335684114/music-recom
s3_output_path                       -> 's3://sagemaker-us-west-1-738335684114'
tracks_data_source                   -> 's3://sagemaker-us-west-1-738335684114/music-recom
train_data_uri                       -> 's3://sagemaker-us-west-1-738335684114/music-recom
training_job_name                    -> 'sagemaker-xgboost-2021-06-13-23-56-09-766'
val_data_uri                         -> 's3://sagemaker-us-west-1-738335684114/music-recom

### Install and/or update required third-party libraries

In [2]:
!python -m pip install -Uq pip
# !python -m pip install -q awswrangler==2.2.0 imbalanced-learn==0.7.0 sagemaker==2.23.1 boto3==1.16.48
!python -m pip install -q sagemaker==2.45.0 imbalanced-learn awswrangler


### Import libraries

In [3]:
import json
import boto3
import pathlib
import sagemaker
import numpy as np
import pandas as pd
import awswrangler as wr

from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.feature_store.feature_group import FeatureGroup

import demo_helpers  # our custom set of functions

In [4]:
sagemaker.__version__

'2.45.0'

### Set region and boto3 config

In [None]:
import sys
import pprint
sys.path.insert(1, './code')
from parameter_store import ParameterStore
ps = ParameterStore()

parameter = ps.read('music-rec')
pprint.pprint(parameter)

dw_ecrlist = parameter['dw_ecrlist']
fg_name_ratings = parameter['fg_name_ratings']
fg_name_tracks = parameter['fg_name_tracks']
fg_name_user_preferences = parameter['fg_name_user_preferences']

flow_export_id = parameter['flow_export_id']
flow_s3_uri = parameter['flow_s3_uri']
model_path = parameter['model_path']
prefix = parameter['prefix']
ratings_data_source = parameter['ratings_data_source']
tracks_data_source = parameter['tracks_data_source']

"""
endpoint_name = parameter['endpoint_name']
feature_names = parameter['feature_names']
fs_name_ratings = parameter['fs_name_ratings']
fs_name_tracks = parameter['fs_name_tracks']
fs_name_user_preferences = parameter['fs_name_user_preferences']
model_name = parameter['model_name']
model_packages = parameter['model_packages']

mpg_name = parameter['mpg_name']
num_training_samples = parameter['num_training_samples']
pipeline_name = parameter['pipeline_name']


s3_output_path = parameter['s3_output_path']

train_data_uri = parameter['train_data_uri']
training_job_name = parameter['training_job_name']
tuning_job_name = parameter['tuning_job_name']
val_data_uri = parameter['val_data_uri']
best_training_job_name = parameter['best_training_job_name']
deploy_instance_type = parameter['deploy_instance_type']
"""

In [5]:
region = boto3.Session().region_name
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client('s3', region_name=region)

sagemaker_boto_client = boto_session.client('sagemaker')
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client)
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()
account_id = boto3.client('sts').get_caller_identity()["Account"]

In [6]:
#======> Output_paths
explainability_output_path = f's3://{bucket}/{prefix}/clarify-explainability'

processing_dir = "/opt/ml/processing"
create_dataset_script_uri = f's3://{bucket}/{prefix}/code/create_datasets.py'
create_feature_store_script_uri = f's3://{bucket}/{prefix}/code/feature_store_ingest.py'
deploy_model_script_uri = f's3://{bucket}/{prefix}/code/deploy_model.py'
model_monitor_script_uri = f's3://{bucket}/{prefix}/code/model_monitor.py'


# Output name is auto-generated from the select node's ID + output name from the flow file. 
# You can change to a different node ID to export a different step in the flow file
output_name_tracks = "d0d4f05a-3031-4438-867b-c5fd033d6c15.default"
output_name_user_preferences = "3098f603-08d2-4319-83f5-55f509eeab60.default"
output_name_ratings = "8b7e0be2-7b25-4c06-82eb-c0547efebf67.default"

#======> variables used for parameterizing the notebook run
flow_instance_count = 2
flow_instance_type = "ml.m5.4xlarge"

deploy_model_instance_type = "ml.m4.xlarge"
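
The output names above follow the pattern `<node ID>.<output name>`. As a rough sketch of how such names could be listed from a flow file, the snippet below walks a small in-memory stand-in. It assumes the `.flow` file is JSON with a top-level `nodes` list whose entries carry a `node_id` and a list of named `outputs`; the helper `list_output_names` and the trimmed sample are hypothetical, so check the layout of your own flow file before relying on it.

```python
import json

# Hypothetical, heavily trimmed stand-in for a Data Wrangler .flow file.
# Assumption: top-level "nodes" list, each node with "node_id" and "outputs".
sample_flow = json.loads("""
{
  "nodes": [
    {"node_id": "d0d4f05a-3031-4438-867b-c5fd033d6c15", "outputs": [{"name": "default"}]},
    {"node_id": "8b7e0be2-7b25-4c06-82eb-c0547efebf67", "outputs": [{"name": "default"}]}
  ]
}
""")


def list_output_names(flow):
    """Build '<node_id>.<output name>' strings for every output of every node."""
    return [
        f"{node['node_id']}.{output['name']}"
        for node in flow.get("nodes", [])
        for output in node.get("outputs", [])
    ]


output_names = list_output_names(sample_flow)
print(output_names)
```

To try this on the real file, replace `sample_flow` with `json.load(open("01_music_dataprep.flow"))`, provided the file matches the assumed layout.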

<a id ='06-arch'> </a>

## Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment
##### [back to top](#06-nb)
----

![arch diagram](./images/music-rec.png)

<a id='pipelines'></a>

## SageMaker Pipeline Overview
#### [back to top](#06-nb)
---- 

#### [Step 1: Data Wrangler Preprocessing Step](#data-wrangler)
#### [Step 2: Create Dataset and Train/Test Split](#dataset-train-test)
#### [Step 3: Train XGBoost Model](#pipe-train-xgb)
#### [Step 4: Model Pre-Deployment Step](#pipe-pre-deploy)
#### [Step 5: Register Model](#pipe-Register-Model)
#### [Step 6: Deploy Model](#deploy)
#### [Step 7: Model Monitor](#monitor)
#### [Combine the Pipeline Steps and Run](#combine)


Now that you've manually done each step in our machine learning workflow, you can automate certain steps to allow for faster model experimentation without sacrificing transparency and model tracking. In this section you will create a pipeline that trains a new model, persists the model in SageMaker, and then adds the model to the registry.

### Pipeline parameters
An important feature of SageMaker Pipelines is that you define the steps ahead of time but can still change their parameters at execution time without redefining the pipeline. To do this, use `ParameterInteger`, `ParameterFloat` or `ParameterString` to declare a value with a default up front, then override it later when you call `pipeline.start(parameters=parameters)`. Only certain parameters can be defined this way.

In [7]:
train_instance_param = ParameterString(
    name="TrainingInstance",
    default_value="ml.m4.xlarge",
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
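
The default-plus-override behaviour described above can be pictured with plain dictionaries. The sketch below is not the SageMaker SDK. It only mimics how the declared defaults combine with the values passed at execution time. The names `PIPELINE_DEFAULTS` and `resolve_parameters` are hypothetical, and rejecting undeclared names is a choice made for this sketch.

```python
# Hypothetical stand-in for the parameters declared on the pipeline:
# parameter name -> default value (taken from the cell above).
PIPELINE_DEFAULTS = {
    "TrainingInstance": "ml.m4.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(defaults, overrides=None):
    """Return the values one execution would see: defaults updated by overrides."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # In this sketch, only names declared up front may be overridden.
        raise KeyError(f"Undeclared pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


resolved = resolve_parameters(PIPELINE_DEFAULTS, {"TrainingInstance": "ml.m5.xlarge"})
print(resolved)
```

The override replaces only `TrainingInstance`; `ModelApprovalStatus` keeps its declared default.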

<a id='data-wrangler'></a>
### Step 1: Data Wrangler Preprocessing Step
[Pipeline Overview](#pipelines)

#### Upload flow to S3
This will become an input to the first step and, as such, needs to be in S3.

In [8]:
# name of the flow file which should exist in the current notebook working directory
flow_file_name = "01_music_dataprep.flow"

s3_client.upload_file(Filename=flow_file_name, Bucket=bucket, Key=f'{prefix}/dataprep-notebooks/music_dataprep.flow')
flow_s3_uri = f's3://{bucket}/{prefix}/dataprep-notebooks/music_dataprep.flow'

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

Data Wrangler flow 01_music_dataprep.flow uploaded to s3://sagemaker-us-west-1-738335684114/music-recommendation/dataprep-notebooks/music_dataprep.flow


#### Define the first Data Wrangler step's inputs

In [9]:
data_sources = []

## Input - S3 Source: tracks.csv
data_sources.append(ProcessingInput(
    source=f"s3://{bucket}/{prefix}/tracks.csv", # You can override this to point to other dataset on S3
    destination=f"{processing_dir}/tracks.csv",
    input_name="tracks.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Input - S3 Source: ratings.csv
data_sources.append(ProcessingInput(
    source=f"s3://{bucket}/{prefix}/ratings.csv", # You can override this to point to other dataset on S3
    destination=f"{processing_dir}/ratings.csv",
    input_name="ratings.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Input - Flow: 01_music_dataprep.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination=f"{processing_dir}/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

#### Define outputs for the Data Wrangler step

In [10]:
flow_output_tracks = sagemaker.processing.ProcessingOutput(
    output_name=output_name_tracks,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=fg_name_tracks)
    )

flow_output_user_preferences = sagemaker.processing.ProcessingOutput(
    output_name=output_name_user_preferences,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=fg_name_user_preferences)
    )

flow_output_ratings = sagemaker.processing.ProcessingOutput(
    output_name=output_name_ratings,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=fg_name_ratings)
    )

In [11]:
# Output configuration used as processing job container arguments 
output_config_tracks = {
    output_name_tracks: {
        "content_type": "CSV"
    }
}

output_config_user_preferences = {
    output_name_user_preferences: {
        "content_type": "CSV"
    }
}

output_config_ratings = {
    output_name_ratings: {
        "content_type": "CSV"
    }
}

In [12]:
dw_ecrlist = {
    'region':{'us-west-2':'174368400705',
              'us-east-2':'415577184552',
              'us-west-1': '926135532090'
             }
}

ps.store({'dw_ecrlist':dw_ecrlist},namespace='music-rec')
ps.store()


# Data Wrangler Container URL.
container_uri = f"{dw_ecrlist['region'][region]}.dkr.ecr.{region}.amazonaws.com/sagemaker-data-wrangler-container:1.x"

Stored 'dw_ecrlist' (dict)
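
The lookup `dw_ecrlist['region'][region]` raises a bare `KeyError` when the notebook runs in a region that is missing from the dictionary. A minimal sketch of a more explicit lookup follows. The helper name `data_wrangler_image_uri` is hypothetical, and the account IDs are only the ones listed in the cell above.

```python
# Account IDs copied from the dw_ecrlist cell above; other regions are not covered.
ecr_accounts = {
    "region": {
        "us-west-2": "174368400705",
        "us-east-2": "415577184552",
        "us-west-1": "926135532090",
    }
}


def data_wrangler_image_uri(accounts, region, tag="1.x"):
    """Build the Data Wrangler container URI, failing clearly for unknown regions."""
    account = accounts["region"].get(region)
    if account is None:
        known = ", ".join(sorted(accounts["region"]))
        raise ValueError(f"No Data Wrangler account listed for {region!r}; known regions: {known}")
    return f"{account}.dkr.ecr.{region}.amazonaws.com/sagemaker-data-wrangler-container:{tag}"


uri = data_wrangler_image_uri(ecr_accounts, "us-west-1")
print(uri)
```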


#### Define processor and processing step

In [13]:
from sagemaker.network import NetworkConfig

In [14]:
# Data Wrangler Container URL
# You can find the proper container uri by exporting your Data Wrangler flow to a pipeline notebook
# container_uri = "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.x"


flow_processor = sagemaker.processing.Processor(
    role=sagemaker_role, 
    image_uri=container_uri, 
    instance_count=flow_instance_count, 
    instance_type=flow_instance_type, 
    volume_size_in_gb=30,
    network_config=NetworkConfig(enable_network_isolation=False),
    sagemaker_session=sagemaker_session
)

flow_step_tracks = ProcessingStep(
    name='DataWranglerStepTracks', 
    processor=flow_processor, 
    inputs=[flow_input] + data_sources, 
    outputs=[flow_output_tracks],
    job_arguments=[f"--output-config '{json.dumps(output_config_tracks)}'"],
)

flow_step_user_preferences = ProcessingStep(
    name='DataWranglerStepUserPref', 
    processor=flow_processor, 
    inputs=[flow_input] + data_sources, 
    outputs=[flow_output_user_preferences],
    job_arguments=[f"--output-config '{json.dumps(output_config_user_preferences)}'"]
)

flow_step_ratings = ProcessingStep(
    name='DataWranglerStepRatings', 
    processor=flow_processor, 
    inputs=[flow_input] + data_sources, 
    outputs=[flow_output_ratings],
    job_arguments=[f"--output-config '{json.dumps(output_config_ratings)}'"]
)
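
Each step above passes its output configuration to the container as a single `--output-config '<json>'` string. The sketch below shows what that string looks like for the tracks output and one plausible way to split it back into a flag and a JSON payload. How the Data Wrangler container actually parses its arguments is not shown in this notebook, so the `shlex`-based decoding is only an assumption for illustration.

```python
import json
import shlex

# Same shape as output_config_tracks in the cells above.
output_name = "d0d4f05a-3031-4438-867b-c5fd033d6c15.default"
output_config = {output_name: {"content_type": "CSV"}}

# The exact string the ProcessingStep passes in job_arguments.
job_argument = f"--output-config '{json.dumps(output_config)}'"
print(job_argument)

# Assumption: splitting the string the way a POSIX shell would recovers
# the flag and the quoted JSON payload as two separate tokens.
flag, payload = shlex.split(job_argument)
decoded = json.loads(payload)
print(flag, decoded)
```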

<a id='dataset-train-test'></a>
### Step 2: Create Dataset and Train/Test Split

[Pipeline Overview](#pipelines)

In [15]:
s3_client.upload_file(Filename='create_datasets.py', Bucket=bucket, Key=f'{prefix}/code/create_datasets.py')

create_dataset_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=sagemaker_role,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name='music-recommendation-split-data',
    sagemaker_session=sagemaker_session)

create_dataset_step = ProcessingStep(
    name='SplitData',
    processor=create_dataset_processor,
    outputs = [
        sagemaker.processing.ProcessingOutput(output_name='train_data', source=f'{processing_dir}/output/train'),
        sagemaker.processing.ProcessingOutput(output_name='test_data',  source=f'{processing_dir}/output/test')
    ],
    job_arguments=["--feature-group-name-tracks", fg_name_tracks,
                   "--feature-group-name-ratings", fg_name_ratings,
                   "--feature-group-name-user-preferences", fg_name_user_preferences,
                   "--bucket-name", bucket,
                   "--bucket-prefix", prefix,
                   "--region", region
                  ],
    code=create_dataset_script_uri
)

create_dataset_step.add_depends_on([flow_step_tracks.name, flow_step_user_preferences.name,flow_step_ratings.name ])

<a id='pipe-train-xgb'></a>
### Step 3: Train XGBoost Model
In this step we use the ParameterString `train_instance_param` defined at the beginning of the pipeline.

[Pipeline Overview](#pipelines)

In [16]:
from sagemaker.debugger import DebuggerHookConfig, CollectionConfig
from sagemaker.debugger import Rule, rule_configs

In [17]:
hyperparameters = {
    "max_depth": "4",
    "eta": "0.2",
    "objective": "reg:squarederror",
    "num_round": "100"
}

save_interval = 5

In [18]:
xgb_estimator = Estimator(
    role=sagemaker_role,
    instance_count=2,
    instance_type=train_instance_param,  # resolved at execution time from the TrainingInstance pipeline parameter
    image_uri=sagemaker.image_uris.retrieve("xgboost", region, "0.90-2"),
    hyperparameters=hyperparameters,
    max_run=1800,

    debugger_hook_config=DebuggerHookConfig(
        s3_output_path=s3_output_path,  
        collection_configs=[
            CollectionConfig(
                name="metrics",
                parameters={
                    "save_interval": str(save_interval)
                }
            ),
            CollectionConfig(
                name="feature_importance",
                parameters={
                    "save_interval": str(save_interval)
                }
            ),
            CollectionConfig(
                name="full_shap",
                parameters={
                    "save_interval": str(save_interval)
                }
            ),
            CollectionConfig(
                name="average_shap",
                parameters={
                    "save_interval": str(save_interval)
                }
            ),
        ],
    ),

    rules=[
        Rule.sagemaker(
            rule_configs.loss_not_decreasing(),
            rule_parameters={
                "collection_names": "metrics",
                "num_steps": str(save_interval * 2),
            },
        )
    ],
)

In [19]:
train_step = TrainingStep(
    name='TrainStep',
    estimator=xgb_estimator,
    inputs={
        'train': sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
            content_type="text/csv"
        ),
        'validation': sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

<a id='pipe-pre-deploy'></a>
### Step 4: Model Pre-Deployment Step

[Pipeline Overview](#pipelines)

In [20]:
model = sagemaker.model.Model(
    name='music-recommender-xgboost-model',
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role
)

inputs = sagemaker.inputs.CreateModelInput(
    instance_type="ml.m4.xlarge"
)

create_model_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=inputs
)

<a id='pipe-Register-Model'></a>
### Step 5: Register Model
In this step you will use the ParameterString `model_approval_status` defined at the outset of the pipeline code.


[Pipeline Overview](#pipelines)

In [21]:
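# `mpg_name` (the model package group created in notebook 03) is not among the
# variables restored by `%store -r` above, so read it from the parameter store.
# This assumes notebook 03 saved it under the 'mpg_name' key.
mpg_name = parameter['mpg_name']

register_step = RegisterModel(
    name="XgboostRegisterModel",
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
)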

<a id='deploy'></a>
### Step 6: Deploy Model

[Pipeline Overview](#pipelines)

In [None]:
s3_client.upload_file(Filename='deploy_model.py', Bucket=bucket, Key=f'{prefix}/code/deploy_model.py')

deploy_model_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=sagemaker_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name='music-recommender-deploy-model',
    sagemaker_session=sagemaker_session)

deploy_step = ProcessingStep(
    name='DeployModel',
    processor=deploy_model_processor,
    job_arguments=[
        "--model-name", create_model_step.properties.ModelName, 
        "--region", region,
        "--endpoint-instance-type", deploy_model_instance_type,
        "--endpoint-name", "music-recommender-model-pipeline"],
    code=deploy_model_script_uri)

<a id='monitor'></a>
### Step 7: Model Monitor

[Pipeline Overview](#pipelines)

In [None]:
s3_client.upload_file(Filename='model_monitor.py', Bucket=bucket, Key=f'{prefix}/code/model_monitor.py')


model_monitor_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=sagemaker_role,
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name='music-recommendation-model-monitor',
    sagemaker_session=sagemaker_session)

model_monitor_step = ProcessingStep(
    name='ModelMonitor',
    processor=model_monitor_processor,
    outputs = [
        sagemaker.processing.ProcessingOutput(output_name='model_baseline', source=f'{processing_dir}/output/baselineresults')
    ],
    job_arguments=["--baseline-data-uri", val_data_uri,
                   "--bucket-name", bucket,
                   "--bucket-prefix", prefix,
                   "--region", region,
                   "--endpoint", "music-recommender-model-pipeline"
                  ],
    code=model_monitor_script_uri
)

<a id='combine'></a>

### Combine the Pipeline Steps and Run
[Pipeline Overview](#pipelines)

Once all of our steps are defined, we can put them together using the SageMaker `Pipeline` object. We pass the steps in order so that the definition is easier to read, but the order does not technically matter: the pipeline builds a DAG from the dependencies between steps. If the input of one step is the output of another, Pipelines understands which must come first.
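
To illustrate why the listing order is irrelevant, the sketch below resolves an execution order purely from dependencies, using Python's standard `graphlib` module (Python 3.9+). The dependency map is read off the step definitions in this notebook and uses the values passed to `name=`; it is an illustration of DAG ordering, not a view into how the Pipelines service schedules work. `ModelMonitor` is left out because, as defined above, it references no other step.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# step name -> names of the steps it depends on (read off this notebook)
dependencies = {
    "DataWranglerStepTracks": set(),
    "DataWranglerStepUserPref": set(),
    "DataWranglerStepRatings": set(),
    "SplitData": {
        "DataWranglerStepTracks",
        "DataWranglerStepUserPref",
        "DataWranglerStepRatings",
    },
    "TrainStep": {"SplitData"},
    "CreateModel": {"TrainStep"},
    "XgboostRegisterModel": {"TrainStep"},
    "DeployModel": {"CreateModel"},
}


def execution_order(deps):
    """Return one valid order in which every step follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def respects(deps, order):
    """Check that each dependency appears before the step that needs it."""
    position = {name: index for index, name in enumerate(order)}
    return all(position[dep] < position[step] for step, needed in deps.items() for dep in needed)


order = execution_order(dependencies)

# List the same steps in reverse; the resolved order is still valid.
reversed_listing = dict(reversed(list(dependencies.items())))
order_from_reversed = execution_order(reversed_listing)

print(order)
print(respects(dependencies, order), respects(dependencies, order_from_reversed))
```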

In [None]:
pipeline_name = 'MusicRecommendationPipeline'

ps.store({'pipeline_name':pipeline_name},namespace='music-rec')
ps.store()

# The monitor step targets the endpoint that DeployModel creates, but no property
# reference links the two steps, so declare the dependency explicitly
# (the same pattern used for SplitData above).
model_monitor_step.add_depends_on([deploy_step.name])

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        train_instance_param, 
        model_approval_status],
    steps=[
        flow_step_tracks,
        flow_step_user_preferences,
        flow_step_ratings,
        create_dataset_step,
        train_step, 
        create_model_step, 
        register_step,
        deploy_step,
        model_monitor_step
    ])

### Submit the pipeline definition to the SageMaker Pipeline service
Note: If an existing pipeline has the same name it will be overwritten.

In [None]:
pipeline.upsert(role_arn=sagemaker_role)

In [None]:
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_boto_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

### View the entire pipeline definition
Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.

In [None]:
#json.loads(pipeline.describe()['PipelineDefinition'])

### Run the pipeline
Note: this will take about 70-80 minutes to complete. You can watch the progress of the Pipeline Job in the SageMaker Studio Components panel.


In [None]:
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)

featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

We're using the same feature groups (`fg_name_ratings`, `fg_name_tracks`, `fg_name_user_preferences`) that we created in the `02` notebooks, together with the data in our S3 bucket, to create an [Offline Feature Store](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html). In offline mode, large streams of data are fed to an offline store, which can be used for training and batch inference. This mode requires a feature group to be stored in an offline store. The offline store uses your S3 bucket for storage, and its data can also be queried with Athena.

In [None]:
feature_group_names = [fg_name_ratings, fg_name_tracks, fg_name_user_preferences]
feature_groups = []
for name in feature_group_names:
    feature_group = FeatureGroup(name=name, sagemaker_session=feature_store_session)
    feature_groups.append(feature_group)

In [None]:
import time  # needed for time.sleep() in wait_for_offline_store below

feature_group_s3_prefixes = []
for feature_group in feature_groups:
    feature_group_table_name = feature_group.describe().get("OfflineStoreConfig").get("DataCatalogConfig").get("TableName")
    feature_group_s3_prefix = f'{account_id}/sagemaker/{region}/offline-store/{feature_group_table_name}'
    feature_group_s3_prefixes.append(feature_group_s3_prefix)

# wait for data to be added to offline feature store
def wait_for_offline_store(feature_group_s3_prefix):
    print(feature_group_s3_prefix)
    offline_store_contents = None
    while (offline_store_contents is None):
        objects_in_bucket = s3_client.list_objects(Bucket=bucket, Prefix=feature_group_s3_prefix)
        if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):
            offline_store_contents = objects_in_bucket['Contents']
        else:
            print('Waiting for data in offline store...')
            time.sleep(60)
    print('Data available:', feature_group_s3_prefix)
    
for s3_prefix in feature_group_s3_prefixes:
    wait_for_offline_store(s3_prefix)
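
The loop above polls forever if the offline store never receives data. As a hedged alternative, the sketch below shows a generic polling helper with a timeout. The name `wait_until` and its parameters are hypothetical, and the `check` callable stands in for the S3 listing performed in `wait_for_offline_store`.

```python
import time


def wait_until(check, timeout_s=1800, poll_s=60, sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns True or timeout_s elapses.

    Returns True if the condition was met, False if the wait timed out.
    sleep and clock are injectable so the helper can be exercised quickly.
    """
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(poll_s)


# Demonstration with a fake condition that becomes true on the third poll.
attempts = {"count": 0}


def fake_check():
    attempts["count"] += 1
    return attempts["count"] >= 3


ready = wait_until(fake_check, timeout_s=5, poll_s=0, sleep=lambda seconds: None)
print(ready, attempts["count"])
```

In the notebook, `check` could wrap the `s3_client.list_objects` call for one prefix, so that a stalled ingestion surfaces as a `False` return instead of an endless wait.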

In [None]:
# Special pipeline parameters can be defined or changed here
parameters = {'TrainingInstance': 'ml.m5.xlarge'}

Earlier in the notebook, we defined several `ProcessingStep()`s and a `TrainingStep()`, which our `Pipeline()` instance here will reference and kick off.

In [None]:
start_response = pipeline.start(parameters=parameters)
start_response.wait(delay=60, max_attempts=200)
start_response.describe()


After completion, we can use SageMaker Studio's **Components and Registries** tab to see our Pipeline graph and any further error or log messages.

<a id='06-cleanup'></a>

## Cleanup
#### [back to top](#06-nb)
---- 

After running the demo, you should remove the resources which were created. You can also delete all the objects in the project's S3 directory by passing the keyword argument `delete_s3_objects=True`.

In [None]:
# when run, this cell will delete all the resources created by this demo

'''
demo_helpers.delete_project_resources(
    sagemaker_boto_client=sagemaker_boto_client,
    endpoint_name=endpoint_name, 
    pipeline_name=pipeline_name, 
    mpg_name=mpg_name, 
    prefix=prefix,
    delete_s3_objects=True,
    bucket_name=bucket
)

# delete feature stores within SageMaker Studio
for fg_name in [fg_name_ratings,fg_name_tracks,fg_name_user_preferences]:
    feature_group = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)
    feature_group.delete()
    print("Deleted feature group: {}".format(fg_name))
'''
