# 04. Advanced Queries and Analysis

This notebook covers advanced Cypher queries and analysis techniques for the SSIS Northwind graph. You'll learn complex querying patterns, performance optimization, and sophisticated analysis workflows.

In [None]:
# Setup connection to Memgraph
import mgclient
import json
from datetime import datetime
import pandas as pd

# Connection to Memgraph.
# The driver is imported above as `mgclient`, so that is the name that must be
# used here; `pymgclient` is never bound in this notebook.
mg = mgclient.connect(host='localhost', port=7687)

def execute_and_fetch(query, params=None):
    """Run ``query`` on the shared ``mg`` connection and return every row.

    ``params`` is handed to the cursor as the query parameters; a falsy
    value (``None`` or an empty mapping) is replaced by an empty dict.
    """
    cur = mg.cursor()
    bound_params = params if params else {}
    cur.execute(query, bound_params)
    rows = cur.fetchall()
    return rows

def pretty_print(data, title="Results"):
    """Print ``data`` as indented JSON between a titled banner and a rule.

    Args:
        data: Either a string holding a JSON document, or any object that
            ``json.dumps`` can serialize. Values that are not natively
            serializable are rendered through ``str`` (``default=str``).
        title: Text shown inside the ``=== title ===`` banner.

    A string that is not valid JSON is printed unchanged.
    """
    print(f"\n=== {title} ===")
    if isinstance(data, str):
        try:
            parsed = json.loads(data)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass. Catching only
            # that keeps KeyboardInterrupt and real bugs from being hidden.
            print(data)
        else:
            print(json.dumps(parsed, indent=2))
    else:
        print(json.dumps(data, indent=2, default=str))
    # The banner wraps the title in "=== " and " ===", i.e. 8 extra characters.
    print("=" * (len(title) + 8))

# Report that the connection and the helper functions are available
for status_line in (
    "✅ Connected to Memgraph successfully",
    "📚 Advanced querying toolkit ready",
):
    print(status_line)

## 1. Complex Path Analysis

Learn how to analyze complex data flows and dependencies using path queries.

In [None]:
# Find all data transformation paths from source to target.
# The pattern follows 1 to 5 relationships of any type between two DATA_ASSET
# nodes and returns, per path, both endpoint names, the hop count and the
# names of every node along the way.
data_flow_paths = """
MATCH path = (source:Node {node_type: 'DATA_ASSET'})-[*1..5]->(target:Node {node_type: 'DATA_ASSET'})
WHERE source.name CONTAINS 'Orders' AND target.name CONTAINS 'Sales'
RETURN 
    source.name as source_table,
    target.name as target_table,
    length(path) as path_length,
    [n in nodes(path) | n.name] as transformation_chain
ORDER BY path_length
LIMIT 10
"""

paths = execute_and_fetch(data_flow_paths)
pretty_print(paths, "Data Transformation Paths")

# Analysis: Identify the most common transformation patterns
print("\n📊 Path Analysis:")
for source_table, target_table, path_length, chain in paths:
    print(f"  {source_table} → {target_table} (via {path_length} steps)")
    # Intermediate nodes are not filtered by type, so one may lack a `name`
    # and come back as None; str() keeps join() from raising TypeError.
    print(f"    Chain: {' → '.join(str(name) for name in chain)}")
    print()

In [None]:
# Find heavily connected OPERATION / DATA_ASSET nodes (potential bottlenecks).
# Note on naming: `incoming_connections` counts READS_FROM relationships that
# start at the node, and `outgoing_connections` counts WRITES_TO relationships
# that start at the node. Only nodes with more than one of either are kept.
bottleneck_analysis = """
MATCH (n:Node)
WHERE n.node_type IN ['OPERATION', 'DATA_ASSET']
WITH n, 
     size([(n)-[:READS_FROM]->() | 1]) as incoming_connections,
     size([(n)-[:WRITES_TO]->() | 1]) as outgoing_connections
WHERE incoming_connections > 1 OR outgoing_connections > 1
RETURN 
    n.name as node_name,
    n.node_type as type,
    incoming_connections,
    outgoing_connections,
    (incoming_connections + outgoing_connections) as total_connections
ORDER BY total_connections DESC
LIMIT 10
"""

bottlenecks = execute_and_fetch(bottleneck_analysis)
pretty_print(bottlenecks, "Potential Bottlenecks")

print("\n⚠️ Bottleneck Risk Assessment:")
# Row layout: name, type, READS_FROM count, WRITES_TO count, total
for node_name, node_kind, _reads, _writes, total in bottlenecks:
    if total > 5:
        risk_level = "HIGH"
    elif total > 2:
        risk_level = "MEDIUM"
    else:
        risk_level = "LOW"
    print(f"  {node_name} ({node_kind}): {total} connections - {risk_level} RISK")

## 2. Advanced Aggregation Queries

Complex aggregations to understand system complexity and patterns.

In [None]:
# Comprehensive system complexity analysis.
# For every PIPELINE node the query counts its distinct operations, the data
# assets those operations read or write, and its connections, then combines
# them into complexity_score = operations * 2 + data assets + connections * 3.
complexity_analysis = """
MATCH (p:Node {node_type: 'PIPELINE'})
OPTIONAL MATCH (p)-[:CONTAINS]->(op:Node {node_type: 'OPERATION'})
OPTIONAL MATCH (op)-[:READS_FROM|WRITES_TO]->(da:Node {node_type: 'DATA_ASSET'})
OPTIONAL MATCH (p)-[:USES_CONNECTION]->(conn:Node {node_type: 'CONNECTION'})
WITH p,
     count(DISTINCT op) as operation_count,
     count(DISTINCT da) as data_asset_count,
     count(DISTINCT conn) as connection_count,
     collect(DISTINCT op.operation_type) as operation_types
RETURN 
    p.name as package_name,
    operation_count,
    data_asset_count,
    connection_count,
    size(operation_types) as unique_operation_types,
    operation_types,
    (operation_count * 2 + data_asset_count + connection_count * 3) as complexity_score
ORDER BY complexity_score DESC
"""

complexity = execute_and_fetch(complexity_analysis)
pretty_print(complexity, "Package Complexity Analysis")

# Create complexity distribution (column 6 of each row is complexity_score)
print("\n📈 Complexity Distribution:")
total_packages = len(complexity)
high_complexity = sum(1 for pkg in complexity if pkg[6] > 20)
medium_complexity = sum(1 for pkg in complexity if 10 <= pkg[6] <= 20)
low_complexity = total_packages - high_complexity - medium_complexity

if total_packages == 0:
    # Without this guard the percentage calculations below would divide by zero.
    print("  No packages found - nothing to summarize")
else:
    print(f"  High Complexity (>20): {high_complexity} packages ({high_complexity/total_packages*100:.1f}%)")
    print(f"  Medium Complexity (10-20): {medium_complexity} packages ({medium_complexity/total_packages*100:.1f}%)")
    print(f"  Low Complexity (<10): {low_complexity} packages ({low_complexity/total_packages*100:.1f}%)")

In [None]:
# Data lineage impact analysis.
# Starting from each DATA_ASSET whose name contains 'Customer', follow up to
# four relationships and aggregate the DATA_ASSET / OPERATION nodes reached:
# how many, a sample of at most five names, and the average / maximum hop count.
lineage_impact = """
MATCH (source:Node {node_type: 'DATA_ASSET'})
WHERE source.name CONTAINS 'Customer'
MATCH path = (source)-[*1..4]->(downstream:Node)
WHERE downstream.node_type IN ['DATA_ASSET', 'OPERATION']
WITH source, downstream, length(path) as distance
RETURN 
    source.name as source_asset,
    count(DISTINCT downstream) as affected_nodes,
    collect(DISTINCT downstream.name)[0..5] as sample_affected,
    avg(distance) as avg_distance,
    max(distance) as max_distance
ORDER BY affected_nodes DESC
"""

lineage = execute_and_fetch(lineage_impact)
pretty_print(lineage, "Data Lineage Impact Analysis")

print("\n🔄 Impact Assessment:")
for source_asset, affected_count, sample_names, avg_depth, max_depth in lineage:
    # Classify by the number of distinct downstream nodes
    if affected_count > 10:
        impact_level = "CRITICAL"
    elif affected_count > 5:
        impact_level = "HIGH"
    else:
        impact_level = "MODERATE"
    print(f"  {source_asset}: affects {affected_count} nodes - {impact_level} IMPACT")
    print(f"    Sample affected: {', '.join(sample_names)}")
    print(f"    Propagation depth: avg {avg_depth:.1f}, max {max_depth}")
    print()

## 3. Pattern Matching and Graph Analytics

Advanced pattern matching to identify architectural patterns and anti-patterns.

In [None]:
# Identify ETL patterns (Extract-Transform-Load chains).
# A chain is: source asset <-READS_FROM- extract op -WRITES_TO-> intermediate
# asset <-READS_FROM- transform op -WRITES_TO-> target asset, where the extract
# operation type contains 'Source' and the transform type contains 'Transform'.
etl_patterns = """
MATCH (extract:Node {node_type: 'OPERATION'})-[:READS_FROM]->(source:Node {node_type: 'DATA_ASSET'})
MATCH (extract)-[:WRITES_TO]->(intermediate:Node {node_type: 'DATA_ASSET'})
MATCH (transform:Node {node_type: 'OPERATION'})-[:READS_FROM]->(intermediate)
MATCH (transform)-[:WRITES_TO]->(target:Node {node_type: 'DATA_ASSET'})
WHERE extract.operation_type CONTAINS 'Source' 
  AND transform.operation_type CONTAINS 'Transform'
RETURN DISTINCT
    source.name as source_table,
    extract.name as extract_operation,
    intermediate.name as intermediate_stage,
    transform.name as transform_operation,
    target.name as target_table
LIMIT 10
"""

etl_chains = execute_and_fetch(etl_patterns)
pretty_print(etl_chains, "ETL Pattern Analysis")

print("\n🔧 ETL Chain Analysis:")
# Operations are shown in brackets, data assets without
for source_table, extract_op, staging, transform_op, target_table in etl_chains:
    print(f"  {source_table} ➜ [{extract_op}] ➜ {staging} ➜ [{transform_op}] ➜ {target_table}")

In [None]:
# Detect anti-patterns: operations that read from three or more distinct data
# assets (potential complexity issues), reported with their containing package.
anti_patterns = """
MATCH (op:Node {node_type: 'OPERATION'})-[:READS_FROM]->(source:Node {node_type: 'DATA_ASSET'})
WITH op, collect(DISTINCT source.name) as sources
WHERE size(sources) >= 3
MATCH (op)<-[:CONTAINS]-(pkg:Node {node_type: 'PIPELINE'})
RETURN 
    pkg.name as package_name,
    op.name as operation_name,
    op.operation_type as operation_type,
    size(sources) as source_count,
    sources
ORDER BY source_count DESC
"""

anti_pattern_results = execute_and_fetch(anti_patterns)
pretty_print(anti_pattern_results, "Potential Anti-Patterns")

print("\n⚠️ Anti-Pattern Analysis:")
for package_name, operation_name, _operation_type, source_count, sources in anti_pattern_results:
    risk_level = "MEDIUM"
    if source_count > 5:
        risk_level = "HIGH"
    print(f"  {operation_name} in {package_name}: reads from {source_count} sources - {risk_level} COMPLEXITY")
    # Show at most three source names, with a marker when the list was cut
    preview = ', '.join(sources[:3])
    suffix = '...' if len(sources) > 3 else ''
    print(f"    Sources: {preview}{suffix}")
    print()

## 4. Performance Optimization Queries

Performance-oriented queries that read precomputed results from materialized views instead of re-traversing the graph.

In [None]:
# Compare performance: raw query vs materialized view
import time

# Raw query approach: traverse pipelines -> operations -> data assets and
# collect the table names touched by each SQL-related operation.
raw_query = """
MATCH (p:Node {node_type: 'PIPELINE'})
MATCH (p)-[:CONTAINS]->(op:Node {node_type: 'OPERATION'})
WHERE op.operation_type CONTAINS 'SQL' OR op.sql_command IS NOT NULL
MATCH (op)-[:READS_FROM|WRITES_TO]->(table:Node {node_type: 'DATA_ASSET'})
RETURN 
    op.node_id as operation_id,
    op.name as operation_name,
    CASE WHEN op.sql_command IS NOT NULL THEN 'SQL_TASK' ELSE 'DATA_FLOW' END as sql_type,
    collect(DISTINCT table.name) as affected_tables
"""

# Materialized view approach: read the JSON stored on a single view node.
# NOTE(review): this relies on a JSON_EXTRACT function being available in the
# target database - confirm before running against a fresh instance.
materialized_query = """
MATCH (v:Node {id: 'view:sql_operations_catalog'})
RETURN JSON_EXTRACT(v.properties, '$.data') as operations
"""

# Performance test
print("🏁 Performance Comparison:")

# perf_counter() is the clock intended for measuring short intervals; the
# wall clock (time.time()) can be adjusted mid-measurement and is coarser.

# Test raw query
start_time = time.perf_counter()
raw_results = execute_and_fetch(raw_query)
raw_time = (time.perf_counter() - start_time) * 1000

# Test materialized view
start_time = time.perf_counter()
materialized_results = execute_and_fetch(materialized_query)
materialized_time = (time.perf_counter() - start_time) * 1000

print(f"  Raw Query: {raw_time:.2f}ms ({len(raw_results)} results)")
print(f"  Materialized View: {materialized_time:.2f}ms")
if materialized_time > 0:
    print(f"  Speed Improvement: {raw_time/materialized_time:.1f}x faster")
else:
    # Avoid ZeroDivisionError if the second measurement rounds to zero
    print("  Speed Improvement: n/a (materialized view time too small to measure)")

# Show materialized view data (first column of the first row holds the JSON)
if materialized_results and materialized_results[0][0]:
    operations_data = json.loads(materialized_results[0][0])
    print(f"  Materialized View Records: {len(operations_data)}")
    
    # Show sample from materialized view
    print("\n📋 Sample from Materialized View:")
    for i, op in enumerate(operations_data[:3]):
        print(f"  {i+1}. {op.get('operation_name', 'Unknown')} ({op.get('sql_type', 'Unknown')})")
        print(f"     Tables: {', '.join(op.get('affected_tables', []))}")

In [None]:
# Efficient dependency analysis using materialized views: read the JSON stored
# on the cross-package dependency view node and summarize it by risk level.
dependency_analysis = """
MATCH (v:Node {id: 'view:cross_package_dependencies'})
RETURN JSON_EXTRACT(v.properties, '$.data') as dependencies
"""

deps = execute_and_fetch(dependency_analysis)
if deps and deps[0][0]:
    dependencies_data = json.loads(deps[0][0])
    
    print("🔗 Cross-Package Dependencies (from Materialized View):")
    print(f"  Total Dependencies: {len(dependencies_data)}")
    
    # Bucket the dependency records by their 'risk_level' (missing -> 'UNKNOWN')
    risk_groups = {}
    for dep in dependencies_data:
        risk_groups.setdefault(dep.get('risk_level', 'UNKNOWN'), []).append(dep)
    
    for risk_level, deps_in_level in risk_groups.items():
        print(f"\n  {risk_level} Risk Dependencies: {len(deps_in_level)}")
        # Only the first three records of each bucket are listed
        for dep in deps_in_level[:3]:
            source_pkg = dep.get('source_package', 'Unknown')
            target_pkg = dep.get('target_package', 'Unknown')
            print(f"    {source_pkg} → {target_pkg}")
            shared = dep.get('shared_resources')
            if shared:
                print(f"      Shared: {', '.join(shared[:2])}")
        remaining = len(deps_in_level) - 3
        if remaining > 0:
            print(f"    ... and {remaining} more")

## 5. Custom Analytics Functions

Build reusable analytics functions for common analysis patterns.

In [None]:
class SSISAnalytics:
    """Custom analytics functions for SSIS graph analysis.

    Every query runs on the connection passed to the constructor, so an
    instance can be pointed at any connection object that offers
    ``cursor()`` returning something with ``execute()`` and ``fetchall()``.
    """

    def __init__(self, mg_connection):
        """Keep ``mg_connection`` as the connection used by all methods."""
        self.mg = mg_connection

    def _fetch(self, query, params=None):
        """Execute ``query`` on this instance's connection and return all rows."""
        cursor = self.mg.cursor()
        cursor.execute(query, params or {})
        return cursor.fetchall()

    @staticmethod
    def _decode_view(rows):
        """Decode the JSON text in the first column of the first row.

        Returns ``None`` when there is no row or the column is empty.
        """
        if rows and rows[0][0]:
            return json.loads(rows[0][0])
        return None

    def get_package_health_score(self, package_name=None):
        """Calculate health score for packages based on complexity metrics.

        Args:
            package_name: Restrict the result to the pipeline with this name;
                ``None`` (the default) scores every pipeline.

        Returns:
            Rows of ``(package, ops, assets, conns, health_score)`` ordered by
            operation count, then asset count, both descending. The score is
            one of 'EXCELLENT', 'GOOD', 'FAIR' or 'NEEDS_ATTENTION'.
        """
        query = """
        MATCH (p:Node {node_type: 'PIPELINE'})
        WHERE $package_name IS NULL OR p.name = $package_name
        OPTIONAL MATCH (p)-[:CONTAINS]->(op:Node {node_type: 'OPERATION'})
        OPTIONAL MATCH (op)-[:READS_FROM|WRITES_TO]->(da:Node {node_type: 'DATA_ASSET'})
        OPTIONAL MATCH (p)-[:USES_CONNECTION]->(conn:Node {node_type: 'CONNECTION'})
        WITH p,
             count(DISTINCT op) as ops,
             count(DISTINCT da) as assets,
             count(DISTINCT conn) as conns
        RETURN 
            p.name as package,
            ops, assets, conns,
            CASE 
                WHEN ops <= 5 AND assets <= 10 AND conns <= 3 THEN 'EXCELLENT'
                WHEN ops <= 10 AND assets <= 20 AND conns <= 5 THEN 'GOOD'
                WHEN ops <= 20 AND assets <= 40 AND conns <= 8 THEN 'FAIR'
                ELSE 'NEEDS_ATTENTION'
            END as health_score
        ORDER BY ops DESC, assets DESC
        """

        return self._fetch(query, {"package_name": package_name})

    def find_orphaned_assets(self):
        """Find data assets that no node reads from or writes to.

        Returns:
            Rows of ``(orphaned_asset, type)`` ordered by asset name.
        """
        query = """
        MATCH (da:Node {node_type: 'DATA_ASSET'})
        WHERE NOT (da)<-[:READS_FROM|WRITES_TO]-()
        RETURN da.name as orphaned_asset, da.asset_type as type
        ORDER BY da.name
        """

        return self._fetch(query)

    def get_migration_readiness(self):
        """Assess migration readiness based on complexity and dependencies.

        Both inputs are read from materialized-view nodes. Each score is
        ``100 - (share of HIGH entries * 100)``; a view that is missing or
        empty leaves its score at 0.

        Returns:
            A dict with 'overall_score' ('READY', 'MOSTLY_READY',
            'NEEDS_WORK' or 'NOT_READY'), 'complexity_score',
            'dependency_score' and a list of 'recommendations'.
        """
        # Use materialized view for fast analysis
        complexity_query = """
        MATCH (v:Node {id: 'view:complexity_metrics'})
        RETURN JSON_EXTRACT(v.properties, '$.data') as complexity
        """

        deps_query = """
        MATCH (v:Node {id: 'view:cross_package_dependencies'})
        RETURN JSON_EXTRACT(v.properties, '$.data') as dependencies
        """

        complexity_data = self._decode_view(self._fetch(complexity_query))
        deps_data = self._decode_view(self._fetch(deps_query))

        readiness = {
            "overall_score": "CALCULATING",
            "complexity_score": 0,
            "dependency_score": 0,
            "recommendations": []
        }

        # Analyze complexity: share of packages flagged HIGH
        if complexity_data:
            high_complexity = sum(
                1 for item in complexity_data if item.get('complexity_level') == 'HIGH'
            )
            complexity_ratio = high_complexity / len(complexity_data)
            readiness["complexity_score"] = max(0, 100 - (complexity_ratio * 100))

            if complexity_ratio > 0.3:
                readiness["recommendations"].append(
                    "Consider simplifying high-complexity packages before migration"
                )

        # Analyze dependencies: share of dependencies flagged HIGH risk
        if deps_data:
            high_risk_deps = sum(1 for dep in deps_data if dep.get('risk_level') == 'HIGH')
            risk_ratio = high_risk_deps / len(deps_data)
            readiness["dependency_score"] = max(0, 100 - (risk_ratio * 100))

            if risk_ratio > 0.2:
                readiness["recommendations"].append(
                    "Address high-risk dependencies before migration"
                )

        # Overall score: plain average of the two component scores
        overall = (readiness["complexity_score"] + readiness["dependency_score"]) / 2
        if overall >= 80:
            readiness["overall_score"] = "READY"
        elif overall >= 60:
            readiness["overall_score"] = "MOSTLY_READY"
        elif overall >= 40:
            readiness["overall_score"] = "NEEDS_WORK"
        else:
            readiness["overall_score"] = "NOT_READY"

        return readiness

# Build the analytics helper on the notebook's shared connection
analytics = SSISAnalytics(mg)

# List the methods the helper exposes
print(
    "🧰 Custom Analytics Functions Ready",
    "   - get_package_health_score()",
    "   - find_orphaned_assets()",
    "   - get_migration_readiness()",
    sep="\n",
)

In [None]:
# Exercise each of the custom analytics functions and print a short report

section_rule = "\n" + "="*60

# 1. Package Health Scores (row layout: package, ops, assets, conns, score)
health_scores = analytics.get_package_health_score()
print("🏥 Package Health Assessment:")
for pkg_name, ops, assets, conns, health in health_scores:
    print(f"  {pkg_name}: {health} (ops: {ops}, assets: {assets}, conns: {conns})")

print(section_rule)

# 2. Orphaned Assets (only the first five are listed)
orphaned = analytics.find_orphaned_assets()
print(f"\n🏝️ Orphaned Assets: {len(orphaned)} found")
for asset_name, asset_type in orphaned[:5]:
    print(f"  {asset_name} ({asset_type or 'Unknown type'})")

print(section_rule)

# 3. Migration Readiness
readiness = analytics.get_migration_readiness()
print("\n🚀 Migration Readiness Assessment:")
print(f"  Overall Score: {readiness['overall_score']}")
print(f"  Complexity Score: {readiness['complexity_score']:.1f}/100")
print(f"  Dependency Score: {readiness['dependency_score']:.1f}/100")

recommendations = readiness['recommendations']
if not recommendations:
    print("\n  ✅ No specific recommendations - system looks healthy!")
else:
    print("\n  📋 Recommendations:")
    for rec in recommendations:
        print(f"    • {rec}")

## 6. Advanced Visualization Queries

Queries that prepare data for visualization and reporting.

In [None]:
# Create network data for visualization tools: up to 50 relationships between
# PIPELINE / OPERATION / DATA_ASSET nodes, with both endpoints described.
network_data_query = """
MATCH (n:Node)-[r]->(m:Node)
WHERE n.node_type IN ['PIPELINE', 'OPERATION', 'DATA_ASSET'] 
  AND m.node_type IN ['PIPELINE', 'OPERATION', 'DATA_ASSET']
RETURN 
    n.node_id as source,
    n.name as source_name,
    n.node_type as source_type,
    type(r) as relationship,
    m.node_id as target,
    m.name as target_name,
    m.node_type as target_type
LIMIT 50
"""

network_data = execute_and_fetch(network_data_query)

# Convert to format suitable for network visualization libraries:
# `nodes` de-duplicates (id, name, type) triples, `edges` keeps one dict per row.
nodes = set()
edges = []

for source_id, source_name, source_type, rel_type, target_id, target_name, target_type in network_data:
    nodes.update([
        (source_id, source_name, source_type),
        (target_id, target_name, target_type),
    ])
    edge = {
        "source": source_id,
        "target": target_id,
        "relationship": rel_type,
        "source_name": source_name,
        "target_name": target_name
    }
    edges.append(edge)

# Convert nodes to list of dicts
nodes_list = [
    {"id": node_id, "name": node_name, "type": node_kind}
    for node_id, node_name, node_kind in nodes
]

print("📊 Network Visualization Data:")
print(f"  Nodes: {len(nodes_list)}")
print(f"  Edges: {len(edges)}")

# Show type distribution
type_counts = {}
for node in nodes_list:
    kind = node['type']
    if kind in type_counts:
        type_counts[kind] += 1
    else:
        type_counts[kind] = 1

print("\n  Node Type Distribution:")
for node_type, count in type_counts.items():
    print(f"    {node_type}: {count}")

# Show relationship distribution
rel_counts = {}
for edge in edges:
    relationship = edge['relationship']
    if relationship in rel_counts:
        rel_counts[relationship] += 1
    else:
        rel_counts[relationship] = 1

print("\n  Relationship Type Distribution:")
for rel_type, count in rel_counts.items():
    print(f"    {rel_type}: {count}")

In [None]:
# Create hierarchical data for tree visualizations: one row per pipeline with
# a list of {name, type, assets} maps describing its operations.
hierarchy_query = """
MATCH (p:Node {node_type: 'PIPELINE'})
OPTIONAL MATCH (p)-[:CONTAINS]->(op:Node {node_type: 'OPERATION'})
OPTIONAL MATCH (op)-[:READS_FROM|WRITES_TO]->(da:Node {node_type: 'DATA_ASSET'})
RETURN 
    p.name as package,
    collect(DISTINCT {
        name: op.name,
        type: op.operation_type,
        assets: [(op)-[:READS_FROM|WRITES_TO]->(asset:Node {node_type: 'DATA_ASSET'}) | asset.name]
    }) as operations
"""

hierarchy_data = execute_and_fetch(hierarchy_query)

print("🌳 Hierarchical Data Structure:")
# Preview: first 3 packages, first 3 operations each, first 2 assets each
for package_name, operations in hierarchy_data[:3]:
    print(f"\n📦 {package_name}")
    for op in operations[:3]:
        # Entries without a name (e.g. from the OPTIONAL MATCH) are skipped
        if not op or not op.get('name'):
            continue
        print(f"  ├── {op['name']} ({op.get('type', 'Unknown')})")
        assets = op.get('assets')
        if not assets:
            continue
        for asset in assets[:2]:
            print(f"  │   └── {asset}")
        if len(assets) > 2:
            print(f"  │   └── ... and {len(assets) - 2} more")
    
    hidden_operations = len(operations) - 3
    if hidden_operations > 0:
        print(f"  └── ... and {hidden_operations} more operations")

print(f"\n📊 Summary: {len(hierarchy_data)} packages prepared for hierarchical visualization")

## 7. Query Performance Tips

Best practices for writing efficient Cypher queries.

In [None]:
# Performance comparison: different query patterns
import time

def time_query(query_name, query, params=None):
    """Run ``query`` once and measure how long it takes.

    Args:
        query_name: Label for the query. It is accepted so call sites can
            name what they are timing; it does not affect the measurement.
        query: Query text passed to ``execute_and_fetch``.
        params: Optional query parameters, forwarded unchanged.

    Returns:
        A tuple ``(elapsed_milliseconds, row_count)``.
    """
    # perf_counter() is the clock intended for interval timing; time.time()
    # follows the wall clock, which can be adjusted while the query runs.
    started = time.perf_counter()
    results = execute_and_fetch(query, params)
    execution_time = (time.perf_counter() - started) * 1000
    return execution_time, len(results)

print("⚡ Query Performance Comparison:")

# 1. Inefficient: No filtering early
inefficient_query = """
MATCH (n:Node)-[r]->(m:Node)
WHERE n.node_type = 'PIPELINE' AND m.node_type = 'OPERATION'
RETURN count(*)
"""

# 2. Efficient: Filter early in MATCH
efficient_query = """
MATCH (n:Node {node_type: 'PIPELINE'})-[r]->(m:Node {node_type: 'OPERATION'})
RETURN count(*)
"""

# 3. Most efficient: Use materialized view when available
materialized_query = """
MATCH (v:Node {id: 'view:graph_summary_stats'})
RETURN JSON_EXTRACT(v.properties, '$.data') as stats
"""

# Test queries (each call returns elapsed milliseconds and the row count)
time1, results1 = time_query("Inefficient (filter in WHERE)", inefficient_query)
time2, results2 = time_query("Efficient (filter in MATCH)", efficient_query)
time3, results3 = time_query("Materialized View", materialized_query)

# A measurement can come back as 0.0 on a coarse clock; report "n/a" in that
# case rather than dividing by zero.
speedup2 = f"{time1/time2:.1f}x faster" if time2 > 0 else "n/a"
speedup3 = f"{time1/time3:.1f}x faster" if time3 > 0 else "n/a"

print(f"  1. Inefficient query: {time1:.2f}ms")
print(f"  2. Efficient query: {time2:.2f}ms ({speedup2})")
print(f"  3. Materialized view: {time3:.2f}ms ({speedup3})")

print("\n📝 Performance Tips:")
print("  • Filter early in MATCH clauses, not WHERE clauses")
print("  • Use materialized views for frequently-accessed data")
print("  • Limit results with LIMIT when doing exploratory queries")
print("  • Use OPTIONAL MATCH only when necessary")
print("  • Consider using WITH to break complex queries into steps")

## Summary

This notebook covered advanced querying techniques including:

1. **Complex Path Analysis** - Multi-hop queries for data lineage and bottleneck detection
2. **Advanced Aggregations** - Comprehensive system complexity analysis
3. **Pattern Matching** - ETL pattern detection and anti-pattern identification
4. **Performance Optimization** - Using materialized views for fast queries
5. **Custom Analytics** - Building reusable analysis functions
6. **Visualization Data** - Preparing data for network and hierarchical visualizations
7. **Performance Tips** - Best practices for efficient Cypher queries

### Key Takeaways:
- Materialized views can provide significant performance benefits (often 10-100x faster, depending on graph size — measure with the comparison in section 4)
- Pattern matching can identify architectural issues early
- Custom analytics functions make complex analysis reusable
- Early filtering in MATCH clauses improves query performance
- The graph structure enables sophisticated path and dependency analysis

### Next Steps:
- Combine these techniques in migration analysis scenarios
- Build custom dashboards using the visualization data formats
- Create monitoring queries for ongoing system health assessment