### MLOps에 필요한 Lambda file 생성하기

#### lambda-yolo5-training

이 Lambda Function은 lambda_docker를 참조합니다.

#### lambda-check-acc

In [1]:
!mkdir lambda-check-acc

In [7]:
%%writefile lambda-check-acc/lambda_function.py

import json
import boto3
import tarfile
from io import BytesIO
import os
import pickle
from io import StringIO
import csv


# Module-level AWS clients, created once when the module is imported.
# Only `s3` is referenced by the handler in this module.
s3 = boto3.client('s3')
sm = boto3.client('sagemaker')
s3_resource = boto3.resource('s3')


# Column index of the accuracy metric inside results.csv, taken from the
# ACC_COL_NUM environment variable. The value is kept as a string here and is
# converted with int() where it is used. A missing variable raises KeyError
# at import time.
acc_col_num = os.environ['ACC_COL_NUM']


def lambda_handler(event, context):
    """Check the accuracy stored in a model archive against a threshold.

    Downloads the model archive named by the event from S3, reads the metric
    in column ``ACC_COL_NUM`` of the last data row of ``results.csv`` inside
    the archive, and records whether it exceeds the desired accuracy.

    Args:
        event (dict): Must contain ``model_data_url`` (URL of the archive),
            ``bucket`` (name of the bucket holding it) and
            ``desired_accuracy`` (number or numeric string).
        context: Lambda context object (unused).

    Returns:
        dict: The same ``event`` with ``train_result`` set to ``"PASS"`` when
        the accuracy is strictly greater than the threshold, else ``"FAIL"``.

    Raises:
        ValueError: If the metric or ``desired_accuracy`` is not numeric.
        IndexError: If the last CSV row has no column ``ACC_COL_NUM``.
    """
    model_data_url = event['model_data_url']
    bucket = event['bucket']

    # Derive the object key from the URL. Prefer an exact "s3://bucket/"
    # prefix; otherwise split on the FIRST occurrence of the bucket name only,
    # so a key that happens to contain the bucket name is not truncated.
    s3_prefix = 's3://{}/'.format(bucket)
    if model_data_url.startswith(s3_prefix):
        key_value = model_data_url[len(s3_prefix):]
    else:
        key_value = model_data_url.split(bucket, 1)[1][1:]
    print(key_value)

    tar_content = s3.get_object(Bucket=bucket, Key=key_value)['Body'].read()

    raw_accuracy = _read_last_metric(tar_content, int(acc_col_num))
    # No results.csv (or an empty one) counts as accuracy 0, which fails the
    # check below for any positive threshold.
    accuracy = float(raw_accuracy) if raw_accuracy is not None else 0.0
    print("accuracy is " + str(accuracy))

    # Compare numerically: string comparison is lexicographic and gives wrong
    # answers for values such as "1.2e-05" versus "0.5".
    desired_accuracy = float(event['desired_accuracy'])

    if accuracy > desired_accuracy:
        event['train_result'] = "PASS"
        print("PASS")
    else:
        event['train_result'] = "FAIL"
        print("FAIL")

    return event


def _read_last_metric(tar_content, column):
    """Return the raw metric text from the last row of results.csv, or None.

    Args:
        tar_content (bytes): Contents of a tar archive (any compression that
            ``tarfile.open`` auto-detects).
        column (int): Zero-based index of the column to read.

    Returns:
        str | None: The stripped cell text from the last non-empty row of the
        last archive member whose name contains ``results.csv``; ``None`` when
        no such member has any rows.
    """
    metric = None
    with tarfile.open(fileobj=BytesIO(tar_content)) as tar:
        for member in tar:
            if not member.isfile() or "results.csv" not in member.name:
                continue
            text = tar.extractfile(member).read().decode('utf-8')
            # csv.reader yields [] for blank lines; drop them so a trailing
            # blank line is not mistaken for the last data row.
            rows = [row for row in csv.reader(StringIO(text), delimiter=",") if row]
            if rows:
                metric = rows[-1][column].strip()
    return metric


Writing lambda-check-acc/lambda_function.py


In [8]:
!zip -r lambda-check-acc.zip lambda-check-acc

  adding: lambda-check-acc/ (stored 0%)
  adding: lambda-check-acc/lambda_function.py (deflated 66%)


#### lambda-sf-trigger

In [9]:
!mkdir lambda-sf-trigger

In [10]:
%%writefile lambda-sf-trigger/lambda_function.py
import json
import boto3
import os


# Module-level AWS clients, created once when the module is imported.
# Only `sf` is referenced by the handler in this module.
s3 = boto3.client('s3')
sf = boto3.client('stepfunctions')


# Required configuration from environment variables; a missing variable raises
# KeyError at import time. Both values are strings, so DESIRED_ACCURACY is
# forwarded to the state machine as a string.
state_machine_arn = os.environ['STATE_MACHINE_ARN']
desired_accuracy = os.environ['DESIRED_ACCURACY']


def lambda_handler(event, context):
    """Start the configured Step Functions state machine for an S3 event.

    Logs the bucket and key of the first record in the event, then starts an
    execution whose JSON input carries only the desired accuracy.

    Args:
        event (dict): S3 notification event; the first entry of ``Records``
            is read for the bucket name and object key.
        context: Lambda context object (unused).

    Returns:
        dict: The incoming ``event``, unchanged.
    """
    print(event)

    s3_record = event['Records'][0]['s3']
    bucket_name = s3_record['bucket']['name']
    file_key = s3_record['object']['key']
    print('Reading {} from {}'.format(file_key, bucket_name))

    # The execution input is built from configuration only; the uploaded
    # object itself is not read.
    execution_input = {"desired_accuracy": desired_accuracy}
    print(execution_input)

    sf.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(execution_input),
    )

    return event



Writing lambda-sf-trigger/lambda_function.py


In [12]:
!zip -r lambda-sf-trigger.zip lambda-sf-trigger

  adding: lambda-sf-trigger/ (stored 0%)
  adding: lambda-sf-trigger/lambda_function.py (deflated 49%)


#### lambda-await

In [13]:
!mkdir lambda-await

In [14]:
%%writefile lambda-await/lambda_function.py
import json
import boto3
import os


# Module-level AWS clients, created once when the module is imported.
# Only `sagemaker` is referenced by the functions in this module.
sagemaker       = boto3.client('sagemaker')
cfn_client      = boto3.client('cloudformation')


def lambda_handler(event, context):
    """Report the status of the SageMaker training job named in the event.

    Args:
        event (dict): Must contain ``stage`` (only ``'Training'`` is
            supported) and ``training_job_name``.
        context: Lambda context object (unused).

    Returns:
        dict: The same ``event`` with ``status`` set to the job's
        ``TrainingJobStatus``. On ``'Completed'`` it also carries ``message``,
        ``model_data_url`` and ``training_job``; on ``'Failed'`` it carries
        ``message`` with the failure reason.

    Raises:
        ValueError: If ``stage`` is anything other than ``'Training'``.
    """
    stage = event['stage']

    # Fail with a clear message instead of hitting an unbound `status` below.
    if stage != 'Training':
        raise ValueError(
            'Unsupported stage "{}"; only "Training" is handled.'.format(stage)
        )

    training_job_name = event['training_job_name']
    training_details = describe_training_job(training_job_name)
    print(training_details)

    status = training_details['TrainingJobStatus']
    if status == 'Completed':
        s3_output_path = training_details['OutputDataConfig']['S3OutputPath']
        # Build the S3 URL with explicit '/' separators; os.path.join uses the
        # host OS separator and is meant for filesystem paths, not URLs.
        model_data_url = '{}/{}/output/model.tar.gz'.format(
            s3_output_path.rstrip('/'), training_details['TrainingJobName']
        )

        event['message'] = 'Training job "{}" complete. Model data uploaded to "{}"'.format(training_job_name, model_data_url)
        event['model_data_url'] = model_data_url
        event['training_job'] = training_details['TrainingJobName']
    elif status == 'Failed':
        # Tolerate a response without a failure reason rather than raising.
        failure_reason = training_details.get('FailureReason', 'No failure reason reported.')
        event['message'] = 'Training job failed. {}'.format(failure_reason)

    event['status'] = status

    print(event)

    return event

def describe_training_job(name):
    """Fetch the description of a SageMaker training job.

    Args:
        name (string): Name of the SageMaker training job to look up.
    Returns:
        (dict)
        The response of the SageMaker ``describe_training_job`` call.
    Raises:
        Exception: Any error from the call is logged and re-raised.
    """
    try:
        return sagemaker.describe_training_job(TrainingJobName=name)
    except Exception as err:
        # Log for CloudWatch, then let the caller see the original error.
        print(err)
        print('Unable to describe training job.')
        raise

Writing lambda-await/lambda_function.py


In [15]:
!zip -r lambda-await.zip lambda-await

  adding: lambda-await/ (stored 0%)
  adding: lambda-await/lambda_function.py (deflated 64%)


#### lambda-model-registry

In [18]:
!mkdir lambda-model-registry

In [19]:
%%writefile lambda-model-registry/lambda_function.py

import json
import boto3
import botocore
import os


# Module-level SageMaker client, created once when the module is imported.
sm_client = boto3.client("sagemaker")

# Required configuration from environment variables; a missing variable raises
# KeyError at import time.
model_package_group_name = os.environ['MODEL_PACKAGE_GROUP_NAME']
model_package_group_desc = os.environ['MODEL_PACKAGE_GROUP_DESC']

# Container image URI used in the model package's inference specification.
# Example of a previously hard-coded value:
# training_image = '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'
training_image = os.environ['ECR_IMAGE_URI']


def lambda_handler(event, context):
    """Register the trained model as a SageMaker model package.

    Creates a model package in the configured model package group with status
    ``PendingManualApproval``. If the group does not exist yet, it is created
    and the model package creation is retried once.

    Args:
        event (dict): Must contain ``model_data_url``, the location of the
            model artifact.
        context: Lambda context object (unused).

    Returns:
        dict: The incoming ``event``, unchanged.

    Raises:
        botocore.exceptions.ClientError: For any SageMaker error other than
            the model package group being missing, and for errors while
            creating the group or retrying.
    """
    model_data_url = event['model_data_url']

    create_model_package_input_dict = {
        "ModelPackageGroupName": model_package_group_name,
        "ModelPackageDescription": model_package_group_desc,
        "ModelApprovalStatus": "PendingManualApproval",
        "InferenceSpecification": {
            "Containers": [
                {
                    "Image": training_image,
                    "ModelDataUrl": model_data_url,
                }
            ],
            "SupportedContentTypes": ["application/x-image"],
            "SupportedResponseMIMETypes": ["application/x-image"],
        },
    }

    try:
        sm_client.create_model_package(**create_model_package_input_dict)
    except botocore.exceptions.ClientError as ce:
        group_missing = (
            ce.operation_name == "CreateModelPackage"
            and ce.response["Error"]["Message"] == "Model Package Group does not exist."
        )
        # Anything other than a missing group is a real failure: surface it
        # instead of returning as if the model had been registered.
        if not group_missing:
            raise

        print('Model package grop does not exist. Creating a new one')
        sm_client.create_model_package_group(
            ModelPackageGroupName=model_package_group_name,
            ModelPackageGroupDescription=model_package_group_desc,
        )
        # Retry now that the group exists.
        sm_client.create_model_package(**create_model_package_input_dict)

    return event



Writing lambda-model-registry/lambda_function.py


In [20]:
!zip -r lambda-model-registry.zip lambda-model-registry

  adding: lambda-model-registry/ (stored 0%)
  adding: lambda-model-registry/lambda_function.py (deflated 66%)
