In [2]:
import json
import time
import os
import pandas as pd
import vertexai
from vertexai.generative_models import GenerativeModel, GenerationConfig
import PyPDF2
import concurrent.futures
import random

# --- 1. CONFIGURATION ---
PROJECT_ID = "revolut-dev-apps"
LOCATION = "us-central1"
MODEL_NAME = "gemini-2.5-pro"

# Output Files
FILE_PAYLOAD = "cm_risk_payload.jsonl"
FILE_ANALYSIS = "cm_structural_rca.jsonl"
FILE_PROPOSAL = "2026_technical_analysis.md"

# Reference Docs
POLICY_PDF = "CM-pol.pdf"
SWITCHBOARD_PDF = "CMS.pdf"

TOTAL_NARRATIVE_THRESHOLD = 25

# --- 2. HELPERS ---
def extract_pdf_text(file_path):
    if not os.path.exists(file_path): return ""
    text = ""
    try:
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages: text += page.extract_text() + "\n"
        return text
    except: return ""

def process_row(row, model, instruction):
    """
    Process a single row with Auto-Retry logic for 429 errors.
    """
    # 1. Narrative Check
    narrative = f"{row.get('summary', '')} {row.get('description', '')} {row.get('root_cause_details', '')}"
    if len(narrative) < TOTAL_NARRATIVE_THRESHOLD:
        return None

    # 2. Payload Prep
    full_payload = {k: (str(v) if not pd.isna(v) else "") for k, v in row.items()}
    row_id = full_payload.get('id', 'unknown')

    # 3. AI Analysis with Retry Backoff
    max_retries = 5
    for attempt in range(max_retries):
        try:
            response = model.generate_content(
                [instruction, json.dumps(full_payload)],
                generation_config=GenerationConfig(response_mime_type="application/json", temperature=0.0)
            )
            res = json.loads(response.text)

            record = {
                "id": str(row_id),
                "meta": {
                    "dept": str(full_payload.get('responsible_department', '')),
                    "severity": str(full_payload.get('severity', '')),
                    "impacted": str(full_payload.get('impacted_entities', '')),
                    "financial_impact": str(full_payload.get('impact_financial', '')),
                    "customer_impact": str(full_payload.get('impact_customer', '')),
                    "regulatory_impact": str(full_payload.get('impact_regulator', '')),
                    "top_risk": str(full_payload.get('primary_risk_is_top_risk', '')),
                    "governance_tier": str(full_payload.get('governance_tier', '')),
                    "non_compliance": str(full_payload.get('regulation_has_risk_of_non_compliance', '')),
                    "risk_type_l1": str(full_payload.get('primary_risk_type_l1', '')),
                    "exam_name": str(full_payload.get('related_exam_name', '')),
                    "due_date": str(full_payload.get('remediation_due_date', ''))
                },
                "analysis": res
            }
            return (full_payload, record)

        except Exception as e:
            error_str = str(e)
            if "429" in error_str or "Resource exhausted" in error_str:
                if attempt < max_retries - 1:
                    sleep_time = (2 ** attempt) + random.uniform(0, 1) # Exponential backoff
                    print(f" -> ⚠️ 429 on {row_id}, retrying in {sleep_time:.1f}s...")
                    time.sleep(sleep_time)
                    continue
            
            # If not 429, or out of retries, log and fail
            print(f" -> ❌ Error {row_id}: {e}")
            return None

# --- 3. SYSTEM INSTRUCTIONS ---

RCA_INSTRUCTION = """
Analyze the incident using the full metadata, specifically considering:
1. Financial Impact: impact_financial
2. Customer Volume: impact_customer
3. Regulatory Risk: impact_regulator / Non-compliance: regulation_has_risk_of_non_compliance
4. Strategic Urgency: primary_risk_is_top_risk / governance_tier

RETURN JSON ONLY:
{
  "failure_mode": "[Validation Oversight | Tooling/Automation Failure | Human Error | Process Complexity | Legacy/Backward Compatibility | Guidance Gap]",
  "technical_root_cause": "A concise, technical 5-Why summary.",
  "risk_priority_score": "Score from 1-10 based on the enriched impact metrics provided (Financial, Customer, Regulatory).",
  "policy_gap_classification": "Classify using this strict logic:
   - IGNORED: A mandate exists in the Policy/Playbook (even high-level), but was not executed.
   - INSUFFICIENT: The policy was followed, but the incident still occurred because the rule was too vague.
   - MISSING: No rule exists for this domain at all.",
  "policy_gap_logic": "Explain the gap. If IGNORED, mention what requirement was missed.",
  "preventative_control": "Specific technical or procedural control to prevent recurrence."
}
"""

SYNTHESIS_INSTRUCTION = """
Perform a Technical Risk Audit of the aggregated RCA data against the provided Policy Library.
Apply the Pyramid Principle: Start with the most critical findings and impact-weighted remediation.

OUTPUT FORMAT: Strict Technical Report (Markdown). No conversational text.

REQUIRED SECTIONS:
1. EXECUTIVE SUMMARY (Pyramid Principle): Top-level findings prioritized by actual risk (Financial, Customer, Regulatory).
2. IMPACT-WEIGHTED PARETO ANALYSIS: Top Failure Modes weighted by risk metrics, not just frequency.
3. COMPLIANCE AUDIT: Table of [ID | Failure Mode | Gap Type | Policy Reference].
   - If IGNORED: Cite the specific Policy/Switchboard section violated.
   - If INSUFFICIENT: Explain the specific deficiency in the current text.
4. SWITCHBOARD LOGIC GAPS: Identify specific criteria in the Switchboard that are failing.
5. DOCUMENT REDLINES: Proposed text amendments ("Current Text" -> "Proposed Text").
"""

# --- 4. SQL QUERY ---
RCA_QUERY = """
SELECT
    id, report_id, summary, description, root_cause_details,
    responsible_department, assignee_entity, impacted_entities,
    operating_type, brm_department_type, risk_type_id,
    additional_risk_type_ids, cause_category_l1, cause_category_l2,
    cause_category_l3, severity, impact_customer, impact_regulator,
    issue_state, issue_state_category, reporting_date,
    impact_financial, governance_tier, regulation_has_risk_of_non_compliance,
    primary_risk_type_l1, related_exam_name, remediation_due_date,
    primary_risk_is_top_risk
FROM risk.risk_issues
WHERE (risk_type_id = 186 OR additional_risk_type_ids LIKE '%186%')
AND issue_state NOT IN ('invalid', 'deleted')
AND reporting_date >= TIMESTAMP '2024-01-01 00:00:00 UTC'
ORDER BY reporting_date DESC
"""

def run_production_pipeline():
    print(f"🚀 Starting Production Pipeline ({LOCATION})...")
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    model = GenerativeModel(MODEL_NAME)

    # A. DATA EXTRACTION
    print("Step 1: Extracting Full Dataset...")
    try:
        with zeus() as cur: df = execute_sql(cur, RCA_QUERY, None)
        print(f" -> Records found: {len(df)}")
    except Exception as e: print(f"ERROR: {e}"); return

    # B. STRUCTURAL ANALYSIS (SAFE PARALLEL)
    print(f"Step 2: Processing RCAs (Max Workers=5)...")
    analysis_results = []
    records = df.to_dict('records')

    with open(FILE_PAYLOAD, 'w') as f_pay, open(FILE_ANALYSIS, 'w') as f_ana:
        # Reduced workers to 5 to stay under quota
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            results = executor.map(lambda r: process_row(r, model, RCA_INSTRUCTION), records)

            count = 0
            for res in results:
                if res:
                    payload, record = res
                    f_pay.write(json.dumps(payload) + '\n')
                    f_ana.write(json.dumps(record) + '\n')
                    analysis_results.append(record)
                    
                    count += 1
                    if count % 20 == 0:
                        print(f" -> Processed {count}/{len(df)}...")

    # C. TECHNICAL SYNTHESIS
    print("Step 3: Generating Technical Report...")
    policy_text = extract_pdf_text(POLICY_PDF)
    switchboard_text = extract_pdf_text(SWITCHBOARD_PDF)

    # Payload optimization: remove large text fields if they are already summarized in analysis
    optimized_results = []
    for r in analysis_results:
        optimized_results.append({
            "id": r["id"],
            "meta": r["meta"],
            "analysis": {
                "failure_mode": r["analysis"].get("failure_mode"),
                "technical_root_cause": r["analysis"].get("technical_root_cause"),
                "risk_priority_score": r["analysis"].get("risk_priority_score"),
                "policy_gap_classification": r["analysis"].get("policy_gap_classification")
            }
        })

    synthesis_payload = f"""
    {SYNTHESIS_INSTRUCTION}
    
    --- REFERENCE LIBRARY ---
    [POLICY v1.3]: {policy_text[:40000]} 
    [SWITCHBOARD]: {switchboard_text[:20000]}
    
    --- RCA DATASET ---
    {json.dumps(optimized_results)}
    """

    for attempt in range(max_retries):
        try:
            final_report = model.generate_content(
                synthesis_payload,
                generation_config=GenerationConfig(temperature=0.1)
            )
            with open(FILE_PROPOSAL, 'w') as f_rep:
                f_rep.write(final_report.text)
            break
        except Exception as e:
            if "429" in str(e) and attempt < max_retries - 1:
                sleep_time = (2 ** attempt) + random.uniform(0, 1)
                print(f" -> ⚠️ Synthesis 429, retrying in {sleep_time:.1f}s...")
                time.sleep(sleep_time)
                continue
            print(f"Synthesis Error: {e}")
            break

    # D. VERIFICATION
    print("\nStep 4: Exporting Enriched Dataset to CSV...")
    try:
        enriched_df = pd.DataFrame([
            {**r['meta'], **r['analysis'], 'id': r['id']} for r in analysis_results
        ])
        enriched_df.to_csv("enriched_risk_analysis.csv", index=False)
        print("  [CREATED] enriched_risk_analysis.csv")
    except Exception as e: print(f"CSV Export Error: {e}")

    print("\n✅ PIPELINE COMPLETE. OUTPUT FILES:")
    for fname in [FILE_PAYLOAD, FILE_ANALYSIS, FILE_PROPOSAL, "enriched_risk_analysis.csv"]:
        if os.path.exists(fname):
            size = os.path.getsize(fname)
            print(f"  [CREATED] {fname:<30} Size: {size/1024:.1f} KB")

if __name__ == "__main__":
    run_production_pipeline()

🚀 Starting Production Pipeline (us-central1)...
Step 1: Extracting Full Dataset...
[2026-01-26 22:44:18.446470] Connected successfully.






 -> Records found: 220
Step 2: Processing RCAs (Max Workers=5)...
 -> Processed 20/220...
 -> Processed 40/220...
 -> Processed 60/220...
 -> Processed 80/220...
 -> Processed 100/220...
 -> Processed 120/220...
 -> Processed 140/220...
 -> Processed 160/220...
 -> Processed 180/220...
 -> Processed 200/220...
 -> Processed 220/220...
Step 3: Generating Technical Report...
Synthesis Error: 429 Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.

✅ PIPELINE COMPLETE. OUTPUT FILES:
  [CREATED] cm_risk_payload.jsonl          Size: 231.9 KB
  [CREATED] cm_structural_rca.jsonl        Size: 304.0 KB
  [CREATED] 2026_technical_analysis.md     Size: 13.1 KB
