In [3]:
from __future__ import annotations

import re
import hashlib
from dataclasses import dataclass
from typing import Iterable, List, Tuple, Dict, Optional, Set

In [206]:
def get_time():
    """Return the current UTC time as an ISO 8601 string ending in "Z"."""
    from datetime import datetime, timezone

    now_utc = datetime.now(timezone.utc)
    # A naive datetime renders without the "+00:00" offset, so appending "Z"
    # yields the same text as replacing the offset on the aware value.
    return now_utc.replace(tzinfo=None).isoformat() + "Z"

# LSH Embedding

In [4]:
@dataclass(frozen=True)
class LSHEmbedding:
    """
    Immutable result record produced by the LSH embedder for one input text.

    Note: `frozen=True` only blocks re-assignment of the fields; the list
    objects stored in them are still mutable.
    """
    signature: List[int]          # MinHash signature (length = num_hashes)
    band_keys: List[str]          # LSH bucket keys (length = num_bands)
    shingle_count: int            # number of unique shingles used

In [5]:
class LSHMinHashEmbedder:
    """
    MinHash signatures plus LSH banding over character shingles, intended for
    near-duplicate detection and candidate generation on log lines.

    - Partial matches are supported through character n-grams (shingles).
    - No vocabulary is kept: shingles are hashed, nothing grows over time.
    - `band_keys` are deterministic strings, so they can be persisted
      (e.g. in a Qdrant payload) and compared across runs.

    Typical usage:
        emb = LSHMinHashEmbedder(shingle_size=5, num_hashes=128, bands=32, seed=42)
        out = emb.embed("raw log line ...")
        # candidate retrieval: find items that share any band_key
        # verification: recompute exact Jaccard of shingles (or other metric) vs candidates
    """

    # --- patterns used to mask volatile tokens in log lines ---
    _RE_UUID = re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}\b", re.I)
    _RE_HEX  = re.compile(r"\b0x[0-9a-f]+\b", re.I)
    _RE_IPv4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
    _RE_NUM  = re.compile(r"\b\d+\b")
    _RE_TS1  = re.compile(r"\b\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?\b", re.I)  # date + time
    _RE_TS2  = re.compile(r"\b\d{2}:\d{2}:\d{2}(?:\.\d+)?\b")  # bare time of day

    def __init__(
        self,
        shingle_size: int = 5,
        num_hashes: int = 128,
        bands: int = 32,
        seed: int = 42,
        normalize: bool = True,
        lowercase: bool = True,
        collapse_whitespace: bool = True,
        stop_short_lines: int = 0,
    ):
        """
        Args:
            shingle_size: length of each character n-gram.
            num_hashes: number of MinHash slots in a signature.
            bands: number of LSH bands; has to divide `num_hashes` evenly.
            seed: seed from which the hash-function coefficients are derived.
            normalize: mask uuids / ips / timestamps / hex / numbers before shingling.
            lowercase: lowercase the text before any other processing.
            collapse_whitespace: squeeze whitespace runs to one space and strip the ends.
            stop_short_lines: when > 0, preprocessed texts shorter than this get the
                "empty" signature and a shingle count of 0.

        Raises:
            ValueError: on a non-positive `shingle_size`, `num_hashes` or `bands`,
                or when `bands` does not divide `num_hashes`.
        """
        if shingle_size <= 0:
            raise ValueError("shingle_size must be > 0")
        if num_hashes <= 0:
            raise ValueError("num_hashes must be > 0")
        # The positivity test runs first, so the modulo never sees a zero divisor.
        if bands <= 0 or num_hashes % bands:
            raise ValueError("bands must be > 0 and must divide num_hashes exactly")

        self.shingle_size = shingle_size
        self.num_hashes = num_hashes
        self.bands = bands
        self.rows_per_band = num_hashes // bands

        self.seed = seed
        self.normalize_enabled = normalize
        self.lowercase = lowercase
        self.collapse_whitespace = collapse_whitespace
        self.stop_short_lines = stop_short_lines

        # Modulus of the affine hash family: 2^32 + 15.
        self._prime = 4294967311
        # Largest 32-bit value; used as the "no shingle seen" sentinel and as the final mask.
        self._max_hash = (1 << 32) - 1

        # Coefficients of h_i(x) = (a_i * x + b_i) mod prime, one pair per slot.
        self._a, self._b = self._make_hash_params(num_hashes, seed)

    # ------------------------- Public API -------------------------

    def embed(self, text: str) -> LSHEmbedding:
        """Return the MinHash signature, band keys and shingle count for one text."""
        cleaned = self._preprocess(text)
        too_short = bool(self.stop_short_lines) and len(cleaned) < self.stop_short_lines

        if too_short:
            # Same signature as an input without shingles; the count is reported as 0.
            signature = [self._max_hash] * self.num_hashes
            unique_shingles = 0
        else:
            hashed = self._shingle(cleaned)
            signature = self._minhash_signature(hashed)
            unique_shingles = len(hashed)

        return LSHEmbedding(
            signature=signature,
            band_keys=self._signature_to_band_keys(signature),
            shingle_count=unique_shingles,
        )

    def shingles(self, text: str) -> Set[int]:
        """Return the hashed shingle set of `text` (after preprocessing)."""
        cleaned = self._preprocess(text)
        return self._shingle(cleaned)

    @staticmethod
    def jaccard(shingles_a: Set[int], shingles_b: Set[int]) -> float:
        """
        Exact Jaccard similarity of two shingle sets.

        Two empty sets count as identical (1.0); exactly one empty set gives 0.0.
        """
        if not (shingles_a or shingles_b):
            return 1.0
        if not (shingles_a and shingles_b):
            return 0.0
        return len(shingles_a & shingles_b) / len(shingles_a | shingles_b)

    # ------------------------- Preprocess -------------------------

    def _preprocess(self, text: str) -> str:
        """Apply the configured lowercasing, token masking and whitespace collapsing."""
        out = text.lower() if self.lowercase else text

        if self.normalize_enabled:
            # Order matters: the specific patterns run before the generic number mask.
            masks = (
                (self._RE_UUID, "<uuid>"),
                (self._RE_IPv4, "<ip>"),
                (self._RE_TS1, "<ts>"),
                (self._RE_TS2, "<time>"),
                (self._RE_HEX, "<hex>"),
                (self._RE_NUM, "<num>"),
            )
            for pattern, placeholder in masks:
                out = pattern.sub(placeholder, out)

        if self.collapse_whitespace:
            out = re.sub(r"\s+", " ", out).strip()
        return out

    # ------------------------- Shingling -------------------------

    def _shingle(self, s: str) -> Set[int]:
        """
        Hash every character window of length `shingle_size` in `s` to a 32-bit int.

        Strings shorter than one window yield an empty set. The hash comes from
        `_stable_u32`, so it does not depend on Python's per-process hash seed.
        """
        width = self.shingle_size
        if len(s) < width:
            return set()
        last_start = len(s) - width
        return {self._stable_u32(s[start : start + width]) for start in range(last_start + 1)}

    # ------------------------- MinHash -------------------------

    def _minhash_signature(self, shingles: Set[int]) -> List[int]:
        """
        Build the MinHash signature of a set of hashed shingles.

        Each slot i holds the smallest (a_i * x + b_i) mod prime over all shingles,
        capped at `_max_hash`. An empty set gives `_max_hash` in every slot.
        Cost is num_hashes * len(shingles) hash evaluations.
        """
        ceiling = self._max_hash
        if not shingles:
            return [ceiling] * self.num_hashes

        modulus = self._prime
        signature: List[int] = []
        for i in range(self.num_hashes):
            a_i, b_i = self._a[i], self._b[i]
            smallest = min((a_i * x + b_i) % modulus for x in shingles)
            # Slots never exceed the ceiling, so the mask leaves the value
            # as-is; it is kept so the stored range stays 32-bit by construction.
            signature.append(min(smallest, ceiling) & ceiling)
        return signature

    def _signature_to_band_keys(self, sig: List[int]) -> List[str]:
        """
        Split the signature into `bands` consecutive chunks and digest each one.

        Every key has the form "lsh:{band_index}:{hex_digest}", where the digest is
        a 64-bit blake2b of the chunk values joined by commas.
        """
        width = self.rows_per_band
        keys: List[str] = []
        for band_index in range(self.bands):
            rows = sig[band_index * width : (band_index + 1) * width]
            serialized = ",".join(str(value) for value in rows).encode("utf-8")
            digest = hashlib.blake2b(serialized, digest_size=8).hexdigest()
            keys.append(f"lsh:{band_index}:{digest}")
        return keys

    # ------------------------- Hash utilities -------------------------

    @staticmethod
    def _make_hash_params(k: int, seed: int) -> Tuple[List[int], List[int]]:
        """
        Derive `k` coefficient pairs (a_i, b_i) from blake2b("{seed}:{i}").

        The first 8 digest bytes give a_i (forced odd, hence non-zero), the last 8
        give b_i. The values are full 64-bit integers; they are not reduced here.
        """
        coeffs_a: List[int] = []
        coeffs_b: List[int] = []
        for index in range(k):
            raw = hashlib.blake2b(f"{seed}:{index}".encode("utf-8"), digest_size=16).digest()
            coeffs_a.append(int.from_bytes(raw[:8], "little") | 1)
            coeffs_b.append(int.from_bytes(raw[8:], "little"))
        return coeffs_a, coeffs_b

    @staticmethod
    def _stable_u32(s: str) -> int:
        """Deterministic unsigned 32-bit hash of a string (4-byte blake2b digest)."""
        digest = hashlib.blake2b(s.encode("utf-8"), digest_size=4).digest()
        return int.from_bytes(digest, "little")

In [None]:
# Demo: embed three sample log lines and compare band-key overlap with exact Jaccard.
emb = LSHMinHashEmbedder(shingle_size=5, num_hashes=128, bands=32, seed=123)

a = "ERROR 2026-01-13T12:00:01Z user_id=123e4567-e89b-12d3-a456-426614174000 request took 153ms"
b = "ERROR 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms"
c = "INFO 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user"

ea, eb, ec = emb.embed(a), emb.embed(b), emb.embed(c)

# Candidate stage: two lines are candidates when they share at least one band key.
overlap_ab = set(ea.band_keys).intersection(eb.band_keys)
overlap_ac = set(ea.band_keys).intersection(ec.band_keys)
print(f"A vs B band overlap: {len(overlap_ab)} out of {emb.bands}")
print(f"A vs C band overlap: {len(overlap_ac)} out of {emb.bands}")

# Verification stage: exact Jaccard on the hashed shingle sets.
ja, jb, jc = emb.shingles(a), emb.shingles(b), emb.shingles(c)
print(f"A vs B jaccard: {LSHMinHashEmbedder.jaccard(ja, jb)}")
print(f"A vs C jaccard: {LSHMinHashEmbedder.jaccard(ja, jc)}")

A vs B band overlap: 11 out of 32
A vs C band overlap: 1 out of 32
A vs B jaccard: 0.8181818181818182
A vs C jaccard: 0.3148148148148148


In [7]:
# Inspect the band keys computed for sample line `a` (one key per band).
ea.band_keys

['lsh:0:5b12bfbeb0ade072',
 'lsh:1:e788d2732f29c5f9',
 'lsh:2:ea56b5865e440b9d',
 'lsh:3:a7f5210ee8333ff9',
 'lsh:4:61e6122b314304a9',
 'lsh:5:a24c3068b732600c',
 'lsh:6:ddc276ec409d0015',
 'lsh:7:f723b4ded7fd6af3',
 'lsh:8:4aa4d7d63d232caf',
 'lsh:9:b11b60769cc85d96',
 'lsh:10:d65429182bc472c8',
 'lsh:11:71328dd07180a8aa',
 'lsh:12:fa688b12e812ff2b',
 'lsh:13:f9240a08744e9750',
 'lsh:14:c4b912d14465d9fd',
 'lsh:15:d37011a6c7a52734',
 'lsh:16:a40884bec6641e62',
 'lsh:17:2f9e86428459eeb3',
 'lsh:18:c9626c2fcdefd5b9',
 'lsh:19:46f0ed02f7833081',
 'lsh:20:61c76801151e84db',
 'lsh:21:6ec7178c726b8bbc',
 'lsh:22:8d01b1c5f517a04b',
 'lsh:23:b3bcb51093d30cdd',
 'lsh:24:0b3833e4bd553639',
 'lsh:25:e4f908130211eacf',
 'lsh:26:4ccb02a86ccd5642',
 'lsh:27:7acefe4a27a71447',
 'lsh:28:314c795d670b9caf',
 'lsh:29:89e15f43c55a6476',
 'lsh:30:f086277b30e57f5e',
 'lsh:31:e6c02dcfbd6016a7']

# Qdrant Adapter Filter

In [8]:
from __future__ import annotations

from typing import Any, Dict, List, Optional, Union
from qdrant_client.http import models


# A filter spec is a plain dict: either atomic ({key, dtype, op, value})
# or logical ({"logic": ..., "clauses": [...]}).
Spec = Dict[str, Any]


def _as_list(x: Any) -> List[Any]:
    if x is None:
        return []
    if isinstance(x, list):
        return x
    return [x]


def _parse_range_value(value: Any) -> Dict[str, Any]:
    """
    Accepts:
      - {"gte": ..., "lte": ...}
      - ["start", "end"] / ("start", "end")
      - "start, end"  (string)
    Returns dict with keys: gte, gt, lte, lt (subset)
    """
    if isinstance(value, dict):
        return value

    if isinstance(value, (list, tuple)) and len(value) == 2:
        return {"gte": value[0], "lte": value[1]}

    if isinstance(value, str) and "," in value:
        a, b = [p.strip() for p in value.split(",", 1)]
        return {"gte": a, "lte": b}

    raise ValueError(f"Unsupported range value format: {value!r}")


def _field_condition_from_atomic(spec: Spec) -> Union[models.FieldCondition, models.HasIdCondition]:
    """
    Build a single Qdrant condition from one atomic spec.
    """
    key = spec.get("key")
    dtype = (spec.get("dtype") or "string").lower()
    op = (spec.get("op") or "equals").lower()
    value = spec.get("value", None)

    if not key and op != "has_id":
        raise ValueError(f"Missing 'key' in spec: {spec}")

    # Special: filter by point ids (Qdrant IDs), not payload field
    if op == "has_id":
        ids = _as_list(value)
        return models.HasIdCondition(has_id=ids)

    # Common string exact match ops (keyword-like fields)
    if op in {"equals", "eq"}:
        return models.FieldCondition(key=key, match=models.MatchValue(value=value))

    if op in {"in"}:
        # value: list
        return models.FieldCondition(key=key, match=models.MatchAny(any=_as_list(value)))

    if op in {"not_in"}:
        return models.FieldCondition(key=key, match=models.MatchExcept(except_=_as_list(value)))

    # Text search (requires text index for best performance; still works otherwise)
    if op in {"contains", "text"}:
        # token-based match; for substring-ish search use text index
        return models.FieldCondition(key=key, match=models.MatchText(text=str(value)))

    if op in {"phrase"}:
        return models.FieldCondition(key=key, match=models.MatchPhrase(phrase=str(value)))

    if op in {"prefix"}:
        # Qdrant supports MatchText; prefix semantics depend on tokenizer/index config.
        # Many log use-cases are better handled by storing normalized keyword field(s).
        return models.FieldCondition(key=key, match=models.MatchText(text=str(value)))

    # Existence / null / empty checks
    if op in {"exists", "has_field"}:
        # "is_null=False" matches when field is present & not null
        return models.FieldCondition(key=key, is_null=False)

    if op in {"is_null"}:
        return models.FieldCondition(key=key, is_null=True)

    if op in {"is_empty"}:
        return models.FieldCondition(key=key, is_empty=True)

    # Numeric / datetime ranges
    if op in {"gt", "gte", "lt", "lte", "between", "range"}:
        r = _parse_range_value(value) if op in {"between", "range"} else {op: value}

        if dtype in {"datetime", "date", "timestamp"}:
            # DatetimeRange expects RFC3339 strings, e.g. "2025-12-08T19:07:34Z"
            return models.FieldCondition(
                key=key,
                range=models.DatetimeRange(
                    gt=r.get("gt"),
                    gte=r.get("gte"),
                    lt=r.get("lt"),
                    lte=r.get("lte"),
                ),
            )
        else:
            # int/float
            return models.FieldCondition(
                key=key,
                range=models.Range(
                    gt=r.get("gt"),
                    gte=r.get("gte"),
                    lt=r.get("lt"),
                    lte=r.get("lte"),
                ),
            )

    raise ValueError(f"Unsupported op={op!r} for spec: {spec}")


def _build_filter_from_expr(expr: Spec) -> models.Filter:
    """
    expr can be:
      - atomic: {key,dtype,op,value}
      - logical: {"logic":"and|or|not", "clauses":[expr, expr, ...]}
    """
    logic = expr.get("logic")
    if not logic:
        # atomic -> filter with a single must condition
        cond = _field_condition_from_atomic(expr)
        return models.Filter(must=[cond])

    logic = logic.lower()
    clauses = expr.get("clauses", [])
    if not isinstance(clauses, list) or not clauses:
        raise ValueError(f"Logical expr must have non-empty list 'clauses': {expr}")

    if logic == "and":
        # Merge as must
        must_conds = []
        for c in clauses:
            if c.get("logic"):
                # nested: embed as Filter in must via FilterCondition
                must_conds.append(models.FilterCondition(filter=_build_filter_from_expr(c)))
            else:
                must_conds.append(_field_condition_from_atomic(c))
        return models.Filter(must=must_conds)

    if logic == "or":
        should_conds = []
        for c in clauses:
            if c.get("logic"):
                should_conds.append(models.FilterCondition(filter=_build_filter_from_expr(c)))
            else:
                should_conds.append(_field_condition_from_atomic(c))
        return models.Filter(should=should_conds, min_should=models.MinShould(min_count=1))

    if logic == "not":
        # NOT of a single clause or many clauses
        must_not_conds = []
        for c in clauses:
            if c.get("logic"):
                must_not_conds.append(models.FilterCondition(filter=_build_filter_from_expr(c)))
            else:
                must_not_conds.append(_field_condition_from_atomic(c))
        return models.Filter(must_not=must_not_conds)

    raise ValueError(f"Unsupported logic={logic!r} in expr: {expr}")


def adapter_specs_to_filters(
    specs: List[Spec],
    *,
    mode: str = "and",
) -> Union[models.Filter, List[models.Filter]]:
    """
    Compile a list of atomic specs or logical expressions into Qdrant filter(s).

    Behavior:
      - If any entry carries a "logic" key, every entry is compiled on its own
        and a list with one Filter per entry is returned (`mode` is ignored).
      - Otherwise all entries are atomic and ONE Filter is returned:
          mode="and" -> Filter(must=[...])
          mode="or"  -> Filter(should=[...])

    Args:
        specs: atomic specs and/or logical expressions.
        mode: "and" or "or" (case-insensitive); only used for purely atomic input.

    Returns:
        A single models.Filter for atomic input, or List[models.Filter] when
        logical expressions are present.

    Raises:
        ValueError: for an unsupported mode with atomic input.
    """
    mode = mode.lower()

    # Logical expressions are self-describing: compile each one separately.
    if any("logic" in s for s in specs):
        return [_build_filter_from_expr(s) for s in specs]

    if mode not in {"and", "or"}:
        raise ValueError(f"Unsupported mode={mode!r}. Use 'and' or 'or'.")

    conds = [_field_condition_from_atomic(s) for s in specs]
    if mode == "and":
        return models.Filter(must=conds)
    return models.Filter(should=conds)


# QdrantVectorStore

In [85]:
from qdrant_client import QdrantClient, models
from typing import List, Any
from pydantic import BaseModel
import datetime

In [10]:
class BaseEmbedder:
    """
    Interface for text embedders that a vector store can consume.

    Subclasses implement `embed`, `embeds` and `compare`; this base only keeps
    the vector size and the name of the distance metric.
    """

    def __init__(self, vector_size: int, distance: str = "cosine"):
        """
        Args:
            vector_size: length of the vectors this embedder produces.
            distance: one of "cosine", "euclidean", "dot".

        Raises:
            ValueError: if `distance` is not one of the supported names.
        """
        self.vector_size = vector_size
        supported = frozenset(("cosine", "euclidean", "dot"))
        if distance in supported:
            self.distance = distance
        else:
            raise ValueError(f"Unsupported distance metric: {distance}")

    def embed(self, text:str) -> List[Any]:
        """Embed one text. Must be provided by subclasses."""
        raise NotImplementedError

    def embeds(self, texts:List[str]) -> List[List[Any]]:
        """Embed several texts. Must be provided by subclasses."""
        raise NotImplementedError

    def compare(self, a:List[Any], b:List[Any]) -> float:
        """Score the similarity of two embeddings. Must be provided by subclasses."""
        raise NotImplementedError

In [11]:
class LshEmbedder(BaseEmbedder):
    """
    BaseEmbedder adapter that exposes MinHash signatures as vectors.

    The vector size equals `num_hashes`, and the embedder is registered with
    the "cosine" distance name.
    """

    def __init__(self,
                 shingle_size: int = 5,
                 num_hashes: int = 128,
                 bands: int = 32,
                 seed: int = 42):
        """Create the underlying LSHMinHashEmbedder and register the vector size."""
        lsh_settings = dict(
            shingle_size=shingle_size,
            num_hashes=num_hashes,
            bands=bands,
            seed=seed,
        )
        self.lsh = LSHMinHashEmbedder(**lsh_settings)
        super().__init__(vector_size=num_hashes, distance="cosine")

    def embed(self, text: str) -> List[int]:
        """Return the MinHash signature of `text`."""
        return self.lsh.embed(text).signature

    def embeds(self, texts: List[str]) -> Iterable[List[int]]:
        """Lazily yield one signature per input text, in input order."""
        yield from map(self.embed, texts)

    def compare(self, a: List[int], b: List[int]) -> float:
        """
        Fraction of positions at which two signatures agree.

        Raises:
            ValueError: if the signatures differ in length.
        """
        if len(a) != len(b):
            raise ValueError("Signatures must be of the same length for comparison.")
        agreeing = sum(1 for pair in zip(a, b) if pair[0] == pair[1])
        return agreeing / len(a)

In [88]:
class Insertable(BaseModel):
    """Validated shape of one record passed to `VectorStore.inserts`."""
    text: str  # raw text; it is embedded and, together with metadata, hashed into the point id
    metadata: dict  # copied into the point payload
    insert_timestamp: Optional[str] = None  # ISO 8601 UTC string ending in "Z"; filled in at insert time when omitted

In [207]:
from pytz import timezone


class VectorStore:
    """
    Thin wrapper around one Qdrant collection plus the embedder that feeds it.

    On construction the collection is created if it does not exist yet, sized
    and configured from the embedder's `vector_size` and `distance`.
    """

    def __init__(
        self,
        collection_name,
        embedder: BaseEmbedder,
        url="http://localhost:6333",
    ):
        """
        Args:
            collection_name: name of the Qdrant collection to use.
            embedder: object providing `vector_size`, `distance`, `embed`, `embeds`.
            url: base URL of the Qdrant server.
        """
        self.collection_name = collection_name
        self.embedder = embedder
        self.url = url
        # Honor the configured URL (it used to be hard-coded to localhost).
        self.client = QdrantClient(url=url)

        self.create_collection_if_not_exists(
                vector_size=embedder.vector_size,
                distance=embedder.distance
            )

    def create_collection_if_not_exists(self,
                                        vector_size: int,
                                        distance: str = "cosine"):
        """
        Create the collection when it is missing.

        Args:
            vector_size: dimensionality of the stored vectors.
            distance: "cosine", "euclidean" or "dot" (case-insensitive).

        Raises:
            ValueError: for any other distance name.
        """
        metric = distance.lower()
        if metric == "cosine":
            dist = models.Distance.COSINE
        elif metric == "euclidean":
            # The qdrant-client enum member is called EUCLID.
            dist = models.Distance.EUCLID
        elif metric == "dot":
            dist = models.Distance.DOT
        else:
            raise ValueError("Unsupported distance metric")

        if not self.client.collection_exists(collection_name=self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=dist
                )
            )
            print(f"Created collection: {self.collection_name}")
        else:
            print(f"Collection {self.collection_name} already exists.")

    def delete_collection_if_exists(self):
        """Drop the collection if it exists; do nothing otherwise."""
        if self.client.collection_exists(collection_name=self.collection_name):
            self.client.delete_collection(collection_name=self.collection_name)
            print(f"Deleted collection: {self.collection_name}")

    def _hash(self, insertable: Insertable) -> str:
        """
        Derive the point id: MD5 hex digest of "{text}-{metadata}".

        The metadata part is the dict's string form, so it depends on key
        insertion order. The timestamp is not part of the id.
        """
        stringified_obj = f"{insertable.text}-{insertable.metadata}"
        return hashlib.md5(stringified_obj.encode('utf-8')).hexdigest()

    def inserts(self, objects: List[Dict]):
        """
        Validate, embed and upsert records one by one.

        Each record needs 'text' and 'metadata' and may carry
        'insert_timestamp' (ISO 8601 UTC ending in "Z"); a missing timestamp is
        set to the current time. The payload is the metadata plus
        'insert_timestamp'; the text itself is not stored.

        Raises:
            ValueError: if a supplied insert_timestamp has the wrong format.
                Records processed before the offending one are already upserted.
        """
        iso_8601_utc_regex = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$"

        # Pydantic validation checks the 'text' / 'metadata' keys up front.
        insertables = [Insertable(**obj) for obj in objects]
        texts = [obj.text for obj in insertables]
        for insertable, embedding in zip(insertables, self.embedder.embeds(texts)):
            _id = self._hash(insertable)

            if insertable.insert_timestamp is None:
                insertable.insert_timestamp = get_time()
            elif not re.match(iso_8601_utc_regex, insertable.insert_timestamp):
                raise ValueError(
                    f"insert_timestamp must be in ISO 8601 UTC format (…Z): {insertable.insert_timestamp}"
                )

            print(f"Inserting ID: {_id}")
            self.client.upsert(
                collection_name=self.collection_name,
                points=[
                    {
                        "id": _id,
                        "vector": embedding,
                        "payload": {
                            **insertable.metadata,
                            "insert_timestamp": insertable.insert_timestamp,
                        }
                    }
                ]
            )

    def insert(self, obj: Dict):
        """Insert a single record (see `inserts`)."""
        self.inserts([obj])

    def search(self, query: str, filter_: Optional[models.Filter] = None, top_k: int = 5):
        """Embed `query` and return the `top_k` nearest points, optionally filtered."""
        query_embedding = self.embedder.embed(query)
        results = self.client.query_points(
            collection_name=self.collection_name,
            query=query_embedding,
            query_filter=filter_,
            limit=top_k,
            with_payload=True
        )
        return results

    def search_batch(self, queries: List[str], filters: Optional[List[Optional[models.Filter]]] = None, top_k: int = 5):
        """
        Run several searches in one request.

        Args:
            queries: texts to search for.
            filters: optional list with one filter (or None) per query.
            top_k: number of hits per query.
        """
        assert filters is None or len(filters) == len(queries), "Filters length must match queries length"

        per_query_filters = filters if filters else [None] * len(queries)
        search_queries = [
            models.QueryRequest(
                query=self.embedder.embed(q),
                filter=f,
                limit=top_k,
                with_payload=True
            ) for q, f in zip(queries, per_query_filters)
        ]

        res = self.client.query_batch_points(
            collection_name=self.collection_name,
            requests=search_queries
        )

        return res

    def scroll(self, filters: Optional[models.Filter] = None, batch_size: int = 10):
        """Return one scroll page of up to `batch_size` points matching `filters`."""
        scroll_result = self.client.scroll(
            collection_name=self.collection_name,
            scroll_filter=filters,
            limit=batch_size,
            with_payload=True
        )
        return scroll_result

    def delete_conditional(self, filters: Optional[models.Filter] = None):
        """
        Delete every point matching `filters`.

        Raises:
            ValueError: if no filter is given; a filter selector cannot be
                built without one.
        """
        if filters is None:
            raise ValueError("delete_conditional requires a filter")
        # PointsSelector is a Union alias, not a constructible class; the
        # concrete selector for filter-based deletion is FilterSelector.
        res = self.client.delete(
            collection_name=self.collection_name,
            points_selector=models.FilterSelector(filter=filters)
        )
        return res

    def update_payload_conditional(self, new_payload: Dict[str, Any], filters: Optional[models.Filter] = None):
        """
        Set `new_payload` on every point matching `filters`.

        Raises:
            ValueError: if no filter is given.
        """
        if filters is None:
            raise ValueError("update_payload_conditional requires a filter")
        self.client.set_payload(
            collection_name=self.collection_name,
            payload=new_payload,
            points=models.FilterSelector(filter=filters)
        )

In [198]:
# Open (or create) the demo collection, using the same LSH settings as the demo embedder above.
lsh_vector_store = VectorStore(
    collection_name="test_collection",
    embedder=LshEmbedder(
        shingle_size=5,
        num_hashes=128,
        bands=32,
        seed=123
    )
)

Collection test_collection already exists.


In [192]:
# lsh_vector_store.delete_collection_if_exists()

In [199]:
# Demo records: 'text' is embedded, 'metadata' becomes the payload.
# Only the first record sets 'insert_timestamp' explicitly.
# NOTE(review): the last record's text starts with "ERROR" while its severity is "WARNING" — confirm this is intended.
sample_points = [
    {
        "text": "ERROR 2026-01-13T12:00:01Z user_id=123e4567-e89b-12d3-a456-426614174000 request took 153ms",
        "metadata": {"pod_name": "pod-1", "timestamp": "2026-01-13T12:00:01Z", "severity": "ERROR", "textPayload": "Cannot assign requested address XYZ"},
        "insert_timestamp": "2026-01-13T12:05:00Z"
    },
    {
        "text": "ERROR 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms",
        "metadata": {"pod_name": "pod-2", "timestamp": "2026-01-13T12:00:09Z", "severity": "ERROR", "textPayload": "Cannot assign requested address ABC"}
    },
    {
        "text": "INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user",
        "metadata": {"pod_name": "pod-2", "timestamp": "2026-01-14T12:00:09Z", "severity": "INFO", "textPayload": "User login successful"}
    },
    {
        "text": "ERROR 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms",
        "metadata": {"pod_name": "pod-2", "timestamp": "2026-01-14T12:00:09Z", "severity": "WARNING", "textPayload": "Cannot assign requested address DED"}
    }
]

In [200]:
# Upsert the demo records; ids are derived from text + metadata.
lsh_vector_store.inserts(sample_points)

Inserting ID: 00b6968f3ce972617626f646d7288bd4
Inserting ID: d8c48942a4da398e595f15c9aefb735c
Inserting ID: 662691f5f1213832fad725003284f1a5
Inserting ID: cd634c54630985964c5403ae81abec5f


In [195]:
# Scratch check: str() of a datetime separates date and time with a space rather than "T".
str(datetime.datetime.now(datetime.UTC)).replace("+00:00", "Z")

'2026-01-18 08:55:59.906812Z'

In [168]:
# Atomic specs for the demo search; with only atomic entries and mode="and"
# the adapter returns a single Filter.
specs = [
    {"key": "pod_name", "dtype": "string", "op": "equals", "value": "pod-1"},
    # {"key": "timestamp", "dtype": "datetime", "op": "between","value": "2026-01-13T00:00:00Z, 2026-01-13T20:00:00Z"},
    # {"key": "severity", "dtype": "string", "op": "in", "value": ["ERROR", "CRITICAL"]},
    # {"key": "textPayload", "dtype": "text", "op": "contains", "value": "ABC"},
]
filter_ = adapter_specs_to_filters(specs, mode="and")

In [169]:
# Filtered nearest-neighbour search for a new log line.
# NOTE(review): the last UUID group in this query has 16 digits, so the UUID mask will not match it — confirm this is intended.
points = lsh_vector_store.search(
    query="ERROR 2026-01-16T12:00:05Z user_id=323e4567-e89b-12d3-a456-4266141731314888 request took 150ms",
    filter_=filter_,
    top_k=2
)

In [124]:
# Show the scored hits returned by the search above.
points.points

[ScoredPoint(id='00b6968f-3ce9-7261-7626-f646d7288bd4', version=1, score=0.6531415, payload={'pod_name': 'pod-1', 'timestamp': '2026-01-13T12:00:01Z', 'severity': 'ERROR', 'textPayload': 'Cannot assign requested address XYZ', 'insert_timestamp': '2026-01-13T12:05:00Z'}, vector=None, shard_key=None, order_value=None)]

In [125]:
# Unfiltered batch search: one best hit per query.
lsh_vector_store.search_batch(
    ["ERROR 2026-01-16T12:00:05Z user_id=323e4567-e89b-12d3-a456-4266141731314888 request took 150ms",
     "INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user"],
    top_k=1
)

[QueryResponse(points=[ScoredPoint(id='d8c48942-a4da-398e-595f-15c9aefb735c', version=2, score=0.7240081, payload={'pod_name': 'pod-2', 'timestamp': '2026-01-13T12:00:09Z', 'severity': 'ERROR', 'textPayload': 'Cannot assign requested address ABC', 'insert_timestamp': '2026-01-18T06:11:28.746597+00:00Z'}, vector=None, shard_key=None, order_value=None)]),
 QueryResponse(points=[ScoredPoint(id='662691f5-f121-3832-fad7-25003284f1a5', version=3, score=1.0, payload={'pod_name': 'pod-2', 'timestamp': '2026-01-14T12:00:09Z', 'severity': 'INFO', 'textPayload': 'User login successful', 'insert_timestamp': '2026-01-18T06:11:28.756365+00:00Z'}, vector=None, shard_key=None, order_value=None)])]

# General Database

In [24]:
from abc import ABC, abstractmethod
import psycopg2

In [25]:
class BaseConnector(ABC):
    """
    Abstract database connector.

    Subclasses provide `connect`, `execute_and_return_result` and `__exit__`.
    `__enter__` is supplied here so that every connector can be used in a
    `with` block; defining `__exit__` alone is not enough for that.
    """

    @abstractmethod
    def connect(self, username: str, password: str, database: str, host: str, port: int):
        """Open the connection to the database."""
        pass

    @abstractmethod
    def execute_and_return_result(self, query: str, params: Any = None) -> Any:
        """Run `query` (optionally with bound `params`) and return its result."""
        pass

    def _validate_dbargs(self, dbargs):
        """
        Check that all connection settings are present.

        Raises:
            ValueError: naming the first missing key among
                username, password, database, host, port.
        """
        required_keys = ['username', 'password', 'database', 'host', 'port']
        for key in required_keys:
            if key not in dbargs:
                raise ValueError(f"Missing required database argument: {key}")

    def __enter__(self):
        """Return the connector itself as the context-manager value."""
        return self

    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Release the connection when leaving a `with` block."""
        pass

In [26]:
class PostgressConnector(BaseConnector):
    """
    PostgreSQL connector built on psycopg2.

    The constructor tries to connect right away. A failed attempt is reported
    with a message and leaves `connection` set to None (best effort); any
    later query then fails with a clear RuntimeError.
    """

    def __init__(self, dbargs: Dict[str, Any]):
        """
        Args:
            dbargs: dict with username, password, database, host, port.

        Raises:
            ValueError: if one of the required keys is missing.
        """
        self._validate_dbargs(dbargs)
        self.dbargs = dbargs
        self.connection = None
        try:
            self.connect(
                username=dbargs['username'],
                password=dbargs['password'],
                database=dbargs['database'],
                host=dbargs['host'],
                port=dbargs['port']
            )
        except Exception as e:
            # Deliberate best effort: construction succeeds even if the server is down.
            print(f"Failed to connect to the database: {e}")
            self.connection = None

    def connect(self, username: str, password: str, database: str, host: str, port: int):
        """Open a psycopg2 connection and keep it on `self.connection`."""
        self.connection = psycopg2.connect(
            dbname=database,
            user=username,
            password=password,
            host=host,
            port=port
        )

    def execute_and_return_result(self, query: str, params=None):
        """
        Execute `query`, commit, and return the fetched rows.

        Args:
            query: SQL text.
            params: optional parameters bound by the driver; an empty or None
                value runs the query without parameter binding.

        Returns:
            All rows for statements that produce a result set, else [].

        Raises:
            RuntimeError: if there is no open connection.
            Exception: any driver error, re-raised after a rollback attempt.
        """
        if self.connection is None:
            raise RuntimeError("No active database connection; connect() must succeed first")

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params if params else None)

                # `description` is only set when the statement returned rows.
                result = cursor.fetchall() if cursor.description else []

                self.connection.commit()
                return result
        except Exception as e:
            print(f"Error executing query: {e}")
            try:
                self.connection.rollback()
            except Exception as rollback_error:
                print(f"Error during rollback: {rollback_error}")
            # Bare raise keeps the original traceback.
            raise

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.connection:
            self.connection.close()
            self.connection = None

    def __enter__(self):
        """Return the connector so it can be used as `with PostgressConnector(...) as c`."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a `with` block."""
        self.close()

In [27]:
def get_database_connector(db_type: str, username: str, password: str, database: str, host: str, port: int) -> BaseConnector:
    """
    Build a connector for the requested database type.

    Only 'postgresql' (case-insensitive) is supported.

    Raises:
        ValueError: for any other `db_type`.
    """
    if db_type.lower() != 'postgresql':
        raise ValueError(f"Unsupported database type: {db_type}")

    connection_settings = dict(
        username=username,
        password=password,
        database=database,
        host=host,
        port=port,
    )
    return PostgressConnector(connection_settings)

In [28]:
import os

# Connection settings come from the standard PostgreSQL environment variables,
# so real credentials need not live in the notebook. The fallbacks are the
# local development defaults this cell used before.
SAMPLE_PG_CONNECTOR = get_database_connector(
    db_type="postgresql",
    username=os.environ.get("PGUSER", "admin"),
    password=os.environ.get("PGPASSWORD", "admin"),
    database=os.environ.get("PGDATABASE", "mydatabase"),
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432"))
)

In [29]:
# Smoke test: ask the server for its version string.
SAMPLE_PG_CONNECTOR.execute_and_return_result("SELECT version();")

[('PostgreSQL 18.1 (Debian 18.1-1.pgdg13+2) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 14.2.0-19) 14.2.0, 64-bit',)]

# Log Ingestion Hybrid DB Handler

In [None]:
class LogIngestionHtbridHandler:
    """
    Hybrid log-ingestion handler: an LSH-embedded vector store plus a PostgreSQL lookup table.

    - `insert_logs` searches the vector store for each incoming log's top match. A log whose
      top match scores at or above `insert_sim_threshold` is recorded only in the lookup
      table, pointing at the matched log's id; any other log is added to the vector store and
      recorded in the lookup table as its own closest log with similarity 1.0.
    - `search` / `scroll` forward to the vector store, converting filter specs with
      `adapter_specs_to_filters`.
    - `find_near_occurrences` reads a log's row from the lookup table.

    NOTE(review): `pg_lookupdb_table_name` is interpolated directly into SQL text, so it must
    come from trusted configuration, never from user input.
    """

    def __init__(self, 
                 qdrant_lsh_collection_name: str,
                 qdrant_lsh_shingle_size: int,
                 qdrant_lsh_num_hashes: int,
                 qdrant_lsh_bands: int,
                 qdrant_lsh_seed: int,
                 pg_lookupdb_username: str,
                 pg_lookupdb_password: str,
                 pg_lookupdb_database: str,
                 pg_lookupdb_host: str,
                 pg_lookupdb_port: int,
                 pg_lookupdb_table_name: str,
                 insert_sim_threshold: float,
                 sim_sync_batch_size: int
                 ):
        """
        Build the vector store and the lookup-DB connector, then ensure the lookup table exists.

        Args:
            qdrant_lsh_collection_name: collection name passed to `VectorStore`.
            qdrant_lsh_shingle_size, qdrant_lsh_num_hashes, qdrant_lsh_bands, qdrant_lsh_seed:
                forwarded to `LshEmbedder`.
            pg_lookupdb_username, pg_lookupdb_password, pg_lookupdb_database,
            pg_lookupdb_host, pg_lookupdb_port: PostgreSQL connection settings.
            pg_lookupdb_table_name: lookup table name (interpolated into SQL; must be trusted).
            insert_sim_threshold: top-match score at or above which an incoming log is treated
                as a near-duplicate and kept out of the vector store.
            sim_sync_batch_size: batch size used when scrolling unsynced points.
        """
        self.vector_store = VectorStore(
            collection_name=qdrant_lsh_collection_name,
            embedder=LshEmbedder(
                shingle_size=qdrant_lsh_shingle_size,
                num_hashes=qdrant_lsh_num_hashes,
                bands=qdrant_lsh_bands,
                seed=qdrant_lsh_seed
            )
        )

        self.db_connector = get_database_connector(
            db_type="postgresql",
            username=pg_lookupdb_username,
            password=pg_lookupdb_password,
            database=pg_lookupdb_database,
            host=pg_lookupdb_host,
            port=pg_lookupdb_port
        )

        self.pg_lookupdb_table_name = pg_lookupdb_table_name
        self.insert_sim_threshold = insert_sim_threshold
        self.sim_sync_batch_size = sim_sync_batch_size

        self._warmup_lookup_db()

    def _warmup_lookup_db(self):
        """
        Check the connection with `SELECT 1;` and create the lookup table if it is missing.

        Returns:
            Whatever the connector returns for the `CREATE TABLE IF NOT EXISTS` statement.
        """
        # Simple query to test the connection; its result is not needed.
        self.db_connector.execute_and_return_result("SELECT 1;")
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.pg_lookupdb_table_name} (
            id TEXT PRIMARY KEY,
            closest_log_id TEXT NOT NULL,
            similarity FLOAT NOT NULL,
            timestamp TIMESTAMP NOT NULL,
            location TEXT
        );
        """
        return self.db_connector.execute_and_return_result(create_table_query)

    def _insert_into_lookup_db(self, 
                               log_id: str, 
                               closest_log_id: str, 
                               similarity: float, 
                               timestamp: str, 
                               location: str):
        """Insert one lookup row; an existing row with the same `id` is left untouched."""
        insert_query = f"""
        INSERT INTO {self.pg_lookupdb_table_name} (id, closest_log_id, similarity, timestamp, location)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
        """
        params = (log_id, closest_log_id, similarity, timestamp, location)
        self.db_connector.execute_and_return_result(insert_query, params)

    def insert_logs(self, log_entries: List[Dict]):
        """
        Ingest a batch of logs, skipping vector-store inserts for near-duplicates.

        Args:
            log_entries: dicts with a 'text' key and an optional 'metadata' dict. Each entry
                is mutated in place: 'insert_timestamp' is set and metadata['sim_sync'] is
                set to False (a missing 'metadata' dict is created).

        Every entry gets a lookup-table row. Entries whose top match scores at or above
        `insert_sim_threshold` point at the matched log; all others point at themselves with
        similarity 1.0 and are inserted into the vector store in one batch at the end.

        NOTE(review): a missing metadata 'timestamp' is sent as '' — presumably rejected by
        the TIMESTAMP NOT NULL column; confirm the desired fallback.
        """
        # Check with vector store for near-duplicates
        texts = [entry['text'] for entry in log_entries]
        search_results = self.vector_store.search_batch(
            queries=texts,
            top_k=1
        )

        if search_results is None or len(search_results) == 0:
            # No results at all: pair every entry with None so each is treated as new.
            search_results = [None] * len(log_entries)

        vector_store_inserts = []
        for entry, result in zip(log_entries, search_results):
            entry['insert_timestamp'] = get_time()
            # Create the metadata dict when absent instead of raising KeyError.
            metadata = entry.setdefault('metadata', {})
            metadata["sim_sync"] = False
            hashval = self.vector_store._hash(Insertable(text=entry['text'], metadata=metadata))
            timestamp = metadata.get('timestamp', '')
            location = metadata.get('pod_name', '')

            # `result` is None when the search produced nothing for this batch.
            points = result.points if result is not None else None
            top_point = points[0] if points else None

            if top_point is not None:
                sim = top_point.score
                entry_preview = entry['text'][:30]
                match_preview = (top_point.payload or {}).get('text', '')[:30]
                print(f"{entry_preview}... <=> Top match ID: {top_point.id}/{match_preview}... with similarity: {sim}")

                if sim >= self.insert_sim_threshold:
                    print(f"Skipping insert for log (ID: {hashval}) due to high similarity ({sim}) with existing log (ID: {top_point.id})")
                    # Near-duplicate: only insert into lookup DB, pointing at the match.
                    self._insert_into_lookup_db(
                        log_id=hashval,
                        closest_log_id=str(top_point.id),
                        similarity=sim,
                        timestamp=timestamp,
                        location=location
                    )
                    continue

                print(f"Inserting new log (ID: {hashval}) into vector store and lookup DB")
            else:
                print(f"No existing points found. Inserting new log (ID: {hashval}) into vector store and lookup DB")

            # New log: it is its own closest log, and it goes into the vector store.
            self._insert_into_lookup_db(
                log_id=hashval,
                closest_log_id=hashval,
                similarity=1.0,
                timestamp=timestamp,
                location=location
            )
            vector_store_inserts.append(entry)

        if vector_store_inserts:
            self.vector_store.inserts(vector_store_inserts)

    def search(self, query: str, filter_: Optional[List[Dict]] = None, top_k: int = 5):
        """Search the vector store for `query`, optionally restricted by filter specs."""
        qdrant_filter = adapter_specs_to_filters(filter_) if filter_ else None
        results = self.vector_store.search(
            query=query,
            filter_=qdrant_filter,
            top_k=top_k
        )
        return results

    def scroll(self, filter_: Optional[List[Dict]] = None, batch_size: int = 10):
        """Scroll the vector store in batches of `batch_size`, optionally restricted by filter specs."""
        qdrant_filter = adapter_specs_to_filters(filter_) if filter_ else None
        results = self.vector_store.scroll(
            filters=qdrant_filter,
            batch_size=batch_size
        )
        return results

    def find_near_occurrences(self, log_id):
        """Return the `(closest_log_id, similarity)` rows stored in the lookup table for `log_id`."""
        # Fetch log entry from lookup DB
        fetch_query = f"""
        SELECT closest_log_id, similarity FROM {self.pg_lookupdb_table_name}
        WHERE id = %s;
        """
        params = (log_id,)
        result = self.db_connector.execute_and_return_result(fetch_query, params)
        return result

    def clear(self):
        """Delete the vector-store collection (if it exists) and every row of the lookup table."""
        self.vector_store.delete_collection_if_exists()
        delete_query = f"DELETE FROM {self.pg_lookupdb_table_name};"
        self.db_connector.execute_and_return_result(delete_query)

    def sync_similarty(self):
        """
        Fetch one batch of points whose `sim_sync` flag is False and search each one's top match.

        Returns:
            `(unsync_points, search_results, texts)`: the scrolled points, the batch search
            results, and the texts that were used as queries.

        NOTE(review): this is the intermediate (debug) return the method already had; the
        edge-building step that used to sit unreachable below it now lives in
        `_similarity_edges`. Once the intermediate data is no longer needed, return
        `self._similarity_edges(unsync_points, search_results)` instead.
        """
        # Named to avoid shadowing the built-in `filter`.
        unsynced_filter = adapter_specs_to_filters([
            {"key": "sim_sync", "dtype": "boolean", "op": "equals", "value": False}
        ], mode="and")

        # Fetch one batch of entries from the vector store where sim_sync is False.
        unsync_points = self.vector_store.scroll(
            filters=unsynced_filter,
            batch_size=self.sim_sync_batch_size
        )[0]

        texts = [p.payload.get("text", "") for p in unsync_points]
        search_results = self.vector_store.search_batch(
            queries=texts,
            filters=[unsynced_filter]*len(texts),
            top_k=1
        )

        return unsync_points, search_results, texts

    def _similarity_edges(self, points, search_results):
        """
        Pair each point with its top search match.

        Args:
            points: objects exposing `.id`.
            search_results: per-point results exposing `.points` (each with `.id`/`.score`);
                `None` entries and empty results are skipped.

        Returns:
            A list of `[point_id, str(top_match_id), score]` edges.
        """
        search_results = search_results or []
        print(f"Processing batch of {len(points)} vs {len(search_results)}")

        edges_to_update = []
        for point, result in zip(points, search_results):
            if result is None or not result.points:
                continue
            top_point = result.points[0]
            sim = top_point.score
            print(f"Log ID: {point.id} <=> Top match ID: {top_point.id} with similarity: {sim}")
            edges_to_update.append([point.id, str(top_point.id), sim])

        return edges_to_update

In [376]:
# Build the handler for the named LSH collection and the PostgreSQL lookup table.
# NOTE(review): the insert and search cells below fail with
# "Vector dimension error: expected dim: 128, got 256" while this cell passes
# qdrant_lsh_num_hashes=256 — presumably the already-existing collection was created
# with 128 hashes. Recreate the collection or restore the original hash count; confirm
# which is intended.
# NOTE(review): the lookup-DB credentials are hard-coded; consider loading them from the
# environment before sharing this notebook.
log_ingestion_handler = LogIngestionHtbridHandler(
    qdrant_lsh_collection_name="log_lsh_collection",
    qdrant_lsh_shingle_size=3,
    qdrant_lsh_num_hashes=256,
    qdrant_lsh_bands=32,
    qdrant_lsh_seed=123,
    pg_lookupdb_username="admin",
    pg_lookupdb_password="admin",
    pg_lookupdb_database="mydatabase",
    pg_lookupdb_host="localhost",
    pg_lookupdb_port=5432,
    pg_lookupdb_table_name="log_lookup_table",
    insert_sim_threshold=0.8,
    sim_sync_batch_size=50
)

Collection log_lsh_collection already exists.


In [377]:
# log_ingestion_handler.clear()

In [378]:
# Sample log entries for exercising the handler. Each row below is
# (pod_name, timestamp, severity, text); the text is stored both as the entry's "text"
# and as metadata["textPayload"].
sample_log_entries = [
    {
        "text": text,
        "metadata": {"pod_name": pod_name, "timestamp": timestamp, "severity": severity, "textPayload": text},
    }
    for pod_name, timestamp, severity, text in (
        ("pod-3", "2026-01-15T12:00:01Z", "ERROR", "ERROR 2026-01-15T12:00:01Z user_id=323e4567-e89b-12d3-a456-4266141731314888 request took 150ms"),
        ("pod-2", "2026-01-14T12:00:09Z", "INFO", "INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user"),
        ("pod-2", "2026-01-14T12:00:09Z", "WARNING", "ERROR 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms"),
        ("pod-1", "2026-01-13T12:00:01Z", "ERROR", "ERROR 2026-01-13T12:00:01Z user_id=123e4567-e89b-12d3-a456-426614174000 request took 153ms"),
        ("pod-2", "2026-01-13T12:00:09Z", "ERROR", "ERROR 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms"),
    )
]

In [379]:
# NOTE: `[:]` copies only the list; insert_logs still mutates each entry dict in place
# (it sets entry['insert_timestamp'] and entry['metadata']['sim_sync']).
log_ingestion_handler.insert_logs(sample_log_entries[:])

UnexpectedResponse: Unexpected Response: 400 (Bad Request)
Raw response content:
b'{"status":{"error":"Wrong input: Vector dimension error: expected dim: 128, got 256"},"time":0.000965167}'

In [375]:
# Retrieve the top 2 matches for a query log line; the commented-out filter shows how to
# restrict the search by pod name and severity.
log_ingestion_handler.search(
    query="ERROR 2026-01-16T12:00:05Z user_id=323e4567-e89b-12d3-a456-4266141731314888 request took 150ms",
    # filter_=[
    #     {"key": "pod_name", "dtype": "string", "op": "equals", "value": "pod-3"},
    #     {"key": "severity", "dtype": "string", "op": "equals", "value": "ERROR"}
    # ],
    top_k=2
).points

UnexpectedResponse: Unexpected Response: 400 (Bad Request)
Raw response content:
b'{"status":{"error":"Wrong input: Vector dimension error: expected dim: 128, got 256"},"time":0.002704625}'

In [366]:
# log_ingestion_handler.db_connector.execute_and_return_result("SELECT * FROM log_lookup_table;")

In [370]:
# Embed the first and the last sample log lines and compare them directly with the
# embedder, bypassing the vector store.
v1 = log_ingestion_handler.vector_store.embedder.embed("ERROR 2026-01-15T12:00:01Z user_id=323e4567-e89b-12d3-a456-4266141731314888 request took 150ms")
v2 = log_ingestion_handler.vector_store.embedder.embed("ERROR 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms")
cmp = log_ingestion_handler.vector_store.embedder.compare(v1, v2)
cmp

0.375

In [357]:
# sync_similarty returns (unsynced points, search results, texts); keep the first two.
# Unpacking the 3-tuple into two names would raise ValueError.
x, y = log_ingestion_handler.sync_similarty()[:2]

In [358]:
x

 Record(id='093a7511-84a4-8557-7e3e-2ac03af40441', payload={'pod_name': 'pod-1', 'timestamp': '2026-01-13T12:00:01Z', 'severity': 'ERROR', 'textPayload': 'ERROR 2026-01-13T12:00:01Z user_id=123e4567-e89b-12d3-a456-426614174000 request took 153ms', 'sim_sync': False, 'insert_timestamp': '2026-01-18T09:16:09.283414Z'}, vector=None, shard_key=None, order_value=None),
 Record(id='6ac3abf6-b388-0078-4054-c39562712580', payload={'pod_name': 'pod-2', 'timestamp': '2026-01-14T12:00:09Z', 'severity': 'INFO', 'textPayload': 'INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user', 'sim_sync': False, 'insert_timestamp': '2026-01-18T09:16:09.278478Z'}, vector=None, shard_key=None, order_value=None),
 Record(id='e0d11f33-6814-67d3-6264-6ea31b515ef7', payload={'pod_name': 'pod-2', 'timestamp': '2026-01-13T12:00:09Z', 'severity': 'ERROR', 'textPayload': 'ERROR 2026-01-13T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 request took 149ms', 'sim_sync': False, 'insert_

In [359]:
y

[QueryResponse(points=[ScoredPoint(id='6ac3abf6-b388-0078-4054-c39562712580', version=2, score=0.7341126, payload={'pod_name': 'pod-2', 'timestamp': '2026-01-14T12:00:09Z', 'severity': 'INFO', 'textPayload': 'INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user', 'sim_sync': False, 'insert_timestamp': '2026-01-18T09:16:09.278478Z'}, vector=None, shard_key=None, order_value=None)]),
 QueryResponse(points=[ScoredPoint(id='6ac3abf6-b388-0078-4054-c39562712580', version=2, score=0.7341126, payload={'pod_name': 'pod-2', 'timestamp': '2026-01-14T12:00:09Z', 'severity': 'INFO', 'textPayload': 'INFO 2026-01-14T12:00:09Z user_id=223e4567-e89b-12d3-a456-426614174999 passed user', 'sim_sync': False, 'insert_timestamp': '2026-01-18T09:16:09.278478Z'}, vector=None, shard_key=None, order_value=None)]),
 QueryResponse(points=[ScoredPoint(id='6ac3abf6-b388-0078-4054-c39562712580', version=2, score=0.7341126, payload={'pod_name': 'pod-2', 'timestamp': '2026-01-14T12:00:09Z