# Synapse Metadata Extractor
## Extract DMV metadata to Parquet files for Data Lineage Parser

**Version:** 1.1.0 (Updated DMV queries)
**Date:** 2025-11-12

**Required Libraries:**
- pyodbc
- pandas
- pyarrow

**Output Files:**
1. `objects.parquet` - Database objects (tables, views, SPs, functions)
2. `dependencies.parquet` - Object dependencies
3. `definitions.parquet` - DDL definitions
4. `query_logs.parquet` - Query execution logs (optional)

In [None]:
# Import required libraries
import os
from datetime import datetime
from pathlib import Path

import pandas as pd
import pyodbc

# Confirm the imports above succeeded and record when this run started.
print("Libraries loaded successfully")
print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [None]:
# Configuration - UPDATE THESE VALUES
# Each setting can be overridden with an environment variable, so real
# credentials never have to be typed into (and committed with) the notebook.
# The literals below are placeholders used only when the variable is unset.
SERVER = os.environ.get("SYNAPSE_SERVER", "yourserver.sql.azuresynapse.net")
DATABASE = os.environ.get("SYNAPSE_DATABASE", "yourdatabase")
USERNAME = os.environ.get("SYNAPSE_USERNAME", "youruser")
PASSWORD = os.environ.get("SYNAPSE_PASSWORD", "yourpassword")
OUTPUT_DIR = os.environ.get("SYNAPSE_OUTPUT_DIR", "parquet_snapshots")

# Create output directory (no error if it already exists)
output_path = Path(OUTPUT_DIR)
output_path.mkdir(parents=True, exist_ok=True)

# PASSWORD is deliberately not echoed.
print("Configuration:")
print(f"  Server: {SERVER}")
print(f"  Database: {DATABASE}")
print(f"  Output Directory: {output_path.absolute()}")

In [None]:
# Establish connection to Synapse


def _odbc_quote(value):
    """Return *value* brace-quoted for use in an ODBC connection string.

    Inside braces a value may contain ';' and '='; a literal '}' must be
    doubled so it does not close the braces early.
    """
    return "{" + str(value).replace("}", "}}") + "}"


conn_str = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={SERVER};"
    f"DATABASE={DATABASE};"
    # Credentials are brace-quoted so characters such as ';' or '}' in a
    # password cannot terminate the value or inject extra keywords.
    f"UID={_odbc_quote(USERNAME)};"
    f"PWD={_odbc_quote(PASSWORD)};"
    "Encrypt=yes;"
    "TrustServerCertificate=no;"
    "Connection Timeout=30;"
)

print(f"Connecting to {SERVER}...")
try:
    conn = pyodbc.connect(conn_str)
    print(f"✅ Connected successfully to database: {DATABASE}")
except Exception as e:
    # Report, then re-raise: nothing below can run without a connection.
    print(f"❌ Connection failed: {e}")
    raise

## Query 1: Extract Objects (Tables, Views, SPs, Functions)

**Changes from v1.0:**
- Consolidated function types: TF/IF/FN → "Function"
- Only 4 object types: Table, View, Stored Procedure, Function

In [None]:
# User-defined tables (U), views (V), stored procedures (P) and functions
# (TF/IF/FN). All function codes collapse to object_type "Function"; the raw
# code is kept in type_code and the full name in full_type_description.
query_objects = """
SELECT
    o.object_id,
    s.name AS schema_name,
    o.name AS object_name,
    o.type AS type_code,
    CASE o.type
        WHEN 'U' THEN 'Table'
        WHEN 'V' THEN 'View'
        WHEN 'P' THEN 'Stored Procedure'
        WHEN 'TF' THEN 'Function'
        WHEN 'IF' THEN 'Function'
        WHEN 'FN' THEN 'Function'
        ELSE o.type_desc
    END AS object_type,
    o.create_date,
    o.modify_date,
    o.type_desc AS full_type_description
FROM sys.objects o
JOIN sys.schemas s ON o.schema_id = s.schema_id
WHERE o.type IN ('U', 'V', 'P', 'TF', 'IF', 'FN')
    AND o.is_ms_shipped = 0
ORDER BY s.name, o.name
"""

print("Extracting objects (tables, views, SPs, functions)...")
start_time = datetime.now()

df_objects = pd.read_sql(query_objects, conn)

elapsed = (datetime.now() - start_time).total_seconds()
print(f"✅ Extracted {len(df_objects):,} objects in {elapsed:.2f}s")
print("\nObject Type Distribution:")
print(df_objects['object_type'].value_counts())

# Save to Parquet (snappy-compressed, pandas index not written)
objects_file = output_path / 'objects.parquet'
df_objects.to_parquet(objects_file, engine='pyarrow', compression='snappy', index=False)
file_size = objects_file.stat().st_size / 1024
print(f"\n💾 Saved: {objects_file} ({file_size:.2f} KB)")

## Query 2: Extract Dependencies

Source: the `sys.sql_expression_dependencies` catalog view.

In [None]:
# Object-to-object references from sys.sql_expression_dependencies.
# The referencing side is inner-joined to sys.objects so only user objects
# (is_ms_shipped = 0) are kept. The referenced side is LEFT JOINed so rows
# whose target has no matching local object are kept with a NULL
# referenced_type (presumably cross-database or unresolved references; the
# name columns still identify them).
query_dependencies = """
SELECT
    d.referencing_id AS referencing_object_id,
    d.referenced_id AS referenced_object_id,
    d.referenced_schema_name,
    d.referenced_entity_name,
    d.referenced_database_name,
    d.is_ambiguous,
    d.is_schema_bound_reference,
    d.is_caller_dependent,
    d.referencing_class_desc,
    d.referenced_class_desc,
    o1.type_desc AS referencing_type,
    o2.type_desc AS referenced_type
FROM sys.sql_expression_dependencies d
JOIN sys.objects o1 ON d.referencing_id = o1.object_id
LEFT JOIN sys.objects o2 ON d.referenced_id = o2.object_id
WHERE d.referencing_id IS NOT NULL
    AND o1.is_ms_shipped = 0
ORDER BY d.referencing_id, d.referenced_id
"""

print("Extracting dependencies...")
start_time = datetime.now()

df_dependencies = pd.read_sql(query_dependencies, conn)

elapsed = (datetime.now() - start_time).total_seconds()
print(f"✅ Extracted {len(df_dependencies):,} dependencies in {elapsed:.2f}s")

# Save to Parquet (snappy-compressed, pandas index not written)
deps_file = output_path / 'dependencies.parquet'
df_dependencies.to_parquet(deps_file, engine='pyarrow', compression='snappy', index=False)
file_size = deps_file.stat().st_size / 1024
print(f"💾 Saved: {deps_file} ({file_size:.2f} KB)")

## Query 3: Extract Definitions (DDL)

From `sys.sql_modules` (stored procedure/view/function definitions)

In [None]:
# One row per SQL module in sys.sql_modules, with its full definition text.
# The row is joined to sys.objects/sys.schemas for names, type and dates;
# system-shipped objects are excluded.
query_definitions = """
SELECT
    m.object_id,
    o.name AS object_name,
    s.name AS schema_name,
    o.type_desc AS object_type,
    m.definition,
    m.uses_ansi_nulls,
    m.uses_quoted_identifier,
    m.is_schema_bound,
    o.create_date,
    o.modify_date
FROM sys.sql_modules m
JOIN sys.objects o ON m.object_id = o.object_id
JOIN sys.schemas s ON o.schema_id = s.schema_id
WHERE o.is_ms_shipped = 0
ORDER BY s.name, o.name
"""

print("Extracting definitions (DDL)...")
start_time = datetime.now()

df_definitions = pd.read_sql(query_definitions, conn)

elapsed = (datetime.now() - start_time).total_seconds()
print(f"✅ Extracted {len(df_definitions):,} definitions in {elapsed:.2f}s")

# Save to Parquet (snappy-compressed, pandas index not written)
defs_file = output_path / 'definitions.parquet'
df_definitions.to_parquet(defs_file, engine='pyarrow', compression='snappy', index=False)
file_size = defs_file.stat().st_size / 1024
print(f"💾 Saved: {defs_file} ({file_size:.2f} KB)")

## Query 4: Extract Query Logs (Optional)

**Changes from v1.0:**
- **Removed:** Label filter (not used in your environment)
- **Removed:** DDL operations (CREATE/ALTER/DROP) - not helpful for lineage
- **Removed:** Explicit SELECT/WITH exclusion (redundant with whitelist)

**Includes ONLY:**
- Stored procedure executions (EXEC/EXECUTE)
- DML operations (INSERT/UPDATE/DELETE/MERGE/TRUNCATE)

**Automatically Excludes:**
- Ad-hoc SELECT queries (not in whitelist)
- Ad-hoc WITH/CTE queries (not in whitelist)
- DDL operations (not in whitelist)

**Note:** This query requires elevated DMV permissions. If access is restricted, the cell prints a warning and continues without query logs.

In [None]:
# Up to 10,000 completed/failed requests from the last 7 days, restricted to
# a whitelist of statement prefixes (procedure executions and DML). Anything
# not on the whitelist (ad-hoc SELECT/WITH, DDL) is excluded implicitly.
query_logs = """
SELECT TOP 10000
    r.request_id,
    r.session_id,
    r.submit_time,
    r.start_time,
    r.end_time,
    r.status,
    r.command,
    r.total_elapsed_time,
    r.[label],
    SUBSTRING(r.command, 1, 4000) AS command_text
FROM sys.dm_pdw_exec_requests r
WHERE r.command IS NOT NULL
    AND r.command NOT LIKE '%sys.dm_pdw_exec_requests%'
    AND r.status IN ('Completed', 'Failed')
    AND r.submit_time >= DATEADD(day, -7, GETDATE())
    AND (
        -- Stored procedure executions (most important for lineage)
        r.command LIKE 'EXEC %'
        OR r.command LIKE 'EXECUTE %'

        -- DML operations (data transformation)
        OR r.command LIKE 'INSERT %'
        OR r.command LIKE 'UPDATE %'
        OR r.command LIKE 'DELETE %'
        OR r.command LIKE 'MERGE %'
        OR r.command LIKE 'TRUNCATE %'
    )
ORDER BY r.submit_time DESC
"""

print("Extracting query logs (last 7 days, max 10,000)...")
print("Note: Requires elevated DMV permissions")
start_time = datetime.now()
logs_file = output_path / 'query_logs.parquet'

try:
    # Only the DMV read is treated as best-effort: it is the step expected to
    # fail when the login lacks permission. Save errors are not masked.
    df_logs = pd.read_sql(query_logs, conn)
except Exception as e:
    print(f"⚠️  Query logs extraction failed (this is OK if DMV access restricted): {e}")
    # A query_logs.parquet from an earlier run would otherwise be listed in
    # the summary and picked up as part of this snapshot.
    if logs_file.exists():
        logs_file.unlink()
        print(f"   Removed stale {logs_file}")
    print("   Continuing without query logs...")
else:
    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"✅ Extracted {len(df_logs):,} query logs in {elapsed:.2f}s")

    # Show command type distribution. The leading keyword is upper-cased so
    # 'exec'/'EXEC' land in one bucket (LIKE may match either, depending on
    # the database collation).
    print("\nQuery Type Distribution:")
    df_logs['command_type'] = df_logs['command'].str.split().str[0].str.upper()
    print(df_logs['command_type'].value_counts())

    # Save to Parquet (snappy-compressed, pandas index not written)
    df_logs.to_parquet(logs_file, engine='pyarrow', compression='snappy', index=False)
    file_size = logs_file.stat().st_size / 1024
    print(f"\n💾 Saved: {logs_file} ({file_size:.2f} KB)")

## Summary & Next Steps

In [None]:
# Close the connection opened in the connection cell; no further queries run.
conn.close()
print("\n" + "="*70)
print("Extraction Complete!")
print("="*70)

# List every Parquet file currently in the output directory, with its size.
print(f"\nGenerated Files in {output_path.absolute()}:")
for parquet_file in sorted(output_path.glob('*.parquet')):
    file_size = parquet_file.stat().st_size / 1024
    print(f"  ✓ {parquet_file.name} ({file_size:.2f} KB)")

print("\nNext Steps:")
print("  1. Upload these Parquet files via the UI")
print(f"  2. Or use the API: curl -X POST http://localhost:8000/api/upload-parquet -F files=@{output_path}/objects.parquet ...")
print(f"  3. Or run parser directly: python lineage_v3/main.py run --parquet {output_path}")
print("="*70)