# Disclaimer

**For Demonstration Purposes Only**

This notebook is intended for demonstration and illustrative purposes only. It is not designed for use in a production environment. The code and configurations herein have not been hardened for security, performance, or reliability and should not be deployed as-is. Use at your own risk.

In [None]:
# @title 1. Environment Setup & Cleanup (Please Update Global Configuration)
# Installs the Google Cloud client libraries, imports them, and builds the
# shared BigQuery client and Vertex AI session that every later cell uses.
# The `!pip` line is IPython shell syntax, so this cell only runs inside a
# notebook; it must stay ahead of the imports so the packages exist first.
import warnings
# NOTE(review): this silences *all* warnings for the rest of the kernel session
# (including library deprecation notices) to keep the demo output clean.
warnings.filterwarnings('ignore')

!pip install --quiet google-cloud-bigquery google-cloud-bigquery-datatransfer google-cloud-datastream google-cloud-dataform google-cloud-aiplatform

import json
import random
import time
from datetime import datetime
from google.cloud import bigquery
from google.cloud import dataform_v1beta1
import vertexai
from IPython.display import HTML, display

# --- 🛠️ GLOBAL CONFIGURATION ---
# Replace every placeholder below before running any other cell.
PROJECT_ID = "project-id-here"
REGION = "region-here"  # Vertex AI location and Dataform location in resource paths
DATASET_ID = "dataset-id-here"  # BigQuery dataset holding all demo tables
REPO_ID = "repo-id-here"  # Dataform repository id
WORKSPACE_ID = "workspace-id-here"  # Dataform workspace inside REPO_ID
LOG_TABLE_ID = f"{DATASET_ID}.schema_change_log" # <--- Central Log Table (dataset-qualified; cells prefix PROJECT_ID)
# -------------------------------

# Shared clients, bound to the configured project (and region for Vertex AI).
bq_client = bigquery.Client(project=PROJECT_ID)
vertexai.init(project=PROJECT_ID, location=REGION)

def reset_demo_environment():
    """Drop the demo tables and recreate an empty schema-change audit log.

    Deletes ``rigid_table``, ``flexible_raw`` and the audit log table
    (``LOG_TABLE_ID``) from the configured project/dataset if they exist, then
    creates a fresh audit log table with the schema that the later ingestion
    and agent cells write into.

    The audit table is dropped through ``LOG_TABLE_ID`` rather than a
    hard-coded name, so a customised ``LOG_TABLE_ID`` is cleaned up too;
    otherwise the ``create_table`` call below would fail with an
    "already exists" conflict on every re-run.
    """
    print(f"üßπ Cleaning up dataset '{DATASET_ID}'...")

    # 1. Drop old tables (not_found_ok makes missing tables a no-op).
    tables_to_drop = [
        f"{DATASET_ID}.rigid_table",
        f"{DATASET_ID}.flexible_raw",
        LOG_TABLE_ID,
    ]
    for t in tables_to_drop:
        bq_client.delete_table(f"{PROJECT_ID}.{t}", not_found_ok=True)
        print(f"   - Deleted {t}")

    # 2. Create the Persistent Audit Log Table immediately
    schema_log = [
        bigquery.SchemaField("timestamp", "TIMESTAMP"),
        bigquery.SchemaField("source_id", "STRING"),
        bigquery.SchemaField("severity", "STRING"),
        bigquery.SchemaField("change_type", "STRING"),
        bigquery.SchemaField("description", "STRING"),
        bigquery.SchemaField("suggested_action", "STRING"),
        bigquery.SchemaField("raw_record", "JSON")
    ]
    table_log = bigquery.Table(f"{PROJECT_ID}.{LOG_TABLE_ID}", schema=schema_log)
    bq_client.create_table(table_log)
    print(f"   + Created Persistent Log Table: {LOG_TABLE_ID}")

    print("‚ú® Environment Ready.")

reset_demo_environment()

In [None]:
# @title 2. Architecture Concept
# Renders a static, interactive HTML diagram contrasting the "rigid" (fixed
# schema) and "adaptive" (JSON-encapsulated) ingestion patterns. No cloud calls
# happen here: the two toggle buttons call the `switchView` JS function defined
# in the embedded <script>, which swaps which `.flow-stage` is visible.
from IPython.display import HTML

# Self-contained page: Font Awesome and Google Sans come from public CDNs,
# styling and behaviour are inline.
html_code_1 = """
<!DOCTYPE html>
<html>
<head>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.1/css/all.min.css">
<link href="https://fonts.googleapis.com/css?family=Google+Sans:400,500,700&display=swap" rel="stylesheet">
<style>
    /* --- Main Container --- */
    .arch-wrapper {
        background-color: #fff; border-radius: 12px; padding: 30px;
        font-family: 'Google Sans', 'Arial', sans-serif; border: 1px solid #dfe1e5;
        max-width: 900px; margin: 0 auto;
    }
    .arch-header { text-align: center; margin-bottom: 30px; }
    .arch-title { color: #202124; font-size: 1.8em; margin: 0; font-weight: 500; }
    .arch-subtitle { color: #5f6368; font-size: 0.95em; margin-top: 8px; }

    /* --- Toggle Switch --- */
    .toggle-container {
        display: flex; justify-content: center; margin-bottom: 50px;
        background: #f1f3f4; border-radius: 30px; padding: 4px;
        width: fit-content; margin-left: auto; margin-right: auto;
    }
    .toggle-btn {
        padding: 10px 25px; border-radius: 25px; cursor: pointer;
        font-weight: 600; font-size: 0.9em; transition: all 0.3s ease;
        display: flex; align-items: center; gap: 8px;
    }
    .toggle-btn.active { background: #fff; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }
    .btn-rigid.active { color: #c5221f; } /* Google Red */
    .btn-adaptive.active { color: #34a853; } /* Google Green */
    .btn-inactive { color: #5f6368; }

    /* --- Flow Diagram Area --- */
    .flow-stage { display: none; animation: fadeIn 0.4s ease; }
    .flow-stage.visible { display: flex; flex-direction: column; gap: 30px; }

    @keyframes fadeIn { from { opacity: 0; transform: translateY(10px); } to { opacity: 1; transform: translateY(0); } }

    /* --- Visual Rows --- */
    .viz-row {
        display: flex; align-items: center; justify-content: space-between;
        position: relative; height: 160px; /* Fixed height for stability */
    }

    /* --- Nodes --- */
    .node {
        width: 150px; padding: 20px 10px; border-radius: 8px; text-align: center;
        border: 2px solid; position: relative; background: #fff; z-index: 5;
        box-shadow: 0 4px 10px rgba(0,0,0,0.05);
    }
    .node i { font-size: 2em; margin-bottom: 10px; display: block; }
    .node-label { font-weight: bold; font-size: 1em; margin-bottom: 5px; }
    .node-desc { font-size: 0.8em; opacity: 0.8; line-height: 1.3; }

    /* Colors */
    .n-source { border-color: #4285f4; color: #4285f4; }
    .n-process { border-color: #fbbc04; color: #f9ab00; }
    .n-dest-success { border-color: #34a853; color: #34a853; background: #e6f4ea; }
    .n-dest-fail { border-color: #ea4335; color: #ea4335; background: #fce8e6; }

    /* --- The Data Payload (The moving part) --- */
    /* FIXED ALIGNMENT: Moved left: 21% to avoid hitting the center node */
    .payload {
        position: absolute;
        top: 50%;
        left: 21%; /* <--- ADJUSTED FROM 27% TO 21% */
        transform: translateY(-50%);

        background: #202124; color: #fff; padding: 8px 12px; border-radius: 6px;
        font-family: 'Courier New', monospace; font-size: 0.8em; /* Smaller font */
        box-shadow: 0 6px 12px rgba(0,0,0,0.15);
        z-index: 10;
        text-align: left;
        white-space: nowrap;
    }
    .payload span { color: #fbbc04; font-weight: bold; }
    .payload.adaptive-style {
        background: #fff; color: #333; border: 2px solid #34a853;
    }

    /* --- Connection Lines --- */
    .line-connector {
        position: absolute; top: 50%; left: 80px; right: 80px; height: 3px;
        background: #dfe1e5; z-index: 1; transform: translateY(-50%);
    }

    /* --- Status Messages --- */
    .status-msg {
        text-align: center; padding: 15px; border-radius: 8px; font-size: 0.95em;
        display: flex; align-items: center; justify-content: center; gap: 10px;
    }
    .msg-fail { background: #fce8e6; color: #c5221f; border: 1px solid #fad2cf; }
    .msg-success { background: #e6f4ea; color: #1e8e3e; border: 1px solid #ceead6; }

</style>
</head>
<body>

<div class="arch-wrapper">
    <div class="arch-header">
        <h2 class="arch-title">Architecture Evolution</h2>
        <div class="arch-subtitle">Switch views to see how data flows in each scenario</div>
    </div>

    <!-- Toggle Switch -->
    <div class="toggle-container">
        <div id="btn-rigid" class="toggle-btn btn-rigid active" onclick="switchView('rigid')">
            <i class="fa-solid fa-triangle-exclamation"></i> The "Rigid" Way
        </div>
        <div id="btn-adaptive" class="toggle-btn btn-inactive" onclick="switchView('adaptive')">
            <i class="fa-solid fa-shield-halved"></i> The "Adaptive" Way
        </div>
    </div>

    <!-- SCENARIO 1: RIGID (The Failure) -->
    <div id="view-rigid" class="flow-stage visible">
        <div class="viz-row">
            <!-- Line with Gradient for Failure -->
            <div class="line-connector" style="background: linear-gradient(to right, #4285f4 40%, #ea4335 40%);"></div>

            <!-- Step 1: Source -->
            <div class="node n-source">
                <i class="fa-solid fa-server"></i>
                <div class="node-label">Vendor API</div>
                <div class="node-desc">Updates Schema</div>
            </div>

            <!-- Payload (Fixed Positioning) -->
            <div class="payload">
                { id: 1,<br><span>email: "new"</span> }
            </div>

            <!-- Step 2: The Wall -->
            <div class="node n-process" style="border-color: #ea4335; color: #ea4335;">
                <i class="fa-solid fa-ban"></i>
                <div class="node-label">Schema Wall</div>
                <div class="node-desc">Strict Validation</div>
            </div>

            <!-- Step 3: Destination -->
            <div class="node n-dest-fail">
                <i class="fa-solid fa-bomb"></i>
                <div class="node-label">Pipeline Crash</div>
                <div class="node-desc">Data Rejected</div>
            </div>
        </div>

        <div class="status-msg msg-fail">
            <i class="fa-solid fa-circle-xmark"></i> <span><b>FAILURE:</b> The ETL script expects exactly 2 columns. The new 'email' field causes a hard crash. Data is lost.</span>
        </div>
    </div>

    <!-- SCENARIO 2: ADAPTIVE (The Solution) -->
    <div id="view-adaptive" class="flow-stage">
        <div class="viz-row">
            <!-- Line with Gradient for Success -->
            <div class="line-connector" style="background: linear-gradient(to right, #4285f4, #34a853);"></div>

            <!-- Step 1: Source -->
            <div class="node n-source">
                <i class="fa-solid fa-server"></i>
                <div class="node-label">Vendor API</div>
                <div class="node-desc">Updates Schema</div>
            </div>

            <!-- Payload (Fixed Positioning + Style) -->
            <div class="payload adaptive-style">
                <span style="color:#34a853">payload:</span> {<br>&nbsp;&nbsp;id: 1, email: "new"<br>}
            </div>

            <!-- Step 2: Wrapper -->
            <div class="node n-process" style="border-color: #34a853; color: #34a853;">
                <i class="fa-solid fa-box-open"></i>
                <div class="node-label">Encapsulation</div>
                <div class="node-desc">Wrap as JSON</div>
            </div>

            <!-- Step 3: Destination -->
            <div class="node n-dest-success">
                <i class="fa-solid fa-database"></i>
                <div class="node-label">BigQuery Raw</div>
                <div class="node-desc">Always Accepts</div>
            </div>
        </div>

        <div class="status-msg msg-success">
            <i class="fa-solid fa-check-circle"></i> <span><b>SUCCESS:</b> The pipeline ignores the schema. It wraps the whole object in a JSON column. We extract the 'email' field later.</span>
        </div>
    </div>

</div>

<script>
    window.switchView = function(mode) {
        // 1. Reset Buttons
        document.getElementById('btn-rigid').className = 'toggle-btn btn-inactive';
        document.getElementById('btn-adaptive').className = 'toggle-btn btn-inactive';

        // 2. Hide Views
        document.getElementById('view-rigid').className = 'flow-stage';
        document.getElementById('view-adaptive').className = 'flow-stage';

        // 3. Activate Selected
        if (mode === 'rigid') {
            document.getElementById('btn-rigid').className = 'toggle-btn btn-rigid active';
            document.getElementById('view-rigid').className = 'flow-stage visible';
        } else {
            document.getElementById('btn-adaptive').className = 'toggle-btn btn-adaptive active';
            document.getElementById('view-adaptive').className = 'flow-stage visible';
        }
    }
</script>
</body>
</html>
"""
# Render the diagram as the cell's rich output.
display(HTML(html_code_1))

In [None]:
# @title 3. Simulation: The "Rigid" Crash
# Demonstrates the failure mode: a table with a fixed (name, age) schema rejects
# a streamed row that carries an extra, unknown 'email' field.
table_id_rigid = f"{PROJECT_ID}.{DATASET_ID}.rigid_table"

# 1. Create a table with a STRICT schema (Name, Age)
schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("age", "INTEGER"),
]
table = bigquery.Table(table_id_rigid, schema=schema)
bq_client.create_table(table, exists_ok=True)
print("‚ö†Ô∏è  Created rigid table (Expects only Name, Age).")

# 2. Simulate an API sending a NEW field "email"
print("‚ö° Vendor sends new data with unexpected 'email' field...")
rows_to_insert = [
    {"name": "Alice", "age": 30, "email": "alice@example.com"}
]

# 3. Try to insert - THIS WILL FAIL. insert_rows_json reports per-row errors in
#    its return value (it is not called with ignore_unknown_values here).
errors = bq_client.insert_rows_json(table_id_rigid, rows_to_insert)
if errors:
    print(f"\n‚ùå PIPELINE CRASHED! Schema mismatch error:\n{errors}")
else:
    # Make an unexpected acceptance visible instead of ending the demo silently.
    print("\nUnexpected: the rigid table accepted the row without a schema error.")

In [None]:
# @title 4. Solution: The "Anti-Fragile" Loader
# Loads the same "broken" record by wrapping it whole into a JSON column, so an
# unexpected field can never be rejected by the table schema.
from datetime import timezone

raw_table_id = f"{PROJECT_ID}.{DATASET_ID}.flexible_raw"

# 1. Create a RAW table whose only data column is the JSON `payload`; the other
#    two columns are envelope metadata (arrival time and source).
schema_raw = [
    bigquery.SchemaField("ingest_timestamp", "TIMESTAMP"),
    bigquery.SchemaField("source_id", "STRING"),
    bigquery.SchemaField("payload", "JSON") # <--- The Magic
]
table_raw = bigquery.Table(raw_table_id, schema=schema_raw)
bq_client.create_table(table_raw, exists_ok=True)
print("‚úÖ Created flexible table (JSON Encapsulation).")

# 2. The same "broken" data from before
api_data = {"name": "Alice", "age": 30, "email": "alice@example.com"}

# 3. Wrap it and Insert. The timestamp is timezone-aware UTC: BigQuery treats a
#    TIMESTAMP string without an offset as UTC, so a naive local time would be
#    stored shifted by the machine's UTC offset.
row = [{
    "ingest_timestamp": datetime.now(timezone.utc).isoformat(),
    "source_id": "vendor_demo",
    "payload": json.dumps(api_data)
}]

errors = bq_client.insert_rows_json(raw_table_id, row)
if not errors:
    print(f"\nüéâ SUCCESS! Data loaded despite schema change.")
    print(f"   Stored Payload: {json.dumps(api_data)}")
else:
    print(f"\nInsert into {raw_table_id} failed: {errors}")

In [None]:
# @title 5. Visual: Smart Scaling
# Renders the target architecture as a 9-column CSS grid of clickable cards
# (each opens a modal with an explanation from `proContent`).
# Every connector is pinned to an explicit grid-column. Items that set only
# grid-row are auto-placed by the browser into the first free column, which put
# the fork in column 2, the merge in column 4 and the first arrow in column 8.
# The agent icon uses the free `fa-robot`; `fa-sparkles` is not in the free
# Font Awesome set loaded from the CDN and rendered blank.
from IPython.display import HTML

html_code_2 = """
<!DOCTYPE html>
<html>
<head>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.1/css/all.min.css">
<link href="https://fonts.googleapis.com/css?family=Google+Sans:400,500,600,700&display=swap" rel="stylesheet">
<style>
    /* --- Main Container --- */
    .pro-wrapper {
        background-color: #f8f9fa; border-radius: 16px; padding: 40px 20px;
        font-family: 'Google Sans', 'Arial', sans-serif; border: 1px solid #e0e0e0;
        /* Fixed min-width ensures the grid never crushes the layout */
        min-width: 1000px; overflow-x: auto;
    }
    .pro-title { text-align: center; color: #202124; font-size: 22px; margin-bottom: 40px; font-weight: 500; }

    /* --- The Grid System --- */
    .circuit-grid {
        display: grid;
        /*
           Col 1: Sources (120px)
           Col 2: Arrow (50px)
           Col 3: Ingest (140px)
           Col 4: Fork (60px)
           Col 5: Parallel Track 1 (Raw/Agent) (150px)
           Col 6: Arrow/Line (60px)
           Col 7: Audit (150px)
           Col 8: Merge (60px)
           Col 9: Dataform (140px)
        */
        grid-template-columns: 120px 50px 140px 60px 150px 60px 150px 60px 140px;
        grid-template-rows: 110px 110px; /* Two balanced rows */
        align-items: center; justify-content: center;
    }

    /* --- Card Styling (The "Pro" Look) --- */
    .node-card {
        background: #fff; border-radius: 12px; padding: 12px;
        box-shadow: 0 2px 6px rgba(0,0,0,0.06); border: 1px solid #eaecf0;
        display: flex; flex-direction: column; align-items: center; justify-content: center;
        text-align: center; cursor: pointer; transition: all 0.2s ease;
        position: relative; z-index: 5; height: 80px; box-sizing: border-box;
    }
    .node-card:hover {
        transform: translateY(-3px); box-shadow: 0 10px 20px rgba(0,0,0,0.1); border-color: #d0d5dd;
    }

    /* Status Strip (Left Border) */
    .node-card::after {
        content: ''; position: absolute; left: 0; top: 10px; bottom: 10px; width: 4px;
        border-radius: 0 4px 4px 0;
    }

    /* Typography */
    .n-icon { font-size: 20px; margin-bottom: 6px; }
    .n-title { font-size: 13px; font-weight: 700; color: #3c4043; line-height: 1.2; }
    .n-sub { font-size: 11px; color: #5f6368; margin-top: 3px; }

    /* Colors */
    .c-ingest::after { background: #4285f4; } .c-ingest .n-icon { color: #4285f4; }
    .c-raw::after { background: #34a853; } .c-raw .n-icon { color: #34a853; }
    .c-agent::after { background: #9334e6; } .c-agent .n-icon { color: #9334e6; }
    .c-audit::after { background: #00acc1; } .c-audit .n-icon { color: #00acc1; }
    .c-df::after { background: #fbbc04; } .c-df .n-icon { color: #f9ab00; }

    /* --- Grid Placements --- */
    /* Sources (Span 2 Rows) */
    #g-sources { grid-column: 1; grid-row: 1 / span 2; height: 190px; padding: 5px; }
    .source-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 5px; width: 100%; height: 100%; }
    .mini-chip { border: 1px solid #eee; border-radius: 6px; display: flex; align-items: center; justify-content: center; color: #5f6368; font-size: 14px; background: #fbfbfb; }

    /* Ingest (Span 2 Rows) */
    #g-ingest { grid-column: 3; grid-row: 1 / span 2; height: 190px; }

    /* Top Track */
    #g-raw { grid-column: 5; grid-row: 1; }

    /* Bottom Track */
    #g-agent { grid-column: 5; grid-row: 2; }
    #g-audit { grid-column: 7; grid-row: 2; }

    /* Dataform (Span 2 Rows) */
    #g-df { grid-column: 9; grid-row: 1 / span 2; height: 190px; }


    /* --- CONNECTORS (The Fix) --- */
    /* Using Borders instead of SVG guarantees alignment */
    .connector { position: relative; width: 100%; height: 100%; pointer-events: none; }

    /* Horizontal Line */
    .line-h {
        position: absolute; top: 50%; left: 0; right: 0; height: 2px; background: #dbe0ea;
        transform: translateY(-50%);
    }
    /* Arrow Icon */
    .arrow-icon {
        position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%);
        color: #dbe0ea; font-size: 14px; background: #f8f9fa; padding: 0 4px;
    }

    /* The Fork (Col 4) */
    .fork-branch {
        position: absolute; width: 50%; height: 50%; left: 0; border-left: 2px solid #dbe0ea;
    }
    .fork-top { top: 0; border-bottom: 2px solid #dbe0ea; border-bottom-left-radius: 12px; }
    .fork-btm { bottom: 0; border-top: 2px solid #dbe0ea; border-top-left-radius: 12px; }

    /* The Merge (Col 8) */
    .merge-branch {
        position: absolute; width: 50%; height: 50%; right: 0; border-right: 2px solid #dbe0ea;
    }
    .merge-top { top: 0; border-bottom: 2px solid #dbe0ea; border-bottom-right-radius: 12px; }
    .merge-btm { bottom: 0; border-top: 2px solid #dbe0ea; border-top-right-radius: 12px; }

    /* The Pass-Through Wire (Top Row, Cols 6 & 7) */
    .wire-long {
        grid-column: 6 / span 2; grid-row: 1; position: relative; height: 100%;
    }


    /* --- Modal --- */
    .modal-pro {
        display: none; position: fixed; z-index: 10000; left: 0; top: 0;
        width: 100%; height: 100%; background-color: rgba(255,255,255,0.9);
        backdrop-filter: blur(4px);
        justify-content: center; align-items: center;
    }
    .modal-card {
        background-color: #fff; padding: 40px; border-radius: 24px;
        max-width: 500px; width: 90%;
        box-shadow: 0 20px 50px rgba(0,0,0,0.15); border: 1px solid #f1f3f4;
        text-align: center; animation: scaleUp 0.2s ease-out;
    }
    @keyframes scaleUp { from {transform:scale(0.9); opacity:0} to {transform:scale(1); opacity:1} }

    .m-icon { font-size: 40px; margin-bottom: 20px; color: #4285f4; }
    .m-title { font-size: 22px; font-weight: 600; margin-bottom: 15px; color: #202124; }
    .m-desc { font-size: 15px; color: #5f6368; line-height: 1.6; margin-bottom: 30px; }
    .m-btn {
        background: #1a73e8; color: white; border: none; padding: 10px 30px;
        border-radius: 30px; font-weight: 600; cursor: pointer; transition: background 0.2s;
    }
    .m-btn:hover { background: #1557b0; }

</style>
</head>
<body>

<div class="pro-wrapper">
    <h2 class="pro-title">Architecture: AI-Enhanced Ingestion</h2>

    <div class="circuit-grid">

        <!-- 1. Sources -->
        <div id="g-sources" class="node-card" onclick="openModalP('sources')">
            <div class="source-grid">
                <div class="mini-chip"><i class="fa-brands fa-salesforce"></i></div>
                <div class="mini-chip"><i class="fa-brands fa-shopify"></i></div>
                <div class="mini-chip"><i class="fa-brands fa-jira"></i></div>
                <div class="mini-chip"><i class="fa-brands fa-stripe"></i></div>
                <div class="mini-chip"><i class="fa-solid fa-database"></i></div>
                <div class="mini-chip">30+</div>
            </div>
            <div class="n-sub" style="margin-top:10px;">Data Sources</div>
        </div>

        <!-- 2. Arrow (pinned to col 2, spanning both rows to center between the tall cards) -->
        <div class="connector" style="grid-column: 2; grid-row: 1 / span 2;"><div class="line-h"></div><i class="fa-solid fa-chevron-right arrow-icon"></i></div>

        <!-- 3. Ingestion -->
        <div id="g-ingest" class="node-card c-ingest" onclick="openModalP('ingest')">
            <i class="fa-solid fa-gears n-icon"></i>
            <div class="n-title">Ingestion<br>Engine</div>
            <div class="n-sub">Cloud Run</div>
        </div>

        <!-- 4. The Fork (Splits Top/Bottom; pinned to col 4) -->
        <div class="connector" style="grid-column: 4; grid-row: 1 / span 2;">
            <div class="fork-branch fork-top"></div>
            <div class="fork-branch fork-btm"></div>
        </div>

        <!-- 5. Top Track: Raw -->
        <div id="g-raw" class="node-card c-raw" onclick="openModalP('raw')">
            <i class="fa-solid fa-database n-icon"></i>
            <div class="n-title">BigQuery<br>Raw</div>
            <div class="n-sub">Storage</div>
        </div>

        <!-- 6. Bottom Track: Agent -->
        <div id="g-agent" class="node-card c-agent" onclick="openModalP('agent')">
            <i class="fa-solid fa-robot n-icon"></i>
            <div class="n-title">Vertex AI<br>Agent</div>
            <div class="n-sub">Analysis</div>
        </div>

        <!-- 7. Connectors Middle -->
        <!-- Top Wire (Long Pass-Through) -->
        <div class="wire-long"><div class="line-h"></div></div>

        <!-- Bottom Wire (Agent -> Audit) -->
        <div class="connector" style="grid-column: 6; grid-row: 2;">
            <div class="line-h"></div><i class="fa-solid fa-chevron-right arrow-icon"></i>
        </div>

        <!-- 8. Audit Log -->
        <div id="g-audit" class="node-card c-audit" onclick="openModalP('audit')">
            <i class="fa-solid fa-list-check n-icon"></i>
            <div class="n-title">Audit<br>Log</div>
            <div class="n-sub">History</div>
        </div>

        <!-- 9. The Merge (Joins Top/Bottom; pinned to col 8) -->
        <div class="connector" style="grid-column: 8; grid-row: 1 / span 2;">
            <div class="merge-branch merge-top"></div>
            <div class="merge-branch merge-btm"></div>
        </div>

        <!-- 10. Dataform -->
        <div id="g-df" class="node-card c-df" onclick="openModalP('dataform')">
            <i class="fa-solid fa-wand-magic-sparkles n-icon"></i>
            <div class="n-title">Dataform</div>
            <div class="n-sub">Generator</div>
        </div>

    </div>
</div>

<!-- Modal -->
<div id="modalPro" class="modal-pro">
    <div class="modal-card">
        <div id="m-icon-pro" class="m-icon"></div>
        <div id="m-title-pro" class="m-title"></div>
        <div id="m-desc-pro" class="m-desc"></div>
        <button class="m-btn" onclick="closeModalP()">Got it</button>
    </div>
</div>

<script>
    const proContent = {
        sources: { icon: '<i class="fa-solid fa-server"></i>', title: "Disparate Data Sources", desc: "Managing 30+ APIs that change randomly is usually a nightmare. We treat them as a raw stream." },
        ingest: { icon: '<i class="fa-solid fa-gears"></i>', title: "Universal Ingestion", desc: "A single Cloud Run service acts as the traffic controller. It creates the 'Fork in the Road': sending data to storage AND sending samples to the AI Agent." },
        raw: { icon: '<i class="fa-solid fa-database"></i>', title: "Path A: BigQuery Raw", desc: "<b>The Fast Path.</b> Data lands here instantly without validation. This ensures the pipeline never 'crashes' on a schema change, because we capture everything." },
        agent: { icon: '<i class="fa-solid fa-robot"></i>', title: "Path B: Vertex AI Agent", desc: "<b>The Smart Path.</b> While data loads, the Agent inspects it using Gemini. It looks for <i>Semantic Drift</i> (e.g., '100' vs '100 USD') that strict SQL misses." },
        audit: { icon: '<i class="fa-solid fa-list-check"></i>', title: "Audit Log Table", desc: "The Agent records its findings here. This table becomes the 'Memory' of the system, allowing us to track drift over time." },
        dataform: { icon: '<i class="fa-solid fa-wand-magic-sparkles"></i>', title: "Context-Aware Automation", desc: "<b>The Synthesis.</b> Dataform reads the Audit Log before writing SQL. If the log says 'Drift Detected', Dataform writes 'Defensive SQL' to protect the final tables." }
    };

    window.openModalP = function(key) {
        const data = proContent[key];
        if (data) {
            document.getElementById('m-icon-pro').innerHTML = data.icon;
            document.getElementById('m-title-pro').innerHTML = data.title;
            document.getElementById('m-desc-pro').innerHTML = data.desc;
            document.getElementById('modalPro').style.display = 'flex';
        }
    }

    window.closeModalP = function() {
        document.getElementById('modalPro').style.display = 'none';
    }

    window.addEventListener('click', function(event) {
        const modal = document.getElementById('modalPro');
        if (event.target == modal) { window.closeModalP(); }
    });
</script>
</body>
</html>
"""
display(HTML(html_code_2))

In [None]:
# @title 6. Simulation: Mass Ingestion (Logging to BigQuery)
def simulate_mass_ingestion():
    """Stream one synthetic record per vendor into ``flexible_raw``, logging drift.

    For each hard-coded vendor source a small base record is built. On a coin
    flip an extra ``new_feature_<n>`` field is injected to simulate schema
    drift, and a ``NEW_FIELD`` incident row is queued for the audit log table
    ``LOG_TABLE_ID``. All records are then streamed into ``flexible_raw`` and
    any incidents into the audit log. Insert errors for either table are
    printed rather than silently dropped.

    Timestamps are timezone-aware UTC. BigQuery reads an offset-less TIMESTAMP
    string as UTC, so naive local times would be stored shifted. In zones
    behind UTC that pushes fresh incidents outside the "last hour" window that
    the pipeline generator queries.
    """
    from datetime import timezone

    sources = [
        "vendor_salesforce", "vendor_shopify", "vendor_zendesk", "vendor_netsuite",
        "vendor_hubspot", "vendor_stripe", "vendor_mailchimp", "vendor_jira",
        "vendor_google_ads", "vendor_facebook_ads", "vendor_shipping", "vendor_inventory"
    ]

    rows_to_insert = []
    log_entries = [] # Buffer for our audit log

    print(f"\n‚ö° SIMULATING MASS INGESTION ({len(sources)} Parallel Streams)...")

    for source in sources:
        now = datetime.now(timezone.utc).isoformat()

        # Base Data
        data = { "id": random.randint(1000, 9999), "status": "active", "updated_at": now }

        # Inject CHAOS (roughly half of the sources drift)
        if random.choice([True, False]):
            drift_field = f"new_feature_{random.randint(1, 100)}"
            data[drift_field] = "unexpected_value"

            print(f"   ‚ö†Ô∏è  {source}: Drift Detected! Logging incident to BigQuery...")

            # Create a Formal Incident Log
            log_entries.append({
                "timestamp": now,
                "source_id": source,
                "severity": "LOW", # Chaos monkey is usually low severity
                "change_type": "NEW_FIELD",
                "description": f"Automated drift detection: Found new field '{drift_field}'",
                "suggested_action": "Review raw_json_blob for new column",
                "raw_record": json.dumps(data)
            })
        else:
            print(f"   ‚úÖ {source}: Stable.")

        rows_to_insert.append({
            "ingest_timestamp": now,
            "source_id": source,
            "payload": json.dumps(data)
        })

    # 1. Load Data
    errors = bq_client.insert_rows_json(f"{PROJECT_ID}.{DATASET_ID}.flexible_raw", rows_to_insert)

    # 2. Load Logs (If any drift occurred)
    if log_entries:
        log_errors = bq_client.insert_rows_json(f"{PROJECT_ID}.{LOG_TABLE_ID}", log_entries)
        if not log_errors:
            print(f"\nüìù Successfully logged {len(log_entries)} drift incidents to {LOG_TABLE_ID}")
        else:
            print(f"Error logging incidents: {log_errors}")

    if not errors:
        print(f"üéâ SUCCESS: All data ingested.")
    else:
        print(f"Error ingesting data: {errors}")

simulate_mass_ingestion()

In [None]:
# @title 7. Smart Auto-Generate (Reading from BigQuery History)
def auto_generate_pipelines(lookback_hours=1):
    """Write one Dataform staging view per vendor, flagging recently drifted ones.

    Queries the audit log (``LOG_TABLE_ID``) for sources with an incident in
    the last ``lookback_hours`` hours. It then writes
    ``definitions/staging/stg_<source>.sqlx`` into the configured Dataform
    workspace for each hard-coded vendor. Drifted sources get a warning in
    the view description and in a SQL comment; stable ones get a standard
    description.

    Args:
        lookback_hours: Size of the look-back window in whole hours (coerced
            with ``int``). Defaults to 1, the original fixed window.
    """
    print(f"ü§ñ STARTING SMART GENERATOR...")

    # 1. Query BigQuery for recent drift events. int() keeps the interval a
    #    plain integer literal inside the SQL text.
    print("   ‚Ä¢ Querying Schema Change Log for recent incidents...")
    drift_query = f"""
        SELECT DISTINCT source_id
        FROM `{PROJECT_ID}.{LOG_TABLE_ID}`
        WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {int(lookback_hours)} HOUR)
    """
    query_job = bq_client.query(drift_query)
    drifted_sources = {row.source_id for row in query_job}

    print(f"   ‚Ä¢ Found {len(drifted_sources)} sources with recent drift.\n")

    client = dataform_v1beta1.DataformClient()
    workspace_path = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPO_ID}/workspaces/{WORKSPACE_ID}"

    sources = [
        "vendor_salesforce", "vendor_shopify", "vendor_zendesk", "vendor_netsuite",
        "vendor_hubspot", "vendor_stripe", "vendor_mailchimp", "vendor_jira",
        "vendor_google_ads", "vendor_facebook_ads", "vendor_shipping", "vendor_inventory"
    ]

    for source in sources:
        file_name = f"definitions/staging/stg_{source}.sqlx"

        # --- SMART LOGIC ---
        if source in drifted_sources:
            # DRIFT DETECTED: Generate "Defensive" SQL
            description = f"‚ö†Ô∏è ALERT: Drift detected (See {LOG_TABLE_ID}). Data encapsulated safely."
            drift_comment = f"-- ‚ö†Ô∏è WARNING: Drift detected for {source}. Check 'raw_json_blob' for new fields."

            # Louder status line so patched sources stand out in the output.
            status_msg = f"   ‚ö†Ô∏è  PATCHING DRIFT: {source} -> Added safety logic to {file_name}"
        else:
            # STABLE: Generate "Standard" SQL
            description = f"Standard staging view for {source}"
            drift_comment = "-- Schema is stable."

            # Short confirmation line for sources that needed no patch.
            status_msg = f"   ‚úÖ  Verified Stable: {source}"

        sql_content = f"""
        config {{
            type: "view",
            schema: "{DATASET_ID}",
            tags: ["staging", "auto_generated"],
            description: "{description}"
        }}

        SELECT
            source_id,
            ingest_timestamp,
            JSON_VALUE(payload, '$.id') as record_id,
            JSON_VALUE(payload, '$.status') as status,

            {drift_comment}
            payload as raw_json_blob

        FROM `{PROJECT_ID}.{DATASET_ID}.flexible_raw`
        WHERE source_id = "{source}"
        """

        client.write_file(
            request={"workspace": workspace_path, "path": file_name, "contents": sql_content.encode("utf-8")}
        )
        print(status_msg)

    print("\n‚úÖ DONE: All pipelines synced. Drifted sources patched.")

auto_generate_pipelines()

In [None]:
# @title 8. Trigger Dataform Workflow
def trigger_workspace_pipeline():
    """Compile the Dataform workspace and start a workflow invocation from it.

    Creates a compilation result from ``WORKSPACE_ID`` in repository
    ``REPO_ID`` (default schema ``DATASET_ID``). If compilation reports errors,
    they are printed and no workflow invocation is created. Otherwise an
    invocation is started for the compilation result and a console link is
    printed.
    """
    print(f"üöÄ Starting Dataform Trigger from Workspace: {WORKSPACE_ID}...")

    client = dataform_v1beta1.DataformClient()
    repo_path = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPO_ID}"
    workspace_path = f"{repo_path}/workspaces/{WORKSPACE_ID}"

    # 1. Compile Code
    print("   ‚Ä¢ Step 1: Compiling workspace code...")
    compilation_result = client.create_compilation_result(
        parent=repo_path,
        compilation_result={
            "workspace": workspace_path,
            "code_compilation_config": { "default_schema": DATASET_ID }
        }
    )

    # create_compilation_result succeeds even when the SQLX fails to compile;
    # the failures are reported in `compilation_errors` instead.
    if compilation_result.compilation_errors:
        print("     Compilation failed; no workflow invocation was created:")
        for err in compilation_result.compilation_errors:
            print(f"       - {err.path}: {err.message}")
        return

    print(f"     ‚úÖ Compiled! ID: {compilation_result.name.split('/')[-1]}")

    # 2. Trigger Execution
    print("   ‚Ä¢ Step 2: Creating Workflow Invocation...")
    invocation = client.create_workflow_invocation(
        parent=repo_path,
        workflow_invocation={ "compilation_result": compilation_result.name }
    )

    invocation_id = invocation.name.split('/')[-1]
    print(f"\n‚úÖ SUCCESS! Pipeline is running.")
    print(f"üîó View Run Link (warning: doesn't always work): https://console.cloud.google.com/bigquery/dataform/locations/{REGION}/repositories/{REPO_ID}/details/workflows/{invocation_id}?project={PROJECT_ID}")

trigger_workspace_pipeline()

In [None]:
# @title 9. Setup Agent & Audit Log Table
# Ensures the agent's audit log table exists.
# LOG_TABLE_ID is deliberately NOT re-assigned here. It comes from the global
# configuration in cell 1, so a customised value there still points every cell
# at the same table.
from http import HTTPStatus

# Create the Audit Log Table in BigQuery
schema_log = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("source_id", "STRING"),
    bigquery.SchemaField("severity", "STRING"),      # LOW, MEDIUM, CRITICAL
    bigquery.SchemaField("change_type", "STRING"),   # TYPE_CHANGE, NEW_FIELD, SEMANTIC_CHANGE
    bigquery.SchemaField("description", "STRING"),   # Human readable summary
    bigquery.SchemaField("suggested_action", "STRING"), # Agent's advice
    bigquery.SchemaField("raw_record", "JSON")
]

table_log = bigquery.Table(f"{PROJECT_ID}.{LOG_TABLE_ID}", schema=schema_log)
try:
    bq_client.create_table(table_log)
    print(f"‚úÖ Created Agent Audit Table: {LOG_TABLE_ID}")
except Exception as exc:
    # The Google API client errors carry the HTTP status in `.code`; only
    # 409 Conflict means "already exists". Anything else (permissions, missing
    # dataset, bad project) is re-raised instead of being mislabelled.
    if getattr(exc, "code", None) != HTTPStatus.CONFLICT:
        raise
    print(f"‚ÑπÔ∏è  Audit Table already exists: {LOG_TABLE_ID}")

In [None]:
# @title 10. Define the "Schema Steward" Agent
from vertexai.generative_models import GenerativeModel, GenerationConfig

def analyze_drift_with_agent(source_id, new_record_json, model_name="gemini-2.5-flash"):
    """
    Uses Gemini to analyze a specific data record for schema anomalies.

    Args:
        source_id: Name of the source the record came from (used in the prompt).
        new_record_json: The record serialized as a JSON string.
        model_name: Vertex AI Gemini model to call. Defaults to the original
            "gemini-2.5-flash".

    Returns:
        The model's JSON reply parsed into a dict (expected keys: severity,
        change_type, description, suggested_action), or None when the reply has
        no usable text, is not valid JSON, or is not a JSON object.
    """
    model = GenerativeModel(model_name)

    prompt = f"""
    You are a Senior Data Engineer AI. Audit this incoming JSON record from source '{source_id}' for drift.

    INCOMING RECORD:
    {new_record_json}

    RULES:
    - 'amount' fields must be numbers, NOT strings (e.g. "100 USD" is INVALID).
    - 'status' should be simple codes.
    - Look for unexpected nesting.

    Output VALID JSON:
    {{
        "severity": "LOW" | "MEDIUM" | "CRITICAL",
        "change_type": "NEW_FIELD" | "TYPE_CHANGE" | "SEMANTIC_CHANGE",
        "description": "Short explanation",
        "suggested_action": "Actionable advice"
    }}
    """

    response = model.generate_content(
        prompt,
        generation_config=GenerationConfig(response_mime_type="application/json")
    )

    try:
        # `.text` raises ValueError when the response has no usable text
        # (e.g. a blocked candidate); json.JSONDecodeError is a ValueError too.
        result = json.loads(response.text)
    except ValueError:
        return None

    # Callers read the result with dict.get(), so reject arrays and scalars.
    return result if isinstance(result, dict) else None

print("‚úÖ Schema Steward Agent Loaded.")

In [None]:
# @title 11. Execute Agent Analysis
# Feeds a record with a semantic type change to the Schema Steward agent and,
# if it returns an insight, appends it to the audit log table.
from datetime import timezone

# 1. Simulate a "Semantic Change" (Type Change)
complex_drift_record = {
    "id": 9901,
    "transaction_date": "2023-11-01",
    "amount": "150.00 USD",  # <--- THE PROBLEM (String instead of Float)
    "meta": { "risk_score": 0.05, "region": "NA" }
}

source = "vendor_stripe"
print(f"‚ö° Analyzing incoming stream for: {source}...")

# 2. Ask the Agent to Analyze
agent_insight = analyze_drift_with_agent(source, json.dumps(complex_drift_record))

if agent_insight:
    print("\nü§ñ AGENT INSIGHT GENERATED:")
    print(json.dumps(agent_insight, indent=2))

    # 3. Store the Insight in BigQuery. UTC-aware timestamp: BigQuery reads an
    #    offset-less TIMESTAMP string as UTC, so local time would be shifted.
    row_to_log = [{
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source_id": source,
        "severity": agent_insight.get("severity"),
        "change_type": agent_insight.get("change_type"),
        "description": agent_insight.get("description"),
        "suggested_action": agent_insight.get("suggested_action"),
        "raw_record": json.dumps(complex_drift_record)
    }]

    insert_errors = bq_client.insert_rows_json(f"{PROJECT_ID}.{LOG_TABLE_ID}", row_to_log)
    if insert_errors:
        print(f"\nFailed to log insight to {LOG_TABLE_ID}: {insert_errors}")
    else:
        print(f"\n‚úÖ Insight logged to BigQuery for historical tracking.")
else:
    print("\nAgent returned no usable insight; nothing was logged.")