# Cognee + Mem0 Behavior Workbook

This notebook traces the **exact app pipeline** used in Lux CRM for a message:

1. Raw message text -> `extract_candidates` (Cognee adapter)
2. Candidates -> `candidates_to_claims`
3. Claim bundle -> `propose_memory_ops` (Mem0 adapter)
4. Ops -> graph-ready relation payloads (same logic as worker)

It supports two modes:
- **Real adapters**: use your configured Cognee/Mem0/OpenAI/DB stack
- **Deterministic fallback**: force local heuristic/rules behavior for debugging

In [None]:
import json
import os
import sys
from pathlib import Path
from pprint import pprint

# Locate the repo root no matter where Jupyter was launched: accept either the
# current directory or its parent (e.g. when started from notebooks/), keyed on
# the presence of an apps/api directory.
cwd = Path.cwd().resolve()
repo_root = next(
    (base for base in (cwd, cwd.parent) if (base / "apps" / "api").exists()),
    None,
)
if repo_root is None:
    raise RuntimeError("Run this notebook from repo root or notebooks/ directory.")

# Put the API project first on the import path so `app.*` imports resolve;
# the membership check keeps re-runs of this cell from adding duplicates.
api_root = repo_root / "apps" / "api"
if str(api_root) not in sys.path:
    sys.path.insert(0, str(api_root))

print(f"repo_root={repo_root}")
print(f"api_root={api_root}")

In [None]:
# Optional: force deterministic local fallback mode.
# Flip to True for reproducible runs that do not depend on live Cognee/Mem0/LLM
# services. It overrides the adapter env vars below; the settings cell that
# follows clears the settings cache so the overrides take effect.
# NOTE(review): presumably "unknown" is not a recognized backend, so the adapters
# take their heuristic/rules fallback paths — confirm in the adapter code.
FORCE_DETERMINISTIC_FALLBACK = False

if not FORCE_DETERMINISTIC_FALLBACK:
    print("Using configured adapters from environment.")
else:
    os.environ.update(
        {
            "COGNEE_BACKEND": "unknown",
            "COGNEE_ENABLE_HEURISTIC_FALLBACK": "true",
            "MEM0_BACKEND": "unknown",
            "MEM0_ENABLE_RULES_FALLBACK": "true",
        }
    )
    print("Deterministic fallback mode enabled.")

In [None]:
from app.core.config import get_settings

# get_settings exposes cache_clear (an lru_cache-style wrapper); clear it so env
# changes made in the previous cell are reflected in the settings object.
get_settings.cache_clear()
settings = get_settings()

# Snapshot of the settings that drive the extraction/memory pipeline, in a fixed
# display order.
summary = {
    field_name: getattr(settings, field_name)
    for field_name in (
        "llm_provider",
        "llm_model",
        "embedding_provider",
        "embedding_model",
        "embedding_dim",
        "cognee_backend",
        "cognee_local_module",
        "cognee_search_type",
        "cognee_search_top_k",
        "cognee_enable_heuristic_fallback",
        "mem0_backend",
        "mem0_local_module",
        "mem0_search_limit",
        "mem0_enable_rules_fallback",
        "auto_accept_threshold",
    )
}

pprint(summary)
print("OPENAI_API_KEY set:", bool(os.getenv("OPENAI_API_KEY")))
print("OPENAI_API_KEY set:", bool(os.getenv("OPENAI_API_KEY")))

## Example Inputs

Edit or add message payloads below to test different extraction behavior.

In [None]:
# Demo message payloads fed through every step below. Each entry carries the
# interaction id, the raw message text, and the contact the message belongs to.
samples = [
    dict(
        interaction_id="demo-interaction-001",
        text=(
            "Great to catch up. We should plan the Nature Investment Hub workshop next month. "
            "My daughter just started at Stanford and schedules are tight, but I can do Tuesday afternoon. "
            "Also, I moved into the Head of Partnerships role at Acme Ventures."
        ),
        contact_id="demo-contact-001",
    ),
    dict(
        interaction_id="demo-interaction-002",
        text=(
            "Thanks for the intro. I am evaluating whether we should run a pilot in Q2. "
            "No firm budget yet; let us keep this exploratory and share proposal options."
        ),
        contact_id="demo-contact-002",
    ),
]

for s in samples:
    print(f"- {s['interaction_id']} chars= {len(s['text'])}")

## Step 1: Cognee Extraction (`extract_candidates`)

In [None]:
from app.services.extraction.cognee_client import extract_candidates

# interaction_id -> raw Cognee candidate dict. Only successful extractions are
# stored, so later steps never consume a result that was reported as FAILED.
cognee_results = {}

for sample in samples:
    interaction_id = sample["interaction_id"]
    text = sample["text"]
    # Guard only the adapter call. Errors in the reporting code below must not be
    # mislabelled as extraction failures after the result was already stored.
    try:
        result = extract_candidates(interaction_id=interaction_id, text=text)
    except Exception as exc:
        print("\n===", interaction_id, "FAILED===")
        print(type(exc).__name__, str(exc))
        continue

    # The counts below use .get(), so a dict is expected. Anything else is
    # reported rather than stored.
    if not isinstance(result, dict):
        print("\n===", interaction_id, "FAILED===")
        print("unexpected result type:", type(result).__name__)
        continue

    cognee_results[interaction_id] = result
    print("\n===", interaction_id, "===")
    # `or []` also covers keys that are present but set to None.
    print(
        "counts:",
        {key: len(result.get(key) or []) for key in ("entities", "relations", "topics")},
    )
    pprint(result)

## Step 2: Candidate -> Claim Mapping (`candidates_to_claims`)

In [None]:
from app.services.extraction.cognee_mapper import candidates_to_claims

# interaction_id -> list of claim dicts produced from that interaction's Cognee candidates.
candidate_claims_by_interaction = {}
for sample in samples:
    interaction_id = sample["interaction_id"]
    candidates = cognee_results.get(interaction_id)
    if not candidates:
        # Report why the interaction is skipped instead of dropping it silently.
        reason = "extraction failed" if interaction_id not in cognee_results else "no candidates"
        print("\n===", interaction_id, "claims SKIPPED===", f"({reason})")
        continue
    # Per-sample guard, consistent with Steps 1 and 3: one bad mapping should not
    # abort the remaining samples.
    try:
        claims = candidates_to_claims(candidates)
    except Exception as exc:
        print("\n===", interaction_id, "claims FAILED===")
        print(type(exc).__name__, str(exc))
        continue
    candidate_claims_by_interaction[interaction_id] = claims
    print("\n===", interaction_id, "claims===")
    print("claim_count=", len(claims))
    pprint(claims)

## Step 3: Mem0 Bundle + Ops (`build_mem0_bundle` + `propose_memory_ops`)

In [None]:
from app.services.memory.mem0_mapper import build_mem0_bundle
from app.services.memory.mem0_client import propose_memory_ops

# interaction_id -> list of Mem0 ops proposed for that interaction.
mem0_ops_by_interaction = {}

for sample in samples:
    interaction_id = sample["interaction_id"]
    contact_id = sample["contact_id"]
    text = sample["text"]
    claims = candidate_claims_by_interaction.get(interaction_id, [])

    # Attach a synthetic evidence ref to a shallow copy of each claim. The claims
    # stored from Step 2 are not mutated. The span covers at most the first 180
    # characters of the message, and each claim gets its own list object.
    prepared_claims = [
        {
            **claim,
            "evidence_refs": [
                {
                    "interaction_id": interaction_id,
                    "chunk_id": f"demo-chunk-{interaction_id}",
                    "span_json": {"start": 0, "end": min(180, len(text))},
                }
            ],
        }
        for claim in claims
    ]

    # Build the bundle inside the guard as well, so a failure for one sample is
    # reported and the remaining samples still run.
    try:
        bundle = build_mem0_bundle(
            interaction_summary=text[:280].strip(),
            recent_claims=[],
            cognee_candidates=prepared_claims,
            auto_accept_threshold=settings.auto_accept_threshold,
            scope_ids={
                "user_id": contact_id,
                "agent_id": settings.mem0_agent_id,
                "run_id": interaction_id,
                "contact_id": contact_id,
                "interaction_id": interaction_id,
            },
        )
        ops = propose_memory_ops(bundle)
    except Exception as exc:
        print("\n===", interaction_id, "mem0 FAILED===")
        print(type(exc).__name__, str(exc))
        continue

    mem0_ops_by_interaction[interaction_id] = ops
    print("\n===", interaction_id, "mem0 ops===")
    print("ops_count=", len(ops))
    pprint(ops)

## Step 4: Worker-Equivalent Claims + Relation Payloads

This mirrors the transformation logic used before `upsert_relation_triple(...)` in `apps/api/app/workers/jobs.py`.

In [None]:
def _normalized_text(value):
    if not isinstance(value, str):
        return ""
    return " ".join(value.split()).strip()


def claims_from_ops(ops, interaction_id):
    """Convert Mem0 ops into claim dicts, mirroring the worker's handling.

    Args:
        ops: iterable of op dicts, each with an optional "op" (defaults to "ADD")
            and a "claim" dict.
        interaction_id: used to build a demo evidence ref for claims lacking one.

    Returns:
        A list of shallow-copied claims. Ops whose claim is missing or empty are
        skipped, as are ops whose kind is not ADD/UPDATE/SUPERSEDE/REJECT. REJECT
        claims are kept but marked with status "rejected".
    """
    collected = []
    for entry in ops:
        claim = dict(entry.get("claim") or {})
        kind = entry.get("op", "ADD")
        if not claim or kind not in {"ADD", "UPDATE", "SUPERSEDE", "REJECT"}:
            continue
        if kind == "REJECT":
            claim["status"] = "rejected"
        # Missing or empty evidence gets a placeholder ref pointing at the demo chunk.
        if not claim.get("evidence_refs"):
            claim["evidence_refs"] = [
                {
                    "interaction_id": interaction_id,
                    "chunk_id": f"demo-chunk-{interaction_id}",
                    "span_json": {"start": 0, "end": 180},
                }
            ]
        collected.append(claim)
    return collected


def relation_payload_from_claim(claim):
    """Derive a graph relation (subject, predicate, object, kinds) from one claim.

    Mirrors the worker's pre-``upsert_relation_triple`` transformation. The
    returned dict has keys subject_name, predicate, object_name, subject_kind and
    object_kind.

    Returns None when:
    - the claim's value_json is not a dict;
    - no object name can be found;
    - the object equals the subject (case-insensitively).

    Defaults:
    - subject falls back to "contact";
    - predicate falls back by claim_type (employment -> works_at,
      topic -> discussed_topic, otherwise related_to);
    - object name is the first non-empty of object/company/destination/target/label.
    """
    value_json = claim.get("value_json")
    if not isinstance(value_json, dict):
        return None

    claim_type = _normalized_text(claim.get("claim_type"))
    subject = _normalized_text(value_json.get("subject")) or "contact"
    predicate = _normalized_text(value_json.get("predicate")) or {
        "employment": "works_at",
        "topic": "discussed_topic",
    }.get(claim_type, "related_to")

    # Lazily probe the candidate keys in priority order; stop at the first hit.
    object_name = next(
        (
            text
            for text in (
                _normalized_text(value_json.get(key))
                for key in ("object", "company", "destination", "target", "label")
            )
            if text
        ),
        "",
    )
    if not object_name or object_name.lower() == subject.lower():
        return None

    # claim_type is a single value, so the topic and employment overrides are
    # mutually exclusive.
    object_kind = _normalized_text(value_json.get("object_type")) or "Entity"
    if claim_type == "topic" and _normalized_text(value_json.get("label")):
        object_kind = "Topic"
    elif claim_type == "employment" and object_kind == "Entity":
        object_kind = "Company"

    subject_kind = _normalized_text(value_json.get("subject_type"))
    if not subject_kind:
        subject_kind = "Contact" if subject == "contact" else "Entity"

    return {
        "subject_name": subject,
        "predicate": predicate,
        "object_name": object_name,
        "subject_kind": subject_kind,
        "object_kind": object_kind,
    }


def _relation_item(claim):
    """Pair a claim's relation payload with its claim metadata; None if it yields no relation."""
    payload = relation_payload_from_claim(claim)
    if not payload:
        return None
    return {
        "claim_id": claim.get("claim_id"),
        "claim_type": claim.get("claim_type"),
        "status": claim.get("status", "proposed"),
        "confidence": float(claim.get("confidence", 0.0) or 0.0),
        "source_system": claim.get("source_system"),
        "relation": payload,
        "evidence_refs": claim.get("evidence_refs", []),
    }


for sample in samples:
    interaction_id = sample["interaction_id"]

    # Cognee-derived claims first, then claims recovered from Mem0 ops.
    merged_claims = candidate_claims_by_interaction.get(interaction_id, []) + claims_from_ops(
        mem0_ops_by_interaction.get(interaction_id, []), interaction_id
    )
    relation_payloads = [item for item in map(_relation_item, merged_claims) if item is not None]

    print("\n===", interaction_id, "graph-ready relation payloads===")
    print("payload_count=", len(relation_payloads))
    pprint(relation_payloads)

## Optional: Live Neo4j Write Check

Set `ENABLE_NEO4J_WRITE = True` only if you want to write demo relations into your graph.

By default this cell is read-only and just shows the write arguments that would be sent to `upsert_relation_triple(...)`.

In [None]:
from datetime import datetime, timezone

ENABLE_NEO4J_WRITE = False


def build_upsert_args(contact_id, interaction_id, relation_item, interaction_timestamp_iso=None):
    """Map one graph-ready relation item (Step 4 shape) to `upsert_relation_triple` kwargs.

    Args:
        contact_id: contact the interaction belongs to.
        interaction_id: interaction the relation was extracted from.
        relation_item: dict with a "relation" payload plus claim metadata
            (claim_id, confidence, status, source_system, evidence_refs).
        interaction_timestamp_iso: optional ISO-8601 timestamp. When omitted, the
            current UTC time is used (the original behavior). Pass one value per
            interaction so all of its relations share the same timestamp.

    Returns:
        A kwargs dict. "uncertain" is True for any status other than "accepted",
        and source_system defaults to "demo".
    """
    rel = relation_item["relation"]
    if interaction_timestamp_iso is None:
        interaction_timestamp_iso = datetime.now(timezone.utc).isoformat()
    return {
        "contact_id": contact_id,
        "interaction_id": interaction_id,
        "interaction_timestamp_iso": interaction_timestamp_iso,
        "subject_name": rel["subject_name"],
        "predicate": rel["predicate"],
        "object_name": rel["object_name"],
        "claim_id": relation_item.get("claim_id"),
        "confidence": relation_item.get("confidence", 0.0),
        "status": relation_item.get("status", "proposed"),
        "source_system": relation_item.get("source_system") or "demo",
        "uncertain": relation_item.get("status") != "accepted",
        "evidence_refs": relation_item.get("evidence_refs", []),
        "subject_kind": rel["subject_kind"],
        "object_kind": rel["object_kind"],
    }


def _relation_items_for(interaction_id):
    """Rebuild the Step 4 relation items for one interaction (same logic as the Step 4 cell)."""
    cognee_claims = candidate_claims_by_interaction.get(interaction_id, [])
    mem0_claims = claims_from_ops(mem0_ops_by_interaction.get(interaction_id, []), interaction_id)
    items = []
    for claim in cognee_claims + mem0_claims:
        payload = relation_payload_from_claim(claim)
        if payload:
            items.append(
                {
                    "claim_id": claim.get("claim_id"),
                    "claim_type": claim.get("claim_type"),
                    "status": claim.get("status", "proposed"),
                    "confidence": float(claim.get("confidence", 0.0) or 0.0),
                    "source_system": claim.get("source_system"),
                    "relation": payload,
                    "evidence_refs": claim.get("evidence_refs", []),
                }
            )
    return items


if ENABLE_NEO4J_WRITE:
    from app.db.neo4j.queries import upsert_relation_triple

    print("Neo4j writes ENABLED")
else:
    print("Neo4j writes DISABLED (dry run)")

# Previously the args were never built. The dry run printed nothing and enabled
# mode wrote nothing. Now every relation is either shown (dry run) or written.
for sample in samples:
    contact_id = sample["contact_id"]
    interaction_id = sample["interaction_id"]
    timestamp_iso = datetime.now(timezone.utc).isoformat()
    relation_items = _relation_items_for(interaction_id)
    print("\n===", interaction_id, "upsert args===")
    print("relation_count=", len(relation_items))
    for relation_item in relation_items:
        args = build_upsert_args(
            contact_id,
            interaction_id,
            relation_item,
            interaction_timestamp_iso=timestamp_iso,
        )
        if not ENABLE_NEO4J_WRITE:
            pprint(args)
            continue
        try:
            # NOTE(review): assumes upsert_relation_triple takes exactly these
            # keyword arguments — confirm against app/db/neo4j/queries.py.
            upsert_relation_triple(**args)
            print("wrote:", args["subject_name"], args["predicate"], args["object_name"])
        except Exception as exc:
            print("write FAILED:", type(exc).__name__, str(exc))

## Optional: Inspect Existing Contact Graph Signals

Set the `DEMO_CONTACT_ID` environment variable to an existing contact ID to inspect how stored graph triples feed scoring and drafting retrieval.

In [None]:
# Contact to inspect; read from the environment, blank means "skip".
DEMO_CONTACT_ID = os.getenv("DEMO_CONTACT_ID", "").strip()

if DEMO_CONTACT_ID:
    from app.db.neo4j.queries import get_contact_graph_metrics, get_contact_graph_paths

    # Aggregate graph metrics plus up to 8 paths of at most 3 hops for a
    # "follow up" objective.
    metrics = get_contact_graph_metrics(DEMO_CONTACT_ID)
    paths = get_contact_graph_paths(DEMO_CONTACT_ID, objective="follow up", max_hops=3, limit=8)
    print("metrics:")
    pprint(metrics)
    print("\npaths:")
    pprint(paths)
else:
    print("Set DEMO_CONTACT_ID env var to inspect live graph signals.")

## Optional: Inspect Draft Retrieval Bundle

This shows the hybrid retrieval object consumed by draft generation (vector chunks + graph snippets + graph paths). It requires `DEMO_CONTACT_ID` from the previous cell and a reachable Postgres database.

In [None]:
# DEMO_CONTACT_ID comes from the previous cell. Reading it via globals() prints
# the hint instead of raising NameError if that cell was skipped.
if not globals().get("DEMO_CONTACT_ID"):
    print("Set DEMO_CONTACT_ID first.")
else:
    from app.db.pg.session import SessionLocal
    from app.services.drafting.retriever import build_retrieval_bundle

    db = SessionLocal()
    try:
        bundle = build_retrieval_bundle(
            db=db,
            contact_id=DEMO_CONTACT_ID,
            objective="follow up on recent priorities",
            allow_sensitive=False,
        )
        # Show the smaller sections in full. The (potentially large) chunk and
        # path lists are summarized by count; `or []` tolerates keys set to None.
        preview = {
            "contact": bundle.get("contact"),
            "objective": bundle.get("objective"),
            "hybrid_graph_query": bundle.get("hybrid_graph_query"),
            "graph_metrics": bundle.get("graph_metrics"),
            "graph_claim_snippets": bundle.get("graph_claim_snippets"),
            "graph_path_snippets": bundle.get("graph_path_snippets"),
            "email_context_snippets": bundle.get("email_context_snippets"),
            "relevant_chunk_count": len(bundle.get("relevant_chunks") or []),
            "graph_path_count": len(bundle.get("graph_paths") or []),
        }
        pprint(preview)
    finally:
        # Always release the DB session, even if retrieval fails.
        db.close()

## Notes

- If Cognee/Mem0 cells fail in real mode, check adapter env and credentials first.
- In deterministic fallback mode, outputs come from local heuristic/rule code paths in this repo.
- This workbook demonstrates transformation behavior; production persistence still occurs in worker `process_interaction(...)`.