# Lineage Parser

Extracts data lineage edges from Fabric Data Warehouse DDL definitions.

**Pipeline:** `Source DWH (DDL) → Copy Activity → raw.* → This Notebook → meta.* → GraphQL → Frontend`

**Sections:**
1. Configuration - Database settings and URL patterns
2. Extraction Rules - Regex patterns for DDL parsing
3. Parser Class - Core extraction logic
4. Test - Local validation without database
5. Execute - Pipeline entry point

## 1. Configuration

Database connection settings and URL classification patterns.
These values rarely change.

In [None]:
# Display name of the Fabric SQL Database holding the lineage metadata.
CONFIG = {
    "db_name": "db_datalineage",
}

# Kinds of reference the parser can assign to an extracted name.
REF_LOCAL = 0     # two-part name: schema.table
REF_FILE = 1      # Azure storage path (abfss://, *.blob.core.windows.net, ...)
REF_OTHER_DB = 2  # three-part name: database.schema.table
REF_LINK = 3      # OneLake URL (*.dfs.fabric.microsoft.com, ...)

# Prefixes and host fragments used to classify path-like references.
STORAGE_PATTERNS = (
    'abfss://',
    'wasbs://',
    'wasb://',
    'adl://',
)
FILE_URL_PATTERNS = (
    '.blob.core.windows.net',
    '.dfs.core.windows.net',
)
LINK_URL_PATTERNS = (
    '.dfs.fabric.microsoft.com',
    '.blob.fabric.microsoft.com',
)

# SQL-89 comma joins: the %d placeholder is the number of extra
# "table [AS alias]," items to skip before the captured table.
_COMMA_JOIN_LIMIT = 10
_COMMA_PATTERN = r'\bFROM\s+(?:[^\s,;()]+(?:\s+(?:AS\s+)?\w+)?\s*,\s*){%d}[^\s,;()]+(?:\s+(?:AS\s+)?\w+)?\s*,\s*([^\s,;()]+)'

## 2. Extraction Rules

Regex patterns that define what SQL constructs to extract.
**Edit this section to add new patterns.**

Each rule has:
- `name` - Unique identifier for debugging
- `target` - One of: `source`, `target`, `sp_call`, `external`
- `pattern` - Regex whose first capture group `([...])` matches the table or path name

In [None]:
# Ordered list of extraction rules. Each rule is a dict with:
#   name    - identifier of the rule
#   target  - rule family: "source", "target", "sp_call" or "external"
#   pattern - regex whose first capture group is the referenced name/path
EXTRACTION_RULES = [
    # Source patterns (data flows FROM these objects)
    {"name": "from_join",   "target": "source",   "pattern": r'\b(?:FROM|JOIN|INNER\s+JOIN|LEFT\s+(?:OUTER\s+)?JOIN|RIGHT\s+(?:OUTER\s+)?JOIN|FULL\s+(?:OUTER\s+)?JOIN|CROSS\s+JOIN|OUTER\s+JOIN)\s+([^\s,;()]+)'},
    {"name": "apply",       "target": "source",   "pattern": r'\b(?:CROSS\s+APPLY|OUTER\s+APPLY)\s+([^\s,;()]+)'},
    {"name": "merge_using", "target": "source",   "pattern": r'\bMERGE\s+(?:INTO\s+)?[^\s,;()]+(?:\s+(?:AS\s+)?\w+)?\s+USING\s+([^\s,;()]+)'},
    {"name": "set_ops",     "target": "source",   "pattern": r'\b(?:EXCEPT|INTERSECT)\s+(?:ALL\s+)?SELECT\s+.*?\bFROM\s+([^\s,;()]+)'},
] + [
    # Comma-join patterns: FROM t1, t2, t3 -> captures t2, t3, etc.
    # Rule comma_N captures the N-th table of the comma-separated list.
    {"name": f"comma_{i+2}", "target": "source", "pattern": _COMMA_PATTERN % i}
    for i in range(_COMMA_JOIN_LIMIT - 1)
] + [
    # Target patterns (data flows INTO these objects)
    {"name": "dml",         "target": "target",   "pattern": r'\b(?:INSERT\s+(?:INTO\s+)?|UPDATE\s+|MERGE\s+(?:INTO\s+)?|DELETE\s+(?:FROM\s+)?)([^\s,;()]+)'},
    {"name": "ctas",        "target": "target",   "pattern": r'\bCREATE\s+TABLE\s+([^\s,;()]+)\s+AS\s+SELECT'},
    {"name": "select_into", "target": "target",   "pattern": r'\bSELECT\s+.*?\s+INTO\s+([^\s,;()]+)\s+FROM'},
    {"name": "copy_into",   "target": "target",   "pattern": r'\bCOPY\s+INTO\s+([^\s,;()]+)'},
    {"name": "bulk_insert", "target": "target",   "pattern": r'\bBULK\s+INSERT\s+([^\s,;()]+)'},

    # Stored procedure calls.
    # The optional "@var =" prefix skips a return-status assignment
    # (EXEC @rc = dbo.proc) so the procedure name is captured instead of
    # the variable, which would be discarded as a "@" reference.
    {"name": "exec",        "target": "sp_call",  "pattern": r'\bEXEC(?:UTE)?\s+(?:@\w+\s*=\s*)?([^\s,;()]+)'},

    # External file paths (in quotes)
    {"name": "openrowset",  "target": "external", "pattern": r"\bOPENROWSET\s*\(\s*BULK\s+['\"]([^'\"]+)['\"]"},
    {"name": "copy_from",   "target": "external", "pattern": r"\bCOPY\s+INTO\s+\S+\s+FROM\s+['\"]([^'\"]+)['\"]"},
    {"name": "bulk_from",   "target": "external", "pattern": r"\bBULK\s+INSERT\s+\S+\s+FROM\s+['\"]([^'\"]+)['\"]"},
]

# Comment hints for dynamic SQL
# One pattern per hint direction; the capture group is the comma-separated
# object list that follows "-- @LINEAGE_INPUTS:" / "-- @LINEAGE_OUTPUTS:".
HINT_PATTERNS = {
    direction: rf'--\s*@LINEAGE_{direction.upper()}S:\s*(.+?)(?:\n|$)'
    for direction in ("input", "output")
}

## 3. Parser Class

Core extraction logic that processes DDL definitions and builds lineage edges.
Do not modify unless changing parsing behavior.

In [None]:
from __future__ import annotations

import re
import json
import uuid
from datetime import datetime, timezone
from typing import Optional, Any
from notebookutils import data
from notebookutils import notebook as nb_utils


class LineageParserError(Exception):
    """Fatal parser failure that is meant to fail the surrounding pipeline."""


class LineageParser:
    """Extracts data lineage edges from SQL DDL definitions.

    Attributes:
        cursor: Database cursor for executing queries.
        catalog: Mapping of (source_id, name) to object_id.
        debug: Whether to log detailed parsing steps.

    Example:
        >>> parser = LineageParser(connection, debug=True)
        >>> print(parser.run())
        'OK: 42 internal, 3 external edges (15 SPs, 8 views/functions)'
    """

    _compiled: Optional[dict] = None

    def __init__(self, connection: Optional[Any] = None, debug: bool = False) -> None:
        """Initialize the parser.

        Args:
            connection: Database connection from data.connect_to_artifact().
            debug: If True, logs parsing steps to meta.parser_logs.
        """
        self.cursor = connection.cursor() if connection else None
        self.catalog: dict[tuple[int, str], int] = {}
        self.sp_names: set[tuple[int, str]] = set()
        self.debug = debug
        self._debug_logs: list[dict] = []
        self._run_id = str(uuid.uuid4()) if debug else None
        self._run_start: Optional[datetime] = None
        self._external_objects: list[tuple] = []

    @classmethod
    def _compile(cls) -> dict:
        """Compile and cache regex patterns."""
        if cls._compiled is None:
            cls._compiled = {
                "rules": [(r["target"], re.compile(r["pattern"], re.I | re.M | re.S)) for r in EXTRACTION_RULES],
                "hints": {k: re.compile(v, re.I) for k, v in HINT_PATTERNS.items()},
            }
        return cls._compiled

    @staticmethod
    def _clean(ddl: str) -> str:
        """Remove SQL comments including nested block comments."""
        if not ddl:
            return ""
        ddl = re.sub(r'--[^\r\n]*', '', ddl)
        result, i, depth = [], 0, 0
        while i < len(ddl):
            if ddl[i:i+2] == '/*':
                depth += 1
                i += 2
            elif ddl[i:i+2] == '*/' and depth > 0:
                depth -= 1
                i += 2
            elif depth == 0:
                result.append(ddl[i])
                i += 1
            else:
                i += 1
        return ''.join(result)

    @staticmethod
    def _norm(name: str) -> str:
        """Normalize: remove brackets, lowercase, strip."""
        return name.replace('[', '').replace(']', '').lower().strip() if name else ""

    @staticmethod
    def _context(ddl: str, start: int, end: int, width: int = 50) -> str:
        """Extract DDL snippet for debug logging."""
        s, e = max(0, start - width), min(len(ddl), end + width)
        return ("..." if s > 0 else "") + ddl[s:e].replace("\n", " ") + ("..." if e < len(ddl) else "")

    @staticmethod
    def _classify_reference(name: str) -> Optional[tuple[int, str]]:
        """Classify reference: (ref_type, name) or None if ignored."""
        name = name.replace('[', '').replace(']', '').strip()
        name_lower = name.lower()

        if name_lower.startswith(('#', '@')) or 'tempdb.' in name_lower:
            return None
        if any(name_lower.startswith(p) for p in STORAGE_PATTERNS):
            return (REF_FILE, name)
        if name_lower.startswith(('https://', 'http://')):
            if any(p in name_lower for p in LINK_URL_PATTERNS):
                return (REF_LINK, name)
            if any(p in name_lower for p in FILE_URL_PATTERNS):
                return (REF_FILE, name)
            return None

        parts = name.split('.')
        if len(parts) == 2:
            return (REF_LOCAL, name_lower)
        if len(parts) == 3:
            return (REF_OTHER_DB, name_lower)
        return None

    @staticmethod
    def _make_display_name(ref_name: str, ref_type: int) -> str:
        """Build a display name of at most 100 characters.

        Files and links use the last path segment, cross-database names use
        their last two dotted parts, anything else uses the name itself.
        """
        limit = 100

        if ref_type == REF_OTHER_DB:
            segments = ref_name.split('.')
            if len(segments) >= 2:
                return '.'.join(segments[-2:])[:limit]
            return ref_name[:limit]

        if ref_type in (REF_FILE, REF_LINK):
            # Treat backslashes as path separators and ignore trailing slashes.
            path = ref_name.replace('\\', '/').rstrip('/')
            return path.rsplit('/', 1)[-1][:limit]

        return ref_name[:limit]

    def _check_raw_data(self) -> int:
        """Check if raw.objects has data. Returns count."""
        self.cursor.execute("SELECT COUNT(*) FROM raw.objects")
        return self.cursor.fetchone()[0]

    def _register_sources(self) -> None:
        """Register new warehouse sources from raw.objects."""
        self.cursor.execute("""
            INSERT INTO meta.sources (server_name, database_name)
            SELECT DISTINCT server_name, database_name FROM raw.objects o
            WHERE NOT EXISTS (
                SELECT 1 FROM meta.sources s
                WHERE s.server_name = o.server_name AND s.database_name = o.database_name
            )
        """)
        self.cursor.execute("EXEC meta.sp_set_active_source")
        self.cursor.connection.commit()

    def _load_catalog(self) -> None:
        """Load object catalog for reference validation."""
        self.cursor.execute("EXEC meta.sp_get_catalog")
        for source_id, obj_id, name, obj_type in self.cursor.fetchall():
            key = (source_id, name.lower())
            self.catalog[key] = obj_id
            if obj_type == 'Stored Procedure':
                self.sp_names.add(key)

    def _extract(self, ddl: str, target_type: str, source_id: int, steps: Optional[list] = None) -> dict:
        """Run every rule of one family over the DDL and collect references.

        Args:
            ddl: SQL text to scan (callers pass comment-free text).
            target_type: Rule family to apply: "source", "target", "sp_call"
                or "external".
            source_id: Source whose catalog entries validate local names.
            steps: Optional list; when given, one debug record per applied
                rule is appended to it.

        Returns:
            {"local": set of catalog-validated two-part names,
             "external": list of (ref_type, name) for cross-database, file
             and link references}.
        """
        local_refs: set[str] = set()
        external_refs: list[tuple[int, str]] = []
        record_steps = steps is not None

        for order, (rule_target, regex) in enumerate(self._compile()["rules"], 1):
            if rule_target != target_type:
                continue

            inputs, outputs, contexts = [], [], []
            # Debug bucket: source rules report inputs, all other rules outputs.
            bucket = inputs if rule_target == "source" else outputs

            for m in regex.finditer(ddl):
                captured = m.group(1) if m.lastindex else m.group(0)
                if isinstance(captured, tuple):
                    captured = next((part for part in captured if part), None)
                if not captured:
                    continue

                classified = self._classify_reference(captured)
                if not classified:
                    continue
                ref_type, name = classified

                if ref_type == REF_LOCAL:
                    # Local names only count when the catalog knows them.
                    if (source_id, name) not in self.catalog:
                        continue
                    local_refs.add(name)
                    bucket.append(name)
                    if record_steps:
                        contexts.append(self._context(ddl, m.start(), m.end()))
                elif ref_type == REF_OTHER_DB:
                    external_refs.append((ref_type, name))
                    bucket.append(f"[OTHER_DB] {name}")
                elif ref_type in (REF_FILE, REF_LINK):
                    external_refs.append((ref_type, name))
                    label = 'FILE' if ref_type == REF_FILE else 'LINK'
                    outputs.append(f"[{label}] {name[:50]}")

            if record_steps:
                steps.append({
                    "order": order,
                    "rule_name": f"{target_type}_{order}",
                    "rule_target": rule_target,
                    "pattern": regex.pattern[:150],
                    "inputs_found": list(set(inputs)),
                    "outputs_found": list(set(outputs)),
                    "match_context": contexts,
                })

        return {"local": local_refs, "external": external_refs}

    def _extract_hints(self, ddl: str, source_id: int) -> tuple[set, set]:
        """Extract @LINEAGE_INPUTS/@OUTPUTS comment hints."""
        inputs, outputs = set(), set()
        hints = self._compile()["hints"]
        for match in hints["input"].findall(ddl):
            for item in match.split(','):
                name = self._norm(item)
                if (source_id, name) in self.catalog:
                    inputs.add(name)
        for match in hints["output"].findall(ddl):
            for item in match.split(','):
                name = self._norm(item)
                if (source_id, name) in self.catalog:
                    outputs.add(name)
        return inputs, outputs

    def _parse_sp(self, source_id: int, obj_id: int, full_name: str, ddl: str) -> list[dict]:
        """Build lineage edges for one stored procedure.

        Args:
            source_id: Source the procedure belongs to.
            obj_id: Catalog object id of the procedure.
            full_name: Procedure name; references equal to it (lowercased)
                are skipped so the procedure never links to itself.
            ddl: Raw procedure definition, comments included.

        Returns:
            Edge dicts. Internal edges carry object ids under "s" and "t";
            external edges carry "ext_src" or "ext_tgt" with the reference
            type in "st" / "tt". Extraction errors are caught: the edges
            collected so far are returned and, in debug mode, the error is
            recorded in the debug log entry.
        """
        collected: list[dict] = []
        status = "success"
        error_msg = None
        raw_ddl = ddl

        # Hints live inside comments, so they are read before cleaning.
        hints_in, hints_out = self._extract_hints(ddl, source_id)
        cleaned = self._clean(ddl)
        own_name = full_name.lower()
        steps = [] if self.debug else None

        def remember_external(ref_type, ref_name):
            """Queue an external object for sp_save_external_objects."""
            display = self._make_display_name(ref_name, ref_type)
            self._external_objects.append((source_id, ref_type, ref_name, display))

        try:
            found = {
                kind: self._extract(cleaned, kind, source_id, steps)
                for kind in ("source", "target", "sp_call", "external")
            }
            sources = found["source"]["local"]
            targets = found["target"]["local"]
            sp_calls = found["sp_call"]["local"]
            external_sources = found["source"]["external"] + found["external"]["external"]
            external_targets = found["target"]["external"]

            # UPDATE/DELETE alias fix: UPDATE o ... FROM dbo.Orders o
            # The aliased table is really the statement's target.
            alias_re = re.compile(
                r'\b(?:UPDATE|DELETE)\s+(\w+)\s+.*?\bFROM\s+([^\s,;()]+)\s+(?:AS\s+)?\1\b',
                re.I | re.S,
            )
            for hit in alias_re.finditer(cleaned):
                aliased_table = self._norm(hit.group(2))
                sources.discard(aliased_table)
                targets.add(aliased_table)

            if steps is not None:
                steps.append({
                    "order": len(steps) + 1,
                    "rule_name": "hints",
                    "rule_target": "hints",
                    "pattern": "@LINEAGE_INPUTS / @LINEAGE_OUTPUTS",
                    "inputs_found": list(hints_in),
                    "outputs_found": list(hints_out),
                    "match_context": [],
                })

            # Internal edges: upstream object -> procedure
            for upstream in sources | hints_in:
                if upstream == own_name:
                    continue
                upstream_id = self.catalog.get((source_id, upstream))
                if upstream_id:
                    collected.append({"i": source_id, "s": upstream_id, "t": obj_id, "st": 0, "tt": 0})

            # Internal edges: procedure -> downstream object
            for downstream in targets | hints_out:
                if downstream == own_name:
                    continue
                downstream_id = self.catalog.get((source_id, downstream))
                if downstream_id:
                    collected.append({"i": source_id, "s": obj_id, "t": downstream_id, "st": 0, "tt": 0})

            # Internal edges: procedure -> called procedure
            for callee in sp_calls:
                if callee == own_name or (source_id, callee) not in self.sp_names:
                    continue
                callee_id = self.catalog.get((source_id, callee))
                if callee_id:
                    collected.append({"i": source_id, "s": obj_id, "t": callee_id, "st": 0, "tt": 0})

            # External edges
            for ref_type, ref_name in external_sources:
                remember_external(ref_type, ref_name)
                collected.append({"i": source_id, "ext_src": ref_name, "t": obj_id, "st": ref_type, "tt": 0})
            for ref_type, ref_name in external_targets:
                remember_external(ref_type, ref_name)
                collected.append({"i": source_id, "s": obj_id, "ext_tgt": ref_name, "st": 0, "tt": ref_type})

        except Exception as e:
            status, error_msg = "error", str(e)

        if self.debug:
            self._debug_logs.append({
                "run_id": self._run_id,
                "run_started_at": self._run_start.isoformat(),
                "db_name": CONFIG["db_name"],
                "server_name": str(source_id),
                "sp_object_id": obj_id,
                "sp_full_name": full_name,
                "raw_ddl": raw_ddl,
                "cleaned_ddl": cleaned,
                "parsing_steps": steps,
                "edge_count": len(collected),
                "status": status,
                "error_message": error_msg,
            })
        return collected

    def _parse_view(self, source_id: int, obj_id: int, full_name: str, ddl: str) -> list[dict]:
        """Build external lineage edges for one view or function.

        Only external references produce edges here: file/link paths found
        by the "external" rules and cross-database names found by the
        "source" rules.

        Returns:
            Edge dicts with "ext_src" pointing at the external reference.
            Extraction errors are caught; in debug mode they are written to
            the debug log.
        """
        collected: list[dict] = []
        cleaned = self._clean(ddl)

        def add_external_source(ref_type, ref_name):
            """Queue the external object and add its edge into this object."""
            display = self._make_display_name(ref_name, ref_type)
            self._external_objects.append((source_id, ref_type, ref_name, display))
            collected.append({"i": source_id, "ext_src": ref_name, "t": obj_id, "st": ref_type, "tt": 0})

        try:
            path_refs = self._extract(cleaned, "external", source_id)["external"]
            name_refs = self._extract(cleaned, "source", source_id)["external"]

            for ref_type, ref_name in path_refs:
                if ref_type not in (REF_FILE, REF_LINK):
                    continue
                add_external_source(ref_type, ref_name)

            for ref_type, ref_name in name_refs:
                if ref_type != REF_OTHER_DB:
                    continue
                add_external_source(ref_type, ref_name)

        except Exception as e:
            if self.debug:
                self._debug_logs.append({
                    "run_id": self._run_id,
                    "run_started_at": self._run_start.isoformat(),
                    "db_name": CONFIG["db_name"],
                    "server_name": str(source_id),
                    "sp_object_id": obj_id,
                    "sp_full_name": full_name,
                    "raw_ddl": ddl,
                    "cleaned_ddl": cleaned,
                    "parsing_steps": [],
                    "edge_count": 0,
                    "status": "error",
                    "error_message": str(e),
                })
        return collected

    def run(self) -> str:
        """Parse all objects and save lineage edges.

        Clears the parser cache, verifies that raw.objects has data,
        registers sources, loads the catalog, parses stored procedures and
        views/functions, then saves external objects, edges and (in debug
        mode) parser logs before committing.

        Returns:
            Summary string such as
            "OK: 42 internal, 3 external edges (15 SPs, 8 views/functions)".

        Raises:
            LineageParserError: If the parser has no database connection, if
                raw.objects is empty, or if the catalog is empty after
                loading. The error is raised (rather than exiting the
                notebook with a value) so an orchestrating pipeline fails.
        """
        if self.cursor is None:
            raise LineageParserError(
                "ERROR: LineageParser.run() requires a database connection."
            )

        if self.debug:
            self._run_start = datetime.now(timezone.utc)

        self.cursor.execute("EXEC meta.sp_clear_parser_cache")
        self.cursor.connection.commit()

        # Check raw.objects BEFORE registering sources
        raw_count = self._check_raw_data()
        if raw_count == 0:
            error_msg = (
                "ERROR: No data in raw.objects table. "
                "The Copy Pipeline must run first to copy DDL from source warehouse. "
                "Check: 1) Copy Pipeline ran successfully, 2) Source DWH has objects, "
                "3) Copy activity target is raw.objects table."
            )
            print(error_msg)
            raise LineageParserError(error_msg)

        self._register_sources()
        self._load_catalog()

        if not self.catalog:
            error_msg = (
                f"ERROR: raw.objects has {raw_count} rows but no catalog entries. "
                "This may indicate: 1) No source marked as active in meta.sources, "
                "2) sp_get_catalog returned empty, 3) Object types not recognized."
            )
            print(error_msg)
            raise LineageParserError(error_msg)

        edges: list[dict] = []

        # Stored procedures: internal and external edges
        self.cursor.execute("EXEC meta.sp_get_sp_definitions")
        sp_count = 0
        for row in self.cursor.fetchall():
            edges.extend(self._parse_sp(*row))
            sp_count += 1

        # Views and functions: external edges only
        self.cursor.execute("EXEC meta.sp_get_view_definitions")
        vf_count = 0
        for row in self.cursor.fetchall():
            edges.extend(self._parse_view(*row))
            vf_count += 1

        if self._external_objects:
            # De-duplicate on (source_id, reference name); the last entry wins.
            unique_ext = list({(i, r): (i, t, r, d) for i, t, r, d in self._external_objects}.values())
            self.cursor.execute("EXEC meta.sp_save_external_objects @objects_json = ?",
                (json.dumps([{"i": i, "t": t, "r": r, "d": d} for i, t, r, d in unique_ext]),))

        # De-duplicate edges on source, endpoints and external names.
        unique_edges = list({(e["i"], e.get("s"), e.get("t"), e.get("ext_src"), e.get("ext_tgt")): e for e in edges}.values())
        self.cursor.execute("EXEC meta.sp_save_parsed_edges @edges_json = ?", (json.dumps(unique_edges),))
        self.cursor.execute("EXEC meta.sp_compute_lineage")

        if self.debug and self._debug_logs:
            self.cursor.execute("EXEC meta.sp_save_parser_log @logs_json = ?", (json.dumps(self._debug_logs),))

        self.cursor.connection.commit()

        internal = sum(1 for e in unique_edges if "s" in e and "ext_tgt" not in e and "ext_src" not in e)
        external = sum(1 for e in unique_edges if "ext_src" in e or "ext_tgt" in e)
        result = f"OK: {internal} internal, {external} external edges ({sp_count} SPs, {vf_count} views/functions)"
        if self._external_objects:
            result += f", {len(set((i,r) for i,_,r,_ in self._external_objects))} ext refs"
        if self.debug:
            result += f" (logged {len(self._debug_logs)} objects)"
        return result

    @classmethod
    def test(cls, ddl_text: str, debug: bool = False) -> str:
        """Run the extraction on a DDL string without a database.

        A throwaway catalog is built from every two-part name the rules find
        in the raw text, so all local references count as known objects.

        Args:
            ddl_text: SQL definition to analyse.
            debug: When True, the cleaned DDL and per-rule steps are appended
                to the report.

        Returns:
            Multi-line report listing sources, targets, SP calls and, when
            present, external references.
        """
        parser = cls(debug=debug)
        parser.catalog = {}
        parser._run_start = datetime.now(timezone.utc)
        source_id = 1

        # Seed the fake catalog with every local name any rule can see.
        flags = re.I | re.M | re.S
        for rule in EXTRACTION_RULES:
            for m in re.finditer(rule["pattern"], ddl_text, flags):
                captured = m.group(1) if m.lastindex else m.group(0)
                if isinstance(captured, tuple):
                    captured = next((part for part in captured if part), None)
                if not captured:
                    continue
                classified = cls._classify_reference(captured)
                if not classified or classified[0] != REF_LOCAL:
                    continue
                local_name = classified[1]
                parser.catalog[(source_id, local_name)] = hash(local_name)

        cleaned = cls._clean(ddl_text)
        steps = [] if debug else None

        found = {
            kind: parser._extract(cleaned, kind, source_id, steps)
            for kind in ("source", "target", "sp_call", "external")
        }
        sources = found["source"]["local"]
        targets = found["target"]["local"]
        external = found["source"]["external"] + found["target"]["external"] + found["external"]["external"]

        # UPDATE/DELETE through an alias: the aliased table is a target.
        alias_re = re.compile(
            r'\b(?:UPDATE|DELETE)\s+(\w+)\s+.*?\bFROM\s+([^\s,;()]+)\s+(?:AS\s+)?\1\b',
            re.I | re.S,
        )
        for hit in alias_re.finditer(cleaned):
            aliased_table = cls._norm(hit.group(2))
            sources.discard(aliased_table)
            targets.add(aliased_table)

        report = [
            f"Sources: {sorted(sources)}",
            f"Targets: {sorted(targets)}",
            f"SP Calls: {sorted(found['sp_call']['local'])}",
        ]
        if external:
            labelled = [
                ('FILE' if t == REF_FILE else 'LINK' if t == REF_LINK else 'OTHER_DB', n)
                for t, n in external
            ]
            report.append(f"External: {labelled}")
        if debug:
            report.append("")
            report.append(f"Cleaned:\n{cleaned}")
            report.append("")
            report.append(f"Steps:\n{json.dumps(steps, indent=2)}")
        return "\n".join(report)

## 4. Test

Validate parser locally before deployment.
Uses `LineageParser.test()` which requires no database connection.

In [None]:
# Offline smoke test: run the extraction rules over a sample procedure and
# print the report of sources, targets and stored-procedure calls.
print(LineageParser.test("""
CREATE PROCEDURE dbo.LoadOrders AS
BEGIN
    SELECT * FROM dbo.Orders o
    JOIN dbo.Customers c ON o.customer_id = c.id;

    INSERT INTO dbo.OrderSummary
    SELECT * FROM dbo.Orders;

    EXEC dbo.NotifyComplete;
END
"""))

In [None]:
import time

def connect_with_retry(db_name: str, max_attempts: int = 5) -> Any:
    """Connect to Fabric SQL Database with retry for cold start.

    The first attempt is made immediately. Before every later attempt the
    function sleeps ``min(30 * attempt, 120)`` seconds (60s, 90s, 120s, ...).

    Args:
        db_name: SQL Database display name (case-sensitive)
        max_attempts: Maximum connection attempts; must be at least 1

    Returns:
        Database connection object

    Raises:
        ValueError: If max_attempts is less than 1 (previously the function
            silently returned None in that case)
        Exception: If all connection attempts fail; the last underlying
            error is attached as the cause
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    for attempt in range(1, max_attempts + 1):
        if attempt > 1:
            # Back off longer on each retry, capped at two minutes.
            wait_secs = min(30 * attempt, 120)
            print(f"  Waiting {wait_secs}s (cold start can take 2-3 min)...")
            time.sleep(wait_secs)

        try:
            print(f"  Attempt {attempt}/{max_attempts}...", end=" ")
            conn = data.connect_to_artifact(db_name)
        except Exception as e:
            err = str(e)
            print("Timeout" if "timeout" in err.lower() else f"Failed: {err[:80]}")

            if attempt == max_attempts:
                raise Exception(f"""
Connection failed after {max_attempts} attempts.

Troubleshooting:
- Database name '{db_name}' must match EXACTLY (case-sensitive)
- Notebook must be in SAME workspace as SQL Database  
- Try opening database in Fabric Portal first to wake it up
- Check your permissions to the SQL Database

Error: {err[:200]}
""") from e
        else:
            print("Connected!")
            return conn

In [None]:
# Connect and run parser
# Pipeline entry point: open the database connection (retrying while the
# database starts up), run the parser with debug logging enabled, and print
# the summary string returned by run().
print(f"Connecting to: {CONFIG['db_name']}")
conn = connect_with_retry(CONFIG["db_name"])

parser = LineageParser(conn, debug=True)
print(parser.run())