<a href="https://colab.research.google.com/github/bvader/elasticsearch-test-elser/blob/main/elasticsearch-test-elser.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial Setup

In [None]:
!pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-8.11.0-py3-none-any.whl (412 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/412.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━[0m [32m204.8/412.6 kB[0m [31m6.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m412.6/412.6 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting elastic-transport<9,>=8 (from elasticsearch)
  Downloading elastic_transport-8.10.0-py3-none-any.whl (59 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/59.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.8/59.8 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.10.0 elasticsearch-8.11.0


In [8]:
# Read in connection and auth info
# Note the port is REQUIRED for the elasticsearch endpoint!
import getpass, os

os.environ['es_url'] = getpass.getpass('Enter Elasticsearch Endpoint:  ')
os.environ['es_user'] = getpass.getpass('Enter User:  ')
os.environ['es_pwd'] = getpass.getpass('Enter Password:  ')

Enter Elasticsearch Endpoint:  ··········
Enter User:  ··········
Enter Password:  ··········


In [9]:
# Connect and test connection
from elasticsearch import Elasticsearch


es_url = os.environ['es_url']
es_user = os.environ['es_user']
es_pwd = os.environ['es_pwd']

# Initialize the Elasticsearch client
es = Elasticsearch(
    [es_url],
    basic_auth=(es_user, es_pwd),
    request_timeout=30
)
es.info().body

{'name': 'instance-0000000100',
 'cluster_name': '053f51f005b94c4084c36cb3bd74c467',
 'cluster_uuid': 'lPhIKHfzSGO52N-k2eXlBQ',
 'version': {'number': '8.11.1',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': '6f9ff581fbcde658e6f69d6ce03050f060d1fd0c',
  'build_date': '2023-11-11T10:05:59.421038163Z',
  'build_snapshot': False,
  'lucene_version': '9.8.0',
  'minimum_wire_compatibility_version': '7.17.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'You Know, for Search'}

# Data and Model Setup

In [13]:
# See https://registry.opendata.aws/amazon-pqa/
# See https://amazon-pqa.s3.amazonaws.com/readme.txt
# aws s3 ls --no-sign-request s3://amazon-pqa/
# https://amazon-pqa.s3.amazonaws.com/amazon-pqa.tar.gz

# Upload the file first
!head /content/sample_data/amazon_pqa_headset.json

{"question_id": "Tx39GCUOS5AYAFK", "question_text": "does this work with cisco ip phone 7942", "asin": "B000LSZ2D6", "bullet_point1": "Noise-Canceling microphone filters out background sound", "bullet_point2": "HW251N P/N 75100-06", "bullet_point3": "Uses Plantronics QD Quick Disconnect Connector. Must be used with Plantronics Amp or with proper phone or USB adapter cable", "bullet_point4": "Connectivity Technology: Wired, Earpiece Design: Over-the-head, Earpiece Type: Monaural, Host Interface: Proprietary, Microphone Design: Boom, Microphone Technology: Noise Canceling, Product Model: HW251N, Product Series: SupraPlus, Standard Warranty: 2 Year", "bullet_point5": "Easy Lightweight Wear -Leaving One Ear Uncovered For Person-to-Person Conversations", "product_description": "", "brand_name": "Plantronics", "item_name": "Plantronics HW251N SupraPlus Wideband Headset (64338-31)", "question_type": "yes-no", "answer_aggregated": "neutral", "answers": [{"answer_text": "Use the Plantronics com

In [20]:
# Load Data file (Just load 1000 for now)

import sys
import datetime
import json
import os
import time

import pandas as pd
import numpy as np

from ast import literal_eval
from tqdm import tqdm

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from datetime import datetime


df = pd.DataFrame(columns=('question', 'answer'))

with open('/content/sample_data/amazon_pqa_headset.json') as f:
    i=0
    for line in f:
        data = json.loads(line)
        df.loc[i] = [data['question_text'],data['answers'][0]['answer_text']]
        i+=1
        if(i == 1000):
            break

print(df.columns)
print(df.shape)
print(df.iloc[999]['question'])
print(df.iloc[999]['answer'])

Index(['question', 'answer'], dtype='object')
(1000, 2)
Do these have more bass than the game zeros?
Due to the closed back design the Game ZERO will have more perceived bass.


# Create the Index, Pipeline and Load Index

In [22]:
# Creates an index in Elasticsearch with raw data
es.options(ignore_status=400).indices.create(
    index="nlp_pqa_1000",
    settings={"number_of_shards": 1},
    mappings={
        "properties": {
            "question": { "type": "text"},
            "answer": {"type": "text"},
        }
    }
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'nlp_pqa_1000'})

In [23]:
# Generate Action
def generator():
    for index, row in df.iterrows():
        yield {
            "_index": "nlp_pqa_1000",
            "question": row["question"],
            "answer": row["answer"]
        }
# Bulk indexing nlp_
try:
    res = bulk(es, generator())
    print("Response: ", res)
except Exception as e:
    print(e)


Response:  (1000, [])


In [24]:
# Download / Load ELSER
es.ml.put_trained_model(model_id=".elser_model_2_linux-x86_64", input={"field_names": "text_field"})

ObjectApiResponse({'model_id': '.elser_model_2_linux-x86_64', 'model_type': 'pytorch', 'model_package': {'packaged_model_id': 'elser_model_2_linux-x86_64', 'model_repository': 'https://ml-models.elastic.co', 'minimum_version': '11.0.0', 'size': 274756282, 'sha256': 'be69211494bf9cdc57a7aa0ee06814fcccf999407237816c9d9f0963858e2a61', 'metadata': {}, 'tags': [], 'vocabulary_file': 'elser_model_2_linux-x86_64.vocab.json', 'platform_architecture': 'linux-x86_64'}, 'platform_architecture': 'linux-x86_64', 'created_by': 'api_user', 'version': '11.0.0', 'create_time': 1701463399092, 'model_size_bytes': 0, 'estimated_operations': 0, 'license_level': 'platinum', 'description': 'Elastic Learned Sparse EncodeR v2 optimized for linux-x86_64', 'tags': ['elastic'], 'metadata': {}, 'input': {'field_names': ['text_field']}, 'inference_config': {'text_expansion': {'vocabulary': {'index': '.ml-inference-native-000002'}, 'tokenization': {'bert': {'do_lower_case': True, 'with_special_tokens': True, 'max_se

In [None]:
# Start ELSER
es.ml.start_trained_model_deployment(
    model_id=".elser_model_1"
)

In [None]:
# Create Elser Pipeline
es.ingest.put_pipeline(id="elser-expansion",
    description="Elser pipeline",
    processors=[
    {
        "inference": {
        "model_id": ".elser_model_1",
        # This maps the fields between the model input and the input index
        # in this case text_field is the field that the model uses to vectorize
        # question is the field in the source index to be vectorized
        # so this says use the question field as the input to the model
        "field_map": {
            "question": "text_field"
        },
        "target_field": "ml",
        "inference_config": {
          "text_expansion": {
            "results_field": "tokens"
          }
        }
      }
    }
  ],
  on_failure=[
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{_index}}}"
      }
    },
    {
      "set": {
        "description": "Set error message",
        "field": "ingest.failure",
        "value": "{{_ingest.on_failure_message}}"
      }
    }
  ]
)

In [None]:
# Create an index using Elser expansion
es.options(ignore_status=400).indices.create(
    index="nlp_pqa_1000_elser_embeddings",
    settings={"number_of_shards": 1, "index.mapping.total_fields.limit": 2000 },
    mappings={
        "properties": {
            "ml.tokens": {
                "type": "rank_features"
            },
            "question": { "type": "text"},
            "answer": {"type": "text"}
        }
    }
)

In [None]:
# Load data with term expansion
def generator():
    for index, row in df.iterrows():
        yield {
            "_index": "nlp_pqa_1000_elser_embeddings",
            "pipeline": "elser-expansion",
            "question": row["question"],
            "answer": row["answer"]
        }

try:
    res = bulk(es, generator(), chunk_size=100) # batch size 100
    print("Response: ", res)
except Exception as e:
    print(e)


# Queries

In [None]:
# Simple Text Expansion

query_text = input("Enter a question :")
print('\n')

query={
    "text_expansion": {
    "ml.tokens": {
        "model_id":".elser_model_1",
        "model_text": query_text
    }
  }
}

resp = es.search(index="nlp_pqa_1000_elser_embeddings", query=query)

for hit in resp['hits']['hits']:
    doc_id = hit['_id']
    score = hit['_score']
    question = hit['_source']['question']
    answer = hit['_source']['answer']
    print(f"Question: {question}\nAnswer: {answer}\n")

In [None]:
# Text expansion with filter, exclude results
# search for "Does this work with xbox"

query_text = input("Enter a question :")
print('\n')

query={
    "bool": {
      "must": [
        {
          "text_expansion": {
            "ml.tokens": {
              "model_id": ".elser_model_1",
              "model_text": query_text
            }
          }
        }
      ],
      "must_not": [
        {
         "match_phrase": {
            "question": "xbox one"
          }
        },
        {
          "match_phrase": {
            "question": "xbox 1"
          }
        }
      ]
    }
}

resp = es.search(index="nlp_pqa_1000_elser_embeddings", query=query)

for hit in resp['hits']['hits']:
    doc_id = hit['_id']
    score = hit['_score']
    question = hit['_source']['question']
    answer = hit['_source']['answer']
    print(f"Question: {question}\nAnswer: {answer}\n")

In [None]:
# Text Expansion with filter... only include certain results
# search for "Does this work with xbox"
query_text = input("Enter a question :")
print('\n')

query={
    "bool": {
      "must": [
        {
          "text_expansion": {
            "ml.tokens": {
              "model_id": ".elser_model_1",
              "model_text": query_text
            }
          }
        }
      ],
      "must": [
        {
         "match": {
            "question": "xbox 360"
          }
        }
      ]
    }
}

resp = es.search(index="nlp_pqa_1000_elser_embeddings", query=query)

print(f"{resp}\n")

for hit in resp['hits']['hits']:
    doc_id = hit['_id']
    score = hit['_score']
    question = hit['_source']['question']
    answer = hit['_source']['answer']
    print(f"Question: {question}\nAnswer: {answer}\n")



In [None]:
# Hybrid search with text expansion and rrf
# NOTE : This works with a slightly lower level of the API
# The higher level / abstracted  API is still under development
query_text = input ("Enter a question :")

print('\n')
body = {
  "sub_searches": [
    {
      "query": {
        "bool": {
          "must": [
          {
            "match": {
            "answer": "polycom"
              }
            }
          ]
        }
      }
    },
    {
      "query": {
        "text_expansion": {
          "ml.tokens": {
          "model_id": ".elser_model_1",
          "model_text": query_text
          }
        }
      }
    }
  ],
  "rank": {
    "rrf": {
        "window_size": 50,
        "rank_constant": 20
    }
  }
}


index = "nlp_pqa_1000_elser_embeddings"
resp = es.perform_request("POST", f"/{index}/_search", headers={"content-type": "application/json", "accept": "application/json"}, body=body)

print(f"\n Resp:{resp} \n")
for hit in resp['hits']['hits']:
    doc_id = hit['_id']
    rank = hit['_rank']
    question = hit['_source']['question']
    answer = hit['_source']['answer']
    print(f"\nRank: {rank}\nQuestion: {question}\nAnswer: {answer}\n")