### Deploy semantic search using with finetuned model 
The deployment architecture includes: 
- Choose a pretrain BERT model, here we use all-MiniLM-L6-v2 model
- Save the ML models in S3 bucket
- Host the ML models using SageMaker endpoints 
- Create Vector index and load data into the index 
- Create API gateway handels queries from web applications and pass it to lambda 
- Create a Lambda function to call SageMaker endpoints to generate embeddings from user query, and send the query results back to API gateway 
- API gateway sends the search results to frontend, and return search results to the users 

![Semantic_search_finetuned_fullstack](image/Semantic_search_finetune_fullstack.png)

### 1.Preprocess and embed the text 

In [34]:
import pandas as pd 
from sentence_transformers import SentenceTransformer, util
import boto3
import torch
import argparse
from tqdm import tqdm
from io import BytesIO
from inference import model_fn, predict_fn

In [5]:
# Load metadata
def read_parquet_from_s3_as_df(region, s3_bucket, s3_key):
    """
    Load a Parquet file from an S3 bucket into a pandas DataFrame.

    Parameters:
    - region: AWS region where the S3 bucket is located.
    - s3_bucket: Name of the S3 bucket.
    - s3_key: Key (path) to the Parquet file within the S3 bucket.

    Returns:
    - df: pandas DataFrame containing the data from the Parquet file.
    """

    # Setup AWS session and clients
    session = boto3.Session(region_name=region)
    s3 = session.resource('s3')

    # Load the Parquet file as a pandas DataFrame
    object = s3.Object(s3_bucket, s3_key)
    body = object.get()['Body'].read()
    df = pd.read_parquet(io.BytesIO(body))
    return df


# Upload the duplicate date to S3 as a parquet file 
def upload_df_to_s3_as_parquet(df, bucket_name, file_key):
    # Save DataFrame as a Parquet file locally
    parquet_file_path = 'temp.parquet'
    df.to_parquet(parquet_file_path)

    # Create an S3 client
    s3_client = boto3.client('s3')

    # Upload the Parquet file to S3 bucket
    try:
        response = s3_client.upload_file(parquet_file_path, bucket_name, file_key)
        os.remove(parquet_file_path)
        print(f'Uploading {file_key} to {bucket_name} as parquet file')
        # Delete the local Parquet file
        return True
    except Exception as e:
        print(e)
        return False

# Create new column 'organization_en' required by the API JSON response 
def extract_organisation_en(contact_str):
    try:
        # Parse the stringified JSON into Python objects
        contact_data = json.loads(contact_str)
        # If the parsed data is a list, iterate through it
        if isinstance(contact_data, list):
            for item in contact_data:
                # Check if 'organisation' and 'en' keys exist
                if 'organisation' in item and 'en' in item['organisation']:
                    return item['organisation']['en']
        elif isinstance(contact_data, dict):
            # If the data is a dictionary, extract 'organisation' in 'en' directly
            return contact_data.get('organisation', {}).get('en', None)
    except json.JSONDecodeError:
        # Handle cases where the contact string is not valid JSON
        return None
    except Exception as e:
        # Catch-all for any other unexpected errors
        return f"Error: {str(e)}"


# Text preprocess
def preprocess_records_into_text(df):
    selected_columns = ['features_properties_title_en','features_properties_description_en','features_properties_keywords_en']
    df = df[selected_columns]
    return df.apply(lambda x: f"{x['features_properties_title_en']}\n{x['features_properties_description_en']}\nkeywords:{x['features_properties_keywords_en']}",axis=1 )


In [67]:
#1) Step1: Load the data 
df_parquet = read_parquet_from_s3_as_df('ca-central-1', 'webpresence-geocore-geojson-to-parquet-dev', 'records.parquet')
df_sentinel1 = read_parquet_from_s3_as_df('ca-central-1', 'webpresence-geocore-geojson-to-parquet-dev', 'sentinel1.parquet')
df = pd.concat([df_parquet, df_sentinel1], ignore_index=True)

#2) Step2: Clean the data  
col_names_list = [
    'features_properties_id','features_geometry_coordinates','features_properties_title_en',
    'features_properties_description_en','features_properties_date_published_date',
    'features_properties_keywords_en','features_properties_options','features_properties_contact',
    'features_properties_topicCategory','features_properties_date_created_date',
    'features_properties_spatialRepresentation','features_properties_type',
    'features_properties_temporalExtent_begin','features_properties_temporalExtent_end',
    'features_properties_graphicOverview','features_properties_language','features_popularity',
    'features_properties_sourceSystemName','features_properties_eoCollection',
    'features_properties_eoFilters'
]
df_en = df[col_names_list]
df_en['organisation_en'] = df_en['features_properties_contact'].apply(extract_organisation_en)

# Create a new column 'temporalExtent' as a dictionary of {'begin': ..., 'end': ...}
values_to_replace = {'Present': None, 'Not Available; Indisponible': None}
columns_to_replace = ['features_properties_temporalExtent_begin', 'features_properties_temporalExtent_end']
df_en[columns_to_replace] = df_en[columns_to_replace].replace(values_to_replace)

df_en['temporalExtent'] = df_en.apply(lambda row: {'begin': row['features_properties_temporalExtent_begin'], 'end': row['features_properties_temporalExtent_end']}, axis=1)
df_en = df_en.drop(columns =['features_properties_temporalExtent_begin', 'features_properties_temporalExtent_end'])

values_to_replace = {'Not Available; Indisponible': None} # modifies dates to acceptable values
columns_to_replace = ['features_properties_date_published_date', 'features_properties_date_created_date']
df_en[columns_to_replace] = df_en[columns_to_replace].replace(values_to_replace)

#3) Step 3: Preprocess text 
df_en['text'] = preprocess_records_into_text(df_en)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_en['organisation_en'] = df_en['features_properties_contact'].apply(extract_organisation_en)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_en[columns_to_replace] = df_en[columns_to_replace].replace(values_to_replace)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_en['temporalExtent'] = df_

In [69]:
df_en.describe()
print(type(df_en['text'].head(6)[1]))
print(df_en['text'].head(6)[1])


<class 'str'>
Canadian Digital Elevation Model, 1945-2011
This collection is a legacy product that is no longer supported. It may not meet current government standards.\n\nThe Canadian Digital Elevation Model (CDEM) is part of Natural Resources Canada's altimetry system designed to better meet the users' needs for elevation data and products.\n\nThe CDEM stems from the existing Canadian Digital Elevation Data (CDED). In these data, elevations can be either ground or reflective surface elevations.\n\nA CDEM mosaic can be obtained for a pre-defined or user-defined extent. The coverage and resolution of a mosaic varies according to latitude and to the extent of the requested area.\nDerived products such as slope, shaded relief and colour shaded relief maps can also be generated on demand by using the Geospatial-Data Extraction tool. Data can then be saved in many formats.\n\nThe pre-packaged GeoTiff datasets are based on the National Topographic System of Canada (NTS) at the 1:250 000 sca

In [71]:
# Step 4: Embedding text 
tqdm.pandas()
model_directory ="/home/ec2-user/SageMaker/semantic-search-with-aws-opensearch/src/model/all-mpnet-base-v2-mpf-huggingface"
model = model_fn(model_directory)
df_en['vector'] = df_en['text'].progress_apply(lambda x: predict_fn({"inputs": x}, model))

Some weights of the model checkpoint at /home/ec2-user/SageMaker/semantic-search-with-aws-opensearch/src/model/all-mpnet-base-v2-mpf-huggingface were not used when initializing MPNetModel: ['pooler.dense.bias', 'pooler.dense.weight']
- This IS expected if you are initializing MPNetModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing MPNetModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
100%|██████████| 63661/63661 [2:12:54<00:00,  7.98it/s]   


In [75]:
vector = df_en['vector'].head(6) 
#print(vector[0])
print(type(vector[0]))
print(len(vector[0]))
print(df_en['vector'].shape)

<class 'list'>
768
(63661,)


In [84]:
# Step 5 Upload the embeddings as a parquet file to S3 bucket 
upload_df_to_s3_as_parquet(df=df_en, bucket_name='webpresence-nlp-data-preprocessing-dev', file_key='semantic_search_embeddings.parquet') 


Uploading semantic_search_embeddings.parquet to webpresence-nlp-data-preprocessing-dev as parquet file


True

### 2. Create OpenSearch index and load text/vector data into the index 


In [None]:
# import json
# import time
# import boto3

# from tqdm import tqdm
# from urllib.parse import urlparse
# from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearch import get_awsauth_from_secret, create_opensearch_connection, delete_aos_index_if_exists, load_data_to_opensearch_index
from Preprocess_and_embed_text import read_parquet_from_s3_as_df
import argparse

In [None]:
#Optional: read the embedding data from the S3 bucket 
df_en = read_parquet_from_s3_as_df('ca-central-1', 'webpresence-nlp-data-preprocessing-dev', 'semantic_search_embeddings.parquet')

In [85]:
# Create a opensearch connection 
# region = environ['MY_AWS_REGION']
# aos_host = environ['OS_ENDPOINT'] 
# os_secret_id = environ['OS_SECRET_ID']

region = "ca-central-1"
aos_host = "search-semantic-search-dfcizxxxuj62dusl5skmeu3czu.ca-central-1.es.amazonaws.com"
os_secret_id = "dev/OpenSearch/SemanticSearch"

awsauth = get_awsauth_from_secret(region, secret_id=os_secret_id)
aos_client =create_opensearch_connection(aos_host, awsauth)

Connection to OpenSearch established: <OpenSearch([{'host': 'search-semantic-search-dfcizxxxuj62dusl5skmeu3czu.ca-central-1.es.amazonaws.com', 'port': 443}])>


In [86]:
#Create an index 
index_name = "mpnet-mpf-knn"
knn_index = {
    "settings": {
        "index.knn": True, #This enables the k-nearest neighbor (KNN) search capability on the index.
        "index.knn.space_type": "cosinesimil", #cosine similarity 
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                "dimension": 768,
                "store": True
            },
            "coordinates":{
              "type": "geo_shape", 
              "store": True 
            }  
        }
    }
}

In [87]:
#Delete index if it exists 
delete_aos_index_if_exists(aos_client, index_to_delete=index_name)

Current indexes: ['nlp_knn', '.opensearch-observability', '.plugins-ml-config', 'search', 'keyword_search', '.ql-datasources', '.opendistro_security', '.kibana_1', 'mpnet-mpf-knn']
Deleted index: mpnet-mpf-knn
Response: {'acknowledged': True}
Indexes after deletion attempt: ['nlp_knn', '.opensearch-observability', '.plugins-ml-config', 'search', 'keyword_search', '.ql-datasources', '.opendistro_security', '.kibana_1']


In [88]:
#Create a index 
aos_client.indices.create(index=index_name,body=knn_index,ignore=400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'mpnet-mpf-knn'}

In [89]:
#Load data to OpenSearch Index 
load_data_to_opensearch_index(df_en, aos_client, index_name)

Indexing Records:   9%|▉         | 5674/63661 [01:34<14:32, 66.49it/s]  

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')
RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:   9%|▉         | 5702/63661 [01:35<14:34, 66.25it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')
RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:   9%|▉         | 5746/63661 [01:35<13:57, 69.14it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')
RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:   9%|▉         | 5768/63661 [01:36<14:21, 67.17it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  10%|▉         | 6271/63661 [01:43<15:33, 61.48it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  10%|▉         | 6358/63661 [01:45<13:04, 73.04it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 6780/63661 [01:51<14:22, 65.93it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 6831/63661 [01:52<14:08, 67.01it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 6981/63661 [01:54<14:07, 66.85it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 7011/63661 [01:55<14:04, 67.11it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 7035/63661 [01:55<14:46, 63.86it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 7067/63661 [01:56<13:51, 68.06it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  11%|█         | 7126/63661 [01:56<11:44, 80.24it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  12%|█▏        | 7901/63661 [02:09<12:50, 72.39it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8142/63661 [02:13<13:01, 71.05it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8348/63661 [02:16<14:52, 61.97it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')
RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8368/63661 [02:17<17:12, 53.55it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8381/63661 [02:17<16:03, 57.39it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8404/63661 [02:17<13:59, 65.82it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  13%|█▎        | 8438/63661 [02:18<15:19, 60.07it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  14%|█▎        | 8610/63661 [02:21<20:08, 45.54it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  14%|█▎        | 8687/63661 [02:22<14:37, 62.62it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  14%|█▍        | 9034/63661 [02:29<13:50, 65.74it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  14%|█▍        | 9170/63661 [02:31<14:35, 62.26it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  14%|█▍        | 9212/63661 [02:32<15:44, 57.64it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  15%|█▍        | 9254/63661 [02:33<15:23, 58.92it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  15%|█▍        | 9434/63661 [02:36<13:56, 64.80it/s]

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records:  53%|█████▎    | 33567/63661 [09:26<13:45, 36.46it/s]  

RequestError(400, 'mapper_parsing_exception', 'failed to parse field [coordinates] of type [geo_shape]')


Indexing Records: 100%|██████████| 63661/63661 [18:19<00:00, 57.91it/s]


ConnectionTimeout: ConnectionTimeout caused by - ReadTimeout(HTTPSConnectionPool(host='search-semantic-search-dfcizxxxuj62dusl5skmeu3czu.ca-central-1.es.amazonaws.com', port=443): Read timed out. (read timeout=10))

In [90]:
#Check 
res = aos_client.search(index=index_name, body={"query": {"match_all": {}}})
print(f"Records loaded into the index {index_name} is {res['hits']['total']['value']}.")

Records loaded into the index mpnet-mpf-knn is 10000.


### 3. Deploy all-mpnet-base-v2-mpf-huggingface model using sagemaker 

In [None]:
import boto3
import re
import time
import sagemaker
from sagemaker import get_execution_role
from sagemaker.huggingface.model import HuggingFaceModel

In [2]:
sagemaker_session = sagemaker.Session()
inputs = sagemaker_session.upload_data(path='model/all-mpnet-base-v2-mpf-huggingface.tar.gz', key_prefix='sentence-transformers-model')
print(f"Response from model upload: {inputs}") 

# Create a SageMaker session and get the execution role to be used later 
role = sagemaker.get_execution_role()

# Deploy with model data 
hub = {
    'HF_TASK':'feature-extraction'
}

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
   model_data=inputs,  # path to your trained SageMaker model
   role=role,                                            # IAM role with permissions to create an endpoint
   transformers_version="4.26",                           # Transformers version used
   pytorch_version="1.13",                                # PyTorch version used
   py_version='py39',                                    # Python version used
   env=hub
)

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
   initial_instance_count=1,
   instance_type="ml.t2.medium",
   endpoint_name = f'all-mpnet-base-v2-mpf-huggingface-test'
)

# example request: you always need to define "inputs"
data = {"inputs":" Today is a sunny and nice day in Ottawa"} 
# request
vector = predictor.predict(data)
len(vector)

Response from model upload: s3://sagemaker-ca-central-1-006288227511/sentence-transformers-model/all-mpnet-base-v2-mpf-huggingface.tar.gz
----------------------!

768

### 4. Test the model endpoints and perform search in OpenSearch index 

In [39]:
from sagemaker_fn import invoke_sagemaker_endpoint

In [41]:
endpoint_name ='all-mpnet-base-v2-mpf-huggingface-test'
payload = {"inputs": "floods event in Canada"}
vector = invoke_sagemaker_endpoint(endpoint_name, payload)
print(len(vector))

768


In [42]:
region = "ca-central-1"
aos_host = "search-semantic-search-dfcizxxxuj62dusl5skmeu3czu.ca-central-1.es.amazonaws.com"
os_secret_id = "dev/OpenSearch/SemanticSearch"

awsauth = get_awsauth_from_secret(region, secret_id=os_secret_id)
aos_client =create_opensearch_connection(aos_host, awsauth)

query={
    "size": 20,
    "query": {
        "knn": {
            "vector":{
                "vector":vector,
                "k":20
            }
        }
    }
}

res = aos_client.search(index='mpnet-mpf-knn', size=20, body=query, request_timeout=55)
query_result=[]
for hit in res['hits']['hits']:
    row=[hit['_id'],hit['_score'],hit['_source']['title'],hit['_source']['id']]
    query_result.append(row)
query_result_df = pd.DataFrame(data=query_result,columns=["_id","relevancy_score","title",'uuid'])
display(query_result_df)

Connection to OpenSearch established: <OpenSearch([{'host': 'search-semantic-search-dfcizxxxuj62dusl5skmeu3czu.ca-central-1.es.amazonaws.com', 'port': 443}])>


Unnamed: 0,_id,relevancy_score,title,uuid
0,15eQT5ABdooaaHUe8g0S,0.727605,Floods in Canada - Cartographic Product Collec...,08b810c2-7c81-40f1-adb1-c32c8a2c9f50
1,65eST5ABdooaaHUeTiL6,0.664807,High tides December 2010: breaking waves,39bdcc75-dbaf-424d-9dbd-265c282f14f5
2,GZeST5ABdooaaHUeZCRK,0.653705,CGDIWH-142543,CGDIWH-142543
3,_ZeRT5ABdooaaHUeixbR,0.643081,Flood Risk Areas Database (BDZI),3ac8ddff-fe0a-4a7a-8393-d5938e8f35e5
4,bJeRT5ABdooaaHUeghbl,0.63736,Flooding zones,12c51ab4-e22a-4abd-bf90-eaeb274a98c9
5,C5eRT5ABdooaaHUe-B5j,0.624253,Flood Risk Areas and Historical Floods,35782937-d7ac-b721-7fb3-bf51f18903ba
6,QJeRT5ABdooaaHUezBvt,0.623981,Forest Abiotic Damage Event,c32dfe71-bb89-4301-a8a3-4f97d1629c00
7,35eTT5ABdooaaHUeeTT-,0.621082,2023 - Dynamic Surface Water Maps of Canada fr...,ccmeo-dynamic-surface-water-compilation-dsw-19...
8,WpeST5ABdooaaHUe5ywG,0.620454,Government of Qc - 2019 Flood,CGDIWH-117987
9,gZeST5ABdooaaHUeaiRp,0.620081,Flood_Inondation_EGS_Flood_Product_Active_en,CGDIWH-150532
