# Dynamic Data Intelligence (DDI) Workshop Notebook

> Aligns with slides: setup → lineage → quality → risk → governance.

Objectives:
- Build a small lineage graph of AI data assets
- Compute basic quality metrics
- Analyze risk propagation across the graph
- Simulate simple governance rules and actions

Sections:
1. Setup and imports
2. Build asset inventory and lineage
3. Quality monitoring basics
4. Risk propagation analysis
5. Governance rules and automated actions

In [None]:
# 1) Setup and imports
from datetime import datetime, timedelta
import os
import numpy as np
import pandas as pd
import networkx as nx

from governance_tools.data_lineage import (
    DataLineageTracker, DataAsset, DataAssetType, RiskLevel
)

# Optional: configure data paths.
# Sample files are looked up in a `data-samples` directory under the
# current working directory.
DATA_DIR = os.path.join(
    os.getcwd(),
    "data-samples",
)
print("Data dir:", DATA_DIR)

Installing packages: ['numpy==1.24.3']
Installing packages: ['numpy==1.24.3']
Installing packages: ['numpy==1.24.3']


In [4]:
# 2) Build asset inventory and lineage
# A single reference time keeps every asset's relative timestamps aligned.
now = datetime.now()
tracker = DataLineageTracker()

assets = [
    DataAsset(
        id="customer_raw_data",
        name="Customer Raw Data",
        type=DataAssetType.SOURCE,
        source_system="CRM",
        created_at=now - timedelta(days=30),
        last_updated=now - timedelta(days=1),
        quality_score=0.85,
        risk_level=RiskLevel.MEDIUM,
        metadata={"format": "CSV", "size_gb": 2.5},
    ),
    DataAsset(
        id="customer_cleaned_data",
        name="Customer Cleaned Data",
        type=DataAssetType.TRANSFORMATION,
        source_system="Data Pipeline",
        created_at=now - timedelta(days=29),
        last_updated=now - timedelta(hours=6),
        quality_score=0.92,
        risk_level=RiskLevel.LOW,
        metadata={"transformation": "data_cleaning_v2"},
    ),
    DataAsset(
        id="churn_prediction_model",
        name="Customer Churn Prediction Model",
        type=DataAssetType.MODEL,
        source_system="ML Platform",
        created_at=now - timedelta(days=7),
        last_updated=now - timedelta(hours=12),
        quality_score=0.88,
        risk_level=RiskLevel.HIGH,
        metadata={"model_type": "RandomForest", "accuracy": 0.94},
    ),
    DataAsset(
        id="churn_predictions",
        name="Churn Predictions",
        type=DataAssetType.OUTPUT,
        source_system="Serving",
        created_at=now - timedelta(days=2),
        last_updated=now - timedelta(hours=1),
        quality_score=0.90,
        risk_level=RiskLevel.MEDIUM,
        metadata={"sla": "99.9%"},
    ),
]

for a in assets:
    tracker.add_asset(a)

# Lineage edges as (upstream id, downstream id, step name, numeric edge score);
# the last two values are passed through unchanged to add_dependency.
churn_lineage_edges = [
    ("customer_raw_data", "customer_cleaned_data", "data_cleaning", 0.95),
    ("customer_cleaned_data", "churn_prediction_model", "model_training", 0.90),
    ("churn_prediction_model", "churn_predictions", "inference", 0.98),
]
for edge in churn_lineage_edges:
    tracker.add_dependency(*edge)

asset_count = len(tracker.assets)
edge_count = tracker.graph.number_of_edges()
print(f"Assets: {asset_count} | Dependencies: {edge_count}")

NameError: name 'DataLineageTracker' is not defined

In [5]:
# 3) Quality monitoring basics
import numpy as np

# Simulated dataset quality metrics for assets.
# Each metric is the asset's catalogued quality_score plus Gaussian noise
# (mean 0, std 0.05), clipped to [0, 1]. A seeded generator makes the
# simulated metrics, and therefore the threshold violations flagged below,
# reproducible across kernel restarts. Change QUALITY_SEED to explore other draws.
QUALITY_SEED = 42
quality_rng = np.random.default_rng(QUALITY_SEED)
METRIC_NAMES = ('completeness', 'validity', 'consistency', 'timeliness')

quality_metrics = pd.DataFrame([
    {
        'asset_id': a.id,
        **{
            metric: np.clip(a.quality_score + quality_rng.normal(0, 0.05), 0, 1)
            for metric in METRIC_NAMES
        },
    }
    for a in assets
])

# Minimum acceptable value per metric. check_thresholds reports a metric
# as violated when it is strictly below its threshold.
quality_thresholds = {
    'completeness': 0.95,
    'validity': 0.90,
    'consistency': 0.85,
    'timeliness': 0.90,
}

def check_thresholds(row, thresholds):
    """Return one label per metric whose value is below its threshold.

    ``row`` is indexed by metric name (e.g. a pandas row). ``thresholds`` maps
    metric name -> minimum acceptable value. Labels look like ``'validity<0.9'``
    and are returned in ``thresholds`` iteration order.
    """
    return [
        f"{metric}<{limit}"
        for metric, limit in thresholds.items()
        if row[metric] < limit
    ]

def _row_violations(row):
    # Violations for one metrics row, judged against the shared thresholds.
    return check_thresholds(row, quality_thresholds)

quality_metrics['violations'] = quality_metrics.apply(_row_violations, axis=1)
display(quality_metrics)
print("Assets below thresholds:")
has_violation = quality_metrics['violations'].map(len) > 0
display(quality_metrics.loc[has_violation, ['asset_id', 'violations']])

NameError: name 'pd' is not defined

In [None]:
# 4) Risk propagation analysis
# Propagate risk from the raw source, then summarise the tracker's report.
source_asset_id = 'customer_raw_data'
risk_scores = tracker.calculate_risk_propagation(source_asset_id)

print('Risk scores from', source_asset_id)
for asset_key, score in risk_scores.items():
    print(f"  {asset_key}: {score:.3f}")

report = tracker.generate_lineage_report()
quality_summary = report['quality_summary']
print('\nLineage Report Summary:')
print('- Total assets:', report['total_assets'])
print('- Total dependencies:', report['total_dependencies'])
print('- Average quality score:', f"{quality_summary['average_quality']:.2f}")
print('- Assets below quality threshold:', len(quality_summary['below_threshold_assets']))

In [None]:
# 5) Governance rules and automated actions
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class RuleResult:
    """Outcome of evaluating one governance rule against one asset."""

    asset_id: str   # id of the asset the rule was checked against
    rule: str       # rule identifier, e.g. 'QualityThresholds'
    passed: bool    # truthy when the asset satisfied the rule
    details: dict   # evidence used for the decision (metric values / scores)

def evaluate_governance_rules(assets: List[DataAsset], quality_df: pd.DataFrame,
                              *, thresholds=None) -> List[RuleResult]:
    """Evaluate the workshop governance rules for every asset.

    Rules:
      * ``QualityThresholds``: every metric listed in ``thresholds`` must be
        at or above its limit in ``quality_df``. A metric missing for an
        asset counts as 1 and therefore passes (fail-open).
      * ``HighRiskQuality>=0.90``: checked only for HIGH/CRITICAL-risk assets.
        The catalogued ``quality_score`` must be at least 0.90.

    Args:
        assets: Assets to evaluate.
        quality_df: One row per asset with an ``asset_id`` column plus metric
            columns. ``asset_id`` values must be unique, because
            ``to_dict(orient='index')`` rejects a non-unique index.
        thresholds: Optional mapping of metric -> minimum value. Defaults to
            completeness 0.95, validity 0.90, consistency 0.85 and
            timeliness 0.90. These are the same limits as
            ``quality_thresholds`` in the quality-monitoring section; pass
            that dict to keep both checks in sync.

    Returns:
        RuleResult list in asset order. For each asset, QualityThresholds
        comes first, followed by HighRiskQuality>=0.90 when applicable.
    """
    if thresholds is None:
        thresholds = {
            'completeness': 0.95,
            'validity': 0.90,
            'consistency': 0.85,
            'timeliness': 0.90,
        }
    results: List[RuleResult] = []
    quality_by_asset = quality_df.set_index('asset_id').to_dict(orient='index')
    for a in assets:
        q = quality_by_asset.get(a.id, {})
        # Rule 1: quality threshold gate (missing metrics default to 1 -> pass).
        overall_ok = all(q.get(metric, 1) >= limit for metric, limit in thresholds.items())
        results.append(RuleResult(a.id, 'QualityThresholds', overall_ok, q))
        # Rule 2: high-risk assets must have high quality.
        if a.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL):
            high_risk_ok = a.quality_score >= 0.90
            results.append(RuleResult(a.id, 'HighRiskQuality>=0.90', high_risk_ok,
                                      {'quality_score': a.quality_score}))
    return results

def remediation_actions(result: RuleResult):
    """Map a failed governance rule to its automated remediation steps.

    Returns a fresh list of action names for a failed ``QualityThresholds``
    or ``HighRiskQuality>=0.90`` result. Returns an empty list for passing
    results and for rules without a playbook.
    """
    if result.passed:
        return []
    playbooks = {
        'QualityThresholds': ['quarantine_dataset', 'notify_owner', 'open_incident'],
        'HighRiskQuality>=0.90': ['increase_sampling', 'trigger_retraining', 'add_monitoring'],
    }
    return playbooks.get(result.rule, [])

# Evaluate every rule, then list each failure with its remediation playbook.
results = evaluate_governance_rules(assets, quality_metrics)
violations = [r for r in results if not r.passed]
print(f"Violations: {len(violations)}")
for failed in violations:
    planned = remediation_actions(failed)
    print(f"- {failed.asset_id} failed {failed.rule} -> actions: {planned}")

# Dynamic Data Intelligence Workshop

## Enabling Proactive Governance and Risk Management in AI Systems

Welcome to this hands-on workshop on Dynamic Data Intelligence (DDI). We'll explore practical techniques for understanding, assessing, and managing data risks in AI systems.

### What You'll Learn:
1. How to track data lineage and dependencies
2. Implementing dynamic quality assessment
3. Risk propagation analysis across data pipelines
4. Building proactive governance frameworks
5. Real-world case studies and solutions

## Setup Instructions

First, start the supporting infrastructure and install the Python dependencies:

```bash
# In your terminal, run:
docker-compose up -d
pip install -r requirements.txt
```

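`docker-compose up -d` starts:
- PostgreSQL (localhost:5432) - metadata storage
- DataHub (http://localhost:9002) - data catalog
- Redis (localhost:6379) - caching layer
- Jaeger (http://localhost:16686) - observability

Troubleshooting: if importing `governance_tools` fails with "A module that was compiled using NumPy 1.x cannot be run in NumPy 2.x", install a compatible NumPy (`pip install "numpy<2"`), then restart the kernel before re-running the cells.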

In [1]:
# Import our DDI modules
import sys

from governance_tools.data_lineage import DataLineageTracker, DataAsset, DataAssetType, RiskLevel
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import json

# Startup banner, printed only after the imports above succeeded.
for status_line in (
    '✅ DDI modules loaded successfully!',
    '📊 Ready for Dynamic Data Intelligence exploration',
    '🔍 Data lineage tracking enabled',
    '⚠️ Risk assessment tools ready',
):
    print(status_line)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.5 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\Public\Anaconda\Lib\site-packages\ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "c:\Users\Public\Anaconda\Lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "c:\Users\Public\Anaconda\Lib\site-packages\ipykernel\kernelapp.py", line 701, in start
    self.io_loop.start()
  File "c:\Users\Public\Anaconda\Lib\site-pack

ImportError: 
A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.5 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.




A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.5 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\Public\Anaconda\Lib\site-packages\ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "c:\Users\Public\Anaconda\Lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "c:\Users\Public\Anaconda\Lib\site-packages\ipykernel\kernelapp.py", line 701, in start
    self.io_loop.start()
  File "c:\Users\Public\Anaconda\Lib\site-pack

ImportError: 
A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.5 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.



ImportError: numpy.core.multiarray failed to import

## Part 1: Understanding Data Assets

Let's start by creating and cataloging different types of data assets in our AI system.

In [None]:
# Initialize our DDI system
ddi_tracker = DataLineageTracker()

# Create different types of data assets
print('🏗️ Creating sample data assets...')

# Capture one reference time so every asset's relative timestamps are measured
# from the same instant; repeated datetime.now() calls drift between assets.
snapshot_time = datetime.now()

# Raw data source
customer_data = DataAsset(
    id="customer_transactions",
    name="Customer Transaction Data",
    type=DataAssetType.SOURCE,
    source_system="Banking Core",
    created_at=snapshot_time - timedelta(days=90),
    last_updated=snapshot_time - timedelta(hours=2),
    quality_score=0.78,  # Deliberately below the 0.85 quality bar used in Part 5
    risk_level=RiskLevel.HIGH,  # Contains PII
    metadata={
        "format": "JSON",
        "size_gb": 15.2,
        "contains_pii": True,
        "update_frequency": "hourly"
    }
)

# Cleaned/processed data
processed_data = DataAsset(
    id="processed_transactions",
    name="Processed Transaction Features",
    type=DataAssetType.TRANSFORMATION,
    source_system="Data Pipeline",
    created_at=snapshot_time - timedelta(days=89),
    last_updated=snapshot_time - timedelta(hours=2),
    quality_score=0.92,
    risk_level=RiskLevel.MEDIUM,
    metadata={
        "transformation": "feature_engineering_v3",
        "features_count": 47,
        "anonymized": True
    }
)

# ML Model
fraud_model = DataAsset(
    id="fraud_detection_model",
    name="Real-time Fraud Detection Model",
    type=DataAssetType.MODEL,
    source_system="ML Platform",
    created_at=snapshot_time - timedelta(days=14),
    last_updated=snapshot_time - timedelta(days=3),
    quality_score=0.94,
    risk_level=RiskLevel.CRITICAL,  # Business critical
    metadata={
        "model_type": "XGBoost",
        "f1_score": 0.87,
        "precision": 0.91,
        "recall": 0.84
    }
)

# Model outputs
fraud_scores = DataAsset(
    id="fraud_risk_scores",
    name="Real-time Fraud Risk Scores",
    type=DataAssetType.OUTPUT,
    source_system="ML Platform",
    created_at=snapshot_time - timedelta(days=14),
    last_updated=snapshot_time - timedelta(minutes=5),
    quality_score=0.89,
    risk_level=RiskLevel.HIGH,
    metadata={
        "output_format": "real_time_stream",
        "latency_ms": 45,
        "throughput_tps": 1200
    }
)

# Add all assets to our tracker
for asset in [customer_data, processed_data, fraud_model, fraud_scores]:
    ddi_tracker.add_asset(asset)
    print(f'   📦 Added: {asset.name} (Risk: {asset.risk_level.value}, Quality: {asset.quality_score:.2f})')

## Part 2: Building Data Lineage

Now let's connect our assets to understand how data flows through our AI system.

In [None]:
print('🔗 Building data lineage connections...')

# Data flow as (upstream id, downstream id, step name, numeric edge score);
# each tuple is passed positionally to add_dependency.
fraud_pipeline_edges = [
    ("customer_transactions", "processed_transactions",
     "data_cleaning_and_feature_engineering", 0.95),
    ("processed_transactions", "fraud_detection_model",
     "model_training_and_validation", 0.92),
    ("fraud_detection_model", "fraud_risk_scores",
     "real_time_inference", 0.98),
]
for edge in fraud_pipeline_edges:
    ddi_tracker.add_dependency(*edge)

print('✅ Data lineage established!')
print(f'📊 Total assets: {len(ddi_tracker.assets)}')
print(f'🔗 Total dependencies: {ddi_tracker.graph.number_of_edges()}')

# Static sketch of the pipeline defined above
for diagram_line in (
    '\n🗺️ Data Flow Visualization:',
    'Customer Transactions → Processed Features → ML Model → Fraud Scores',
    '     (Risk: HIGH)    →    (Risk: MED)    → (Risk: CRIT) → (Risk: HIGH)',
):
    print(diagram_line)

## Part 3: Risk Propagation Analysis

Let's analyze how data quality issues and risks propagate through our system.

In [None]:
print('⚠️ Analyzing Risk Propagation...')

SOURCE_ID = "customer_transactions"

# Propagated risk scores from the source data to each reachable asset
source_risks = ddi_tracker.calculate_risk_propagation(SOURCE_ID)

print('\n📈 Risk Propagation from Customer Transactions:')
for impacted_id, propagated_risk in source_risks.items():
    impacted = ddi_tracker.assets[impacted_id]
    print(f'   {impacted.name}: {propagated_risk:.3f} (Quality: {impacted.quality_score:.2f})')

# Which assets would be impacted if the source data had issues
print('\n🎯 Impact Analysis:')
downstream_impacts = ddi_tracker.get_downstream_impacts(SOURCE_ID)
print(f'   Assets impacted by customer transaction issues: {len(downstream_impacts)}')
for impacted_id in downstream_impacts:
    impacted = ddi_tracker.assets[impacted_id]
    print(f'   - {impacted.name} ({impacted.type.value})')

# Quality impact analysis
quality_impact = ddi_tracker.get_quality_impact_analysis(SOURCE_ID)
print('\n📉 Quality Impact Analysis:')
print(f'   Quality degradation risk: {quality_impact["quality_degradation_risk"]:.3f}')
print(f'   Models potentially affected: {len(quality_impact["impacted_models"])}')
print(f'   Outputs potentially affected: {len(quality_impact["impacted_outputs"])}')

## Part 4: Dynamic Quality Monitoring

Let's implement real-time quality monitoring and alerting.

In [None]:
import random

class DynamicQualityMonitor:
    """Simulated per-asset quality checks with threshold-based alerting.

    Metric values are drawn at random as a stand-in for real data profiling.
    A metric strictly below its threshold is a violation. Each check that finds
    at least one violation appends an alert to ``self.alerts``.
    """

    def __init__(self, lineage_tracker, quality_thresholds=None, rng=None):
        """Create a monitor.

        Args:
            lineage_tracker: Object exposing an ``assets`` dict (id -> asset
                with a ``name``) and ``get_downstream_impacts(asset_id)``.
            quality_thresholds: Optional metric -> minimum mapping, merged
                over the defaults (completeness 0.95, validity 0.90,
                consistency 0.85, timeliness 0.90). It may override only some
                of the metrics.
            rng: Optional object with a ``uniform(a, b)`` method, for example
                ``random.Random(42)``, which gives reproducible runs. Defaults
                to the ``random`` module's global generator.
        """
        self.tracker = lineage_tracker
        self.quality_thresholds = {
            'completeness': 0.95,
            'validity': 0.90,
            'consistency': 0.85,
            'timeliness': 0.90,
        }
        if quality_thresholds is not None:
            self.quality_thresholds.update(quality_thresholds)
        self.rng = rng if rng is not None else random
        self.alerts = []

    def simulate_quality_check(self, asset_id):
        """Simulate one quality assessment for ``asset_id``.

        Returns None for an unknown asset. Otherwise returns a dict with
        ``asset_id``, ``metrics`` (metric -> score), ``overall_quality`` (mean
        of the metrics) and ``violations`` (readable strings). When any
        violation is found, an alert is also recorded in ``self.alerts``.
        """
        if asset_id not in self.tracker.assets:
            return None

        # Simulated metrics (a real system would take these from data profiling)
        uniform = self.rng.uniform
        quality_metrics = {
            'completeness': uniform(0.8, 1.0),
            'validity': uniform(0.75, 0.98),
            'consistency': uniform(0.70, 0.95),
            'timeliness': uniform(0.85, 1.0)
        }

        # Overall quality is the unweighted mean of the metrics
        overall_quality = sum(quality_metrics.values()) / len(quality_metrics)

        # Check for threshold violations
        violations = []
        for metric, score in quality_metrics.items():
            threshold = self.quality_thresholds[metric]
            if score < threshold:
                violations.append(f'{metric}: {score:.3f} < {threshold}')

        # Generate an alert if needed
        if violations:
            alert = {
                'timestamp': datetime.now(),
                'asset_id': asset_id,
                'asset_name': self.tracker.assets[asset_id].name,
                'violations': violations,
                'overall_quality': overall_quality,
                'downstream_impact': len(self.tracker.get_downstream_impacts(asset_id))
            }
            self.alerts.append(alert)

        return {
            'asset_id': asset_id,
            'metrics': quality_metrics,
            'overall_quality': overall_quality,
            'violations': violations
        }

    def monitor_all_assets(self):
        """Run one simulated quality check per tracked asset, in catalogue order."""
        results = []
        for asset_id in self.tracker.assets:
            result = self.simulate_quality_check(asset_id)
            if result is not None:
                results.append(result)
        return results

# Initialize quality monitoring and check every tracked asset once.
monitor = DynamicQualityMonitor(ddi_tracker)

print('🔍 Running Dynamic Quality Assessment...')
quality_results = monitor.monitor_all_assets()

for check in quality_results:
    checked_asset = ddi_tracker.assets[check['asset_id']]
    print(f'\n📊 {checked_asset.name}:')
    print(f'   Overall Quality: {check["overall_quality"]:.3f}')

    issues = check['violations']
    if not issues:
        print('   ✅ All quality thresholds met')
        continue
    print('   ⚠️ Quality Issues:')
    for issue in issues:
        print(f'      - {issue}')

# Summarise any alerts raised during the checks
if not monitor.alerts:
    print('\n✅ No critical quality issues detected')
else:
    print('\n🚨 QUALITY ALERTS:')
    for alert in monitor.alerts:
        print(f'   Alert for {alert["asset_name"]}:')
        print(f'   - {len(alert["violations"])} violations found')
        print(f'   - {alert["downstream_impact"]} assets potentially impacted')

## Part 5: Proactive Risk Management

Let's implement proactive risk management strategies.

In [None]:
class ProactiveRiskManager:
    """Rule-based system risk assessment and incident-response suggestions.

    Works from the asset catalogue and lineage held by ``lineage_tracker``.
    ``quality_monitor`` is stored as ``self.monitor`` for callers; the
    methods below do not use it.
    """

    def __init__(self, lineage_tracker, quality_monitor, quality_threshold=0.85):
        """Create a risk manager.

        Args:
            lineage_tracker: Tracker exposing ``assets``,
                ``get_downstream_impacts(asset_id)`` and
                ``identify_critical_paths()``.
            quality_monitor: Quality monitor, kept as ``self.monitor``.
            quality_threshold: Assets whose catalogued ``quality_score`` is
                strictly below this value are reported as quality concerns.
                Defaults to 0.85.
        """
        self.tracker = lineage_tracker
        self.monitor = quality_monitor
        self.quality_threshold = quality_threshold
        # Playbook: strategy id -> human-readable action description.
        self.risk_mitigation_strategies = {
            'data_backup': 'Create backup of critical data sources',
            'model_fallback': 'Use fallback model with different data sources',
            'quality_filtering': 'Filter out low-quality data points',
            'alert_stakeholders': 'Notify relevant stakeholders immediately',
            'circuit_breaker': 'Temporarily halt processing to prevent propagation'
        }

    def assess_system_risk(self):
        """Summarise high-risk assets, quality concerns and critical paths.

        Returns:
            dict with these keys:
              * ``high_risk_assets``: HIGH/CRITICAL assets, each with its
                downstream-impact count.
              * ``quality_concerns``: assets below ``self.quality_threshold``.
              * ``critical_paths``: as returned by ``identify_critical_paths``.
              * ``recommendations``: one entry per non-empty category above.
        """
        risk_assessment = {
            'high_risk_assets': [],
            'quality_concerns': [],
            'critical_paths': [],
            'recommendations': []
        }

        # A single pass over the catalogue fills both asset-level lists; each
        # list keeps catalogue order.
        for asset_id, asset in self.tracker.assets.items():
            if asset.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL):
                downstream_count = len(self.tracker.get_downstream_impacts(asset_id))
                risk_assessment['high_risk_assets'].append({
                    'asset': asset.name,
                    'risk_level': asset.risk_level.value,
                    'quality_score': asset.quality_score,
                    'downstream_impact': downstream_count
                })
            if asset.quality_score < self.quality_threshold:
                risk_assessment['quality_concerns'].append({
                    'asset': asset.name,
                    'quality_score': asset.quality_score,
                    'risk_level': asset.risk_level.value
                })

        # Find critical paths
        critical_paths = self.tracker.identify_critical_paths()
        risk_assessment['critical_paths'] = critical_paths

        # Generate recommendations
        if risk_assessment['high_risk_assets']:
            risk_assessment['recommendations'].append(
                'Implement additional monitoring for high-risk assets'
            )

        if risk_assessment['quality_concerns']:
            risk_assessment['recommendations'].append(
                'Improve data quality processes for low-scoring assets'
            )

        if critical_paths:
            risk_assessment['recommendations'].append(
                'Create redundant data paths for critical flows'
            )

        return risk_assessment

    def simulate_incident(self, asset_id):
        """Simulate a data-quality incident on ``asset_id`` and suggest a response.

        Returns None for an unknown asset. Otherwise returns an incident dict.
        Severity is 'HIGH' when more than two downstream assets are impacted,
        else 'MEDIUM'. Recommended actions depend on the asset type (SOURCE or
        MODEL) and on the size of the downstream impact.
        """
        if asset_id not in self.tracker.assets:
            return None

        asset = self.tracker.assets[asset_id]
        downstream = self.tracker.get_downstream_impacts(asset_id)

        incident = {
            'timestamp': datetime.now(),
            'affected_asset': asset.name,
            'incident_type': 'data_quality_degradation',
            'severity': 'HIGH' if len(downstream) > 2 else 'MEDIUM',
            'impact_radius': downstream,
            'recommended_actions': []
        }

        # Generate recommended actions based on asset type and impact
        if asset.type == DataAssetType.SOURCE:
            incident['recommended_actions'].extend([
                self.risk_mitigation_strategies['data_backup'],
                self.risk_mitigation_strategies['alert_stakeholders']
            ])

        if asset.type == DataAssetType.MODEL:
            incident['recommended_actions'].extend([
                self.risk_mitigation_strategies['model_fallback'],
                self.risk_mitigation_strategies['circuit_breaker']
            ])

        if len(downstream) > 2:
            incident['recommended_actions'].append(
                self.risk_mitigation_strategies['quality_filtering']
            )

        return incident

# Initialize risk manager and run a system-wide assessment
risk_manager = ProactiveRiskManager(ddi_tracker, monitor)

print('🛡️ Conducting System Risk Assessment...')
risk_assessment = risk_manager.assess_system_risk()

high_risk = risk_assessment["high_risk_assets"]
recommendations = risk_assessment['recommendations']

print('\n📊 RISK ASSESSMENT RESULTS:')
print(f'   High-risk assets: {len(high_risk)}')
print(f'   Quality concerns: {len(risk_assessment["quality_concerns"])}')
print(f'   Critical paths: {len(risk_assessment["critical_paths"])}')

if high_risk:
    print('\n⚠️ High-Risk Assets:')
    for entry in high_risk:
        print(f'   - {entry["asset"]} (Risk: {entry["risk_level"]}, Quality: {entry["quality_score"]:.2f})')

if recommendations:
    print('\n💡 Recommendations:')
    for rank, rec in enumerate(recommendations, start=1):
        print(f'   {rank}. {rec}')

# Simulate an incident on the raw source
print('\n🚨 INCIDENT SIMULATION:')
incident = risk_manager.simulate_incident('customer_transactions')
if incident:
    print(f'   Incident: {incident["incident_type"]} in {incident["affected_asset"]}')
    print(f'   Severity: {incident["severity"]}')
    print(f'   Assets at risk: {len(incident["impact_radius"])}')
    print('   Recommended Actions:')
    for step in incident['recommended_actions']:
        print(f'   - {step}')

## Part 6: Comprehensive DDI Report

Let's generate a comprehensive Dynamic Data Intelligence report.

In [None]:
# Generate comprehensive lineage report
lineage_report = ddi_tracker.generate_lineage_report()

print('📋 DYNAMIC DATA INTELLIGENCE REPORT')
print('=' * 50)
print(f'Generated: {lineage_report["generated_at"]}')

print('\n📊 SYSTEM OVERVIEW:')
print(f'   Total Data Assets: {lineage_report["total_assets"]}')
print(f'   Total Dependencies: {lineage_report["total_dependencies"]}')
print(f'   Average Quality Score: {lineage_report["quality_summary"]["average_quality"]:.3f}')

print('\n🏗️ ASSET TYPE DISTRIBUTION:')
for asset_type, count in lineage_report['asset_types'].items():
    print(f'   {asset_type.title()}: {count}')

print('\n⚠️ RISK DISTRIBUTION:')
for risk_level, count in lineage_report['risk_distribution'].items():
    print(f'   {risk_level.title()}: {count}')

if lineage_report['quality_summary']['below_threshold_assets']:
    print('\n🔍 QUALITY CONCERNS:')
    for asset in lineage_report['quality_summary']['below_threshold_assets']:
        print(f'   - {asset["name"]}: Quality {asset["quality_score"]:.3f}')

if lineage_report['critical_paths']:
    print('\n🛤️ CRITICAL DATA PATHS:')
    for i, path in enumerate(lineage_report['critical_paths'], 1):
        path_names = [ddi_tracker.assets[asset_id].name for asset_id in path]
        # Join hops with a plain arrow; the old ": → " separator printed
        # paths as "A: → B: → C".
        print(f'   Path {i}: {" → ".join(path_names)}')

print('\n💡 KEY INSIGHTS:')
insights = []

if lineage_report['quality_summary']['average_quality'] < 0.85:
    insights.append('Overall system quality is below recommended threshold')

high_risk_count = lineage_report['risk_distribution'].get('high', 0) + lineage_report['risk_distribution'].get('critical', 0)
if high_risk_count > 0:
    insights.append(f'{high_risk_count} assets classified as high or critical risk')

if lineage_report['critical_paths']:
    insights.append(f'{len(lineage_report["critical_paths"])} critical data paths require attention')

if not insights:
    insights.append('System appears to be operating within acceptable parameters')

for i, insight in enumerate(insights, 1):
    print(f'   {i}. {insight}')

print('\n🎯 RECOMMENDED ACTIONS:')
actions = [
    'Implement continuous quality monitoring for all data sources',
    'Establish automated alerts for quality threshold violations',
    'Create backup data sources for critical assets',
    'Regular review and update of risk classifications',
    'Implement data validation at ingestion points'
]

for i, action in enumerate(actions, 1):
    print(f'   {i}. {action}')

## Part 7: Advanced Analytics

Let's explore some advanced DDI analytics techniques.

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

def _print_centrality_ranking(tracker, title, scores):
    """Print ``title``, then each asset's name and score, highest score first."""
    print(title)
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    for asset_id, score in ranked:
        print(f'   {tracker.assets[asset_id].name}: {score:.3f}')


def analyze_data_centrality(tracker):
    """Rank data assets by degree, betweenness and PageRank centrality.

    All three measures are computed over ``tracker.graph`` before anything is
    printed. Each ranking is then printed highest-first, using asset names
    from ``tracker.assets``.

    Returns:
        dict with keys ``'degree'``, ``'betweenness'`` and ``'pagerank'``.
        Each maps node id -> score as returned by networkx.
    """
    # Degree centrality (number of connections, normalised by networkx)
    degree_centrality = nx.degree_centrality(tracker.graph)

    # Betweenness centrality (how often a node appears on shortest paths)
    betweenness_centrality = nx.betweenness_centrality(tracker.graph)

    # PageRank (importance based on connections)
    pagerank = nx.pagerank(tracker.graph)

    print('🎯 DATA ASSET CENTRALITY ANALYSIS:')
    _print_centrality_ranking(
        tracker, '\nMost Connected Assets (Degree Centrality):', degree_centrality)
    _print_centrality_ranking(
        tracker, '\nMost Critical Path Assets (Betweenness Centrality):', betweenness_centrality)
    _print_centrality_ranking(
        tracker, '\nMost Influential Assets (PageRank):', pagerank)

    return {
        'degree': degree_centrality,
        'betweenness': betweenness_centrality,
        'pagerank': pagerank
    }

def simulate_cascading_failure(tracker, failed_asset_id):
    """Estimate the downstream blast radius if a single asset fails.

    Args:
        tracker: lineage tracker exposing ``assets`` (asset id -> asset) and
            ``get_downstream_impacts(asset_id)`` returning the ids of the
            assets downstream of ``asset_id``.
        failed_asset_id: id of the asset assumed to have failed.

    Returns:
        ``None`` when ``failed_asset_id`` is not a known asset. Otherwise a
        dict with the failed asset's name, the number of affected assets,
        how many of them are CRITICAL, a risk-weighted business-impact score
        capped at 1.0, a recovery priority ('HIGH' when any CRITICAL asset is
        affected, else 'MEDIUM') and per-asset details.
    """
    if failed_asset_id not in tracker.assets:
        return None

    failed_asset = tracker.assets[failed_asset_id]
    impacted_ids = tracker.get_downstream_impacts(failed_asset_id)

    def weight_of(level):
        # Contribution of one affected asset to the impact score;
        # levels other than CRITICAL/HIGH/MEDIUM contribute nothing.
        if level == RiskLevel.CRITICAL:
            return 0.4
        if level == RiskLevel.HIGH:
            return 0.2
        if level == RiskLevel.MEDIUM:
            return 0.1
        return 0

    impact_score = 0
    critical_count = 0
    for impacted_id in impacted_ids:
        level = tracker.assets[impacted_id].risk_level
        if level == RiskLevel.CRITICAL:
            critical_count += 1
        impact_score += weight_of(level)

    details = []
    for impacted_id in impacted_ids:
        impacted = tracker.assets[impacted_id]
        details.append({
            'name': impacted.name,
            'type': impacted.type.value,
            'risk_level': impacted.risk_level.value
        })

    priority = 'MEDIUM'
    if critical_count > 0:
        priority = 'HIGH'

    return {
        'failed_asset': failed_asset.name,
        'total_affected': len(impacted_ids),
        'critical_systems_affected': critical_count,
        'estimated_business_impact': min(impact_score, 1.0),
        'recovery_priority': priority,
        'affected_asset_details': details
    }

# Run advanced analytics
print('🔬 ADVANCED DDI ANALYTICS')
print('=' * 40)

centrality_analysis = analyze_data_centrality(ddi_tracker)

print('\n💥 CASCADING FAILURE ANALYSIS:')
failure_scenarios = ['customer_transactions', 'fraud_detection_model']

for scenario_asset in failure_scenarios:
    # simulate_cascading_failure returns None for ids the tracker does not
    # know, so a single check covers the "asset missing" case.
    analysis = simulate_cascading_failure(ddi_tracker, scenario_asset)
    if analysis is None:
        # Report the skip instead of silently dropping the scenario, so a
        # typo or renamed asset id is visible to workshop participants.
        print(f'\n   Scenario: {scenario_asset} skipped (asset not found in tracker)')
        continue
    print(f'\n   Scenario: {analysis["failed_asset"]} Failure')
    print(f'   - Total assets affected: {analysis["total_affected"]}')
    print(f'   - Critical systems affected: {analysis["critical_systems_affected"]}')
    print(f'   - Business impact score: {analysis["estimated_business_impact"]:.3f}')
    print(f'   - Recovery priority: {analysis["recovery_priority"]}')

## Part 8: Interactive Exploration

Now it's your turn to explore and experiment with DDI concepts!

In [None]:
# 🧪 EXPERIMENT: Add your own data asset and see how it affects the system

print('🧪 ADD YOUR OWN DATA ASSET:')

# Take one timestamp so created_at and last_updated are exactly one day
# apart (two separate datetime.now() calls would drift slightly).
experiment_time = datetime.now()

# Create a new asset - modify these values to experiment
your_asset = DataAsset(
    id="my_experimental_asset",
    name="My Experimental Data Asset",
    type=DataAssetType.SOURCE,  # Try: SOURCE, TRANSFORMATION, MODEL, OUTPUT
    source_system="My System",
    created_at=experiment_time - timedelta(days=1),
    last_updated=experiment_time,
    quality_score=0.75,  # Try different values: 0.0 to 1.0
    risk_level=RiskLevel.MEDIUM,  # Try: LOW, MEDIUM, HIGH, CRITICAL
    metadata={"experiment": True}
)

# Add to tracker
# NOTE(review): re-running this cell adds the same id again; whether
# add_asset overwrites or duplicates depends on DataLineageTracker - confirm.
ddi_tracker.add_asset(your_asset)

# Connect it to existing assets (experiment with different connections)
# ddi_tracker.add_dependency("my_experimental_asset", "processed_transactions", "my_transformation", 0.8)

print(f'✅ Added: {your_asset.name}')
print(f'   Risk Level: {your_asset.risk_level.value}')
print(f'   Quality Score: {your_asset.quality_score}')

# Re-run analysis with your new asset
updated_report = ddi_tracker.generate_lineage_report()
print('\n📊 Updated System Stats:')
print(f'   Total Assets: {updated_report["total_assets"]}')
print(f'   Average Quality: {updated_report["quality_summary"]["average_quality"]:.3f}')

# What changes did your asset make to the system?
print('\n🤔 Discussion Questions:')
print('1. How did adding your asset affect the overall system quality?')
print('2. What would happen if your asset had a quality issue?')
print('3. How would you monitor this asset in production?')

## Part 9: Real-World Implementation Strategies

Let's discuss practical implementation approaches for DDI.

In [None]:
print('🏢 REAL-WORLD DDI IMPLEMENTATION STRATEGIES')
print('=' * 50)

# Phased roll-out plan: phase title -> tasks delivered in that window.
implementation_phases = {
    'Phase 1: Foundation (Months 1-2)': [
        '• Inventory all data assets and systems',
        '• Establish data quality metrics and thresholds',
        '• Implement basic lineage tracking',
        '• Set up monitoring infrastructure'
    ],
    'Phase 2: Intelligence (Months 3-4)': [
        '• Deploy automated quality assessment',
        '• Build risk scoring algorithms',
        '• Create impact analysis capabilities',
        '• Establish alerting and notification systems'
    ],
    'Phase 3: Proactive Governance (Months 5-6)': [
        '• Implement predictive quality monitoring',
        '• Deploy automated remediation workflows',
        '• Build governance dashboards',
        '• Establish governance policies and procedures'
    ]
}

for phase_title, phase_tasks in implementation_phases.items():
    print(f'\n📅 {phase_title}')
    # One print of the joined lines emits the same text as one print per task.
    print('\n'.join(f'   {task_line}' for task_line in phase_tasks))

print('\n🛠️ TECHNICAL ARCHITECTURE COMPONENTS:')
# Component -> example technologies that can fill that role.
architecture_components = {
    'Data Catalog': 'Apache Atlas, DataHub, or AWS Glue Data Catalog',
    'Quality Engine': 'Great Expectations, Deequ, or custom Python validators',
    'Lineage Tracking': 'OpenLineage, DataHub, or custom graph database',
    'Monitoring': 'Prometheus + Grafana, DataDog, or custom dashboards',
    'Workflow Orchestration': 'Apache Airflow, Prefect, or AWS Step Functions',
    'Storage': 'PostgreSQL for metadata, Redis for caching, S3 for artifacts'
}

print('\n'.join(
    f'   {role}: {choices}' for role, choices in architecture_components.items()
))

print('\n⚠️ COMMON IMPLEMENTATION CHALLENGES:')
# Challenge -> suggested mitigation.
challenges_and_solutions = {
    'Data Silos': 'Use APIs and standardized metadata formats for integration',
    'Scale': 'Implement sampling strategies and distributed processing',
    'Legacy Systems': 'Start with new systems, gradually extend to legacy',
    'Cultural Resistance': 'Demonstrate value early with quick wins',
    'Resource Constraints': 'Begin with critical paths, expand incrementally'
}

print('\n'.join(
    f'   • {problem}: {remedy}' for problem, remedy in challenges_and_solutions.items()
))

print('\n📊 SUCCESS METRICS TO TRACK:')
success_metrics = [
    'Mean Time to Detection (MTTD) for data quality issues',
    'Mean Time to Resolution (MTTR) for data incidents',
    'Percentage of data assets with quality scores > 90%',
    'Number of prevented downstream incidents',
    'Reduction in model performance degradation events',
    'Stakeholder satisfaction with data reliability'
]

print('\n'.join(
    f'   {position}. {kpi}' for position, kpi in enumerate(success_metrics, start=1)
))

## Workshop Summary and Next Steps

Let's wrap up with key takeaways and actionable next steps.

In [None]:
print('🎓 DYNAMIC DATA INTELLIGENCE WORKSHOP SUMMARY')
print('=' * 55)

print('\n🎯 KEY TAKEAWAYS:')
takeaways = [
    'Data lineage is essential for understanding system dependencies',
    'Quality issues propagate and amplify through connected systems',
    'Proactive monitoring prevents small issues from becoming big problems',
    'Risk assessment must consider both direct and indirect impacts',
    'Graph-based analysis reveals hidden system vulnerabilities',
    'Automation is key to scaling data governance efforts'
]

for i, takeaway in enumerate(takeaways, 1):
    print(f'   {i}. {takeaway}')

print('\n🛠️ TOOLS AND TECHNIQUES EXPLORED:')
techniques = [
    '✅ Data asset inventory and classification',
    '✅ Automated lineage tracking with NetworkX',
    '✅ Risk propagation analysis',
    '✅ Dynamic quality monitoring',
    '✅ Centrality analysis for critical path identification',
    '✅ Cascading failure simulation',
    '✅ Proactive governance recommendations'
]

for technique in techniques:
    print(f'   {technique}')

print('\n🚀 IMMEDIATE NEXT STEPS (This Week):')
immediate_steps = [
    'Inventory your critical data assets and AI models',
    'Identify your highest-risk data dependencies',
    'Set up basic quality monitoring for key datasets',
    'Create a simple lineage map of your most critical AI workflow'
]

for i, step in enumerate(immediate_steps, 1):
    print(f'   {i}. {step}')

print('\n📅 LONGER-TERM ACTIONS (Next Month):')
longterm_actions = [
    'Implement automated lineage tracking in your data pipelines',
    'Build a data quality dashboard for stakeholders',
    'Establish data quality SLAs and alerting',
    'Create incident response procedures for data quality issues',
    'Train your team on DDI principles and tools'
]

for i, action in enumerate(longterm_actions, 1):
    print(f'   {i}. {action}')

print('\n🌐 RESOURCES FOR CONTINUED LEARNING:')
resources = [
    'Great Expectations documentation: https://docs.greatexpectations.io/',
    'Apache Atlas for data governance: https://atlas.apache.org/',
    'OpenLineage for lineage tracking: https://openlineage.io/',
    'DataHub for data discovery: https://datahubproject.io/',
    'This workshop repository with all examples and code'
]

for resource in resources:
    print(f'   • {resource}')

print('\n💬 DISCUSSION QUESTIONS:')
discussion_questions = [
    'What was the most surprising insight from the workshop?',
    'Which DDI technique would have the biggest impact in your organization?',
    'What are the biggest barriers to implementing DDI in your environment?',
    'How would you measure ROI for a DDI implementation?',
    'What governance challenges are unique to AI/ML systems?'
]

for i, question in enumerate(discussion_questions, 1):
    print(f'   {i}. {question}')

print('\n🎉 Thank you for participating in the Dynamic Data Intelligence workshop!')
print('   Remember: Great AI starts with great data governance.')
print('   The future of AI depends on our ability to understand and trust our data.')

# Final system stats
final_report = ddi_tracker.generate_lineage_report()
print('\n📊 Final Workshop Stats:')
print(f'   Assets Created: {final_report["total_assets"]}')
print(f'   Dependencies Mapped: {final_report["total_dependencies"]}')
# `quality_results` exists only if an earlier quality-monitoring cell ran.
# Look it up in globals() explicitly (locals() matches globals() only at
# notebook top level) and count a missing or None value as zero runs.
_quality_results = globals().get('quality_results')
quality_run_count = len(_quality_results) if _quality_results is not None else 0
print(f'   Quality Assessments Run: {quality_run_count}')
print('   Risk Analyses Performed: Multiple scenarios covered')

## Cleanup

When you're done exploring, clean up the Docker containers:

```bash
docker-compose down
```

---

*Thank you for participating in the Dynamic Data Intelligence workshop!*