In [0]:
%pip install databricks=0.2 databricks-langchain==0.5.1

In [0]:
from __future__ import annotations  # a future statement must precede every other statement

import os
import sys
import json
import xml.etree.ElementTree as ET
from typing import Any, Dict, List

from pyspark.sql import SparkSession

# NOTE(review): a later cell redefines these three schema names locally, which
# shadows this import — keep a single source of truth for the table schemas.
from schemas.nifi_metadata import processor_schema, processor_properties_schema, processor_connections_schema

# Catalog and schema used in the three-part names of the output tables.
catalog = "sdickey"
schema = "nifi_workflow_metadata"

# from tools.xml_tools import parse_nifi_template

In [0]:
def _trim(s: str | None) -> str:
    """Trim helper that returns '' for None."""
    return (s or "").strip()


def _element_text(elem: ET.Element, path: str, default: str = "Unknown") -> str:
    """Return the trimmed text at ``path`` under ``elem``; ``default`` if absent or blank."""
    text = (elem.findtext(path) or "").strip()
    return text or default


def _collect_process_groups(root: ET.Element) -> Dict[str, str]:
    """Map every ``processGroups`` id in the document to its name ('UnnamedGroup' if none)."""
    process_groups: Dict[str, str] = {}
    for group in root.findall(".//processGroups"):
        group_id = group.findtext("id")
        if group_id:
            process_groups[group_id] = group.findtext("name") or "UnnamedGroup"
    return process_groups


def _parse_processor(processor: ET.Element, process_groups: Dict[str, str]) -> Dict[str, Any]:
    """Build the record (identity, properties, parent group) for one ``processors`` element."""
    parent_group_id = processor.findtext("parentGroupId")
    # Processors without a parent id, or whose parent is not a known group, are "Root".
    parent_group_name = (
        process_groups.get(parent_group_id, "Root") if parent_group_id else "Root"
    )

    properties: Dict[str, Any] = {}
    props_node = processor.find(".//properties")
    if props_node is not None:
        for entry in props_node.findall("entry"):
            key = entry.findtext("key")
            if key is not None:
                # findtext returns None when the entry has no <value> child; kept as None.
                properties[key] = entry.findtext("value")

    return {
        "name": _element_text(processor, "name"),
        "type": _element_text(processor, "type"),
        "id": _element_text(processor, "id"),
        "properties": properties,
        "parentGroupId": parent_group_id,
        "parentGroupName": parent_group_name,
    }


def _parse_connection(connection: ET.Element) -> Dict[str, Any]:
    """Build the source/destination/relationships record for one ``connections`` element."""
    relationships: List[str] = []
    for rel in connection.findall(".//selectedRelationships"):
        name = (rel.text or "").strip()
        if name:  # skip empty / whitespace-only relationship elements
            relationships.append(name)
    return {
        "source": _element_text(connection, ".//source/id"),
        "destination": _element_text(connection, ".//destination/id"),
        "relationships": relationships,
    }


def parse_nifi_template_impl(xml_content: str) -> Dict[str, Any]:
    """
    Parse a NiFi XML template and extract processors, properties, and connections.
    Returns a Python dict for programmatic use.

    Parameters:
        xml_content: The raw NiFi XML content.

    Returns:
        Dict with keys:
            processors: list of dicts with name, type, id, properties
                (key -> value; value is None when the entry has no <value>),
                parentGroupId and parentGroupName ("Root" when the processor has
                no parent id or the id is not a known process group).
            connections: list of dicts with source, destination and relationships.
            processor_count / connection_count: lengths of the two lists.
            process_groups: mapping of process-group id -> name.

        Missing or blank name/type/id/source/destination values are reported as
        "Unknown"; blank relationship names are dropped.

    Raises:
        xml.etree.ElementTree.ParseError: if ``xml_content`` is not well-formed XML.
    """
    root = ET.fromstring(xml_content)

    # Group names are resolved first so each processor can be labelled with its parent.
    process_groups = _collect_process_groups(root)
    processors = [
        _parse_processor(processor, process_groups)
        for processor in root.findall(".//processors")
    ]
    connections = [
        _parse_connection(connection) for connection in root.findall(".//connections")
    ]

    return {
        "processors": processors,
        "connections": connections,
        "processor_count": len(processors),
        "connection_count": len(connections),
        "process_groups": process_groups,
    }


def parse_nifi_template(xml_content: str) -> str:
    """
    JSON-string wrapper around ``parse_nifi_template_impl`` (legacy tool interface).

    On success the parsed dict is returned as indented JSON with two extra keys,
    ``continue_required`` (always False) and ``tool_name``. Any failure is reported
    as a plain, non-JSON error string instead of being raised.
    """
    try:
        payload = parse_nifi_template_impl(xml_content)
        payload["continue_required"] = False
        payload["tool_name"] = "parse_nifi_template"
        return json.dumps(payload, indent=2)
    except ET.ParseError as exc:
        return "Error parsing XML: " + str(exc)
    except Exception as exc:
        return "Unexpected error: " + str(exc)
    

def list_xml_files(xml_volumes_path):
    """
    Return the paths of all ``*.xml`` files found (recursively) under a directory.

    Parameters:
        xml_volumes_path: Root directory to search, e.g. a volume path such as
            ``/Volumes/<catalog>/<schema>/<volume>``.

    Returns:
        List of file paths (strings), sorted for a deterministic order. The paths
        are also displayed via ``DataFrame.show`` as a side effect.
    """
    # Resolve the session explicitly rather than relying on a global ``spark``,
    # which this notebook only assigns in a later cell.
    session = SparkSession.builder.getOrCreate()

    xml_paths_df = (
        session.read.format("binaryFile")
        .option("recursiveFileLookup", "true")
        .option("pathGlobFilter", "*.xml")  # only *.xml files
        .load(xml_volumes_path)
        .select("path")  # only the path column is needed
        .distinct()
        .orderBy("path")
    )

    xml_paths_df.show(truncate=False)
    return [row.path for row in xml_paths_df.collect()]

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, ArrayType

def create_table(catalog, schema, table_name, table_schema, mode="overwrite"):
    """
    Create an empty managed Delta table ``catalog.schema.table_name`` with ``table_schema``.

    Parameters:
        catalog, schema, table_name: Parts of the three-part table name.
        table_schema: ``StructType`` describing the table's columns.
        mode: DataFrameWriter save mode. The default ``"overwrite"`` keeps the
            original behavior but REPLACES any existing table and its data with an
            empty one. Pass ``"ignore"`` to leave an existing table untouched, or
            ``"errorifexists"`` to fail instead.
    """
    # Resolve the session explicitly instead of depending on a global ``spark``.
    session = SparkSession.builder.getOrCreate()
    empty_df = session.createDataFrame([], table_schema)
    empty_df.write.format("delta").mode(mode).saveAsTable(f"{catalog}.{schema}.{table_name}")



# Target schemas for the three NiFi metadata tables.
# NOTE(review): these definitions shadow the same names imported from
# schemas.nifi_metadata in the first code cell — keep one source of truth.

# One row per processor.
# NOTE(review): processors_df (built below) does not yet populate workflow_id,
# workflow_name or properties_id.
processor_schema = StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("type", StringType(), False),
    # Previously misspelled "worflow_id"; a table already created with the old
    # name needs a column rename before data is written with this schema.
    StructField("workflow_id", StringType(), False),
    StructField("workflow_name", StringType(), False),
    StructField("parent_group_id", StringType(), True),
    StructField("parent_group_name", StringType(), True),
    StructField("properties_id", StringType(), True),
    StructField("created_date", DateType(), False),
    StructField("last_updated_date", DateType(), False)
])

# One row per (processor, property). property_value is nullable because the
# parser stores None for template entries that have no <value> element.
processor_properties_schema = StructType([
    StructField("processor_id", StringType(), False),
    StructField("property_name", StringType(), False),
    StructField("property_value", StringType(), True),
    StructField("property_rank", StringType(), False),  # 1-based position, stored as a string
    StructField("created_date", DateType(), False),
    StructField("last_updated_date", DateType(), False)
])

# One row per connection between two processors.
processor_connections_schema = StructType([
    StructField("source_processor_id", StringType(), False),
    StructField("destination_processor_id", StringType(), False),
    StructField("relationships", ArrayType(StringType()), True),
    StructField("created_date", DateType(), False),
    StructField("last_updated_date", DateType(), False)
])

In [0]:
# Parse one NiFi XML export into raw Spark DataFrames of processors and connections.
from pyspark.sql.functions import explode, current_date, lit
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType

xml_file_path = "/Volumes/sdickey/nxp_nifi_flows_xml/raw_files/ICN8_NiFi_flows_2025-05-06.xml"
xml_volumes_path = "/Volumes/sdickey/nxp_nifi_flows_xml/raw_files"

# Create/fetch the session before anything (e.g. list_xml_files) needs Spark.
spark = SparkSession.builder.getOrCreate()

# Discover every XML export in the volume (only xml_file_path is parsed below).
xml_file_paths = list_xml_files(xml_volumes_path)
print(xml_file_paths)

# Read the file and close it before any Spark work starts.
with open(xml_file_path, "r", encoding="utf-8") as f:
    xml_content = f.read()

# Call the dict-returning parser directly: the JSON wrapper reports parse errors
# as plain strings, which json.loads cannot decode, hiding the real error.
response = parse_nifi_template_impl(xml_content)

# Explicit schemas: inferring from dicts fails when a field is None/empty in every
# row (e.g. no processor has properties) and on an empty list. Fields are listed
# alphabetically to match the column order that dict inference produced before.
raw_processor_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("parentGroupId", StringType(), True),
    StructField("parentGroupName", StringType(), True),
    StructField("properties", MapType(StringType(), StringType(), True), True),
    StructField("type", StringType(), True),
])
raw_connection_schema = StructType([
    StructField("destination", StringType(), True),
    StructField("relationships", ArrayType(StringType(), True), True),
    StructField("source", StringType(), True),
])

raw_processors_df = spark.createDataFrame(response["processors"], raw_processor_schema)
raw_connections_df = spark.createDataFrame(response["connections"], raw_connection_schema)


In [0]:

# create_table("sdickey", "nifi_workflow_metadata", "nifi_processors", processor_schema)

# create_table(catalog, schema, "nifi_processor_properties", processor_properties_schema)

# create_table(catalog, schema, "nifi_connections", processor_connections_schema)

In [0]:

# 1. Base processors table: rename the parser's camelCase group fields to
#    snake_case and stamp the current date on every row.
processors_df = raw_processors_df.select(
    F.col("id"),
    F.col("name"),
    F.col("type"),
    F.col("parentGroupId").alias("parent_group_id"),
    F.col("parentGroupName").alias("parent_group_name"),
    F.current_date().alias("created_date"),
    F.current_date().alias("last_updated_date"),
)

processor_count = processors_df.count()
print(processor_count)
processors_df.show()


In [0]:
# 2. Properties table: one row per (processor, property) pair from the parsed
#    properties map, ranked by 1-based position within the map.
#    posexplode (not posexplode_outer): the outer variant emits a row with NULL
#    property_name and property_rank for processors without properties, although
#    processor_properties_schema declares both columns non-nullable.
processor_properties_df = (
    raw_processors_df
    .select(
        F.col("id").alias("processor_id"),
        F.posexplode("properties").alias("pos", "property_name", "property_value"),
    )
    .select(
        "processor_id",
        "property_name",
        "property_value",  # NULL when the template entry had no <value>
        (F.col("pos") + F.lit(1)).cast("string").alias("property_rank"),  # 1-based rank
        F.current_date().alias("created_date"),
        F.current_date().alias("last_updated_date"),
    )
)

processor_properties_df.show()

In [0]:
# 3. Connections table: processor-to-processor edges with their relationships,
#    stamped with the current date.
connections_df = raw_connections_df.select(
    F.col("source").alias("source_processor_id"),
    F.col("destination").alias("destination_processor_id"),
    F.col("relationships"),
    F.current_date().alias("created_date"),
    F.current_date().alias("last_updated_date"),
)

connection_count = connections_df.count()
print(connection_count)
connections_df.show()

In [0]:
# Register the DataFrames as temp views so the inspection cells can query them with SQL.
for view_name, view_df in (
    ("raw_processors", raw_processors_df),
    ("processors", processors_df),
    ("processor_properties", processor_properties_df),
):
    view_df.createOrReplaceTempView(view_name)

In [0]:
# Inspect the raw parsed record of one sample processor.
processor_query = (
    "select * from raw_processors "
    "where id = '0e44f1c7-17c4-3edf-0000-000000000000'"
)
tmp_df = spark.sql(processor_query)
tmp_df.show(truncate=False)

In [0]:
# Inspect the same sample processor after the processors_df transformation.
processor_query = (
    "select * from processors "
    "where id = '0e44f1c7-17c4-3edf-0000-000000000000'"
)
tmp_df = spark.sql(processor_query)
tmp_df.show(truncate=False)

In [0]:
# Inspect the exploded property rows of the same sample processor.
processor_properties = (
    "select * from processor_properties "
    "where processor_id = '0e44f1c7-17c4-3edf-0000-000000000000'"
)
tmp_df = spark.sql(processor_properties)
tmp_df.show(truncate=False)

In [0]:
# processors_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.nifi_processors")
# processor_properties_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.nifi_processor_properties")
# connections_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.nifi_connections")