# üîç PulseTrace ‚Äî Multi-Agent Root Cause Analysis for Data Pipelines

PulseTrace automates how data engineers investigate failing pipelines using a **coordinated multi-agent system**.  
Instead of manually scanning logs, schemas, lineage, and past incidents, PulseTrace orchestrates:

- **Detector Agent** ‚Äî detects failures and triggers analysis  
- **Diagnoser Agent** ‚Äî fetches logs, schema diffs, sample data, and builds incident signatures  
- **History Analyzer Agent** ‚Äî matches incidents against the memory bank  
- **Impact Analyzer Agent** ‚Äî determines downstream blast radius using lineage  
- **Advisor Agent** ‚Äî synthesizes the final RCA report and fix recommendations  


## üîß Technology Implemented Behind PulseTrace

PulseTrace demonstrates:
- a **multi-agent workflow**  
- **asynchronous agent-to-agent (A2A) messaging** using an in-memory bus  
- custom diagnostic tools  
- observability via `TRACE_STORE`  
- per-incident session state (`SESSIONS`)  
- historical memory bank (`MEMORY_BANK`)  
- hybrid deterministic + LLM reasoning  
- human confirmation steps  


## üìò Notebook Roadmap

- üîß Setup & environment checks (offline mode + optional Gemini)
- ‚ñ∂Ô∏è How to Run This Notebook
- üß≠ Architecture overview & how the agents interact  
- ü§ñ Agent implementations (Diagnoser, History Analyzer, Impact Analyzer, Advisor)  
- ‚ñ∂Ô∏è Orchestrator: run an end-to-end RCA demo using offline samples
- üì° Observability: traces, sessions, memory bank 
- üé® Interactive UI (ipywidgets) ‚Äî upload logs, run RCA, approve & save reports
- üöÄ Agent Deployment (Vertex AI Agent Engine)
- üõ† Troubleshooting & diagnostics
- ‚ö†Ô∏è Current Limitations
- üöÄ What's Next steps for PulseTrace
- üìù Conclusion  


## üöÄ How to Run PulseTrace  

Running PulseTrace is simple and intentionally structured to feel smooth end-to-end.  
Follow these steps, and you‚Äôll have the full RCA pipeline running in minutes.

### **1. Run the Setup Cells**  
Start at the top of the notebook and execute each setup cell in order.  
These cells:  
- Prepare the environment  
- Register all agents  
- Load synthetic log samples  
- Initialize helper utilities  

You‚Äôll see clear confirmation messages as components are loaded.

### **2. (Optional) Execute the Demo Flow**  
Find the cell titled **‚ÄúRun demo & quick validation.‚Äù**  
Running it gives you a quick sanity check:  
- A complete RCA workflow runs automatically  
- Traces and sessions are generated  
- A draft report is produced and validated  

This helps confirm everything is wired correctly.

### **3. Use the Interactive UI**  
Scroll to the section titled **‚ÄúInteractive UI (ipywidgets)‚Äù**.  
Here you can:  
- Upload your own log file  
- Or choose a demo sample  
- Click **Run Diagnosis** to execute the full RCA pipeline  
- Inspect **Traces**, **Sessions**, and **Memory Bank** live  
- Approve and save reports as needed  

The UI is designed to be fast, clear, and beginner-friendly.


### **4. Inspect Internal Activity (Optional)**  
Use the **Observability** section to view:  
- Agent-to-Agent event traces  
- Active sessions  
- Memory bank summaries  

These tools help verify‚Äîand showcase‚Äîhow the pipeline behaves under the hood.


### **5. Explore Further Sections**  
The notebook ends with:  
- Troubleshooting  
- Current limitations  
- Next steps  
- A clean conclusion  

**You're ready to run PulseTrace.  
Follow the notebook from top to bottom, and the workflow will run smoothly end-to-end.**


## üîß Environment Setup & Mode Detection

This cell initializes the core environment for PulseTrace:
- loads helper libraries  
- prepares global state (sessions, trace store, memory bank)  
- checks whether Gemini is available  
- configures hybrid mode automatically (fallback-safe)  
- detects UI support (ipywidgets)


## (1) Optional: Install ipywidgets (only if missing)
This cell checks whether `ipywidgets` is installed and installs it only if required.


In [1]:
# install ipywidgets if missing (with readable debug logs)
print("üîß Checking for ipywidgets...")

try:
    import ipywidgets
    print("‚úî ipywidgets is already installed.")
except ImportError:
    print("‚ö† ipywidgets not installed. Installing ipywidgets==7.7.1 ...")
    import sys
    !{sys.executable} -m pip install ipywidgets==7.7.1

    print("‚è≥ Installation attempted. A kernel restart may be required depending on the environment.")


üîß Checking for ipywidgets...
‚úî ipywidgets is already installed.


## (2) Optional: Google API key setup (Kaggle Secrets)
Tries to load `GOOGLE_API_KEY`. If found, saves it to the environment for Gemini.


In [2]:
# Kaggle secret loader (with readable status messages)
print("üîë Checking for GOOGLE_API_KEY in Kaggle Secrets...")

import os

try:
    from kaggle_secrets import UserSecretsClient
    user_secrets = UserSecretsClient()

    key = user_secrets.get_secret("GOOGLE_API_KEY")

    if key:
        os.environ["GOOGLE_API_KEY"] = key
        print("‚úî GOOGLE_API_KEY successfully loaded into environment.")
    else:
        print("‚ö† GOOGLE_API_KEY not found in Kaggle Secrets. Continuing in offline mode.")

except Exception as e:
    print("‚ö† Could not load GOOGLE_API_KEY from Kaggle Secrets.")
    print("   Reason:", e)
    print("   Proceeding in offline mode.")


üîë Checking for GOOGLE_API_KEY in Kaggle Secrets...
‚úî GOOGLE_API_KEY successfully loaded into environment.


## (3) Environment Summary & Mode Detection
Detects:
- Gemini availability  
- ipywidgets UI availability  
- Prints a clean summary block


In [3]:
# Environment summary & detection with detailed print messages
print("üåê Initializing environment summary...")

# Detect Gemini availability
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
USE_GEMINI = bool(GOOGLE_API_KEY)

# Detect UI availability
try:
    import ipywidgets as widgets
    from IPython.display import display, clear_output
    UI_AVAILABLE = True
except Exception:
    UI_AVAILABLE = False

# Print summary (with fallbacks if rich is not installed)
summary_lines = [
    "\n=== PulseTrace Environment Summary ===",
    f"Gemini Mode: {'ON (API key detected)' if USE_GEMINI else 'OFF (no API key found)'}",
    f"UI Mode: {'ENABLED (ipywidgets available)' if UI_AVAILABLE else 'UNAVAILABLE'}",
]

try:
    from rich import print as rprint
    for line in summary_lines:
        rprint(line)
except Exception:
    for line in summary_lines:
        print(line)


üåê Initializing environment summary...


## (4) Configure Gemini Client (defensive)
Attempts to import and configure `google-generativeai`.  
Falls back safely if unavailable or misconfigured.


In [4]:
# Gemini client initialization with clear debugging output
print("ü§ñ Checking if Gemini client can be initialized...")

USE_GEMINI = bool(os.getenv("GOOGLE_API_KEY"))

if USE_GEMINI:
    print("üîç GOOGLE_API_KEY detected. Attempting to configure Gemini client...")
    try:
        import google.generativeai as genai
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        print("‚úî Gemini client configured successfully.")
    except Exception as e:
        print("‚ùå Failed to initialize Gemini client.")
        print("   Falling back to deterministic offline mode.")
        print("   Reason:", e)
        USE_GEMINI = False
else:
    print("‚ö† Gemini disabled ‚Äî no GOOGLE_API_KEY found. Running in offline deterministic mode.")


ü§ñ Checking if Gemini client can be initialized...
üîç GOOGLE_API_KEY detected. Attempting to configure Gemini client...
‚úî Gemini client configured successfully.


## üß© Core Runtime Structures  
This cell initializes all global stores, time helpers, hashing helpers, and idempotent guards used across the PulseTrace engine.  
These are required before tools, agents, or the router can function.


In [5]:
print("üîß Initializing core runtime structures...")

import os, time, uuid, json, re, hashlib, traceback, pathlib, threading
from collections import Counter, deque
from IPython.display import display, Markdown

if "_PULSETRACE_FINAL" not in globals():
    _PULSETRACE_FINAL = True
    DEMO_SEED = True

    def now_ts(): 
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    def make_id(prefix="inc"): 
        return f"{prefix}-{uuid.uuid4().hex[:8]}"

    def sha256_hex(s: str): 
        return hashlib.sha256(s.encode("utf-8")).hexdigest()

    TRACE_STORE = []
    IN_MEMORY_BUS = deque()
    SESSIONS = {}
    MEMORY_BANK = []
    DRAFT_MEMORY = []

print("‚úÖ Core runtime structures ready.")


üîß Initializing core runtime structures...
‚úÖ Core runtime structures ready.


## ‚¨áÔ∏è Adding PDF Export and Report Utilities

This section introduces helper functions necessary to provide a **"Download as PDF"** option for the final report. 

Since the Jupyter UI is rendering a mix of Markdown and HTML, we implement a file-download mechanism that is common in web applications:

- It uses the pure-Python **`reportlab`** library to convert the Markdown report into a raw PDF file format in memory.
- The PDF bytes are then encoded using **Base64**.
- The Base64 string is embedded into an **HTML download link** (`<a download>`) that triggers the file save when clicked.

This ensures the **integrity and professional formatting** of the final RCA report artifact.

In [6]:
# --- INSTALLATION ---
try:
    import reportlab
except ImportError:
    !pip install reportlab

# --- IMPORTS ---
import base64
import json
from io import BytesIO
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak, Image
from reportlab.lib.enums import TA_CENTER, TA_LEFT

print("‚úÖ 'reportlab' loaded. Defining professional PDF generator with Blue Button...")

# --- PROFESSIONAL PDF GENERATOR (Same layout as before) ---
def create_professional_pdf(report_json):
    buffer = BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=letter, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=72)
    styles = getSampleStyleSheet()
    
    # Custom Styles
    style_title = ParagraphStyle('PulseTitle', parent=styles['Title'], fontSize=24, spaceAfter=20, textColor=colors.HexColor("#1f4e79"))
    style_heading = ParagraphStyle('PulseHeading', parent=styles['Heading2'], fontSize=16, spaceBefore=15, spaceAfter=10, textColor=colors.HexColor("#2e75b6"), borderPadding=5)
    style_subheading = ParagraphStyle('PulseSubHeading', parent=styles['Heading3'], fontSize=12, spaceBefore=10, textColor=colors.HexColor("#333333"))
    style_body = ParagraphStyle('PulseBody', parent=styles['BodyText'], fontSize=10, leading=14, spaceAfter=6)
    style_code = ParagraphStyle('PulseCode', parent=styles['Code'], fontSize=8, leading=10, backColor=colors.whitesmoke, borderPadding=5)
    
    meta = report_json.get("meta", {})
    rca = report_json.get("root_cause", {})
    sev = report_json.get("severity", {})
    conf = report_json.get("confidence_breakdown", {})
    evidence = report_json.get("evidence", {})
    impact = report_json.get("impact", {})
    recs = report_json.get("recommended_next_steps", [])
    
    Story = []
    
    # PAGE 1: COVER
    Story.append(Spacer(1, 2 * inch))
    Story.append(Paragraph("Incident Report", style_title))
    Story.append(Spacer(1, 0.2 * inch))
    Story.append(Paragraph(f"<b>Incident ID:</b> {meta.get('incident_id', 'N/A')}", style_body))
    Story.append(Paragraph(f"<b>Job:</b> {meta.get('job', 'N/A')}", style_body))
    Story.append(Paragraph(f"<b>Scenario:</b> {meta.get('scenario', 'N/A')}", style_body))
    Story.append(Paragraph(f"<b>Generated:</b> {meta.get('generated_at', 'N/A')}", style_body))
    Story.append(Spacer(1, 1 * inch))
    Story.append(Paragraph("Executive Summary", style_heading))
    summary_text = (f"A <b>{sev.get('label', 'UNKNOWN')}</b> severity issue was detected in job <b>{meta.get('job')}</b>. "
                    f"The root cause is hypothesized to be <b>{rca.get('hypothesis')}</b>. "
                    f"{len(impact.get('downstreams', []))} downstream assets are potentially impacted.")
    Story.append(Paragraph(summary_text, style_body))
    Story.append(Spacer(1, 2 * inch))
    Story.append(Paragraph("Generated by PulseTrace RCA", ParagraphStyle('Footer', parent=style_body, alignment=TA_CENTER, textColor=colors.grey)))
    Story.append(PageBreak())

    # PAGE 2: DETAILS
    Story.append(Paragraph("2. Root Cause Analysis", style_heading))
    Story.append(Paragraph(f"Hypothesis: {rca.get('hypothesis')}", style_body))
    
    Story.append(Paragraph("3. Confidence Breakdown", style_heading))
    conf_data = [["Component", "Score"], ["Final Score", f"{conf.get('final_score')} ({conf.get('label')})"],
                 ["History Score", str(conf.get('history_score'))], ["Field Drift", str(conf.get('field_score'))], ["Anomaly Score", str(conf.get('anomaly_score'))]]
    t_conf = Table(conf_data, colWidths=[200, 100], hAlign='LEFT')
    t_conf.setStyle(TableStyle([('BACKGROUND', (0, 0), (1, 0), colors.HexColor("#e1e1e1")), ('TEXTCOLOR', (0, 0), (1, 0), colors.black), ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('PADDING', (0, 0), (-1, -1), 6)]))
    Story.append(t_conf)

    Story.append(Paragraph("4. Evidence", style_heading))
    Story.append(Paragraph("4.1 Log Excerpt", style_subheading))
    logs = evidence.get('logs', {}).get('lines', [])
    if logs: Story.append(Paragraph("<br/>".join([line[:120] for line in logs[:10]]), style_code))
    
    anomalies = evidence.get('sample_anomalies', {}).get('anomalies', [])
    if anomalies:
        Story.append(Paragraph("4.2 Sample Anomalies", style_subheading))
        anom_data = [["Row", "Field", "Value"]] + [[str(a.get('row')), str(a.get('field')), str(a.get('value'))] for a in anomalies]
        t_anom = Table(anom_data, colWidths=[60, 150, 200], hAlign='LEFT')
        t_anom.setStyle(TableStyle([('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#fce4d6")), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('PADDING', (0, 0), (-1, -1), 5)]))
        Story.append(t_anom)

    Story.append(Paragraph("5. Downstream Impact", style_heading))
    downstreams = impact.get('downstreams', [])
    if downstreams:
        imp_data = [["Asset", "Type", "Critical"]] + [[d.get('asset'), d.get('type'), "YES" if d.get('critical') else "No"] for d in downstreams]
        t_imp = Table(imp_data, colWidths=[250, 100, 80], hAlign='LEFT')
        t_imp.setStyle(TableStyle([('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#d9e1f2")), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('PADDING', (0, 0), (-1, -1), 5)]))
        Story.append(t_imp)
    
    Story.append(Paragraph("6. Recommendations", style_heading))
    for i, rec in enumerate(recs, 1): Story.append(Paragraph(f"{i}. {rec}", style_body))

    doc.build(Story)
    buffer.seek(0)
    return buffer.read()

def render_pdf_download_button(incident_id, report_json):
    """
    Generates a Blue 'Download as PDF' button that delivers the Professional PDF.
    """
    try:
        pdf_bytes = create_professional_pdf(report_json)
        pdf_base64 = base64.b64encode(pdf_bytes).decode('utf-8')
        filename = f"PulseTrace_Report_{incident_id}.pdf"

        html_button = f"""
        <a download="{filename}" href="data:application/pdf;base64,{pdf_base64}">
            <button style="background-color: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; font-size: 16px; margin-top: 20px;">
                ‚¨áÔ∏è Download as PDF
            </button>
        </a>
        """
        return html_button
    except Exception as e:
        return f"<b>Error generating PDF:</b> {e}"

print("‚ú® Professional PDF utility loaded (Blue Button style restored).")

‚úÖ 'reportlab' loaded. Defining professional PDF generator with Blue Button...
‚ú® Professional PDF utility loaded (Blue Button style restored).


## üß™ Demo Memory Seed + Synthetic Log Inputs  
This cell seeds the demo MEMORY_BANK (idempotent) and defines the synthetic log inputs used in offline RCA mode.


In [7]:
print("üß™ Seeding demo memory & loading synthetic logs...")

def seed_demo_memory_once():
    global MEMORY_BANK
    if not DEMO_SEED: 
        print("  ‚Ü™ DEMO_SEED disabled.")
        return
    if any(e.get("seeded") for e in MEMORY_BANK): 
        print("  ‚Ü™ MEMORY_BANK already seeded.")
        return

    MEMORY_BANK.clear()
    seeded = [
        {
            "incident_id": "past-inc-orders-2025-11-01",
            "job": "orders",
            "error_class": "TypeError",
            "changed_fields": ["price"],
            "sample_anomalies": [{"row":10,"field":"price","value":"999.99"}],
            "created_at": "2025-11-01 09:00:00",
            "hint_snippet": "cannot cast '123.45' to INT",
        },
        {
            "incident_id": "past-inc-pricing-2025-10-20",
            "job": "pricing",
            "error_class": "ValueError",
            "changed_fields": [],
            "sample_anomalies": [{"row":400,"field":"price","value":-5.0}],
            "created_at": "2025-10-20 14:30:00",
            "hint_snippet": "negative value",
        }
    ]

    for e in seeded:
        e["text_hash"] = sha256_hex(
            f"{e['job']}|{e['error_class']}|{','.join(e.get('changed_fields',[]))}|{e.get('hint_snippet','')}"
        )
        e["seeded"] = True
        e["source"] = "demo_seed"
        MEMORY_BANK.append(e)

    print("  ‚úî MEMORY_BANK seeded:", [m["incident_id"] for m in MEMORY_BANK])

seed_demo_memory_once()

SYNTHETIC_LOGS = {
    "schema_drift": [
        "2025-11-24 10:00:01 INFO job=orders ETL step=ingest files=3",
        "2025-11-24 10:03:02 ERROR job=orders transform TypeError: cannot cast '123.45' to INT on column price",
    ],
    "missing_partition": [
        "2025-11-24 11:00:05 ERROR job=reports No files found for partition dt=2025-11-24",
    ],
    "invalid_values": [
        "2025-11-24 12:05:02 ERROR job=pricing transform ValueError: negative value found in price at row 524",
    ],
}

print("‚úÖ Demo memory + synthetic logs loaded.")


üß™ Seeding demo memory & loading synthetic logs...
  ‚úî MEMORY_BANK seeded: ['past-inc-orders-2025-11-01', 'past-inc-pricing-2025-10-20']
‚úÖ Demo memory + synthetic logs loaded.


## üõ† Deterministic Diagnostic Tools  
This cell defines the lightweight deterministic tools used by agents:  
- log_fetch  
- schema_diff  
- sample_data  
- lineage_query  
- history_query  
- save_report  
These simulate a real pipeline environment for offline RCA.


In [8]:
print("üõ† Initializing deterministic diagnostic tools...")

class LogFetch:
    def run(self, params):
        return {"lines": SYNTHETIC_LOGS.get(params.get("scenario"), []), "job": None}

class SchemaDiff:
    def run(self, params):
        if params.get("scenario") == "schema_drift":
            return {"diff":[{"field":"price","old":"INT","new":"FLOAT"}]}
        return {"diff":[]}

class SampleData:
    def run(self, params):
        scenario = params.get("scenario")
        if scenario == "schema_drift":
            return {
                "rows":[{"id":1,"price":"123.45"},{"id":2,"price":"200.00"}],
                "anomalies":[{"row":1,"field":"price","value":"123.45"}]
            }
        if scenario == "invalid_values":
            return {
                "rows":[{"id":524,"price":-12.5},{"id":100,"price":10.0}],
                "anomalies":[{"row":524,"field":"price","value":-12.5}]
            }
        return {"rows":[], "anomalies":[]}

class LineageQuery:
    def run(self, params):
        job = params.get("job")
        if job in ("orders","pricing"):
            return {"downstreams":[
                {"asset":"dashboard.sales_over_time","type":"dashboard","critical":True},
                {"asset":"ml.revenue_forecast","type":"model","critical":True}
            ]}
        if job == "reports":
            return {"downstreams":[{"asset":"dashboard.reports","type":"dashboard","critical":False}]}
        return {"downstreams":[]}

class HistoryQuery:
    def run(self, params):
        signature = params.get("signature", {})
        matches=[]
        for past in MEMORY_BANK:
            score = 0.0
            if signature.get("job") == past.get("job"):
                score += 0.4
            cf=set(signature.get("changed_fields",[])); pf=set(past.get("changed_fields",[]))
            if cf and pf:
                overlap = len(cf & pf)/max(1,len(cf|pf))
                score += 0.4*overlap
            if signature.get("error_class") == past.get("error_class"):
                score += 0.2
            if score>0: 
                matches.append({"past_incident":past,"score":round(score,2)})
        matches.sort(key=lambda x: x["score"], reverse=True)
        return {"matches": matches[:5]}

class SaveReport:
    def run(self, params):
        report_md = params.get("report_md")
        report_json = params.get("report_json")
        incident_id = params.get("incident_id", make_id("rpt"))
        out_dir = params.get("out_dir","submission_reports")
        os.makedirs(out_dir, exist_ok=True)
        base = f"{out_dir}/pulsetrace_report_{incident_id}"
        md_path = f"{base}.md"; json_path = f"{base}.json"
        with open(md_path, "w") as f: f.write(report_md)
        with open(json_path, "w") as f: json.dump(report_json, f, indent=2)
        return {"saved": True, "md_path": md_path, "json_path": json_path}

TOOLS = {
    "log_fetch": LogFetch(),
    "schema_diff": SchemaDiff(),
    "sample_data": SampleData(),
    "lineage_query": LineageQuery(),
    "history_query": HistoryQuery(),
    "save_report": SaveReport()
}

print("‚úÖ Tools initialized.")


üõ† Initializing deterministic diagnostic tools...
‚úÖ Tools initialized.


## üîç Signature Builder & Existing Incident Lookup

These helpers generate the canonical incident signature and support
idempotent behavior by checking whether the same incident signature
already exists in the current session or memory bank.

This ensures:
- recurring incidents reuse past reports  
- no duplicate work  
- deterministic RCA behavior across runs  


In [9]:
print("üîß Loading: Signature builder & existing-entry lookup helpers...")

# -------------------------
# Helpers: signature, find existing
# -------------------------
def build_signature(incident, schema_diff_res, logs_res, samples_res):
    job = incident.get("job")
    
    # 1. Try the snippet provided
    err = incident.get("error_snippet", "")
    
    # 2. If snippet is weak (Unknown), try scanning the full logs if available
    full_log_text = "\n".join(logs_res.get("lines", []))
    
    # Search for specific error classes in snippet OR full logs
    regex = r"(TypeError|ValueError|No files found|timeout|Missing|PermissionError)"
    
    m = re.search(regex, err, re.I)
    if not m and full_log_text:
        m = re.search(regex, full_log_text, re.I)
        
    error_class = m.group(1) if m else "Unknown"

    changed = [d["field"] for d in schema_diff_res.get("diff",[])]
    text = f"{job}|{error_class}|{','.join(changed)}|{err}"
    th = sha256_hex(text)

    print(f"üìå Signature built for job='{job}', error_class='{error_class}', changed_fields={changed}")

    return {
        "job": job,
        "error_class": error_class,
        "changed_fields": changed,
        "text_hash": th,
        "sample_anomalies": samples_res.get("anomalies", []),
        "created_at": now_ts()
    }

def find_existing_by_text_hash(text_hash):
    print(f"üîç Checking for existing incidents with text_hash={text_hash[:10]}...")

    for inc_id, sess in SESSIONS.items():
        sig = sess.get("signature") or {}
        if sig.get("text_hash") == text_hash:
            print(f"‚úî Found matching session: {inc_id}")
            return ("session", inc_id, sess)

    for entry in MEMORY_BANK:
        if entry.get("text_hash") == text_hash:
            print(f"‚úî Found matching historical memory entry: {entry.get('incident_id')}")
            return ("memory", entry.get("incident_id", None), entry)

    print("‚ùå No matching signature found.")
    return (None, None, None)


üîß Loading: Signature builder & existing-entry lookup helpers...


## üìä Confidence Scoring Layer

PulseTrace combines several signals to compute a final confidence score:

- match strength with historical incidents  
- schema field drift  
- anomalies in sampled rows  
- optional Gemini-based augmentation (hybrid mode)

The following section computes:
- the final confidence score  
- a detailed breakdown  
- labels (HIGH / MEDIUM / LOW)  


In [10]:
print("üìà Loading: Confidence scoring helpers...")
def confidence_label(score: float) -> str:
    if score >= 0.80: return "HIGH"
    if score >= 0.50: return "MEDIUM"
    return "LOW"

def compute_confidence(signature: dict, history_matches: list = None, use_gemini: bool = False):
    history_matches = history_matches or []
    history_score = 0.0

    if history_matches:
        try:
            history_score = max(float(m.get("score", 0.0)) for m in history_matches)
        except Exception:
            history_score = 0.0

    field_score = 1.0 if signature.get("changed_fields") else 0.0
    anomalies = signature.get("sample_anomalies") or []
    anomaly_count = len(anomalies)
    anomaly_score = min(1.0, anomaly_count / 3.0)
    gemini_score = 1.0 if use_gemini else 0.0

    weights = {"history": 0.40, "field": 0.25, "anomaly": 0.25, "gemini": 0.10}

    if not use_gemini:
        s = weights["history"] + weights["field"] + weights["anomaly"]
        weights["history"] /= s
        weights["field"]  /= s
        weights["anomaly"]/= s
        weights["gemini"] = 0.0

    score = (
        weights["history"] * history_score +
        weights["field"] * field_score +
        weights["anomaly"] * anomaly_score +
        weights["gemini"] * gemini_score
    )
    score = max(0.0, min(1.0, round(score, 2)))

    breakdown = {
        "history_score": round(history_score,2),
        "field_score": round(field_score,2),
        "anomaly_score": round(anomaly_score,2),
        "gemini_score": round(gemini_score,2),
        "weights": {k: round(v,2) for k,v in weights.items()},
        "final_score": score,
        "label": confidence_label(score)
    }

    print(f"üìä Confidence score computed: {score} ({breakdown['label']})")

    return score, breakdown


üìà Loading: Confidence scoring helpers...


## üö® Severity Computation Layer

Severity is computed using:
- downstream impact (critical dashboards/models)  
- schema drift  
- sample anomalies  

This produces a normalized severity score and label.


In [11]:
print("üö® Loading: Severity computation helpers...")

# -------------------------
# Severity helpers
# -------------------------
def compute_severity(signature, impact):
    score = 0.0

    # --- FIX: Robustly find the list of downstream assets ---
    # This handles both nested impact structures and direct dictionaries
    dlist = []
    if isinstance(impact, dict):
        dlist = impact.get("downstreams") or impact.get("impact", {}).get("downstreams")

    # --- Scoring Logic ---
    if dlist:
        # If any downstream asset is critical -> +0.6
        if any(d.get("critical") for d in dlist):
            score += 0.6
        # If downstreams exist but none are critical -> +0.2
        else:
            score += 0.2

    # Add points for Schema Drift (+0.2)
    if signature.get("changed_fields"):
        score += 0.2

    # Add points for Sample Anomalies (+0.2)
    if signature.get("sample_anomalies"):
        score += 0.2

    final = min(1.0, round(score, 2))
    print(f"üö® Severity computed: {final}")
    
    return final

def severity_label(score):
    if score >= 0.75: return "HIGH"
    if score >= 0.4: return "MEDIUM"
    return "LOW"

üö® Loading: Severity computation helpers...


## üì® Message Bus & Router Core

PulseTrace uses a simple FIFO in-memory message bus (`IN_MEMORY_BUS`)  
to allow agents to send events to one another.

This layer contains:
- `emit()` ‚Üí enqueue messages  
- `pop_next_message()` ‚Üí get next event  
- `router_once()` ‚Üí routes one event to the appropriate agent  
- `router_run_blocking()` ‚Üí drains the queue  

All agents rely on this for communication.


In [12]:
print("üì® Loading: Message bus & router system...")

# -------------------------
# Simple router & agents (deterministic + optional LLM augmentation)
# -------------------------
def emit(frm: str, to: str, payload: dict):
    msg = {
        "id": uuid.uuid4().hex,
        "from": frm,
        "to": to,
        "ts": now_ts(),
        "payload": payload
    }
    TRACE_STORE.append(msg)
    IN_MEMORY_BUS.append(msg)

    print(f"[emit] {frm} ‚Üí {to} | type={payload.get('type', payload.get('pattern',''))}")
    return msg

def pop_next_message():
    return IN_MEMORY_BUS.popleft() if IN_MEMORY_BUS else None

def router_once():
    msg = pop_next_message()
    if not msg:
        return False

    to = msg["to"]

    print(f"[router] Dispatching to agent: {to}")

    if to == "pulse_detector":
        pulse_detector.on_message(msg)
    elif to == "root_cause_diagnoser":
        diagnoser.on_message(msg)
    elif to == "pattern_history_agent":
        history_agent.on_message(msg)
    elif to == "impact_scope_agent":
        impact_agent.on_message(msg)
    elif to == "pulse_advisor":
        advisor.on_message(msg)
    elif to == "save_report":
        pass
    elif to == "ui":
        pass

    return True

def router_run_blocking():
    print("‚ñ∂Ô∏è Router: starting event loop...")
    while IN_MEMORY_BUS:
        router_once()
    print("‚èπ Router: event queue empty.")


üì® Loading: Message bus & router system...


## üîé Pulse Detector Agent (`pulse_detector`)

This is the first agent in the multi-agent pipeline.

It:
- receives uploaded logs or demo logs  
- detects scenario (`schema_drift`, `invalid_values`, etc.)  
- extracts job name and error snippet  
- builds the initial incident object  
- emits it to the Diagnoser agent  

This makes PulseTrace‚Äôs architecture fully aligned with the write-up.


In [13]:
# -------------------------
# Detector Agent (pulse_detector)
# -------------------------
print("üì® Loading: Detector Agent (pulse_detector)...")

class PulseDetector:
    """
    Lightweight failure detector.
    Reads raw log lines and identifies scenario + job + error snippet.
    Emits a detection payload to root_cause_diagnoser.
    """

    def detect(self, lines):
        text = "\n".join(lines).lower()

        # infer scenario
        if "cannot cast" in text or "cannot convert" in text:
            scenario = "schema_drift"
        elif "no files found" in text:
            scenario = "missing_partition"
        elif "negative value" in text:
            scenario = "invalid_values"
        else:
            scenario = "unknown"

        # infer job
        m = re.findall(r"job=([A-Za-z0-9_\-\.]+)", text)
        job = Counter(m).most_common(1)[0][0] if m else "unknown"

        return scenario, job

    def on_message(self, msg):
        payload = msg["payload"]
        lines = payload.get("lines", [])
        incident_id = payload.get("incident_id", make_id("inc"))

        if not lines:
            print("[pulse_detector] No log lines provided.")
            return

        scenario, job = self.detect(lines)
        error_snip = lines[0] if lines else ""

        print(f"[pulse_detector] Scenario={scenario}, job={job}")

        emit(
            "pulse_detector",
            "root_cause_diagnoser",
            {
                "type": "detection",
                "incident_id": incident_id,
                "scenario": scenario,
                "job": job,
                "error_snippet": error_snip,
                "source_lines_count": len(lines),
                "pattern": "detected_by_detector"
            }
        )


üì® Loading: Detector Agent (pulse_detector)...


## üß† Diagnoser Agent (`root_cause_diagnoser`)

This is the core reasoning agent.

It:
- fetches logs  
- detects schema drift  
- collects sample rows  
- builds the incident signature  
- optionally enriches logs using Gemini (hybrid mode)  
- sends signature to History + Impact agents  

This is the heart of the RCA workflow.


In [14]:
print("üß† Loading: Diagnoser agent...")

class Diagnoser:
    def on_message(self, msg):
        inc = msg["payload"]
        incident_id = inc["incident_id"]
        scenario = inc.get("scenario")

        print(f"üîç Diagnoser triggered for incident: {incident_id}, scenario={scenario}")

        # Fetch evidence
        logs_res    = TOOLS["log_fetch"].run({"scenario": scenario, "job": None})
        schema_res  = TOOLS["schema_diff"].run({"scenario": scenario, "job": inc.get("job")})
        samples_res = TOOLS["sample_data"].run({"scenario": scenario, "job": inc.get("job")})

        # Build signature
        signature = build_signature(inc, schema_res, logs_res, samples_res)
        signature["incident_id"] = incident_id

        # Optional Gemini log summarization
        try:
            if USE_GEMINI and _GEMINI_CONFIGURED:
                print("‚ú® Gemini hybrid mode ON ‚Äî summarizing logs...")
                prompt = (
                    "Summarize these log lines and highlight likely root causes:\n\n" +
                    "\n".join(logs_res.get("lines",[])[:200])
                )
                llm_res = call_gemini_api(prompt, timeout_s=5)
                signature["llm"] = {
                    "used": bool(llm_res.get("text") and not llm_res.get("error")),
                    "summary": (llm_res.get("text") or "")[:3000],
                    "confidence": llm_res.get("confidence") or 0.0,
                    "error": llm_res.get("error"),
                    "prompt_snippet": prompt[:800],
                    "ts": now_ts()
                }
            else:
                signature["llm"] = {"used": False}
        except Exception as e:
            signature["llm"] = {"used": False, "error": str(e)}

        # Store draft session
        DRAFT_MEMORY.append(dict(signature))

        SESSIONS[incident_id] = {
            "signature": signature,
            "logs": logs_res.get("lines", [])[:50],
            "schema_diff": schema_res.get("diff", []),
            "samples": samples_res.get("rows", [])[:20],
            "sample_anomalies": samples_res.get("anomalies", []),
            "history": None,
            "impact": None,
            "report": None,
            "stage": "draft"
        }

        print(f"üì¶ Diagnoser built signature & stored session for {incident_id}")

        emit("root_cause_diagnoser", "pattern_history_agent",
             {"type":"signature", "signature": signature})
        emit("root_cause_diagnoser", "impact_scope_agent",
             {"type":"failure_point", "signature": signature})


üß† Loading: Diagnoser agent...


## üß¨ History Analyzer & Impact Analyzer

### **History Analyzer (`pattern_history_agent`)**
Surfaces recurring incidents via `history_query`.

### **Impact Analyzer (`impact_scope_agent`)**
Determines downstream blast radius using `lineage_query`.

Both agents enrich the session and forward results to the Advisor.


In [15]:
print("üß¨ Loading: History & Impact analyzer agents...")

class PatternHistoryAgent:
    def on_message(self, msg):
        sig = msg["payload"]["signature"]
        print(f"üìö History agent running for signature: {sig['incident_id']}")

        res = TOOLS["history_query"].run({"signature": sig})
        emit("pattern_history_agent", "pulse_advisor",
             {"type":"history_matches", "matches": res["matches"], "signature": sig})

class ImpactScopeAgent:
    def on_message(self, msg):
        sig = msg["payload"]["signature"]
        print(f"üåê Impact agent running for job={sig.get('job')}")

        job = sig.get("job")
        res = TOOLS["lineage_query"].run({"job": job})
        emit("impact_scope_agent", "pulse_advisor",
             {"type":"impact", "impact": res, "signature": sig})


üß¨ Loading: History & Impact analyzer agents...


## üßæ Advisor Agent (`pulse_advisor`)

The Advisor is the final reasoning layer.  
It waits until *both* history + impact results arrive, then:

- computes final severity  
- computes confidence (with optional Gemini contribution)  
- merges logs, schema diff, anomalies  
- generates:
  - Markdown RCA report  
  - machine-readable JSON report

Finally, it emits:
- `ui` ‚Üí to display the draft report  
- `save_report` ‚Üí to persist it  


In [16]:
print("üß≠ Loading: Advisor agent...")

class Advisor:
    def __init__(self, use_gemini=False):
        self.use_gemini = use_gemini

    def _extract_downstreams(self, impact):
        if isinstance(impact, dict):
            if "downstreams" in impact:
                return impact.get("downstreams")
            if "impact" in impact and isinstance(impact["impact"], dict) and "downstreams" in impact["impact"]:
                return impact["impact"].get("downstreams")
        return []

    def synthesize_report(self, session):
        signature = session.get("signature", {})
        history = session.get("history") or []
        impact = session.get("impact") or {}
        sev_score = compute_severity(signature, impact)
        sev_label = severity_label(sev_score)
        confidence_score, confidence_breakdown = compute_confidence(signature, history, use_gemini=self.use_gemini)

        # determine llm usage from session.signature if present
        llm_info = session.get("signature", {}).get("llm", {"used": False})

        # Metadata block (ensure seeded/source) -- set mode deterministically: deterministic | gemini | hybrid
        meta = {
            "incident_id": signature.get("incident_id"),
            "job": signature.get("job"),
            "scenario": signature.get("scenario") or signature.get("error_class") or "unknown",
            "text_hash": signature.get("text_hash"),
            "mode": ("hybrid" if (self.use_gemini and llm_info.get("used")) else ("gemini" if self.use_gemini else "deterministic")),
            "generated_at": now_ts()
        }
        # ensure seeded/source presence
        meta["seeded"] = any(e.get("text_hash")==signature.get("text_hash") and e.get("seeded") for e in MEMORY_BANK)
        meta["source"] = "demo_seed" if meta["seeded"] else "runtime_draft"

        # Build markdown (report text preserved as before)
        lines = []
        lines.append("---")
        for k,v in meta.items():
            lines.append(f"{k}: {v}")
        lines.append("---"); lines.append("")
        lines.append(f"# RCA Report - {signature.get('incident_id')}")
        lines.append("")
        lines.append(f"**Root Cause (hypothesis):** {signature.get('error_class')} on job `{signature.get('job')}`")
        lines.append("")
        lines.append(f"**Severity:** {sev_label} ({sev_score})")
        lines.append(f"**Confidence:** {confidence_breakdown['label']} ({confidence_breakdown['final_score']})")
        lines.append("")
        # Confidence breakdown in MD (top)
        cb = confidence_breakdown
        lines.append("**Confidence breakdown:**")
        lines.append(f"- final_score: {cb['final_score']} ({cb['label']})")
        lines.append(f"- history_score: {cb['history_score']}, field_score: {cb['field_score']}, anomaly_score: {cb['anomaly_score']}")
        lines.append("")
        # Evidence
        lines.append("## Evidence")
        log_lines = session.get("logs", [])[:10]
        if log_lines:
            lines.append("### Log excerpt (`log_fetch`)")
            lines.append("```")
            for i, ln in enumerate(log_lines, start=1):
                lines.append(f"{i:3d}: {ln}")
            lines.append("```")
        schema_diff = session.get("schema_diff", [])
        if schema_diff:
            lines.append("### Schema diff (`schema_diff`)")
            for d in schema_diff:
                lines.append(f"- field: `{d.get('field')}` ‚Äî {d.get('old')} ‚Üí {d.get('new')}")
        sample_anoms = session.get("sample_anomalies", [])
        if sample_anoms:
            lines.append("### Sample anomalies (`sample_data`)")
            lines.append("|row|field|value|")
            lines.append("|--:|:--|:--|")
            for a in sample_anoms:
                lines.append(f"|{a.get('row')}|{a.get('field')}|`{a.get('value')}`|")
        if (not log_lines) and (not schema_diff) and (not sample_anoms):
            lines.append("- none")
        lines.append("")

        # Historical matches
        lines.append("## Historical Matches (`history_query`)")
        history = session.get("history") or []
        if history:
            for m in history:
                comp = m.get("components",{})
                lines.append(f"- matched past incident {m['past_incident'].get('created_at','n/a')} ‚Äî score {m['score']} (job:{comp.get('job',0)}, fields:{round(comp.get('fields',0),2)}, error_class:{comp.get('error_class',0)})")
        else:
            lines.append("- none")
        lines.append("")

        # Impact (lineage)
        lines.append("## Impacted Downstream (`lineage_query`)")
        dlist = self._extract_downstreams(impact)
        if dlist:
            for d in dlist:
                lines.append(f"- `{d['asset']}` ({d['type']}) critical={d.get('critical', False)}")
        else:
            lines.append("- none")
        lines.append("")

        # Recommendations (scenario-driven, generic templates with placeholders)
        lines.append("## Recommended Next Steps")

        # Default scenario templates (merge-safe)
        _default_scenario_recs = {
            "missing_partition": {
                "title": "Missing partition detected",
                "steps": [
                    "Verify storage prefix exists and list objects: `aws s3 ls {prefix}` or `gsutil ls {prefix}` and confirm objects under {partition}.",
                    "If upstream should have produced this partition: re-run the upstream producer job for the {partition} window (job: {job}).",
                    "If backup data exists: run a targeted backfill to restore {prefix}/{partition}.",
                    "If upstream intentionally skipped: mark the partition as 'no-data-expected' and configure downstream pipelines to soft-skip."
                ],
                "confidence_hint": "High ‚Äî missing partition pattern found in logs."
            },
            "schema_drift": {
                "title": "Schema change / drift detected",
                "steps": [
                    "Identify the changed field(s) and impacted sinks with a quick schema diff.",
                    "If breaking: coordinate a schema contract update with upstream and deploy a compatible parser or migration.",
                    "Run a selective backfill for affected partitions if data loss or type coercion happened."
                ],
                "confidence_hint": "Medium ‚Äî schema mismatch detected; confirm with schema registry or sample rows."
            },
            "invalid_values": {
                "title": "Invalid values / data quality issues",
                "steps": [
                    "Quantify affected rows (e.g., `SELECT COUNT(*) FROM upstream WHERE <predicate>`).",
                    "Add validation rules at ingest or transformation to reject/flag invalid values.",
                    "Create an automated remediation/backfill run for the affected windows."
                ],
                "confidence_hint": "Medium ‚Äî sample anomalies indicate bad values."
            },
            "unknown": {
                "title": "Unknown / requires further investigation",
                "steps": [
                    "Collect more contextual logs and sample rows for the incident window.",
                    "Run the diagnosis again with enriched context (longer log excerpt, schema diff, sample data).",
                    "If possible, consult upstream job logs or owners for additional clues."
                ],
                "confidence_hint": "Low ‚Äî insufficient evidence to recommend automated remediation."
            }
        }

        # Merge into any global mapping without overwriting existing keys
        _SCENARIO_RECOMMENDATIONS = globals().get("SCENARIO_RECOMMENDATIONS", {})
        for k, v in _default_scenario_recs.items():
            if k not in _SCENARIO_RECOMMENDATIONS:
                _SCENARIO_RECOMMENDATIONS[k] = v
        globals()["SCENARIO_RECOMMENDATIONS"] = _SCENARIO_RECOMMENDATIONS

        # Build recs list using template if available, else fall back to legacy logic
        recs = []

        # -----------------------------
        # Robust scenario detection:
        # try error_class, then signature.scenario, then session-level scenario,
        # then look for keywords in error_snippet or first log lines.
        # -----------------------------
        sig_scenario = (
            signature.get("error_class")
            or signature.get("scenario")
            or session.get("scenario")
            or signature.get("detected_scenario")
            or "unknown"
        )
        
        # --- Force 'schema_drift' if strong evidence exists ---
        log_text = "\n".join(session.get("logs", [])).lower()
        if signature.get("changed_fields") or schema_diff or "cannot cast" in log_text or "mismatch" in log_text:
            sig_scenario = "schema_drift"
            signature["scenario"] = sig_scenario 
        # --- END ---


        # fallback heuristic: inspect error_snippet or first log line for missing-partition cues
        if sig_scenario == "unknown":
            snippet = (signature.get("error_snippet") or "")
            if not snippet and log_lines:
                snippet = log_lines[0]
            s_low = snippet.lower() if isinstance(snippet, str) else ""
            if ("no files" in s_low or "no objects" in s_low or "dt=" in s_low or "partition" in s_low):
                sig_scenario = "missing_partition"

        # Context for formatting placeholders
        context = {
            "prefix": signature.get("prefix", "<prefix>"),
            "partition": signature.get("partition", "<partition>"),
            "job": signature.get("job", "<job>"),
            "owner": signature.get("owner", "<owner>"),
            "error_snippet": signature.get("error_snippet", signature.get("error_class", "<error>"))
        }
        # --- Normalize freeform error_class/scenario -> canonical scenario keys (minimal, in-place) ---
        sig_low = str(sig_scenario).lower() if sig_scenario is not None else ""
        if "no files" in sig_low or "no objects" in sig_low or "missing partition" in sig_low or "dt=" in sig_low:
            sig_scenario = "missing_partition"
        elif "schema" in sig_low or "mismatch" in sig_low or "cannot cast" in sig_low or "cannot convert" in sig_low:
            sig_scenario = "schema_drift"
        elif "jsondecodeerror" in sig_low or "malformed" in sig_low or "parse error" in sig_low:
            sig_scenario = "invalid_values"
        # persist normalized scenario back to signature so downstream code sees canonical key
        if sig_scenario in ["missing_partition", "schema_drift", "invalid_values"]:
             signature["scenario"] = sig_scenario

        template = _SCENARIO_RECOMMENDATIONS.get(sig_scenario)

        if template:
            # Add a header-like first line (keeps readability consistent)
            recs.append(template.get("title", "Recommended actions"))
            for step in template.get("steps", []):
                try:
                    recs.append(step.format(**context))
                except Exception:
                    recs.append(step)  # fallback raw
            if template.get("confidence_hint"):
                recs.append(f"Confidence: {template.get('confidence_hint')}")
        else:
            # Preserve existing special-case logic if no template mapped
            if signature.get("changed_fields"):
                recs.append("1) Run a selective query to quantify affected rows, e.g.: `SELECT COUNT(*) FROM upstream_table WHERE TRY_CAST(price AS INT) IS NULL;`")
                recs.append("2) Add safe CAST/COALESCE in transformation or coordinate upstream backfill.")
                recs.append("3) If critical downstreams exist, run a prioritized backfill for affected partitions.")
            for a in signature.get("sample_anomalies", []):
                v = a.get("value")
                if isinstance(v, (int, float)) and v < 0:
                    recs.append("Add validation rule at ingest: reject or flag negative prices; consider alerting on new validation failures.")
            # Missing partition scenario override
            if signature.get("error_class") == "missing_partition" or signature.get("scenario") == "missing_partition":
                recs = [f"Verify that the expected partition exists in storage: run `aws s3 ls {signature.get('prefix','<prefix>')}` and confirm objects under {signature.get('partition','<partition>')}`.",
                        f"If upstream should have produced this partition: re-run the upstream job for `{signature.get('partition','<partition>')}` (job: {signature.get('job')}).",
                        f"If backup data exists: run a targeted backfill to restore `{signature.get('partition','<partition>')}`.","If upstream intentionally skipped this partition, mark it as 'no-data-expected' and configure downstream pipelines to soft-skip instead of erroring."]
            elif not recs:
                recs.append("Inspect logs, collect additional sample rows, and rerun diagnosis with more context.")

        for r in recs: lines.append(f"- {r}")
        lines.append("")
        lines.append(f"\nGenerated at: {now_ts()}")
        md = "\n".join(lines)

        # Build machine-readable JSON (ensure impact.downstreams always present)
        dlist_json = dlist or []
        rep_json = {
            "meta": meta,
            "root_cause": {"hypothesis": signature.get("error_class"), "job": signature.get("job")},
            "severity": {"label": sev_label, "score": sev_score},
            "confidence_breakdown": confidence_breakdown,
            "evidence": {
                "logs": {"tool":"log_fetch","lines": log_lines},
                "schema_diff": {"tool":"schema_diff","diff": schema_diff},
                "sample_anomalies": {"tool":"sample_data","anomalies": sample_anoms}
            },
            "history_matches": history,
            "impact": {"downstreams": dlist_json},
            "recommended_next_steps": recs,
            "generated_at": now_ts(),
            # add non-intrusive LLM provenance info (machine-readable only)
            "llm": llm_info if isinstance(llm_info, dict) else {"used": False}
        }
        return md, rep_json

    def on_message(self, msg):
        p = msg["payload"]
        sig = p.get("signature") or {}
        inc_id = sig.get("incident_id") or make_id("inc")
        session = SESSIONS.setdefault(inc_id, {"signature":sig,"history":None,"impact":None,"report":None,"stage":"draft"})
        if p.get("type") == "history_matches":
            session["history"] = p.get("matches")
        if p.get("type") == "impact":
            session["impact"] = p.get("impact")

        if session.get("history") is not None and session.get("impact") is not None and session.get("report") is None:
            md, rep_json = self.synthesize_report(session)
            session["report"] = {"md": md, "json": rep_json}
            session["stage"] = "awaiting_approval"
            SESSIONS[inc_id] = session
            emit("pulse_advisor", "ui", {"type":"report_ready", "incident_id": inc_id, "report": md})
            emit("pulse_advisor", "save_report", {"type":"save_pending", "incident_id": inc_id})

üß≠ Loading: Advisor agent...


## ‚öôÔ∏è Agent Instantiation & Router Wiring

Here we create all agents:
- Diagnoser  
- History Analyzer  
- Impact Analyzer  
- Advisor (hybrid mode if Gemini available)

Then they are ready to receive events through the router system.


In [17]:
print("‚öôÔ∏è Instantiating agents...")

# instantiate agents
pulse_detector = PulseDetector()    
diagnoser = Diagnoser()
history_agent = PatternHistoryAgent()
impact_agent = ImpactScopeAgent()
advisor = Advisor(use_gemini=USE_GEMINI)

print("ü§ñ Agents ready.")


‚öôÔ∏è Instantiating agents...
ü§ñ Agents ready.


## üìÑ Robust File Parsing Helper

This helper turns uploaded files (log/txt/json/csv/ndjson)
into clean line lists for the RCA pipeline.

Supports:
- JSON object
- JSONL/NDJSON
- CSV with "message" column
- Plain text log files

Used by:
- offline runner  
- UI handling module  


In [18]:
print("üìÑ Loading: Robust file parsing helper...")

def parse_uploaded_file_bytes(b: bytes):
    s = b.decode("utf-8", errors="replace")

    # Try to parse as a full JSON document
    try:
        obj = json.loads(s)
        if isinstance(obj, dict):
            if "message" in obj:
                print("üìÑ Parsed JSON dict with message.")
                return [obj["message"]]
            return [json.dumps(obj)]
        if isinstance(obj, list):
            print("üìÑ Parsed JSON list.")
            return [
                json.dumps(i) if not isinstance(i,str) else i
                for i in obj
            ]
    except:
        pass

    # Try NDJSON / JSONL
    lines = s.splitlines()
    nd=[]; nd_ok=True
    for ln in lines:
        ln_strip = ln.strip()
        if not ln_strip:
            continue
        try:
            j = json.loads(ln_strip)
            if isinstance(j, dict) and "message" in j:
                nd.append(j["message"])
            else:
                nd.append(json.dumps(j))
        except:
            nd_ok=False
            break

    if nd_ok and nd:
        print("üìÑ Parsed NDJSON / JSONL format.")
        return nd

    # CSV format
    try:
        import io, csv
        reader = csv.DictReader(io.StringIO(s))
        if reader.fieldnames:
            print("üìÑ Parsed CSV file.")
            msgs=[]
            for row in reader:
                if "message" in row and row["message"]:
                    msgs.append(row["message"])
                else:
                    msgs.append(", ".join(f"{k}={v}" for k,v in row.items()))
            return msgs
    except:
        pass

    # fallback: plain text log lines
    print("üìÑ Parsed plain text log file.")
    return [ln for ln in s.splitlines() if ln.strip()]


üìÑ Loading: Robust file parsing helper...


## üõ°Ô∏è Validator helper

This cell defines `validate_report()` which sanity-checks the generated RCA JSON.  
It prints a short summary when called to make debugging easier.


In [19]:
print("üîß Loading: Validator helper...")

def validate_report(rep_json):
    errors = []
    if not rep_json:
        return {"ok": False, "errors": ["report is None"]}

    meta = rep_json.get("meta")
    if not meta:
        errors.append("missing meta")
    else:
        for k in ("incident_id", "job", "text_hash", "generated_at"):
            if k not in meta:
                errors.append(f"meta.{k} missing")

    if "root_cause" not in rep_json:
        errors.append("missing root_cause")

    # severity should be present and include a score key
    sev = rep_json.get("severity")
    if not sev or "score" not in sev:
        errors.append("missing severity.score")

    cb = rep_json.get("confidence_breakdown")
    if not cb or "final_score" not in cb:
        errors.append("confidence breakdown missing final_score")

    ev = rep_json.get("evidence") or {}
    if "logs" not in ev:
        errors.append("evidence.logs missing")
    if "schema_diff" not in ev:
        errors.append("evidence.schema_diff missing")
    # proper presence check for sample_anomalies (allow empty list)
    if "sample_anomalies" not in ev:
        errors.append("evidence.sample_anomalies missing")

    hist = rep_json.get("history_matches", [])
    if not isinstance(hist, list):
        errors.append("history_matches not a list")

    impact = rep_json.get("impact") or {}
    if not ("downstreams" in impact):
        errors.append("impact.downstreams missing")

    if "recommended_next_steps" not in rep_json:
        errors.append("no recommended_next_steps")

    result = {
        "ok": len(errors) == 0,
        "errors": errors,
        "summary": {
            "meta": meta,
            "severity": rep_json.get("severity"),
            "confidence": cb.get("final_score") if cb else None,
            "history_count": len(hist)
        }
    }

    print(f"üîç validate_report -> ok={result['ok']}, errors_count={len(errors)}")
    if errors:
        for e in errors:
            print(f"  - {e}")
    return result

print("üîß Validator helper loaded.")


üîß Loading: Validator helper...
üîß Validator helper loaded.


## ‚ñ∂Ô∏è Offline runner: `run_offline_samples`

This cell contains the main offline runner that:
- parses input samples
- emits detection events (now routed through `pulse_detector`)
- waits for the agent chain to complete
- optionally saves Markdown + JSON reports to `submission_reports/`

Extra prints are included to trace progress.


In [20]:
print("‚ñ∂Ô∏è Loading: Offline runner (run_offline_samples)...")

def run_offline_samples(samples=None, show=True, save_reports=True, out_dir="submission_reports"):
    if samples is None:
        samples = {k:("\n".join(v)).encode("utf-8") for k,v in SYNTHETIC_LOGS.items()}
    results=[]
    if save_reports: os.makedirs(out_dir, exist_ok=True)

    for name, b in samples.items():
        try:
            print(f"\n--- Processing sample: {name} ---")
            IN_MEMORY_BUS.clear(); TRACE_STORE.clear(); SESSIONS.clear(); DRAFT_MEMORY.clear()
            lines = parse_uploaded_file_bytes(b)

            # canonical logs first
            logs_res = TOOLS["log_fetch"].run({"scenario": name, "job": None})
            log_lines = logs_res.get("lines", []) or lines

            # prefer log_fetch.job, else most frequent job= token, else scenario
            parsed_job = logs_res.get("job")
            if not parsed_job:
                jobs = re.findall(r"\bjob=([A-Za-z0-9_\-\.]+)", "\n".join(log_lines))
                if jobs:
                    parsed_job = Counter(jobs).most_common(1)[0][0]
            job = parsed_job or name

            print(f"Detected job='{job}' for sample '{name}' (parsed_job={parsed_job})")

            # call tools using canonical job
            schema_res = TOOLS["schema_diff"].run({"scenario": name, "job": job})
            samples_res = TOOLS["sample_data"].run({"scenario": name, "job": job})

            provisional_incident_id = make_id("inc")
            provisional_inc = {
                "incident_id": provisional_incident_id,
                "scenario": name,
                "job": job,
                "error_snippet": lines[0] if lines else ""
            }
            signature = build_signature(provisional_inc, schema_res, logs_res, samples_res)
            signature["incident_id"] = provisional_incident_id

            kind, existing_id, entry = find_existing_by_text_hash(signature["text_hash"])
            if kind == "session":
                existing_sess = entry
                report_obj = existing_sess.get("report")
                incident_id = existing_id
                md = report_obj.get("md") if report_obj else None
                rep_json = report_obj.get("json") if report_obj else None
                print(f"[idempotency] Reusing existing session report for sample '{name}' -> incident {incident_id}")
            elif kind == "memory":
                incident_id = existing_id or make_id("inc")
                md_lines = [f"# RCA Report - {incident_id}", "", f"*Reused historical incident matching text_hash {signature['text_hash']}*"]
                md = "\n".join(md_lines)
                rep_json = {"meta":{"incident_id": incident_id, "text_hash": signature["text_hash"]}, "note":"reused_from_memory"}
                print(f"[idempotency] Reusing confirmed memory for sample '{name}' -> incident {incident_id}")
            else:
                payload = {
                    "incident_id": provisional_incident_id,
                    "scenario": name,
                    "job": job,
                    "type": "detection",
                    "pattern": "detected_offline",
                    "category": "demo",
                    "error_snippet": lines[0] if lines else "",
                    "source_lines_count": len(lines),
                    "text_hash": signature["text_hash"],
                    # Provide raw lines so PulseDetector can operate
                    "lines": log_lines
                }
                print(f"[emit] offline_runner -> pulse_detector | incident={provisional_incident_id}")
                emit("offline_runner", "pulse_detector", payload)
                router_run_blocking()
                sess = SESSIONS.get(provisional_incident_id)
                if not sess:
                    print(f"[error] No session created for {provisional_incident_id}")
                    md = None; rep_json = None; incident_id = provisional_incident_id
                else:
                    # ensure downstream agents run to completion
                    router_run_blocking()
                    report_obj = sess.get("report")
                    md = report_obj.get("md") if report_obj else None
                    rep_json = report_obj.get("json") if report_obj else None
                    incident_id = provisional_incident_id

            print(f"\n=== Draft report for sample '{name}' (incident: {incident_id}) ===\n")
            if md and show:
                display(Markdown(md))
            else:
                print("No report produced. Check TRACE_STORE and SESSIONS.")

            saved_info = None
            if save_reports and md and rep_json:
                out = TOOLS["save_report"].run({"report_md": md, "report_json": rep_json, "incident_id": incident_id, "out_dir": out_dir})
                saved_info = out
                print(f"Saved artifacts: MD -> {out['md_path']}, JSON -> {out['json_path']}")

            results.append((incident_id, md, rep_json, saved_info))
        except Exception as e:
            print("Error processing sample", name, e)
            traceback.print_exc()
            results.append((None, None, None, None))
    return results

print("‚ñ∂Ô∏è Offline runner loaded.")


‚ñ∂Ô∏è Loading: Offline runner (run_offline_samples)...
‚ñ∂Ô∏è Offline runner loaded.


## ‚ñ∂ Run demo & quick validation

Seed demo memory (idempotent), run the offline demo, and validate saved reports.  
This cell prints concise validation results.


In [21]:
print("üöÄ Running demo: seed memory and run offline samples...")

seed_demo_memory_once()
print("‚úÖ Demo memory seeded.")

demo_results = run_offline_samples(save_reports=True)
print("\n‚úÖ Demo finished. 'demo_results' contains tuples (incident_id, md, json, saved_info).")

# Quick validation output:
for inc_id, md, rep_json, saved in demo_results:
    print(f"\nüîé Validation for {inc_id}: {validate_report(rep_json)}")

print("üèÅ Demo & validation complete.")


üöÄ Running demo: seed memory and run offline samples...
  ‚Ü™ MEMORY_BANK already seeded.
‚úÖ Demo memory seeded.

--- Processing sample: schema_drift ---
üìÑ Parsed CSV file.
Detected job='orders' for sample 'schema_drift' (parsed_job=orders)
üìå Signature built for job='orders', error_class='TypeError', changed_fields=['price']
üîç Checking for existing incidents with text_hash=95ac10f46a...
‚ùå No matching signature found.
[emit] offline_runner -> pulse_detector | incident=inc-2f7a9d23
[emit] offline_runner ‚Üí pulse_detector | type=detection
‚ñ∂Ô∏è Router: starting event loop...
[router] Dispatching to agent: pulse_detector
[pulse_detector] Scenario=schema_drift, job=orders
[emit] pulse_detector ‚Üí root_cause_diagnoser | type=detection
[router] Dispatching to agent: root_cause_diagnoser
üîç Diagnoser triggered for incident: inc-2f7a9d23, scenario=schema_drift
üìå Signature built for job='orders', error_class='TypeError', changed_fields=['price']
üì¶ Diagnoser built signatu

---
incident_id: inc-2f7a9d23
job: orders
scenario: TypeError
text_hash: 78eb9cde7a72074a5aa3dac445160041e34c72d864d5a1c54f9b7a3b2af21d1a
mode: gemini
generated_at: 2025-11-28 16:58:02
seeded: False
source: runtime_draft
---

# RCA Report - inc-2f7a9d23

**Root Cause (hypothesis):** TypeError on job `orders`

**Severity:** HIGH (1.0)
**Confidence:** HIGH (0.83)

**Confidence breakdown:**
- final_score: 0.83 (HIGH)
- history_score: 1.0, field_score: 1.0, anomaly_score: 0.33

## Evidence
### Log excerpt (`log_fetch`)
```
  1: 2025-11-24 10:00:01 INFO job=orders ETL step=ingest files=3
  2: 2025-11-24 10:03:02 ERROR job=orders transform TypeError: cannot cast '123.45' to INT on column price
```
### Schema diff (`schema_diff`)
- field: `price` ‚Äî INT ‚Üí FLOAT
### Sample anomalies (`sample_data`)
|row|field|value|
|--:|:--|:--|
|1|price|`123.45`|

## Historical Matches (`history_query`)
- matched past incident 2025-11-01 09:00:00 ‚Äî score 1.0 (job:0, fields:0, error_class:0)

## Impacted Downstream (`lineage_query`)
- `dashboard.sales_over_time` (dashboard) critical=True
- `ml.revenue_forecast` (model) critical=True

## Recommended Next Steps
- Schema change / drift detected
- Identify the changed field(s) and impacted sinks with a quick schema diff.
- If breaking: coordinate a schema contract update with upstream and deploy a compatible parser or migration.
- Run a selective backfill for affected partitions if data loss or type coercion happened.
- Confidence: Medium ‚Äî schema mismatch detected; confirm with schema registry or sample rows.


Generated at: 2025-11-28 16:58:02

Saved artifacts: MD -> submission_reports/pulsetrace_report_inc-2f7a9d23.md, JSON -> submission_reports/pulsetrace_report_inc-2f7a9d23.json

--- Processing sample: missing_partition ---
üìÑ Parsed CSV file.
Detected job='reports' for sample 'missing_partition' (parsed_job=reports)
üìå Signature built for job='reports', error_class='No files found', changed_fields=[]
üîç Checking for existing incidents with text_hash=01cb6ba658...
‚ùå No matching signature found.
[emit] offline_runner -> pulse_detector | incident=inc-57531b2a
[emit] offline_runner ‚Üí pulse_detector | type=detection
‚ñ∂Ô∏è Router: starting event loop...
[router] Dispatching to agent: pulse_detector
[pulse_detector] Scenario=missing_partition, job=reports
[emit] pulse_detector ‚Üí root_cause_diagnoser | type=detection
[router] Dispatching to agent: root_cause_diagnoser
üîç Diagnoser triggered for incident: inc-57531b2a, scenario=missing_partition
üìå Signature built for job='reports', error_class='No files found', c

---
incident_id: inc-57531b2a
job: reports
scenario: No files found
text_hash: 5ee4b5a44c3e75c1d2494d5c8dca490c2d9be52bd03a5251356879b79129dffe
mode: gemini
generated_at: 2025-11-28 16:58:02
seeded: False
source: runtime_draft
---

# RCA Report - inc-57531b2a

**Root Cause (hypothesis):** No files found on job `reports`

**Severity:** LOW (0.2)
**Confidence:** LOW (0.1)

**Confidence breakdown:**
- final_score: 0.1 (LOW)
- history_score: 0.0, field_score: 0.0, anomaly_score: 0.0

## Evidence
### Log excerpt (`log_fetch`)
```
  1: 2025-11-24 11:00:05 ERROR job=reports No files found for partition dt=2025-11-24
```

## Historical Matches (`history_query`)
- none

## Impacted Downstream (`lineage_query`)
- `dashboard.reports` (dashboard) critical=False

## Recommended Next Steps
- Missing partition detected
- Verify storage prefix exists and list objects: `aws s3 ls <prefix>` or `gsutil ls <prefix>` and confirm objects under <partition>.
- If upstream should have produced this partition: re-run the upstream producer job for the <partition> window (job: reports).
- If backup data exists: run a targeted backfill to restore <prefix>/<partition>.
- If upstream intentionally skipped: mark the partition as 'no-data-expected' and configure downstream pipelines to soft-skip.
- Confidence: High ‚Äî missing partition pattern found in logs.


Generated at: 2025-11-28 16:58:02

Saved artifacts: MD -> submission_reports/pulsetrace_report_inc-57531b2a.md, JSON -> submission_reports/pulsetrace_report_inc-57531b2a.json

--- Processing sample: invalid_values ---
üìÑ Parsed CSV file.
Detected job='pricing' for sample 'invalid_values' (parsed_job=pricing)
üìå Signature built for job='pricing', error_class='ValueError', changed_fields=[]
üîç Checking for existing incidents with text_hash=f9e58980ea...
‚ùå No matching signature found.
[emit] offline_runner -> pulse_detector | incident=inc-c043fed1
[emit] offline_runner ‚Üí pulse_detector | type=detection
‚ñ∂Ô∏è Router: starting event loop...
[router] Dispatching to agent: pulse_detector
[pulse_detector] Scenario=invalid_values, job=pricing
[emit] pulse_detector ‚Üí root_cause_diagnoser | type=detection
[router] Dispatching to agent: root_cause_diagnoser
üîç Diagnoser triggered for incident: inc-c043fed1, scenario=invalid_values
üìå Signature built for job='pricing', error_class='ValueError', changed_fields=[]
üì

---
incident_id: inc-c043fed1
job: pricing
scenario: ValueError
text_hash: bd9555b803cb819c6072036eb090d993df1c0523a5fc9438e0760870bbda0d3b
mode: gemini
generated_at: 2025-11-28 16:58:02
seeded: False
source: runtime_draft
---

# RCA Report - inc-c043fed1

**Root Cause (hypothesis):** ValueError on job `pricing`

**Severity:** HIGH (0.8)
**Confidence:** LOW (0.42)

**Confidence breakdown:**
- final_score: 0.42 (LOW)
- history_score: 0.6, field_score: 0.0, anomaly_score: 0.33

## Evidence
### Log excerpt (`log_fetch`)
```
  1: 2025-11-24 12:05:02 ERROR job=pricing transform ValueError: negative value found in price at row 524
```
### Sample anomalies (`sample_data`)
|row|field|value|
|--:|:--|:--|
|524|price|`-12.5`|

## Historical Matches (`history_query`)
- matched past incident 2025-10-20 14:30:00 ‚Äî score 0.6 (job:0, fields:0, error_class:0)

## Impacted Downstream (`lineage_query`)
- `dashboard.sales_over_time` (dashboard) critical=True
- `ml.revenue_forecast` (model) critical=True

## Recommended Next Steps
- Add validation rule at ingest: reject or flag negative prices; consider alerting on new validation failures.


Generated at: 2025-11-28 16:58:02

Saved artifacts: MD -> submission_reports/pulsetrace_report_inc-c043fed1.md, JSON -> submission_reports/pulsetrace_report_inc-c043fed1.json

‚úÖ Demo finished. 'demo_results' contains tuples (incident_id, md, json, saved_info).
üîç validate_report -> ok=True, errors_count=0

üîé Validation for inc-2f7a9d23: {'ok': True, 'errors': [], 'summary': {'meta': {'incident_id': 'inc-2f7a9d23', 'job': 'orders', 'scenario': 'TypeError', 'text_hash': '78eb9cde7a72074a5aa3dac445160041e34c72d864d5a1c54f9b7a3b2af21d1a', 'mode': 'gemini', 'generated_at': '2025-11-28 16:58:02', 'seeded': False, 'source': 'runtime_draft'}, 'severity': {'label': 'HIGH', 'score': 1.0}, 'confidence': 0.83, 'history_count': 1}}
üîç validate_report -> ok=True, errors_count=0

üîé Validation for inc-57531b2a: {'ok': True, 'errors': [], 'summary': {'meta': {'incident_id': 'inc-57531b2a', 'job': 'reports', 'scenario': 'No files found', 'text_hash': '5ee4b5a44c3e75c1d2494d5c8dca490c2d9be52bd03a5251356879b79129dffe', 'mode': 

### üõ∞Ô∏è Observability: Traces, Sessions & Memory

PulseTrace exposes lightweight observability tools that help you inspect how the system behaves during an RCA run:

- **`render_trace_store()`** ‚Äî displays recent agent-to-agent (A2A) events flowing through the message bus  
- **`render_sessions()`** ‚Äî shows active incident sessions, including signatures, history usage, and impact metadata  
- **Memory Bank Summary** ‚Äî lists long-term stored incident signatures so you can see which patterns and root causes were retained across runs(implemented later in code)

These views make it easy to understand agent communication, trace execution flow, and verify that multi-agent coordination is happening correctly.


In [22]:
# define render_trace_store and render_sessions
print("üîß defining render_trace_store and render_sessions...")

from IPython.display import display, HTML, Markdown, clear_output
import json

def render_trace_store(limit=200):
    """Display TRACE_STORE (most recent entries). Safe to call repeatedly."""
    try:
        ts = globals().get("TRACE_STORE", None)
        if ts is None:
            display(HTML("<i>TRACE_STORE is not defined in globals()</i>"))
            print("render_trace_store: TRACE_STORE not defined.")
            return
        if not ts:
            display(HTML("<i>No trace messages recorded.</i>"))
            print("render_trace_store: TRACE_STORE is empty.")
            return
        rows = []
        for m in ts[-limit:]:
            payload_str = json.dumps(m.get('payload', {}), default=str, indent=2)
            rows.append(
                f"<tr>"
                f"<td style='vertical-align:top;padding:4px'>{m.get('ts')}</td>"
                f"<td style='vertical-align:top;padding:4px'><b>{m.get('from')}</b> ‚Üí <b>{m.get('to')}</b></td>"
                f"<td style='vertical-align:top;padding:4px'><pre style='white-space:pre-wrap;margin:0'>{payload_str}</pre></td>"
                f"</tr>"
            )
        html = (
            "<table style='width:100%;border-collapse:collapse' border=1>"
            "<tr style='background:#f6f6f6'><th>ts</th><th>route</th><th>payload</th></tr>"
            + "".join(rows) + "</table>"
        )
        display(HTML(html))
        print(f"render_trace_store: displayed {min(len(ts), limit)} trace entries (total stored: {len(ts)}).")
    except Exception as e:
        print("render_trace_store: error:", e)
        traceback.print_exc()

def render_sessions():
    """Display SESSIONS dict content (compact)."""
    try:
        sess = globals().get("SESSIONS", None)
        if sess is None:
            display(HTML("<i>SESSIONS is not defined in globals()</i>"))
            print("render_sessions: SESSIONS not defined.")
            return
        if not sess:
            display(HTML("<i>No active sessions.</i>"))
            print("render_sessions: SESSIONS is empty.")
            return
        for sid, s in sess.items():
            hdr = f"<h4>Session: {sid} ‚Äî stage: {s.get('stage')}</h4>"
            meta = {
                "signature": s.get("signature"),
                "history_len": len(s.get("history") or []),
                "impact": s.get("impact")
            }
            display(HTML(hdr))
            display(HTML(f"<pre>{json.dumps(meta, indent=2, default=str)}</pre>"))
        print(f"render_sessions: displayed {len(sess)} sessions.")
    except Exception as e:
        print("render_sessions: error:", e)
        traceback.print_exc()

# expose to globals (redundant but explicit)
globals()['render_trace_store'] = render_trace_store
globals()['render_sessions'] = render_sessions

print("‚úÖ render_trace_store and render_sessions registered.")


üîß defining render_trace_store and render_sessions...
‚úÖ render_trace_store and render_sessions registered.


### UI dependencies & environment checks

Detect & (only if missing) install `ipywidgets` and bring in display helpers.
This cell will not reinstall if `ipywidgets` is already available.


In [23]:
# UI dependencies & environment checks
print("üîé UI deps check: verifying ipywidgets and display utilities...")

import importlib, sys, subprocess

def ensure_package(pkg_name, import_name=None, version_spec=None):
    """
    Ensure a package is importable. If not installed, try pip installing it.
    Returns True if import succeeded, False otherwise.
    """
    import_name = import_name or pkg_name
    try:
        importlib.import_module(import_name)
        print(f"‚úÖ {import_name} already available.")
        return True
    except Exception as e:
        print(f"‚ö†Ô∏è {import_name} not found ({e}). Attempting to install...")
        try:
            cmd = [sys.executable, "-m", "pip", "install", pkg_name] + ([version_spec] if version_spec else [])
            subprocess.check_call(cmd)
            importlib.invalidate_caches()
            importlib.import_module(import_name)
            print(f"‚úÖ Successfully installed and imported {import_name}.")
            return True
        except Exception as ie:
            print(f"‚ùå Failed to install {pkg_name}: {ie}")
            return False

# Only install ipywidgets if missing
_ok_widgets = ensure_package("ipywidgets", "ipywidgets", None)

# Bring common display helpers into scope (safe repeated import)
try:
    from IPython.display import display, Markdown, HTML, clear_output
    from pathlib import Path
    print("‚úÖ IPython.display and Path available.")
except Exception as e:
    print("‚ùå Could not import IPython.display or Path:", e)

# Expose UI_AVAILABLE flag for downstream cells
UI_AVAILABLE = _ok_widgets
print(f"UI_AVAILABLE = {UI_AVAILABLE}")


üîé UI deps check: verifying ipywidgets and display utilities...
‚úÖ ipywidgets already available.
‚úÖ IPython.display and Path available.
UI_AVAILABLE = True


### Optional: Previously uploaded file tracking


### UI Controls & Helpers

Create widgets (dropdown, file uploader, run/inspect buttons) and small helpers:
- make_save_path_for_uploaded()
- approve_and_save_local()
This cell uses the same behavior as your original single-cell UI; it only defines helpers and widgets.


In [24]:
# UI Controls & Helpers (Clean Version)
print("üß© UI: creating controls & helpers (no duplicate imports)...")

# widgets are available only if ipywidgets import succeeded earlier
if not UI_AVAILABLE:
    print("‚ö†Ô∏è ipywidgets unavailable ‚Äî UI widgets will not be created.")
else:
    import ipywidgets as widgets  # safe even if already imported
    from pathlib import Path

    # UI controls (matching your original names)
    sample_dropdown = widgets.Dropdown(
        options=["-- select demo sample --"] + list(SYNTHETIC_LOGS.keys()),
        description="Demo:"
    )
    file_uploader = widgets.FileUpload(accept=".log,.txt,.json,.ndjson,.csv", multiple=False)
    run_button = widgets.Button(description="Run Diagnosis", button_style="success")
    inspect_button = widgets.Button(description="Inspect", button_style="")
    out_area = widgets.Output()

    print("üîß Ensuring make_save_path_for_uploaded is available...")
    if 'make_save_path_for_uploaded' not in globals():
        def make_save_path_for_uploaded(filename, out_dir="submission_reports"):
            safe = filename.replace("/", "_").replace("\\", "_")
            suffix = int(time.time())
            os.makedirs(out_dir, exist_ok=True)
            stem = Path(safe).stem
            ext = Path(safe).suffix or ".log"
            fname = f"{stem}_{suffix}{ext}"
            path = os.path.join(out_dir, fname)
            return path

    print("üîß Ensuring approve_and_save_local is available...")
    if 'approve_and_save_local' not in globals():
        def approve_and_save_local(incident_id, uploaded_bytes=None, uploaded_filename=None, out_dir="submission_reports"):
            # Removed debug print: "attempting to save..."
            sess = SESSIONS.get(incident_id)
            if not sess or not sess.get("report"):
                print("Error: No session or report found to save.")
                return None

            md = sess["report"]["md"]
            rep_json = sess["report"]["json"]

            out = TOOLS["save_report"].run(
                {"report_md": md, "report_json": rep_json, "incident_id": incident_id, "out_dir": out_dir}
            )
            sess["saved_info"] = out
            sess["stage"] = "approved"
            SESSIONS[incident_id] = sess
            
            # Removed debug print: "report saved -> {dict}"

            uploaded_saved_path = None
            if uploaded_bytes is not None:
                try:
                    uploaded_saved_path = make_save_path_for_uploaded(
                        uploaded_filename or f"uploaded_{incident_id}",
                        out_dir=out_dir
                    )
                    with open(uploaded_saved_path, "wb") as fh:
                        fh.write(uploaded_bytes)
                except Exception as e:
                    uploaded_saved_path = f"FAILED_TO_SAVE: {e}"
                    print(f"Error saving uploaded file: {e}")

            return out, uploaded_saved_path

    print("üîß Ensuring wire_save_controls is available...")
    def wire_save_controls(incident_id, uploaded_filename=None, uploaded_bytes=None):
        try:
            if not globals().get("UI_AVAILABLE", False):
                return

            approval_checkbox = widgets.Checkbox(
                description="I confirm and approve saving the report",
                indent=False,
                value=False
            )
            save_btn = widgets.Button(description="Save Report", button_style="primary", disabled=True)

            info_html = widgets.HTML(value=f"<small>Incident id: <b>{incident_id}</b></small>")
            ctrl = widgets.HBox([approval_checkbox, save_btn])
            display(info_html)
            display(ctrl)

            def on_check(change):
                save_btn.disabled = not approval_checkbox.value

            approval_checkbox.observe(on_check, names="value")

            def on_save(b):
                with out_area:
                    # Removed debug prints ("handler triggered", "checkbox value")
                    
                    # Enforce approval UI contract
                    try:
                        if not approval_checkbox.value:
                            display(HTML("<b style='color:red'>Please check the approval box before saving.</b>"))
                            return
                    except Exception as e:
                        display(HTML(f"<b>Save failed (internal error):</b> {e}"))
                        return

                    # Attempt to save
                    try:
                        result = approve_and_save_local(
                            incident_id,
                            uploaded_bytes=uploaded_bytes,
                            uploaded_filename=uploaded_filename,
                            out_dir="submission_reports"
                        )
                    except Exception as e:
                        display(HTML(f"<b>Save failed (exception):</b> {e}"))
                        return

                    if not result:
                        display(HTML("<b>Save failed:</b> no session or report found."))
                        return

                    out, uploaded_saved_path = result
                    
                    # ONLY display the clean success message
                    success_msg = (
                        f"<div style='background-color:#e6fffa; padding:10px; border-radius:5px; border:1px solid #b2f5ea;'>"
                        f"<b style='color:#285e61;'>‚úÖ Report saved successfully!</b><br/>"
                        f"üìÑ MD: <code>{out.get('md_path')}</code><br/>"
                        f"üìä JSON: <code>{out.get('json_path')}</code>"
                    )
                    
                    if uploaded_saved_path:
                        success_msg += f"<br/>üìÇ Uploaded File: <code>{uploaded_saved_path}</code>"
                    
                    success_msg += "</div>"
                    
                    display(HTML(success_msg))

                    save_btn.disabled = True
                    approval_checkbox.disabled = True

            save_btn.on_click(on_save)

        except Exception as e:
            print("‚ùå Failed to initialize wire_save_controls:", e)

print("üß© UI Controls & Helpers ready.")

üß© UI: creating controls & helpers (no duplicate imports)...
üîß Ensuring make_save_path_for_uploaded is available...
üîß Ensuring approve_and_save_local is available...
üîß Ensuring wire_save_controls is available...
üß© UI Controls & Helpers ready.


### Run handler, inspector & UI display

Defines run_from_ui(), inspect_ui(), wires callbacks (idempotent), and renders the UI row.
This cell avoids re-importing modules, and prints debug lines as actions occur.


In [25]:
#### Run handler, inspector & UI display
print("‚ñ∂Ô∏è UI: setting up run handler, inspector, hooking callbacks, and rendering UI...")

def _extract_uploaded_bytes_fallback(u):
    """
    Robust, self-contained extractor for ipywidgets.FileUpload-like values.
    """
    if u is None:
        return None
    try:
        # direct bytes-like
        if isinstance(u, (bytes, bytearray)):
            return bytes(u)
        if isinstance(u, memoryview):
            return u.tobytes()

        # dict mapping filename -> info (common FileUpload.value)
        if isinstance(u, dict):
            # pick first file
            for fname, info in u.items():
                if isinstance(info, dict):
                    cont = info.get("content") or info.get("data") or info.get("content_bytes")
                    if isinstance(cont, (bytes, bytearray)):
                        return bytes(cont)
                    if isinstance(cont, memoryview):
                        return cont.tobytes()
                    for k in ("content","data","body"):
                        val = info.get(k)
                        if isinstance(val, (bytes, bytearray)):
                            return bytes(val)
                    if "metadata" in info and isinstance(info["metadata"], dict):
                        pass
                if isinstance(info, (bytes, bytearray)):
                    return bytes(info)
            return None

        # list/tuple possibilities
        if isinstance(u, (list, tuple)) and len(u) > 0:
            first = u[0]
            if isinstance(first, (list, tuple)) and len(first) >= 2:
                info = first[1]
                if isinstance(info, dict):
                    cont = info.get("content") or info.get("data")
                    if isinstance(cont, (bytes, bytearray)):
                        return bytes(cont)
                if isinstance(info, (bytes, bytearray)):
                    return bytes(info)
            if isinstance(first, dict):
                cont = first.get("content") or first.get("data")
                if isinstance(cont, (bytes, bytearray)):
                    return bytes(cont)
            if isinstance(first, (bytes, bytearray)):
                return bytes(first)

        return str(u).encode("utf-8")
    except Exception as e:
        print("‚ùó _extract_uploaded_bytes_fallback error:", e)
        try:
            return str(u).encode("utf-8")
        except:
            return None

# Primary run handler (same behavior as original but robust extraction)
def run_from_ui(_):
    out_area.clear_output()
    with out_area:
        chosen = sample_dropdown.value if 'sample_dropdown' in globals() else None
        uploaded = file_uploader.value if 'file_uploader' in globals() else None

        # Use local robust extractor first; fall back to previously-defined one if present
        uploaded_bytes = None
        try:
            if uploaded:
                if 'extract_bytes_from_upload' in globals() and callable(globals().get('extract_bytes_from_upload')):
                    try:
                        uploaded_bytes = extract_bytes_from_upload(uploaded)
                        print("run_from_ui: used existing extract_bytes_from_upload ->", "present" if uploaded_bytes else "none")
                    except Exception as e:
                        print("run_from_ui: existing extract_bytes_from_upload raised error, falling back:", e)
                        uploaded_bytes = _extract_uploaded_bytes_fallback(uploaded)
                        print("run_from_ui: fallback extractor ->", "present" if uploaded_bytes else "none")
                else:
                    uploaded_bytes = _extract_uploaded_bytes_fallback(uploaded)
                    print("run_from_ui: fallback extractor ->", "present" if uploaded_bytes else "none")
            else:
                print("run_from_ui: no uploaded value detected (file_uploader.value is empty)")
                uploaded_bytes = None
        except Exception as e:
            print("run_from_ui: extraction failed with unexpected error:", e)
            uploaded_bytes = None

        uploaded_fname = None
        try:
            if isinstance(uploaded, dict):
                uploaded_fname = next(iter(uploaded.keys()), None)
            elif isinstance(uploaded, (list,tuple)) and len(uploaded) > 0:
                first = uploaded[0]
                if isinstance(first, (list,tuple)) and len(first) > 0:
                    uploaded_fname = first[0]
                elif isinstance(first, dict):
                    uploaded_fname = first.get("name") or first.get("filename") or None
            print("run_from_ui: uploaded filename resolved ->", uploaded_fname)
        except Exception:
            uploaded_fname = None

        # Main run logic
        results = None
        if uploaded_bytes:
            try:
                parsed_lines = parse_uploaded_file_bytes(uploaded_bytes) if 'parse_uploaded_file_bytes' in globals() else uploaded_bytes.decode("utf-8", errors="replace").splitlines()
            except Exception as e:
                print("run_from_ui: parse_uploaded_file_bytes failed, falling back to naive decode:", e)
                parsed_lines = uploaded_bytes.decode("utf-8", errors="replace").splitlines()

            detected = detect_scenario_from_lines(parsed_lines) if 'detect_scenario_from_lines' in globals() else None

            if detected:
                scenario_key = detected
            elif chosen and chosen in SYNTHETIC_LOGS and chosen != "-- select demo sample --":
                scenario_key = chosen
                detected = None
            else:
                scenario_key = next(iter(SYNTHETIC_LOGS.keys()))

            print(f"Running diagnosis on uploaded file '{uploaded_fname or 'uploaded'}' ‚Äî detected scenario: {detected or 'none'} (using scenario key: {scenario_key})")
            results = run_offline_samples(samples={scenario_key: uploaded_bytes}, show=False, save_reports=False, out_dir="submission_reports")

        elif chosen and chosen in SYNTHETIC_LOGS and chosen != "-- select demo sample --":
            print(f"Running demo sample: {chosen}")
            results = run_offline_samples(samples={chosen: ("\n".join(SYNTHETIC_LOGS[chosen])).encode("utf-8")}, show=False, save_reports=False, out_dir="submission_reports")

        else:
            print("Select a demo sample or upload a log file.")
            prev_path = globals().get("UPLOADED_FILE_PATH")
            prev_url  = globals().get("UPLOADED_FILE_URL")
            if prev_path and prev_url and Path(prev_path).exists():
                display(HTML(f"<small>Previously uploaded file (for reference only): <a href='{prev_url}' target='_blank'>{prev_path}</a></small>"))
            else:
                print("No previously uploaded file available.")
            return

        if not results:
            print("No results returned.")
            return

        incident_id, md, rep_json, saved_info = results[0]

        if not md:
            print("No draft report produced.")
            return

        print("\nDraft produced. Click 'Save Report' below to persist the report (and uploaded file if present).")
        display(Markdown(md)) 

        # ---  EXPLICITLY RENDER PDF DOWNLOAD BUTTON ---
        if 'render_pdf_download_button' in globals():
            # UPDATED: Passing rep_json so we get the Professional Content
            # The COLOR will be blue because we updated the utility function above.
            pdf_button_html = render_pdf_download_button(incident_id, rep_json)
            
            display(HTML(pdf_button_html))
            
            if incident_id in SESSIONS and SESSIONS[incident_id].get("report"):
                SESSIONS[incident_id]["report"]["md"] += "\n\n" + pdf_button_html
        # --- END ---
        
        # Wire save controls gracefully
        if 'wire_save_controls' in globals() and callable(globals().get('wire_save_controls')):
            try:
                wire_save_controls(incident_id, uploaded_filename=uploaded_fname, uploaded_bytes=uploaded_bytes)
            except Exception as e:
                print("run_from_ui: wire_save_controls failed:", e)
                if 'approve_and_save_local' in globals() and callable(globals().get('approve_and_save_local')):
                    print("run_from_ui: falling back to approve_and_save_local.")
                    out = approve_and_save_local(incident_id, uploaded_bytes=uploaded_bytes, uploaded_filename=uploaded_fname, out_dir="submission_reports")
                    print("approve_and_save_local result:", out)
                else:
                    print("No fallback save function available. Please call approve_and_save_local(...) manually to save.")
        elif 'approve_and_save_local' in globals() and callable(globals().get('approve_and_save_local')):
            print("run_from_ui: using approve_and_save_local directly (wire_save_controls not present).")
            out = approve_and_save_local(incident_id, uploaded_bytes=uploaded_bytes, uploaded_filename=uploaded_fname, out_dir="submission_reports")
            print("approve_and_save_local result:", out)
        else:
            print("run_from_ui: neither wire_save_controls nor approve_and_save_local are available. Report cannot be saved via UI automatically.")

# Inspector UI (unchanged)
def inspect_ui(_):
    out_area.clear_output()
    with out_area:
        print("=== TRACE STORE (most recent) ===")
        if 'render_trace_store' in globals():
            render_trace_store()
        else:
            print("render_trace_store not available.")
        print("\n=== SESSIONS ===")
        if 'render_sessions' in globals():
            render_sessions()
        else:
            print("render_sessions not available.")
        print("\n=== MEMORY_BANK SUMMARY ===")
        if not globals().get("MEMORY_BANK"):
            display(HTML("<i>Memory bank empty.</i>"))
        else:
            for m in globals().get("MEMORY_BANK",[])[-10:]:
                display(HTML(f"<pre>{json.dumps({'incident_id':m.get('incident_id'),'job':m.get('job'),'text_hash':m.get('text_hash'),'created_at':m.get('created_at')}, indent=2)}</pre>"))

# Hook up callbacks safely (idempotent)
try:
    if 'run_button' in globals() and hasattr(run_button, "on_click"):
        run_button.on_click(run_from_ui)
    if 'inspect_button' in globals() and hasattr(inspect_button, "on_click"):
        inspect_button.on_click(inspect_ui)
    print("‚úÖ UI: callbacks hooked for run_button and inspect_button (if available).")
except Exception as e:
    print("‚ö†Ô∏è UI: hooking callbacks failed (maybe already hooked):", e)

# Display UI row if widgets created
if 'sample_dropdown' in globals() and globals().get("UI_AVAILABLE"):
    ui_row = widgets.VBox([
        widgets.HTML("<h3>PulseTrace ‚Äî demo UI </h3>"),
        widgets.HBox([sample_dropdown, file_uploader, run_button, inspect_button]),
        out_area,
        (widgets.HTML(f"<small>Previously uploaded file (for reference only): <a href='{globals().get('UPLOADED_FILE_URL')}' target='_blank'>{globals().get('UPLOADED_FILE_PATH')}</a></small>")
           if globals().get('UPLOADED_FILE_PATH') and globals().get('UPLOADED_FILE_URL') else widgets.HTML("<small>No previously uploaded file.</small>"))
    ])
    display(ui_row)
    print("‚úÖ UI: displayed.")
else:
    print("‚ÑπÔ∏è UI: widgets not available; UI row not rendered.")

‚ñ∂Ô∏è UI: setting up run handler, inspector, hooking callbacks, and rendering UI...
‚úÖ UI: callbacks hooked for run_button and inspect_button (if available).


VBox(children=(HTML(value='<h3>PulseTrace ‚Äî demo UI </h3>'), HBox(children=(Dropdown(description='Demo:', opti‚Ä¶

‚úÖ UI: displayed.


## üöÄ Agent Deployment

The PulseTrace multi-agent system is designed with a **hybrid architecture**, leveraging **online Gemini reasoning** for complex tasks (like report synthesis) and **offline custom tools** for deterministic data operations (like log and schema lookups).

We are deploying this entire hybrid system to **Vertex AI Agent Engine** (Reasoning Engine). This specialized, fully managed runtime provides the necessary infrastructure for hosting reasoning agents, managing sessions, and providing deep observability, which is critical for a production RCA tool.

We configure the environment by relying on **Google Cloud SDK** credentials, which automatically handles project authentication based on environment variables (a standard practice for production code).

## ‚òÅÔ∏è Phase 1: Cloud Deployment Configuration

Now that we have verified the PulseTrace logic locally using the interactive UI, we are ready to deploy it to **Google Cloud Vertex AI Agent Engine**.

This section configures the specific cloud resources required for hosting:
1.  Verifying Cloud Credentials.
2.  Setting the target Project ID and Region.
3.  Enabling the specific APIs required for the Agent Engine runtime.

In [26]:
# Import Deployment Libraries & Verify Auth
import os
import time
import vertexai
from kaggle_secrets import UserSecretsClient
from vertexai.preview import reasoning_engines

print("‚úÖ Deployment libraries imported.")

# Verify Cloud Credentials are loaded
try:
    user_secrets = UserSecretsClient()
    user_credential = user_secrets.get_gcloud_credential()
    user_secrets.set_tensorflow_credential(user_credential)
    print("‚úÖ Cloud credentials verified.")
except Exception as e:
    print(f"‚ö†Ô∏è Authentication Error: {e}")
    print("Please ensure 'Google Cloud SDK' is enabled in Add-ons.")

‚úÖ Deployment libraries imported.
‚úÖ Cloud credentials verified.


### 1.2 Configure Project & Region
We need to define which Google Cloud Project will host the agent. We also export these variables to the system environment so the deployment tools can access them automatically.

In [39]:
# Configure Project & Region
# --- CONFIGURATION ---
PROJECT_ID = "your-project-id"  # ADD your Project ID here
REGION = "us-central1" # We use us-central1 for maximum stability
# ---------------------

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = "global"

if PROJECT_ID == "your-project-id":
    raise ValueError("‚ö†Ô∏è Please replace 'your-project-id' with your actual Google Cloud Project ID.")

print("‚úÖ Target Project: Configured")
print(f"‚úÖ Target Region: {REGION}")

‚úÖ Target Project: Configured
‚úÖ Target Region: us-central1


### 1.3 Enable Required Cloud APIs
The **Agent Engine** requires several specific Google Cloud APIs to be active. 

For this tutorial, you'll need to enable the following APIs in the Google Cloud Console:
* **Vertex AI API** (`aiplatform.googleapis.com`)
* **Cloud Storage API** (`storage.googleapis.com`)
* **Cloud Logging API** (`logging.googleapis.com`)
* **Cloud Monitoring API** (`monitoring.googleapis.com`)
* **Cloud Trace API** (`cloudtrace.googleapis.com`)
* **Telemetry API** (`clouderrorreporting.googleapis.com`)

You can [use this link to open the Google Cloud Console](https://console.cloud.google.com/apis/library) and follow the steps there to enable these APIs manually.

In [28]:
# Initialize Vertex AI SDK
# Once you have enabled the APIs above, run this cell to verify connectivity.

print(f"‚öôÔ∏è Initializing Vertex AI SDK for project {PROJECT_ID} in {REGION}...")

try:
    vertexai.init(project=PROJECT_ID, location=REGION)
    print("‚úÖ Vertex AI SDK Initialized successfully.")
    print("   (This confirms the Vertex AI API is enabled and accessible).")
except Exception as e:
    print(f"‚ùå Initialization failed: {e}")
    print("   Please double-check that the 'Vertex AI API' is enabled in the Console.")

‚öôÔ∏è Initializing Vertex AI SDK for project project-9cada6ab-d137-4c1d-be0 in us-central1...
‚úÖ Vertex AI SDK Initialized successfully.
   (This confirms the Vertex AI API is enabled and accessible).


## üèóÔ∏è Section 2: Create Your Agent with ADK

Before we deploy, we need a functional agent to host. In this section, we will package the **PulseTrace** logic into a standard ADK Agent format.

This agent is optimized for production deployment with the following configuration:
* **Model:** Uses `gemini-2.0-flash-001` for low latency and high-speed reasoning.
* **Tools:** Includes the `analyze_pipeline_logs` tool to deterministically parse errors.
* **Persona:** Acts as a Site Reliability Engineer (SRE) specialized in Data Pipelines.

We'll create the following files and directory structure:
```text
pulsetrace_adk/
‚îú‚îÄ‚îÄ agent.py                  # The PulseTrace logic
‚îú‚îÄ‚îÄ requirements.txt          # Dependencies (ADK, ReportLab)
‚îú‚îÄ‚îÄ .env                      # Environment Config
‚îî‚îÄ‚îÄ .agent_engine_config.json # Hardware specs

### 2.1: Create agent directory
We need a clean workspace to package our agent for deployment. We will create a directory named `pulsetrace_adk`.
All necessary files will be written into this folder to prepare it for the `adk deploy` command.

In [29]:
# Create agent directory
import os

# Create the specific folder for your project
!mkdir -p pulsetrace_adk
print(f"‚úÖ PulseTrace Agent directory created: 'pulsetrace_adk/'")

‚úÖ PulseTrace Agent directory created: 'pulsetrace_adk/'


### 2.2: Create requirements file
The Agent Engine builds a dedicated environment for your agent. To ensure it runs correctly, we must declare our dependencies.

We will write a `requirements.txt` file containing:
* `google-adk`: The core agent framework.
* `reportlab`: Required by PulseTrace for PDF generation.
* `opentelemetry-instrumentation-google-genai`: For observability.

In [30]:
%%writefile pulsetrace_adk/requirements.txt
google-adk
opentelemetry-instrumentation-google-genai
reportlab

Overwriting pulsetrace_adk/requirements.txt


### 2.3: Create environment configuration
We need to provide the agent with the necessary cloud configuration settings.

We will write a `.env` file that sets the cloud location to `global` and explicitly enables the Vertex AI backend. Crucially, we inject your **Project ID** here so the agent can authenticate immediately.

In [31]:
# reate environment configuration
# We use Python to write this file so we can inject the PROJECT_ID variable safely.

env_content = f"""
GOOGLE_CLOUD_PROJECT="{PROJECT_ID}"
GOOGLE_CLOUD_LOCATION="global"
GOOGLE_GENAI_USE_VERTEXAI=1
"""

with open("pulsetrace_adk/.env", "w") as f:
    f.write(env_content.strip())

print("‚úÖ .env configuration created.")

‚úÖ .env configuration created.


### 2.4: Create agent code
We will now generate the `agent.py` file. This script defines the behavior of **PulseTrace**.

It includes:
1.  **The Tool (`analyze_pipeline_logs`)**: Your custom logic that parses log lines to find schema drifts or missing partitions.
2.  **The Agent (`root_agent`)**: The ADK wrapper that connects your tool to the Gemini 2.0 model.

In [32]:
%%writefile pulsetrace_adk/agent.py
from google.adk.agents import Agent
import vertexai
import os
import re
import time
import uuid
from collections import Counter

# Initialize Vertex AI
vertexai.init(
    project=os.environ["GOOGLE_CLOUD_PROJECT"],
    location=os.environ["GOOGLE_CLOUD_LOCATION"],
)

# --- 1. THE TOOL ---
def analyze_pipeline_logs(log_content: str) -> dict:
    """
    Analyzes raw data pipeline logs to identify root causes of failures.
    
    This is a TOOL that the agent can call when users provide logs.
    For this demo, it parses the input text directly using deterministic patterns.
    
    Args:
        log_content: The raw text of the logs.
        
    Returns:
        dict: A structured RCA report containing root cause and severity.
    """
    # Logic to parse logs
    log_lines = log_content.splitlines()
    text_blob = "\n".join(log_lines).lower()
    
    job = "unknown"
    m = re.findall(r"job=([A-Za-z0-9_\-\.]+)", text_blob)
    if m: job = Counter(m).most_common(1)[0][0]
        
    if "cannot cast" in text_blob:
        rca = f"Schema Drift in job '{job}': Type Mismatch"
        sev = "MEDIUM"
        remediation = ["Update schema definition", "Backfill affected partitions"]
    elif "no files found" in text_blob:
        rca = f"Missing Partition Data for job '{job}'"
        sev = "HIGH"
        remediation = ["Check upstream producer", "Verify S3 paths"]
    elif "valueerror" in text_blob:
        rca = f"Data Quality Violation in job '{job}'"
        sev = "MEDIUM"
        remediation = ["Add data validation rules", "Filter invalid rows"]
    else:
        rca = "Unknown Error - Pattern not recognized"
        sev = "LOW"
        remediation = ["Investigate logs manually"]

    return {
        "incident_id": f"cloud-{uuid.uuid4().hex[:6]}",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "job_detected": job,
        "root_cause": rca,
        "severity": sev,
        "recommended_actions": remediation
    }

# --- 2. THE AGENT DEFINITION ---
root_agent = Agent(
    name="pulsetrace_agent",
    model="gemini-2.0-flash-001", # Using the latest fast model
    description="An AI Site Reliability Engineer that analyzes data pipeline logs.",
    instruction="""
    You are PulseTrace, an automated Root Cause Analysis system.
    
    When a user provides log data:
    1. Call the 'analyze_pipeline_logs' tool with the log content.
    2. Return the JSON summary provided by the tool directly to the user.
    3. If the tool identifies a HIGH severity issue, emphasize the remediation steps.
    """,
    tools=[analyze_pipeline_logs]
)

Overwriting pulsetrace_adk/agent.py


## ‚òÅÔ∏è Section 3: Deploy to Agent Engine

ADK supports multiple deployment platforms. You'll be deploying to **Vertex AI Agent Engine** in this notebook.

This fully managed service provides:
* **Auto-scaling:** Scales to zero when not in use.
* **Session management:** Built-in memory handling.
* **Easy deployment:** Uses the simple `adk deploy` command.

### 3.1: Create deployment configuration
The `.agent_engine_config.json` file controls the deployment settings.

We define the resource limits here. For PulseTrace, we allocate **2Gi of memory** to ensure it can process log chunks efficiently.

In [33]:
%%writefile pulsetrace_adk/.agent_engine_config.json
{
    "min_instances": 0,
    "max_instances": 1,
    "resource_limits": {"cpu": "1", "memory": "2Gi"}
}

Overwriting pulsetrace_adk/.agent_engine_config.json


### 3.2: Select deployment region
Agent Engine is available in specific regions. We'll select `us-central1` for this deployment as it is the primary region for Reasoning Engine features.

In [34]:
# Select deployment region
regions_list = ["us-central1", "europe-west1", "europe-west4", "us-east4", "us-west1"]

# We select us-central1 for stability
deployed_region = "us-central1"
print(f"‚úÖ Selected deployment region: {deployed_region}")

‚úÖ Selected deployment region: us-central1


### 3.3: Deploy the agent
This uses the **ADK CLI** to deploy your agent to Agent Engine.

**Note:** This process typically takes **3-5 minutes**. Please wait until you see the message `‚úÖ Created agent engine` in the output logs.

In [35]:
# Deploy the agent
print(f"üöÄ Deploying PulseTrace to: {deployed_region}...")

!adk deploy agent_engine \
    --project=$PROJECT_ID \
    --region=$deployed_region \
    pulsetrace_adk \
    --agent_engine_config_file=pulsetrace_adk/.agent_engine_config.json

üöÄ Deploying PulseTrace to: us-central1...
Staging all files in: /kaggle/working/pulsetrace_adk_tmp20251128_165839
Copying agent source code...
Copying agent source code complete.
Resolving files and dependencies...
Reading agent engine config from pulsetrace_adk/.agent_engine_config.json
Reading environment variables from /kaggle/working/pulsetrace_adk/.env
[33mIgnoring GOOGLE_CLOUD_PROJECT in .env as `--project` was explicitly passed and takes precedence[0m
[33mIgnoring GOOGLE_CLOUD_LOCATION in .env as `--region` was explicitly passed and takes precedence[0m
Initializing Vertex AI...
Vertex AI initialized.
Created pulsetrace_adk_tmp20251128_165839/agent_engine_app.py
Files and dependencies resolved
Deploying to agent engine...
INFO:vertexai_genai.agentengines:Creating in-memory tarfile of source_packages
INFO:vertexai_genai.agentengines:Using agent framework: google-adk
INFO:vertexai_genai.agentengines:View progress and logs at https://console.cloud.google.com/logs/query?projec

## ü§ñ Section 4: Retrieve and Test Your Deployed Agent

### 4.1: Retrieve the deployed agent
After deploying with the CLI, we need to retrieve the agent object in our Python session to interact with it.

In [36]:
# Retrieve the deployed agent
import vertexai
from vertexai.preview import reasoning_engines

# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=deployed_region)

# List agents
print("‚è≥ Listing deployed agents...")
try:
    agents_list = list(reasoning_engines.ReasoningEngine.list())
    
    if agents_list:
        remote_agent = agents_list[0]
        print(f"‚úÖ Connected to deployed agent: {remote_agent.resource_name}")
    else:
        print("‚ùå No agents found. Please wait 1-2 minutes if deployment just finished.")
except Exception as e:
    print(f"‚ùå Error: {e}")

‚è≥ Listing deployed agents...
‚úÖ Connected to deployed agent: projects/1088013112886/locations/us-central1/reasoningEngines/5571845542499057664


### 4.2: Test the deployed agent (REST + SSE)

This step sends a sample log line to the deployed PulseTrace agent using a REST-based SSE client.  
The response is streamed back in real time using `alt=sse`, allowing you to view partial outputs as they are generated.


In [37]:
# REST SSE client for Vertex AI Agent Engine (streamQuery)
# Works in Jupyter / Kaggle. Requires google-auth installed (usually present).
import json, requests, time
from google.auth.transport.requests import Request
import google.auth

# ----- CONFIG -----
# Replace with the resource_name you printed earlier (the full resource path)
resource_name = "resource_name" #ADD your resource name here
# Region endpoint (eg. us-central1)
endpoint_base = "https://us-central1-aiplatform.googleapis.com/v1"
# message to send
message = "Analyze these logs: 2025-11-24 10:03:02 ERROR job=orders transform TypeError: cannot cast '123.45' to INT"
user_id = "test_user_01"
# Optionally provide a session_id if you already created one; otherwise omit
session_id = None  # or "your_session_id"

# ----- GET ACCESS TOKEN -----
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(Request())
token = creds.token
if not token:
    raise RuntimeError("Failed to obtain access token. Make sure credentials are configured.")

# ----- BUILD REQUEST -----
# Use alt=sse to request Server-Sent Events streaming
url = f"{endpoint_base}/{resource_name}:streamQuery?alt=sse"

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json; charset=utf-8",
    "Accept": "text/event-stream",
}

# The body format: include 'class_method' optionally and 'input' as a JSON struct.
# ADK / community examples use 'class_method': 'stream_query' and input with user_id/session_id/message
body = {
    "class_method": "stream_query",
    "input": {
        "user_id": user_id,
        "message": message,
    }
}
if session_id:
    body["input"]["session_id"] = session_id

print("‚û°Ô∏è POST", url)
print("‚û°Ô∏è Sending payload:", json.dumps(body)[:400], "...")
print("‚è≥ Waiting for streamed events... (Ctrl-C to cancel)\n")

# ----- STREAM (SSE) -----
with requests.post(url, headers=headers, json=body, stream=True, timeout=120) as resp:
    resp.raise_for_status()
    # Iterate over the raw lines from the SSE stream
    for raw_line in resp.iter_lines(decode_unicode=True):
        # skip keep-alives or empty lines
        if raw_line is None or raw_line.strip() == "":
            continue
        # SSE lines usually start with "data: "
        line = raw_line.strip()
        if line.startswith("data:"):
            payload = line[len("data:"):].strip()
        else:
            payload = line

        # Some server emits "event: ...", "data: ...", or plain JSON lines.
        # Try to parse JSON; otherwise just print.
        try:
            obj = json.loads(payload)
            # Pretty-print important fields (adjust as needed)
            print(json.dumps(obj, indent=2, ensure_ascii=False))
        except Exception:
            # not JSON ‚Äî print raw payload (helps debug)
            print(payload)

print("\n‚úÖ Stream finished.")


‚û°Ô∏è POST https://us-central1-aiplatform.googleapis.com/v1/projects/1088013112886/locations/us-central1/reasoningEngines/152220787795820544:streamQuery?alt=sse
‚û°Ô∏è Sending payload: {"class_method": "stream_query", "input": {"user_id": "test_user_01", "message": "Analyze these logs: 2025-11-24 10:03:02 ERROR job=orders transform TypeError: cannot cast '123.45' to INT"}} ...
‚è≥ Waiting for streamed events... (Ctrl-C to cancel)

{
  "model_version": "gemini-2.0-flash-001",
  "content": {
    "parts": [
      {
        "function_call": {
          "id": "adk-b41b8031-038c-4895-bdd0-26913db80db7",
          "args": {
            "log_content": "2025-11-24 10:03:02 ERROR job=orders transform TypeError: cannot cast '123.45' to INT"
          },
          "name": "analyze_pipeline_logs"
        }
      }
    ],
    "role": "model"
  },
  "finish_reason": "STOP",
  "usage_metadata": {
    "candidates_token_count": 46,
    "candidates_tokens_details": [
      {
        "modality": "TEXT",


## üßπ Section 6: Cleanup

### ‚ö†Ô∏è Important: Prevent unexpected charges
To avoid incurring costs for the running agent engine, you should delete the resource when you are done testing.

In [38]:
# Cleanup resources
# Uncomment the lines below to delete the agent
if 'remote_agent' in locals():
    print(f"üóëÔ∏è Deleting Agent: {remote_agent.resource_name}...")
    try:
        remote_agent.delete()
        print("‚úÖ Agent successfully deleted")
    except Exception as e:
        print(f"‚ùå Deletion error: {e}")

üóëÔ∏è Deleting Agent: projects/1088013112886/locations/us-central1/reasoningEngines/5571845542499057664...
‚úÖ Agent successfully deleted


## üõ†Ô∏è Troubleshooting

This section helps diagnose issues that may appear while running the PulseTrace demo.  
Use the points below to understand and resolve common problems.


### ‚öôÔ∏è UI Panel Not Showing
If the UI dropdown, file uploader, or buttons do not appear:

- `ipywidgets` may not be available in the environment.  
- Ensure the `UI_AVAILABLE` flag printed during setup shows **True**.
- If widgets cannot be installed, you can still run PulseTrace manually using:
  ```python
  run_offline_samples(...)
  ```


### üßæ No Draft Report Generated
If the UI prints **‚ÄúNo draft report produced.‚Äù**:

- The Diagnoser may not have produced a signature (often due to log parsing failures).
- Ensure your uploaded file contains readable text lines.
- Use the Inspector UI to verify:
  - **Trace Store** ‚Üí agent flow reached Advisor  
  - **Sessions** ‚Üí signature, history, and impact exist


### üì° Traces / Sessions Not Visible
If the Inspector UI shows:

- `render_trace_store not available`  
- `render_sessions not available`

Then:

- Confirm that the Trace & Session Rendering Helpers cell was executed.
- Verify that `TRACE_STORE` and `SESSIONS` were initialized earlier.
- You may also inspect manually:
  ```python
  TRACE_STORE[-5:]
  SESSIONS.keys()
  ```


### ‚òÅÔ∏è Gemini / Hybrid Mode Errors
If Gemini calls fail or hybrid mode doesn‚Äôt activate:

- Ensure `GOOGLE_API_KEY` is added to Kaggle Secrets.
- Verify the environment setup printed **‚ÄúGemini Mode: ON‚Äù**.
- In restricted environments, Gemini is automatically disabled.
- You can also force offline mode by setting:
  ```python
  USE_GEMINI = False
  ```


### üìÅ Uploaded File Not Detected
If you see:

- **‚Äúno uploaded value detected‚Äù**  
- or  
  **‚ÄúSelect a demo sample or upload a log file.‚Äù**

Then:

- Make sure you selected a file *and* clicked **Run Diagnosis** after uploading.
- Some notebook environments return unusual `FileUpload.value` structures ‚Äî the fallback extractor handles most, but not all malformed objects.
- Try uploading a simple `.log` or `.txt` file.


### ‚õìÔ∏è General Diagnostics
If PulseTrace behaves unexpectedly:

- Ensure all previous cells executed without errors.
- Confirm router behavior by checking Trace Store.
- Test a synthetic scenario manually:
  ```python
  run_offline_samples(show=True)
  ```
- If you modified agent code, ensure every agent still defines a valid `on_message()` method.


### ‚úÖ Quick Troubleshooting Checklist

- [ ] UI visible (`UI_AVAILABLE == True`)  
- [ ] Uploaded file detected or sample selected  
- [ ] Diagnoser produced a signature (`SESSIONS[...]["signature"]` exists)  
- [ ] A2A traces appear in Trace Store  
- [ ] Advisor produced Markdown draft (`SESSIONS[...]["report"]["md"]`)  
- [ ] Gemini optional ‚Äî offline mode fully functional


## ‚ö†Ô∏è Current Limitations

PulseTrace is a functional multi-agent RCA demo, but it operates within a few intentional constraints:

- **Offline-first design**  
  The system runs fully offline by default using synthetic logs, schemas, and lineage metadata. Real system integrations are not included.

- **Simplified failure patterns**  
  Scenario detection is optimized for structured log formats and may not generalize to noisy, multi-stage, or unstructured logs.

- **No persistent storage layer**  
  Reports are displayed in the UI but not permanently written to disk unless the save function is explicitly triggered.

- **Basic agent memory**  
  Memory stores lightweight incident fingerprints but does not include embeddings, similarity search, or long-term vector memory.

- **Notebook UI dependence**  
  The interactive UI relies on `ipywidgets`. Environments without widget support must use manual execution via code.

- **Hybrid Gemini mode is optional**  
  PulseTrace supports live online reasoning via Gemini when an API key is present, but only specific steps (like log summarization) use it currently.

These limitations allow PulseTrace to stay lightweight and responsive while still demonstrating a complete multi-agent RCA workflow.


## üöÄ What's Next for PulseTrace

PulseTrace already demonstrates a complete multi-agent RCA workflow, but there are several exciting directions to expand the system:

- **Add real integrations**  
  Connect to actual log stores, schema registries, lineage tools, and monitoring systems instead of offline simulation.

- **Strengthen Gemini hybrid mode**  
  Route more agent reasoning through Gemini when available and add richer summaries or deeper log insights.

- **Extend the Diagnoser**  
  Implement anomaly detection, data quality checks, and graph-based propagation logic for more complex failures.

- **Improve the UI**  
  Add collapsible panels, richer report previews, and a timeline view of agent-to-agent messages.

- **Model memory enhancements**  
  Store richer historical fingerprints and use them to surface smarter, pattern-based suggestions.

These additions will help PulseTrace evolve from a demo into a robust, production-grade RCA assistant for data engineering workflows.


## üìù Conclusion

PulseTrace demonstrates how a coordinated multi-agent system can streamline root cause analysis for data pipeline failures. By combining deterministic tools, message-based agent orchestration, hybrid Gemini reasoning, and optional UI interaction, it provides a clear blueprint for building intelligent, modular RCA systems.

The workflow‚ÄîDetector ‚Üí Diagnoser ‚Üí History Analyzer ‚Üí Impact Analyzer ‚Üí Advisor‚Äîshows how specialized agents can collaborate, exchange context, and synthesize a final explanation that is both actionable and transparent.

Although this notebook runs on simulated data, the architecture is designed to extend naturally to real monitoring systems, log stores, lineage platforms, and large-scale data ecosystems. With further enhancements in memory, anomaly detection, and integrations, PulseTrace can evolve into a fully capable, production-ready RCA assistant for modern data engineering teams.

**Thank you for exploring PulseTrace!**
