# Agentic AI Flow: Namecard -> Deep Research -> CRM (Google Sheets)

## Why this notebook exists
This lab demonstrates a practical agentic workflow from **unstructured input (a namecard image)** to **structured business output (a CRM-ready row)**.

By the end of this session, students should be able to explain:
1. Why agentic systems are split into stages.
2. How typed schemas reduce errors between stages.
3. Where human-in-the-loop checks are necessary.
4. How to connect AI outputs to real operational systems (Google Sheets as CRM).

## End-to-end flow
1. **Stage 1 (Perception):** OCR + field extraction from a namecard.
2. **Stage 2 (Reasoning + Tool Use):** Research planner, search execution, dossier synthesis.
3. **Stage 3 (Action):** Convert to CRM schema and write to Google Sheets.

## Teaching framing
As you run each section, keep asking:
- What is the input contract?
- What is the output contract?
- What can fail, and how do we recover safely?
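One way to make these three questions concrete is the sketch below. It uses stdlib `dataclasses` instead of Pydantic so it runs on its own, and `ContactCard`, `run_stage`, and `broken_stage` are hypothetical names. It checks an output contract when the object is built and recovers with a safe fallback, which is the same pattern the notebook's stages follow.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ContactCard:
    """Hypothetical output contract of a perception stage."""
    full_name: str
    email: Optional[str] = None

    def __post_init__(self) -> None:
        # Enforce the contract at construction time, not deep inside a later stage.
        if not self.full_name.strip():
            raise ValueError("full_name must be non-empty")
        if self.email is not None and "@" not in self.email:
            raise ValueError(f"invalid email: {self.email!r}")


def run_stage(primary: Callable[[], ContactCard], fallback: Callable[[], ContactCard]) -> ContactCard:
    """Try the live stage; if it raises (API error or contract violation), use a safe fallback."""
    try:
        return primary()
    except Exception as exc:  # broad catch, mirroring the notebook's classroom fallbacks
        print(f"stage failed, falling back: {exc}")
        return fallback()


def broken_stage() -> ContactCard:
    # Simulates a model that returned a malformed email.
    return ContactCard(full_name="Ada Lovelace", email="not-an-email")


card = run_stage(broken_stage, lambda: ContactCard(full_name="Unknown Name"))
print(card)  # ContactCard(full_name='Unknown Name', email=None)
```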


## Section A (10 min): Setup

## What students learn here
- How environment configuration affects agent reliability.
- Why we front-load dependencies and secrets before writing any pipeline logic.

## What the next cells do
1. Install required Python packages.
2. Import libraries for OCR, research, schema validation, widgets, and Google Sheets.
3. Initialize a shared `workflow_state` dictionary so each stage is inspectable.
4. Configure resilience settings (retry counts, fallback mode, confidence thresholds).
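`retry_call` (Section B) sleeps `RETRY_BACKOFF_SECONDS * attempt` between attempts and does not sleep after the last one. A quick sketch of the resulting schedule with the default settings (`backoff_schedule` is a hypothetical helper, not part of the notebook):

```python
RETRY_ATTEMPTS = 3          # same values as the setup cell
RETRY_BACKOFF_SECONDS = 1.5


def backoff_schedule(attempts: int, base: float) -> list[float]:
    """Sleep before each retry under linear backoff: base * attempt number.

    No sleep follows the final attempt, so there are attempts - 1 waits.
    """
    return [base * i for i in range(1, attempts)]


delays = backoff_schedule(RETRY_ATTEMPTS, RETRY_BACKOFF_SECONDS)
print(delays, sum(delays))  # [1.5, 3.0] 4.5
```

So a call that fails every time costs about 4.5 s of waiting, plus three request durations, before the stage-level fallback takes over.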

## Instructor guidance
- Run the install cell first.
- If imports fail, restart the kernel and re-run the import cell.
- Confirm these env vars before proceeding:
  - `OPENAI_API_KEY`
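  - `GOOGLE_SERVICE_ACCOUNT_JSON` (inline JSON string or path to the key file; share the target sheet with the service account's `client_email`)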
  - `GOOGLE_SHEET_ID`
  - optional: `GOOGLE_WORKSHEET_NAME`

## Expected output before moving on
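You should see `OpenAI client ready: True`. `False` means `OPENAI_API_KEY` is not set; with `ALLOW_MOCK_FALLBACK = True`, the OpenAI-backed stages then fall back to deterministic outputs.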
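A minimal preflight sketch, assuming you want to list missing configuration before running any stage. `preflight` is a hypothetical helper; it takes a mapping so it can be tried on a fake environment instead of `os.environ`.

```python
import os
from typing import Mapping

REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "GOOGLE_SERVICE_ACCOUNT_JSON", "GOOGLE_SHEET_ID"]
OPTIONAL_ENV_VARS = {"GOOGLE_WORKSHEET_NAME": "Leads"}  # default used by the setup cell


def preflight(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are missing or blank."""
    missing = [name for name in REQUIRED_ENV_VARS if not env.get(name, "").strip()]
    for name, default in OPTIONAL_ENV_VARS.items():
        if not env.get(name, "").strip():
            print(f"{name} not set; using default {default!r}")
    return missing


# Example with a fake environment instead of the real one
# (also prints the GOOGLE_WORKSHEET_NAME default notice first).
print(preflight({"OPENAI_API_KEY": "sk-test", "GOOGLE_SHEET_ID": " "}))
# ['GOOGLE_SERVICE_ACCOUNT_JSON', 'GOOGLE_SHEET_ID']
```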


In [None]:
%pip install -q openai pydantic pandas gspread google-auth ipywidgets python-dotenv

In [None]:
# Beginner note: This cell imports libraries, installs missing packages, and sets global config/state.

import importlib
import subprocess
import sys


def ensure_package(import_name: str, pip_name: str | None = None):
    """Install a package at runtime if it is missing in the current kernel."""
    try:
        importlib.import_module(import_name)
    except ModuleNotFoundError:
        pkg = pip_name or import_name
        print(f"Installing missing package: {pkg}")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])


ensure_package("pandas")
ensure_package("ipywidgets")
ensure_package("pydantic")
ensure_package("gspread")
ensure_package("google.oauth2", "google-auth")
ensure_package("dotenv", "python-dotenv")
ensure_package("openai")

import base64
import concurrent.futures as futures
import json
import os
import re
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import pandas as pd
from IPython.display import Markdown, display
from dotenv import load_dotenv
import ipywidgets as widgets
from pydantic import BaseModel, Field

import gspread
from google.oauth2.service_account import Credentials
from openai import OpenAI

load_dotenv(override=True)

# Shared in-memory state so students can inspect each stage output.
workflow_state: Dict[str, Any] = {
    "namecard_extraction": None,
    "research_dossier": None,
    "crm_record": None,
    "sheet_write_result": None,
    "errors": [],
}

# When True, classroom demo continues even if external APIs fail.
ALLOW_MOCK_FALLBACK = True
EXTRACTION_CONFIDENCE_THRESHOLD = 0.72
DEFAULT_WORKSHEET_NAME = os.getenv("GOOGLE_WORKSHEET_NAME", "Leads")
RETRY_ATTEMPTS = 3
RETRY_BACKOFF_SECONDS = 1.5

TRUSTED_DOMAINS = {
    "linkedin.com",
    "bloomberg.com",
    "crunchbase.com",
    "reuters.com",
    "forbes.com",
    "wikipedia.org",
}

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) if os.getenv("OPENAI_API_KEY") else None
print("OpenAI client ready:", bool(client))


## Section B (20 min): Stage 1 - Namecard Extraction

## Pedagogical objective
Teach students how to convert unstructured multimodal input into a **typed data object** with confidence signals.

## What this section's code does
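1. Defines strict schemas (`NamecardExtraction` and the models shared by later stages).
2. Adds utility helpers for retries, confidence averaging, source ranking, upload parsing, and regex-based field heuristics.
3. Sends the uploaded image to a multimodal model for OCR and entity extraction.
4. Sanitizes the model output before Pydantic validation.
5. Applies any manual field overrides entered in the UI (each overridden field gets a confidence of at least 0.95).
6. Prints a low-confidence warning recommending human review when the average field confidence falls below `EXTRACTION_CONFIDENCE_THRESHOLD`.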
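Step 4 depends on turning the model's reply into a dict. The notebook strips fences with `str.replace`, which would also delete backticks inside JSON string values. A slightly stricter sketch (hypothetical `parse_model_json`) removes only an enclosing Markdown code fence and rejects JSON that is not an object:

```python
import json
import re
from typing import Any

# Matches a whole reply wrapped in a Markdown fence, optionally tagged "json".
FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)


def parse_model_json(text: str) -> dict[str, Any]:
    """Strip an optional enclosing fence and parse; raise ValueError if the result is not an object."""
    text = text.strip()
    match = FENCE_RE.match(text)
    if match:
        text = match.group(1)
    data = json.loads(text)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    return data


reply = '```json\n{"full_name": "Ada Lovelace", "email": null}\n```'
print(parse_model_json(reply))  # {'full_name': 'Ada Lovelace', 'email': None}
```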

## Why this matters
- Raw LLM output is not trusted by default.
- Schema validation catches malformed values early.
- Confidence thresholds decide when humans should intervene.
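A worked example of the threshold rule, reusing the notebook's `mean_confidence` logic (copied so the block runs standalone) and the scores `build_deterministic_extraction_fallback` assigns when there are no manual overrides and no regex matches:

```python
EXTRACTION_CONFIDENCE_THRESHOLD = 0.72  # value from the setup cell


def mean_confidence(values: dict[str, float]) -> float:
    # Same clamping and rounding as the notebook helper.
    if not values:
        return 0.0
    nums = [max(0.0, min(1.0, float(v))) for v in values.values()]
    return round(sum(nums) / len(nums), 4)


# Confidences the deterministic fallback assigns when no overrides or regex hits exist.
fallback_conf = {"full_name": 0.55, "job_title": 0.45, "company_name": 0.45,
                 "email": 0.30, "phone": 0.30, "linkedin_url": 0.40, "website": 0.30}
avg = mean_confidence(fallback_conf)
print(avg, avg < EXTRACTION_CONFIDENCE_THRESHOLD)  # 0.3929 True

# A manual override lifts that field to at least 0.95 (as apply_manual_overrides does).
fallback_conf["full_name"] = max(fallback_conf["full_name"], 0.95)
print(mean_confidence(fallback_conf))  # 0.45
```

The fallback path alone lands well below 0.72, and a single override does not clear it, so a card that never went through live OCR is always flagged for review.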

## Student checkpoint
After this stage, inspect:
- Extracted `full_name`, `company_name`, `email`
- `field_confidence` values
- Whether manual correction was needed
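An average can hide one bad field. A per-field check such as the hypothetical `fields_needing_review` below (not part of the notebook; the example scores are made up) answers the "was manual correction needed" question directly:

```python
def fields_needing_review(field_confidence: dict[str, float], threshold: float = 0.72) -> list[str]:
    """Return field names whose confidence is below threshold, lowest first."""
    low = [(conf, name) for name, conf in field_confidence.items() if conf < threshold]
    return [name for conf, name in sorted(low)]


example = {"full_name": 0.97, "company_name": 0.81, "email": 0.42, "phone": 0.65}
print(fields_needing_review(example))  # ['email', 'phone']
```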


In [None]:
# Beginner note: These Pydantic models define the data shape passed between each stage.

class NamecardExtraction(BaseModel):
    full_name: str
    job_title: Optional[str] = None
    company_name: Optional[str] = None
    email: Optional[str] = None
    phone: Optional[str] = None
    linkedin_url: Optional[str] = None
    website: Optional[str] = None
    raw_text: str
    field_confidence: Dict[str, float] = Field(default_factory=dict)


class CitationItem(BaseModel):
    url: str
    title: str
    snippet: str
    retrieved_at: str


class ResearchDossier(BaseModel):
    person_summary: str
    person_current_role: Optional[str] = None
    person_background: List[str] = Field(default_factory=list)
    company_summary: str
    company_industry: Optional[str] = None
    company_size_signal: Optional[str] = None
    engagement_angle: str
    citations: List[CitationItem] = Field(default_factory=list)
    confidence: Dict[str, float] = Field(default_factory=dict)


class CRMRecord(BaseModel):
    lead_name: str
    title: Optional[str] = None
    company: Optional[str] = None
    email: Optional[str] = None
    phone: Optional[str] = None
    linkedin: Optional[str] = None
    company_website: Optional[str] = None
    person_notes: str
    company_notes: str
    engagement_angle: str
    source_count: int
    overall_confidence: float
    created_at_utc: str


class SheetWriteResult(BaseModel):
    success: bool
    sheet_id: str
    worksheet_name: str
    row_number: Optional[int] = None
    error_message: Optional[str] = None


In [None]:
# Beginner note: Utility helpers used across stages (retry, confidence scoring, upload parsing).

def now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def retry_call(fn, *args, **kwargs):
    """Retry a function call with linear backoff to handle temporary failures."""
    last_err = None
    for i in range(1, RETRY_ATTEMPTS + 1):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            last_err = e
            if i < RETRY_ATTEMPTS:
                time.sleep(RETRY_BACKOFF_SECONDS * i)
    raise last_err


def mean_confidence(values: Dict[str, float]) -> float:
    if not values:
        return 0.0
    nums = [max(0.0, min(1.0, float(v))) for v in values.values()]
    return round(sum(nums) / len(nums), 4)


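def normalize_domain(url: Optional[str]) -> Optional[str]:
    """Return the lowercase host of a URL without a leading 'www.' (None if it cannot be parsed)."""
    if not url:
        return None
    try:
        # urlparse only fills in the host when a scheme or '//' is present ("acme.com" has none).
        if "//" not in url:
            url = "//" + url
        host = (urlparse(url).hostname or "").lower()
        if host.startswith("www."):
            host = host[len("www."):]
        return host or None
    except Exception:
        return None


def domain_matches(domain: Optional[str], target: str) -> bool:
    """True if domain equals target or is a subdomain of it (e.g. en.wikipedia.org -> wikipedia.org)."""
    return bool(domain) and (domain == target or domain.endswith("." + target))


def select_top_sources(citations: List[CitationItem], official_domain: Optional[str], limit: int = 8) -> List[CitationItem]:
    """Rank citations: official company domain (+2), trusted domain (+1); drop duplicate URLs."""
    scored = []
    seen_urls = set()
    for item in citations:
        if item.url in seen_urls:
            continue
        seen_urls.add(item.url)
        domain = normalize_domain(item.url)
        trusted = any(domain_matches(domain, d) for d in TRUSTED_DOMAINS)
        official = bool(official_domain) and domain_matches(domain, official_domain)
        score = (2 if official else 0) + (1 if trusted else 0)
        scored.append((score, item))
    # sort() is stable, so equally scored sources keep their original order.
    scored.sort(key=lambda x: x[0], reverse=True)
    return [item for _, item in scored[:limit]]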


def widget_file_to_bytes(upload_value) -> tuple[bytes, str]:
    if not upload_value:
        raise ValueError("No file uploaded. Re-upload the card after running the UI cell.")

    def normalize_item(item, default_name: str = "uploaded_image"):
        if isinstance(item, dict):
            content = item.get("content", b"")
            name = item.get("name", default_name)
        else:
            # Supports UploadedFile-like objects from some ipywidgets versions
            content = getattr(item, "content", b"")
            name = getattr(item, "name", default_name)

        if isinstance(content, memoryview):
            content = content.tobytes()
        elif isinstance(content, bytearray):
            content = bytes(content)
        elif not isinstance(content, (bytes,)):  # best-effort fallback
            try:
                content = bytes(content)
            except Exception:
                content = b""

        return content, name

    # ipywidgets can return dict, tuple, or list depending on version
    if isinstance(upload_value, dict):
        if len(upload_value) == 0:
            raise ValueError("Upload payload is empty.")
        first_key = next(iter(upload_value.keys()))
        item = upload_value[first_key]
        content, name = normalize_item(item, default_name=first_key)
    elif isinstance(upload_value, (list, tuple)):
        if len(upload_value) == 0:
            raise ValueError("Upload payload is empty.")
        content, name = normalize_item(upload_value[0])
    else:
        # Some versions expose a single UploadedFile-like object
        content, name = normalize_item(upload_value)

    if not content:
        raise ValueError(
            "Uploaded file has no readable bytes. Try re-uploading the image (JPG/PNG) and run again."
        )

    return content, name



def heuristic_extract_raw_text(raw_text: str) -> Dict[str, Optional[str]]:
    email = None
    phone = None
    website = None

    email_match = re.search(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", raw_text, re.IGNORECASE)
    if email_match:
        email = email_match.group(0)

    phone_match = re.search(r"(\+?\d[\d\s\-()]{7,}\d)", raw_text)
    if phone_match:
        phone = phone_match.group(0)

    web_match = re.search(r"(https?://[^\s]+|www\.[^\s]+)", raw_text, re.IGNORECASE)
    if web_match:
        website = web_match.group(0)
        if website.startswith("www."):
            website = "https://" + website

    return {"email": email, "phone": phone, "website": website}


In [None]:
# Beginner note: Stage 1 functions for OCR extraction, output cleanup, and manual override handling.



def sanitize_namecard_payload(data: Dict[str, Any]) -> Dict[str, Any]:
    required_fields = [
        "full_name",
        "job_title",
        "company_name",
        "email",
        "phone",
        "linkedin_url",
        "website",
        "raw_text",
    ]

    for field in required_fields:
        if field not in data:
            data[field] = None

    if not data.get("full_name"):
        data["full_name"] = "Unknown Name"

    if data.get("raw_text") is None:
        data["raw_text"] = ""

    fc = data.get("field_confidence")
    if not isinstance(fc, dict):
        fc = {}

    normalized_fc: Dict[str, float] = {}
    for key, val in fc.items():
        try:
            normalized_fc[key] = float(val) if val is not None else 0.0
        except Exception:
            normalized_fc[key] = 0.0

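    # raw_text is the OCR transcript, not an extracted field, so it gets no default score
    # (a default 0.0 would drag the average extraction confidence down).
    for key in required_fields:
        if key != "raw_text":
            normalized_fc.setdefault(key, 0.0)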

    data["field_confidence"] = normalized_fc
    return data


def extract_namecard_with_openai(image_bytes: bytes, filename: str) -> NamecardExtraction:
    if not client:
        raise RuntimeError("OPENAI_API_KEY is missing.")

    b64 = base64.b64encode(image_bytes).decode("utf-8")
    image_ext = filename.split(".")[-1].lower() if "." in filename else "png"
    mime_type = "image/png" if image_ext not in {"jpg", "jpeg"} else "image/jpeg"

    prompt = (
        "Extract fields from this business card image and return strict JSON only. "
        "Required keys: full_name, job_title, company_name, email, phone, linkedin_url, website, raw_text, field_confidence. "
        "field_confidence must be an object mapping each field to a float between 0 and 1. "
        "Use null if uncertain."
    )

    # Call OpenAI with retries so temporary API issues do not stop class flow.
    response = retry_call(
        client.responses.create,
        model="gpt-4o-mini",
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": prompt},
                    {"type": "input_image", "image_url": f"data:{mime_type};base64,{b64}"},
                ],
            }
        ],
        temperature=0,
    )

    # Parse model text output into JSON, then sanitize before strict validation.
    text = response.output_text.strip().replace("```json", "").replace("```", "").strip()
    data = json.loads(text)
    data = sanitize_namecard_payload(data)
    return NamecardExtraction.model_validate(data)


def build_deterministic_extraction_fallback(manual_overrides: Dict[str, str], filename: str) -> NamecardExtraction:
    raw_text = "\n".join([f"{k}: {v}" for k, v in manual_overrides.items() if v]) or f"Uploaded card: {filename}"
    heuristics = heuristic_extract_raw_text(raw_text)

    return NamecardExtraction(
        full_name=manual_overrides.get("full_name") or "Unknown Name",
        job_title=manual_overrides.get("job_title") or None,
        company_name=manual_overrides.get("company_name") or None,
        email=manual_overrides.get("email") or heuristics["email"],
        phone=manual_overrides.get("phone") or heuristics["phone"],
        linkedin_url=manual_overrides.get("linkedin_url") or None,
        website=manual_overrides.get("website") or heuristics["website"],
        raw_text=raw_text,
        field_confidence={
            "full_name": 0.55,
            "job_title": 0.45,
            "company_name": 0.45,
            "email": 0.60 if (manual_overrides.get("email") or heuristics["email"]) else 0.30,
            "phone": 0.60 if (manual_overrides.get("phone") or heuristics["phone"]) else 0.30,
            "linkedin_url": 0.40,
            "website": 0.50 if (manual_overrides.get("website") or heuristics["website"]) else 0.30,
        },
    )


def apply_manual_overrides(extraction: NamecardExtraction, manual_overrides: Dict[str, str]) -> NamecardExtraction:
    payload = extraction.model_dump()
    for key, value in manual_overrides.items():
        if value:
            payload[key] = value
            payload["field_confidence"][key] = max(float(payload["field_confidence"].get(key, 0.0)), 0.95)
    return NamecardExtraction.model_validate(payload)


def run_stage1_extraction(image_bytes: bytes, filename: str, manual_overrides: Dict[str, str]) -> NamecardExtraction:
    # First try live OCR extraction from the uploaded card image.
    try:
        extraction = extract_namecard_with_openai(image_bytes, filename)
    except Exception as e:
        # If OCR fails, optionally continue with deterministic fallback for teaching continuity.
        if not ALLOW_MOCK_FALLBACK:
            raise
        print(f"[Stage1] OpenAI extraction failed, using deterministic fallback: {e}")
        extraction = build_deterministic_extraction_fallback(manual_overrides, filename)

    # Apply manual corrections entered by user and compute average confidence.
    extraction = apply_manual_overrides(extraction, manual_overrides)
    avg_conf = mean_confidence(extraction.field_confidence)
    if avg_conf < EXTRACTION_CONFIDENCE_THRESHOLD:
        print(f"[Stage1] Low extraction confidence ({avg_conf:.2f}) -> manual review recommended.")

    return extraction


## Section C (30 min): Stage 2 - Deep Research (Planner -> Search -> Writer)

## Pedagogical objective
Show students how a complex task is decomposed into specialized agentic sub-steps.

## What this section's code does
1. **Planner:** Generates focused web search queries based on extracted contact context.
2. **Search executor:** Runs searches (parallelized) and collects citation candidates.
3. **Source filtering:** Prioritizes official and trusted domains.
4. **Writer/synthesizer:** Produces a structured `ResearchDossier` with confidence and citations.
5. **Fallback behavior:** If live research fails, returns a deterministic dossier scaffold.
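The search executor's key property is that one failed query does not sink the batch. This sketch mirrors `perform_searches`, with a hypothetical `fake_search` in place of `run_web_search` so it runs offline:

```python
import concurrent.futures as futures


def fake_search(query: str) -> list[str]:
    """Hypothetical stand-in for run_web_search: returns fake URLs, fails on one query."""
    if "fail" in query:
        raise TimeoutError(f"search timed out: {query}")
    slug = query.replace(" ", "-")
    return [f"https://example.com/{slug}/{i}" for i in range(2)]


def fan_out(queries: list[str], max_workers: int = 4) -> tuple[list[str], list[str]]:
    """Run searches concurrently; collect results and failed queries without aborting the batch."""
    results: list[str] = []
    failed: list[str] = []
    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        task_map = {pool.submit(fake_search, q): q for q in queries}
        for task in futures.as_completed(task_map):
            query = task_map[task]
            try:
                results.extend(task.result())
            except Exception:
                failed.append(query)
    return results, failed


urls, failed = fan_out(["acme ceo", "acme funding", "please fail"])
print(len(urls), failed)  # 4 ['please fail']
```

Because `as_completed` yields tasks in completion order, the order of `urls` can vary between runs; only the counts are stable.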

## Why this architecture is teachable
- It separates planning, evidence gathering, and synthesis.
- It makes failure isolation easier (you can debug each stage independently).
- It encourages citation-grounded outputs: the writer is told to use only the provided sources, which reduces (but does not eliminate) unsupported claims.

## Student checkpoint
Verify that dossier fields are evidence-backed:
- `person_summary`
- `company_summary`
- `engagement_angle`
- non-empty `citations`
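A rough automated version of this checkpoint, assuming the dossier is available as a dict (for example `workflow_state["research_dossier"]`). `evidence_issues` is hypothetical, and it only checks structure: it cannot confirm that a claim actually appears on the cited page.

```python
from urllib.parse import urlparse


def evidence_issues(dossier: dict) -> list[str]:
    """Return human-readable problems that block treating a dossier as evidence-backed."""
    issues = []
    for key in ("person_summary", "company_summary", "engagement_angle"):
        if not str(dossier.get(key) or "").strip():
            issues.append(f"{key} is empty")
    citations = dossier.get("citations") or []
    if not citations:
        issues.append("no citations")
    for c in citations:
        # A citation should point at a web page the reviewer can open.
        if urlparse(c.get("url", "")).scheme not in ("http", "https"):
            issues.append(f"non-web citation url: {c.get('url')!r}")
    return issues


sample = {
    "person_summary": "Ada Lovelace leads research at Acme.",
    "company_summary": "",
    "engagement_angle": "Ask about analytics tooling.",
    "citations": [{"url": "https://acme.example/about"}, {"url": "acme internal notes"}],
}
print(evidence_issues(sample))
# ['company_summary is empty', "non-web citation url: 'acme internal notes'"]
```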


In [None]:
# Beginner note: Stage 2 functions for planning searches, collecting citations, and synthesizing research.

class WebSearchItem(BaseModel):
    reason: str
    query: str


class WebSearchPlan(BaseModel):
    searches: List[WebSearchItem]


def plan_searches(extraction: NamecardExtraction, n_searches: int = 6) -> WebSearchPlan:
    if not client:
        raise RuntimeError("OPENAI_API_KEY is missing.")

    prompt = f"""
You are a research planning assistant.
Given this contact:
- full_name: {extraction.full_name}
- job_title: {extraction.job_title}
- company_name: {extraction.company_name}
- website: {extraction.website}
- linkedin_url: {extraction.linkedin_url}

Return strict JSON with key 'searches': a list of {n_searches} objects with keys:
- reason
- query

Rules:
- Disambiguate common names using company and title.
- Include searches for both person and company.
- Keep each query under 15 words.
""".strip()

    # Call OpenAI with retries so temporary API issues do not stop class flow.
    # Ask model to generate a focused search plan from extracted contact context.
    response = retry_call(
        client.responses.create,
        model="gpt-4o-mini",
        input=prompt,
        temperature=0,
    )

    text = response.output_text.strip().replace("```json", "").replace("```", "").strip()
    data = json.loads(text)
    return WebSearchPlan.model_validate(data)


def run_web_search(query: str) -> List[CitationItem]:
    if not client:
        raise RuntimeError("OPENAI_API_KEY is missing.")

    prompt = (
        f"Search the web for: {query}. "
        "Return strict JSON with key 'citations'. "
        "Each citation must have url, title, snippet, retrieved_at (UTC ISO). Limit to 5 citations."
    )

    # Call OpenAI with retries so temporary API issues do not stop class flow.
    response = retry_call(
        client.responses.create,
        model="gpt-4o-mini",
        tools=[{"type": "web_search_preview"}],
        input=prompt,
        temperature=0,
    )

    text = response.output_text.strip().replace("```json", "").replace("```", "").strip()
    data = json.loads(text)
    items = []
    for item in data.get("citations", []):
        if not item.get("retrieved_at"):
            item["retrieved_at"] = now_utc_iso()
        items.append(CitationItem.model_validate(item))
    return items


def perform_searches(plan: WebSearchPlan) -> List[CitationItem]:
    all_citations: List[CitationItem] = []
    # Run multiple searches in parallel to reduce total waiting time in class.
    max_workers = min(4, max(1, len(plan.searches)))
    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        task_map = {pool.submit(run_web_search, item.query): item for item in plan.searches}
        for task in futures.as_completed(task_map):
            item = task_map[task]
            try:
                result = task.result()
                all_citations.extend(result)
                print(f"[Search] Done: {item.query} -> {len(result)} citations")
            except Exception as e:
                print(f"[Search] Failed: {item.query} -> {e}")
    return all_citations


def synthesize_dossier(extraction: NamecardExtraction, citations: List[CitationItem]) -> ResearchDossier:
    if not client:
        raise RuntimeError("OPENAI_API_KEY is missing.")

    # Prioritize official/trusted sources before synthesis to reduce noisy evidence.
    official_domain = normalize_domain(extraction.website)
    top_sources = select_top_sources(citations, official_domain=official_domain, limit=10)

    prompt = f"""
You are a senior research analyst.
Create a structured dossier for CRM usage.

Contact:
{json.dumps(extraction.model_dump(), ensure_ascii=True)}

Sources:
{json.dumps([c.model_dump() for c in top_sources], ensure_ascii=True)}

Return strict JSON with keys:
- person_summary (string)
- person_current_role (string|null)
- person_background (array of strings)
- company_summary (string)
- company_industry (string|null)
- company_size_signal (string|null)
- engagement_angle (string)
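- citations (array of objects with url, title, snippet, retrieved_at as a UTC ISO string)
- confidence (object field->float 0..1)

Rules:
- Only include claims grounded in provided citations.
- If uncertain, use null for nullable fields (or state that the detail is unknown in string fields) and lower that field's confidence.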
""".strip()

    # Call OpenAI with retries so temporary API issues do not stop class flow.
    # Ask the model to synthesize a grounded dossier from the ranked sources.
    response = retry_call(
        client.responses.create,
        model="gpt-4o-mini",
        input=prompt,
        temperature=0,
    )

    text = response.output_text.strip().replace("```json", "").replace("```", "").strip()
    data = json.loads(text)
    return ResearchDossier.model_validate(data)


def build_deterministic_dossier_fallback(extraction: NamecardExtraction) -> ResearchDossier:
    person = extraction.full_name
    company = extraction.company_name or "Unknown company"
    role = extraction.job_title or "Unknown role"
    website = extraction.website or ""

    fallback_citations = []
    if website:
        fallback_citations.append(
            CitationItem(
                url=website,
                title=f"{company} official website",
                snippet=f"Company website referenced for {company}.",
                retrieved_at=now_utc_iso(),
            )
        )

    return ResearchDossier(
        person_summary=f"{person} is listed on the uploaded card as {role} at {company}.",
        person_current_role=role,
        person_background=["Background unavailable from fallback mode; verify via live research."],
        company_summary=f"{company} appears as the company on the uploaded card.",
        company_industry=None,
        company_size_signal=None,
        engagement_angle="Start with discovery questions and confirm current business priorities.",
        citations=fallback_citations,
        confidence={
            "person_summary": 0.55,
            "person_current_role": 0.55,
            "company_summary": 0.55,
            "engagement_angle": 0.50,
        },
    )


def run_stage2_research(extraction: NamecardExtraction) -> ResearchDossier:
    # Full Stage 2 flow: plan -> search -> synthesize.
    try:
        plan = plan_searches(extraction)
        print(f"[Stage2] Planned {len(plan.searches)} searches")
        citations = perform_searches(plan)
        print(f"[Stage2] Total citations collected: {len(citations)}")
        dossier = synthesize_dossier(extraction, citations)
        if not dossier.citations:
            dossier.citations = citations[:5]
        return dossier
    except Exception as e:
        # If live research fails, optionally continue with a deterministic fallback for teaching continuity.
        if not ALLOW_MOCK_FALLBACK:
            raise
        print(f"[Stage2] Live research failed, using deterministic fallback: {e}")
        return build_deterministic_dossier_fallback(extraction)


## Section D (20 min): Stage 3 - CRM Mapping + Google Sheets Write

## Pedagogical objective
Teach operationalization: converting AI findings into system-ready business records.

## What this section's code does
1. Defines canonical CRM column order (`CRM_COLUMNS`).
2. Maps extraction + dossier into a typed `CRMRecord`.
3. Loads Google service account credentials from either an inline JSON string or a key-file path.
4. Validates worksheet headers before writing.
5. Applies idempotency guard (`email + company`) to avoid duplicates.
6. Appends row and returns structured `SheetWriteResult`.
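`validate_sheet_headers` reports only that the headers differ. When teaching schema drift it helps to show how they differ; a sketch with a hypothetical `header_diff` helper:

```python
def header_diff(expected: list[str], found: list[str]) -> dict[str, list[str]]:
    """Explain how a sheet's header row differs from the expected column order."""
    missing = [h for h in expected if h not in found]
    unexpected = [h for h in found if h not in expected]
    # Columns present in both lists but at a different position.
    misplaced = [h for i, h in enumerate(expected) if h in found and found.index(h) != i]
    return {"missing": missing, "unexpected": unexpected, "misplaced": misplaced}


expected = ["lead_name", "title", "company", "email"]
found = ["lead_name", "company", "title", "e-mail"]
print(header_diff(expected, found))
# {'missing': ['email'], 'unexpected': ['e-mail'], 'misplaced': ['title', 'company']}
```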

## Why this matters in production
- Schema drift is common and must be detected early.
- Typed write results make error handling deterministic.
- Idempotency protects downstream systems from duplicate writes.
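A sketch of the normalized `(email, company)` key behind the idempotency guard, using a dict index instead of the notebook's linear scan (`idempotency_key` and the sample rows are hypothetical). Like the notebook's version, it still needs a full sheet read and does not protect against two writers appending at the same moment.

```python
from typing import Optional


def idempotency_key(email: Optional[str], company: Optional[str]) -> Optional[tuple[str, str]]:
    """Normalized (email, company) key, or None when either part is missing (no dedupe possible)."""
    e = (email or "").strip().lower()
    c = (company or "").strip().lower()
    return (e, c) if e and c else None


existing_rows = [
    {"email": "Ada@Acme.com ", "company": "Acme"},
    {"email": "grace@navy.mil", "company": "US Navy"},
]
# Row 1 is the header, so data rows start at sheet row 2 (same convention as find_duplicate_row).
index = {idempotency_key(r["email"], r["company"]): row for row, r in enumerate(existing_rows, start=2)}
print(index.get(idempotency_key("ada@acme.com", "ACME")))  # 2
print(idempotency_key("ada@acme.com", None))  # None
```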

## Student checkpoint
Confirm:
- Header validation passes.
- Write response includes row number.
- Re-running the same card reuses the existing row instead of appending a duplicate (this requires both email and company).
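The row number can be read from the Sheets API `spreadsheets.values.append` response, whose `updates.updatedRange` field names the written range. Assuming your gspread version returns that response body from `append_row`, a small parser looks like this (`appended_row_number` is hypothetical and the response dict is illustrative). It avoids counting column A afterwards, which can report the wrong row if another process appends at the same time.

```python
import re
from typing import Optional

# "!" separates the sheet name from the A1 range; capture the first row number.
ROW_RE = re.compile(r"![A-Z]+(\d+)")


def appended_row_number(append_response: dict) -> Optional[int]:
    """Read the first written row from a values.append response, e.g. 'Leads!A5:M5' -> 5."""
    updated_range = append_response.get("updates", {}).get("updatedRange", "")
    match = ROW_RE.search(updated_range)
    return int(match.group(1)) if match else None


# Shape of the response body, trimmed to the fields used here (values are illustrative).
response = {"updates": {"updatedRange": "'Lead List'!A12:M12", "updatedRows": 1}}
print(appended_row_number(response))  # 12
```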


In [None]:
# Beginner note: Stage 3 functions to map AI output into CRM format and write to Google Sheets.

# Canonical sheet column order. Keep this in sync with row 1 in Google Sheets.
CRM_COLUMNS = [
    "lead_name",
    "title",
    "company",
    "email",
    "phone",
    "linkedin",
    "company_website",
    "person_notes",
    "company_notes",
    "engagement_angle",
    "source_count",
    "overall_confidence",
    "created_at_utc",
]


def map_to_crm_record(extraction: NamecardExtraction, dossier: ResearchDossier) -> CRMRecord:
    # Combine extraction and research confidence into one overall CRM confidence signal.
    overall_conf = round((mean_confidence(extraction.field_confidence) + mean_confidence(dossier.confidence)) / 2, 4)

    person_notes = dossier.person_summary
    if dossier.person_background:
        person_notes += "\nBackground: " + "; ".join(dossier.person_background)

    company_notes = dossier.company_summary
    if dossier.company_industry:
        company_notes += f"\nIndustry: {dossier.company_industry}"
    if dossier.company_size_signal:
        company_notes += f"\nSize signal: {dossier.company_size_signal}"

    return CRMRecord(
        lead_name=extraction.full_name,
        title=extraction.job_title or dossier.person_current_role,
        company=extraction.company_name,
        email=extraction.email,
        phone=extraction.phone,
        linkedin=extraction.linkedin_url,
        company_website=extraction.website,
        person_notes=person_notes,
        company_notes=company_notes,
        engagement_angle=dossier.engagement_angle,
        source_count=len(dossier.citations),
        overall_confidence=overall_conf,
        created_at_utc=now_utc_iso(),
    )


def load_google_credentials() -> Credentials:
    raw = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON", "").strip()
    if not raw:
        raise RuntimeError("GOOGLE_SERVICE_ACCOUNT_JSON is missing.")

    scopes = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive",
    ]

    if raw.startswith("{"):
        return Credentials.from_service_account_info(json.loads(raw), scopes=scopes)

    if os.path.exists(raw):
        return Credentials.from_service_account_file(raw, scopes=scopes)

    raise RuntimeError("GOOGLE_SERVICE_ACCOUNT_JSON must be a valid file path or JSON string.")


def open_worksheet(sheet_id: str, worksheet_name: str):
    """Open target Google Sheet worksheet using authenticated client."""
    creds = load_google_credentials()
    gc = gspread.authorize(creds)
    sh = gc.open_by_key(sheet_id)
    return sh.worksheet(worksheet_name)


def validate_sheet_headers(ws, expected_headers: List[str]) -> None:
    found = ws.row_values(1)
    if found != expected_headers:
        raise RuntimeError(f"Sheet header mismatch. Expected: {expected_headers} | Found: {found}")


def crm_to_row(record: CRMRecord) -> List[Any]:
    payload = record.model_dump()
    return [payload.get(col) for col in CRM_COLUMNS]


def find_duplicate_row(ws, email: Optional[str], company: Optional[str]) -> Optional[int]:
    """Return the sheet row number of an existing (email, company) match, else None."""
    target_email = (email or "").strip().lower()
    target_company = (company or "").strip().lower()
    # Both parts of the key are needed; skip the full-sheet read when either is missing.
    if not target_email or not target_company:
        return None

    rows = ws.get_all_records(expected_headers=CRM_COLUMNS)

    # Row 1 holds the headers, so the first data record is sheet row 2.
    for row_idx, row in enumerate(rows, start=2):
        row_email = str(row.get("email", "")).strip().lower()
        row_company = str(row.get("company", "")).strip().lower()
        if row_email == target_email and row_company == target_company:
            return row_idx

    return None


def write_record_to_sheet(record: CRMRecord, enable_idempotency_guard: bool = True) -> SheetWriteResult:
    sheet_id = os.getenv("GOOGLE_SHEET_ID", "").strip()
    worksheet_name = os.getenv("GOOGLE_WORKSHEET_NAME", DEFAULT_WORKSHEET_NAME)

    # Fail early with clear message if required sheet config is missing.
    if not sheet_id:
        return SheetWriteResult(
            success=False,
            sheet_id="",
            worksheet_name=worksheet_name,
            row_number=None,
            error_message="GOOGLE_SHEET_ID is missing.",
        )

    try:
        # Open sheet and verify header contract before any write happens.
        ws = open_worksheet(sheet_id, worksheet_name)
        validate_sheet_headers(ws, CRM_COLUMNS)

        # Optional de-duplication check: reuse existing row if same (email, company).
        if enable_idempotency_guard:
            existing = find_duplicate_row(ws, record.email, record.company)
            if existing:
                return SheetWriteResult(
                    success=True,
                    sheet_id=sheet_id,
                    worksheet_name=worksheet_name,
                    row_number=existing,
                    error_message="Duplicate detected by (email, company); existing row reused.",
                )

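        # Append new row when validation and duplicate checks pass.
        # RAW stores values as-is, so phones like "+65 ..." or web text starting with "=" are not parsed as formulas.
        response = ws.append_row(crm_to_row(record), value_input_option="RAW")
        # The append response reports the written range, e.g. "Leads!A5:M5"; fall back to counting column A.
        updated_range = (response or {}).get("updates", {}).get("updatedRange", "")
        row_match = re.search(r"![A-Z]+(\d+)", updated_range)
        new_row = int(row_match.group(1)) if row_match else len(ws.col_values(1))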
        return SheetWriteResult(
            success=True,
            sheet_id=sheet_id,
            worksheet_name=worksheet_name,
            row_number=new_row,
            error_message=None,
        )

    except Exception as e:
        return SheetWriteResult(
            success=False,
            sheet_id=sheet_id,
            worksheet_name=worksheet_name,
            row_number=None,
            error_message=str(e),
        )


## Section E (10 min): End-to-End Pipeline and UI

## Pedagogical objective
Give students a complete runnable interface that mirrors real operator workflows.

## What this section's code does
1. Builds notebook UI widgets for upload and manual corrections.
2. Displays uploaded filename and byte size for input traceability.
3. Executes full pipeline (`Stage 1 -> Stage 2 -> Stage 3`) on button click.
4. Prints progress logs and renders both summary markdown and CRM payload table.
5. Returns a structured write result instead of raising when the Google Sheets write fails, so the CRM payload is still displayed.
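Item 5 is a general pattern: catch the sink's exception at the boundary and return a typed result, so the work done by earlier stages is not lost. A stdlib sketch with a hypothetical failing sink (`WriteResult`, `flaky_sink`, and `deliver` are illustrative names, loosely modeled on `SheetWriteResult`):

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class WriteResult:
    success: bool
    error_message: Optional[str] = None


def flaky_sink(row: dict) -> None:
    # Hypothetical stand-in for a Google Sheets write that fails (e.g. sheet not shared).
    raise PermissionError("service account cannot access sheet")


def deliver(row: dict, sink) -> WriteResult:
    """Convert sink exceptions into a typed result so the caller keeps the payload."""
    try:
        sink(row)
        return WriteResult(success=True)
    except Exception as exc:
        return WriteResult(success=False, error_message=str(exc))


crm_row = {"lead_name": "Ada Lovelace", "company": "Acme"}
result = deliver(crm_row, flaky_sink)
print(asdict(result))
# {'success': False, 'error_message': 'service account cannot access sheet'}
print(crm_row["lead_name"])  # payload is still available for display or manual entry
```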

## How students should run it
1. Upload one JPG/PNG business card.
2. Optionally correct fields manually.
3. Click `Run Pipeline`.
4. Review each stage output before discussing final CRM write result.

## Instructor demo tip
Run one good sample and one noisy sample to compare confidence behavior.


In [None]:
# Beginner note: End-to-end pipeline orchestrator and result formatter.

def run_pipeline(image_bytes: bytes, filename: str, manual_overrides: Dict[str, str]) -> Dict[str, Any]:
    # Reset errors and run each stage in sequence.
    workflow_state["errors"] = []

    extraction = run_stage1_extraction(image_bytes, filename, manual_overrides)
    workflow_state["namecard_extraction"] = extraction.model_dump()

    dossier = run_stage2_research(extraction)
    workflow_state["research_dossier"] = dossier.model_dump()

    crm_record = map_to_crm_record(extraction, dossier)
    workflow_state["crm_record"] = crm_record.model_dump()

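    sheet_result = write_record_to_sheet(crm_record, enable_idempotency_guard=True)
    workflow_state["sheet_write_result"] = sheet_result.model_dump()
    if not sheet_result.success:
        # Record the failure so it is visible in workflow_state alongside the preserved payload.
        workflow_state["errors"].append(f"Stage 3 sheet write failed: {sheet_result.error_message}")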

    return {
        "extraction": extraction,
        "dossier": dossier,
        "crm_record": crm_record,
        "sheet_result": sheet_result,
    }


def format_result_markdown(result: Dict[str, Any]) -> str:
    extraction = result["extraction"]
    dossier = result["dossier"]
    crm_record = result["crm_record"]
    sheet_result = result["sheet_result"]

    lines = []
    lines.append("## Pipeline Result")
    lines.append("### Stage 1: Extraction")
    lines.append(f"- Name: **{extraction.full_name}**")
    lines.append(f"- Title: {extraction.job_title}")
    lines.append(f"- Company: {extraction.company_name}")
    lines.append(f"- Email: {extraction.email}")
    lines.append(f"- Avg confidence: **{mean_confidence(extraction.field_confidence):.2f}**")

    lines.append("### Stage 2: Research Dossier")
    lines.append(f"- Person current role: {dossier.person_current_role}")
    lines.append(f"- Company industry: {dossier.company_industry}")
    lines.append(f"- Source count: **{len(dossier.citations)}**")
    lines.append(f"- Engagement angle: {dossier.engagement_angle}")

    lines.append("### Stage 3: CRM + Google Sheets")
    lines.append(f"- Write success: **{sheet_result.success}**")
    lines.append(f"- Worksheet: `{sheet_result.worksheet_name}`")
    lines.append(f"- Row number: {sheet_result.row_number}")
    lines.append(f"- Error: {sheet_result.error_message}")

    lines.append("### CRM Payload Preview")
    lines.append("```json")
    # ensure_ascii=False keeps non-ASCII names (e.g., CJK characters) readable in the preview.
    lines.append(json.dumps(crm_record.model_dump(), indent=2, ensure_ascii=False))
    lines.append("```")

    return "\n".join(lines)


In [None]:
# Beginner note: Notebook UI widgets and button handlers for interactive demo usage.

uploader = widgets.FileUpload(accept=".jpg,.jpeg,.png", multiple=False, description="Upload Namecard")
upload_status = widgets.HTML(value="<i>No file uploaded yet.</i>")

manual_full_name = widgets.Text(description="Name", placeholder="Optional override")
manual_job_title = widgets.Text(description="Title", placeholder="Optional override")
manual_company = widgets.Text(description="Company", placeholder="Optional override")
manual_email = widgets.Text(description="Email", placeholder="Optional override")
manual_phone = widgets.Text(description="Phone", placeholder="Optional override")
manual_linkedin = widgets.Text(description="LinkedIn", placeholder="Optional override")
manual_website = widgets.Text(description="Website", placeholder="Optional override")

run_button = widgets.Button(description="Run Pipeline", button_style="primary")
output = widgets.Output()


def on_upload_change(change):
    """Update UI label immediately when a file is uploaded."""
    if change.get("name") != "value":
        return
    try:
        content, filename = widget_file_to_bytes(uploader.value)
        upload_status.value = f"<b>Uploaded:</b> {filename} ({len(content)} bytes)"
    except Exception:
        upload_status.value = "<i>No file uploaded yet.</i>"


uploader.observe(on_upload_change, names="value")


def on_run_clicked(_):
    """Handle Run button click and execute the full workflow."""
    with output:
        # Clear previous run logs so students only see the latest execution trace.
        output.clear_output()
        try:
            upload_count = len(uploader.value) if hasattr(uploader.value, "__len__") else 1
            print(f"[Upload] Files detected: {upload_count}")
            image_bytes, filename = widget_file_to_bytes(uploader.value)
            print(f"[Upload] Reading: {filename} ({len(image_bytes)} bytes)")
            manual_overrides = {
                "full_name": manual_full_name.value.strip(),
                "job_title": manual_job_title.value.strip(),
                "company_name": manual_company.value.strip(),
                "email": manual_email.value.strip(),
                "phone": manual_phone.value.strip(),
                "linkedin_url": manual_linkedin.value.strip(),
                "website": manual_website.value.strip(),
            }

            print("[Pipeline] Running end-to-end workflow...")
            result = run_pipeline(image_bytes, filename, manual_overrides)
            display(Markdown(format_result_markdown(result)))
            display(pd.DataFrame([result["crm_record"].model_dump()]))

            if result["sheet_result"].success:
                print("[Pipeline] Completed successfully.")
            else:
                print("[Pipeline] Completed with sheet write failure; payload preserved for retry.")

        except Exception as e:
            print("[Pipeline] Failed:", e)


run_button.on_click(on_run_clicked)

form = widgets.VBox([
    widgets.HTML("<h3>1) Upload Namecard</h3>"),
    uploader,
    upload_status,
    widgets.HTML("<h3>2) Optional Manual Corrections</h3>"),
    manual_full_name,
    manual_job_title,
    manual_company,
    manual_email,
    manual_phone,
    manual_linkedin,
    manual_website,
    widgets.HTML("<h3>3) Execute</h3>"),
    run_button,
    output,
])

display(form)


## Validation Scenarios (Class Drills)

## Pedagogical objective
Move from "it runs" to "it is trustworthy." This section teaches students to validate AI pipelines with explicit checks.

## What this validation cell checks
1. Citation quality (missing URLs).
2. Low-confidence extraction guardrail.
3. Google Sheets write success/failure state.
4. CRM schema completeness.

## Suggested classroom drill sequence
1. Run a clean card and observe expected PASS outcomes.
2. Introduce a low-quality card image to trigger confidence warning.
3. Break a sheet header intentionally to show fail-fast schema behavior.
4. Re-run and discuss why these checks matter for deployment readiness.
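Drill 3 assumes the writer validates the sheet header before appending a row. That check is not shown in this section, so the following is a minimal sketch under that assumption: compare the header row to the expected CRM columns and raise on any mismatch. `EXPECTED_COLUMNS` is a hypothetical short list standing in for `CRM_COLUMNS`; with gspread, the header row could be read via `worksheet.row_values(1)`.

```python
from typing import Sequence

# Hypothetical column order; the notebook's CRM_COLUMNS defines the real one.
EXPECTED_COLUMNS = ["full_name", "job_title", "company_name", "email"]


def check_header(header_row: Sequence[str], expected: Sequence[str] = EXPECTED_COLUMNS) -> None:
    """Raise ValueError if the sheet header does not match the expected columns exactly (order included)."""
    actual = [h.strip() for h in header_row]
    if actual != list(expected):
        missing = [c for c in expected if c not in actual]
        unexpected = [c for c in actual if c not in expected]
        raise ValueError(f"Sheet header mismatch: missing={missing}, unexpected={unexpected}")


check_header(["full_name", "job_title", "company_name", "email"])  # passes silently

try:
    # Drill 3: a renamed column should stop the write before any row is appended.
    check_header(["full_name", "title", "company_name", "email"])
except ValueError as e:
    print(e)  # Sheet header mismatch: missing=['job_title'], unexpected=['title']
```

Failing before the append is the point: a misaligned row written under the wrong headers is harder to detect and clean up than a refused write.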


In [None]:
# Beginner note: Quick validation checks to verify data quality and pipeline health.

def run_validation_checks():
    """Run post-execution checks for quality, schema, and write status."""
    print("Validation checks:")

    dossier_payload = workflow_state.get("research_dossier")
    if dossier_payload:
        missing_url = [c for c in dossier_payload.get("citations", []) if not c.get("url")]
        print("- Citation quality:", "PASS" if not missing_url else f"FAIL ({len(missing_url)} missing URLs)")
    else:
        print("- Citation quality: SKIP (no dossier)")

    extraction_payload = workflow_state.get("namecard_extraction")
    if extraction_payload:
        avg = mean_confidence(extraction_payload.get("field_confidence", {}))
        print("- Low-confidence guard:", "PASS" if avg >= EXTRACTION_CONFIDENCE_THRESHOLD else f"REVIEW NEEDED ({avg:.2f})")
    else:
        print("- Low-confidence guard: SKIP (no extraction)")

    sheet_payload = workflow_state.get("sheet_write_result")
    if sheet_payload:
        print("- Sheets write:", "PASS" if sheet_payload.get("success") else f"FAIL ({sheet_payload.get('error_message')})")
    else:
        print("- Sheets write: SKIP (not run)")

    crm_payload = workflow_state.get("crm_record")
    if crm_payload:
        missing_cols = [c for c in CRM_COLUMNS if c not in crm_payload]
        print("- CRM schema:", "PASS" if not missing_cols else f"FAIL missing {missing_cols}")
    else:
        print("- CRM schema: SKIP (no record)")


run_validation_checks()


## Teaching Flow (90 Minutes)

## Suggested facilitation plan
1. **0-10 min:** Explain architecture, contracts, and failure boundaries.
2. **10-30 min:** Stage 1 live run (clear card + blurry card comparison).
3. **30-60 min:** Stage 2 decomposition (planner/search/writer responsibilities).
4. **60-80 min:** Stage 3 CRM integration and idempotency behavior.
5. **80-90 min:** Validation drills + discussion of production hardening.
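For the idempotency segment, it helps to show what a guard could key on. The notebook passes `enable_idempotency_guard=True` to `write_record_to_sheet`, but its key is not shown here, so this sketch assumes a hash of normalized email and company. `idempotency_key` and `write_once` are hypothetical, and an in-memory list stands in for the sheet; in practice `seen_keys` might be loaded from a key column (e.g., gspread's `worksheet.col_values(...)`).

```python
import hashlib
from typing import Dict, List, Set


def idempotency_key(record: Dict[str, str]) -> str:
    """Hypothetical dedupe key: normalized email + company, hashed.

    Normalizing (strip + lowercase) means 'Ada@Example.com ' and 'ada@example.com'
    produce the same key, so a re-run does not create a duplicate row.
    """
    email = (record.get("email") or "").strip().lower()
    company = (record.get("company_name") or "").strip().lower()
    return hashlib.sha256(f"{email}|{company}".encode("utf-8")).hexdigest()


def write_once(record: Dict[str, str], seen_keys: Set[str], rows: List[Dict[str, str]]) -> bool:
    """Append `record` to `rows` only if its key is new. Returns True if written."""
    key = idempotency_key(record)
    if key in seen_keys:
        return False
    seen_keys.add(key)
    rows.append(record)
    return True


seen: Set[str] = set()
sheet_rows: List[Dict[str, str]] = []
first = {"email": "Ada@Example.com ", "company_name": "Analytical Engines"}
rerun = {"email": "ada@example.com", "company_name": "analytical engines"}
print(write_once(first, seen, sheet_rows))  # True
print(write_once(rerun, seen, sheet_rows))  # False (duplicate skipped)
print(len(sheet_rows))                      # 1
```

A useful discussion point: the choice of key fields defines what "the same lead" means, and a weak key (name only) can silently merge different people.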

## Discussion prompts
- Where should human review be mandatory?
- Which confidence thresholds are acceptable for your domain?
- What additional observability would you add in production?
- How would you batch this pipeline for many cards?
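One possible answer to the observability prompt is per-stage tracing: record each stage's name, duration, and outcome, including when it raises. The sketch below is a hypothetical `traced_stage` context manager built on `contextlib.contextmanager`; the stage names and the simulated failure are illustrative, not part of the notebook.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List


@contextmanager
def traced_stage(name: str, log: List[Dict[str, object]]) -> Iterator[None]:
    """Record stage name, duration, and outcome into `log`, even if the stage raises."""
    start = time.perf_counter()
    status = "ok"
    try:
        yield
    except Exception as e:
        status = f"error: {e}"
        raise  # re-raise so the caller's error handling still runs
    finally:
        log.append({"stage": name, "seconds": round(time.perf_counter() - start, 4), "status": status})


trace: List[Dict[str, object]] = []
with traced_stage("stage1_extraction", trace):
    time.sleep(0.01)  # stand-in for OCR work
try:
    with traced_stage("stage3_sheet_write", trace):
        raise ConnectionError("sheet unavailable")  # simulated failure
except ConnectionError:
    pass

for entry in trace:
    print(entry)
```

In production, the same records would typically be sent to a logging or metrics backend instead of a list.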

## Stretch extensions (post-class)
1. Add batch ingestion for multiple cards.
2. Add per-claim citation linking in UI.
3. Route low-confidence leads into a manual review queue instead of auto-writing them.
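Extensions 1 and 3 combine naturally: process a batch of extractions, then split it by confidence before any write. A minimal sketch, assuming each extraction is a dict with a `field_confidence` mapping (as in `workflow_state["namecard_extraction"]`) and that confidence is a plain average; `route_leads` and `REVIEW_THRESHOLD` are hypothetical names, and the batch is illustrative data.

```python
from statistics import mean
from typing import Dict, List, Tuple

# Hypothetical threshold; align with EXTRACTION_CONFIDENCE_THRESHOLD in practice.
REVIEW_THRESHOLD = 0.75


def route_leads(extractions: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Split a batch into (auto_write, manual_queue) by average field confidence."""
    auto_write: List[Dict] = []
    manual_queue: List[Dict] = []
    for ex in extractions:
        scores = ex.get("field_confidence", {})
        avg = mean(scores.values()) if scores else 0.0  # no scores => needs review
        if avg >= REVIEW_THRESHOLD:
            auto_write.append(ex)
        else:
            manual_queue.append(ex)
    return auto_write, manual_queue


batch = [
    {"full_name": "Ada Lovelace", "field_confidence": {"full_name": 0.97, "email": 0.94}},
    {"full_name": "Blurry Card", "field_confidence": {"full_name": 0.55, "email": 0.40}},
    {"full_name": "No Scores", "field_confidence": {}},
]
auto, manual = route_leads(batch)
print([e["full_name"] for e in auto])    # ['Ada Lovelace']
print([e["full_name"] for e in manual])  # ['Blurry Card', 'No Scores']
```

Treating missing scores as 0.0 is a deliberate fail-safe choice: an extraction with no confidence data goes to a human rather than straight into the CRM.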
