# Postgres Methods & Experimentation

In [1]:
import sys
import json
# import redis

# from qdrant_client import QdrantClient
from pathlib import Path
from typing import Optional, Sequence

# Add parent directory to path to import from implementation package
# Notebooks are in implementation/notebooks/, so we go up two levels to project root
sys.path.insert(0, str(Path().resolve().parent.parent))

from implementation.classes.movie import BaseMovie
from db.ingest_movie import ingest_movie
from db.postgres import pool, refresh_title_token_doc_frequency

# # Redis
# r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# # Qdrant
# qdrant = QdrantClient(host="localhost", port=6333)

## SETUP

In [2]:
# Open the pool and establish initial connections
await pool.open()
# Validate that connections actually work (fast-fail if Postgres is unreachable)
await pool.check()

In [None]:
# # Gracefully close all connections on shutdown
# await pool.close()

In [3]:
# LOAD MOVIES

json_path = Path("../../saved_imdb_movies.json")
with open(json_path, "r", encoding="utf-8") as f:
    movies_data = json.load(f)

# Convert each dictionary to an IMDBMovie object
movies = [BaseMovie(**movie_dict) for movie_dict in movies_data]

## INGESTION

In [5]:
from tqdm.asyncio import tqdm

# Run ingest_movie on every movie in parallel; tqdm.gather shows progress without blocking async
await tqdm.gather(*[ingest_movie(movie) for movie in movies], desc="Ingesting movies")

Ingesting movies:   0%|          | 0/50 [00:00<?, ?it/s]

Maturity rating: pg and rank: 2.
Maturity rating: r and rank: 4.
Maturity rating: unrated and rank: 999.
Maturity rating: r and rank: 4.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg and rank: 2.
Maturity rating: unrated and rank: 999.
Maturity rating: r and rank: 4.
Maturity rating: r and rank: 4.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg and rank: 2.
Maturity rating: g and rank: 1.
Maturity rating: r and rank: 4.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg-13 and rank: 3.
Maturity rating: r and rank: 4.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg and rank: 2.
Maturity rating: r and rank: 4.
Maturity rating: r and rank: 4.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg-13 and rank: 3.
Maturity rating: pg and rank: 2.
Maturity rating: r and rank: 4.
Maturity rating: pg and rank: 2.
Maturity rating: r and rank: 4.
Maturity r

Ingesting movies: 100%|██████████| 50/50 [00:03<00:00, 13.87it/s]


[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [17]:
await refresh_title_token_doc_frequency()

## SEARCHING

In [6]:
import asyncio
import re
from dataclasses import dataclass, fields
from implementation.misc.helpers import normalize_string

# ─── Operational constants (from lexical search guide Section 11) ─────────────
MAX_DF = 10_000
TITLE_SCORE_BETA = 2.0
TITLE_SCORE_THRESHOLD = 0.15
TITLE_MAX_CANDIDATES = 10_000
# Maximum term_ids returned per query phrase during resolution.
# Phrases matching more character names than this are too vague to be
# useful and would bloat the posting join.
CHARACTER_RESOLVE_LIMIT_PER_PHRASE: int = 500

# Fuzzy tier boundaries (Elasticsearch AUTO standard)
EXACT_ONLY_MAX_LEN = 2    # Tokens this short get exact match only
FUZZY_LEV1_MAX_LEN = 5    # Tokens 3–5 chars get lev ≤ 1
                           # Tokens 6+ chars get lev ≤ 2


@dataclass
class LexicalCandidate:
    """Per-movie scoring components returned by the full lexical search."""
    movie_id: int
    matched_people_count: int = 0
    matched_character_count: int = 0
    matched_studio_count: int = 0
    title_score_sum: float = 0.0
    raw_lexical_score: float = 0.0

@dataclass(frozen=True, slots=True)
class MetadataFilters:
    """
    Metadata hard-filter parameters passed through from the query
    understanding layer.  Any field left as None is inactive.

    This dataclass is shared across all posting-search functions so
    each can build its own MATERIALIZED eligible CTE without receiving
    a pre-resolved ID array.
    """

    min_release_ts: Optional[int] = None
    max_release_ts: Optional[int] = None
    min_runtime: Optional[int] = None
    max_runtime: Optional[int] = None
    max_maturity_rank: Optional[int] = None
    genre_ids: Optional[list[int]] = None
    watch_offer_keys: Optional[list[int]] = None

    @property
    def is_active(self) -> bool:
        """True when at least one filter is set."""
        return any(getattr(self, f.name) is not None for f in fields(self))

# ═══════════════════════════════════════════════════════════════════════════════
#  0. Misc Helpers
# ═══════════════════════════════════════════════════════════════════════════════

# Pre-compiled regex for escaping LIKE pattern metacharacters.
_LIKE_ESCAPE_RE = re.compile(r"([\\%_])")


def _escape_like(value: str) -> str:
    r"""
    Escape SQL LIKE metacharacters so *value* is treated as a literal
    substring.  Uses ``\`` as the escape character.

    >>> _escape_like("100%_done")
    '100\\%\\_done'
    """
    return _LIKE_ESCAPE_RE.sub(r"\\\1", value)


# ═══════════════════════════════════════════════════════════════════════════════
#  1. Term Resolution — Dictionary Lookups (Lexical Guide §7)
# ═══════════════════════════════════════════════════════════════════════════════


async def resolve_phrase_term_ids(phrases: list[str]) -> dict[str, int]:
    """
    Batch exact lookup of normalized phrases in lex.lexical_dictionary.

    Used for people, character, and studio entity resolution at query time.
    Phrases not present in the dictionary are silently omitted (they produce
    no candidates).

    Args:
        phrases: List of already-normalized phrase strings.

    Returns:
        Mapping of norm_str → string_id for every phrase that exists.
    """
    if not phrases:
        return {}

    query = """
        SELECT norm_str, string_id
        FROM lex.lexical_dictionary
        WHERE norm_str = ANY(%s::text[])
    """
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, (phrases,))
            rows = await cur.fetchall()

    return {row[0]: row[1] for row in rows}


async def resolve_title_token_ids(token: str) -> list[int]:
    """
    Resolve a single normalized title token to candidate string_ids using
    a tiered fuzzy strategy based on token length (Elasticsearch AUTO model).

    Tiers:
      - len ≤ 2:  Exact match only. Short tokens are too ambiguous for
                   fuzzy matching (lev=1 on a 2-char token changes 50%).
      - len 3-5:  Trigram shortlist + Levenshtein ≤ 1. Catches single-key
                   typos while keeping results tight.
      - len ≥ 6:  Trigram shortlist + Levenshtein ≤ 2. Longer words warrant
                   more tolerance — lev=2 on a 10-char word is only 20%.

    All tiers enforce MAX_DF filtering via lex.title_token_doc_frequency.
    Exact matches are always prioritised in ordering.

    Args:
        token: A single normalized query token.

    Returns:
        List of candidate string_ids matching the token.
    """
    token_len = len(token)

    if token_len <= EXACT_ONLY_MAX_LEN:
        # Tier 1: exact match only — skip trigram scan entirely
        query = """
            SELECT d.string_id
            FROM lex.title_token_strings d
            JOIN lex.title_token_doc_frequency df
              ON df.term_id = d.string_id
            WHERE d.norm_str = %s
              AND df.doc_frequency <= %s
        """
        params = (token, MAX_DF)

    elif token_len <= FUZZY_LEV1_MAX_LEN:
        # Tier 2 (len 3–5): trigram shortlist + lev ≤ 1
        query = """
            SELECT d.string_id
            FROM lex.title_token_strings d
            JOIN lex.title_token_doc_frequency df
              ON df.term_id = d.string_id
            WHERE
              abs(length(d.norm_str) - length(%s)) <= 1
              AND levenshtein(d.norm_str, %s) <= 1
              AND df.doc_frequency <= %s
            ORDER BY
              (d.norm_str = %s) DESC,
              similarity(d.norm_str, %s) DESC
            LIMIT 20
        """
        params = (token, token, MAX_DF, token, token)

    else:
        # Tier 3 (len ≥ 6): trigram shortlist + lev ≤ 2
        query = """
            SELECT d.string_id
            FROM lex.title_token_strings d
            JOIN lex.title_token_doc_frequency df
              ON df.term_id = d.string_id
            WHERE
              (d.norm_str = %s OR d.norm_str %% %s)
              AND abs(length(d.norm_str) - length(%s)) <= 2
              AND levenshtein(d.norm_str, %s) <= 2
              AND df.doc_frequency <= %s
            ORDER BY
              (d.norm_str = %s) DESC,
              similarity(d.norm_str, %s) DESC
            LIMIT 30
        """
        params = (token, token, token, token, MAX_DF, token, token)

    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            rows = await cur.fetchall()

    return [row[0] for row in rows]


async def resolve_character_term_ids(
    character_phrases: list[str],
) -> dict[int, list[int]]:
    """
    Resolve normalized character query phrases to term_ids via substring
    matching against lex.character_strings.

    Batches all phrases into a single round-trip.  Each phrase is matched
    with LIKE '%phrase%' accelerated by the trigram GIN index (for
    phrases >= 3 chars; shorter ones fall back to a seqscan of the
    small character_strings table).

    Results are capped at CHARACTER_RESOLVE_LIMIT_PER_PHRASE per phrase.

    Args:
        character_phrases: Normalized character phrase strings.

    Returns:
        Dict of {query_phrase_idx: [term_id, ...]} where the index
        corresponds to the position in the input list.
    """
    if not character_phrases:
        return {}

    query_idxs: list[int] = []
    like_patterns: list[str] = []
    for idx, phrase in enumerate(character_phrases):
        query_idxs.append(idx)
        like_patterns.append(f"%{_escape_like(phrase)}%")

    query = r"""
        SELECT sub.query_idx, sub.string_id
        FROM (
            SELECT
                qc.query_idx,
                cs.string_id,
                ROW_NUMBER() OVER (
                    PARTITION BY qc.query_idx
                    ORDER BY length(cs.norm_str)
                ) AS rn
            FROM unnest(%s::int[], %s::text[]) AS qc(query_idx, like_pattern)
            JOIN lex.character_strings cs
              ON cs.norm_str LIKE qc.like_pattern ESCAPE '\'
        ) sub
        WHERE sub.rn <= %s
    """
    params = [query_idxs, like_patterns, CHARACTER_RESOLVE_LIMIT_PER_PHRASE]

    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            rows = await cur.fetchall()

    result: dict[int, list[int]] = {}
    for query_idx, string_id in rows:
        result.setdefault(query_idx, []).append(string_id)

    return result


async def _resolve_all_title_tokens(
    include_title_searches: list[list[str]],
) -> list[dict[int, list[int]]]:
    """
    Resolve all title tokens across all title searches concurrently.

    Deduplicates tokens so each unique token is resolved once, then
    distributes results back into per-title-search maps.

    Args:
        include_title_searches: List of token lists (each is one title search).

    Returns:
        List of token_map dicts, one per title search.
    """
    print(include_title_searches)
    # Collect (search_idx, token_idx, token) for every token
    triples: list[tuple[int, int, str]] = []
    for search_idx, title_tokens in enumerate(include_title_searches):
        for token_idx, token in enumerate(title_tokens):
            triples.append((search_idx, token_idx, token))

    if not triples:
        return []

    # Deduplicate tokens while preserving first occurrence for stable ordering
    unique_tokens = list(dict.fromkeys(t[2] for t in triples))

    # Resolve all unique tokens concurrently
    resolved_lists = await asyncio.gather(
        *[resolve_title_token_ids(token) for token in unique_tokens],
    )
    token_to_ids = {
        token: ids
        for token, ids in zip(unique_tokens, resolved_lists)
        if ids
    }

    # Build per-search token maps from triples
    title_token_maps: list[dict[int, list[int]]] = []
    for search_idx, title_tokens in enumerate(include_title_searches):
        token_map: dict[int, list[int]] = {}
        for token_idx, token in enumerate(title_tokens):
            ids = token_to_ids.get(token)
            if ids:
                token_map[token_idx] = ids
        title_token_maps.append(token_map)

    return title_token_maps


# ═══════════════════════════════════════════════════════════════════════════════
#  2. Eligible Set Construction — Metadata Hard Filters (Lexical Guide §8)
# ═══════════════════════════════════════════════════════════════════════════════

def _build_eligible_cte(filters: MetadataFilters) -> tuple[str, list]:
    """
    Build the SQL fragment and parameter list for a MATERIALIZED eligible-set
    CTE against public.movie_card.

    Returns:
        (cte_sql, params) where cte_sql is the full
        ``eligible AS MATERIALIZED (...)`` block ready to prepend into a
        WITH chain, and params is the ordered list of bind values.
    """
    conditions: list[str] = []
    params: list = []

    if filters.min_release_ts is not None:
        conditions.append("release_ts >= %s")
        params.append(filters.min_release_ts)

    if filters.max_release_ts is not None:
        conditions.append("release_ts <= %s")
        params.append(filters.max_release_ts)

    if filters.min_runtime is not None:
        conditions.append("runtime_minutes >= %s")
        params.append(filters.min_runtime)

    if filters.max_runtime is not None:
        conditions.append("runtime_minutes <= %s")
        params.append(filters.max_runtime)

    if filters.max_maturity_rank is not None:
        conditions.append("maturity_rank <= %s")
        params.append(filters.max_maturity_rank)

    if filters.genre_ids is not None:
        conditions.append("genre_ids && %s::int[]")
        params.append(filters.genre_ids)

    if filters.watch_offer_keys is not None:
        conditions.append("watch_offer_keys && %s::int[]")
        params.append(filters.watch_offer_keys)

    where_clause = " AND ".join(conditions) if conditions else "TRUE"

    cte_sql = (
        f"eligible AS MATERIALIZED (\n"
        f"            SELECT movie_id\n"
        f"            FROM public.movie_card\n"
        f"            WHERE {where_clause}\n"
        f"        )"
    )
    return cte_sql, params


# ═══════════════════════════════════════════════════════════════════════════════
#  3. Posting-Table Searches — Phrase Buckets (Lexical Guide §9.2)
# ═══════════════════════════════════════════════════════════════════════════════

# Allowed posting tables — used as a safeguard against SQL injection since
# the table name is interpolated into the query string.
_VALID_POSTING_TABLES: frozenset[str] = frozenset({
    "lex.inv_person_postings",
    "lex.inv_studio_postings",
})

async def _search_phrase_postings(
    table: str,
    term_ids: list[int],
    filters: Optional[MetadataFilters] = None,
) -> dict[int, int]:
    """
    Count distinct matched term_ids per movie from a phrase posting table.

    Builds a MATERIALIZED eligible CTE when metadata filters are active,
    otherwise queries the posting table directly.

    Args:
        table:    Fully-qualified posting table name (must be in
                  _VALID_POSTING_TABLES).
        term_ids: Resolved string_ids for INCLUDE phrases in this bucket.
        filters:  Optional metadata hard-filters.

    Returns:
        Dict of {movie_id: matched_count}.
    """
    if not term_ids:
        return {}

    if table not in _VALID_POSTING_TABLES:
        raise ValueError(f"Invalid posting table: {table!r}")

    use_eligible = filters is not None and filters.is_active

    cte_parts: list[str] = []
    params: list = []

    if use_eligible:
        eligible_cte, eligible_params = _build_eligible_cte(filters)
        cte_parts.append(eligible_cte)
        params.extend(eligible_params)

    params.append(term_ids)

    eligibility_join = (
        "\n            JOIN eligible e ON e.movie_id = p.movie_id"
        if use_eligible
        else ""
    )

    with_clause = f"WITH {cte_parts[0]}\n        " if cte_parts else ""

    query = f"""
        {with_clause}SELECT p.movie_id, COUNT(DISTINCT p.term_id)::int AS matched
        FROM {table} p{eligibility_join}
        WHERE p.term_id = ANY(%s::bigint[])
        GROUP BY p.movie_id
    """

    try:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                rows = await cur.fetchall()
    except Exception:
        # TODO - Log here
        raise

    return {row[0]: row[1] for row in rows}


async def search_people_postings(
    people_term_ids: list[int],
    filters: Optional[MetadataFilters] = None,
) -> dict[int, int]:
    """
    Count distinct matched people per movie.

    Args:
        people_term_ids: Resolved string_ids for INCLUDE people phrases.
        filters:         Optional metadata hard-filters.

    Returns:
        Dict of {movie_id: matched_people_count}.
    """
    return await _search_phrase_postings(
        "lex.inv_person_postings", people_term_ids, filters,
    )


async def search_studio_postings(
    studio_term_ids: list[int],
    filters: Optional[MetadataFilters] = None,
) -> dict[int, int]:
    """
    Count distinct matched studios per movie.

    Args:
        studio_term_ids: Resolved string_ids for INCLUDE studio phrases.
        filters:         Optional metadata hard-filters.

    Returns:
        Dict of {movie_id: matched_studio_count}.
    """
    return await _search_phrase_postings(
        "lex.inv_studio_postings", studio_term_ids, filters,
    )


async def search_character_postings(
    character_phrases: list[str],
    filters: Optional[MetadataFilters] = None,
) -> dict[int, int]:
    """
    Count distinct matched character query phrases per movie.

    Calls resolve_character_term_ids to get substring-expanded term_ids,
    then runs the standard posting join pattern: unnest (query_idx, term_id)
    pairs, join inv_character_postings, COUNT(DISTINCT query_idx) per movie.

    Args:
        character_phrases: Normalized character phrase strings.
        filters:           Optional metadata hard-filters.

    Returns:
        Dict of {movie_id: matched_character_count}.
    """
    if not character_phrases:
        return {}

    # Phase 1: resolve phrases → term_ids
    phrase_term_map = await resolve_character_term_ids(character_phrases)

    if not phrase_term_map:
        return {}

    # Flatten into parallel arrays for unnest
    query_idxs: list[int] = []
    term_ids: list[int] = []
    for q_idx, tids in phrase_term_map.items():
        for tid in tids:
            query_idxs.append(q_idx)
            term_ids.append(tid)

    if not term_ids:
        return {}

    # Phase 2: posting join
    use_eligible = filters is not None and filters.is_active

    cte_parts: list[str] = []
    params: list = []

    if use_eligible:
        eligible_cte, eligible_params = _build_eligible_cte(filters)
        cte_parts.append(eligible_cte)
        params.extend(eligible_params)

    params.extend([query_idxs, term_ids])
    cte_parts.append(
        """q_chars AS (
            SELECT unnest(%s::int[]) AS query_idx,
                   unnest(%s::bigint[]) AS term_id
        )"""
    )

    eligibility_join = (
        "\n            JOIN eligible e ON e.movie_id = p.movie_id"
        if use_eligible
        else ""
    )
    cte_parts.append(
        f"""character_matches AS (
            SELECT
                p.movie_id,
                COUNT(DISTINCT qc.query_idx)::int AS matched
            FROM q_chars qc
            JOIN lex.inv_character_postings p
              ON p.term_id = qc.term_id{eligibility_join}
            GROUP BY p.movie_id
        )"""
    )

    with_clause = "WITH " + ",\n        ".join(cte_parts)
    query = f"""
        {with_clause}
        SELECT movie_id, matched
        FROM character_matches
    """

    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            rows = await cur.fetchall()

    return {row[0]: row[1] for row in rows}


# ═══════════════════════════════════════════════════════════════════════════════
#  4. Title Postings Search + F-Score (Lexical Guide §9.3)
# ═══════════════════════════════════════════════════════════════════════════════

async def search_title_postings(
    token_term_id_map: dict[int, list[int]],
    filters: Optional[MetadataFilters] = None,
) -> dict[int, float]:
    """
    Compute title F-scores for one title search by joining token postings.

    Builds a MATERIALIZED eligible CTE when metadata filters are active
    (avoiding large array transfer), counts matched token positions (m)
    per movie, then applies the coverage-weighted F-score:

        title_score = (1+β²)·(coverage·specificity) / (β²·specificity + coverage)

    where coverage = m/k, specificity = m/L, β = TITLE_SCORE_BETA.

    Results are capped at TITLE_MAX_CANDIDATES (sorted by score desc) in
    the SQL query itself for wire-transfer efficiency.

    Args:
        token_term_id_map: Mapping of token_idx → list[term_id].
            Each key is a positional index for a query token; values are
            all candidate string_ids that fuzzy-matched that position.
        filters: Optional metadata hard-filters.  When active, an
            eligible-set CTE is built inline rather than passing IDs.

    Returns:
        Dict of {movie_id: title_score} for qualifying movies, capped
        at TITLE_MAX_CANDIDATES entries.
    """
    if not token_term_id_map:
        return {}

    # ── Flatten the map into parallel arrays for unnest-based CTE ─────────
    token_idxs: list[int] = []
    term_ids: list[int] = []
    for token_idx, tids in token_term_id_map.items():
        for tid in tids:
            token_idxs.append(token_idx)
            term_ids.append(tid)

    if not term_ids:
        return {}

    k = len(token_term_id_map)

    # Precompute F-score coefficients
    beta_sq = TITLE_SCORE_BETA ** 2
    f_coeff = 1.0 + beta_sq

    # ── Dynamically build query pieces ────────────────────────────────────
    use_eligible = filters is not None and filters.is_active

    cte_prefix_parts: list[str] = []
    params: list = []

    # 1) Eligible CTE (only when filters are active)
    if use_eligible:
        eligible_cte, eligible_params = _build_eligible_cte(filters)
        cte_prefix_parts.append(eligible_cte)
        params.extend(eligible_params)

    # 2) q_tokens CTE (always present)
    params.extend([token_idxs, term_ids])
    cte_prefix_parts.append(
        """q_tokens AS (
            SELECT unnest(%s::int[]) AS token_idx,
                   unnest(%s::bigint[]) AS term_id
        )"""
    )

    # 3) token_matches + title_matches CTEs — one match per query token (no fuzzy expansion bias)
    # First get distinct (movie_id, token_idx) so each of the n query tokens contributes
    # at most 1 regardless of how many fuzzy term_ids it expanded to.
    eligibility_join = (
        "\n                JOIN eligible e ON e.movie_id = p.movie_id"
        if use_eligible
        else ""
    )
    cte_prefix_parts.append(
        f"""token_matches AS (
            SELECT DISTINCT p.movie_id, qt.token_idx
            FROM q_tokens qt
            JOIN lex.inv_title_token_postings p
              ON p.term_id = qt.term_id{eligibility_join}
        ),
        title_matches AS (
            SELECT movie_id, COUNT(*)::int AS m
            FROM token_matches
            GROUP BY movie_id
        )"""
    )

    # 4) title_scored CTE — applies F-score formula
    params.extend([f_coeff, k, beta_sq, k])
    cte_prefix_parts.append(
        """title_scored AS (
            SELECT
                tm.movie_id,
                (%s::double precision
                    * ((tm.m::double precision / %s)
                       * (tm.m::double precision / mc.title_token_count)))
                / (%s::double precision
                    * (tm.m::double precision / mc.title_token_count)
                    + (tm.m::double precision / %s))
                AS title_score
            FROM title_matches tm
            JOIN public.movie_card mc ON mc.movie_id = tm.movie_id
            WHERE mc.title_token_count > 0
              AND %s > 0
        )"""
    )
    params.append(k)  # k > 0 guard bound into the WHERE clause

    # 5) Final SELECT with threshold filter + safety cap
    params.extend([TITLE_SCORE_THRESHOLD, TITLE_MAX_CANDIDATES])

    with_clause = "WITH " + ",\n        ".join(cte_prefix_parts)
    query = f"""
        {with_clause}
        SELECT movie_id, title_score
        FROM title_scored
        WHERE title_score >= %s
        ORDER BY title_score DESC
        LIMIT %s
    """

    # ── Execute ───────────────────────────────────────────────────────────
    try:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                rows = await cur.fetchall()
    except Exception:
        # TODO - Log here
        raise

    return {row[0]: float(row[1]) for row in rows}


# ═══════════════════════════════════════════════════════════════════════════════
#  5. Bulk Movie Card Fetch — Metadata Enrichment (DB Guide Part 3 Step 3)
# ═══════════════════════════════════════════════════════════════════════════════


async def fetch_movie_cards(movie_ids: list[int]) -> list[dict]:
    """
    Bulk fetch canonical movie metadata from public.movie_card.

    Single query for all candidates — never per-candidate.  Results feed both
    the reranker (metadata preference scoring) and the final API response
    payload (card rendering).

    Args:
        movie_ids: List of movie IDs to fetch metadata for.

    Returns:
        List of dicts with keys: movie_id, title, poster_url,
        release_ts, runtime_minutes, maturity_rank, genre_ids,
        watch_offer_keys, audio_language_ids, reception_score.
    """
    if not movie_ids:
        return []

    query = """
        SELECT movie_id, title, poster_url, release_ts, runtime_minutes,
               maturity_rank, genre_ids, watch_offer_keys, audio_language_ids,
               reception_score
        FROM public.movie_card
        WHERE movie_id = ANY(%s::bigint[])
    """
    columns = [
        "movie_id", "title", "poster_url", "release_ts",
        "runtime_minutes", "maturity_rank", "genre_ids", "watch_offer_keys",
        "audio_language_ids", "reception_score",
    ]

    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, (movie_ids,))
            rows = await cur.fetchall()

    return [dict(zip(columns, row)) for row in rows]


# ═══════════════════════════════════════════════════════════════════════════════
#  6. EXCLUDE Anti-Join Helper (Lexical Guide §9.7)
# ═══════════════════════════════════════════════════════════════════════════════


async def _fetch_excluded_movie_ids(
    posting_table: str,
    exclude_term_ids: list[int],
    candidate_movie_ids: set[int],
) -> set[int]:
    """
    Find candidate movies that match any excluded term in a posting table.

    Returns the subset of candidate_movie_ids that have at least one posting
    for an excluded term, so the caller can remove them from results.

    Args:
        posting_table:      Fully-qualified table name
                            (e.g. "lex.inv_person_postings").
        exclude_term_ids:   Term IDs to check for exclusion.
        candidate_movie_ids: Current candidate set to filter against.

    Returns:
        Set of movie_ids that should be excluded.
    """
    if not exclude_term_ids or not candidate_movie_ids:
        return set()

    # Whitelist to prevent SQL injection via table name interpolation
    allowed_tables = {
        "lex.inv_person_postings",
        "lex.inv_character_postings",
        "lex.inv_studio_postings",
        "lex.inv_title_token_postings",
    }
    if posting_table not in allowed_tables:
        raise ValueError(f"Unknown posting table: {posting_table}")

    query = f"""
        SELECT DISTINCT p.movie_id
        FROM {posting_table} p
        WHERE p.term_id = ANY(%s::bigint[])
          AND p.movie_id = ANY(%s::bigint[])
    """
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, (exclude_term_ids, list(candidate_movie_ids)))
            rows = await cur.fetchall()

    return {row[0] for row in rows}


# ═══════════════════════════════════════════════════════════════════════════════
#  7. Full Lexical Search Orchestration (Lexical Guide §10)
# ═══════════════════════════════════════════════════════════════════════════════


async def lexical_search(
    include_people: Optional[list[str]] = None,
    include_characters: Optional[list[str]] = None,
    include_studios: Optional[list[str]] = None,
    include_title_searches: Optional[list[list[str]]] = None,
    exclude_people: Optional[list[str]] = None,
    exclude_characters: Optional[list[str]] = None,
    exclude_studios: Optional[list[str]] = None,
    min_release_ts: Optional[int] = None,
    max_release_ts: Optional[int] = None,
    min_runtime: Optional[int] = None,
    max_runtime: Optional[int] = None,
    max_maturity_rank: Optional[int] = None,
    genre_ids: Optional[list[int]] = None,
    watch_offer_keys: Optional[list[int]] = None,
) -> list[LexicalCandidate]:
    """
    Full lexical search combining all buckets with OR semantics.

    Orchestrates the complete flow described in Lexical Guide §10:
      1. Build eligible set (metadata hard filters)
      2. Resolve phrase term IDs (exact dictionary lookup)
      3. Resolve title token IDs (fuzzy per token)
      4. Run all posting searches concurrently
      5. Aggregate title scores across multiple title searches
      6. Apply title candidate safety cap
      7. Union candidates from all buckets (OR semantics)
      8. Apply EXCLUDE anti-joins
      9. Compute raw_lexical_score per candidate

    Normalise server-side: lexical_score = raw / max_possible, where
    max_possible = #people + #characters + #studios + #title_searches.

    All phrase / token inputs should be pre-normalized strings.
    Title searches are lists of token lists (OR-of-phrases, e.g.
    [["kung", "fu"], ["big", "shot"]]).

    Args:
        include_people:         Normalized people phrase strings.
        include_characters:     Normalized character phrase strings.
        include_studios:        Normalized studio phrase strings.
        include_title_searches: List of token lists, each one title search.
        exclude_people:         Normalized people phrases to hard-exclude.
        exclude_characters:     Normalized character phrases to hard-exclude.
        exclude_studios:        Normalized studio phrases to hard-exclude.
        min_release_ts:         Minimum release timestamp filter.
        max_release_ts:         Maximum release timestamp filter.
        min_runtime:            Minimum runtime filter (minutes).
        max_runtime:            Maximum runtime filter (minutes).
        max_maturity_rank:      Maximum maturity rank filter (≤).
        genre_ids:              Genre overlap filter IDs.
        watch_offer_keys:       Watch-offer overlap filter keys.

    Returns:
        List of LexicalCandidate objects sorted by raw_lexical_score desc.
    """
    include_people = include_people or []
    include_characters = include_characters or []
    include_studios = include_studios or []
    include_title_searches = include_title_searches or []
    exclude_people = exclude_people or []
    exclude_characters = exclude_characters or []
    exclude_studios = exclude_studios or []

    # ── Steps 1–3: Run eligible set, phrase resolution, and title token
    #     resolution concurrently (all are independent) ─────────────────────
    all_phrases = list(set(
        include_people + include_characters + include_studios
        + exclude_people + exclude_characters + exclude_studios
    ))
    eligible_set, phrase_id_map, title_token_maps = await asyncio.gather(
        build_eligible_set(
            min_release_ts=min_release_ts,
            max_release_ts=max_release_ts,
            min_runtime=min_runtime,
            max_runtime=max_runtime,
            max_maturity_rank=max_maturity_rank,
            genre_ids=genre_ids,
            watch_offer_keys=watch_offer_keys,
        ),
        resolve_phrase_term_ids(all_phrases),
        _resolve_all_title_tokens(include_title_searches),
    )
    eligible_ids = list(eligible_set.keys()) if eligible_set is not None else None

    people_term_ids = [phrase_id_map[p] for p in include_people if p in phrase_id_map]
    character_term_ids = [phrase_id_map[c] for c in include_characters if c in phrase_id_map]
    studio_term_ids = [phrase_id_map[s] for s in include_studios if s in phrase_id_map]

    exclude_people_ids = [phrase_id_map[p] for p in exclude_people if p in phrase_id_map]
    exclude_character_ids = [phrase_id_map[c] for c in exclude_characters if c in phrase_id_map]
    exclude_studio_ids = [phrase_id_map[s] for s in exclude_studios if s in phrase_id_map]

    # ── Step 4: Run all posting searches concurrently ─────────────────────
    people_future = search_people_postings(people_term_ids, eligible_ids)
    character_future = search_character_postings(character_term_ids, eligible_ids)
    studio_future = search_studio_postings(studio_term_ids, eligible_ids)
    title_futures = [
        search_title_postings(token_map, eligible_ids)
        for token_map in title_token_maps
    ]

    all_results = await asyncio.gather(
        people_future, character_future, studio_future, *title_futures,
    )

    people_scores: dict[int, int] = all_results[0]
    character_scores: dict[int, int] = all_results[1]
    studio_scores: dict[int, int] = all_results[2]

    # ── Step 5: Aggregate title scores (sum across title searches) ────────
    title_score_results: list[dict[int, float]] = list(all_results[3:])
    title_score_sums: dict[int, float] = {}
    for title_result in title_score_results:
        for movie_id, score in title_result.items():
            title_score_sums[movie_id] = title_score_sums.get(movie_id, 0.0) + score

    # ── Step 6: Title candidate safety cap ────────────────────────────────
    if len(title_score_sums) > TITLE_MAX_CANDIDATES:
        sorted_titles = sorted(title_score_sums.items(), key=lambda x: x[1], reverse=True)
        title_score_sums = dict(sorted_titles[:TITLE_MAX_CANDIDATES])

    # ── Step 7: Union all candidates (OR semantics) ───────────────────────
    all_movie_ids: set[int] = set()
    all_movie_ids.update(people_scores.keys())
    all_movie_ids.update(character_scores.keys())
    all_movie_ids.update(studio_scores.keys())
    all_movie_ids.update(title_score_sums.keys())

    # ── Step 8: EXCLUDE anti-joins (run concurrently) ────────────────────────
    exclude_futures = []
    if exclude_people_ids:
        exclude_futures.append(
            _fetch_excluded_movie_ids(
                "lex.inv_person_postings", exclude_people_ids, all_movie_ids,
            )
        )
    if exclude_character_ids:
        exclude_futures.append(
            _fetch_excluded_movie_ids(
                "lex.inv_character_postings", exclude_character_ids, all_movie_ids,
            )
        )
    if exclude_studio_ids:
        exclude_futures.append(
            _fetch_excluded_movie_ids(
                "lex.inv_studio_postings", exclude_studio_ids, all_movie_ids,
            )
        )
    exclude_results = await asyncio.gather(*exclude_futures) if exclude_futures else []
    exclude_movie_ids: set[int] = set()
    for excluded in exclude_results:
        exclude_movie_ids.update(excluded)

    # ── Step 9: Assemble candidates with per-bucket scores ────────────────
    candidates: list[LexicalCandidate] = []
    for movie_id in all_movie_ids:
        if movie_id in exclude_movie_ids:
            continue

        matched_people = people_scores.get(movie_id, 0)
        matched_characters = character_scores.get(movie_id, 0)
        matched_studios = studio_scores.get(movie_id, 0)
        title_sum = title_score_sums.get(movie_id, 0.0)
        raw = matched_people + matched_characters + matched_studios + title_sum

        candidates.append(LexicalCandidate(
            movie_id=movie_id,
            matched_people_count=matched_people,
            matched_character_count=matched_characters,
            matched_studio_count=matched_studios,
            title_score_sum=title_sum,
            raw_lexical_score=raw,
        ))

    candidates.sort(key=lambda c: c.raw_lexical_score, reverse=True)
    return candidates

In [37]:
matches = await resolve_phrase_term_ids(["marvel studios"])

print(matches)

matches = await search_studio_postings(list(matches.values()), None)

movie_cards = await fetch_movie_cards(list(matches.keys()))

for card in movie_cards:
    print(f"{card['title']}")

{'marvel studios': 6262}
captain america: the first avenger
avengers: endgame


In [24]:
filters = MetadataFilters(min_release_ts=1000000000)

matches = await search_character_postings(["spider"], filters)

print(matches)

movie_cards = await fetch_movie_cards(list(matches.keys()))

for card in movie_cards:
    print(f"{card['title']} - {matches[card['movie_id']]}")

{1584: 1, 299534: 1, 569094: 1}
school of rock - 1
spider-man: across the spider-verse - 1
avengers: endgame - 1


In [None]:
# Methods to test

# ═══════════════════════════════════════════════════════════════════════════════
#  4. Title Postings Search + F-Score (Lexical Guide §9.3)
# ═══════════════════════════════════════════════════════════════════════════════

# async def search_title_postings()

# ═══════════════════════════════════════════════════════════════════════════════
#  6. EXCLUDE Anti-Join Helper (Lexical Guide §9.7)
# ═══════════════════════════════════════════════════════════════════════════════

# async def _fetch_excluded_movie_ids()

# ═══════════════════════════════════════════════════════════════════════════════
#  7. Full Lexical Search Orchestration (Lexical Guide §10)
# ═══════════════════════════════════════════════════════════════════════════════

# async def lexical_search()