In [0]:
# Install the core LangChain and LangGraph libraries plus the keyless search library
# NOTE(review): no versions are pinned and -U upgrades to the newest release, so
# re-running this notebook later may pick up different library versions — consider pinning.
%pip install -U langgraph langchain-community duckduckgo-search
# Restart the Python interpreter after the install; the following cells re-import
# everything they need, so nothing from before this point is relied upon.
dbutils.library.restartPython()

In [0]:
import os
import json
from langchain_community.chat_models import ChatDatabricks
from langchain_community.tools import DuckDuckGoSearchResults

# 1. Chat model: Claude served through the Databricks endpoint
#    "databricks-claude-sonnet-4-5". Generation settings are forwarded via extra_params.
llm = ChatDatabricks(
    endpoint="databricks-claude-sonnet-4-5",
    extra_params=dict(temperature=0, max_tokens=1000),
)

# 2. Web search: DuckDuckGo results tool (constructed without any API key)
search_tool = DuckDuckGoSearchResults()

print("Model and search tool initialized successfully.")

In [0]:
from typing import List, TypedDict
from langgraph.graph import StateGraph, END

# --- 1. STATE DEFINITION (without Annotated operator.add) ---
class HierarchyState(TypedDict):
    """State dictionary carried through the ``researcher`` graph node.

    Each run of ``research_parent_node`` reads this state and returns a partial
    dict of updates (``current_focus``, ``is_ultimate_parent``,
    ``hierarchy_chain``, ``confidence``, ``reasoning``).
    """
    operator_id: int  # Operator ID interpolated into the search query and the LLM prompt
    target_name: str  # Not read by research_parent_node; presumably the original operator name — TODO confirm
    address_context: str  # Address text interpolated into the search query and the LLM prompt
    current_focus: str  # Company currently being researched; replaced by its parent after each non-final step
    hierarchy_chain: List[str]  # Companies visited so far; plain list (no reducer), the node returns a fresh extended copy
    is_ultimate_parent: bool  # Routing flag: True ends the graph, False loops back to the researcher node
    confidence: int  # Confidence reported by the model for the latest step (prompt asks for 1-10; 0 on parse failure)
    reasoning: str  # Model's explanation for the latest step, or the parse-failure message

# --- 2. NODE DEFINITIONS ---
def research_parent_node(state: HierarchyState) -> dict:
    """
    Research the immediate parent of the company in ``state["current_focus"]``.

    Runs one web search, asks the LLM to name the immediate corporate parent as
    a JSON object, and returns a partial state update for LangGraph to merge.

    The step is treated as final (``is_ultimate_parent=True``) when:
      * the model answers with the ``ULTIMATE`` sentinel,
      * the model's answer is missing, unparseable, or not a string,
      * the model names the current company itself (case-insensitive), or
      * the model names a company that is already in the hierarchy chain
        (ownership cycle), which would otherwise loop until the graph's
        recursion limit is hit.

    Args:
        state: Current graph state; reads ``current_focus``, ``operator_id``,
            ``address_context`` and ``hierarchy_chain``.

    Returns:
        dict with ``current_focus``, ``is_ultimate_parent``, ``hierarchy_chain``,
        ``confidence`` (always an int) and ``reasoning``.
    """
    current_target = state["current_focus"]
    # Tolerate an initial state that did not provide a chain yet.
    prior_chain = list(state.get("hierarchy_chain") or [])

    # 1. Search using PHMSA metadata for grounding
    query = (f"Who is the immediate majority owner or corporate parent of "
             f"{current_target} (Operator ID {state['operator_id']}) "
             f"located at {state['address_context']}?")

    try:
        search_results = search_tool.run(query)
    except Exception as e:
        # Best effort: let the model answer from its own knowledge.
        search_results = f"Search unavailable: {str(e)}"

    # 2. Reasoning with Claude to extract structured data
    prompt = f"""
    Search Results: {search_results}
    
    Company: {current_target}
    Address: {state['address_context']}
    Operator ID: {state['operator_id']}
    
    Task: Identify the immediate corporate parent.
    Rules:
    - If it is independent or the top-level parent, return 'ULTIMATE'.
    - If it is a known subsidiary of a major (e.g., Williams, Enbridge, Kinder Morgan), provide that parent name.
    - Return ONLY a JSON object: {{"parent": "Name or ULTIMATE", "confidence": 1-10, "reasoning": "Brief explanation"}}
    """

    try:
        response = llm.invoke(prompt).content.strip()

        # Clean response for JSON parsing
        if "```json" in response:
            response = response.split("```json")[1].split("```")[0].strip()
        else:
            # Bare ``` fences or prose around the object: keep the outermost {...} span.
            start, end = response.find("{"), response.rfind("}")
            if start != -1 and end > start:
                response = response[start:end + 1]

        data = json.loads(response)
        if not isinstance(data, dict):
            raise ValueError("model response is not a JSON object")

        # A null / empty / non-string parent is treated like the sentinel.
        raw_parent = data.get("parent")
        if isinstance(raw_parent, str) and raw_parent.strip():
            parent = raw_parent.strip()
        else:
            parent = "ULTIMATE"

        # The model may return "8" or 8.0; the state declares an int.
        try:
            confidence = int(float(data.get("confidence", 0)))
        except (TypeError, ValueError, OverflowError):
            confidence = 0

        reasoning = str(data.get("reasoning") or "No data")
    except Exception as e:
        parent = "ULTIMATE"
        confidence = 0
        reasoning = f"Failed to parse model response: {str(e)}"

    parent_key = parent.casefold()
    names_itself = parent_key == str(current_target).strip().casefold()
    revisits_ancestor = parent_key in {str(entry).strip().casefold() for entry in prior_chain}

    # Substring match is deliberately lenient so answers such as
    # "ULTIMATE (independent)" still terminate the walk.
    is_sentinel = "ULTIMATE" in parent.upper()
    is_ultimate = is_sentinel or names_itself or revisits_ancestor

    if revisits_ancestor and not is_sentinel and not names_itself:
        reasoning = f"{reasoning} (stopped: '{parent}' is already in the hierarchy chain)"

    # Manually append to hierarchy chain instead of using operator.add
    updated_chain = prior_chain + [current_target]

    # Return a plain dict with state updates
    return {
        "current_focus": parent if not is_ultimate else current_target,
        "is_ultimate_parent": is_ultimate,
        "hierarchy_chain": updated_chain,
        "confidence": confidence,
        "reasoning": reasoning
    }

# --- 3. GRAPH CONSTRUCTION ---
# Single-node graph: "researcher" is both the entry point and the loop body.
workflow = StateGraph(HierarchyState)
workflow.add_node("researcher", research_parent_node)
workflow.set_entry_point("researcher")

# After every research step, loop back to the researcher unless the node
# flagged the current company as the ultimate parent.
workflow.add_conditional_edges(
    "researcher",
    lambda state: "continue" if not state["is_ultimate_parent"] else "end",
    {"end": END, "continue": "researcher"},
)

app = workflow.compile()
print("Workflow compiled successfully (without operator.add)")

In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *

# Output schema shared by the pandas UDF and the Unity Catalog result table.
# Every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("OPERATOR_ID", LongType()),
        ("ORIGINAL_NAME", StringType()),
        ("ULTIMATE_PARENT", StringType()),
        ("HIERARCHY_PATH", StringType()),
        ("CONFIDENCE", IntegerType()),
        ("MAPPING_REASON", StringType()),
    ]
])

def find_ultimate_parent(operator_id, name, address_context, max_depth=5):
    """
    Iteratively walk up the ownership chain of ``name`` to its ultimate parent.

    Plain-Python loop (no LangGraph state) built on the module-level
    ``search_tool`` and ``llm``; it is the function the pandas UDF calls per row.

    The walk stops when the model answers with the ``ULTIMATE`` sentinel, when
    its answer is missing/unparseable, when it names a company that was already
    visited (self-reference or ownership cycle), or after ``max_depth`` lookups.

    Args:
        operator_id: Operator ID interpolated into the search query and prompt.
        name: Company name to start from.
        address_context: Address text interpolated into the search query and prompt.
        max_depth: Maximum number of parent lookups to perform.

    Returns:
        dict with:
          * ``ultimate_parent``: last company reached,
          * ``hierarchy_path``: visited companies joined by " -> ", always
            ending with ``ultimate_parent`` (empty only when ``name`` is blank),
          * ``confidence``: int reported by the model for the last lookup
            (0 on failure),
          * ``reasoning``: model explanation or failure message for the last lookup.
    """
    # Nothing to research for a null / NaN / blank name; skip the LLM calls.
    if not isinstance(name, str) or not name.strip():
        return {
            "ultimate_parent": name,
            "hierarchy_path": "",
            "confidence": 0,
            "reasoning": "No company name provided"
        }

    hierarchy_chain = []
    visited = set()  # case-insensitive names already researched (cycle guard)
    current_focus = name
    confidence = 0
    reasoning = ""

    for _ in range(max_depth):
        hierarchy_chain.append(current_focus)
        visited.add(current_focus.strip().casefold())

        # Search for parent
        query = (f"Who is the immediate majority owner or corporate parent of "
                 f"{current_focus} (Operator ID {operator_id}) "
                 f"located at {address_context}?")

        try:
            search_results = search_tool.run(query)
        except Exception as e:
            # Best effort: let the model answer from its own knowledge.
            search_results = f"Search unavailable: {str(e)}"

        # Ask LLM to identify parent
        prompt = f"""
Search Results: {search_results}

Company: {current_focus}
Address: {address_context}
Operator ID: {operator_id}

Task: Identify the immediate corporate parent.
Rules:
- If it is independent or the top-level parent, return 'ULTIMATE'.
- If it is a known subsidiary of a major (e.g., Williams, Enbridge, Kinder Morgan), provide that parent name.
- Return ONLY a JSON object: {{"parent": "Name or ULTIMATE", "confidence": 1-10, "reasoning": "Brief explanation"}}
"""

        try:
            response = llm.invoke(prompt).content.strip()

            # Clean response for JSON parsing
            if "```json" in response:
                response = response.split("```json")[1].split("```")[0].strip()
            else:
                # Bare ``` fences or prose around the object: keep the outermost {...} span.
                start, end = response.find("{"), response.rfind("}")
                if start != -1 and end > start:
                    response = response[start:end + 1]

            data = json.loads(response)
            if not isinstance(data, dict):
                raise ValueError("model response is not a JSON object")

            # A null / empty / non-string parent is treated like the sentinel.
            raw_parent = data.get("parent")
            if isinstance(raw_parent, str) and raw_parent.strip():
                parent = raw_parent.strip()
            else:
                parent = "ULTIMATE"

            # The model may return "8" or 8.0; the output column is an integer.
            try:
                confidence = int(float(data.get("confidence", 0)))
            except (TypeError, ValueError, OverflowError):
                confidence = 0

            reasoning = str(data.get("reasoning") or "No data")
        except Exception as e:
            parent = "ULTIMATE"
            confidence = 0
            reasoning = f"Failed to parse model response: {str(e)}"

        # Check if we've reached the ultimate parent. Substring match is kept
        # lenient so answers like "ULTIMATE (independent)" still terminate;
        # a parent we already visited means self-reference or a cycle.
        is_ultimate = "ULTIMATE" in parent.upper() or parent.casefold() in visited

        if is_ultimate:
            break

        current_focus = parent

    # When the loop ran out of depth (or never ran), current_focus was advanced
    # to a company that is not in the chain yet; add it so the path always
    # ends at the reported ultimate parent.
    if not hierarchy_chain or hierarchy_chain[-1] != current_focus:
        hierarchy_chain.append(current_focus)

    return {
        "ultimate_parent": current_focus,
        "hierarchy_path": " -> ".join(hierarchy_chain),
        "confidence": confidence,
        "reasoning": reasoning
    }

@pandas_udf(schema)
def get_hierarchy_batch(ids: pd.Series, names: pd.Series, streets: pd.Series, cities: pd.Series, states: pd.Series) -> pd.DataFrame:
    """
    Pandas UDF: resolve the ultimate parent for every operator row in a batch.

    Calls ``find_ultimate_parent`` once per row. A failure on one row is
    recorded as an ``ERROR`` row (with the exception text in HIERARCHY_PATH)
    so that it does not fail the whole batch.

    Args:
        ids: Operator IDs.
        names: Company names.
        streets, cities, states: Address parts; null or blank parts are
            skipped when building the address text.

    Returns:
        DataFrame with one row per input row and exactly the columns of
        ``schema``, in schema order (also for an empty batch).
    """
    output_columns = [field.name for field in schema.fields]
    results = []
    for op_id, name, street, city, state in zip(ids, names, streets, cities, states):
        try:
            # Drop null/blank parts so "None"/"nan" never leaks into the
            # search query or the prompt.
            addr = ", ".join(
                str(part).strip()
                for part in (street, city, state)
                if not pd.isna(part) and str(part).strip()
            )
            output = find_ultimate_parent(op_id, name, addr)

            # CONFIDENCE is an IntegerType column; coerce defensively.
            try:
                confidence = int(output["confidence"])
            except (TypeError, ValueError):
                confidence = 0

            results.append({
                "OPERATOR_ID": op_id,
                "ORIGINAL_NAME": name,
                "ULTIMATE_PARENT": output["ultimate_parent"],
                "HIERARCHY_PATH": output["hierarchy_path"],
                "CONFIDENCE": confidence,
                "MAPPING_REASON": output["reasoning"]
            })
        except Exception as e:
            results.append({
                "OPERATOR_ID": op_id,
                "ORIGINAL_NAME": name,
                "ULTIMATE_PARENT": "ERROR",
                "HIERARCHY_PATH": str(e),
                "CONFIDENCE": 0,
                "MAPPING_REASON": "Agent Failed"
            })

    # Passing columns= keeps the frame aligned with the schema even when the
    # batch is empty (a bare pd.DataFrame([]) would have no columns at all).
    return pd.DataFrame(results, columns=output_columns)

print("UDF defined successfully (without LangGraph in UDF)")

In [0]:
# 1. Read the distinct operator / name / address combinations from the
#    Unity Catalog source table, capped at 10 rows.
source_table = "gshen_catalog.enbridge_sr_workshop.annual_hazardous_liquid_2024_updated"
operator_columns = ["OPERATOR_ID", "PARTA2NAMEOFCOMP", "PARTA4STREET", "PARTA4CITY", "PARTA4STATE"]
input_df = (
    spark.read.table(source_table)
    .select(*operator_columns)
    .distinct()
    .limit(10)
)

# 2. Apply the hierarchy UDF to every row and flatten its struct result.
#    Only unique entities are processed to save on LLM token costs.
hierarchy_struct = get_hierarchy_batch(*operator_columns).alias("res")
mapped_hierarchy_df = input_df.select(hierarchy_struct).select("res.*")

# 3. Persist the mapping to a Unity Catalog table, replacing any previous contents.
output_table = "gshen_catalog.enbridge_sr_workshop.operator_hierarchy_mapped"
mapped_hierarchy_df.write.mode("overwrite").saveAsTable(output_table)

print(f"Workflow complete. Results saved to: {output_table}")
display(spark.table(output_table))