# Test Local OpenSearch

This notebook tests the local OpenSearch vector database of Wikipedia content.

In [None]:
import requests
import numpy as np
from dataclasses import dataclass
from typing import List, Optional

# Connection settings shared by every cell below.
HOST = "192.168.X.Y"  # Replace with your server's IP address
OPENSEARCH_URL = "http://" + HOST + ":9200"
LM_STUDIO_URL = "http://" + HOST + ":1234"
EMBEDDING_MODEL = "text-embedding-nomic-embed-text-v1.5@f16"

# Wikipedia index name (from process_and_index.py)
INDEX_NAME = "wikipedia"

# Echo the effective settings so a wrong HOST is obvious before any request is made.
print(
    f"OpenSearch URL: {OPENSEARCH_URL}",
    f"LM Studio URL: {LM_STUDIO_URL}",
    f"Index: {INDEX_NAME}",
    sep="\n",
)

## 1. Test OpenSearch Connectivity

In [None]:
# Quick connectivity diagnostics
import socket

def check_port(host: str, port: int, timeout: float = 5.0) -> bool:
    """Check if a TCP port is reachable.

    Opens an IPv4 TCP socket and attempts a connection with ``connect_ex``.

    Args:
        host: Hostname or IPv4 address to connect to.
        port: TCP port number to probe.
        timeout: Socket timeout in seconds applied to the connection attempt.

    Returns:
        True if the connection attempt succeeded (``connect_ex`` returned 0),
        False if it failed or if any error occurred while probing.
    """
    try:
        # The context manager closes the socket even when settimeout() or
        # connect_ex() raises (e.g. name-resolution failure, port out of
        # range), which the previous explicit close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((host, port)) == 0
    except Exception:
        # Deliberately broad: this is a best-effort probe that must never
        # raise, so every failure is reported as "not reachable".
        return False

host = HOST
# Ports probed below, mapped to the service label shown in the report.
ports = {22: "SSH", 1234: "LM Studio", 9200: "OpenSearch"}

print(f"Checking connectivity to {host}...")
print("-" * 40)
for port, service in ports.items():
    if check_port(host, port):
        status = "✓ Open"
    else:
        status = "✗ Closed/Blocked"
    print(f"  Port {port:5} ({service:12}): {status}")

# Troubleshooting hints, always printed regardless of the probe results.
print(
    "\n" + "=" * 40,
    "If OpenSearch port 9200 is blocked, run on server:",
    f"  sudo ufw allow from {HOST}/24 to any port 9200 proto tcp",
    "\nIf OpenSearch is not running:",
    "  sudo systemctl status opensearch",
    "  sudo systemctl start opensearch",
    sep="\n",
)

In [None]:
# Query the cluster health endpoint and report the key fields.
try:
    response = requests.get(f"{OPENSEARCH_URL}/_cluster/health", timeout=10)
    response.raise_for_status()
    health = response.json()
    print("✓ OpenSearch cluster is reachable!")
    # Each pair is (label shown to the user, key in the health payload).
    for label, key in (
        ("Cluster", "cluster_name"),
        ("Status", "status"),
        ("Nodes", "number_of_nodes"),
    ):
        print(f"  {label}: {health[key]}")
except requests.exceptions.RequestException as e:
    print(f"✗ Failed to connect to OpenSearch: {e}")

In [None]:
# Check the Wikipedia index: document count first, then the embedding field mapping.
try:
    response = requests.get(f"{OPENSEARCH_URL}/{INDEX_NAME}/_count", timeout=10)
    response.raise_for_status()
    count_data = response.json()
    print(f"✓ Index '{INDEX_NAME}' exists!")
    print(f"  Document count: {count_data['count']:,}")
    
    # Get index mapping to verify embedding field
    mapping_resp = requests.get(f"{OPENSEARCH_URL}/{INDEX_NAME}/_mapping", timeout=10)
    # Surface HTTP errors here rather than failing later on an error payload.
    mapping_resp.raise_for_status()
    mapping = mapping_resp.json()
    # The mapping response is keyed by index name. If INDEX_NAME is not a key
    # (e.g. it is an alias for a differently named index), fall back to the
    # first entry instead of raising an uncaught KeyError.
    index_mapping = mapping.get(INDEX_NAME) or next(iter(mapping.values()), {})
    props = index_mapping.get('mappings', {}).get('properties', {})
    if 'embedding' in props:
        emb_info = props['embedding']
        print(f"  Embedding field: {emb_info.get('type', 'unknown')}")
        print(f"  Dimension: {emb_info.get('dimension', 'unknown')}")
    else:
        print("  Warning: No 'embedding' field found in mapping")
except requests.exceptions.RequestException as e:
    print(f"✗ Failed to access index: {e}")

## 2. Embedding Client (from testEmbedding.ipynb)

In [None]:
@dataclass
class EmbeddingResult:
    """Container pairing a piece of text with its embedding vector."""
    # The text associated with this embedding.
    text: str
    # The embedding values as plain Python floats.
    embedding: List[float]
    # Integer index attached to this result (presumably its position in a
    # batch; nothing in this notebook constructs the class, so unconfirmed).
    index: int

    @property
    def vector(self) -> np.ndarray:
        """Build a fresh float32 numpy array from ``embedding``."""
        as_array = np.array(self.embedding, dtype=np.float32)
        return as_array


class EmbeddingClient:
    """
    Thin HTTP client that requests text embeddings from LM Studio's
    ``/v1/embeddings`` endpoint.
    """

    def __init__(
        self,
        base_url: str = LM_STUDIO_URL,
        model: str = EMBEDDING_MODEL,
        timeout: int = 60
    ):
        # Filled in on first access of ``dimension``.
        self._dimension: Optional[int] = None
        self.timeout = timeout
        self.model = model
        # Drop trailing slashes so the endpoint path can be appended directly.
        self.base_url = base_url.rstrip('/')

    @property
    def dimension(self) -> int:
        """Length of the model's embedding vectors.

        Determined once by embedding the string "test" and then cached.
        """
        if self._dimension is not None:
            return self._dimension
        self._dimension = len(self.embed("test"))
        return self._dimension

    def embed(self, text: str) -> List[float]:
        """
        Request the embedding of one text string.

        Args:
            text: Text to embed

        Returns:
            The first embedding in the response, as a list of floats

        Raises:
            requests.exceptions.HTTPError: If the server answers with an
                error status.
        """
        endpoint = f"{self.base_url}/v1/embeddings"
        # The text is sent as a one-element list under "input".
        payload = {"model": self.model, "input": [text]}
        reply = requests.post(endpoint, json=payload, timeout=self.timeout)
        reply.raise_for_status()
        first_item = reply.json()["data"][0]
        return first_item["embedding"]


# Test the embedding client
# Creates the module-level `client` and embeds a short string once; any
# failure is reported as a message instead of stopping the notebook.
try:
    client = EmbeddingClient()
    test_embedding = client.embed("test connection")
    print(f"✓ Embedding client working!")
    print(f"  Model: {client.model}")
    # The vector length of the returned embedding is the model's dimension.
    print(f"  Dimension: {len(test_embedding)}")
except Exception as e:
    print(f"✗ Embedding client error: {e}")

## 3. Semantic Search Function

In [None]:
def semantic_search(
    query_text: str, 
    client: EmbeddingClient,
    top_k: int = 3,
    index_name: str = INDEX_NAME
) -> List[dict]:
    """
    Perform semantic search using k-NN on OpenSearch.
    
    The query text is embedded with ``client`` and the resulting vector is
    sent as a ``knn`` query against the ``embedding`` field of the index.
    
    Args:
        query_text: The search query text
        client: EmbeddingClient instance
        top_k: Number of results to return
        index_name: OpenSearch index name
        
    Returns:
        List of dicts with keys ``score``, ``title``, ``section``, ``text``
        (truncated to 500 characters) and ``url``, one per hit.
        
    Raises:
        requests.exceptions.RequestException: If the embedding request or
            the search request fails or returns an error status.
    """
    # Generate embedding for the query
    query_embedding = client.embed(query_text)
    
    # Build k-NN search query
    search_query = {
        "size": top_k,
        "query": {
            "knn": {
                "embedding": {
                    "vector": query_embedding,
                    "k": top_k
                }
            }
        },
        # Only fetch the fields that are returned to the caller.
        "_source": ["title", "section_title", "text", "url"]
    }
    
    # Execute search
    response = requests.post(
        f"{OPENSEARCH_URL}/{index_name}/_search",
        json=search_query,
        headers={"Content-Type": "application/json"},
        timeout=30
    )
    response.raise_for_status()
    
    # Parse results. JSON nulls arrive as None, which dict.get() defaults do
    # not cover, so each nullable level is normalised explicitly.
    hits = (response.json().get("hits") or {}).get("hits") or []
    results = []
    for hit in hits:
        source = hit.get("_source") or {}
        # A null text would otherwise raise TypeError on slicing.
        text = source.get("text")
        if text is None:
            text = ""
        # A null score would break numeric formatting of the result later.
        score = hit.get("_score")
        if score is None:
            score = 0
        results.append({
            "score": score,
            "title": source.get("title", "Unknown"),
            "section": source.get("section_title", ""),
            "text": text[:500],  # Truncate for display
            "url": source.get("url", "")
        })
    
    return results


def display_results(results: List[dict], query: str):
    """Print a banner for ``query`` followed by one formatted entry per result.

    Each result dict must provide the keys ``score``, ``title``, ``section``,
    ``text`` and ``url``; the section and URL lines are omitted when empty.
    """
    divider = "=" * 60
    print("\n" + divider, f"Query: {query}", divider, sep="\n")

    if results:
        for rank, hit in enumerate(results, start=1):
            print(f"\n--- Result {rank} (Score: {hit['score']:.4f}) ---")
            print(f"Title: {hit['title']}")
            section = hit['section']
            if section:
                print(f"Section: {section}")
            print("\nText preview:")
            # The ellipsis is appended unconditionally, whatever the text length.
            print(f"{hit['text']}...")
            link = hit['url']
            if link:
                print(f"\nURL: {link}")
    else:
        print("No results found.")


# Visible confirmation that this cell ran to completion.
print("✓ Search functions defined")

## 4. Run Sample Search

In [None]:
# Sample search query - modify this to test different queries
SAMPLE_QUERY = "art history in Germany and the Bauhaus movement"

# Perform the search
# Both the search and the display run inside the try, so a failure in either
# one is reported as a message instead of a traceback.
try:
    print(f"Searching for: '{SAMPLE_QUERY}'")
    print("Generating embedding...")
    
    # Uses the module-level `client`; returns at most 3 hits.
    results = semantic_search(SAMPLE_QUERY, client, top_k=3)
    display_results(results, SAMPLE_QUERY)
    
# HTTP/network failures are labelled separately from any other error.
except requests.exceptions.RequestException as e:
    print(f"✗ Search failed: {e}")
except Exception as e:
    print(f"✗ Error: {e}")

## 5. Interactive Search (Optional)

Run the cell below to try different search queries.

In [None]:
# Try different queries here
queries = [
    "quantum mechanics and wave particle duality",
    "French Revolution causes and effects",
    "machine learning neural networks"
]

# Each query is wrapped in its own try so that one failing query does not
# prevent the remaining ones from running.
for query in queries:
    try:
        results = semantic_search(query, client, top_k=3)
        display_results(results, query)
    except Exception as e:
        print(f"\n✗ Query '{query}' failed: {e}")