# Deploy DeepSeek Distilled Models on SageMaker

This notebook walks you through various strategies of deploying [distilled models from DeepSeek R1](https://huggingface.co/collections/deepseek-ai/deepseek-r1-678e1e131c0169c0bc89728d). This includes:
    
- DeepSeek-R1-Distill-Llama-70B using Hugginface Text Generation Inference (TGI) on SageMaker Endpoint
- DeepSeek-R1-Distill-Llama-70B using Deep Java Library and vLLM backend on SageMaker Endpoint
- DeepSeek-R1-Distill-Llama-8B and DeepSeek-R1-Distill-Qwen-7B on a single SageMaker Endpoint using Inference Components

However, this implementation applies to any of the distilled deepseek models on HuggingFace in the aforementioned link

In [None]:
pip install sagemaker -U

## Using HuggingFace TGI for Serving DeepSeek-R1-Distill-Llama-70B

In [None]:
import json
import sagemaker
import boto3
from sagemaker.huggingface import HuggingFaceModel, get_huggingface_llm_image_uri

try:
	role = sagemaker.get_execution_role()
except ValueError:
	iam = boto3.client('iam')
	role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

# Hub Model configuration. https://huggingface.co/models
hub = {
	'HF_MODEL_ID':'deepseek-ai/DeepSeek-R1-Distill-Llama-70B',
	'SM_NUM_GPUS': json.dumps(8) # Change this based on the GPU used, ml.g6.48xlarge has 8 GPUs
}



# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
	image_uri=get_huggingface_llm_image_uri("huggingface",version="2.3.1"),
	env=hub,
	role=role, 
)

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
	initial_instance_count=1,
	instance_type="ml.g6e.48xlarge", # has 8 GPUs with a total GPU memeory of 384 GB
	container_startup_health_check_timeout=3600,
  )


### Invoke the SageMaker Endpoint

In [281]:
# Streamin helper class
import json
import boto3
import io
import re

NEWLINE = re.compile(r'\\n')  
DOUBLE_NEWLINE = re.compile(r'\\n\\n')

class LineIterator:
    """
    A helper class for parsing the byte stream from Llama 2 model inferenced with LMI Container. 
    
    The output of the model will be in the following repetetive but incremental format:
    ```
    b'{"generated_text": "'
    b'lo from L"'
    b'LM \\n\\n'
    b'How are you?"}'
    ...

    For each iteration, we just read the incremental part and seek for the new position for the next iteration till the end of the line.

    """
    
    def __init__(self, stream, cc_schema):
        self.byte_iterator = iter(stream)
        self.buffer = io.BytesIO()
        self.read_pos = 0
        self.cc_schema = cc_schema

    def __iter__(self):
        return self

    def __next__(self):
        start_sequence = b'{"generated_text": "'
        stop_sequence = b'"}'
        new_line = '\n'
        double_new_line = '\n\n'
        while True:
            self.buffer.seek(self.read_pos)
            line = self.buffer.readline()
            if line:
                self.read_pos += len(line)
                if line.startswith(start_sequence):# in :
                    line = line.lstrip(start_sequence)
                
                if line.endswith(stop_sequence):
                    line =line.rstrip(stop_sequence)
                line = line.decode('utf-8')
                line = NEWLINE.sub(new_line, line)
                line = DOUBLE_NEWLINE.sub(double_new_line, line) 
                if self.cc_schema:
                    pattern = r'"content"\s*:\s*"((?:[^"\\]|\\.)*)"'  # snipping out the "content :" key that holds the model response as its value (For DJL)
                else: 
                    pattern = r'"text"\s*:\s*"((?:[^"\\]|\\.)*)"'  # snipping out the "text :" key that holds the model response as its value (For TGI)
                match = re.search(pattern, line)
                # print(line)
                if match:
                    return match.group(1)
                else:
                    return ""
                    
            try:
                chunk = next(self.byte_iterator)
            except StopIteration:
                if self.read_pos < self.buffer.getbuffer().nbytes:
                    continue
                raise
            if 'PayloadPart' not in chunk:
                print('Unknown event type:' + chunk)
                continue
            self.buffer.seek(0, io.SEEK_END)
            self.buffer.write(chunk['PayloadPart']['Bytes'])

smr_client = boto3.client("sagemaker-runtime")

# SageMaker Runtime API for Inference with Streaming
def get_realtime_response_stream(sagemaker_runtime, endpoint_name, payload):
    response_stream = sagemaker_runtime.invoke_endpoint_with_response_stream(
        EndpointName=endpoint_name,
        Body=json.dumps(payload), 
        ContentType="application/json",
    )
    return response_stream


def print_response_stream(response_stream,  cc_schema = False):
    event_stream = response_stream.get('Body')
    for line in LineIterator(event_stream, cc_schema):  
        print(line.strip(), end=" ")

In [287]:
# Prompt template for the distilled llama model

def prompt_template(system_message, query, stream):

    # DeepSeek does not recommend using system prompts and recommends adding it to the user prompt
    user_question = f"{system_message}\n\n{query}"
    payload={'inputs':  f"""<｜begin▁of▁sentence｜><｜User｜>{user_question}<｜Assistant｜>""", 
     'parameters': {'max_new_tokens': 1050, 'top_p': 0.9, 'temperature': 0.1, "return_full_text": False, "stop" : ["<｜end▁of▁sentence｜>"]}, "stream": stream}

    # template with system message parameter
    # payload={'inputs':  f"""<｜begin▁of▁sentence｜>{system_message}<｜User｜>{query}<｜Assistant｜>""", 
    #  'parameters': {'max_new_tokens': 500, 'top_p': 0.9, 'temperature': 0.1, "return_full_text": False}}
    
    return payload

In [288]:
# send a request to the sagemaker endpoint
system_message="""
You are a Chatty Assitant
"""
query= "What is the most expensive gem?"

stream = True #to stream response or not

payload = prompt_template(system_message, query, stream)

if not stream:
    response = predictor.predict(
    payload
    )

    print(response[0]['generated_text'])
else:
    smr_client = boto3.client('sagemaker-runtime')
    response_stream = get_realtime_response_stream(smr_client, predictor.endpoint_name, payload)
    print_response_stream(response_stream)   

<think>    Okay  ,  so  I  need  to  figure  out  what  the  most  expensive  gem  is  .  I  'm  not  really  a  gem  expert  ,  but  I  know  a  few  things  .  Let  me  start  by  thinking   about  what  makes  a  gem  expensive  .  It   's  probably  a  combination  of  rarity  ,  beauty  ,  and  maybe  historical  significance  .    I  remember   hearing  about  diamonds  being  really  valuable  ,  especially  large  ,  flawless   ones  .  The  \"  Pink  Star  \"  comes  to  mind  ;  I  think  it  's  a  pink  diamond  that  sold  for  a  lot  .  But  I  'm  not  sure  if  it  's  the  most  expensive  .  Then  there  's  the  \"  C  ull  inan  Diamond  ,\"  which  I  think  is  also  very  valuable  ,  but  I  'm  not  certain   about  its  price  .  I  also  recall  that  some  colored  diamonds  ,  like  red  diamonds   ,  are  extremely  rare  .  The  \"  M  ou  ssa  ie  ff  Red  Diamond  \"  might  be  one  of  them  .  It  's  supposed  to  be  the  largest   known  red  dia

### To invoke an existing SageMaker Endpoint

In [None]:
from sagemaker.predictor import Predictor
predictor1 = Predictor(endpoint_name="ENDPOINT NAME")

system_message="""
You are a Chatty Assitant
"""
query= "What is the most expensive gem?"

user_question = f"{system_message}\n\n{query}"
payload={'inputs':  f"""<｜begin▁of▁sentence｜><｜User｜>{user_question}<｜Assistant｜>""", 
 'parameters': {'max_new_tokens': 500, 'top_p': 0.9, 'temperature': 0.1, "return_full_text": False}}

response = predictor1.predict(json.dumps(payload),
                  initial_args={"ContentType": "application/json"}
                 )

print(json.loads(response)[0]['generated_text'])

# Using DJL with vLLM for serving DeepSeek-R1-Distill-Llama-70B

In [239]:
import sagemaker
from sagemaker.djl_inference.model import DJLModel
import boto3
import json

In [212]:
model_id = 'deepseek-ai/DeepSeek-R1-Distill-Llama-70B' # model will be download form Huggingface hub

env = {
    "TENSOR_PARALLEL_DEGREE": "8",            # use 8 GPUs, modify baed on instance types
    "OPTION_ROLLING_BATCH": "vllm",           # use vllm for rolling batching
    "OPTION_TRUST_REMOTE_CODE": "true",
   }
role = sagemaker.get_execution_role()
model = DJLModel(
    model_id=model_id,
    env=env,
    role=role)

In [None]:
instance_type = "ml.g6e.48xlarge" # has 8 GPUs with a total GPU memeory of 384 GB

predictor = model.deploy(initial_instance_count=1,
             instance_type=instance_type,
             # endpoint_name="djl-llama-70-distil-r1",
             container_startup_health_check_timeout=3600
            )

In [247]:
%%time
## Use Inference API Schema
stream = True
payload = {"inputs": "What are the planets in our solar system?", "parameters": {"max_new_tokens":128,'temperature': 0.6,}, "stream":stream}

if not stream:
    response = predictor.predict(
    payload
    )

    print(response['generated_text'])
else:
    smr_client = boto3.client('sagemaker-runtime')
    response_stream = get_realtime_response_stream(smr_client, predictor.endpoint_name, payload)
    print_response_stream(response_stream)   


 Well , let me think . There 's Mercury , Venus , Earth , Mars , Jupiter , Saturn , Uran us , and Neptune . Wait , didn 't Pluto used to be considered a planet ? Yeah , I remember  hearing that it was re classified as a dwarf planet a while back . So , now we have eight planets in our solar system .  Each of these  planets has unique features . Mercury is the closest  to the Sun and is really hot during the day  but freezing at night . Venus is known for being the hottest planet , even hotter than Mercury , because  of its thick atmosphere that traps heat . Earth  is where we live , and it 's    CPU times: user 40 ms, sys: 4.56 ms, total: 44.5 ms
Wall time: 4.41 s


In [250]:
%%time
## Use Chat Completion Schema
stream = False
payload = {
    "messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      },
      {
        "role": "user",
        "content": "What is deep learning?"
      }
    ],
    "max_tokens":256,
    "temperature": 0.6,
    "stream": stream
  }
## Use Chat Completions API Schema

if not stream:
    response = predictor.predict(
    payload
    )

    print(response['choices'])
else:
    smr_client = boto3.client('sagemaker-runtime')
    response_stream = get_realtime_response_stream(smr_client, predictor.endpoint_name, payload)
    print_response_stream(response_stream, cc_schema=True)   


[{'index': 0, 'message': {'role': 'assistant', 'content': '<think>\nOkay, so I need to understand what deep learning is. I\'ve heard the term before, especially in the context of AI and machine learning, but I\'m not entirely sure what it entails. Let me start by breaking down the term: "deep learning." It must be a subset of machine learning, which itself is part of artificial intelligence. \n\nI remember that machine learning involves training models to make predictions or decisions based on data. Traditional machine learning requires a lot of feature engineering, where humans manually select which features of the data are important. But deep learning, I think, is different because it can learn these features automatically. That must be why it\'s called "deep"—because it uses multiple layers to learn hierarchical representations of data.\n\nSo, deep learning uses neural networks, inspired by the human brain. Each layer in these networks processes the data at different levels. For exa

## Using SageMaker Inference Componenets to serve R1- Distilled Llamma 8b and Qwen 7B

In [251]:
import sagemaker
from sagemaker.djl_inference.model import DJLModel
import boto3
import json

#### Create the endpoint 

In [252]:
# Create Endpoint Configuration for serving the models
role = sagemaker.get_execution_role()
sm_client = boto3.client(service_name="sagemaker")
endpoint_config_name = "r1-distilled-model-v2"
sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ExecutionRoleArn=role,
    ProductionVariants=[{
        "VariantName": "AllTraffic",
        "InstanceType": "ml.g5.12xlarge",
        "InitialInstanceCount": 1,
		"RoutingConfig": {
            "RoutingStrategy": "LEAST_OUTSTANDING_REQUESTS"
        }
    }]
)

endpoint_name = "r1-distilled-ic-model-v2"
sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

{'EndpointArn': 'arn:aws:sagemaker:us-east-1:715253196401:endpoint/r1-distilled-ic-model-v2',
 'ResponseMetadata': {'RequestId': 'eb8fcf39-661f-4222-85cc-52546cf95875',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'eb8fcf39-661f-4222-85cc-52546cf95875',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '92',
   'date': 'Wed, 29 Jan 2025 20:11:31 GMT'},
  'RetryAttempts': 0}}

#### Create the SageMaker Model objects for teh models of interest

In [253]:
# Create a SageMaker model object for llama 70b with DJL and LMI-dist serving properties

model_id = 'deepseek-ai/DeepSeek-R1-Distill-Llama-8B' # model will be download form Huggingface hub

env = {
    "TENSOR_PARALLEL_DEGREE": "2",            # use 2 GPUs out of, modify baed on instance types
    "OPTION_ROLLING_BATCH": "lmi-dist",           # use LMI-Dist for rolling batching
    "OPTION_TRUST_REMOTE_CODE": "true",
   }
role = sagemaker.get_execution_role()
llama_model = "r1-llama-3-8b-distill"
model = DJLModel(
    model_id=model_id,
    env=env,
    name = llama_model,
    sagemaker_session=sagemaker.Session(),
    role=role)

model.create(instance_type="ml.g5.12xlarge")

In [254]:
# Create a SageMaker model object for Qwen 32B with DJL and vLLM serving properties

model_id = 'deepseek-ai/DeepSeek-R1-Distill-Qwen-7B' # model will be download form Huggingface hub

env = {
    "TENSOR_PARALLEL_DEGREE": "2",            # use 2 GPUs, modify baed on instance types
    "OPTION_ROLLING_BATCH": "vllm",           # use vllm for rolling batching
    "OPTION_TRUST_REMOTE_CODE": "true",
   }
role = sagemaker.get_execution_role()
qwen_model = "r1-qwen-7b-distill"
model = DJLModel(
    model_id=model_id,
    env=env,
    name = qwen_model,
    sagemaker_session=sagemaker.Session(),
    role=role)

model.create(instance_type="ml.g5.12xlarge")

#### Check the status of the endpoint until it has successfully deployed

In [262]:
# Check Status of endpoint

sagemaker_client = boto3.client('sagemaker')

while True:
    try:
        response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
        status = response['EndpointStatus']
        print(f"Current status: {status}")

        if status == 'InService':
            print(f"Endpoint '{endpoint_name}' is now in service!")
            break
        elif status in ['Creating', 'Updating']:
            print(f"Endpoint '{endpoint_name}' is still {status.lower()}. Waiting...")
            time.sleep(30)  # Sleep for 30 seconds before checking again
        else:
            print(f"Endpoint status is '{status}'. This may indicate an issue.")
            break

    except sagemaker_client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'ValidationException':
            print(f"Endpoint '{endpoint_name}' not found.")
        else:
            print(f"An error occurred: {e}")
        break

print("Endpoint status check completed.")

Current status: InService
Endpoint 'r1-distilled-ic-model-v2' is now in service!
Endpoint status check completed.


In [None]:
# create Inference Componenets of the various models to be hosted behind the endpoint with their respective resource configs
sm_client.create_inference_component(
    InferenceComponentName=qwen_model,
    EndpointName=endpoint_name,
    VariantName="AllTraffic",
    Specification={
        "ModelName": qwen_model,
        "ComputeResourceRequirements": {
		    "NumberOfAcceleratorDevicesRequired": 2, # Number of GPUs
			"NumberOfCpuCoresRequired": 20, # Number of CPU cores  (total vCPU // (number of replica * number if inference componenets) - more for management) Based on 48vCPU for the selected instance
			"MinMemoryRequiredInMb": 20*1024 # Minimum memory in MB (total CPU memory // (number of replica * number if inference componenets) - more for management) Based on 192 GB of the selected instance
	    }
    },
    RuntimeConfig={"CopyCount": 1}, # Number of replicas
)

# Inference component for FLAN-T5 XXL
sm_client.create_inference_component(
    InferenceComponentName=llama_model,
    EndpointName=endpoint_name,
    VariantName="AllTraffic",
    Specification={
        "ModelName": llama_model,
        "ComputeResourceRequirements": {
		    "NumberOfAcceleratorDevicesRequired": 2, 
			"NumberOfCpuCoresRequired": 20, 
			"MinMemoryRequiredInMb": 20*1024
	    }
    },
    RuntimeConfig={"CopyCount": 1},
)

In [276]:
# Invoke the endpoints
def invoke_sagemaker_endpoint_with_retry(endpoint_name, inference_component_name, payload, max_retries=5, initial_delay=10):

    sagemaker_runtime = boto3.client('sagemaker-runtime')

    for attempt in range(max_retries):
        try:
            response = sagemaker_runtime.invoke_endpoint(
                EndpointName=endpoint_name,
                InferenceComponentName = inference_component_name,
                ContentType="application/json",
                Accept="application/json",
                Body=json.dumps(payload),
            )

            result = response['Body'].read().decode()
            print(result)
            return result

        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'ValidationError' and 'Inference Component has no capacity' in str(e):
                if attempt < max_retries - 1:
                    wait_time = initial_delay * (2 ** attempt)  # Exponential backoff
                    print(f"Attempt {attempt + 1} failed. SageMaker still deploying Inference Components, Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print(f"Max retries reached. Last error: {e}")
            else:
                print(f"An error occurred: {e}")
                return None

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

    print("All retry attempts failed.")
    return None

system_message = "AI Assitant"
query = "Why is California a great place to live?"

payload={'inputs':  f"""<｜begin▁of▁sentence｜>{system_message}<｜User｜>{query}<｜Assistant｜>""", 
     'parameters': {'max_new_tokens': 100, 'top_p': 0.9, 'temperature': 0.6, "return_full_text": False}}   

inference_c_name = qwen_model #llama_model
result = invoke_sagemaker_endpoint_with_retry(endpoint_name, inference_c_name, payload)

{"generated_text": "<think>\nOkay, so I need to figure out why California is a great place to live. I'm not super familiar with California, but I know it's a big state in the U.S. I'll start by thinking about what makes a place good to live in. Probably things like climate, job opportunities, cost of living, culture, and maybe natural beauty. \n\nFirst, the climate. I've heard that California has a mild climate, which is nice because not everyone can handle extreme cold"}
