# Deploy ML Model in OSS

In [1]:
import os
import logging

# Emit INFO-level records so every REST response logged below is visible,
# then create the logger used throughout the notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [2]:
import requests
import json

def make_requests(protocol, host, port, path, method, body=None, header=None, timeout=60):
    """Send a single HTTP request and return the raw ``requests`` response.

    Args:
        protocol: URL scheme, e.g. ``'http'``.
        host: Host name or IP address of the server.
        port: TCP port of the server.
        path: Request path appended to ``protocol://host:port``.
        method: ``'GET'``, ``'POST'``, ``'PUT'`` or ``'DELETE'``
            (case-insensitive).
        body: JSON-serialisable payload for ``POST``/``PUT``; ignored for the
            other methods. ``None`` sends no payload at all.
        header: Optional dict of HTTP headers.
        timeout: Seconds to wait for the server before giving up, so a dead
            host cannot block the caller forever. Pass ``None`` to wait
            indefinitely.

    Returns:
        The ``requests.Response`` object; the status code is not checked here.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
        Exception: Any error raised while serialising the body or sending the
            request is logged and re-raised unchanged.
    """
    supported_methods = ('GET', 'POST', 'PUT', 'DELETE')
    verb = method.upper() if isinstance(method, str) else method
    if verb not in supported_methods:
        # Fail loudly instead of returning None, which callers would only
        # trip over later as an AttributeError on `response.status_code`.
        raise ValueError(
            f"unsupported HTTP method {method!r}; expected one of {supported_methods}"
        )

    url = f"{protocol}://{host}:{port}{path}"
    try:
        request_kwargs = {'headers': header, 'timeout': timeout}
        if verb in ('POST', 'PUT') and body is not None:
            # Only attach a payload when one was supplied; serialising None
            # would send the literal string "null".
            request_kwargs['data'] = json.dumps(body)
        return requests.request(verb, url, **request_kwargs)
    except Exception as ex:
        logger.error(f"exception: {str(ex)}")
        # Bare raise keeps the original traceback intact.
        raise

In [3]:
# Address of the target cluster; these three values are passed to every
# make_requests() call in the notebook.
protocol, host, port = 'http', '3.93.0.242', 9200

# Headers passed with every request.
header = {'Content-Type': 'application/json'}

# Arguments

In [4]:
# Model group that all models in this notebook are registered under.
MODEL_GROUP = "search_v1_model_group"
MODEL_GROUP_DESCRIPTION = "search_v1_model_group"

# Embedding models to register; MODEL_TYPE later selects the ingest processor.
MODELS = [
    dict(
        MODEL_NAME="huggingface/sentence-transformers/msmarco-distilbert-base-tas-b",
        MODEL_VERSION="1.0.2",
        MODEL_FORMAT="TORCH_SCRIPT",
        MODEL_TYPE="dense",
    ),
    dict(
        MODEL_NAME="amazon/neural-sparse/opensearch-neural-sparse-encoding-v1",
        MODEL_VERSION="1.0.1",
        MODEL_FORMAT="TORCH_SCRIPT",
        MODEL_TYPE="sparse",
    ),
]

# Name of the ingest pipeline, and the target field for each model's output,
# keyed by model name.
PIPELINE_NAME = "default_ingestion_pipeline"
FIELD_MAP = {
    MODELS[0]["MODEL_NAME"]: "bert_embeddings",
    MODELS[1]["MODEL_NAME"]: "oss_sparse_embeddings",
}

# Name of the search pipeline.
SEARCH_PIPELINE_NAME = 'default_search_pipeline'

# Set Cluster Settings

In [5]:
# Apply the ML Commons plugin settings to the cluster. The same
# model_access_control_enabled flag is sent in both the persistent and the
# transient section.
SET_CLUSTER_SETTINGS_PATH = "/_cluster/settings"
SET_CLUSTER_SETTINGS_BODY = {
    "persistent": {
        "plugins": {
            "ml_commons": {
                "only_run_on_ml_node": "false",
                "model_access_control_enabled": "true",
                "native_memory_threshold": "99",
            },
        },
    },
    "transient": {"plugins.ml_commons.model_access_control_enabled": "true"},
}

response = make_requests(
    protocol,
    host,
    port,
    SET_CLUSTER_SETTINGS_PATH,
    'PUT',
    body=SET_CLUSTER_SETTINGS_BODY,
    header=header,
)
logger.info(f"SET_CLUSTER_SETTINGS response: {response.status_code} body: {response.json()}")

INFO:__main__:SET_CLUSTER_SETTINGS response: 200 body: {'acknowledged': True, 'persistent': {'plugins': {'ml_commons': {'only_run_on_ml_node': 'false', 'model_access_control_enabled': 'true', 'native_memory_threshold': '99'}}}, 'transient': {'plugins': {'ml_commons': {'model_access_control_enabled': 'true'}}}}


# Register Model Group

In [6]:
# Create the model group and keep its id; later registration requests
# reference it as `model_group_id`.
REGISTER_MODEL_GROUP_PATH = "/_plugins/_ml/model_groups/_register"
REGISTER_MODEL_GROUP_BODY = dict(name=MODEL_GROUP, description=MODEL_GROUP_DESCRIPTION)

response = make_requests(
    protocol,
    host,
    port,
    REGISTER_MODEL_GROUP_PATH,
    'POST',
    body=REGISTER_MODEL_GROUP_BODY,
    header=header,
)
logger.info(f"REGISTER_MODEL_GROUP response: {response.status_code} body: {response.json()}")
MODEL_GROUP_ID = response.json()['model_group_id']

INFO:__main__:REGISTER_MODEL_GROUP response: 200 body: {'model_group_id': '0FlCF5EBMkhoZOcBzWSd', 'status': 'CREATED'}


In [7]:
# Search the model groups for the id just created and log what comes back.
TEST_MODEL_GROUP_PATH = "/_plugins/_ml/model_groups/_search"
TEST_MODEL_GROUP_BODY = {"query": {"match": {"_id": MODEL_GROUP_ID}}}
response = make_requests(
    protocol,
    host,
    port,
    TEST_MODEL_GROUP_PATH,
    'POST',
    body=TEST_MODEL_GROUP_BODY,
    header=header,
)
logger.info(f"TEST_MODEL_GROUP response: {response.status_code} body: {response.json()}")

INFO:__main__:TEST_MODEL_GROUP response: 200 body: {'took': 14, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': '.plugins-ml-model-group', '_id': '0FlCF5EBMkhoZOcBzWSd', '_version': 1, '_seq_no': 0, '_primary_term': 1, '_score': 1.0, '_source': {'created_time': 1722672139544, 'access': 'public', 'latest_version': 0, 'last_updated_time': 1722672139544, 'name': 'search_v1_model_group', 'description': 'search_v1_model_group'}}]}}


# Register Model

In [8]:
# Submit one registration request per entry in MODELS and remember the task id
# returned for each, in the same order as MODELS.
MODEL_TASK_IDS = []

for model_spec in MODELS:
    REGISTER_MODEL_PATH = "/_plugins/_ml/models/_register"
    REGISTER_MODEL_BODY = dict(
        name=model_spec['MODEL_NAME'],
        version=model_spec['MODEL_VERSION'],
        model_group_id=MODEL_GROUP_ID,
        model_format=model_spec['MODEL_FORMAT'],
    )

    response = make_requests(
        protocol,
        host,
        port,
        REGISTER_MODEL_PATH,
        'POST',
        body=REGISTER_MODEL_BODY,
        header=header,
    )
    logger.info(f"REGISTER_MODEL response: {response.status_code} body: {response.json()}")
    MODEL_TASK_IDS.append(response.json()['task_id'])

logger.info(f"task ids {MODEL_TASK_IDS}")

INFO:__main__:REGISTER_MODEL response: 200 body: {'task_id': '0VlCF5EBMkhoZOcB92RQ', 'status': 'CREATED'}
INFO:__main__:REGISTER_MODEL response: 200 body: {'task_id': '0llCF5EBMkhoZOcB-WRa', 'status': 'CREATED'}
INFO:__main__:task ids ['0VlCF5EBMkhoZOcB92RQ', '0llCF5EBMkhoZOcB-WRa']


In [11]:
import time  # cell-local import: only the polling loop below needs it

# Registration runs asynchronously on the cluster, so a single status check can
# easily see a task that has not finished yet. Poll each task until it reaches
# a terminal state or the timeout expires.
TASK_POLL_INTERVAL_SECONDS = 5
TASK_POLL_TIMEOUT_SECONDS = 600
# NOTE(review): terminal failure states taken from the ML Commons task
# documentation; confirm against the cluster version in use.
TASK_FAILED_STATES = ('FAILED', 'CANCELLED', 'COMPLETED_WITH_ERROR')

MODEL_IDS = []      # ids of the models whose registration completed
MODEL_ID_MAP = {}   # model id -> model name (key into FIELD_MAP)
for idx, t in enumerate(MODEL_TASK_IDS):
    TEST_MODEL_PATH = f"/_plugins/_ml/tasks/{t}"
    deadline = time.monotonic() + TASK_POLL_TIMEOUT_SECONDS
    while True:
        response = make_requests(protocol, host, port, TEST_MODEL_PATH, 'GET', header=header)
        logger.info(f"TEST_MODEL response: {response.status_code} body: {response.json()}")
        data = response.json()
        state = data.get('state') if isinstance(data, dict) else None
        if state == 'COMPLETED':
            # MODEL_TASK_IDS was filled in MODELS order, so idx identifies the model.
            MODEL_ID_MAP[data['model_id']] = MODELS[idx]['MODEL_NAME']
            MODEL_IDS.append(data['model_id'])
            break
        if state is None or state in TASK_FAILED_STATES:
            # No state (error payload) or a failed task: waiting longer will not help.
            logger.error(f"registration task {t} for {MODELS[idx]['MODEL_NAME']} ended in state {state}; model skipped")
            break
        if time.monotonic() >= deadline:
            logger.error(f"registration task {t} for {MODELS[idx]['MODEL_NAME']} still {state} after {TASK_POLL_TIMEOUT_SECONDS}s; model skipped")
            break
        time.sleep(TASK_POLL_INTERVAL_SECONDS)
logger.info(f"model ids {MODEL_IDS}")

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '1FlCF5EBMkhoZOcB-mQ9', 'task_type': 'REGISTER_MODEL', 'function_name': 'TEXT_EMBEDDING', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672150244, 'last_update_time': 1722672198324, 'is_async': True}
INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '01lCF5EBMkhoZOcB-mQf', 'task_type': 'REGISTER_MODEL', 'function_name': 'SPARSE_ENCODING', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672150873, 'last_update_time': 1722672222606, 'is_async': True}
INFO:__main__:model ids ['1FlCF5EBMkhoZOcB-mQ9', '01lCF5EBMkhoZOcB-mQf']


# Deploy Models

In [12]:
# Request deployment of every registered model and collect the deploy task ids.
MODEL_DEPLOY_TASK_IDS = []

for model_id in MODEL_IDS:
    DEPLOY_MODEL_PATH = f"/_plugins/_ml/models/{model_id}/_deploy"
    response = make_requests(
        protocol,
        host,
        port,
        DEPLOY_MODEL_PATH,
        'POST',
        body={},
        header=header,
    )
    logger.info(f"DEPLOY_MODEL response: {response.status_code} body: {response.json()}")
    MODEL_DEPLOY_TASK_IDS.append(response.json()['task_id'])

logger.info(f"task ids {MODEL_DEPLOY_TASK_IDS}")

INFO:__main__:DEPLOY_MODEL response: 200 body: {'task_id': '1VlFF5EBMkhoZOcBgmS6', 'task_type': 'DEPLOY_MODEL', 'status': 'CREATED'}
INFO:__main__:DEPLOY_MODEL response: 200 body: {'task_id': '1llFF5EBMkhoZOcBhGTW', 'task_type': 'DEPLOY_MODEL', 'status': 'CREATED'}
INFO:__main__:task ids ['1VlFF5EBMkhoZOcBgmS6', '1llFF5EBMkhoZOcBhGTW']


In [15]:
# Query each deploy task once and log the ones that report COMPLETED.
for task_id in MODEL_DEPLOY_TASK_IDS:
    TEST_MODEL_DEPLOY_TASK_PATH = f"/_plugins/_ml/tasks/{task_id}"
    response = make_requests(
        protocol,
        host,
        port,
        TEST_MODEL_DEPLOY_TASK_PATH,
        'GET',
        header=header,
    )
    logger.info(f"TEST_MODEL response: {response.status_code} body: {response.json()}")
    data = response.json()
    if 'state' not in data or data['state'] != 'COMPLETED':
        continue
    logger.info(f"task id {task_id} is {data['state']}")

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '1FlCF5EBMkhoZOcB-mQ9', 'task_type': 'DEPLOY_MODEL', 'function_name': 'TEXT_EMBEDDING', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672317109, 'last_update_time': 1722672344443, 'is_async': True}
INFO:__main__:task id 1VlFF5EBMkhoZOcBgmS6 is COMPLETED
INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '01lCF5EBMkhoZOcB-mQf', 'task_type': 'DEPLOY_MODEL', 'function_name': 'SPARSE_ENCODING', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672317653, 'last_update_time': 1722672352803, 'is_async': True}
INFO:__main__:task id 1llFF5EBMkhoZOcBhGTW is COMPLETED


# Define ReRanker Model

In [16]:
# Cross-encoder model(s) to register; their ids later feed the search
# pipeline's rerank response processor.
CROSS_ENCODER_MODELS = [
    dict(
        MODEL_NAME="huggingface/cross-encoders/ms-marco-MiniLM-L-6-v2",
        MODEL_VERSION="1.0.2",
        MODEL_FORMAT="TORCH_SCRIPT",
        MODEL_TYPE="cross-encoder",
    ),
]

# Register Cross Encoder Models

In [17]:
# Submit one registration request per cross-encoder model and remember the
# task id returned for each.
CE_MODEL_TASK_IDS = []

for model_spec in CROSS_ENCODER_MODELS:
    REGISTER_MODEL_PATH = "/_plugins/_ml/models/_register"
    REGISTER_MODEL_BODY = dict(
        name=model_spec['MODEL_NAME'],
        version=model_spec['MODEL_VERSION'],
        model_group_id=MODEL_GROUP_ID,
        model_format=model_spec['MODEL_FORMAT'],
    )

    response = make_requests(
        protocol,
        host,
        port,
        REGISTER_MODEL_PATH,
        'POST',
        body=REGISTER_MODEL_BODY,
        header=header,
    )
    logger.info(f"REGISTER_MODEL response: {response.status_code} body: {response.json()}")
    CE_MODEL_TASK_IDS.append(response.json()['task_id'])

logger.info(f"task ids {CE_MODEL_TASK_IDS}")

INFO:__main__:REGISTER_MODEL response: 200 body: {'task_id': '11lGF5EBMkhoZOcBmmTL', 'status': 'CREATED'}
INFO:__main__:task ids ['11lGF5EBMkhoZOcBmmTL']


In [20]:
import time  # cell-local import: only the polling loop below needs it

# Registration runs asynchronously on the cluster, so a single status check can
# easily see a task that has not finished yet. Poll each task until it reaches
# a terminal state or the timeout expires.
TASK_POLL_INTERVAL_SECONDS = 5
TASK_POLL_TIMEOUT_SECONDS = 600
# NOTE(review): terminal failure states taken from the ML Commons task
# documentation; confirm against the cluster version in use.
TASK_FAILED_STATES = ('FAILED', 'CANCELLED', 'COMPLETED_WITH_ERROR')

CE_MODEL_IDS = []   # ids of the cross-encoder models whose registration completed
for t in CE_MODEL_TASK_IDS:
    TEST_MODEL_PATH = f"/_plugins/_ml/tasks/{t}"
    deadline = time.monotonic() + TASK_POLL_TIMEOUT_SECONDS
    while True:
        response = make_requests(protocol, host, port, TEST_MODEL_PATH, 'GET', header=header)
        logger.info(f"TEST_MODEL response: {response.status_code} body: {response.json()}")
        data = response.json()
        state = data.get('state') if isinstance(data, dict) else None
        if state == 'COMPLETED':
            CE_MODEL_IDS.append(data['model_id'])
            break
        if state is None or state in TASK_FAILED_STATES:
            # No state (error payload) or a failed task: waiting longer will not help.
            logger.error(f"registration task {t} ended in state {state}; model skipped")
            break
        if time.monotonic() >= deadline:
            logger.error(f"registration task {t} still {state} after {TASK_POLL_TIMEOUT_SECONDS}s; model skipped")
            break
        time.sleep(TASK_POLL_INTERVAL_SECONDS)
logger.info(f"model ids {CE_MODEL_IDS}")

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '2FlGF5EBMkhoZOcBm2RT', 'task_type': 'REGISTER_MODEL', 'function_name': 'TEXT_SIMILARITY', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672388810, 'last_update_time': 1722672398964, 'is_async': True}
INFO:__main__:model ids ['2FlGF5EBMkhoZOcBm2RT']


# Deploy ReRanker Models

In [21]:
# Request deployment of every registered cross-encoder model and collect the
# deploy task ids.
CE_MODEL_DEPLOY_TASK_IDS = []

for model_id in CE_MODEL_IDS:
    DEPLOY_MODEL_PATH = f"/_plugins/_ml/models/{model_id}/_deploy"
    response = make_requests(
        protocol,
        host,
        port,
        DEPLOY_MODEL_PATH,
        'POST',
        body={},
        header=header,
    )
    logger.info(f"DEPLOY_MODEL response: {response.status_code} body: {response.json()}")
    CE_MODEL_DEPLOY_TASK_IDS.append(response.json()['task_id'])

logger.info(f"task ids {CE_MODEL_DEPLOY_TASK_IDS}")

INFO:__main__:DEPLOY_MODEL response: 200 body: {'task_id': '2VlGF5EBMkhoZOcB02SX', 'task_type': 'DEPLOY_MODEL', 'status': 'CREATED'}
INFO:__main__:task ids ['2VlGF5EBMkhoZOcB02SX']


In [22]:
# Query each cross-encoder deploy task once and log the ones that report
# COMPLETED.
for task_id in CE_MODEL_DEPLOY_TASK_IDS:
    TEST_MODEL_DEPLOY_TASK_PATH = f"/_plugins/_ml/tasks/{task_id}"
    response = make_requests(
        protocol,
        host,
        port,
        TEST_MODEL_DEPLOY_TASK_PATH,
        'GET',
        header=header,
    )
    logger.info(f"TEST_MODEL response: {response.status_code} body: {response.json()}")
    data = response.json()
    if 'state' not in data or data['state'] != 'COMPLETED':
        continue
    logger.info(f"task id {task_id} is {data['state']}")

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': '2FlGF5EBMkhoZOcBm2RT', 'task_type': 'DEPLOY_MODEL', 'function_name': 'TEXT_SIMILARITY', 'state': 'COMPLETED', 'worker_node': ['bGY9drC5T6S-_poyFGzDQQ'], 'create_time': 1722672403350, 'last_update_time': 1722672405415, 'is_async': True}
INFO:__main__:task id 2VlGF5EBMkhoZOcB02SX is COMPLETED


# Create Ingestion Pipeline

In [23]:
# Build one ingest processor per registered model and create the pipeline.
#
# The model type is looked up by model *name* (via MODEL_ID_MAP), not by list
# position: MODEL_IDS only contains models whose registration completed, so its
# indices are not guaranteed to line up with MODELS.
MODEL_TYPE_BY_NAME = {spec['MODEL_NAME']: spec['MODEL_TYPE'] for spec in MODELS}

processors = []
for m in MODEL_IDS:
    model_name = MODEL_ID_MAP[m]
    # Dense models use the text_embedding processor; everything else is
    # treated as sparse, as before.
    processor_type = 'text_embedding' if MODEL_TYPE_BY_NAME[model_name] == 'dense' else 'sparse_encoding'
    processors.append({
        processor_type: {
            "model_id": m,
            # Read the document's "text" field, write to this model's target field.
            "field_map": {
                "text": FIELD_MAP[model_name]
            }
        }
    })

CREATE_PIPELINE_BODY = {
    "description": PIPELINE_NAME,
    "processors": processors
}
CREATE_PIPELINE_PATH = f"/_ingest/pipeline/{PIPELINE_NAME}"
response = make_requests(protocol, host, port, CREATE_PIPELINE_PATH, 'PUT', body=CREATE_PIPELINE_BODY, header=header)
logger.info(f"CREATE_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:CREATE_PIPELINE response: 200 body: {'acknowledged': True}


In [24]:
# List the ingest pipelines to confirm the one just created.
GET_PIPELINE_PATH = "/_ingest/pipeline"
response = make_requests(
    protocol,
    host,
    port,
    GET_PIPELINE_PATH,
    'GET',
    header=header,
)
logger.info(f"GET_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:GET_PIPELINE response: 200 body: {'default_ingestion_pipeline': {'description': 'default_ingestion_pipeline', 'processors': [{'text_embedding': {'model_id': '1FlCF5EBMkhoZOcB-mQ9', 'field_map': {'text': 'bert_embeddings'}}}, {'sparse_encoding': {'model_id': '01lCF5EBMkhoZOcB-mQf', 'field_map': {'text': 'oss_sparse_embeddings'}}}]}}


# Create Search Pipeline

In [76]:
# Response side: one rerank processor per cross-encoder model, scoring on the
# document's "text" field.
response_processors = [
    {
        "rerank": {
            "ml_opensearch": {"model_id": m},
            "context": {"document_fields": ["text"]},
        }
    }
    for m in CE_MODEL_IDS
]

# Request side: map each embedding field to the id of the model that fills it.
request_processor = {FIELD_MAP[MODEL_ID_MAP[m]]: m for m in MODEL_IDS}
request_processors = [
    {'neural_query_enricher': {'neural_field_default_id': request_processor}}
]

CREATE_SEARCH_PIPELINE_BODY = dict(
    description=SEARCH_PIPELINE_NAME,
    request_processors=request_processors,
    response_processors=response_processors,
)
SEARCH_CREATE_PIPELINE_PATH = f"/_search/pipeline/{SEARCH_PIPELINE_NAME}"
response = make_requests(
    protocol,
    host,
    port,
    SEARCH_CREATE_PIPELINE_PATH,
    'PUT',
    body=CREATE_SEARCH_PIPELINE_BODY,
    header=header,
)
logger.info(f"CREATE_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:CREATE_PIPELINE response: 200 body: {'acknowledged': True}


In [77]:
# List the search pipelines to confirm the one just created.
GET_PIPELINE_PATH = "/_search/pipeline"
response = make_requests(
    protocol,
    host,
    port,
    GET_PIPELINE_PATH,
    'GET',
    header=header,
)
logger.info(f"GET_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:GET_PIPELINE response: 200 body: {'default_search_pipeline': {'description': 'default_search_pipeline', 'request_processors': [{'neural_query_enricher': {'neural_field_default_id': {'bert_embeddings': 'vERhsZABary_bsUAiOG2', 'oss_sparse_embeddings': 'vURhsZABary_bsUAieFJ'}}}], 'response_processors': [{'rerank': {'ml_opensearch': {'model_id': 'wURmsZABary_bsUAq-Hf'}, 'context': {'document_fields': ['text']}}}]}}


# Cleanup

In [None]:
import time

# Delete the ingest pipeline. It was created under /_ingest/pipeline, so it
# must be deleted from the same endpoint.
response = make_requests(protocol, host, port, f"/_ingest/pipeline/{PIPELINE_NAME}", 'DELETE', header=header)
logger.info(f"response: {response.status_code} body: {response.json()}")

# Delete the search pipeline created by this notebook.
response = make_requests(protocol, host, port, f"/_search/pipeline/{SEARCH_PIPELINE_NAME}", 'DELETE', header=header)
logger.info(f"response: {response.status_code} body: {response.json()}")

# Undeploy, then delete, every embedding and cross-encoder model.
for m in [*MODEL_IDS, *CE_MODEL_IDS]:
    response = make_requests(protocol, host, port, f"/_plugins/_ml/models/{m}/_undeploy", 'POST', body={}, header=header)
    logger.info(f"response: {response.status_code} body: {response.json()}")
    # Pause between the undeploy request and the delete.
    # NOTE(review): presumably to let the undeploy finish first; confirm the
    # 120 s value is still needed.
    time.sleep(120)
    response = make_requests(protocol, host, port, f"/_plugins/_ml/models/{m}", 'DELETE', header=header)
    logger.info(f"response: {response.status_code} body: {response.json()}")

# Delete the model group last, after its models are gone.
response = make_requests(protocol, host, port, f"/_plugins/_ml/model_groups/{MODEL_GROUP_ID}", 'DELETE', header=header)
logger.info(f"response: {response.status_code} body: {response.json()}")