# Import Libraries

In [8]:
import os
import shutil
import time
import json
import requests
import tempfile
import numpy as np
import pandas as pd

import boto3
import sagemaker
from sagemaker import get_execution_role

from datetime import datetime

# Set up region, session, and role

In [10]:
# Region comes from the environment; every AWS client below is created from the
# same region-pinned session so all resources (S3, SageMaker, EventBridge,
# Lambda, ...) are addressed in `region`.
region = os.environ["AWS_REGION"]
boto_session = boto3.Session(region_name=region)

account_id = boto_session.client("sts").get_caller_identity()["Account"]
s3_client = boto_session.client("s3")
sm_client = boto_session.client("sagemaker")

# EventBridge (rules and scheduler) and Lambda clients
event_bridge_client = boto_session.client("events")
lambda_client = boto_session.client("lambda")
event_bridge_scheduler = boto_session.client("scheduler")

sns_client = boto_session.client("sns")
cloudwatch = boto_session.client("cloudwatch")

# SageMaker SDK session reusing the same boto session and SageMaker client.
sagemaker_session = sagemaker.Session(
    boto_session=boto_session, sagemaker_client=sm_client
)

sm_role = get_execution_role(sagemaker_session=sagemaker_session)

# Initialize variables

In [11]:
# S3 layout
bucket = "artwork-content-trial-bucket"
prefix = "mch-artwork-content"
train_data_dir_prefix = "data"  # landing-zone prefix passed to schedule_pipeline below
pipeline_dir_prefix = "pipeline-data"
training_input_prefix = "ovr_content_data"

# Pipeline / Lambda / IAM names
pipeline_name = "artwork-content-pipeline-demo"
lambda_fcn_name = "content-sm-lambda-evt-trigger-fcn"
# Build the ARN from the session's region and account rather than hard-coding them,
# so the notebook works outside us-east-1 / the original account.
lambda_arn = f"arn:aws:lambda:{region}:{account_id}:function:{lambda_fcn_name}"
role_name = "sm-lambda-sns-evt-trigger-role"

# =========================================

# Triggering the pipeline on new S3 data (EventBridge rule + S3 notification)

In [14]:
def add_lambda_permissions(lambda_client, lambda_function_name, src_arn, account_id, prefix):
    """Allow an AWS service principal to invoke a Lambda function.

    Grants ``lambda:InvokeFunction`` to the principal ``{prefix}.amazonaws.com``
    (e.g. ``prefix='events'`` for EventBridge, ``prefix='s3'`` for S3), scoped to
    ``src_arn`` and ``account_id``.

    Args:
        lambda_client: boto3 Lambda client.
        lambda_function_name: Name or ARN of the function to grant access to.
        src_arn: ARN of the resource allowed to invoke the function.
        account_id: Account that owns ``src_arn``.
        prefix: Service prefix used for the principal and the statement id.

    Returns:
        The ``add_permission`` response, or ``None`` if the call failed. This is
        best-effort: the error is printed rather than raised.
    """
    # Epoch-second suffix makes the statement id differ between runs (two calls in
    # the same second with the same prefix would still collide).
    statement_id = f"{prefix}-Trigger-Lambda-{int(time.time())}"
    try:
        return lambda_client.add_permission(
            FunctionName=lambda_function_name,
            StatementId=statement_id,
            Action='lambda:InvokeFunction',
            Principal=f"{prefix}.amazonaws.com",
            SourceArn=src_arn,
            SourceAccount=account_id,
        )
    except Exception as e:
        print(f"Error occurred while adding lambda permissions: {e}")
        return None

def update_lambda_config(lambda_client, lambda_function_name, pipeline_name):
    """Set the ``PIPELINE_NAME`` environment variable on a Lambda function.

    NOTE(review): ``update_function_configuration`` replaces the function's whole
    environment-variable map, so any other variables are dropped.

    Args:
        lambda_client: boto3 Lambda client.
        lambda_function_name: Name or ARN of the function to update.
        pipeline_name: Value stored under ``PIPELINE_NAME``.

    Returns:
        The ``update_function_configuration`` response, or ``None`` if the call
        failed. This is best-effort: the error is printed rather than raised.
    """
    try:
        return lambda_client.update_function_configuration(
            FunctionName=lambda_function_name,
            Environment={'Variables': {'PIPELINE_NAME': pipeline_name}},
        )
    except Exception as e:
        print(f"Error occurred while updating lambda config: {e}")
        return None

In [15]:
def s3_bucket_notification(s3_client, bucket, lambda_arn, prefix):
    """Invoke a Lambda function when a ``.csv`` object is created under ``prefix``.

    Both single-request uploads (``Put``) and multipart uploads
    (``CompleteMultipartUpload``) are subscribed. Large files uploaded through
    boto3/SageMaker helpers use multipart and would otherwise never trigger.

    NOTE(review): ``put_bucket_notification_configuration`` replaces the bucket's
    entire notification configuration, so any other notifications on ``bucket``
    are removed.

    Args:
        s3_client: boto3 S3 client.
        bucket: Bucket name.
        lambda_arn: ARN of the Lambda function to invoke.
        prefix: S3 key-prefix filter value (a plain string prefix, no wildcard).

    Returns:
        The API response, or ``None`` if the call failed. This is best-effort:
        the error is printed rather than raised.
    """
    notification_configuration = {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": [
                    "s3:ObjectCreated:Put",
                    "s3:ObjectCreated:CompleteMultipartUpload",
                ],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "suffix", "Value": ".csv"},
                            {"Name": "prefix", "Value": prefix},
                        ]
                    }
                },
            }
        ]
    }
    try:
        return s3_client.put_bucket_notification_configuration(
            Bucket=bucket,
            NotificationConfiguration=notification_configuration,
        )
    except Exception as e:
        print(f"Error occurred while setting S3 bucket notification: {e}")
        return None
        

In [16]:
def schedule_pipeline(region, account_id, bucket, prefix, s3_client, event_bridge_client, lambda_client,
                      lambda_function_name, lambda_arn, pipeline_name,
                      max_wait_seconds=300, poll_interval_seconds=5):
    """Wire new S3 data under ``bucket/prefix`` to the pipeline-trigger Lambda.

    Steps:
      1. Create/update an EventBridge rule that matches CloudTrail ``PutObject`` /
         ``CompleteMultipartUpload`` calls for keys under ``{prefix}/`` in ``bucket``.
      2. Allow EventBridge and S3 to invoke the Lambda, and set its
         ``PIPELINE_NAME`` environment variable.
      3. Add an S3 bucket notification for ``.csv`` objects under ``prefix``.
      4. Register the Lambda as the rule's target, then poll until the rule is ENABLED.

    NOTE(review): the S3 notification (step 3) and the EventBridge rule both target
    the same Lambda. If CloudTrail S3 data events are enabled for the bucket (the
    rule matches "AWS API Call via CloudTrail" events), one upload can invoke the
    Lambda twice. Consider keeping only one path.

    Args:
        region: Unused; kept for backward compatibility.
        account_id: AWS account id used to scope the Lambda permissions.
        bucket: Bucket whose uploads trigger the pipeline.
        prefix: Key prefix (without trailing slash) of the landing zone.
        s3_client, event_bridge_client, lambda_client: boto3 clients.
        lambda_function_name: Name of the trigger Lambda.
        lambda_arn: ARN of the trigger Lambda.
        pipeline_name: SageMaker pipeline name passed to the Lambda.
        max_wait_seconds: Maximum time to poll for the rule to be ENABLED.
        poll_interval_seconds: Delay between ``describe_rule`` polls.

    Returns:
        The ``list_targets_by_rule`` response for the rule.

    Raises:
        RuntimeError: if EventBridge reports failed target entries.
        TimeoutError: if the rule is not ENABLED within ``max_wait_seconds``.
    """
    bucket_arn = f"arn:aws:s3:::{bucket}"

    rule_name = 'ContentSMLambdaEventRuleTrigger'
    rule_description = 'Event to trigger SageMaker Content pipeline via Lambda function when new data is added to S3.'

    event_pattern = {
        "source": ["aws.s3"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["s3.amazonaws.com"],
            # Multipart uploads finish with CompleteMultipartUpload, not PutObject.
            "eventName": ["PutObject", "CompleteMultipartUpload"],
            "requestParameters": {
                "bucketName": [bucket],
                # EventBridge compares plain strings literally ("data/*" would only
                # match a key named exactly "data/*"), so use prefix matching.
                "key": [{"prefix": f"{prefix}/"}],
            },
        },
    }

    rule_response = event_bridge_client.put_rule(
        Name=rule_name,
        Description=rule_description,
        EventPattern=json.dumps(event_pattern),
        State='ENABLED',
    )
    event_rule_arn = rule_response['RuleArn']

    # Let EventBridge and S3 invoke the Lambda, and tell it which pipeline to start.
    add_lambda_permissions(lambda_client, lambda_function_name, event_rule_arn, account_id, prefix='events')
    add_lambda_permissions(lambda_client, lambda_function_name, bucket_arn, account_id, prefix='s3')
    update_lambda_config(lambda_client, lambda_function_name, pipeline_name)

    # Set up S3 bucket notifications.
    s3_bucket_notification(s3_client, bucket, lambda_arn, prefix)

    target = {
        'Arn': lambda_arn,
        'Id': 'ContentSMLambdaFunctionTarget',
    }
    targets_response = event_bridge_client.put_targets(Rule=rule_name, Targets=[target])
    # put_targets reports per-target failures in the response instead of raising.
    if targets_response.get('FailedEntryCount', 0):
        raise RuntimeError(f"Failed to add EventBridge target: {targets_response.get('FailedEntries')}")

    # Poll the rule's state (not the target's), with a bounded wait instead of looping forever.
    deadline = time.monotonic() + max_wait_seconds
    while True:
        response_rule = event_bridge_client.describe_rule(Name=rule_name)
        if response_rule['State'] == 'ENABLED':
            print('\n === Event Scheduled Successfully ===== \n')
            break
        print(f'response rule: {response_rule}')
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Rule {rule_name} not ENABLED after {max_wait_seconds} seconds")
        time.sleep(poll_interval_seconds)

    # Report the targets currently attached to the rule.
    return event_bridge_client.list_targets_by_rule(Rule=rule_name)


In [21]:

# Wire S3 uploads under `train_data_dir_prefix` to the pipeline-trigger Lambda.
# NOTE(review): this creates/updates AWS resources and presumably needs the Lambda
# function to exist already. It is created by the `create_lambda` cell further down,
# so on a fresh kernel run that section first. Each run also adds new timestamped
# Lambda permission statements.
response = schedule_pipeline(
    region, account_id, bucket, train_data_dir_prefix, s3_client, event_bridge_client,
    lambda_client, lambda_fcn_name, lambda_arn, pipeline_name,
)
print(response)


 === Event Scheduled Successfully ===== 

{'Targets': [{'Id': 'ContentSMLambdaFunctionTarget', 'Arn': 'arn:aws:lambda:us-east-1:791574662255:function:content-sm-lambda-evt-trigger-fcn'}], 'ResponseMetadata': {'RequestId': '885aa534-4f4a-40b9-99db-de35641c70e4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '885aa534-4f4a-40b9-99db-de35641c70e4', 'content-type': 'application/x-amz-json-1.1', 'content-length': '141', 'date': 'Thu, 09 Mar 2023 07:36:02 GMT'}, 'RetryAttempts': 0}}


In [None]:
# Inspect the trigger rule and its targets. Both responses are bound and shown
# together, because a cell only auto-displays its last expression.
rule_name = 'ContentSMLambdaEventRuleTrigger'
rule_details = event_bridge_client.describe_rule(Name=rule_name)
rule_targets = event_bridge_client.list_targets_by_rule(Rule=rule_name)
rule_details, rule_targets

In [None]:
target_id = 'ContentSMLambdaFunctionTarget'
rule_name = 'ContentSMLambdaEventRuleTrigger'

def stop_scheduled_event(event_bridge_client, rule_name, target_id):
    """Switch off the EventBridge trigger for the pipeline Lambda.

    Disables ``rule_name`` and then detaches target ``target_id`` from it. The rule
    itself is not deleted. NOTE(review): the S3 bucket notification set up by
    ``s3_bucket_notification`` is not touched, so S3 uploads can still invoke the
    Lambda after this runs.
    """
    events = event_bridge_client
    # Stop the rule from matching before its target is removed.
    events.disable_rule(Name=rule_name)
    ids_to_remove = [target_id]
    events.remove_targets(Rule=rule_name, Ids=ids_to_remove)

# =============================================================================
# stop_scheduled_event(event_bridge_client, rule_name, target_id)

# ================================================

# Create the Lambda function and its IAM role

In [17]:
from lambda_utils import *
from zipfile import ZipFile

In [5]:
#Create IAM role for the Lambda function
# lambda_role = create_role(role_name)

Creating an IAM role for AWS Lambda function ...
SUCCESS: Successfully created IAM role for AWS Lambda function!
Adding permissions to AWS Lambda function's IAM role ...
SUCCESS: Successfully added permissions AWS Lambda function's IAM role!


In [18]:
# IAM role assumed by the Lambda, built from the current account instead of a hard-coded id.
lambda_role = {'arn': f"arn:aws:iam::{account_id}:role/{role_name}"}
print(lambda_role)

{'arn': 'arn:aws:iam::791574662255:role/sm-lambda-sns-evt-trigger-role'}


In [19]:
module_name = 'pipeline_trigger_lambda_function'
fcn_desc = 'AWS Lambda function for automatically triggering AWS SageMaker Pipeline.'

# Build the deployment package in a throw-away directory. It is cleaned up even if
# zipping fails, and no pre-existing local folder can be deleted by mistake.
with tempfile.TemporaryDirectory() as lambda_output_path:
    zip_path = os.path.join(lambda_output_path, 'function.zip')

    # The handler module (read relative to the current working directory) is
    # stored at the archive root under its own file name.
    with ZipFile(zip_path, 'w') as z:
        z.write(f"{module_name}.py")

    # Use the zipped bytes as the AWS Lambda function code.
    with open(zip_path, 'rb') as f:
        fcn_code = f.read()

# Create the AWS Lambda function.
lambda_arn = create_lambda(module_name, lambda_fcn_name, fcn_desc, fcn_code, lambda_role['arn'])

Creating AWS Lambda function ...
SUCCESS: Successfully created AWS Lambda function!


In [20]:
# Show the ARN returned by create_lambda in the previous cell.
print(f"{lambda_arn}")

arn:aws:lambda:us-east-1:791574662255:function:content-sm-lambda-evt-trigger-fcn


In [None]:
# # Create and attach trigger for Amazon S3 event to kick-off AWS Lambda function
# print(f'Data landing zone prefix for S3 trigger: {train_data_dir_prefix}')
# create_s3_trigger(fcn_name, bucket, train_data_dir_prefix, account_id, lambda_arn)

# #Wait for the trigger to be created
# print('Waiting for 5 seconds for the newly created trigger to be active.')
# time.sleep(5)

In [6]:
# Upload a small sample of OVR data into the landing-zone prefix. The new `.csv`
# object under `train_data_dir_prefix` is what the S3 notification / EventBridge
# rule configured by `schedule_pipeline` react to.
uploaded_uri = sagemaker.s3.S3Uploader.upload(
    "./data/ovr_content_data_v3.csv",
    f"s3://{bucket}/{train_data_dir_prefix}",
    sagemaker_session=sagemaker_session,
)
print(uploaded_uri)
# `upload` returns after the object is written. This pause presumably just gives the
# asynchronous S3 -> Lambda trigger a moment to fire before later cells run.
time.sleep(5)