In [None]:
"""
Basic connection example.
"""

from base64 import decode
import redis

from dotenv import load_dotenv

load_dotenv()

r = redis.Redis(
    host=os.getenv("REDIS_HOST"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    username=os.getenv("REDIS_USERNAME") or None,
    password=os.getenv("REDIS_PASSWORD"),
    decode_responses=True
)

success = r.set('foo', 'bar')
# True

result = r.get('foo')
print(result)
# >>> bar

bar


In [27]:
# Key-free web search + fetch + extract (googlesearch-python + httpx + trafilatura)
# Adds: language detection, canonical URL normalization, de-duplication, and gated-page filtering.
# Overfetch set to k*5; gated length threshold = 1200 chars.
# pip install googlesearch-python httpx trafilatura lxml langdetect

import time, re
from typing import List, Dict, Any
import httpx
from googlesearch import search
import trafilatura
from lxml import html as lxml_html
from langdetect import detect_langs
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36"

# ---- helpers ----
def _domain(u: str) -> str:
    return re.sub(r"^https?://(www\.)?([^/:]+).*$", r"\2", u)

def _strip_tags(html_str: str) -> str:
    try:
        root = lxml_html.fromstring(html_str)
        return "\n".join(s.strip() for s in root.xpath("//text()") if s.strip())
    except Exception:
        return re.sub(r"<[^>]+>", " ", html_str)

def _html_title(html_str: str) -> str:
    try:
        root = lxml_html.fromstring(html_str)
        t = (root.xpath('string(//title)') or '').strip()
        if not t:
            t = (root.xpath('string(//h1[1])') or '').strip()
        if not t:
            metas = root.xpath('//meta[@property="og:title"]/@content | //meta[@name="twitter:title"]/@content')
            t = (metas[0] if metas else '').strip()
        return re.sub(r'\s+', ' ', t)
    except Exception:
        return ""

def _canonical_from_html(html_str: str) -> str:
    try:
        root = lxml_html.fromstring(html_str)
        hrefs = root.xpath('//link[translate(@rel,"CANONICAL","canonical")="canonical"]/@href')
        return hrefs[0].strip() if hrefs else ""
    except Exception:
        return ""

_TRACKING_PARAMS = {
    "utm_source","utm_medium","utm_campaign","utm_term","utm_content",
    "utm_id","gclid","fbclid","mc_cid","mc_eid","igshid","ref","mkt_tok"
}

def _normalize_url(u: str, html_str: str = "") -> str:
    canon = _canonical_from_html(html_str)
    if canon and canon.startswith("http"):
        u = canon
    parts = urlsplit(u)
    scheme = parts.scheme.lower() if parts.scheme else "http"
    netloc = parts.netloc.lower()
    if netloc.endswith(":80"):  netloc = netloc[:-3]
    if netloc.endswith(":443"): netloc = netloc[:-4]
    path = re.sub(r"/{2,}", "/", parts.path or "/")
    qpairs = [(k, v) for (k, v) in parse_qsl(parts.query, keep_blank_values=True) if k not in _TRACKING_PARAMS]
    query = urlencode(qpairs, doseq=True)
    fragment = ""
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return urlunsplit((scheme, netloc, path, query, fragment))

def _detect_lang(text: str) -> str:
    try:
        langs = detect_langs(text[:4000])
        if not langs: return ""
        best = max(langs, key=lambda l: l.prob)
        return best.lang if best.prob >= 0.6 else ""
    except Exception:
        return ""

def _looks_gated(text: str, html: str) -> bool:
    blob = f"{text}\n{html}".lower()
    bad_markers = [
        "access denied", "requires authorization", "sign in", "sign-in", "log in", "login",
        "subscribe to read", "subscription required", "paywall", "403 forbidden",
        "captcha", "enable javascript", "are you a robot", "request blocked", "not authorized"
    ]
    if any(m in blob for m in bad_markers):
        return True
    # "Soft 404": very short or boilerplate-heavy pages
    return len(text.strip()) < 1200  # bumped threshold

def _extract(html: str, url: str) -> Dict[str, str]:
    txt = trafilatura.extract(
        html, url=url,
        include_comments=False,
        include_tables=False,
        favor_recall=True
    )
    if not txt:
        txt = _strip_tags(html) or ""
    title = ""
    for line in (txt.splitlines()[:10] if txt else []):
        if line.lower().startswith("title:"):
            title = line.split(":", 1)[1].strip()
            break
    if not title:
        title = _html_title(html)
    if not title:
        slug = re.sub(r'[?#].*$', '', url).rstrip('/').rsplit('/', 1)[-1]
        title = slug.replace('-', ' ').replace('_', ' ').strip()
    return {"title": title, "text": (txt or "").strip()}

# ---- main search ----
def web_search_google(query: str, k: int = 5, timeout: int = 15,
                      dedupe_by_domain: bool = True) -> List[Dict[str, Any]]:
    # 1) Overfetch Google results (k*5) for better post-filtering
    urls = list(search(query, num_results=max(k*5, k)))
    if not urls:
        return []

    # 2) Fetch pages (cap downloads to k*5 as well)
    pages = []
    with httpx.Client(follow_redirects=True, headers={"User-Agent": UA}) as client:
        for u in urls:
            if len(pages) >= max(k*5, k):
                break
            try:
                r = client.get(u, timeout=timeout)
                r.raise_for_status()
                pages.append((u, r.text))
            except Exception:
                pages.append((u, ""))

    # 3) Extract, normalize, detect language; drop gated/soft-404s
    now = int(time.time())
    items = []
    for u, h in pages:
        ex = _extract(h, u)
        canon = _normalize_url(u, h)
        text = ex["text"]
        if _looks_gated(text, h):
            continue
        lang = _detect_lang(text) if text else ""
        items.append({
            "query": query,
            "title": ex["title"],
            "snippet": (text[:4000]).strip(),
            "domain": _domain(canon),
            "lang": lang,
            "fetched_at": now,
            "metadata": f"source=google;url={canon}",
            "embedding": b"",
            "url": canon,
            "_len": len(text)
        })

    if not items:
        return []

    # 4) De-duplicate by canonical URL first, then optionally by domain
    unique_by_url = {}
    for it in sorted(items, key=lambda x: x["_len"], reverse=True):
        if it["url"] not in unique_by_url:
            unique_by_url[it["url"]] = it
    deduped = list(unique_by_url.values())

    if dedupe_by_domain:
        winner_by_domain = {}
        for it in deduped:
            d = it["domain"]
            if d not in winner_by_domain or it["_len"] > winner_by_domain[d]["_len"]:
                winner_by_domain[d] = it
        deduped = list(winner_by_domain.values())

    # 5) Trim helpers, sort by snippet length, and return top-k
    for it in deduped:
        it.pop("_len", None)
        it.pop("url", None)
    deduped.sort(key=lambda x: len(x["snippet"]), reverse=True)
    return deduped[:k]

# --- sample run in notebook ---
docs = web_search_google("redis vector search tutorial", k=5)
docs[:5]


[{'query': 'redis vector search tutorial',
  'title': 'Redis Vector Store | 🦜️🔗 LangChain',
  'snippet': "Redis Vector Store\nThis notebook covers how to get started with the Redis vector store.\nRedis is a popular open-source, in-memory data structure store that can be used as a database, cache, message broker, and queue. It now includes vector similarity search capabilities, making it suitable for use as a vector store.\nWhat is Redis?\nMost developers are familiar with Redis\n. At its core, Redis\nis a NoSQL Database in the key-value family that can used as a cache, message broker, stream processing and a primary database. Developers choose Redis\nbecause it is fast, has a large ecosystem of client libraries, and has been deployed by major enterprises for years.\nOn top of these traditional use cases, Redis\nprovides additional capabilities like the Search and Query capability that allows users to create secondary index structures within Redis\n. This allows Redis\nto be a Vector Da

In [28]:
print(r.ft("websearch_cache_idx").info())

{'index_name': 'websearch_cache_idx', 'index_options': [], 'index_definition': ['key_type', 'JSON', 'prefixes', ['search_cache:'], 'default_score', '1', 'indexes_all', 'false'], 'attributes': [['identifier', '$.query', 'attribute', 'query', 'type', 'TEXT', 'WEIGHT', '1', 'SORTABLE', 'UNF', 'NOSTEM'], ['identifier', '$.title', 'attribute', 'title', 'type', 'TEXT', 'WEIGHT', '1'], ['identifier', '$.snippet', 'attribute', 'snippet', 'type', 'TEXT', 'WEIGHT', '1'], ['identifier', '$.domain', 'attribute', 'domain', 'type', 'TAG', 'SEPARATOR', ','], ['identifier', '$.lang', 'attribute', 'lang', 'type', 'TAG', 'SEPARATOR', ','], ['identifier', '$.fetched_at', 'attribute', 'fetched_at', 'type', 'NUMERIC', 'SORTABLE', 'UNF'], ['identifier', '$.metadata', 'attribute', 'metadata', 'type', 'TEXT', 'WEIGHT', '1'], ['identifier', '$.embedding', 'attribute', 'embedding', 'type', 'VECTOR', 'algorithm', 'HNSW', 'data_type', 'FLOAT32', 'dim', 1536, 'distance_metric', 'COSINE', 'M', 16, 'ef_construction'

In [8]:
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer('msmarco-distilbert-base-v4')

  from .autonotebook import tqdm as notebook_tqdm


In [29]:
from redis.commands.search.index_definition import IndexDefinition, IndexType
from redis.commands.search.field import TextField, TagField, NumericField, VectorField

INDEX_NAME = "websearch_cache_idx"

schema = (
    TextField("$.query",   as_name="query",   no_stem=True, sortable=True),
    TextField("$.title",   as_name="title"),
    TextField("$.snippet", as_name="snippet"),
    TagField("$.domain",   as_name="domain"),
    TagField("$.lang",     as_name="lang"),
    NumericField("$.fetched_at", as_name="fetched_at", sortable=True),
    TextField("$.metadata", as_name="metadata"),
    VectorField("$.embedding", "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": 1536,               # set to your embedding dimension
        "DISTANCE_METRIC": "COSINE",
        "M": 16,
        "EF_CONSTRUCTION": 200
    }, as_name="embedding")
)

definition = IndexDefinition(prefix=["search_cache:"], index_type=IndexType.JSON)

r.ft(INDEX_NAME).create_index(schema, definition=definition)


ResponseError: Index already exists

In [35]:
from dotenv import load_dotenv
import os
from openai import AzureOpenAI


# Load environment variables from .env file
load_dotenv()

openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
openai_key = os.getenv("AZURE_OPENAI_KEY")

client_o3 = AzureOpenAI(
    api_version="2024-12-01-preview",
    azure_endpoint=openai_endpoint,  # predefined
    api_key=openai_key              # predefined
)

resp = client_o3.chat.completions.create(
    model="o3-mini",
    messages=[
        {"role": "system", "content": 
        """
					<System prompt>
				"""},
        {"role": "user", "content": "<question, query>"},
    ]
)

print(resp.choices[0].message.content)




Hello there! I noticed you entered "<question, query>" but I'm not sure what you’d like to know. Could you please provide more details or clarify your question so I can better assist you?


In [34]:
# 1) Recreate index with DIM=768  (only if you accept dropping the old one)
from redis.commands.search.index_definition import IndexDefinition, IndexType
from redis.commands.search.field import TextField, TagField, NumericField, VectorField

INDEX_NAME = "websearch_cache_idx"
r.ft(INDEX_NAME).dropindex(delete_documents=False)

schema = (
    TextField("$.query",   as_name="query",   no_stem=True, sortable=True),
    TextField("$.title",   as_name="title"),
    TextField("$.snippet", as_name="snippet"),
    TagField("$.domain",   as_name="domain"),
    TagField("$.lang",     as_name="lang"),
    NumericField("$.fetched_at", as_name="fetched_at", sortable=True),
    TextField("$.metadata", as_name="metadata"),
    VectorField("$.embedding", "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": 768,                # <-- SBERT dimension
        "DISTANCE_METRIC": "COSINE",
        "M": 16,
        "EF_CONSTRUCTION": 200
    }, as_name="embedding")
)
r.ft(INDEX_NAME).create_index(schema, definition=IndexDefinition(prefix=["search_cache:"], index_type=IndexType.JSON))


'OK'

In [42]:
# SBERT-based cache-first RAG helpers (stores embeddings as JSON float arrays; queries with float32 bytes)

from sentence_transformers import SentenceTransformer
import numpy as np, re, time
from redis.commands.search.query import Query

embedder = SentenceTransformer('msmarco-distilbert-base-v4')  # 768-dim

def embed_text(text: str) -> bytes:
    # Return float32 bytes for querying (COSINE-friendly due to normalization)
    v = embedder.encode(text, normalize_embeddings=True)
    return np.asarray(v, dtype=np.float32).tobytes()

def _knn(vec_bytes: bytes, k: int = 10):
    q = (
        Query(f'(*)=>[KNN {k} @embedding $v AS score]')
        .sort_by("score")
        .return_fields("score","title","snippet","domain","lang","fetched_at","metadata")
        .dialect(2)
    )
    return r.ft("websearch_cache_idx").search(q, query_params={"v": vec_bytes})

def _count_hits(vec_bytes: bytes, k: int = 10, max_cosine_dist: float = 0.30) -> int:
    res = _knn(vec_bytes, k=k)
    return sum(1 for d in res.docs if float(d.score) <= max_cosine_dist)

def _top_context(vec_bytes: bytes, k: int = 5, max_cosine_dist: float = 0.30):
    print(f"[DEBUG] Running cache KNN search for k={k}, threshold={max_cosine_dist}")
    res = _knn(vec_bytes, k=max(k, 20))
    filtered = [d for d in res.docs if float(d.score) <= max_cosine_dist][:k]
    if not filtered:
        print("[DEBUG] No docs under threshold; falling back to top-k by distance")
        filtered = res.docs[:k]
    items = []
    for d in filtered:
        m = re.search(r"url=(\S+)", getattr(d, "metadata", "") or "")
        items.append({
            "title": getattr(d, "title", ""),
            "domain": getattr(d, "domain", ""),
            "url": m.group(1) if m else "",
            "snippet": getattr(d, "snippet", "")
        })
    print(f"[DEBUG] Retrieved {len(res.docs)} docs from cache; {len(items)} kept after threshold")
    return items

def save_docs_with_embeddings(docs):
    """
    Persist full schema to RedisJSON. Store embedding as a JSON array of float32s
    (required for JSON-mode vector fields). Query vectors remain float32 bytes.
    """
    print(f"[DEBUG] Saving {len(docs)} docs to cache...")
    pipe = r.pipeline(transaction=False)
    for i, src in enumerate(docs, 1):
        text_for_embedding = f"{src.get('title','')}\n\n{src.get('snippet','')}".strip()
        vec = embedder.encode(text_for_embedding, normalize_embeddings=True).astype(np.float32)
        payload = {
            "query":      src.get("query", ""),
            "title":      src.get("title", ""),
            "snippet":    (src.get("snippet", "")[:4000]).strip(),
            "domain":     src.get("domain", ""),
            "lang":       src.get("lang", ""),
            "fetched_at": int(src.get("fetched_at") or time.time()),
            "metadata":   src.get("metadata", ""),
            "embedding":  vec.tolist(),  # JSON-safe array of floats
        }
        key = f"search_cache:{payload['fetched_at']}:{i}"
        pipe.json().set(key, "$", payload)
    pipe.execute()
    print(f"[DEBUG] Saved {len(docs)} docs to cache")

def answer_with_cache_or_search(question: str, k: int = 5, threshold: float = 0.30):
    # 1) Ask the LLM to craft a strong web search query
    print(f"[DEBUG] Incoming query: {question}")
    search_term = question
    try:
        q_resp = client_o3.chat.completions.create(
            model="o3-mini",
            messages=[
                {"role": "system", "content": (
                    "Rewrite the user's request as one concise, high-recall web search query. "
                    "Prefer concrete nouns and key phrases; remove fluff; no quotes or commentary."
                )},
                {"role": "user", "content": question},
            ],
        )
        cand = (q_resp.choices[0].message.content or "").strip()
        if cand:
            search_term = cand.splitlines()[0]
    except Exception as e:
        print(f"[DEBUG] LLM query rewrite failed: {e} — falling back to original question")
    print(f"[DEBUG] Search term: {search_term}")

    # 2) Embed the search term and check cache by vector similarity
    qvec = embed_text(search_term)
    hit_count = _count_hits(qvec, k=20, max_cosine_dist=threshold)
    print(f"[DEBUG] Cache check: found {hit_count} matching docs (threshold={threshold})")

    if hit_count >= 3:
        print("[DEBUG] Cache hit: retrieving from cache...")
        ctx = _top_context(qvec, k=k, max_cosine_dist=threshold)
    else:
        print("[DEBUG] Cache miss or insufficient hits — running fresh web search...")
        fresh = web_search_google(search_term, k=k)
        print(f"[DEBUG] Web search returned {len(fresh)} docs")
        if fresh:
            save_docs_with_embeddings(fresh)
        ctx = _top_context(qvec, k=k, max_cosine_dist=threshold)

    # Debug: list retrieved docs
    print(f"[DEBUG] Retrieved {len(ctx)} docs for RAG context:")
    for i, doc in enumerate(ctx, start=1):
        print(f"  [{i}] {doc['title']} ({doc['domain']})")

    # 4) Final answer with RAG context
    system_prompt = "You are a precise research assistant. Use ONLY the provided context. Cite domains inline. Be concise."
    ctx_txt = "\n\n".join(f"- {it['title']} ({it['domain']})\n  {it['url']}\n  {it['snippet'][:800]}" for it in ctx)
    user_msg = f"Question: {question}\nSearch term used: {search_term}\n\nContext:\n{ctx_txt}"

    print("[DEBUG] Sending final prompt to LLM...")
    resp = client_o3.chat.completions.create(
        model="o3-mini",
        messages=[{"role": "system", "content": system_prompt},
                  {"role": "user", "content": user_msg}],
    )
    answer = resp.choices[0].message.content
    print("[DEBUG] Received answer from LLM.")
    return answer


In [43]:
# Quick smoke test for the cache-first RAG flow
sample_queries = [
    "redis vector search tutorial",
    "FT.SEARCH KNN example in Redis",
    "Redis JSON + RediSearch schema for vector embeddings"
]

for run in (1, 2):  # run twice to observe cache usage on the second pass
    print(f"\n=== RUN {run} ===")
    for q in sample_queries:
        t0 = time.time()
        answer = answer_with_cache_or_search(q, k=5, threshold=0.30)
        dt = time.time() - t0
        print(f"\nQuery: {q}\nTime: {dt:.2f}s\nAnswer preview:\n{answer[:700]}")



=== RUN 1 ===
[DEBUG] Incoming query: redis vector search tutorial
[DEBUG] Search term: Redis vector search tutorial Redisearch beginner guide
[DEBUG] Cache check: found 0 matching docs (threshold=0.3)
[DEBUG] Cache miss or insufficient hits — running fresh web search...
[DEBUG] Web search returned 3 docs
[DEBUG] Saving 3 docs to cache...
[DEBUG] Saved 3 docs to cache
[DEBUG] Running cache KNN search for k=5, threshold=0.3
[DEBUG] No docs under threshold; falling back to top-k by distance
[DEBUG] Retrieved 3 docs from cache; 3 kept after threshold
[DEBUG] Retrieved 3 docs for RAG context:
  [1] Redis Vector Store | 🦜️🔗 LangChain (python.langchain.com)
  [2] Redis Vector Database: A Comprehensive Guide - Flutters (flutters.in)
  [3] Valkey · Introducing Vector Search To Valkey (valkey.io)
[DEBUG] Sending final prompt to LLM...
[DEBUG] Received answer from LLM.

Query: redis vector search tutorial
Time: 30.85s
Answer preview:
For an introductory Redis vector search tutorial, consider st

In [44]:
# Quick smoke test for the cache-first RAG flow
sample_queries = [
    "redis vector search tutorial",
    "FT.SEARCH KNN example in Redis",
    "Redis JSON + RediSearch schema for vector embeddings"
]

for run in (1, 2):  # run twice to observe cache usage on the second pass
    print(f"\n=== RUN {run} ===")
    for q in sample_queries:
        t0 = time.time()
        answer = answer_with_cache_or_search(q, k=5, threshold=0.55)
        dt = time.time() - t0
        print(f"\nQuery: {q}\nTime: {dt:.2f}s\nAnswer preview:\n{answer[:700]}")



=== RUN 1 ===
[DEBUG] Incoming query: redis vector search tutorial
[DEBUG] Search term: Redis vector search tutorial examples and guides
[DEBUG] Cache check: found 10 matching docs (threshold=0.55)
[DEBUG] Cache hit: retrieving from cache...
[DEBUG] Running cache KNN search for k=5, threshold=0.55
[DEBUG] Retrieved 10 docs from cache; 5 kept after threshold
[DEBUG] Retrieved 5 docs for RAG context:
  [1] Vector search | Docs (redis-docs.ru)
  [2] Redis Vector Database: A Comprehensive Guide - Flutters (flutters.in)
  [3] Redis Vector Database: A Comprehensive Guide - Flutters (flutters.in)
  [4] Redis Vector Database: A Comprehensive Guide - Flutters (flutters.in)
  [5] Redis Vector Store | 🦜️🔗 LangChain (python.langchain.com)
[DEBUG] Sending final prompt to LLM...
[DEBUG] Received answer from LLM.

Query: redis vector search tutorial
Time: 7.45s
Answer preview:
You can find several tutorials and examples on performing vector search with Redis by checking these resources:

• The Redis