How to deploy whisper-large-v3 successfully with a SageMaker notebook


When you try to deploy whisper-large-v3 from a SageMaker notebook, you may get an error like the one below:


ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (400) from primary with message "{
  "code": 400,
  "type": "InternalServerException",
  "message": "Wrong index found for \u003c|0.02|\u003e: should be None but found 50366."
 

It seems that the AWS Deep Learning Containers only support transformers up to version 4.26.0, which is too old for whisper-large-v3.

You can find details at https://huggingface.co/openai/whisper-large-v3/discussions/58. As a workaround, this guide shows a custom way to deploy the model (or a fine-tuned derivative of it) using faster-whisper and a custom inference script.


1. Create a notebook instance and a notebook file.
2. Execute the following Python code section by section.

In [1]:
# !pip install -U sagemaker

## Converting Checkpoint to faster-whisper checkpoint

In [2]:
ckpt_dir = "/home/ec2-user/SageMaker/efs/Projects/whisper/checkpoint/checkpoint-v7/checkpoint-60"

save_dir = "./model"

In [3]:
!ct2-transformers-converter --model {ckpt_dir} --output_dir {save_dir} \
--copy_files tokenizer.json preprocessor_config.json --quantization float16

Loading checkpoint shards: 100%|██████████████████| 2/2 [00:01<00:00,  1.55it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


# Prepare inference.py and requirements.txt

In [4]:
import os

# Directory and file paths of the custom inference code. The directory is
# later passed to HuggingFaceModel(source_dir='code', entry_point='inference.py').
dir_path = './code'
inference_file_path = os.path.join(dir_path, 'inference.py')
requirements_file_path = os.path.join(dir_path, 'requirements.txt')

# Create the directory structure
os.makedirs(dir_path, exist_ok=True)

# Inference.py content
inference_content = '''
from faster_whisper import WhisperModel
import boto3
import json
import os
from decimal import Decimal

AWS_REGION = "us-west-2"
DYNAMODB_TABLE = "speech2text"


def model_fn(model_dir):
    """Load the converted faster-whisper model from model_dir onto the GPU."""
    # CPU alternative: WhisperModel(model_dir, device="cpu", compute_type="int8")
    return WhisperModel(model_dir, device="cuda", compute_type="float16")


def transform_fn(model, request_body, request_content_type, response_content_type="application/json"):
    """Transcribe one audio object from S3 and store the segments in DynamoDB.

    request_body is a JSON document with the keys "s3_bucket" and "key".
    The object name is expected to look like
    <contact_id>_<fragment_id>_<speaker>.wav, where fragment_id is an integer
    offset that is added to every segment start time.

    Returns a (json_string, content_type) tuple with the stored segments.
    """
    print(f"request_body:{request_body}")
    data = json.loads(request_body)
    s3_bucket = data['s3_bucket']
    object_key = data['key']

    audio_file_name = os.path.basename(object_key)
    contact_id, _, remainder = audio_file_name.partition("_")
    fragment_id = remainder.rpartition("_")[0]
    speaker = "agent" if audio_file_name.endswith("_agent.wav") else "customer"

    local_path = f"/tmp/{audio_file_name}"
    s3_client = boto3.client("s3", region_name=AWS_REGION)
    scripts = []
    try:
        s3_client.download_file(s3_bucket, object_key, local_path)
        segments, _ = model.transcribe(
            local_path,
            language="yue",
            vad_filter=True,
            vad_parameters=dict(min_silence_duration_ms=100),
        )
        # Iterate inside the try block so the audio file is still present
        # while the segments are being produced.
        for segment in segments:
            scripts.append({
                "contact_id": contact_id,
                "time_stamp": int(fragment_id) + segment.start,
                "words": str(segment.text),
                "speaker": speaker,
            })
    finally:
        # Always clean up the temporary file, even when the download or the
        # transcription fails, so /tmp does not fill up on the endpoint.
        if os.path.exists(local_path):
            os.remove(local_path)

    dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
    conversation_table = dynamodb.Table(DYNAMODB_TABLE)
    with conversation_table.batch_writer() as batch:
        for item in scripts:
            batch.put_item(Item={
                "ContactID": item["contact_id"],
                # DynamoDB numbers must be Decimal, not float
                "SaidTimeStamp": Decimal(str(item["time_stamp"])),
                "Speaker": item["speaker"],
                "Words": item["words"],
            })

    return json.dumps(scripts), response_content_type

'''

# Write the inference.py file
with open(inference_file_path, 'w', encoding='utf-8') as file:
    file.write(inference_content)

# Requirements.txt content
requirements_content = '''
#transformers==4.36.2
faster_whisper
boto3
ctranslate2==3.24.0
#accelerate==0.26.1
'''
# Write the requirements.txt file
with open(requirements_file_path, 'w', encoding='utf-8') as file:
    file.write(requirements_content)

# Prepare model and upload to s3

In [5]:
import shutil
shutil.make_archive('./model', 'gztar', './model')

'/home/ec2-user/SageMaker/efs/Projects/whisper/sagemaker-deploy/faster-whisper/model.tar.gz'

In [6]:
import sagemaker
import boto3
# Get the SageMaker session, its default S3 bucket, the region and the
# execution role; these names are reused by the cells below.
sagemaker_session = sagemaker.session.Session()
bucket = sagemaker_session.default_bucket()
# Use the public attribute instead of the private _region_name.
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
print(f"bucket: {bucket}, role:{role}, region: {region}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
bucket: sagemaker-us-west-2-452145973879, role:arn:aws:iam::452145973879:role/sagemaker_full_access, region: us-west-2


In [7]:
prefix = 'models/whisper_ckpts/faster-v7-ckpt60'

# Upload the model to S3
model_uri = sagemaker_session.upload_data(
    'model.tar.gz',
    bucket=bucket,
    key_prefix=prefix
)
print(f"Model uploaded to {model_uri}")

Model uploaded to s3://sagemaker-us-west-2-452145973879/models/whisper_ckpts/faster-v7-ckpt60/model.tar.gz


In [8]:
!rm model.tar.gz
!rm -rf model
# model_uri = 's3://sagemaker-us-west-2-452145973879/models/whisper_ckpts/faster-whisper/model.tar.gz'

# Deploy faster-whisper to sagemaker endpoint

In [9]:
import boto3
import sagemaker
from sagemaker.huggingface.model import HuggingFaceModel
import time

# A Unix-timestamp suffix keeps the endpoint name unique across deployments.
# The variable is deliberately not called `id`, which would shadow the
# built-in id() for the rest of the notebook.
deploy_timestamp = int(time.time())
endpoint_name = f'faster-whisper-hf-real-time-endpoint-{deploy_timestamp}'

# Model definition: the uploaded model archive plus the custom inference
# script and requirements found in the local 'code' directory.
huggingface_model = HuggingFaceModel(
  model_data=model_uri,
  entry_point="inference.py",
  source_dir='code',
  role=role,
  transformers_version='4.26',
  pytorch_version='1.13',
  py_version='py39',
)

In [10]:
predictor = huggingface_model.deploy(
  initial_instance_count=1,
  instance_type='ml.g4dn.xlarge',
  endpoint_name=endpoint_name,
)

----------!

In [11]:
import json
import urllib.parse
import boto3
import os
import logging

# Runtime client used to call the deployed endpoint. `region`, `bucket` and
# `endpoint_name` are defined in earlier cells.
sagemaker_runtime = boto3.client('runtime.sagemaker', region_name=region)
logger = logging.getLogger()
logger.setLevel("INFO")
s3_bucket = bucket

# here replace the key with your test example s3 path
key = urllib.parse.unquote_plus("datasets/wavs/d5f2afaa-53af-4dcb-ac24-3827a99c748e_1_customer.wav", encoding='utf-8')
data = json.dumps({"s3_bucket": s3_bucket, "key": key})
try:
    logger.info("s3_bucket:" + s3_bucket)
    logger.info("key:" + key)
    # The endpoint's transform_fn (code/inference.py) writes the transcript to
    # the DynamoDB table itself, so nothing has to be stored on this side.
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=data,
    )
except Exception:
    # Keep the notebook running, but record the real error with its traceback
    # instead of hiding it behind a generic message.
    logger.exception("invoke_endpoint failed for key %s", key)
else:
    print(response)

{'ResponseMetadata': {'RequestId': '55a6b400-724a-4b4b-8043-22e14ab86f2e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '55a6b400-724a-4b4b-8043-22e14ab86f2e', 'x-amzn-invoked-production-variant': 'AllTraffic', 'date': 'Thu, 27 Jun 2024 00:28:42 GMT', 'content-type': 'application/json', 'content-length': '2896', 'connection': 'keep-alive'}, 'RetryAttempts': 0}, 'ContentType': 'application/json', 'InvokedProductionVariant': 'AllTraffic', 'Body': <botocore.response.StreamingBody object at 0x7fe2d281ad40>}


# Delete Items in DynamoDB

In [12]:
import boto3

region = 'us-west-2'

table_name = "speech2text"
dynamodb_client = boto3.resource('dynamodb', region_name=region)
table = dynamodb_client.Table(table_name)

# Scan the table page by page and delete every item that is returned.
# Pagination is driven only by 'LastEvaluatedKey': a page may be empty while
# more pages still follow, so an empty 'Items' list must not end the loop.
scan_kwargs = {}
with table.batch_writer() as batch:
    while True:
        scan = table.scan(**scan_kwargs)
        for item in scan.get('Items', []):
            # The table key is the (ContactID, SaidTimeStamp) pair
            batch.delete_item(Key={'ContactID': item['ContactID'], 'SaidTimeStamp': item['SaidTimeStamp']})
        if 'LastEvaluatedKey' not in scan:
            break
        scan_kwargs['ExclusiveStartKey'] = scan['LastEvaluatedKey']

print(f"All items in the table '{table_name}' have been deleted.")

All items in the table 'speech2text' have been deleted.


# Massive Inference

In [13]:
import boto3

def list_s3_contents(bucket_name, prefix):
    """Collect the 'Contents' entries of every object under a prefix.

    Calls list_objects_v2 repeatedly, passing the continuation token of the
    previous page, until a response is no longer marked as truncated.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix that restricts the listing.

    Returns:
        A list with the 'Contents' entries of all pages, in listing order.
    """
    client = boto3.client('s3')
    collected = []
    next_token = None

    while True:
        request = {'Bucket': bucket_name, 'Prefix': prefix}
        if next_token:
            # Resume where the previous page stopped
            request['ContinuationToken'] = next_token
        page = client.list_objects_v2(**request)

        if 'Contents' in page:
            collected.extend(page['Contents'])

        if not page.get('IsTruncated'):
            # Last page reached
            return collected
        next_token = page.get('NextContinuationToken')

# Split the S3 URI into the bucket name and the key prefix:
# 's3://<bucket>/<prefix>' -> ['s3:', '', '<bucket>', '<prefix>']
s3_uri = 's3://sagemaker-us-west-2-452145973879/datasets/midea_data/midea_dialogue/short_30s/'
uri_parts = s3_uri.split('/', 3)
bucket_name = uri_parts[2]
prefix = uri_parts[3]

# Fetch the listing of every object below the prefix
contents = list_s3_contents(bucket_name, prefix)
# for obj in contents:
#     print(obj['Key'])

In [None]:
import json
import urllib.parse
import boto3
import os
import logging
from tqdm import tqdm

# Runtime client used to call the deployed endpoint. `region`, `bucket`,
# `endpoint_name` and `contents` are defined in earlier cells.
sagemaker_runtime = boto3.client('runtime.sagemaker', region_name=region)
logger = logging.getLogger()
logger.setLevel("INFO")
s3_bucket = bucket
failed_keys = []

for content in tqdm(contents, total=len(contents)):
    # Keys returned by list_objects_v2 are plain (no EncodingType was
    # requested), so they must not be URL-decoded: unquote_plus would turn
    # '+' into a space and corrupt such keys.
    key = content['Key']
    if key.endswith('/'):
        # Folder placeholder object, not an audio file
        continue
    data = json.dumps({"s3_bucket": s3_bucket, "key": key})
    try:
        logger.info("s3_bucket:" + s3_bucket)
        logger.info("key:" + key)
        # The endpoint's transform_fn (code/inference.py) writes the
        # transcript to DynamoDB itself, so the response is not processed here.
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='application/json',
            Body=data,
        )
    except Exception:
        # Best effort: continue with the next file, but keep the traceback
        # and remember which keys failed.
        logger.exception("invoke_endpoint failed for key %s", key)
        failed_keys.append(key)

if failed_keys:
    logger.warning("%d of %d objects failed: %s", len(failed_keys), len(contents), failed_keys)

 96%|█████████▌| 430/450 [17:38<00:48,  2.40s/it]

In [None]:
import boto3

# Initialize a session using Amazon DynamoDB
dynamodb = boto3.resource('dynamodb')

# Specify the DynamoDB table
table_name = 'speech2text'
table = dynamodb.Table(table_name)

# Scan the table
def scan_table(table):
    """Return the items of all scan pages of ``table`` as one list.

    Keeps calling ``table.scan`` with the previous page's
    'LastEvaluatedKey' as ``ExclusiveStartKey`` for as long as a response
    contains that key. The list of the first page is extended in place and
    returned.
    """
    page = table.scan()
    rows = page['Items']

    while True:
        if 'LastEvaluatedKey' not in page:
            # No further pages
            return rows
        page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'])
        rows.extend(page['Items'])

# Get items from the table
items = scan_table(table)

# # Print the items
# for item in items:
#     print(item)

In [None]:
import pandas as pd
from decimal import Decimal

# Build one transcript string per contact from the scanned DynamoDB items.
# Convert the list of dictionaries to a DataFrame
df = pd.DataFrame(items)
# Sort the DataFrame by 'ContactID' and 'SaidTimeStamp' so that the utterances
# of each contact are concatenated in ascending timestamp order
df = df.sort_values(by=['ContactID', 'SaidTimeStamp'])

# Group by 'ContactID' and concatenate 'Words' with 'Speaker' prefix.
# Each utterance becomes "<Speaker>: <Words>\n"; the pieces are joined with a
# single space.
# NOTE(review): because every piece already ends with a newline, each line
# after the first starts with a space - confirm this is intended.
grouped = df.groupby('ContactID').apply(
    lambda x: ' '.join(f"{row['Speaker']}: {row['Words']}\n" for _, row in x.iterrows())
).reset_index(name='content')

# Rename the columns: the key becomes 'contact_id' and the transcript column
# carries the checkpoint name, e.g. 'content_v7_ckpt60'
ckpt_name = 'v7_ckpt60'
grouped.columns = ['contact_id', f'content_{ckpt_name}']

# Preview the first rows
grouped.head()

In [None]:
df_old = pd.read_csv("/home/ec2-user/SageMaker/efs/Projects/Qwen2/data/对话文本.csv")
df_old.head()

In [None]:
df_combine = pd.merge(df_old, grouped, on='contact_id')
df_combine.head()

In [None]:
df_combine = df_combine[['param_id', 'contact_id', 'content', f'content_{ckpt_name}', 'extracted_content']]

df_combine.to_csv(f"../../outputs/transcripts_{ckpt_name}.csv", index=False)

# Delete endpoint

In [None]:
# predictor.delete_model()
# predictor.delete_endpoint()

You may need to slice the audio files into segments of a fixed number of seconds. Here are Python examples that do the job:

https://github.com/Mohamedhany99/Audio-Splitter-per-seconds-python-/blob/main/Splitter.py
https://www.geeksforgeeks.org/cut-a-mp3-file-in-python/