<div align="center">

# Human-in-the-Loop (HITL) Agentic Medallion Pipeline

_An advanced data transformation workflow leveraging a multi-agent system to automate ETL development with robust human oversight._

This project demonstrates a data engineering pipeline in which a team of specialized AI agents automates the creation of PySpark transformations within a Medallion architecture. The agents collaboratively handle the development lifecycle, from planning and coding to testing, for processing data through the Bronze, Silver, and Gold layers.

The core principle is **Human-in-the-Loop (HITL)**. Instead of executing code directly, the final agent action is to commit the generated PySpark logic and its corresponding `pytest` suite to a Git repository and open a pull request. This workflow ensures that all AI-generated code is subject to mandatory human review and approval before being merged and deployed, blending the speed of automation with the quality and control of expert oversight.

</div>

### Cell 1: Installing Dependencies
This cell uses the `%pip` magic command to install necessary Python libraries. Key libraries include `langchain` for building the agentic workflow, `databricks-langchain` for integration with Databricks, `langgraph` for creating the stateful graph, `gitpython` for programmatic Git operations, `pytest` for testing the generated code, and `requests` for making API calls to GitHub. `dbutils.library.restartPython()` is then called to ensure the newly installed libraries are available in the notebook's environment.

In [0]:
# gitpython, pytest, and requests provide version control, testing, and GitHub API calls.
%pip install langchain langchain-community langchain-core databricks-langchain langgraph playwright Pillow gitpython pytest requests

dbutils.library.restartPython()

### Cell 2: Configuring LangSmith Tracing
This cell sets up environment variables to configure LangSmith, a platform for monitoring and debugging LangChain applications. It enables tracing (`LANGCHAIN_TRACING_V2`), sets the API endpoint, and requires a user-provided API key. A project name is also set to organize the traces, which is crucial for observing the complex interactions between agents and tools in the pipeline.

In [0]:
import os

# LangSmith tracing helps follow the interactions in this multi-agent workflow.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = " "  # <--- REPLACE WITH YOUR KEY
os.environ["LANGCHAIN_PROJECT"] = "Databricks - Medallion Pipeline v2.1 (Rejection)"

print(f"LangSmith configured. Project: '{os.environ['LANGCHAIN_PROJECT']}'")

### Cell 3: Git and GitHub Configuration
This cell establishes the configuration parameters required for the agent to interact with a GitHub repository. It defines the local path to the git repository (`GIT_REPO_PATH`), and sets up placeholder variables for a GitHub Personal Access Token (`GITHUB_API_TOKEN`), the repository owner, and the repository name. These variables are essential for the agent's tool that creates pull requests.


In [0]:
import os

# --- Configuration ---
# Path to the root of the current repository.
# In Databricks, a relative path usually works.
GIT_REPO_PATH = "."

# Needed for the GitHub API call that creates the PR.
GITHUB_API_TOKEN = ""  # Replace with a secret in production
GITHUB_REPO_OWNER = "your github username"
GITHUB_REPO_NAME = "HITL"

# Kept for reference only: commits use the identity configured in
# Databricks User Settings, so these two values are not applied.
GIT_USERNAME = "Your github username"
GIT_EMAIL = "Your Github linked email"

print(f"Git operations will run in the context of the current repository: '{os.path.abspath(GIT_REPO_PATH)}'")
print("Username and email are configured in Databricks User Settings.")
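
The comment on `GITHUB_API_TOKEN` suggests replacing the literal with a secret in production. Below is a minimal sketch of one way to do that, assuming the token is exported as an environment variable. The helper name `read_required_setting` and the use of `GITHUB_API_TOKEN` as an environment key are illustrative assumptions, not part of the original notebook. On Databricks, `dbutils.secrets.get(scope=..., key=...)` could serve the same purpose.

```python
import os


def read_required_setting(name: str) -> str:
    """Return a non-empty environment variable or raise a clear error.

    Hypothetical helper: the notebook itself assigns the token as a literal.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required setting: {name}")
    return value


# Demo only: provide a fake value so the lookup succeeds.
os.environ.setdefault("GITHUB_API_TOKEN", "example-token")
token = read_required_setting("GITHUB_API_TOKEN")
print(f"Token loaded ({len(token)} characters)")
```

Failing fast here surfaces a missing token at configuration time rather than later, when `create_pull_request` returns its "not configured" error.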

### Cell 4: Importing Core Libraries
This cell imports all the necessary Python classes, functions, and modules that will be used throughout the notebook. This includes components from `langchain` for building the AI agents, `pyspark` for data manipulation, `git` for version control, and standard libraries like `os`, `json`, and `datetime` for various utility tasks.

In [0]:
# Import necessary classes and functions
# ChatDatabricks is imported from the databricks-langchain package installed in Cell 1
from databricks_langchain import ChatDatabricks
from langchain_core.messages import HumanMessage, AIMessage
from langchain.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

from typing import Annotated, TypedDict, List, Optional
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, lit, md5, concat_ws, expr, to_date, upper, sum as _sum, avg as _avg, count as _count, date_format, year, month, current_timestamp, to_json, struct
from pyspark.sql.window import Window
from pyspark.sql.types import *
import json
import time
from datetime import datetime
import random
import subprocess
import git
import requests
from IPython.display import Image, display

print("All dependencies imported successfully.")

### Cell 5: Initializing Spark and Databases
This cell handles the setup of the core data processing environment. It initializes the `SparkSession`, which is the entry point for any Spark functionality. It also configures the Large Language Model (LLM) using `ChatDatabricks`. Crucially, it creates the SQL databases for the Medallion architecture (`ops_bronze`, `ops_silver`, `ops_gold`) and a centralized `Rejected` database, which includes a `rejected_rows` table to store data that fails validation at any stage.


In [0]:
def log_event(level, source, message):
    colors = {"AGENT": "\033[94m", "TOOL": "\033[93m", "INFO": "\033[92m", "ERROR": "\033[91m", "GRAPH": "\033[95m"}
    reset_color = "\033[0m"
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"{timestamp} | {colors.get(level, '')}{level:<7}{reset_color} | {colors.get(level, '')}[{source}]{reset_color} {message}")

spark = SparkSession.builder.appName("AgenticDataPipeline").getOrCreate()
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-1-405b-instruct", temperature=0.0, max_tokens=4096)

spark.sql("CREATE DATABASE IF NOT EXISTS ops_bronze")
spark.sql("CREATE DATABASE IF NOT EXISTS ops_silver")
spark.sql("CREATE DATABASE IF NOT EXISTS ops_gold")
spark.sql("CREATE DATABASE IF NOT EXISTS Rejected") # Centralized DB for all rejected data

# A structured table to hold all rejected rows from any pipeline step
spark.sql("""
CREATE TABLE IF NOT EXISTS Rejected.rejected_rows (
    rejection_timestamp TIMESTAMP,
    pipeline_step STRING,
    source_table STRING,
    rejection_reason STRING,
    data_payload STRING
)
""")

log_event("INFO", "Setup", "Environment initialized with central 'Rejected' database.")
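
To make the shape of a `Rejected.rejected_rows` record concrete, the sketch below builds one in plain Python, with the same five columns as the table definition. The helper name `build_rejected_row` and the sample values are assumptions for illustration. The generated PySpark code is expected to produce equivalent rows with `current_timestamp()` and `to_json(struct(...))`.

```python
import json
from datetime import datetime, timezone


def build_rejected_row(pipeline_step, source_table, reason, row):
    """Shape one rejected record like the columns of Rejected.rejected_rows."""
    return {
        "rejection_timestamp": datetime.now(timezone.utc),
        "pipeline_step": pipeline_step,
        "source_table": source_table,
        "rejection_reason": reason,
        # The whole source row is serialized so that no column is lost.
        "data_payload": json.dumps(row, sort_keys=True, default=str),
    }


example = build_rejected_row(
    "BRONZE_TO_SILVER_CUSTOMERS",
    "ops_bronze.customers_raw",
    "INVALID_EMAIL",
    {"customer_id": 7, "email": "invalid-email"},
)
print(example["data_payload"])
```

Storing the payload as a JSON string lets one table hold rejects from sources with different schemas.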

### Cell 6: Generating Bronze Layer Data
This function, `create_bronze_data()`, synthesizes raw data to populate the `ops_bronze` tables. It intentionally introduces common data quality issues such as null values, duplicates, and incorrect formats across customer, transaction, account, and opportunity tables. This simulated raw data serves as the starting point for the agentic ETL pipeline to process.


In [0]:
def create_bronze_data():
    log_event("INFO", "DataGen", "Generating Bronze layer data...")
    
    customer_data = []
    for i in range(1, 101):
        name = f"FName{i} LName{i}" if random.random() > 0.1 else None
        email = f"fname{i}.lname{i}@email.com" if random.random() > 0.1 else "invalid-email"
        address = f"{i*10} Main St" if random.random() > 0.05 else None
        customer_data.append((i, name, email, address, f"2023-01-{random.randint(10, 28)}"))
    
    customer_data.extend([
        (1, 'John Doe', 'john.doe@email.com', '123 Elm St', '2023-01-15'), 
        (3, 'Peter Jones', 'peter.jones.dup@email.com', '789 Pine Ln', '2023-01-17')
    ])
    spark.createDataFrame(customer_data, ["customer_id", "name", "email", "address", "join_date"]).write.mode("overwrite").saveAsTable("ops_bronze.customers_raw")

    transactions_data = []
    for i in range(1, 201):
        cust_id = random.randint(1, 110)
        qty = random.randint(-5, 50)
        amount = f"${random.uniform(5, 1000):.2f}" if random.random() > 0.05 else None
        transactions_data.append((100+i, cust_id, qty, amount, f"2023-02-{random.randint(1, 28)}"))
    # Add a transaction for a customer who won't exist in the cleaned customer table
    transactions_data.append((999, 105, 10, "$50.00", "2023-02-20"))
    spark.createDataFrame(transactions_data, ["transaction_id", "customer_id", "quantity", "amount", "transaction_date"]).write.mode("overwrite").saveAsTable("ops_bronze.transactions_raw")

    accounts_data = [('ACC{:03d}'.format(i), f'GlobalCorp-{i}', random.choice(['Technology', 'Healthcare', 'Finance', 'TECH', None]), random.choice(['USA', 'UK', 'Germany'])) for i in range(1, 51)]
    accounts_data.append(('ACC001', 'Global Corporation', 'Tech', 'USA'))
    spark.createDataFrame(accounts_data, ["account_id", "account_name", "industry", "region"]).write.mode("overwrite").saveAsTable("ops_bronze.accounts_raw")
    
    opportunities_data = [(f'OPP{i:03d}', f"ACC{random.randint(1, 55):03d}", random.randint(-1000, 200000), '2024-07-15', random.choice(['Closed Won', 'Negotiation', 'Proposal', 'Qualification', 'Closed Lost'])) for i in range(1, 151)]
    spark.createDataFrame(opportunities_data, ["opportunity_id", "account_id", "value", "close_date", "stage"]).write.mode("overwrite").saveAsTable("ops_bronze.opportunities_raw")
    
    log_event("INFO", "DataGen", "Bronze data creation complete")
    display(spark.sql("SHOW TABLES IN ops_bronze"))

create_bronze_data()
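
`create_bronze_data()` draws from the module-level `random` generator, so each run yields different rows. If reproducible data is wanted, one option is a dedicated seeded generator. The sketch below mirrors only the customer loop and omits the Spark write. The function name `make_customer_rows` and its `seed` parameter are assumptions for illustration.

```python
import random


def make_customer_rows(n=100, seed=42):
    """Generate customer tuples with the same defect rates as the notebook's loop."""
    rng = random.Random(seed)  # private generator, so other code is unaffected
    rows = []
    for i in range(1, n + 1):
        name = f"FName{i} LName{i}" if rng.random() > 0.1 else None
        email = f"fname{i}.lname{i}@email.com" if rng.random() > 0.1 else "invalid-email"
        address = f"{i * 10} Main St" if rng.random() > 0.05 else None
        rows.append((i, name, email, address, f"2023-01-{rng.randint(10, 28)}"))
    return rows


rows = make_customer_rows()
invalid = sum(r[2] == "invalid-email" for r in rows)
print(f"{len(rows)} rows, {invalid} with invalid emails")
```

With a fixed seed, the number of rows the agents' code should reject stays stable between runs, which makes review of the generated tests easier.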

### Cell 7: Defining Agent Tools
This cell defines the set of tools the AI agents can use to perform actions. Each tool is a Python function decorated with `@tool`, which makes it discoverable by the LangChain agent. The tools include `get_table_info` to profile data, `run_pytest_tests` to validate generated code, `create_pull_request` to interact with GitHub, and `create_notebook_visualization` for dashboarding.


In [0]:
@tool
def get_table_info(table_name: str) -> str:
    """Provides comprehensive table profiling including schema, statistics, and data preview for transformation planning."""
    try:
        log_event("TOOL", "get_table_info", f"Profiling: {table_name}")
        df = spark.table(table_name)
        schema_str = "\n".join([f"- {field.name}: {str(field.dataType)}" for field in df.schema.fields])
        count = df.count()
        preview = df.limit(3).toPandas().to_string()
        return f"TABLE: {table_name}\nROW_COUNT: {count}\n\nSCHEMA:\n{schema_str}\n\nSAMPLE_DATA:\n{preview}"
    except Exception as e:
        return f"ERROR: {str(e)}"

@tool
def run_pytest_tests(code: str, tests: str) -> str:
    """Executes a pytest suite against provided PySpark code to ensure correctness. Returns success or failure with logs."""
    log_event("TOOL", "run_pytest_tests", "Validating generated code with pytest...")
    try:
        # Write code and tests to temporary files to be executed
        code_path = os.path.join(GIT_REPO_PATH, "temp_transform.py")
        test_path = os.path.join(GIT_REPO_PATH, "test_temp_transform.py")
        with open(code_path, "w") as f:
            f.write(code)
        with open(test_path, "w") as f:
            f.write(tests)

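        # Run pytest in a subprocess to isolate the run. The absolute path keeps
        # the call valid for any GIT_REPO_PATH, since cwd is also set to it.
        try:
            result = subprocess.run(
                ["pytest", os.path.abspath(test_path)],
                capture_output=True,
                text=True,
                cwd=GIT_REPO_PATH
            )
        finally:
            # Clean up temporary files even if the subprocess call raises
            os.remove(code_path)
            os.remove(test_path)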

        if result.returncode == 0:
            log_event("TOOL", "run_pytest_tests", "SUCCESS: All tests passed.")
            return json.dumps({"status": "success", "report": result.stdout})
        else:
            log_event("ERROR", "run_pytest_tests", f"FAILURE: Tests failed. Return code: {result.returncode}")
            return json.dumps({"status": "error", "error": result.stdout + result.stderr})
    except Exception as e:
        error_msg = f"PYTEST_EXECUTION_ERROR: {str(e)}"
        log_event("ERROR", "run_pytest_tests", error_msg)
        return json.dumps({"status": "error", "error": error_msg})

@tool
def create_pull_request(task_name: str, code: str, tests: str, plan: str) -> str:
    """Creates a new branch, commits the code and tests, and opens a GitHub pull request for human review."""
    log_event("TOOL", "create_pull_request", f"Starting Git workflow for: {task_name}")
    if not GITHUB_API_TOKEN:
        return "ERROR: GitHub API Token is not configured. Cannot create pull request."
    try:
        repo = git.Repo(GIT_REPO_PATH)
        
        # The config is now handled by Databricks User Settings, so these lines are not needed
        # repo.config_writer().set_value("user", "name", GIT_USERNAME).release()
        # repo.config_writer().set_value("user", "email", GIT_EMAIL).release()

        # Create a unique branch name
        branch_name = f"feature/{task_name.lower().replace(' ', '-').replace(':', '')}-{int(time.time())}"
        
        repo.git.checkout('HEAD', b=branch_name)
        log_event("TOOL", "Git", f"Created and checked out new branch: {branch_name}")

        # Create file paths in designated directories
        transform_dir = os.path.join(GIT_REPO_PATH, "transforms")
        test_dir = os.path.join(GIT_REPO_PATH, "tests")
        os.makedirs(transform_dir, exist_ok=True)
        os.makedirs(test_dir, exist_ok=True)
        
        file_name = f"{task_name.split('→')[0].split(':')[-1].strip().lower().replace(' ', '_')}.py"
        code_path = os.path.join(transform_dir, file_name)
        test_path = os.path.join(test_dir, f"test_{file_name}")

        with open(code_path, "w") as f: f.write(code)
        with open(test_path, "w") as f: f.write(tests)
        
        repo.index.add([code_path, test_path])
        repo.index.commit(f"feat: Implement transformation for '{task_name}'")
        log_event("TOOL", "Git", "Committed new code and test files.")

        repo.git.push('--set-upstream', 'origin', branch_name)
        log_event("TOOL", "Git", "Pushed new branch to remote origin.")

        # Create Pull Request via GitHub API
        pr_title = f"Agent-Generated Transformation: {task_name}"
        pr_body = f"### Original Task\n{task_name}\n\n### Agent's Plan\n```\n{plan}\n```\n\nThis PR was generated automatically by the Databricks Agentic Pipeline. Please review the code and tests before merging."
        
        headers = {"Authorization": f"token {GITHUB_API_TOKEN}", "Accept": "application/vnd.github.v3+json"}
        data = {"title": pr_title, "head": branch_name, "base": "main", "body": pr_body}
        url = f"https://api.github.com/repos/{GITHUB_REPO_OWNER}/{GITHUB_REPO_NAME}/pulls"
        response = requests.post(url, headers=headers, json=data)

        if response.status_code == 201:
            pr_url = response.json()['html_url']
            log_event("TOOL", "GitHub", f"SUCCESS: Pull Request created at {pr_url}")
            repo.git.checkout('main')
            return f"SUCCESS: Pull Request created at {pr_url}"
        else:
            error_msg = f"GitHub API Error: {response.status_code} - {response.text}"
            log_event("ERROR", "GitHub", error_msg)
            return error_msg
            
    except Exception as e:
        error_msg = f"GIT_PR_ERROR: {str(e)}"
        log_event("ERROR", "Git", error_msg)
        return error_msg

@tool
def create_notebook_visualization(table_name: str, plot_type: str, x_col: str, y_col: str, title: str) -> str:
    """Creates dashboard visualizations. To be used after transformations are merged and executed."""
    try:
        log_event("TOOL", "create_visualization", f"Creating: {title}")
        df = spark.table(table_name)
        if plot_type == 'bar' and 'Top' in title:
            df_display = df.orderBy(col(y_col).desc()).limit(5)
        else:
            df_display = df.orderBy(x_col)
        print(f"\n=== DASHBOARD COMPONENT: {title} ===")
        display(df_display)
        return f"SUCCESS: '{title}' visualization created and displayed"
    except Exception as e:
        return f"VISUALIZATION_ERROR: {str(e)}"

all_tools = [get_table_info, run_pytest_tests, create_pull_request, create_notebook_visualization]
tool_node = ToolNode(all_tools)

print("All agent tools defined successfully.")
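
`create_pull_request` derives the branch and file names directly from the task text, which can leave characters such as `→` or `.` in them. For the first Bronze task, for example, the file name becomes `ops_bronze.customers_raw.py`. The sketch below shows a stricter normalization, assuming ASCII-only, import-safe names are wanted. `slugify` and `names_for_task` are hypothetical helpers, not part of the notebook.

```python
import re
import time


def slugify(text: str, sep: str = "_") -> str:
    """Lowercase the text and collapse every non-alphanumeric run into `sep`."""
    return re.sub(r"[^a-z0-9]+", sep, text.lower()).strip(sep)


def names_for_task(task_name: str, timestamp: int) -> tuple:
    """Return (branch_name, module_file, test_file) for a task description."""
    # Use the label before the first colon, e.g. "CUSTOMER DATA CLEANSING".
    label = task_name.split(":", 1)[0]
    module = slugify(label)
    branch = f"feature/{slugify(label, sep='-')}-{timestamp}"
    return branch, f"{module}.py", f"test_{module}.py"


task = "CUSTOMER DATA CLEANSING: ops_bronze.customers_raw → ops_silver.customers_cleaned"
print(names_for_task(task, int(time.time())))
```

A module name without dots matters because the generated test file typically imports the transformation module by name.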

### Cell 8: Defining the Agentic Workflow and State
This cell contains the core logic for the agentic system. It defines the `AgentState` class to manage the flow of information between steps. It then defines the different agents (`planner_agent`, `code_generator_agent`, `code_reviewer_agent`) and the decision-making logic (`after_review_decider`, `after_testing_decider`). Finally, it assembles these components into a `StateGraph` from the `langgraph` library, compiling the complete, multi-agent workflow.

In [0]:
class AgentState(TypedDict):
    messages: Annotated[list, lambda x, y: x + y]
    current_task: str
    plan: Optional[str]
    pyspark_code: Optional[str]
    pyspark_tests: Optional[str]
    review_feedback: Optional[str]
    test_results: Optional[str]
    retry_count: int

def planner_agent(state: AgentState):
    log_event("AGENT", "Planner", "Creating transformation strategy and test plan")
    system_prompt = """You are an expert data architect. Your job is to create a detailed, step-by-step technical plan for a PySpark transformation.
    CRITICAL: You must also define a testing strategy. Describe the specific unit tests needed to validate the logic, including edge cases, null handling, and expected outputs."""
    
    response = llm.invoke([HumanMessage(content=f"{system_prompt}\n\nTASK: {state['current_task']}")])
    return {"messages": [AIMessage(content=response.content, name="PlannerAgent")], "plan": response.content}

def code_generator_agent(state: AgentState):
    log_event("AGENT", "CodeGenerator", "Generating PySpark code and pytest suite with rejection handling")
    plan = state['plan']
    context_prompt = ""
    if state.get('review_feedback') and "APPROVED" not in state['review_feedback'].upper():
        context_prompt = f"\n\nREVIEW FEEDBACK TO ADDRESS:\n{state['review_feedback']}"
    elif state.get('test_results'):
        test_feedback = json.loads(state['test_results'])
        if test_feedback.get('status') == 'error':
            context_prompt = f"\n\nTESTS FAILED. FIX THE CODE. ERROR LOG:\n{test_feedback.get('error')}"

    system_prompt = """You are a senior PySpark developer specializing in robust, production-grade pipelines. Your task is to generate two distinct code blocks based on the plan.

    **CRITICAL REQUIREMENT: REJECTION HANDLING**
    For ANY transformation, you MUST capture all records that are filtered, dropped, or fail validation and write them to the `Rejected.rejected_rows` table. Do not silently drop data.

    - **For Cleansing/Validation (Bronze -> Silver):**
        - First, create a temporary view of the source data.
        - Add columns to flag validation errors (e.g., `is_valid_email`, `has_positive_quantity`).
        - Create a `reasons` array column concatenating all rejection reasons for a given row.
        - Write all rows with one or more rejection reasons to `Rejected.rejected_rows`.
        - The final transformation should only write the clean rows to the target Silver table.
    - **For Joins/Aggregations (Silver -> Gold):**
        - When joining tables (e.g., transactions to customers), you MUST capture records that do not find a match.
        - Perform a **LEFT ANTI JOIN** to identify these "orphan" records from the left table.
        - Write these orphan records to `Rejected.rejected_rows` with a clear reason (e.g., 'NO_MATCHING_CUSTOMER_RECORD').
    - **Structure for Rejected Data:** Use the following format for writing to `Rejected.rejected_rows`:
        - `rejection_timestamp`: `current_timestamp()`
        - `pipeline_step`: A string literal describing the task (e.g., 'BRONZE_TO_SILVER_CUSTOMERS').
        - `source_table`: The name of the source table.
        - `rejection_reason`: The specific reason for failure.
        - `data_payload`: The entire rejected row converted to a JSON string using `to_json(struct(*))`.

    **OUTPUT FORMAT:**
    1.  A Python code block with the complete PySpark transformation logic, including the rejection handling described above.
    2.  A Python code block with a complete `pytest` test suite. The tests must validate both the successful transformation AND the rejection logic.

    Separate the two code blocks with the exact string `---PYTEST_SEPARATOR---`.
    """
    user_prompt = f"IMPLEMENTATION PLAN:\n{plan}{context_prompt}"

    response = llm.invoke([HumanMessage(content=f"{system_prompt}\n\n{user_prompt}")])
    
    if "---PYTEST_SEPARATOR---" in response.content:
        code, tests = response.content.split("---PYTEST_SEPARATOR---", 1)
    else: 
        code = response.content
        tests = "# Could not generate tests. Please review."

    clean_code = code.strip().replace("```python", "").replace("```", "").strip()
    clean_tests = tests.strip().replace("```python", "").replace("```", "").strip()
    
    return {
        "pyspark_code": clean_code, 
        "pyspark_tests": clean_tests, 
        "test_results": None, 
        "review_feedback": None
    }

def code_reviewer_agent(state: AgentState):
    log_event("AGENT", "CodeReviewer", "Reviewing code, tests, and rejection logic")
    system_prompt = """You are a senior data engineering QA lead. Review the provided PySpark code AND the `pytest` test suite.
    
    REVIEW CRITERIA:
    - Code correctness and adherence to the plan.
    - **Crucially, does the code correctly implement the rejection handling logic for all specified cases (validation errors, failed joins)?**
    - Test coverage: Do the tests adequately cover both successful transformations and the rejection logic?
    - Production readiness and best practices.
    
    RESPONSE FORMAT:
    - If all criteria are met: respond with exactly "APPROVED".
    - Otherwise: respond with "REJECTION_REASON: [specific, actionable feedback]".
    """
    user_prompt = f"ORIGINAL PLAN:\n{state['plan']}\n\nPYSPARK CODE:\n```python\n{state['pyspark_code']}\n```\n\nPYTEST SUITE:\n```python\n{state['pyspark_tests']}\n```"

    response = llm.invoke([HumanMessage(content=f"{system_prompt}\n\n{user_prompt}")])
    return {"review_feedback": response.content}

def prepare_for_testing(state: AgentState):
    log_event("AGENT", "TestPrep", "Preparing to run automated tests")
    return {"messages": [AIMessage(content="", tool_calls=[{
        'name': 'run_pytest_tests', 
        'args': {'code': state['pyspark_code'], 'tests': state['pyspark_tests']}, 
        'id': f'test_{datetime.now().isoformat()}'
    }])]}

def prepare_for_pr(state: AgentState):
    log_event("AGENT", "PRPrep", "Preparing to create pull request")
    task_name = state['current_task'].strip().splitlines()[0]
    return {"messages": [AIMessage(content="", tool_calls=[{
        'name': 'create_pull_request',
        'args': {
            'task_name': task_name,
            'code': state['pyspark_code'],
            'tests': state['pyspark_tests'],
            'plan': state['plan']
        },
        'id': f'pr_{datetime.now().isoformat()}'
    }])]}

def after_review_decider(state: AgentState):
    feedback = state.get('review_feedback', '')
    if "APPROVED" in feedback.upper():
        log_event("GRAPH", "Router", "Code approved → Running Tests")
        return "prepare_for_testing"
    else:
        log_event("GRAPH", "Router", f"Code rejected → Revision (attempt {state.get('retry_count', 0) + 1})")
        return "revise_code"

def after_testing_decider(state: AgentState):
    last_message = state['messages'][-1]
    test_results_json = last_message.content
    test_results = json.loads(test_results_json)

    # This state update is now handled by the 'revise_code' node
    # state['test_results'] = test_results_json 
    
    if test_results.get("status") == "success":
        log_event("GRAPH", "Router", "Tests passed → Creating Pull Request")
        return "prepare_for_pr"
    # Check the retry count before deciding to revise again
    elif state.get('retry_count', 0) < 3:
        log_event("GRAPH", "Router", "Tests failed → Revising Code")
        # Route to 'revise_code' to increment the counter correctly
        return "revise_code"
    else:
        log_event("ERROR", "Router", "Max retries exceeded on test failures → Abort")
        return END

workflow = StateGraph(AgentState)
workflow.add_node("planner", planner_agent)
workflow.add_node("code_generator", code_generator_agent)
workflow.add_node("code_reviewer", code_reviewer_agent)
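def revise_code(state: AgentState):
    # Increment the retry counter. When the last message is a tool result
    # (i.e. we arrived from a failed test run), persist the pytest report so
    # code_generator_agent can include the error log in its prompt.
    update = {"retry_count": state.get('retry_count', 0) + 1, "messages": []}
    messages = state.get('messages') or []
    if messages and getattr(messages[-1], "type", None) == "tool":
        update["test_results"] = messages[-1].content
    return update

workflow.add_node("revise_code", revise_code)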
workflow.add_node("prepare_for_testing", prepare_for_testing)
workflow.add_node("test_runner", ToolNode([run_pytest_tests]))
workflow.add_node("prepare_for_pr", prepare_for_pr)
workflow.add_node("pr_creator", ToolNode([create_pull_request]))

workflow.set_entry_point("planner")
workflow.add_edge("planner", "code_generator")
workflow.add_edge("code_generator", "code_reviewer")
workflow.add_conditional_edges("code_reviewer", after_review_decider, {
    "prepare_for_testing": "prepare_for_testing",
    "revise_code": "revise_code"
})
workflow.add_edge("revise_code", "code_generator")
workflow.add_edge("prepare_for_testing", "test_runner")
workflow.add_conditional_edges("test_runner", after_testing_decider, {
    "prepare_for_pr": "prepare_for_pr",
    "revise_code": "revise_code",  # Test failures go back through revise_code
    "__end__": END
})
workflow.add_edge("prepare_for_pr", "pr_creator")
workflow.add_edge("pr_creator", END)

app = workflow.compile()
log_event("INFO", "Setup", "HITL-enabled LangGraph pipeline compiled successfully.")
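
In the graph above, only `after_testing_decider` checks `retry_count`. Review rejections loop back until the `recursion_limit` passed to `app.stream` is reached. The substring test for "APPROVED" would also match feedback such as "NOT APPROVED". The sketch below shows a stricter routing rule as a pure function. `route_after_review`, `MAX_RETRIES`, and the `"abort"` label are hypothetical, and `"abort"` would need a matching entry (for example, mapped to `END`) in the conditional edge map.

```python
MAX_RETRIES = 3  # assumed cap, mirroring the limit used after test failures


def route_after_review(feedback, retry_count, max_retries=MAX_RETRIES):
    """Decide the next node from the reviewer's reply and the retries used so far."""
    verdict = (feedback or "").strip().upper()
    if verdict == "APPROVED":  # exact match, as the reviewer prompt requests
        return "prepare_for_testing"
    if retry_count < max_retries:
        return "revise_code"
    return "abort"  # hypothetical label; would need mapping to END in the graph


samples = [
    ("APPROVED", 0),
    ("REJECTION_REASON: not approved yet", 1),
    ("REJECTION_REASON: missing anti join", 3),
]
for reply, retries in samples:
    print(f"{reply[:24]!r} (retries={retries}) -> {route_after_review(reply, retries)}")
```

An exact match is strict: a model that adds punctuation or a trailing sentence would be routed to revision, so a looser rule (for example, a prefix check) may suit some models better.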

### Cell 9: Visualizing the Agentic Workflow
This cell generates the Mermaid diagram source for the compiled agentic workflow. `app.get_graph().draw_mermaid()` inspects the compiled graph object and returns a text-based representation, which the cell prints. Rendering that text with any Mermaid-aware viewer produces a flowchart that shows the relationships and paths between the different nodes (agents and tools) in the graph.


In [0]:
# draw_mermaid() has no external dependencies; it returns Mermaid source text.
# Paste the printed text into a Mermaid renderer to view the diagram.

print("--- Agentic Workflow Graph (Mermaid Diagram) ---")
mermaid_diagram = app.get_graph().draw_mermaid()
print(mermaid_diagram)
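
For readers without the notebook at hand, the sketch below builds comparable Mermaid flowchart text by hand from the edges declared in Cell 8. It is an approximation for illustration only: the real `draw_mermaid()` output includes its own start and end nodes and styling and will not match line for line. The function `edges_to_mermaid`, the edge labels, and the `finish` node name are assumptions.

```python
# Edges copied from the add_edge / add_conditional_edges calls in Cell 8.
# A non-empty label marks a conditional route; labels are illustrative.
EDGES = [
    ("planner", "code_generator", ""),
    ("code_generator", "code_reviewer", ""),
    ("code_reviewer", "prepare_for_testing", "approved"),
    ("code_reviewer", "revise_code", "rejected"),
    ("revise_code", "code_generator", ""),
    ("prepare_for_testing", "test_runner", ""),
    ("test_runner", "prepare_for_pr", "tests passed"),
    ("test_runner", "revise_code", "tests failed"),
    ("test_runner", "finish", "max retries"),
    ("prepare_for_pr", "pr_creator", ""),
    ("pr_creator", "finish", ""),
]


def edges_to_mermaid(edges):
    """Render (source, target, label) triples as Mermaid flowchart text."""
    lines = ["graph TD"]
    for source, target, label in edges:
        arrow = f"-->|{label}|" if label else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


print(edges_to_mermaid(EDGES))
```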

### Cell 10: Executing Bronze-to-Silver Pipeline Tasks
This cell defines the main execution loop for the pipeline. The `execute_pipeline_task` function takes a natural language task description, initializes the agentic workflow with that task, and streams its execution. It then defines a list of "Bronze to Silver" transformation tasks (customer cleansing and transaction standardization) and iterates through them, triggering the agentic workflow for each one to generate and propose a solution via a pull request.


In [0]:
def execute_pipeline_task(task_description):
    task_name = task_description.strip().splitlines()[0][:50]
    log_event("INFO", "Pipeline", f"INITIATING TASK: {task_name}")
    
    initial_state = {
        "messages": [], 
        "current_task": task_description, 
        "retry_count": 0
    }
    
    final_state = {}
    try:
        # Stream the workflow to get the final state
        for state_update in app.stream(initial_state, {"recursion_limit": 25}):
            node_name = list(state_update.keys())[0]
            log_event("GRAPH", "Flow", f"Completed: {node_name}")
            final_state = state_update
        
        # Check that the workflow reached pr_creator before reading the PR message
        if 'pr_creator' in final_state:
            pr_message = final_state.get('pr_creator', {}).get('messages', [{}])[-1].content
            log_event("INFO", "Pipeline", f"SUCCESS: HITL workflow complete for {task_name}. Result: {pr_message}")
        else:
            log_event("ERROR", "Pipeline", f"Workflow for '{task_name}' did not complete successfully and aborted before creating a PR.")

    except Exception as e:
        log_event("ERROR", "Pipeline", f"An unexpected error occurred during workflow for task '{task_name}': {str(e)}")
    
    time.sleep(1)

log_event("INFO", "PIPELINE", "===== BRONZE → SILVER TRANSFORMATIONS (GENERATING PULL REQUESTS) =====")

bronze_to_silver_transformations = [
    """CUSTOMER DATA CLEANSING: ops_bronze.customers_raw → ops_silver.customers_cleaned
    
    REQUIREMENTS:
    - Remove duplicate customer_id records (keep first occurrence).
    - Fill null 'name' values with 'Unknown Customer'.
    - Validate email format using regex pattern '.+@.+\\..+'.
    - Convert join_date string to proper DateType.
    - **REJECTION HANDLING**: Any row with an invalid email OR a null join_date must be written to `Rejected.rejected_rows` with a clear reason.
    - The final silver table should only contain the clean, valid rows.""",
    
    """TRANSACTION DATA STANDARDIZATION: ops_bronze.transactions_raw → ops_silver.transactions_cleaned
    
    REQUIREMENTS:
    - Deduplicate on transaction_id (keep first record).
    - Clean amount field: remove '$' prefix and convert to Decimal(10,2).
    - Convert transaction_date to DateType.
    - **REJECTION HANDLING**: Write any row to `Rejected.rejected_rows` if it has a negative quantity, a null/zero amount, OR if customer_id > 100."""
]

for task in bronze_to_silver_transformations:
    execute_pipeline_task(task)
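
The transaction requirements above can be checked on plain Python rows before reviewing any generated Spark code. The sketch below applies the same three rejection rules and the `$` cleanup to dictionaries. It is a plain-Python analogue for reasoning about expected behavior, not the PySpark the agents produce, and the function and reason names are assumptions.

```python
from decimal import Decimal, InvalidOperation


def parse_amount(raw):
    """Strip a leading '$' and return a 2-decimal Decimal, or None if unusable."""
    if raw is None:
        return None
    try:
        return Decimal(raw.lstrip("$")).quantize(Decimal("0.01"))
    except InvalidOperation:
        return None


def rejection_reasons(txn):
    """Collect every rule the transaction violates (empty list means clean)."""
    reasons = []
    if txn["quantity"] < 0:
        reasons.append("NEGATIVE_QUANTITY")
    amount = parse_amount(txn["amount"])
    if amount is None or amount == 0:
        reasons.append("NULL_OR_ZERO_AMOUNT")
    if txn["customer_id"] > 100:
        reasons.append("CUSTOMER_ID_OUT_OF_RANGE")
    return reasons


sample = [
    {"transaction_id": 101, "customer_id": 5, "quantity": 3, "amount": "$19.99"},
    {"transaction_id": 102, "customer_id": 105, "quantity": -2, "amount": None},
]
for txn in sample:
    print(txn["transaction_id"], rejection_reasons(txn) or "clean")
```

Collecting all reasons per row, rather than stopping at the first, matches the `reasons` array the code generator prompt asks for.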

### Cell 11: Executing Silver-to-Gold Pipeline Tasks
This cell continues the pipeline by defining and executing the "Silver to Gold" aggregation tasks. Following the same pattern as the previous cell, it provides a high-level task description for creating a `customer_spending` analytics table. It then calls the `execute_pipeline_task` function, which once again invokes the full agentic workflow to plan, code, test, and create a pull request for this aggregation logic.


In [0]:
log_event("INFO", "PIPELINE", "===== SILVER → GOLD AGGREGATIONS (GENERATING PULL REQUESTS) =====")

silver_to_gold_aggregations = [
    """CUSTOMER SPENDING ANALYTICS: Create ops_gold.customer_spending
    
    REQUIREMENTS:
    - Join `ops_silver.transactions_cleaned` with `ops_silver.customers_cleaned` on customer_id.
    - Group by customer_id and customer name to calculate total_spent, total_transactions, etc.
    - **REJECTION HANDLING**: Any transaction from `ops_silver.transactions_cleaned` that does not have a matching customer in `ops_silver.customers_cleaned` must be found using a LEFT ANTI JOIN and written to `Rejected.rejected_rows` with the reason 'NO_MATCHING_CUSTOMER'.""",
]

for task in silver_to_gold_aggregations:
    execute_pipeline_task(task)
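
The orphan-detection requirement can be pictured with a small plain-Python analogue of an inner join plus a LEFT ANTI JOIN on `customer_id`. This is a sketch for intuition, with made-up sample rows and a hypothetical helper name `split_orphans`. The agents are expected to express the same split with Spark joins.

```python
def split_orphans(transactions, customers):
    """Return (matched, orphans): orphans are transactions with no customer row."""
    known_ids = {c["customer_id"] for c in customers}
    matched = [t for t in transactions if t["customer_id"] in known_ids]
    orphans = [t for t in transactions if t["customer_id"] not in known_ids]
    return matched, orphans


customers = [{"customer_id": 1, "name": "A"}, {"customer_id": 2, "name": "B"}]
transactions = [
    {"transaction_id": 101, "customer_id": 1, "amount": 10.0},
    {"transaction_id": 999, "customer_id": 105, "amount": 50.0},
]
matched, orphans = split_orphans(transactions, customers)
print("matched:", [t["transaction_id"] for t in matched])
print("orphans:", [t["transaction_id"] for t in orphans])
```

Every transaction lands in exactly one of the two lists, which is the invariant a reviewer can check in the generated tests: aggregated rows plus rejected rows should account for all input rows.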

### Cell 12: Pausing for Human-in-the-Loop (HITL) Action
This cell marks the critical hand-off point from the automated agent to a human reviewer. It prints a clear message informing the user that the agent's work is complete and that pull requests have been created in GitHub. It provides explicit next steps, instructing the user to navigate to their repository, review the generated code, and merge the PRs. This step is the "Human-in-the-Loop" part of the process, ensuring oversight before any code is deployed.


In [0]:
log_event("INFO", "PIPELINE", "===== HUMAN ACTION REQUIRED =====")
print("\n" + "="*60)
print("  All agentic workflows are complete.")
print("  Pull Requests have been generated in your GitHub repository.")
print("\n  NEXT STEPS:")
print(f"  1. Go to your repository: https://github.com/{GITHUB_REPO_OWNER}/{GITHUB_REPO_NAME}/pulls")
print("  2. Review, comment on, and approve the generated PRs.")
print("  3. Merge the approved PRs into your main branch.")
print("  4. A separate CI/CD pipeline (e.g., GitHub Actions, Azure DevOps)")
print("     should then trigger to execute the new code against your Databricks workspace.")
print("="*60)
print("\nThe following cells are for validating the data AFTER that human-led process is complete.")

### Cell 13: Post-Merge Validation and Dashboarding
This cell is meant to be run *after* a human has reviewed and merged the pull requests and a separate CI/CD process has executed the new transformations. It simulates a business intelligence (BI) step by prompting another agent to use the `create_notebook_visualization` tool to build a dashboard. It also runs validation queries against the final Gold and Rejected tables to confirm that the pipeline ran as expected and to inspect the final outputs. Note that `ops_gold.monthly_sales_summary` is not produced by the tasks defined in Cells 10 and 11, so its chart and validation step succeed only if that table has been created separately.


In [0]:
dashboard_prompt = """You are a Business Intelligence specialist. Create executive dashboard visualizations using the create_notebook_visualization tool.

DASHBOARD REQUIREMENTS:
1. Customer Analysis: Top 5 customers by total spending (bar chart from ops_gold.customer_spending)
2. Revenue Trends: Monthly revenue progression over time (line chart from ops_gold.monthly_sales_summary)

Execute both visualizations to complete the executive dashboard."""

bi_agent = llm.bind_tools(all_tools)
dashboard_response = bi_agent.invoke(dashboard_prompt)

if dashboard_response.tool_calls:
    for tool_call in dashboard_response.tool_calls:
        tool_function = {t.name: t for t in all_tools}[tool_call['name']]
        result = tool_function.invoke(tool_call['args'])
        log_event("INFO", "Dashboard", f"Visualization created: {tool_call['args'].get('title', 'Unknown')}")
else:
    log_event("ERROR", "Dashboard", "Failed to generate dashboard visualizations")

log_event("INFO", "VALIDATION", "===== FINAL DATA VALIDATION =====")

validation_tables = [
    "ops_gold.customer_spending",
    "ops_gold.monthly_sales_summary",
    "Rejected.rejected_rows" # Also validate the rejected table
]

for table in validation_tables:
    try:
        print(f"\n=== FINAL VALIDATION: {table} ===")
        df = spark.table(table)
        print(f"Record Count: {df.count()}")
        if "customer_spending" in table:
            display(df.orderBy(col("total_spent").desc()).limit(10))
        elif "monthly_sales" in table:
            display(df.orderBy("month").limit(12))
        else: # For rejected_rows
             display(df.orderBy(col("rejection_timestamp").desc()).limit(10))
    except Exception as e:
        log_event("ERROR", "Validation", f"Failed to validate {table}: {str(e)}")

log_event("INFO", "COMPLETION", "===== MEDALLION PIPELINE EXECUTION COMPLETE =====")

### Cell 14: Final Success Message
This final cell provides a concluding summary of the process. It prints a success message to confirm that the agentic workflows for all transformation tasks were initiated successfully. It reiterates that pull requests are ready for review and concisely summarizes the end-to-end workflow, from PR review to the final validation of the Gold and Rejected tables.

In [0]:
print("\n🎉 SUCCESS: Databricks Medallion Architecture HITL pipeline initiated successfully!")
print("✅ All Bronze → Silver → Gold transformation workflows completed.")
print("✅ Pull Requests are waiting for your review in GitHub.")
print("➡️ Next Steps: Review PRs -> Merge -> CI/CD Execution -> Validate Gold & Rejected Tables.")