# VDEH Data Fusion Pipeline

**Fokus:** KI-gestützte Fusion von VDEH- und DNB-Daten

## 🎯 Ziel
- Intelligente Fusion der VDEH-Originaldaten mit DNB-Daten
- Konfliktauflösung via Ollama-LLM
- Vollständige Nachvollziehbarkeit aller Entscheidungen
- Qualitätsverbesserung durch Datenanreicherung

## 📚 Input/Output
- **Input**: `data/vdeh/processed/04_dnb_enriched_data.parquet`
- **Output**: `data/vdeh/processed/05_fused_data.parquet`

## 🤖 KI-Modell
- **Ollama**: Lokales LLM (llama3.3:70b, Fallback: llama3.2)
- **API**: http://localhost:11434

## 🔄 Fusion-Architektur

**Drei Fusionsstrategien:**
1. **Keine DNB-Daten** → VDEH-Daten behalten
2. **Keine Konflikte** → Einfacher Merge (VDEH priorisiert, DNB ergänzt)
3. **Konflikte vorhanden** → KI-Entscheidung via Ollama

**Vollständige Nachvollziehbarkeit:**
- `fusion_*_source`: Welche Quelle für jedes Feld verwendet wurde
- `fusion_conflicts`: JSON mit allen erkannten Konflikten
- `fusion_ai_reasoning`: KI-Begründung der Entscheidung

In [None]:
# 🛠️ SETUP (project root + configuration)
# Shared imports for the cells below; the data itself is loaded in the next cell.
import sys
from pathlib import Path
import pandas as pd
import json

from utils.notebook_utils import setup_notebook

# setup_notebook() hands back the project root and a config object that is
# queried with dotted keys such as 'project.name'.
project_root, config = setup_notebook()

print(f"‚úÖ Project root: {project_root}")
project_label = f"{config.get('project.name')} v{config.get('project.version')}"
print(f"‚úÖ Project: {project_label}")

In [None]:
# 📂 LOAD DNB-ENRICHED DATA (output of stage 04)
# Reads the enriched parquet file and the stage-04 metadata, then prints a
# short overview of how many records carry DNB data and via which query method.
processed_dir = config.project_root / config.get('paths.data.vdeh.processed')
input_path = processed_dir / '04_dnb_enriched_data.parquet'
metadata_path = processed_dir / '04_metadata.json'

if not input_path.exists():
    raise FileNotFoundError(f"Input-Datei nicht gefunden: {input_path}\n"
                          "Bitte f√ºhren Sie zuerst 04_vdeh_data_enrichment.ipynb aus.")

# Load the data
df_enriched = pd.read_parquet(input_path)

# Previous-stage metadata is only carried along for provenance (it is embedded
# into this stage's metadata file), so a missing file is reported, not fatal.
# Read explicitly as UTF-8: the platform default encoding may differ.
prev_metadata = {}
if metadata_path.exists():
    with open(metadata_path, 'r', encoding='utf-8') as f:
        prev_metadata = json.load(f)
else:
    print(f"⚠️ Metadaten nicht gefunden: {metadata_path}")

print(f"üìÇ Daten geladen aus: {input_path}")
print(f"üìä Records: {len(df_enriched):,}")
print(f"üíæ Memory: {df_enriched.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# DNB coverage: a non-null query method marks a record that has DNB data
if 'dnb_query_method' in df_enriched.columns:
    dnb_records = df_enriched['dnb_query_method'].notna().sum()
    print(f"\nüìä DNB-Daten vorhanden: {dnb_records:,} ({dnb_records/len(df_enriched)*100:.1f}%)")
    
    method_counts = df_enriched['dnb_query_method'].value_counts()
    for method, count in method_counts.items():
        print(f"   {method}: {count:,}")

In [None]:
# 📋 FUSION SETUP
# Builds the Ollama LLM client, verifies that the server answers, and creates
# the FusionEngine used by the fusion cell.
from fusion import OllamaClient, FusionEngine

print("üìã === FUSION-SETUP ===\n")

# Client settings: primary model plus fallback model, per-request timeout in
# seconds and retry policy (backoff semantics assumed from the parameter
# names — TODO confirm in fusion.OllamaClient).
ollama_settings = {
    'api_url': "http://localhost:11434/api/generate",
    'model': "llama3.3:70b",
    'timeout_sec': 220,
    'max_retries': 4,
    'retry_backoff_base_sec': 2,
    'abort_on_timeout': True,
    'enable_fallback': True,
    'fallback_model': "llama3.2",
}
ollama_client = OllamaClient(**ollama_settings)

# Fail fast: without a reachable server no fusion decision can be made.
if not ollama_client.test_connection():
    raise RuntimeError("‚ùå Ollama nicht erreichbar! Stellen Sie sicher, dass Ollama l√§uft: ollama serve")
print(f"‚úÖ Ollama verbunden: {ollama_client.model}")

# Presumably the ID-based DNB variant is preferred over the title/author match
# (order of variant_priority) — verify against FusionEngine.
fusion_engine = FusionEngine(ollama_client=ollama_client, variant_priority=["id", "title_author"])

print(f"‚öôÔ∏è  Timeout: {ollama_client.timeout_sec}s | Retries: {ollama_client.max_retries}")
print(f"ü§ñ Aktives Modell: {ollama_client.model}\n")

In [None]:
# 🚀 RUN FUSION
# Fuses every record that has at least one DNB variant (ID lookup and/or
# title/author lookup) via fusion_engine.merge_record and writes the results
# back into df_enriched. Progress is checkpointed to a parquet snapshot plus a
# JSON retry queue so that an interrupted run resumes where it stopped.
from tqdm.auto import tqdm
from fusion import OllamaUnavailableError

print("üöÄ === FUSION AUSF√úHREN ===\n")

# Configuration
RESET_FUSION = False  # True: discard previous fusion results and checkpoints
SAVE_INTERVAL = 50    # write a checkpoint every N successfully fused records

# Optional record limit for test runs ('debug.fusion_limit'); values <= 0 or
# values that are not integers disable the limit.
FUSION_LIMIT = None
try:
    FUSION_LIMIT = int(config.get('debug.fusion_limit', 0))
    if FUSION_LIMIT <= 0:
        FUSION_LIMIT = None
except (TypeError, ValueError):
    FUSION_LIMIT = None

# Checkpoint files
progress_file = processed_dir / '05_fused_data_progress.parquet'
retry_queue_file = processed_dir / '05_fused_retry_queue.json'

# Reset runs before statistics and record selection so both see the restored input.
if RESET_FUSION:
    # Bookkeeping columns written by the fusion are cleared.
    fusion_cols = [
        'fusion_title_source', 'fusion_authors_source', 'fusion_year_source', 'fusion_publisher_source',
        'fusion_conflicts', 'fusion_confirmations', 'fusion_ai_reasoning',
        'fusion_dnb_match_rejected', 'fusion_rejection_reason', 'fusion_dnb_variant_selected',
        'fusion_needs_retry', 'fusion_decision_needed'
    ]
    for col in fusion_cols:
        if col in df_enriched.columns:
            df_enriched[col] = None
    # The fused fields are also the VDEH *input* fields. Blanking them would
    # erase the original data of every record that is not re-fused, so they
    # are restored from the stage-04 input file instead.
    core_cols = [c for c in ('title', 'authors_str', 'year', 'publisher') if c in df_enriched.columns]
    if core_cols:
        df_original = pd.read_parquet(input_path, columns=core_cols)
        for col in core_cols:
            df_enriched[col] = df_original[col]
        del df_original
    # Drop both checkpoints; a stale retry queue would otherwise survive the reset.
    for checkpoint in (progress_file, retry_queue_file):
        if checkpoint.exists():
            checkpoint.unlink()
    print("üóëÔ∏è Fusion-Ergebnisse zur√ºckgesetzt\n")

# Completeness of the core fields BEFORE fusion
print("üìä Vollst√§ndigkeit VOR Fusion:")
before_stats = {
    'title': df_enriched['title'].notna().sum(),
    'authors': (df_enriched['authors_str'].notna() & (df_enriched['authors_str'] != '')).sum(),
    'year': df_enriched['year'].notna().sum(),
    'publisher': df_enriched['publisher'].notna().sum()
}
for field, count in before_stats.items():
    print(f"   {field}: {count:,} ({count/len(df_enriched)*100:.1f}%)")


def _any_notna(columns):
    """Return a boolean mask that is True where any existing column of *columns* is non-null.

    Columns missing from df_enriched are ignored. If none exist, an all-False
    Series (not a bare ``False``) is returned so the combined mask can always
    be used for boolean indexing.
    """
    present = [c for c in columns if c in df_enriched.columns]
    if not present:
        return pd.Series(False, index=df_enriched.index)
    return df_enriched[present].notna().any(axis=1)


# Records to process: those with any DNB variant (ID lookup or title/author lookup)
has_id = _any_notna(['dnb_title', 'dnb_authors', 'dnb_year', 'dnb_publisher'])
has_ta = _any_notna(['dnb_title_ta', 'dnb_authors_ta', 'dnb_year_ta', 'dnb_publisher_ta'])
records_to_process = df_enriched[has_id | has_ta].copy()

print(f"\nüîÑ Verarbeite {len(records_to_process):,} Records mit DNB-Varianten...\n")

# Resume: a record counts as fused once its fusion_title_source is set.
already_fused = set()
if progress_file.exists():
    df_progress = pd.read_parquet(progress_file)
    if not df_progress.index.is_unique:
        df_progress = df_progress[~df_progress.index.duplicated(keep='last')]
    
    if 'fusion_title_source' in df_progress.columns:
        already_fused = set(df_progress[df_progress['fusion_title_source'].notna()].index)
    
    print(f"üìÇ Fortschritt geladen: {len(already_fused):,} Records bereits fusioniert")
    
    # Copy the fused values of those records back into df_enriched
    common_cols = [c for c in df_progress.columns if c in df_enriched.columns]
    if len(common_cols) > 0 and len(already_fused) > 0:
        idxs = [i for i in already_fused if i in df_enriched.index]
        if len(idxs) > 0:
            df_enriched.loc[idxs, common_cols] = df_progress.loc[idxs, common_cols].values
        print(f"   Fusionsdaten wiederhergestellt\n")

records_to_process = records_to_process[~records_to_process.index.isin(already_fused)]

# Retry queue: record indices that failed because Ollama was unreachable.
retry_indices = []
if retry_queue_file.exists():
    try:
        with open(retry_queue_file, 'r', encoding='utf-8') as f:
            retry_indices = json.load(f)
    except (OSError, ValueError):
        retry_indices = []
    if not isinstance(retry_indices, list):
        retry_indices = []

# Prioritise queued retries over the full remaining set *before* the test
# limit is applied, so queued records are neither skipped nor dropped from
# the persisted queue.
retry_indices = [i for i in retry_indices if i in records_to_process.index]
if len(retry_indices) > 0:
    print(f"üîÅ Retry-Queue: {len(retry_indices):,} Records werden zuerst verarbeitet")
    is_retry = records_to_process.index.isin(retry_indices)
    records_to_process = pd.concat([records_to_process[is_retry], records_to_process[~is_retry]], axis=0)

if FUSION_LIMIT:
    print(f"üß™ Testmodus aktiv ‚Äì verarbeite nur die ersten {FUSION_LIMIT} Records.")
    records_to_process = records_to_process.head(FUSION_LIMIT)

print(f"üîÑ Verbleibende Records: {len(records_to_process):,}\n")

# Statistics ('total_processed' includes records fused in earlier runs)
fusion_stats = {
    'total_processed': len(already_fused),
    'conflicts_found': 0,
    'dnb_preferred': 0,
    'simple_merges': 0,
    'errors': 0,
    'dnb_matches_rejected': 0,
    'ai_decisions': 0,
    'variant_id': 0,
    'variant_title_author': 0,
    'variant_none': 0
}

fusion_count = 0
aborted = False


def _save_checkpoint():
    """Write df_enriched and the current retry queue to the checkpoint files."""
    df_enriched.to_parquet(progress_file, index=True)
    with open(retry_queue_file, 'w', encoding='utf-8') as f:
        json.dump(retry_indices, f, ensure_ascii=False, indent=2)


# Main fusion loop
for idx, row in tqdm(records_to_process.iterrows(), total=len(records_to_process), desc="üîÑ Fusion", unit="records"):
    try:
        result_dict = fusion_engine.merge_record(row).to_dict()

        # Compute every value before writing, so an error while deriving them
        # leaves the row untouched instead of half-updated.
        updates = {
            'title': result_dict.get('title'),
            'authors_str': result_dict.get('authors'),
        }
        # The year is only overwritten when the fusion produced one; numeric
        # conversion keeps the column numeric (unparseable strings become NaN).
        raw_year = result_dict.get('year')
        if pd.notna(raw_year):
            try:
                updates['year'] = pd.to_numeric(raw_year, errors='coerce')
            except (TypeError, ValueError):
                updates['year'] = raw_year
        updates.update({
            'publisher': result_dict.get('publisher'),
            'fusion_title_source': result_dict.get('title_source'),
            'fusion_authors_source': result_dict.get('authors_source'),
            'fusion_year_source': result_dict.get('year_source'),
            'fusion_publisher_source': result_dict.get('publisher_source'),
            'fusion_conflicts': result_dict.get('conflicts'),
            'fusion_confirmations': result_dict.get('confirmations'),
            'fusion_ai_reasoning': result_dict.get('ai_reasoning'),
            'fusion_dnb_match_rejected': result_dict.get('dnb_match_rejected', False),
            'fusion_rejection_reason': result_dict.get('rejection_reason'),
            'fusion_dnb_variant_selected': result_dict.get('dnb_variant_selected'),
        })
        for col, value in updates.items():
            df_enriched.loc[idx, col] = value

        # A successful fusion clears any pending retry for this record
        if 'fusion_needs_retry' in df_enriched.columns:
            df_enriched.loc[idx, 'fusion_needs_retry'] = False
        if idx in retry_indices:
            retry_indices = [i for i in retry_indices if i != idx]

        # Update statistics
        variant = result_dict.get('dnb_variant_selected')
        if variant == 'id':
            fusion_stats['variant_id'] += 1
        elif variant == 'title_author':
            fusion_stats['variant_title_author'] += 1
        else:
            fusion_stats['variant_none'] += 1

        fusion_stats['total_processed'] += 1
        fusion_count += 1
        # NOTE(review): counts every fused record, not only conflict cases —
        # confirm that merge_record consults the LLM for every record.
        fusion_stats['ai_decisions'] += 1

        if result_dict.get('dnb_match_rejected'):
            fusion_stats['dnb_matches_rejected'] += 1
        elif result_dict.get('conflicts'):
            fusion_stats['conflicts_found'] += 1
            fusion_stats['dnb_preferred'] += 1
        else:
            fusion_stats['simple_merges'] += 1

        # Incremental checkpoint
        if fusion_count % SAVE_INTERVAL == 0:
            _save_checkpoint()
            print(f"\nüíæ Zwischenstand: {fusion_stats['total_processed']:,} Records fusioniert")

    except OllamaUnavailableError as e:
        # Without the LLM no further record can be fused: queue this one,
        # checkpoint immediately and stop the run.
        print(f"\n‚ùå Ollama nicht erreichbar: {e}")
        print("üëâ Record wird in die Retry-Queue gelegt")
        fusion_stats['errors'] += 1
        aborted = True

        df_enriched.loc[idx, 'fusion_needs_retry'] = True
        if idx not in retry_indices:
            retry_indices.append(idx)

        _save_checkpoint()
        break

    except Exception as e:
        # Best effort: report and continue; the record stays unfused and is
        # picked up again on the next run.
        print(f"\n‚ö†Ô∏è Fehler bei Record {idx}: {e}")
        fusion_stats['errors'] += 1

# Final checkpoint (skipped when the last interval save already covered it)
if fusion_count % SAVE_INTERVAL != 0 or fusion_count == 0:
    _save_checkpoint()
    print(f"\nüíæ Finaler Stand gespeichert")

if aborted:
    print("\n‚õîÔ∏è Lauf abgebrochen (Ollama-Timeout)")

print("\n‚úÖ Fusion abgeschlossen")

In [None]:
# 📊 FUSION STATISTICS
# Reports completeness of the core fields after fusion (compared with
# before_stats from the fusion cell), the counters in fusion_stats, and the
# per-field distribution of the chosen data source.
print("üìä === FUSION-ERGEBNISSE ===\n")

# Completeness AFTER fusion
print("üìä Vollst√§ndigkeit NACH Fusion:")
after_stats = {
    'title': df_enriched['title'].notna().sum(),
    'authors': (df_enriched['authors_str'].notna() & (df_enriched['authors_str'] != '')).sum(),
    'year': df_enriched['year'].notna().sum(),
    'publisher': df_enriched['publisher'].notna().sum()
}
for field, count in after_stats.items():
    improvement = count - before_stats[field]
    # The '+' format flag prints the sign itself, so a decrease shows as
    # "-N" rather than the misleading "+-N".
    print(f"   {field}: {count:,} ({count/len(df_enriched)*100:.1f}%) [{improvement:+,}]")

# Fusion counters
print(f"\nüìä Fusion-Statistiken:")
print(f"   Verarbeitet: {fusion_stats['total_processed']:,}")
print(f"   Einfache Merges: {fusion_stats['simple_merges']:,}")
print(f"   DNB gew√§hlt: {fusion_stats['dnb_preferred']:,}")
print(f"   Konflikte: {fusion_stats['conflicts_found']:,}")
print(f"   üö´ DNB verworfen: {fusion_stats['dnb_matches_rejected']:,}")
print(f"   KI-Entscheidungen: {fusion_stats['ai_decisions']:,}")
print(f"   Variante ID: {fusion_stats['variant_id']:,}")
print(f"   Variante Titel/Autor: {fusion_stats['variant_title_author']:,}")
# These two counters were collected by the fusion loop but not reported.
print(f"   Ohne Variante: {fusion_stats['variant_none']:,}")
print(f"   Fehler: {fusion_stats['errors']:,}")

# Source distribution per fused field (empty source labels are skipped)
print(f"\nüìä Datenquellen:")
for field in ['title', 'authors', 'year', 'publisher']:
    source_col = f'fusion_{field}_source'
    if source_col in df_enriched.columns:
        sources = df_enriched[source_col].value_counts()
        print(f"\n   {field.upper()}:")
        for source, count in sources.items():
            if source:
                print(f"     {source}: {count:,}")

In [None]:
# 💾 SAVE FINAL OUTPUT
# Writes the fused data set and a metadata JSON describing this stage.
output_path = processed_dir / '05_fused_data.parquet'
output_metadata_path = processed_dir / '05_metadata.json'

# Save fused data
df_enriched.to_parquet(output_path, index=True)
print(f"üíæ Fusionierte Daten gespeichert: {output_path}")
print(f"   Gr√∂√üe: {output_path.stat().st_size / 1024**2:.1f} MB")


def _json_counts(stats):
    """Return *stats* with every value cast to a built-in int.

    The completeness counts come from pandas ``.sum()`` and are numpy.int64,
    which ``json.dump`` cannot serialise (TypeError).
    """
    return {key: int(value) for key, value in stats.items()}


# Save metadata
metadata = {
    'notebook': '05_vdeh_data_fusion',
    'timestamp': pd.Timestamp.now().isoformat(),
    'input_file': str(input_path),
    'output_file': str(output_path),
    'total_records': len(df_enriched),
    'fusion_statistics': _json_counts(fusion_stats),
    'completeness_before': _json_counts(before_stats),
    'completeness_after': _json_counts(after_stats),
    'previous_metadata': prev_metadata
}

# ensure_ascii=False writes non-ASCII text verbatim, so the file must be UTF-8.
with open(output_metadata_path, 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2, ensure_ascii=False)

print(f"üìã Metadaten gespeichert: {output_metadata_path}")
print(f"\n‚úÖ Pipeline-Stufe 05 abgeschlossen!")