In [None]:
# SageMaker JumpStart provides APIs as part of SageMaker SDK that allow you to deploy and fine-tune models in network isolation using scripts that SageMaker maintains.

from sagemaker.jumpstart.model import JumpStartModel


# JumpStart model identifier and the name given to the endpoint.
model_id = "huggingface-llm-falcon-40b-bf16"
endpoint_name = "sm-js-llm-async-falcon-40b-bf16"

# Example request payload: a prompt under "inputs" plus generation settings
# under "parameters".
endpoint_input = {
    "inputs": (
        "Girafatron is obsessed with giraffes, the most glorious animal on the face of this Earth. "
        "Giraftron believes all other animals are irrelevant when compared to the glorious majesty of the giraffe.\n"
        "Daniel: Hello, Girafatron!\n"
        "Girafatron:"
    ),
    "parameters": {
        "max_new_tokens": 50,
        "top_k": 10,
        "return_full_text": False,
        "do_sample": True,
    },
}

In [None]:
import sagemaker
from sagemaker.huggingface import HuggingFaceModel, get_huggingface_llm_image_uri
import time
import boto3

# Resolve the execution role, then create the shared SageMaker session and
# read the region that session is bound to.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

In [None]:
# Execution role and a SageMaker session for this notebook.
role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs

# `bucket` and `model_bucket` both name the default bucket of `sess`; look it
# up once and reuse the value instead of calling default_bucket() twice.
bucket = sess.default_bucket()  # bucket to house artifacts
model_bucket = bucket  # same default bucket, kept under its own name
# NOTE(review): the "jumstart" spelling is kept byte-for-byte because this is a
# runtime S3 prefix; confirm nothing depends on it before correcting it.
s3_code_prefix_js = "hf-large-model-djl-/code_falcon40b/jumstart"  # folder within bucket where code artifact will go

# Bucket and key prefix used for async-inference input and output objects.
s3_bucket = sagemaker_session.default_bucket()
bucket_prefix = "falcon-async-inference"

# Use the public `boto_region_name` attribute (as done for `sagemaker_session`)
# rather than reaching into the private `_region_name` field.
region = sess.boto_region_name
account_id = sess.account_id()

# Low-level boto3 clients: S3, the SageMaker control plane, and the SageMaker
# runtime (the runtime client is what submits async invocations).
s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")


In [None]:
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig

# S3 location handed to AsyncInferenceConfig as its output path.
async_output_path = f"s3://{s3_bucket}/{bucket_prefix}/output"

async_config = AsyncInferenceConfig(
    output_path=async_output_path,
    max_concurrent_invocations_per_instance=4,
    # Amazon SNS topics can optionally be supplied as well:
    # notification_config={
    #     "SuccessTopic": "arn:aws:sns:<aws-region>:<account-id>:<topic-name>",
    #     "ErrorTopic": "arn:aws:sns:<aws-region>:<account-id>:<topic-name>",
    # },
)

In [None]:
# Build the JumpStart model object, then deploy it with the async-inference
# configuration so the resulting endpoint accepts asynchronous invocations.
model = JumpStartModel(model_id=model_id)

deploy_kwargs = {
    "initial_instance_count": 1,
    "instance_type": "ml.g5.24xlarge",
    "endpoint_name": endpoint_name,
    "async_inference_config": async_config,
}
predictor = model.deploy(**deploy_kwargs)


In [None]:
# Fetch the endpoint's description by name through the session's SageMaker
# client and print the raw response.
endpoint_description = sagemaker_session.sagemaker_client.describe_endpoint(
    EndpointName=endpoint_name
)

print(endpoint_description)

In [None]:
import os

def upload_file(input_location, content_type="application/json"):
    """Upload a local file under the ``<bucket_prefix>/input`` prefix in S3.

    The file goes to the default bucket of ``sagemaker_session``.

    Args:
        input_location: Local path of the file to upload.
        content_type: Value stored as the S3 object's ``ContentType``.
            Defaults to ``"application/json"``, the registered JSON media
            type and the same ``ContentType`` this notebook passes to
            ``invoke_endpoint_async`` (previously the non-standard
            ``"text/json"`` was hard-coded).

    Returns:
        Whatever ``sagemaker_session.upload_data`` returns; the notebook
        passes it on as the ``InputLocation`` of the async invocation.
    """
    return sagemaker_session.upload_data(
        input_location,
        bucket=sagemaker_session.default_bucket(),
        key_prefix=f"{bucket_prefix}/input",
        extra_args={"ContentType": content_type},
    )

# Inference

In [None]:
%%writefile ../inference/async_sm_js_endpoint_text.jsonl
{"inputs": "Translate English to French: Cheese", "properties": {"do_sample": true, "max_new_tokens": 110,"no_repeat_ngram_size": 3}}


In [None]:
%%writefile ../inference/async_sm_js_endpoint_text.jsonl
{"inputs":  "I hate it when my phone battery dies. Sentiment: Negative ### Tweet: My day has been :+1: Sentiment: Positive ### Tweet: This new music video was incredible Sentiment:", "properties": { "max_new_tokens": 3}}

In [None]:
%%writefile ../inference/async_sm_js_endpoint_text.jsonl
{"inputs":  "Could you remind me when was the C programming language invented?", "properties": { "max_new_tokens": 3}}

In [None]:
input_1_location = "../inference/async_sm_js_endpoint_text.jsonl"
input_1_s3_location = upload_file(input_1_location)

response_model = smr_client.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=input_1_s3_location,
    # Accept='application/json',
    ContentType="application/json"
)

output_location = response_model["OutputLocation"]
print(f"OutputLocation: {output_location}")

output = get_output(output_location)
print(f"Output: {output}")

In [None]:
response_model = smr_client.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=input_1_s3_location,
    # Accept='application/json',
    ContentType="application/json"
)

output_location = response_model["OutputLocation"]
print(f"OutputLocation: {output_location}")


# response_model["Body"].read().decode("utf8")

In [None]:
output = get_output(output_location)
print(f"Output: {output}")

In [None]:
import urllib, time
from botocore.exceptions import ClientError

def get_output(output_location):
    output_url = urllib.parse.urlparse(output_location)
    bucket = output_url.netloc
    key = output_url.path[1:]
    print("key",output_url.path[1:])
    while True:
        try:
            return sagemaker_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                print("waiting for output...")
                time.sleep(2)
                continue
            raise