This notebook demonstrates how to:

1. Connect to a local Dockerized OpenSearch instance.
2. Create and configure an OpenSearch index for vector storage.
3. Use LangChain to split large Python code into manageable chunks.
4. Generate embeddings with Cohere's embed-english-v3 model through Amazon Bedrock.
5. Store the embeddings and text chunks in OpenSearch.
6. Execute vector searches based on different queries and analyze search accuracy.
7. Optimize retrieval by addressing common mistakes.

Ensure you have:
- A local Dockerized OpenSearch instance running on the default port (9200).
- Valid AWS credentials configured for accessing Amazon Bedrock (for embeddings).
- The necessary Python dependencies installed (opensearch-py, langchain, boto3, etc.).
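
As a quick pre-flight check, the sketch below reports which of the listed dependencies cannot be imported. The helper name `find_missing_modules` and the module list are illustrative assumptions, not part of the original notebook; extend the list to match your own environment.

```python
import importlib.util

# Import names (not pip package names) of the libraries this notebook relies on.
# This list is an assumption based on the prerequisites above.
REQUIRED_MODULES = ["opensearchpy", "langchain", "boto3"]


def find_missing_modules(module_names):
    """Return the top-level module names that cannot be found on this interpreter."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


missing = find_missing_modules(REQUIRED_MODULES)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All required modules are importable.")
```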

Let's get started!

In [22]:
# Cell 2: Imports and Preliminary Setup

import os
import json
import time
import boto3
import uuid
import requests
from typing import List

# OpenSearch client from opensearch-py
from opensearchpy import OpenSearch, RequestsHttpConnection, helpers

# LangChain text splitters
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import Language


# import warnings
# from urllib3.exceptions import InsecureRequestWarning

# warnings.simplefilter("ignore", InsecureRequestWarning)


# Adjust these settings if needed
OPENSEARCH_HOST = "localhost"
OPENSEARCH_PORT = 9200
OPENSEARCH_USER = "admin"  # If you have authentication
OPENSEARCH_PASS = "*asca9schasihca0CE"  # Replace if you've set a different password

# For Amazon Bedrock usage:
REGION_NAME = "ap-southeast-2"  # or your region
BEDROCK_MODEL_ID = "cohere.embed-english-v3"

# Index name in OpenSearch
INDEX_NAME = "code-embeddings-index"

sample_file_path = "src/sample_rename.py"

# Additional configurations
CHUNK_SIZE = 200
CHUNK_OVERLAP = 0


In [23]:
# Cell 3: Connect to OpenSearch and Create Index If Needed

# Create the OpenSearch client
# If you have no auth, you might omit http_auth. Adjust verify_certs or use SSL as needed.
auth = (OPENSEARCH_USER, OPENSEARCH_PASS)
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    connection_class=RequestsHttpConnection,
)

# Check connection
try:
    health = client.cluster.health()
    print("Connected to OpenSearch. Cluster health:", health["status"])
except Exception as e:
    print("Failed to connect to OpenSearch:", e)
    raise

# Create index mapping for vector search
index_body = {
    "settings": {
        "index": {
            "knn": True,
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "analysis": {
            "analyzer": {
                "keyword_analyzer": {
                    "type": "custom",
                    "tokenizer": "keyword",
                    "filter": ["lowercase"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "text": {"type": "text", "analyzer": "keyword_analyzer"},
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,  # Cohere embed-english-v3 returns 1024-dimensional vectors
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "nmslib",
                    "parameters": {
                        "m": 16,
                        "ef_construction": 100
                    }
                }
            }
        }
    }
}

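# Start from a clean slate: drop any existing copy of the index (a missing index is ignored)
client.indices.delete(index=INDEX_NAME, ignore=[400, 404])
# Recreate the index with the k-NN mapping defined above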
if not client.indices.exists(index=INDEX_NAME):
    client.indices.create(index=INDEX_NAME, body=index_body)
    print(f"Created index '{INDEX_NAME}'")
else:
    print(f"Index '{INDEX_NAME}' already exists.")




Connected to OpenSearch. Cluster health: yellow
Created index 'code-embeddings-index'
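
Because the mapping fixes the `embedding` field at 1024 dimensions, it can help to validate vectors before indexing them rather than discovering a mismatch at bulk-index time. The following is a minimal sketch; `check_embedding_dimensions` is a hypothetical helper, not part of opensearch-py.

```python
def check_embedding_dimensions(embeddings, expected_dim=1024):
    """Raise ValueError if any vector's length differs from expected_dim.

    Returns the number of vectors checked so the call can double as a count.
    """
    for position, vector in enumerate(embeddings):
        if len(vector) != expected_dim:
            raise ValueError(
                f"Embedding at position {position} has {len(vector)} dimensions, "
                f"expected {expected_dim}"
            )
    return len(embeddings)
```

A call such as `check_embedding_dimensions(batch_embeddings)` placed just before the documents are built would catch a model or mapping mismatch early.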


In [24]:
# Cell 4: Read and Split Sample Python Code

# Assume sample_file_path (set in Cell 2) points to a file with ~2500 lines of example Python code
# For demonstration, ensure you have this file in your local environment


if not os.path.exists(sample_file_path):
    raise FileNotFoundError(f"Could not find the file: {sample_file_path}")

with open(sample_file_path, "r", encoding="utf-8") as f:
    code_data = f.read()

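extra_splitters = ['\nclass', '\ndef']
# Use LangChain's RecursiveCharacterTextSplitter with Python-specific separators.
# Build the list explicitly: list.extend() returns None, which would make the
# splitter silently fall back to its default separators.
separators = extra_splitters + RecursiveCharacterTextSplitter.get_separators_for_language(Language.PYTHON)
splitter = RecursiveCharacterTextSplitter(
    separators=separators,
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
)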
chunks = splitter.split_text(code_data)
print(f"Number of chunks created: {len(chunks)}")


Number of chunks created: 362
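
The chunk count alone says little about how the text was divided. A small summary of chunk lengths, sketched below, makes it easier to see whether chunks come close to `CHUNK_SIZE` or are much shorter. `summarize_chunk_lengths` is a hypothetical helper added for illustration.

```python
from statistics import mean


def summarize_chunk_lengths(chunks):
    """Return count, min, max, and mean character length for a list of text chunks."""
    lengths = [len(chunk) for chunk in chunks]
    if not lengths:
        return {"count": 0, "min": 0, "max": 0, "mean": 0.0}
    return {
        "count": len(lengths),
        "min": min(lengths),
        "max": max(lengths),
        "mean": mean(lengths),
    }
```

Calling `summarize_chunk_lengths(chunks)` after splitting gives a quick view of the distribution before any embeddings are generated.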


In [25]:
# Cell 5: Set Up Amazon Bedrock for Cohere Embeddings

# Read credentials from credentials.json
with open("credentials.json", "r") as f:
    creds = json.load(f)

# Create a Bedrock runtime client with credentials as a parameter
bedrock = boto3.client("bedrock-runtime", region_name=REGION_NAME,
    aws_access_key_id=creds.get("aws_access_key_id"),
    aws_secret_access_key=creds.get('aws_secret_access_key'),
    aws_session_token=creds.get('aws_session_token')
)
def cohere_embed_texts(texts, model_id):
    # texts is a list of strings
    if not texts:
        return []

    # Prepare the request body for Cohere's embed-english-v3
    body_dict = {
        "texts": texts,                  # A list of strings
        "input_type": "search_document"  # use "search_query" when embedding queries
    }

    response = bedrock.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=json.dumps(body_dict)
    )

    resp_body = json.loads(response["body"].read())
    # For embed-english-v3, the model returns { "embeddings": [ [vector], [vector], ... ] }
    # so you get an embedding for each text in the input.
    embeddings = resp_body["embeddings"]  # list of lists
    return embeddings
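
Cohere's embed models distinguish between text being stored (`search_document`) and text used to search (`search_query`). The sketch below builds the request body separately so the input type can be validated before any call is made. `build_embed_request` is a hypothetical helper, and the default of 96 texts per call is an assumption based on the Bedrock documentation for Cohere Embed; verify it against the current limits.

```python
import json

# Input types accepted by Cohere embed v3 models.
VALID_INPUT_TYPES = {"search_document", "search_query", "classification", "clustering"}


def build_embed_request(texts, input_type="search_document", max_texts=96):
    """Return the JSON request body for a Cohere embed call.

    max_texts is an assumed per-call limit; adjust it if the service limit differs.
    """
    if input_type not in VALID_INPUT_TYPES:
        raise ValueError(f"Unsupported input_type: {input_type!r}")
    if len(texts) > max_texts:
        raise ValueError(f"Too many texts in one call: {len(texts)} > {max_texts}")
    return json.dumps({"texts": list(texts), "input_type": input_type})
```

For example, `build_embed_request(["find imports"], input_type="search_query")` yields a body suitable for embedding a user query, while the default suits document chunks.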


In [26]:
# Cell 6: Generate Embeddings for Each Chunk and Index into OpenSearch

docs_to_index = []
batch_size = 16
all_chunks = chunks[:]  # Copy for manipulation

while all_chunks:
    batch = all_chunks[:batch_size]
    all_chunks = all_chunks[batch_size:]

    # Generate embeddings via Amazon Bedrock
    batch_embeddings = cohere_embed_texts(batch, BEDROCK_MODEL_ID)

    for text_chunk, embedding in zip(batch, batch_embeddings):
        doc_id = str(uuid.uuid4())
        doc_body = {
            "text": text_chunk,
            "embedding": embedding
        }
        docs_to_index.append({
            "_index": INDEX_NAME,
            "_id": doc_id,
            "_source": doc_body
        })

# Use bulk helper to index all
resp = helpers.bulk(client, docs_to_index)
print("Indexing completed. Bulk response:", resp)


Indexing completed. Bulk response: (362, [])
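
The batching loop above repeatedly re-slices the list of remaining chunks. An equivalent generator, sketched here, avoids copying the list on every iteration. `iter_batches` is a hypothetical helper name.

```python
def iter_batches(items, batch_size):
    """Yield consecutive slices of items, each with at most batch_size elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
```

With 362 chunks and a batch size of 16, `iter_batches` yields 23 batches, the last one holding the remaining 10 chunks.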


In [27]:
# Cell 7: Refresh Index and Prepare for Searches

# Refresh so that newly indexed documents are available
client.indices.refresh(index=INDEX_NAME)
print("Index refreshed and ready for search.")


Index refreshed and ready for search.


In [28]:
# Cell 8: Load Test Queries for Different Programming Tasks

from src.test_queries import test_queries

# Confirm they're loaded
print(f"Loaded {len(test_queries)} queries:")
for q in test_queries:
    print("-", q["description"])


In [20]:
# Cell 9: Implement a Vector kNN Search in OpenSearch

def vector_search_opensearch(query_text: str, k: int = 5):
    """
    Perform a vector search in OpenSearch using the query text embedded by Cohere.
    Return top k results.
    """
    # Embed query
    query_embedding = cohere_embed_texts([query_text], BEDROCK_MODEL_ID)[0]

    # Construct kNN query
    search_body = {
        "size": k,
        "query": {
            "knn": {
                "embedding": {
                    "vector": query_embedding,
                    "k": k
                }
            }
        }
    }

    try:
        response = client.search(index=INDEX_NAME, body=search_body)
        hits = response["hits"]["hits"]
        return hits
    except Exception as e:
        print(f"Search error: {e}")
        return []


In [None]:
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

markdown_output = []

for idx, test in enumerate(test_queries, start=1):
    user_query = test["query"]
    results = vector_search_opensearch(user_query, k=3)
    
    md_section = []
    md_section.append(f"## Test {idx}: {test['description']}\n")
    md_section.append(f"**Query**: {user_query}\n")
    md_section.append(f"**Expected Keywords**: {', '.join(test['expected_keywords'])}\n")
    
    for rank, hit in enumerate(results, start=1):
        score = hit["_score"]
        text_snippet = hit["_source"]["text"]
        
        md_section.append(f"### Rank {rank} | Score: {score}\n")
        md_section.append("```python")
        md_section.append(text_snippet)
        md_section.append("```\n")
    
    # Simple keyword check
    expected_any = any(
        all(kw.lower() in hit["_source"]["text"].lower() for kw in test["expected_keywords"])
        for hit in results
    )
    # if expected_any:
    #     md_section.append("At least one result contains the expected keywords. ✅\n")
    # else:
    #     md_section.append("No result contained all expected keywords. ❌\n")
    
    markdown_output.append("\n".join(md_section))

final_markdown = "\n".join(markdown_output)

# Print in the cell output as Markdown
print(final_markdown)

# Save to a .md file (create the output directory first if it doesn't exist)
os.makedirs("out", exist_ok=True)
with open(f"out/search_results_{os.path.split(sample_file_path)[-1]}.md", "w", encoding="utf-8") as f:
    f.write(final_markdown)


## Test 1: Parsing imports with resolvers

**Query**: Which function can find all the import statements in Python code?

**Expected Keywords**: import, resolver, find_imports, ImportResolver

### Rank 1 | Score: 0.7755966

```python
def parse_python_imports(code):
    lines = code.split("\n")
    modules = []
    for l in lines:
        if l.startswith("import ") or l.startswith("from "):
            modules.append(l)
```

### Rank 2 | Score: 0.74791616

```python
def nova_import(code_block):
    found = []
    lines = code_block.splitlines()
    for line in lines:
        if 'import ' in line:
            parts = line.strip().split()
```

### Rank 3 | Score: 0.73969615

```python
if 'import' in parts:
                idx = parts.index('import')
                if idx + 1 < len(parts):
                    found.append(parts[idx+1])
        if 'from ' in line:
```

## Test 2: Reverse words in a string

**Query**: How do I reverse the words in a sentence for string manipulation?

**Expec


# Analysis of Common Mistakes

Here we review potential mismatches:
- If the system returns text snippets that have keywords but do not represent the intended functionality, it indicates the model or search relies too heavily on keyword overlap.
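- If we see partial code (for example, a fragment from the middle of a function, as in Rank 3 of Test 1) or code that is thematically similar but not correct, it may point to chunks that are too small to carry their own context, or to an embedding shortcoming.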

# Potential Solutions:
1. Metadata Filtering: 
   We can store additional metadata (function names, docstrings, or file segment categories) in the index. Then filter or boost relevant fields.
2. Re-ranking:
   After retrieving top k=10 or so, we can apply a second step re-rank using more precise similarity scoring or cross-encoder approaches.
3. Chunking/Context Adjustments:
   Larger chunk overlap or different chunk sizes could yield more cohesive snippet retrieval.
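
As one possible starting point for the metadata idea, the sketch below uses Python's standard `ast` module to collect function names, docstrings, and line ranges from a complete source file. It assumes the file parses as valid Python and should be run on the whole file before chunking, since individual chunks are often fragments that will not parse. The field names (`function_name`, `docstring`, `start_line`, `end_line`) are illustrative, not an existing index schema.

```python
import ast


def extract_function_metadata(source):
    """Return one metadata record per function definition found in source.

    Records are sorted by starting line. Requires Python 3.8+ for end_lineno.
    """
    tree = ast.parse(source)
    records = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            records.append({
                "function_name": node.name,
                "docstring": ast.get_docstring(node) or "",
                "start_line": node.lineno,
                "end_line": node.end_lineno,
            })
    return sorted(records, key=lambda record: record["start_line"])
```

Each chunk could then be tagged with the function whose line range contains it, and those fields stored alongside `text` and `embedding` for filtering or boosting.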

# Final Accuracy Metrics:
You can design a custom evaluation by manually labeling correct/incorrect matches for each query. 
The final step is to compute metrics (e.g., recall@k, precision@k) for an objective view of search performance.
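
The two metrics named above can be sketched as follows, assuming each query has a manually labeled set of relevant document IDs. The function names and the ID-based representation are assumptions for illustration.

```python
def precision_at_k(retrieved_ids, relevant_ids, k):
    """Fraction of the top-k slots filled by relevant documents.

    Divides by k even when fewer than k results were returned.
    """
    if k <= 0:
        raise ValueError("k must be a positive integer")
    top_k = retrieved_ids[:k]
    hits = sum(1 for doc_id in top_k if doc_id in relevant_ids)
    return hits / k


def recall_at_k(retrieved_ids, relevant_ids, k):
    """Fraction of all relevant documents that appear in the top-k results."""
    if k <= 0:
        raise ValueError("k must be a positive integer")
    if not relevant_ids:
        return 0.0
    hits = sum(1 for doc_id in set(retrieved_ids[:k]) if doc_id in relevant_ids)
    return hits / len(relevant_ids)
```

Averaging these values over all test queries gives a single figure that can be compared across chunk sizes, overlap settings, or re-ranking strategies.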

"""
