In [2]:
import csv
import os
from elasticsearch import Elasticsearch, helpers

os.environ["RABBITMQ_HOST"] = "localhost"

from celery_tasks import ingest_data

CHUNK_SIZE = 400
INDEX_NAME = "es_tripadvisor_nyc_idx"
MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
MODEL_ID_ES = "sentence-transformers__all-minilm-l6-v2"
MODEL_DIM = 384
MODEL_SIMILARITY = "dot_product"

ES_HOST = "https://localhost:9200/"
ES_PASS = "y5AADXZR0l63CvTz1AsWznNiAM1Ukq7KSd3MEra"
COHERE_API_KEY = "9DUothnkQyEhX9NW7Jr5lr7XsugovOuzYhptkMai"

In [3]:
# Create the client instance
client = Elasticsearch(
    # For local development
    hosts=[ES_HOST],
    basic_auth=('elastic', ES_PASS), 
    verify_certs=False
)
print(client.info())

{'name': 'es01', 'cluster_name': 'docker-cluster', 'cluster_uuid': '8KpPKPbgQKiA5VpXnNoX3Q', 'version': {'number': '8.13.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '09df99393193b2c53d92899662a8b8b3c55b45cd', 'build_date': '2024-03-22T03:35:46.757803203Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


  _transport = transport_class(


In [4]:
!eland_import_hub_model \
    -u elastic -p $ES_PASS \
    --url $ES_HOST \
    --hub-model-id $MODEL_ID \
    --task-type text_embedding \
    --insecure \
    --clear-previous \
    --start

2024-05-03 21:25:20,295 INFO : Establishing connection to Elasticsearch
  _transport = transport_class(
2024-05-03 21:25:20,328 INFO : Connected to cluster named 'docker-cluster' (version: 8.13.0)
2024-05-03 21:25:20,329 INFO : Loading HuggingFace transformer tokenizer and model 'sentence-transformers/all-MiniLM-L6-v2'
INFO:2024-05-03 21:25:29 72559:72559 init.cpp:158] If you see CUPTI_ERROR_INSUFFICIENT_PRIVILEGES, refer to https://developer.nvidia.com/nvidia-development-tools-solutions-err-nvgpuctrperm-cupti
STAGE:2024-05-03 21:25:29 72559:72559 ActivityProfilerController.cpp:312] Completed Stage: Warm Up
STAGE:2024-05-03 21:25:29 72559:72559 ActivityProfilerController.cpp:318] Completed Stage: Collection
STAGE:2024-05-03 21:25:29 72559:72559 ActivityProfilerController.cpp:322] Completed Stage: Post Processing
2024-05-03 21:25:31,245 INFO : Stopping deployment for model with id 'sentence-transformers__all-minilm-l6-v2'
2024-05-03 21:25:31,414 INFO : Deleting model with id 'sentence-t

In [6]:
# Setup the pipeline
client.ingest.put_pipeline(
    id="chunk_text_to_passages",
    processors=[
        {
            "script": {
                "description": "Chunk body_content into sentences by looking for . followed by a space",
                "lang": "painless",
                "source": """
          String[] envSplit = /((?<!M(r|s|rs)\.)(?<=\.) |(?<=\!) |(?<=\?) )/.split(ctx['text']);
          ctx['passages'] = new ArrayList();
          int i = 0;
          boolean remaining = true;
          if (envSplit.length == 0) {
            return
          } else if (envSplit.length == 1) {
            Map passage = ['text': envSplit[0]];ctx['passages'].add(passage)
          } else {
            while (remaining) {
              Map passage = ['text': envSplit[i++]];
              while (i < envSplit.length && passage.text.length() + envSplit[i].length() < params.model_limit) {passage.text = passage.text + ' ' + envSplit[i++]}
              if (i == envSplit.length) {remaining = false}
              ctx['passages'].add(passage)
            }
          }
          """,
                "params": {"model_limit": CHUNK_SIZE},
            }
        },
        {
            "foreach": {
                "field": "passages",
                "processor": {
                    "inference": {
                        "model_id": MODEL_ID_ES,
                        "input_output": [
                            { 
                                "input_field": "_ingest._value.text",
                                "output_field": "_ingest._value.vector.predicted_value"
                            }
                        ],
                        "on_failure": [
                            {
                                "append": {
                                    "field": "_source._ingest.inference_errors",
                                    "value": [
                                        {
                                            "message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'",
                                            "pipeline": "ml-inference-title-vector",
                                            "timestamp": "{{{ _ingest.timestamp }}}",
                                        }
                                    ],
                                }
                            }
                        ],
                    }
                },
            }
        },
    ],
)



ObjectApiResponse({'acknowledged': True})

In [7]:
client.indices.delete(index=INDEX_NAME, ignore_unavailable=True)

# Setup the index
client.indices.create(
    index=INDEX_NAME,
    settings={"index": {"default_pipeline": "chunk_text_to_passages"}},
    mappings={
        "dynamic": "true",
        "properties": {
            "passages": {
                "type": "nested",
                "properties": {
                    "vector": {
                        "properties": {
                            "predicted_value": {
                                "type": "dense_vector",
                                "index": True,
                                "dims": MODEL_DIM,
                                "similarity": MODEL_SIMILARITY,
                            }
                        }
                    }
                },
            }
        },
    },
)



ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'es_tripadvisor_nyc_idx'})

In [11]:
file = '../csv/New_York_reviews.csv'

#Read CSV File
def read_CSV(csv_file):
    csv_rows = []
    with open(csv_file) as csvfile:
        reader = csv.DictReader(csvfile)
        fields = reader.fieldnames
        fields[0] = '_id'
        for row in reader:
            new_row = {fields[i]:row[fields[i]] for i in range(len(fields))}
            new_row['_index'] = INDEX_NAME
            new_row['name'] = new_row.pop('review_id')
            new_row['text'] = new_row.pop('review_full')
            csv_rows.append(new_row)
        return csv_rows

docs = read_CSV(file)

print(len(docs))

# Add the documents to the index directly
ingest_data.apply_async(
    kwargs={
        "docs": docs[:1000]
    }
)

510463


<AsyncResult: 788fd1de-2932-49f2-a387-c6cf24d5a380>

In [8]:
def pretty_response(response):
    if len(response["hits"]["hits"]) == 0:
        print("Your search returned no results.")
    else:
        for hit in response["hits"]["hits"]:
            id = hit["_id"]
            score = hit["_score"]
            doc_title = hit["_source"]["name"]
            restaurant_name = hit["_source"]["restaurant_name"]
            title_review = hit["_source"]["title_review"]
            passage_text = ""

            for passage in hit["inner_hits"]["passages"]["hits"]["hits"]:
                passage_text += passage["fields"]["passages"][0]["text"][0] + "\n\n"

            pretty_output = f"ID: {id}\nDoc Title: {doc_title}\nRestaurant Name: {restaurant_name}\nTitle Review: {title_review}\nPassage Text:\n{passage_text}Score: {score}"
            print(pretty_output)
            print("---")

In [12]:
response = client.search(
    index=INDEX_NAME,
    knn={
        "inner_hits": {"size": 1, "_source": False, "fields": ["passages.text"]},
        "field": "passages.vector.predicted_value",
        "k": 20,
        "num_candidates": 100,
        "query_vector_builder": {
            "text_embedding": {
                "model_id": MODEL_ID_ES,
                "model_text": "Best pasta in New York",
            }
        },
    },
)

pretty_response(response)

ID: 149
Doc Title: review_695311754
Restaurant Name: San_Carlo_Osteria_Piemonte
Title Review: Outstanding food,  great service and atmosphere 
Passage Text:
I'm a huge fan of picolla cucina on Spring St and I still think they have the best pastas in New York. It's my favorite in NYC, but a block away is San Carlo which may bemy second favorite. It is slightly different in terms of the menu, with less focus on pasta. It also has a slightly larger footprint with a small intimate bar, and has a very good wine and cocktail list.

Score: 0.8833201
---
ID: 206
Doc Title: review_725479424
Restaurant Name: San_Carlo_Osteria_Piemonte
Title Review: Excellent genuine Italian food. 
Passage Text:
I spend an excellent evening great food an service, fresh product and superb presentations, if you are looking for an original Italian food that the best place in New York. 😋😋😋😋😋

Score: 0.8350849
---
ID: 808
Doc Title: review_672663933
Restaurant Name: Norma
Title Review: Murray Hill-Kips Bay Gem. Delici

