In [1]:
import os
import re
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Literal

from supabase import create_client
from dotenv import load_dotenv

In [2]:
# Sort direction accepted by the ``order`` parameter of
# GrablinDB.wko_companies_by_branch.
SortOrder = Literal["asc", "desc"]


def _clean(s: Optional[str]) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())


def _ilike_pattern(q: str) -> str:
    """Build a "contains" pattern for ``ilike`` from free text.

    The text is normalized with ``_clean`` first. Blank input yields the
    bare wildcard ``"%"``; anything else is wrapped as ``%text%``. Wildcard
    characters already present in the text are passed through unchanged.
    """
    needle = _clean(q)
    return f"%{needle}%" if needle else "%"

In [3]:

@dataclass
class GrablinDB:
    """Thin wrapper around Supabase queries for acquisition/RAG tooling.

    Each public method runs a single query through ``self.sb`` and returns a
    plain ``dict`` holding the fetched rows plus a little metadata (counts,
    echoed filters), so results can be handed straight to an agent.
    """

    # Supabase project URL passed to ``create_client``.
    url: str
    # Supabase API key passed to ``create_client``.
    key: str

    def __post_init__(self):
        """Create the Supabase client once the dataclass fields are set."""
        self.sb = create_client(self.url, self.key)

    # ----------------------------
    # WKO: branches
    # ----------------------------

    def wko_list_branches(self, limit: int = 5000) -> Dict[str, Any]:
        """Return distinct branch labels + urls.

        Args:
            limit: Maximum number of ``wko_branches`` rows to request.

        Returns:
            ``{"rows": [...], "count": int}`` where rows are de-duplicated
            client-side on ``(branche, branch_url)``, keeping the first
            occurrence of each pair.
        """
        res = (
            self.sb.table("wko_branches")
            .select("branche,branch_url,letter,discovered_at")
            .limit(limit)
            .execute()
        )
        rows = res.data or []
        # Deduplicate by (branche, branch_url)
        seen = set()
        out = []
        for r in rows:
            k = (r.get("branche"), r.get("branch_url"))
            if k in seen:
                continue
            seen.add(k)
            out.append(r)
        return {"rows": out, "count": len(out)}

    def wko_match_branch(self, query: str, limit: int = 10) -> Dict[str, Any]:
        """Fuzzy-ish branch lookup using ilike on branche.

        If you later add a pg_trgm RPC, swap this to
        sb.rpc('match_wko_branch', ...).

        Args:
            query: Free-text branch phrase; normalized with ``_clean``.
            limit: Maximum number of candidates to request.

        Returns:
            ``{"candidates": [...], "query": str}``. For blank input no query
            is issued and only ``{"candidates": []}`` is returned.
        """
        q = _clean(query)
        if not q:
            return {"candidates": []}

        res = (
            self.sb.table("wko_branches")
            .select("branche,branch_url,letter")
            .ilike("branche", _ilike_pattern(q))
            .limit(limit)
            .execute()
        )
        return {"candidates": res.data or [], "query": q}

    # ----------------------------
    # WKO: companies
    # ----------------------------

    def wko_companies_by_branch(
        self,
        branche: str,
        limit: int = 25,
        offset: int = 0,
        order_by: str = "crawled_at",
        order: SortOrder = "desc",
        only_with_email: bool = False,
        only_with_website: bool = False,
    ) -> Dict[str, Any]:
        """Page through companies whose ``branche`` equals the given label.

        Args:
            branche: Branch label, matched with equality.
            limit: Page size.
            offset: Index of the first row of the page.
            order_by: Column to sort on.
            order: ``"desc"`` sorts descending; any other value ascending.
            only_with_email: Keep only rows whose ``email`` is not null.
            only_with_website: Keep only rows whose ``company_website`` is
                not null.

        Returns:
            ``{"rows", "count", "branche", "limit", "offset", "filters"}``
            where ``count`` is the number of rows on this page.
        """
        q = (
            self.sb.table("wko_companies")
            .select("name,branche,address,zip_city,street,email,phone,company_website,wko_detail_url,crawled_at,source_list_url")
            .eq("branche", branche)
        )
        if only_with_email:
            q = q.not_.is_("email", "null")
        if only_with_website:
            q = q.not_.is_("company_website", "null")

        # range() takes inclusive bounds, hence the "- 1".
        q = q.order(order_by, desc=(order == "desc")).range(offset, offset + limit - 1)
        res = q.execute()
        rows = res.data or []
        return {
            "rows": rows,
            "count": len(rows),
            "branche": branche,
            "limit": limit,
            "offset": offset,
            "filters": {"only_with_email": only_with_email, "only_with_website": only_with_website},
        }

    def wko_search_companies(
        self,
        text: str,
        limit: int = 25,
        offset: int = 0,
        branche: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Keyword search using search_text (exists for fuzzy matching).

        Args:
            text: Keywords; normalized with ``_clean`` and matched as a
                case-insensitive substring of ``search_text``.
            limit: Page size.
            offset: Index of the first row of the page.
            branche: Optional branch label to additionally match with
                equality.

        Returns:
            ``{"rows", "count", "query", "branche"}``, newest ``crawled_at``
            first. Blank ``text`` returns an empty result without querying.
        """
        t = _clean(text)
        if not t:
            return {"rows": [], "count": 0, "query": ""}

        q = (
            self.sb.table("wko_companies")
            .select("name,branche,address,email,phone,company_website,wko_detail_url,crawled_at")
            .ilike("search_text", _ilike_pattern(t))
        )
        if branche:
            q = q.eq("branche", branche)

        q = q.order("crawled_at", desc=True).range(offset, offset + limit - 1)
        res = q.execute()
        rows = res.data or []
        return {"rows": rows, "count": len(rows), "query": t, "branche": branche}

    def wko_sample_companies(
        self,
        branche: Optional[str] = None,
        sample_n: int = 10,
        fetch_pool: int = 200,
    ) -> Dict[str, Any]:
        """Cheap sampling: fetch a recent pool and randomly sample from it.

        (True DB-side random sampling is better via SQL/RPC, but this is
        easiest.)

        Args:
            branche: Optional branch label to restrict the pool to.
            sample_n: Number of rows wanted. Clamped to the range
                ``[0, pool size]``, so oversized or negative requests do not
                raise.
            fetch_pool: Number of most recently crawled rows to draw from.

        Returns:
            ``{"rows", "count", "branche", "pool"}``; ``pool`` is omitted
            when the query returned no rows.
        """
        q = (
            self.sb.table("wko_companies")
            .select("name,branche,address,email,phone,company_website,wko_detail_url,crawled_at")
            .order("crawled_at", desc=True)
            .limit(fetch_pool)
        )
        if branche:
            q = q.eq("branche", branche)

        res = q.execute()
        rows = res.data or []
        if not rows:
            return {"rows": [], "count": 0, "branche": branche}

        # random.sample raises ValueError for a negative size or one larger
        # than the population, so clamp on both sides.
        sample_n = max(0, min(sample_n, len(rows)))
        sampled = random.sample(rows, sample_n)
        return {"rows": sampled, "count": len(sampled), "branche": branche, "pool": len(rows)}

    def wko_count_by_branch(self, limit: int = 50, scan_limit: int = 5000) -> Dict[str, Any]:
        """Return top branches by frequency (client-side aggregation for MVP).

        For larger scale, implement as SQL/RPC.

        Args:
            limit: Number of top branches to return.
            scan_limit: Maximum number of ``wko_companies`` rows to request
                for counting. The counts only reflect the rows actually
                returned, which ``scanned`` reports.

        Returns:
            ``{"top": [{"branche", "count"}, ...], "scanned": int}`` sorted
            by descending count; rows with an empty/missing ``branche`` are
            ignored.
        """
        res = self.sb.table("wko_companies").select("branche").limit(scan_limit).execute()
        rows = res.data or []
        counts: Dict[str, int] = {}
        for r in rows:
            b = r.get("branche") or ""
            if not b:
                continue
            counts[b] = counts.get(b, 0) + 1
        top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:limit]
        return {"top": [{"branche": b, "count": c} for b, c in top], "scanned": len(rows)}

    # ----------------------------
    # Projectfacts
    # ----------------------------

    def pf_search(
        self,
        text: str,
        limit: int = 25,
        offset: int = 0,
        city: Optional[str] = None,
        industry: Optional[str] = None,
        size: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search the ``projectfacts`` table by keywords and optional facets.

        Args:
            text: Keywords matched as a substring of ``search_text``.
            limit: Page size.
            offset: Index of the first row of the page.
            city: Optional substring filter on ``city``.
            industry: Optional substring filter on ``industries``.
            size: Optional substring filter on ``size``.

        Returns:
            ``{"rows", "count", "filters"}`` ordered by ``last_activity_at``
            descending. When neither text nor any facet is given, an empty
            ``{"rows": [], "count": 0}`` is returned without querying.
        """
        t = _clean(text)
        if not t and not (city or industry or size):
            return {"rows": [], "count": 0}

        q = self.sb.table("projectfacts").select(
            "name,company_address,city,state,country,industries,size,last_activity_at,last_changed_at"
        )
        if t:
            q = q.ilike("search_text", _ilike_pattern(t))
        if city:
            q = q.ilike("city", _ilike_pattern(city))
        if industry:
            q = q.ilike("industries", _ilike_pattern(industry))
        if size:
            q = q.ilike("size", _ilike_pattern(size))

        q = q.order("last_activity_at", desc=True).range(offset, offset + limit - 1)
        res = q.execute()
        rows = res.data or []
        return {"rows": rows, "count": len(rows), "filters": {"city": city, "industry": industry, "size": size}}

    # ----------------------------
    # EVI Bilanz publications
    # ----------------------------

    def evi_search_publications(
        self,
        text: str,
        limit: int = 25,
        offset: int = 0,
        date_from: Optional[str] = None,  # YYYY-MM-DD
        date_to: Optional[str] = None,    # YYYY-MM-DD
        company_name: Optional[str] = None,
        firmenbuchnummer: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search EVI publication events.

        Uses trigram-indexed text columns via ilike (MVP). All filters are
        optional and combined; with none given the latest page is returned.

        Args:
            text: Keywords matched as a substring of ``search_text``.
            limit: Page size.
            offset: Index of the first row of the page.
            date_from: Inclusive lower bound on ``publication_date``.
            date_to: Inclusive upper bound on ``publication_date``.
            company_name: Substring filter on ``company_name``.
            firmenbuchnummer: Matched with equality after ``_clean``.

        Returns:
            ``{"rows", "count", "filters"}`` ordered by ``publication_date``
            descending.
        """
        t = _clean(text)
        q = self.sb.table("evi_bilanz_publications").select(
            "publication_date,publication_type,company_name,firmenbuchnummer,detail_url,source_search_url,crawled_at"
        )

        if t:
            q = q.ilike("search_text", _ilike_pattern(t))
        if company_name:
            q = q.ilike("company_name", _ilike_pattern(company_name))
        if firmenbuchnummer:
            q = q.eq("firmenbuchnummer", _clean(firmenbuchnummer))
        if date_from:
            q = q.gte("publication_date", date_from)
        if date_to:
            q = q.lte("publication_date", date_to)

        q = q.order("publication_date", desc=True).range(offset, offset + limit - 1)
        res = q.execute()
        rows = res.data or []
        return {
            "rows": rows,
            "count": len(rows),
            "filters": {"date_from": date_from, "date_to": date_to, "company_name": company_name, "firmenbuchnummer": firmenbuchnummer},
        }

    def evi_recent_publications(self, days: int = 30, limit: int = 50) -> Dict[str, Any]:
        """Return the latest publications ordered by ``publication_date``.

        Convenience wrapper: relies on publication_date; compute date range
        client-side if you want. For MVP, just return latest by date.

        Args:
            days: Accepted for interface compatibility but NOT applied; no
                date window is sent with the query.
            limit: Maximum number of rows to request.

        Returns:
            ``{"rows": [...], "count": int}``.
        """
        res = (
            self.sb.table("evi_bilanz_publications")
            .select("publication_date,publication_type,company_name,firmenbuchnummer,detail_url")
            .order("publication_date", desc=True)
            .limit(limit)
            .execute()
        )
        rows = res.data or []
        return {"rows": rows, "count": len(rows)}


# ----------------------------
# Helper: build from .env
# ----------------------------

def grablin_db_from_env(use_service_role: bool = True) -> GrablinDB:
    """Build a ``GrablinDB`` from environment variables (after loading ``.env``).

    Reads ``SUPABASE_URL`` plus either ``SUPABASE_SERVICE_ROLE_KEY`` (when
    ``use_service_role`` is true) or ``SUPABASE_ANON_KEY``.

    Raises:
        RuntimeError: If the URL or the selected key is missing or empty.
    """
    load_dotenv()
    key_var = "SUPABASE_SERVICE_ROLE_KEY" if use_service_role else "SUPABASE_ANON_KEY"
    url, key = os.getenv("SUPABASE_URL"), os.getenv(key_var)
    if url and key:
        return GrablinDB(url=url, key=key)
    raise RuntimeError("Missing SUPABASE_URL or SUPABASE_*_KEY in env")


# ----------------------------
# Tool registry for an agent
# ----------------------------

def grablin_tools(db: GrablinDB) -> Dict[str, Any]:
    """Map tool names to the bound ``db`` methods of the same name.

    The resulting callables can be handed to DSPy (or any agent runner).
    Keys appear in the order: WKO tools, Projectfacts, EVI.
    """
    tool_names = (
        # WKO
        "wko_list_branches",
        "wko_match_branch",
        "wko_companies_by_branch",
        "wko_search_companies",
        "wko_sample_companies",
        "wko_count_by_branch",
        # Projectfacts
        "pf_search",
        # EVI
        "evi_search_publications",
        "evi_recent_publications",
    )
    return {tool: getattr(db, tool) for tool in tool_names}

In [None]:
# Human/agent-readable catalogue of the GrablinDB tools: one entry per tool
# with its name, a usage hint, its arguments (as descriptive strings) and the
# shape of the returned dict. The names match the keys built by grablin_tools().
GRABLIN_TOOL_SPECS = [
  {
    "name": "wko_list_branches",
    "description": "List known WKO branch labels and URLs from wko_branches. Use to discover valid branch names.",
    "args": {"limit": "int (default 5000)"},
    "returns": "dict: {rows: [{branche, branch_url, letter, discovered_at}], count: int}"
  },
  {
    "name": "wko_match_branch",
    "description": "Fuzzy match a user-provided branch phrase to known WKO branches. Use before querying companies by branch.",
    "args": {"query": "str", "limit": "int (default 10)"},
    "returns": "dict: {candidates: [{branche, branch_url, letter}], query: str}"
  },
  {
    "name": "wko_companies_by_branch",
    "description": "Get companies for an exact WKO branche label. Use after choosing a branche from wko_match_branch.",
    "args": {
      "branche": "str (required)",
      "limit": "int (default 25, max 50 recommended)",
      "offset": "int (default 0)",
      "order_by": "str (default crawled_at)",
      "order": "asc|desc (default desc)",
      "only_with_email": "bool (default False)",
      "only_with_website": "bool (default False)"
    },
    "returns": "dict: {rows: [company], count: int, branche: str, limit: int, offset: int, filters: {...}}"
  },
  {
    "name": "wko_search_companies",
    "description": "Keyword search companies by search_text (name/address/branche/etc). Use if the branch is unknown or query is by company name.",
    "args": {"text": "str", "limit": "int (default 25)", "offset": "int (default 0)", "branche": "str|null"},
    "returns": "dict: {rows: [company], count: int, query: str, branche: str|null}"
  },
  {
    "name": "wko_sample_companies",
    "description": "Return a random sample of companies (optionally within a branche) from a recent pool.",
    "args": {"branche": "str|null", "sample_n": "int (default 10)", "fetch_pool": "int (default 200)"},
    "returns": "dict: {rows: [company], count: int, branche: str|null, pool: int}"
  },
  {
    "name": "wko_count_by_branch",
    "description": "Return approximate top branches by number of companies (client-side aggregation over a capped scan). Use for exploration.",
    "args": {"limit": "int (default 50)"},
    "returns": "dict: {top: [{branche, count}], scanned: int}"
  },
  {
    "name": "pf_search",
    "description": "Search projectfacts for companies with metadata (industries/size/activity). Useful for enrichment and acquisition targeting.",
    "args": {"text": "str", "limit": "int (default 25)", "offset": "int (default 0)", "city": "str|null", "industry": "str|null", "size": "str|null"},
    "returns": "dict: {rows: [projectfact], count: int, filters: {...}}"
  },
  {
    "name": "evi_search_publications",
    "description": "Search EVI bilanz publications by text/company/date/firmenbuchnummer. Use for financial/relevance signals.",
    "args": {"text": "str", "limit": "int (default 25)", "offset": "int (default 0)", "date_from": "YYYY-MM-DD|null", "date_to": "YYYY-MM-DD|null",
             "company_name": "str|null", "firmenbuchnummer": "str|null"},
    "returns": "dict: {rows: [publication], count: int, filters: {...}}"
  },
  {
    "name": "evi_recent_publications",
    "description": "Get latest EVI publications ordered by publication_date. Use for 'recent filings' questions.",
    # NOTE(review): "days" is part of the method signature, but the
    # GrablinDB.evi_recent_publications implementation in this file does not
    # filter by it.
    "args": {"days": "int (default 30)", "limit": "int (default 50)"},
    "returns": "dict: {rows: [publication], count: int}"
  }
]


In [None]:
import os
import json
import dspy
from dotenv import load_dotenv

from grablin_tools import grablin_db_from_env, grablin_tools

# -----------------------------
# Signature (with history)
# -----------------------------
class Grablin(dspy.Signature):
    """Grablin is a tool-using acquisition assistant over Supabase company datasets.

    Rules:
    - Use tools for any factual/company-specific claims.
    - Do not invent company details or counts.
    - If branch name is unclear, use wko_match_branch first.
    """

    # NOTE(review): the docstring above and the desc= strings below are
    # presumably consumed by DSPy as prompt text, so they are deliberately
    # left unchanged — confirm before rewording them.

    # Inputs: the current user message and the running conversation history.
    user_request: str = dspy.InputField(desc="User message / question.")
    history: dspy.History = dspy.InputField(desc="Conversation history.")
    # Output: the agent's final answer text.
    process_result: str = dspy.OutputField(desc="Answer grounded in tool outputs; include small lists and next-step questions.")


# -----------------------------
# Tool wrappers (docstrings + type hints are important for ReAct)
# -----------------------------
def wko_list_branches(limit: int = 5000) -> dict:
    """List known WKO branches (branche + branch_url). Use to discover valid branch names."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Looks the callable up in the module-global _TOOLS registry.
    list_branches = _TOOLS["wko_list_branches"]
    return list_branches(limit=limit)

def wko_match_branch(query: str, limit: int = 10) -> dict:
    """Fuzzy match a branch phrase to known WKO branches. Use before querying companies_by_branch."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Looks the callable up in the module-global _TOOLS registry.
    match_branch = _TOOLS["wko_match_branch"]
    return match_branch(query=query, limit=limit)

def search_companies(text: str, limit: int = 25, offset: int = 0, branche: str | None = None) -> dict:
    """Search WKO companies by text (name/address/branche etc. via search_text). Use when branch is unknown or user gives keywords."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Forwards to the "wko_search_companies" entry of _TOOLS.
    search = _TOOLS["wko_search_companies"]
    return search(text=text, limit=limit, offset=offset, branche=branche)

def companies_by_branch(branche: str, limit: int = 25, offset: int = 0, only_with_email: bool = False, only_with_website: bool = False) -> dict:
    """Get companies for an exact WKO branche label. Use after selecting a branche from wko_match_branch."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Sort column/direction are not exposed here, so the
    # underlying tool's defaults apply.
    by_branch = _TOOLS["wko_companies_by_branch"]
    forwarded = {
        "branche": branche,
        "limit": limit,
        "offset": offset,
        "only_with_email": only_with_email,
        "only_with_website": only_with_website,
    }
    return by_branch(**forwarded)

def sample_companies(branche: str | None = None, sample_n: int = 10, fetch_pool: int = 200) -> dict:
    """Return a random sample of WKO companies (optionally restricted to a branche)."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Forwards to the "wko_sample_companies" entry of _TOOLS.
    sample = _TOOLS["wko_sample_companies"]
    return sample(branche=branche, sample_n=sample_n, fetch_pool=fetch_pool)

def pf_search(text: str, limit: int = 25, offset: int = 0, city: str | None = None, industry: str | None = None, size: str | None = None) -> dict:
    """Search Projectfacts (company profiles: industries/size/activity). Useful for enrichment and acquisition targeting."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Forwards every argument to the "pf_search" entry of _TOOLS.
    search = _TOOLS["pf_search"]
    facets = {"city": city, "industry": industry, "size": size}
    return search(text=text, limit=limit, offset=offset, **facets)

def evi_search(text: str, limit: int = 25, offset: int = 0, date_from: str | None = None, date_to: str | None = None,
               company_name: str | None = None, firmenbuchnummer: str | None = None) -> dict:
    """Search EVI bilanz publications (financial/relevance signal) by text/company/date/firmenbuchnummer."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Forwards to the "evi_search_publications" entry of _TOOLS.
    search_publications = _TOOLS["evi_search_publications"]
    forwarded = {
        "text": text,
        "limit": limit,
        "offset": offset,
        "date_from": date_from,
        "date_to": date_to,
        "company_name": company_name,
        "firmenbuchnummer": firmenbuchnummer,
    }
    return search_publications(**forwarded)

def evi_recent(limit: int = 50) -> dict:
    """Fetch latest EVI bilanz publication events ordered by publication_date."""
    # Docstring kept verbatim: this file notes that tool docstrings matter
    # for ReAct. Only ``limit`` is forwarded to "evi_recent_publications".
    recent_publications = _TOOLS["evi_recent_publications"]
    return recent_publications(limit=limit)


# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
    # Interactive console chat: configure the LM, wire the Supabase-backed
    # tools into a ReAct agent, then loop over user input until EOF/Ctrl-C.
    load_dotenv()

    # DSPy LM
    if not os.getenv("OPENAI_API_KEY"):
        raise SystemExit("Missing OPENAI_API_KEY in env")
    dspy.configure(lm=dspy.LM("openai/gpt-5.2"))

    # Supabase tools (service role for server-side agent).
    # _TOOLS is the module-global registry the tool wrappers above read from,
    # so it must be assigned before the agent makes its first tool call.
    db = grablin_db_from_env(use_service_role=True)
    _TOOLS = grablin_tools(db)

    # ReAct agent with tools
    agent = dspy.ReAct(
        Grablin,
        tools=[
            wko_list_branches,
            wko_match_branch,
            search_companies,
            companies_by_branch,
            sample_companies,
            pf_search,
            evi_search,
            evi_recent,
        ],
        max_iters=12,
    )

    # Conversation history
    history = dspy.History(messages=[])

    print("Grablin ready. Ask about branches, companies, or EVI bilanz publications.\n")

    while True:
        try:
            user = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt ends the session cleanly instead
            # of dumping a traceback.
            print()
            break
        if not user:
            continue

        # Run agent
        out = agent(user_request=user, history=history)

        # Update history: one message per turn whose keys match the signature
        # fields, carrying both the input and the output of that turn. Two
        # separate single-field dicts would be treated as two incomplete turns.
        history.messages.append(
            {"user_request": user, "process_result": out.process_result}
        )

        print("\nGrablin:\n" + out.process_result + "\n")


In [None]:
import os
from typing import Any, Dict, List, Optional, Literal

import dspy
from dotenv import load_dotenv
from supabase import create_client

# Sort direction accepted by GrablinDB.sql_select.
Order = Literal["asc", "desc"]
# Filter operators understood by _apply_filters.
Op = Literal["eq", "neq", "lt", "lte", "gt", "gte", "ilike", "in", "is_null", "not_null"]

# -----------------------------
# "SQL" schema (whitelist)
# -----------------------------
# Tables/columns from your DATA_CATALOG_FULL + INSIGHTS docs.
# Maps each queryable table name to the column names that may be selected,
# filtered on, or ordered by; _assert_table_col rejects anything not listed.
SCHEMA: Dict[str, List[str]] = {
    "wko_branches": [
        "id", "branche", "branch_url", "letter", "source", "discovered_at", "created_at", "updated_at"
    ],
    "wko_companies": [
        "id", "wko_key", "branche", "name", "wko_detail_url", "company_website", "email", "phone",
        "street", "zip_city", "address", "source_list_url", "crawled_at", "imported_at",
        "search_text", "raw_row", "created_at", "updated_at"
    ],
    "projectfacts": [
        "id", "pf_key", "name", "ort", "name_norm", "street", "plz", "city", "city_norm", "state",
        "country", "segment_country", "industries", "size", "last_changed_at", "last_activity_at",
        "company_address", "raw_addresses", "address_norm", "search_text", "raw_row",
        "created_at", "updated_at"
    ],
    "evi_bilanz_publications": [
        "id", "evi_key", "publication_date", "publication_type", "detail_url", "source_item_path",
        "source_search_url", "company_name", "company_name_norm", "firmenbuchnummer", "search_text",
        "crawled_at", "imported_at", "raw_row", "created_at", "updated_at"
    ],
}

# Upper bound that sql_select clamps its ``limit`` argument to.
MAX_LIMIT = 50  # keep tool outputs small


def _assert_table_col(table: str, cols: List[str]) -> None:
    """Validate a table name and column names against the ``SCHEMA`` whitelist.

    The wildcard ``"*"`` is always accepted as a column.

    Raises:
        ValueError: If ``table`` is not a key of ``SCHEMA``, or at the first
            entry of ``cols`` that is neither ``"*"`` nor listed for it.
    """
    if table not in SCHEMA:
        raise ValueError(f"Unknown table: {table}. Allowed: {list(SCHEMA.keys())}")

    known = SCHEMA[table]
    for name in cols:
        if name == "*" or name in known:
            continue
        raise ValueError(f"Unknown column '{name}' for table '{table}'")


def _apply_filters(q, table: str, where: Optional[List[Dict[str, Any]]]):
    """
    where = [{"col":"branche","op":"eq","val":"Abfallbehandler"}, ...]
    ops: eq, neq, lt, lte, gt, gte, ilike, in, is_null, not_null
    """
    if not where:
        return q
    for f in where:
        col = f["col"]
        op: Op = f["op"]
        val = f.get("val")
        _assert_table_col(table, [col])

        if op == "eq":
            q = q.eq(col, val)
        elif op == "neq":
            q = q.neq(col, val)
        elif op == "lt":
            q = q.lt(col, val)
        elif op == "lte":
            q = q.lte(col, val)
        elif op == "gt":
            q = q.gt(col, val)
        elif op == "gte":
            q = q.gte(col, val)
        elif op == "ilike":
            # PostgREST ilike expects a pattern, e.g. "%foo%"
            q = q.ilike(col, val)
        elif op == "in":
            if not isinstance(val, list):
                raise ValueError("op='in' expects val=list")
            q = q.in_(col, val)
        elif op == "is_null":
            q = q.is_(col, "null")
        elif op == "not_null":
            q = q.not_.is_(col, "null")
        else:
            raise ValueError(f"Unsupported op: {op}")
    return q


class GrablinDB:
    def __init__(self, url: str, key: str):
        """Create the Supabase client for ``url``/``key`` and keep it on ``self.sb``."""
        client = create_client(url, key)
        self.sb = client

    # -----------------------------
    # SQL-like tools
    # -----------------------------
    def sql_schema(self) -> dict:
        """Return allowed tables and columns (whitelist). Use to plan queries safely."""
        # The module-level SCHEMA mapping itself is returned (not a copy),
        # wrapped under the "schema" key.
        whitelist = SCHEMA
        return {"schema": whitelist}

    def sql_select(
        self,
        table: str,
        cols: List[str],
        where: Optional[List[Dict[str, Any]]] = None,
        order_by: Optional[str] = None,
        order: Order = "desc",
        limit: int = 25,
        offset: int = 0,
    ) -> dict:
        """SQL SELECT over a single table with safe filters and pagination."""
        # Clamp paging first: limit is capped at MAX_LIMIT, offset floored at 0.
        page_size = min(int(limit), MAX_LIMIT)
        start = max(int(offset), 0)

        # Reject anything outside the SCHEMA whitelist before building a query.
        _assert_table_col(table, cols)
        if order_by:
            _assert_table_col(table, [order_by])

        if cols == ["*"]:
            projection = "*"
        else:
            projection = ",".join(cols)

        query = _apply_filters(self.sb.table(table).select(projection), table, where)

        if order_by:
            descending = order == "desc"
            query = query.order(order_by, desc=descending)

        # range() takes inclusive bounds, hence the "- 1".
        result = query.range(start, start + page_size - 1).execute()
        fetched = result.data or []
        return {
            "rows": fetched,
            "count": len(fetched),
            "table": table,
            "limit": page_size,
            "offset": start,
        }

    def sql_distinct(
        self,
        table: str,
        col: str,
        where: Optional[List[Dict[str, Any]]] = None,
        limit: int = 50,
    ) -> dict:
