In [20]:
# AWS target for this pipeline: region, S3 bucket for artifacts, and the IAM
# role SageMaker assumes for training/processing/registration.
region = 'ap-northeast-2'
bucket = 'daytrip-ai-service'
role = 'arn:aws:iam::742627718059:role/DayTripAISagemaker'

for label, value in (('Region', region), ('Bucket', bucket), ('Role', role)):
    print(f'{label}: {value}')

Region: ap-northeast-2
Bucket: daytrip-ai-service
Role: arn:aws:iam::742627718059:role/DayTripAISagemaker


In [21]:
# Model/dataset configuration. `model_name` (kebab-case) names S3 prefixes and the
# model package group; `pascal_model_name` names the SageMaker steps and pipeline.
dataset = "daylog-pref-v3"
mlp_layer = 4
predictive_factor = 64
model_name = f'neumf-{dataset}-{mlp_layer}-{predictive_factor}'
# Derived from `dataset` (e.g. 'daylog-pref-v3' -> 'NeuMFDaylogPrefV3') so the two
# names cannot drift apart: a stale hand-typed value would make pipeline.upsert()
# overwrite the previous dataset's pipeline.
pascal_model_name = 'NeuMF' + ''.join(part.capitalize() for part in dataset.split('-'))

model_package_group_name = model_name
project_prefix = model_name

print('Model package group name: {}'.format(model_package_group_name))
print('Project prefix: {}'.format(project_prefix))

Model package group name: neumf-daylog-pref-v3-4-64
Project prefix: neumf-daylog-pref-v3-4-64


In [22]:
import boto3
from sagemaker.workflow.pipeline_context import PipelineSession

# boto3 session pinned to the configured region; credentials are resolved by
# boto3's default credential chain.
boto_session = boto3.Session(region_name=region)

# Session used by every pipeline object below; `default_bucket` routes SDK
# uploads to the configured bucket.
pipeline_session = PipelineSession(
    boto_session=boto_session,
    default_bucket=bucket,
)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


In [23]:
# Local directory with train.py and its helpers; tarred and uploaded in the next cell.
source_dir = "../code"

In [24]:
import tarfile

# Pack the code directory with its contents at the archive root ('.').
archive_name = 'source.tar.gz'
with tarfile.open(archive_name, mode='w:gz') as archive:
    archive.add(source_dir, arcname='.')

# Both the code tarball and the repack script live under the same S3 prefix.
s3_source_prefix = f'{project_prefix}/source'
source_artifact = pipeline_session.upload_data(archive_name, bucket, s3_source_prefix)
repackage_code = pipeline_session.upload_data('./repackage.py', bucket, s3_source_prefix)

print(f"Source artifact: {source_artifact}")
print(f"Repackage code: {repackage_code}")


Source artifact: s3://daytrip-ai-service/neumf-daylog-pref-v3-4-64/source/source.tar.gz
Repackage code: s3://daytrip-ai-service/neumf-daylog-pref-v3-4-64/source/repackage.py


In [25]:
from sagemaker.pytorch.estimator import PyTorch as PyTorchEstimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.steps import CacheConfig

train_instance_type = 'ml.g4dn.8xlarge'
train_instance_count = 1
train_output_path = f's3://{bucket}/{project_prefix}/train'
# Cut-off for the ranking metrics; passed to train.py and used in metric names.
eval_k = 10

# Regexes SageMaker applies to the training log to extract metrics.
# '/' is not a regex metacharacter, so it is left unescaped (the previous '\/'
# was also an invalid Python string escape: SyntaxWarning on Python 3.12+).
# NOTE(review): assumes train.py logs '<METRIC>@<eval-k>/validation=<value>;'
# with the k it receives — confirm before changing eval_k.
metric_definitions = [
    {'Name': f'{metric}@{eval_k}', 'Regex': rf'{metric}@{eval_k}/validation=(.*?);'}
    for metric in ('HR', 'mAP', 'nDCG')
] + [
    {'Name': 'LOSS', 'Regex': r'LOSS/train=(.*?);'},
]

estimator = PyTorchEstimator(
    sagemaker_session=pipeline_session,
    role=role,
    # Code comes from the tarball uploaded earlier, not the local directory.
    source_dir=source_artifact,
    entry_point='train.py',
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    py_version='py310',
    framework_version='2.2.0',
    output_path=train_output_path,
    hyperparameters={
        'device': 'cuda',

        'model-name': model_name,
        'mlp-layer': mlp_layer,
        'predictive-factor': predictive_factor,

        'negative-sample-ratio': 4,

        'epochs': 60,
        'batch-size': 256,
        'lr': 0.002,

        'eval-k': eval_k,
    },
    metric_definitions=metric_definitions,
    disable_profiler=True,
)

# Under PipelineSession, fit() returns step arguments instead of launching a job;
# passing them via step_args replaces the deprecated estimator=/inputs= form and
# matches the ModelStep(step_args=...) pattern used for registration.
training_step = TrainingStep(
    name=pascal_model_name + "-Training",
    step_args=estimator.fit(
        inputs={
            'train': TrainingInput(s3_data=f's3://{bucket}/dataset/{dataset}/train'),
            'validation': TrainingInput(s3_data=f's3://{bucket}/dataset/{dataset}/validation'),
        },
    ),
    # cache_config=CacheConfig(enable_caching=True, expire_after='1d'),
)


In [26]:
import time
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker import ScriptProcessor
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

# NOTE(review): GPU training image on a CPU instance type; confirm whether the
# '-cpu-py310' variant would suffice for repackaging.
repack_image_uri = f'763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-training:2.2.0-gpu-py310'
repack_instance_type = 'ml.m5.xlarge'
repack_output_path = f's3://{bucket}/{project_prefix}/repack'

# Runs the uploaded repackage.py with python3 inside the PyTorch container.
script_processor = ScriptProcessor(
    sagemaker_session=pipeline_session,
    role=role,
    image_uri=repack_image_uri,
    command=['python3'],
    instance_type=repack_instance_type,
    instance_count=1,
)

# The destination is resolved per pipeline execution. A time.strftime() suffix is
# evaluated once, when the definition is built/upserted, so every execution of
# that definition would write to (and overwrite) the same S3 prefix.
repack_output = ProcessingOutput(
    output_name='repack',
    source='/opt/ml/processing/output',
    destination=Join(on='', values=[f'{repack_output_path}/repack-', ExecutionVariables.PIPELINE_EXECUTION_ID]),
)

# Under PipelineSession, run() returns step arguments instead of starting a job;
# step_args replaces the deprecated processor=/inputs=/outputs=/code= form.
repack_step_args = script_processor.run(
    code=repackage_code,
    inputs=[
        # Model artifact produced by the training step.
        ProcessingInput(source=training_step.properties.ModelArtifacts.S3ModelArtifacts, destination='/opt/ml/processing/model'),
        # Same source tarball the training job used.
        ProcessingInput(source=source_artifact, destination='/opt/ml/processing/source'),
    ],
    outputs=[repack_output],
)

repack_step = ProcessingStep(
    name=pascal_model_name + '-Repack',
    step_args=repack_step_args,
    # cache_config=CacheConfig(enable_caching=True, expire_after='1d'),
    depends_on=[training_step],
)

In [27]:
from sagemaker.workflow.functions import Join
from sagemaker.workflow.model_step import ModelStep
from sagemaker import Model

model_image_uri = f'763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference:2.2.0-gpu-py310'
model_instance_type = 'ml.g4dn.4xlarge'
# The property reference resolves to the repack step's actual output URI at run time.
# NOTE(review): assumes repackage.py writes model.tar.gz into /opt/ml/processing/output.
model_data = Join(on='/', values=[repack_step.properties.ProcessingOutputConfig.Outputs['repack'].S3Output.S3Uri, 'model.tar.gz'])

# TODO: pass ModelMetrics (e.g. evaluation.json from an evaluation step) to
# model.register(model_metrics=...) once such a step exists.

model = Model(
    sagemaker_session=pipeline_session,
    role=role,
    image_uri=model_image_uri,
    model_data=model_data,
)

# Under PipelineSession, register() returns step arguments for ModelStep rather
# than creating the model package immediately.
model_register_step_args = model.register(
    model_package_group_name=model_package_group_name,
    content_types=['application/json'],
    response_types=['application/json'],
    inference_instances=[model_instance_type],
    transform_instances=[model_instance_type],
    approval_status='PendingManualApproval',
)

model_register_step = ModelStep(
    name=pascal_model_name,
    step_args=model_register_step_args,
    depends_on=[repack_step],
)

In [28]:
from sagemaker.workflow.pipeline import Pipeline

# Train -> repack -> register, named after the PascalCase model name.
pipeline_name = f'{pascal_model_name}-Train-Pipeline'
pipeline_steps = [training_step, repack_step, model_register_step]
pipeline = Pipeline(
    name=pipeline_name,
    steps=pipeline_steps,
    sagemaker_session=pipeline_session,
)

# Create the pipeline or update its existing definition; the response is displayed.
pipeline.upsert(role_arn=role)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:742627718059:pipeline/NeuMFDaylogPrefV3-Train-Pipeline',
 'ResponseMetadata': {'RequestId': 'e88d47fe-9928-4143-bc92-bf107abf0b34',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e88d47fe-9928-4143-bc92-bf107abf0b34',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '105',
   'date': 'Thu, 22 Aug 2024 07:28:22 GMT'},
  'RetryAttempts': 0}}

In [19]:
# Start an execution and keep the handle so it can be inspected later
# (e.g. execution.describe(), execution.list_steps(), execution.wait()).
execution = pipeline.start()
execution

_PipelineExecution(arn='arn:aws:sagemaker:ap-northeast-2:742627718059:pipeline/NeuMFDaylogPrefV3-Train-Pipeline/execution/jisn12bfqv5l', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x105357f40>)