<a href="https://colab.research.google.com/github/Manpreetkour95/NLP/blob/main/parallel_inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import concurrent.futures
from sentence_transformers import SentenceTransformer
import numpy as np
# Shared sentence-embedding model, created once when this cell runs and used
# by perform_parallel_inference() below.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
def perform_parallel_inference(texts, num_cores=8, encoder=None):
    """Embed ``texts`` by encoding contiguous chunks on a thread pool.

    Args:
        texts: Sequence of input sentences (must support ``len`` and slicing).
        num_cores: Maximum number of worker threads, and therefore the
            maximum number of chunks the input is split into.
        encoder: Object exposing ``encode(list_of_texts)``; defaults to the
            module-level ``model`` when omitted.

    Returns:
        A NumPy array with one embedding row per input text, in input order.

    Raises:
        ValueError: If ``num_cores`` is smaller than 1.
    """
    if num_cores < 1:
        raise ValueError("num_cores must be at least 1")

    # Resolve the global lazily so callers can supply their own encoder.
    encoder = model if encoder is None else encoder

    # np.concatenate rejects an empty list, so handle "no input" explicitly.
    if len(texts) == 0:
        width = encoder.get_sentence_embedding_dimension() or 0
        return np.empty((0, width), dtype=np.float32)

    # Ceiling division: the chunk size is never 0 (which would make range()
    # raise when there are fewer texts than workers) and the number of chunks
    # never exceeds num_cores.
    chunk_size = -(-len(texts) // num_cores)
    text_chunks = [texts[i:i + chunk_size] for i in range(0, len(texts), chunk_size)]

    # executor.map yields results in submission order, so rows stay aligned
    # with the input texts.
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_cores) as executor:
        chunk_embeddings = list(executor.map(encoder.encode, text_chunks))

    # Stack the per-chunk matrices back into a single array.
    return np.concatenate(chunk_embeddings, axis=0)

# The SentenceTransformer instance is created once near the top of this cell;
# change the model name there to try a different encoder
# (for example 'paraphrase-MiniLM-L6-v2').

# Sample input data: three sentences repeated to give 96 inputs.
texts = [
    "This is a sample sentence.",
    "Another example sentence.",
    "BERT model parallelization.",
] * 4 * 8

# Embed the whole sample and display the resulting array.
embeddings = perform_parallel_inference(texts)
print(embeddings)

In [None]:
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
import numpy as np
import onnxruntime as ort
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch.nn.functional as F
import asyncio
from pydantic import BaseModel
from typing import List

app = FastAPI()

# Load the tokenizer once. The original code loaded a second tokenizer
# ('optimum/all-MiniLM-L6-v2') first and immediately overwrote it with this
# one, so only this load ever had an effect.
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')

# Path of the exported ONNX model, relative to the working directory.
onnx_model_path = "onnx/model.onnx"

# Execution providers handed to ONNX Runtime, TensorRT first and CUDA second.
# note: for bool type options in python API, set them as False/True
providers = [
    ('TensorrtExecutionProvider', {
        'device_id': 0,
        'trt_max_workspace_size': 2147483648,  # 2 GiB
        'trt_fp16_enable': True,
    }),
    ('CUDAExecutionProvider', {
        'device_id': 0,
        'arena_extend_strategy': 'kNextPowerOfTwo',
        'gpu_mem_limit': 2 * 1024 * 1024 * 1024,  # 2 GiB
        'cudnn_conv_algo_search': 'EXHAUSTIVE',
        'do_copy_in_default_stream': True,
    })
]

# One inference session, created at import time and shared by every request.
sess_opt = ort.SessionOptions()
ort_session = ort.InferenceSession(onnx_model_path, sess_options=sess_opt, providers=providers)


# Request body accepted by POST /predict/.
class InputText(BaseModel):
    text_list: List[str]  # texts to embed; each entry is processed separately

async def make_predictions_onnx(input_ids,attention_mask,token_type_ids):
    """Run the shared ONNX session on one tokenized input without blocking the event loop.

    Args:
        input_ids: Torch tensor of token ids produced by the tokenizer.
        attention_mask: Torch tensor marking real tokens versus padding.
        token_type_ids: Torch tensor of segment ids produced by the tokenizer.

    Returns:
        A tuple ``(first_output, attention_mask_array)`` where ``first_output``
        is the session's first output wrapped in a torch tensor and
        ``attention_mask_array`` is the attention mask as a NumPy array.
    """
    ort_inputs = {
        'input_ids': input_ids.cpu().numpy(),
        'attention_mask': attention_mask.cpu().numpy(),
        "token_type_ids": token_type_ids.cpu().numpy()
    }
    # ort_session.run is a synchronous call; awaiting it through the default
    # thread-pool executor keeps the event loop free so that coroutines
    # gathered by the caller can actually overlap.
    # NOTE(review): this relies on InferenceSession.run being safe to call
    # from several threads at once -- confirm for the configured providers.
    loop = asyncio.get_running_loop()
    ort_outputs = await loop.run_in_executor(None, ort_session.run, None, ort_inputs)
    return torch.tensor(ort_outputs[0]), ort_inputs['attention_mask']


async def mean_pooling(model_output, attention_mask):
    """Average token embeddings along dimension 1, weighted by the attention mask.

    Args:
        model_output: Indexable whose first element holds the token embeddings.
        attention_mask: Tensor that is broadcast over the last embedding
            dimension; positions holding 0 contribute nothing to the average.

    Returns:
        The masked sum of the token embeddings divided by the mask total,
        with the divisor clamped to at least 1e-9 to avoid division by zero.
    """
    per_token = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(per_token.size()).float()
    weighted_total = torch.sum(per_token * weights, 1)
    mask_total = torch.clamp(weights.sum(1), min=1e-9)
    return weighted_total / mask_total


async def process_text(text):
    """Tokenize ``text``, run the ONNX model and return L2-normalized embeddings.

    Args:
        text: Input passed straight to the tokenizer.

    Returns:
        The normalized, mean-pooled embeddings converted to nested Python lists.
    """
    tokens = tokenizer(text, padding=True, truncation=True, return_tensors='pt')
    mask = tokens['attention_mask']

    onnx_result = await make_predictions_onnx(
        tokens['input_ids'], mask, tokens['token_type_ids']
    )
    pooled = await mean_pooling(onnx_result, mask)

    # Scale every row to unit L2 norm before serializing.
    return F.normalize(pooled, p=2, dim=1).tolist()

import time

@app.post("/predict/")
async def predict_texts(input_data: InputText):
    """Embed every text in the request and return the embeddings in request order.

    Args:
        input_data: Request body carrying ``text_list``.

    Returns:
        A list with one ``process_text`` result per input text.
    """
    input_texts = input_data.text_list
    print(input_texts)

    # Wall-clock reference point for the overall timing log line.
    started = time.time()

    # Create one coroutine per text, then await them all together.
    pending = []
    for text in input_texts:
        pending.append(process_text(text))
    print("here")
    results = await asyncio.gather(*pending)

    # Convert the elapsed seconds to milliseconds for the log line.
    elapsed_time = (time.time() - started) * 1000
    print(f"Overall time taken: {elapsed_time:.2f} ms")
    return results


# Start the API with uvicorn when this file is executed as a script.
if __name__ == "__main__":
    import uvicorn
    # Listen on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)




In [None]:
# Base image: NVIDIA TensorRT 22.08 container (Python 3).
FROM nvcr.io/nvidia/tensorrt:22.08-py3

# Application script (presumably the FastAPI service from the previous cell).
COPY ./app.py ./

# Directory containing the exported ONNX model.
COPY ./onnx ./onnx

# Python dependencies. torch is listed explicitly because the service imports
# it directly and none of the other packages installs it.
RUN pip install onnxruntime_gpu==1.12.0 uvicorn transformers fastapi torch

In [None]:
import requests
import torch
# Replace this URL with the appropriate endpoint if you are running on a different host or port
url = "http://172.17.0.2:8000/predict/"

# Example input data
input_data = {"text_list": ["My name is MK"]}

# Send a POST request to the /predict/ endpoint. requests has no default
# timeout, so an unreachable or stalled server would otherwise block forever.
response = requests.post(url, json=input_data, timeout=30)

# Print the status code, then either the embeddings or the error body.
print(response.status_code)
if response.status_code == 200:
    # Each element of the JSON array is the embedding result for one input text.
    for text in response.json():
        print("here")
        print(torch.tensor(text))
else:
    # Show the server's error payload instead of failing silently.
    print(response.text)

In [None]:
import numpy as np
from io import BytesIO
from io import StringIO
import boto3
from boto3.session import Session
import pandas as pd
from opensearchpy import OpenSearch, RequestsHttpConnection
import time
from opensearchpy.helpers import parallel_bulk

# Settings shared by the S3 -> OpenSearch indexing code in this cell.
client_type = 's3'  # service name passed to the session's client() call
account_id = '757738260077'  # AWS account id used to build the role ARN
role = 'DeveloperRole'  # role name used in the ARN and the session name
region = 'us-east-1'  # region given to the assumed-role boto3 Session
index_name = 'vector_search_22feb'  # OpenSearch index the documents are written to

def get_sts_session(account_id, role, region):
    """Assume an IAM role through STS and return a boto3 Session using its credentials.

    Args:
        account_id: AWS account id that owns the role.
        role: Role name; also used to derive the role session name.
        region: Region name for the returned session.

    Returns:
        A ``boto3.session.Session`` built from the temporary credentials.
    """
    assumed = boto3.client('sts').assume_role(
        RoleArn=f'arn:aws:iam::{account_id}:role/{role}',
        RoleSessionName=f'{role}-Session',
    )
    creds = assumed['Credentials']
    return Session(
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
        region_name=region,
    )

def get_s3_objects(s3_client, s3_bucket, s3_directory):
    """Return every object key in ``s3_bucket`` that starts with ``s3_directory``.

    A single list call returns at most one page of keys, so the listing is
    driven through a paginator to avoid silently truncating long listings.

    Args:
        s3_client: boto3 S3 client.
        s3_bucket: Bucket name.
        s3_directory: Key prefix to list.

    Returns:
        A list of key strings in the order the pages return them; empty when
        nothing matches the prefix.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=s3_bucket, Prefix=s3_directory)
    # A page without matches has no 'Contents' entry at all.
    return [obj['Key'] for page in pages for obj in page.get('Contents', [])]

def initialize_opensearch(aos_host):
    """Create an OpenSearch client for ``aos_host`` on port 443 with TLS verification.

    Args:
        aos_host: Host name of the OpenSearch endpoint.

    Returns:
        An ``OpenSearch`` client using ``RequestsHttpConnection`` and a
        timeout of 90.
    """
    endpoint = {'host': aos_host, 'port': 443}
    client = OpenSearch(
        hosts=[endpoint],
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=90,
    )
    return client

def generate_data(text_data, vector_sentences):
    """Yield one bulk "index" action per (text, vector) pair.

    Args:
        text_data: Iterable of texts stored under ``nameadd``.
        vector_sentences: Iterable of vectors stored under ``nameadd_vector``.

    Yields:
        Action dicts targeting the module-level ``index_name``. Pairing stops
        at the shorter of the two inputs.
    """
    pairs = zip(text_data, vector_sentences)
    for nameadd, embedding in pairs:
        document = {"nameadd_vector": embedding, "nameadd": nameadd}
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_source": document,
        }

def parallel_bulk_indexing(aos_client, text_data, vector_sentences):
    """Index the (text, vector) pairs through ``parallel_bulk`` and report failures.

    Args:
        aos_client: OpenSearch client used for the bulk requests.
        text_data: Texts to index.
        vector_sentences: Vectors paired with ``text_data``.

    Uses 4 threads and chunks of 5000 actions. Both raise flags are enabled.
    """
    # NOTE(review): `index` and `timeout=6` are extra keyword arguments;
    # presumably they are forwarded to the underlying bulk call -- confirm.
    outcomes = parallel_bulk(
        client=aos_client,
        actions=generate_data(text_data, vector_sentences),
        index=index_name,
        thread_count=4,
        chunk_size=5000,
        timeout=6,
        raise_on_error=True,
        raise_on_exception=True,
    )
    # Iterate over every result and print the details of any failure.
    for ok, detail in outcomes:
        if ok:
            continue
        print('A document failed:', detail)

# List the source files under the master-list prefix using an assumed-role
# S3 client.
s3_bucket = 'iwave-datalake-dev'
s3_directory = 'prebuilt_profiles/master_list/master_20231101'
s3_session = get_sts_session(account_id, role, region)
s3_client = s3_session.client(client_type)
list_key = get_s3_objects(s3_client, s3_bucket, s3_directory)
# Skip the files that do not have a vector embedding (listing positions 361 and 488).
list_key = [k for pos, k in enumerate(list_key) if pos not in (361, 488)]
print(len(list_key))
print(list_key[-1])

# List the embedding files under the vector-embeddings prefix.
s3_directory = 'Vector-embeddings/vemb_20231101'
list_key1 = get_s3_objects(s3_client, s3_bucket, s3_directory)
print(len(list_key1))
print(list_key1[-1])

# Create the OpenSearch client for the target domain.
aos_host_sandbox = 'vpc-iwave-os-cluster-prebuilt-tioaie3iiezmt3w3jym2ov7cry.us-east-1.es.amazonaws.com'
aos_client = initialize_opensearch(aos_host_sandbox)
# Index each (source file, embedding file) pair with parallel bulk requests.
# The [1:2] slices restrict the run to the second pair of each listing.
for key, key1 in zip(list_key[1:2], list_key1[1:2]):
    print(key)
    print(key1)
    start_time = time.time()

    # Download the pipe-delimited source file.
    response = s3_client.get_object(Bucket=s3_bucket, Key=key)
    data = response['Body'].read().decode('utf-8')

    data_io = StringIO(data)
    # dtype=str keeps every column textual: otherwise pandas may infer numeric
    # columns (e.g. postal codes), which breaks the string concatenation below
    # and drops leading zeros.
    df = pd.read_csv(data_io, sep='|', dtype=str)
    df = df.fillna('')
    # Build the searchable text from the name and address columns.
    df['nameadd'] = (df['Prefix'] + ' ' + df['FirstName'] + ' ' + df['MiddleName_Initial'] + ' ' +
                     df['LastName'] + ' ' + df['Address1'] + ' ' + df['City'] + ' ' +
                     df['State_Province'] + ' ' + df['Country'] + ' ' + df['ZIP_PostalCode'])
    text_data = df['nameadd'].to_list()

    # Download the matching embedding file and load it with NumPy.
    response = s3_client.get_object(Bucket=s3_bucket, Key=key1)
    data = response['Body'].read()
    bytes_io = BytesIO(data)
    vector_sentences = np.load(bytes_io)

    # Texts and vectors are paired with zip() downstream, which silently drops
    # the surplus of the longer input; make a mismatch visible.
    if len(text_data) != len(vector_sentences):
        print(f"Warning: {len(text_data)} texts but {len(vector_sentences)} vectors for {key}")

    print('adding')
    parallel_bulk_indexing(aos_client, text_data, vector_sentences)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Time elapsed: {elapsed_time} seconds")