# 🏗️ Digital Construction Archive (DCA) - Integrierte Pipeline

**Zweck**: Vollständige Integration von DROID Analyse → RDF Konvertierung → EXIF/XMP Anreicherung  
**Zielumgebung**: Lokale Entwicklung mit ETH DCA Standards  
**Standards**: RiC-O, PREMIS, Dublin Core mit DCA-spezifischen Erweiterungen  
**Datum**: März 2026  

---

## 🎯 Pipeline-Übersicht

Diese Pipeline führt drei kritische Schritte in einem integrierten Workflow durch:

1. **🔍 DROID File Analysis**: Systematische Dateierkennung mit MD5-Hashes
2. **🔗 RDF Conversion**: DROID CSV → standardkonforme RDF mit DCA Ontologie  
3. **📸 XMP Integration**: ExifTool XMP-Metadaten → PREMIS Identifier & Derivations

### ✅ Validierung zwischen jedem Schritt
- Dateivollständigkeit prüfen
- Konsistenz der MD5-basierten URIs sicherstellen  
- Alle relevanten Dateien erfassen und verarbeiten

## 📦 1. Environment Setup & Dependencies

Installation und Import aller erforderlichen Bibliotheken mit Logging für Prozess-Tracking.

In [None]:
# =====================================================
# ENVIRONMENT SETUP & DEPENDENCIES
# =====================================================

# Standard library
import hashlib
import json
import logging
import math
import os
import re
import shutil
import subprocess
import sys
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union
from urllib.parse import quote, unquote

# Third-party
import pandas as pd

# RDF Core Libraries
import rdflib
from rdflib import Graph, Namespace, URIRef, BNode, Literal
from rdflib.namespace import RDF, RDFS, XSD, DCTERMS
from rdflib.plugins.serializers.turtle import TurtleSerializer

# Optional: Network analysis for provenance graphs
try:
    import networkx as nx
    NX_AVAILABLE = True
except ImportError:
    print("‚ö†Ô∏è  NetworkX not available - provenance graphs disabled")
    NX_AVAILABLE = False

# Logging setup for pipeline tracking: every record is written both to stdout
# and to a timestamped log file in the current working directory.
_console_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler(
    f'dca_pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[_console_handler, _file_handler],
)
logger = logging.getLogger('DCA_Pipeline')

# Shared, mutable record of the pipeline run; later steps read and extend it.
pipeline_status = dict(
    start_time=datetime.now(),
    steps_completed=[],   # names of steps that succeeded
    steps_failed=[],      # names of steps that failed
    file_counts={},       # per-stage file counters
    errors=[],            # human-readable error messages
)

def log_step(step_name: str, success: bool, details: str = ""):
    """Record the outcome of a pipeline step and write it to the log.

    Args:
        step_name: Name of the step; appended to ``steps_completed`` or
            ``steps_failed`` in ``pipeline_status``.
        success: True logs at INFO level, False logs at ERROR level.
        details: Free-text message appended to the log line.
    """
    clock = datetime.now().strftime('%H:%M:%S')
    if not success:
        pipeline_status['steps_failed'].append(step_name)
        logger.error(f"‚ùå [{clock}] {step_name}: {details}")
        return
    pipeline_status['steps_completed'].append(step_name)
    logger.info(f"‚úÖ [{clock}] {step_name}: {details}")

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Report the file the logging FileHandler actually opened. Re-formatting
# datetime.now() here could yield a name one second off from the real log file.
_active_log_file = next(
    (
        handler.baseFilename
        for handler in logging.getLogger().handlers
        if isinstance(handler, logging.FileHandler)
    ),
    "unknown",
)

print("‚úÖ All dependencies loaded successfully")
print(f"üìÖ DCA Integrated Pipeline started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"üîß Python {sys.version_info.major}.{sys.version_info.minor}")
# The version lives on the rdflib package; Graph instances carry no
# 'version' attribute, so the old lookup always printed "unknown".
print(f"üìö RDFLib version: {getattr(rdflib, '__version__', 'unknown')}")
print(f"üìã Log file: {_active_log_file}")

log_step("Environment Setup", True, "All dependencies loaded")

## 📂 2. Path Configuration & Validation

Setup dynamischer Pfade für DROID Binary, Input-Ordner und Output-Verzeichnisse mit Validierung aller erforderlichen Dateien und Verzeichnisse.

In [None]:
# =====================================================
# PATH CONFIGURATION & VALIDATION
# =====================================================

# Project configuration (ADJUST these two values for other projects)
project_path = "WeingutGantenbein"
dataset_to_analyze = "gramazio-kohler-archiv-server"

# Home directory is resolved at runtime (local development)
home_dir = os.path.expanduser("~")
base_path = os.path.join(home_dir, "work")

# Path fragments shared by several settings below
_ingest_src_rel = "27_DCA_Ingest/src"
_dataset_rel = f"{project_path}/{dataset_to_analyze}"

# DROID configuration
droid_script_path = os.path.join(
    base_path, f"{_ingest_src_rel}/droid-binary-6.7.0-bin/droid.sh"
)
folder_to_analyze = os.path.join(
    base_path, f"dcaonnextcloud-500gb/DigitalMaterialCopies/{_dataset_rel}"
)
output_folder = os.path.join(
    base_path, f"dcaonnextcloud-500gb/dca-metadataraw/{_dataset_rel}_results"
)
droid_csv_path = os.path.join(output_folder, f"{dataset_to_analyze}_DROIDresults.csv")

# RDF output configuration: both files share one run timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
_output_dir = Path(output_folder)
rdf_output_path = _output_dir / f"{dataset_to_analyze}_catalog_integrated_{timestamp}.ttl"
rdf_backup_path = _output_dir / f"{dataset_to_analyze}_backup_{timestamp}.ttl"

# ExifTool configuration
exiftool_command = "/opt/homebrew/bin/exiftool"  # Adjust for your system

# Base directory of the files used for XMP processing
files_base_dir = Path(folder_to_analyze)

# Project metadata for RDF
PROJECT_NAME = project_path
ACTIVITY_NAME = f"IntegratedArchiving{project_path}2026"

# Make src/ importable so local modules can be loaded
src_path = os.path.join(base_path, _ingest_src_rel)
if src_path not in sys.path:
    sys.path.append(src_path)

# =====================================================
# PATH VALIDATION
# =====================================================

def validate_paths():
    """Check that every external path and tool the pipeline needs is usable.

    Checks, in order: the DROID launcher script, the input folder (counting
    its files), the output folder (created if missing) and the ExifTool
    executable. Each problem is appended to ``pipeline_status['errors']``;
    the input file count is stored in
    ``pipeline_status['file_counts']['input_files']``.

    Returns:
        list[tuple[str, str]]: ``(status label, detail)`` pairs, one per
        check, in the order the checks ran.
    """
    validation_results = []
    errors = pipeline_status['errors']

    # DROID launcher script
    if os.path.exists(droid_script_path):
        validation_results.append(("‚úÖ DROID script", droid_script_path))
    else:
        validation_results.append(("‚ùå DROID script", droid_script_path))
        errors.append(f"DROID script not found: {droid_script_path}")

    # Input folder: must be a directory, a plain file of that name is no use
    if os.path.isdir(folder_to_analyze):
        file_count = sum(1 for p in Path(folder_to_analyze).rglob("*") if p.is_file())
        validation_results.append(("‚úÖ Input folder", f"{folder_to_analyze} ({file_count:,} files)"))
        pipeline_status['file_counts']['input_files'] = file_count
    else:
        validation_results.append(("‚ùå Input folder", folder_to_analyze))
        errors.append(f"Input folder not found: {folder_to_analyze}")

    # Output folder: os.makedirs raises on failure, so report the failure
    # here rather than letting the exception abort the remaining checks.
    try:
        os.makedirs(output_folder, exist_ok=True)
    except OSError as exc:
        validation_results.append(("‚ùå Output folder", output_folder))
        errors.append(f"Cannot create output folder: {output_folder} ({exc})")
    else:
        validation_results.append(("‚úÖ Output folder", output_folder))

    # ExifTool: run "-ver" as a cheap smoke test
    try:
        result = subprocess.run([exiftool_command, "-ver"], capture_output=True, text=True)
    except FileNotFoundError:
        validation_results.append(("‚ùå ExifTool", "not found"))
        errors.append(f"ExifTool not found: {exiftool_command}")
    except OSError as exc:
        # e.g. PermissionError when the file exists but is not executable
        validation_results.append(("‚ùå ExifTool", "not working"))
        errors.append(f"ExifTool not working: {exc}")
    else:
        if result.returncode == 0:
            validation_results.append(("‚úÖ ExifTool", f"version {result.stdout.strip()}"))
        else:
            validation_results.append(("‚ùå ExifTool", "not working"))
            errors.append("ExifTool not working")

    return validation_results

print("üìÇ PATH CONFIGURATION:")
print(f"üèóÔ∏è  Projekt: {PROJECT_NAME}")
print(f"üìä Dataset: {dataset_to_analyze}")
print(f"üìÅ Input: {folder_to_analyze}")
print(f"üíæ Output: {output_folder}")
print(f"üìÑ RDF Output: {rdf_output_path}")
print()

# Run validation
validation_results = validate_paths()
print("üîç PATH VALIDATION:")
for status, path in validation_results:
    print(f"   {status}: {path}")

# Every recorded error is critical: run_droid_analysis() skips the DROID step
# whenever pipeline_status['errors'] is non-empty. Filtering on the substring
# "not found" reported success for errors such as "ExifTool not working".
critical_errors = list(pipeline_status['errors'])
if critical_errors:
    print("\n‚ùå CRITICAL ERRORS FOUND:")
    for error in critical_errors:
        print(f"   ‚Ä¢ {error}")
    print("\n‚ö†Ô∏è  Please fix path configuration before proceeding!")
    log_step("Path Validation", False, f"{len(critical_errors)} critical errors")
else:
    print("\n‚úÖ All paths validated successfully!")
    log_step("Path Validation", True, "All required paths found")

## 🔍 3. DROID File Analysis Execution

Ausführung der DROID Binary mit Hash-Generierung und Filteroptionen, Erfassung von Output und Fehlern, sowie ordnungsgemäße Behandlung von Subprocess-Exceptions.

In [None]:
# =====================================================
# DROID FILE ANALYSIS EXECUTION
# =====================================================

def run_droid_analysis():
    """Run the DROID command-line tool over the input folder.

    The step is skipped when ``pipeline_status['errors']`` already holds
    entries. Otherwise DROID is invoked recursively on ``folder_to_analyze``
    with hash generation enabled and writes its CSV to ``droid_csv_path``.
    Failures are printed, logged through ``log_step`` and, for raised
    exceptions, appended to ``pipeline_status['errors']``.

    Returns:
        bool: True when DROID exited with code 0 and the CSV file exists
        afterwards, False in every other case.
    """
    step = "DROID Analysis"

    def _abort(message, hint=None, log_details=None):
        # Shared failure path: console output, step log, error bookkeeping.
        print(f"‚ùå {message}")
        if hint is not None:
            print(hint)
        log_step(step, False, message if log_details is None else log_details)
        pipeline_status['errors'].append(message)
        return False

    # Earlier errors (e.g. from path validation) make a DROID run pointless
    if pipeline_status['errors']:
        print("‚è≠Ô∏è  Skipping DROID analysis due to path validation errors")
        return False

    print("üîç Starting DROID Analysis...")
    print(f"üìÅ Analyzing folder: {folder_to_analyze}")
    print(f"üíæ Output will be saved to: {droid_csv_path}")

    # The output directory must exist before DROID writes into it
    os.makedirs(output_folder, exist_ok=True)

    started = datetime.now()

    try:
        droid_command = [droid_script_path]
        droid_command += ["-R", folder_to_analyze]                 # recursive analysis
        droid_command += ["-o", droid_csv_path]                    # output CSV path
        droid_command += ["-Pr", "profile.generateHash=true"]      # generate hashes
        droid_command += ["-ff", "file_name not startswith ~$"]    # filter temp files

        print("üöÄ Executing DROID command:")
        print(f"   {' '.join(droid_command)}")

        # check=True turns a non-zero exit code into CalledProcessError
        result = subprocess.run(droid_command, check=True, capture_output=True, text=True)

        elapsed = datetime.now() - started
        print(f"‚úÖ DROID execution completed in {elapsed}")
        if result.stdout:
            print(f"üìù DROID output: {result.stdout}")

        # A zero exit code alone is not enough: the CSV has to exist
        if not os.path.exists(droid_csv_path):
            print(f"‚ùå Expected CSV file not created: {droid_csv_path}")
            log_step(step, False, "CSV file not created")
            return False

        size_mb = os.path.getsize(droid_csv_path) / 1024 / 1024
        print(f"üìÑ CSV file created: {size_mb:.2f} MB")
        log_step(step, True, f"CSV generated: {size_mb:.2f} MB")
        return True

    except subprocess.CalledProcessError as exc:
        summary = f"DROID execution failed with exit code {exc.returncode}"
        stderr_line = f"üîç DROID error output: {exc.stderr}" if exc.stderr else None
        return _abort(summary, hint=stderr_line, log_details=f"{summary}: {exc.stderr}")

    except PermissionError as exc:
        return _abort(
            f"Permission error: {exc}",
            hint="   ‚Üí Pr√ºfe Schreibrechte f√ºr Zielordner",
        )

    except FileNotFoundError as exc:
        return _abort(
            f"File or directory not found: {exc}",
            hint="   ‚Üí Pr√ºfe alle Pfade auf Korrektheit",
        )

    except Exception as exc:
        return _abort(f"Unexpected error during DROID analysis: {exc}")

# Run DROID and record the outcome so later cells can decide whether to proceed
droid_success = run_droid_analysis()
pipeline_status['droid_success'] = droid_success

if not droid_success:
    print("\n‚ö†Ô∏è  DROID Analysis failed - check errors above")
else:
    # The CSV path is published only on success; CSV validation reads it from here
    pipeline_status['droid_csv_path'] = droid_csv_path
    print("\nüéØ DROID Analysis completed successfully!")
    print(f"   üìÑ Results available at: {droid_csv_path}")

## 📊 4. DROID Results Validation

Laden und Validierung der generierten DROID CSV-Datei, Überprüfung auf erwartete Spalten (MD5_HASH, FILE_PATH, FORMAT_NAME) und Analyse der Dateityp-Verteilung.

In [None]:
# =====================================================
# DROID RESULTS VALIDATION
# =====================================================

# File-extension groups (lower-case, without the dot) used to pick the files
# that count as "target" files in the DROID analysis.

# Raster images plus camera RAW formats
IMG_EXTENSIONS = set(
    "jpg jpeg tif tiff png gif bmp webp "
    "dng cr2 cr3 nef arw orf rw2".split()
)

# Adobe / publishing formats
ADOBE_EXTENSIONS = set("psd psb ai indd idml eps pdf".split())

# CAD and 3D exchange formats
CAD_EXTENSIONS = set("dwg dxf step stp iges igs ifc 3dm skp".split())

# Union of all groups above
TARGET_EXTENSIONS = set().union(IMG_EXTENSIONS, ADOBE_EXTENSIONS, CAD_EXTENSIONS)

def load_and_validate_droid_csv():
    """Load the DROID CSV named in ``pipeline_status`` and sanity-check it.

    The step is skipped when the DROID run did not succeed. The CSV is read
    as UTF-8 with a latin-1 retry, the presence of the expected columns is
    reported, and the data is summarised by ``analyze_droid_data``. On
    success the DataFrame and record count are stored in ``pipeline_status``.

    Returns:
        tuple: ``(DataFrame, analysis dict)`` on success, ``(None, {})`` when
        the step is skipped or loading fails.
    """
    # Skip if DROID failed
    if not pipeline_status.get('droid_success', False):
        print("‚è≠Ô∏è  Skipping CSV validation - DROID analysis failed")
        return None, {}

    csv_path = pipeline_status['droid_csv_path']

    if not os.path.exists(csv_path):
        error_msg = f"DROID CSV not found: {csv_path}"
        log_step("CSV Validation", False, error_msg)
        return None, {}

    try:
        # Try UTF-8 first and fall back to latin-1, which decodes any byte
        try:
            droid_df = pd.read_csv(csv_path, encoding='utf-8')
        except UnicodeDecodeError:
            print("‚ö†Ô∏è  UTF-8 failed, trying latin-1 encoding...")
            droid_df = pd.read_csv(csv_path, encoding='latin-1')

        print(f"üìä DROID CSV loaded: {len(droid_df):,} records")
        print(f"üìã Columns found: {list(droid_df.columns)}")

        # Expected columns with the alternative names the rest of the pipeline
        # accepts. Names are compared exactly (case-insensitively): a substring
        # test would let 'FORMAT_NAME' satisfy 'NAME' and 'EXTENSION_MISMATCH'
        # satisfy 'EXT'.
        expected_columns = {
            'MD5_HASH': ('MD5_HASH', 'HASH', 'MD5'),
            'FILE_PATH': ('FILE_PATH', 'PATH'),
            'FORMAT_NAME': ('FORMAT_NAME',),
            'NAME': ('NAME',),
            'EXT': ('EXT',),
            'SIZE': ('SIZE',),
        }
        present = {str(col).strip().upper() for col in droid_df.columns}
        missing_columns = [
            name for name, aliases in expected_columns.items()
            if present.isdisjoint(aliases)
        ]

        if missing_columns:
            print(f"‚ö†Ô∏è  Missing expected columns: {missing_columns}")
        else:
            print("‚úÖ All expected columns found")

        # Analyze data quality
        analysis = analyze_droid_data(droid_df)

        # Store in pipeline status
        pipeline_status['droid_df'] = droid_df
        pipeline_status['file_counts']['droid_records'] = len(droid_df)

        log_step("CSV Validation", True, f"{len(droid_df):,} records loaded")
        return droid_df, analysis

    except Exception as e:
        # Best-effort step: any load/parse failure is reported, not raised
        error_msg = f"Failed to load DROID CSV: {e}"
        print(f"‚ùå {error_msg}")
        log_step("CSV Validation", False, error_msg)
        return None, {}

def analyze_droid_data(df: pd.DataFrame) -> Dict[str, Any]:
    """Summarise a DROID result table.

    Note: when an ``EXT`` column exists, a lower-cased copy is written back
    to ``df`` as the column ``ext_lower`` (in-place side effect).

    Args:
        df: DROID CSV content as a DataFrame.

    Returns:
        ``{}`` for an empty frame. Otherwise a dict that always contains
        ``total_files``, ``md5_column``, ``md5_available`` and ``md5_missing``.
        Depending on the columns present it also holds ``extensions`` and
        ``target_files`` (needs ``EXT``), ``total_size_gb`` and
        ``avg_size_mb`` (needs ``SIZE``), and ``top_formats`` (needs
        ``FORMAT_NAME``).
    """
    if df.empty:
        return {}

    # The record count does not depend on any particular column
    analysis: Dict[str, Any] = {'total_files': len(df)}

    # File type analysis
    if 'EXT' in df.columns:
        analysis['extensions'] = df['EXT'].value_counts().head(20).to_dict()

        # Lower-case only real strings: the .str accessor raises on a column
        # that holds no strings at all (e.g. every extension is NaN).
        df['ext_lower'] = df['EXT'].map(
            lambda ext: ext.lower() if isinstance(ext, str) else ext
        )
        analysis['target_files'] = int(df['ext_lower'].isin(TARGET_EXTENSIONS).sum())

    # Hash analysis: the hash column name varies, take the first match
    md5_col = next(
        (col for col in ('MD5_HASH', 'HASH', 'md5', 'MD5') if col in df.columns),
        None,
    )
    analysis['md5_column'] = md5_col
    if md5_col is not None:
        analysis['md5_available'] = int(df[md5_col].notna().sum())
        analysis['md5_missing'] = int(df[md5_col].isna().sum())
    else:
        analysis['md5_available'] = 0
        analysis['md5_missing'] = len(df)

    # Size analysis: unparseable or missing sizes count as 0 bytes
    if 'SIZE' in df.columns:
        sizes = pd.to_numeric(df['SIZE'], errors='coerce').fillna(0)
        analysis['total_size_gb'] = sizes.sum() / (1024**3)
        analysis['avg_size_mb'] = sizes.mean() / (1024**2)

    # Format analysis
    if 'FORMAT_NAME' in df.columns:
        analysis['top_formats'] = df['FORMAT_NAME'].value_counts().head(10).to_dict()

    return analysis

# Load the DROID CSV, then print a summary of what it contains
print("üîÑ Loading and validating DROID CSV...")
droid_df, analysis = load_and_validate_droid_csv()

if droid_df is None:
    print("\n‚ùå DROID CSV validation failed - cannot proceed to RDF conversion")
else:
    print("\nüìà DROID Data Analysis:")
    print(f"üìä Total files: {analysis.get('total_files', 0):,}")
    print(f"üéØ Target files (img/adobe/cad): {analysis.get('target_files', 0):,}")
    print(f"#Ô∏è‚É£  MD5 hashes available: {analysis.get('md5_available', 0):,}")
    print(f"‚ùì MD5 hashes missing: {analysis.get('md5_missing', 0):,}")

    md5_column_name = analysis.get('md5_column')
    if md5_column_name:
        print(f"üîë MD5 column: '{md5_column_name}'")

    # Size figures are only printed when a non-zero total was computed
    if analysis.get('total_size_gb'):
        print(f"üíæ Total size: {analysis['total_size_gb']:.2f} GB")
        print(f"üìè Average file size: {analysis['avg_size_mb']:.2f} MB")

    # Show at most the first ten extensions
    if analysis.get('extensions'):
        print("\nüìÅ Top file extensions:")
        for position, (ext, n_files) in enumerate(analysis['extensions'].items()):
            if position >= 10:
                break
            print(f"   .{ext}: {n_files:,} files")

    # Show at most the first five formats
    if analysis.get('top_formats'):
        print("\nüè∑Ô∏è  Top formats:")
        for position, (fmt, n_files) in enumerate(analysis['top_formats'].items()):
            if position >= 5:
                break
            print(f"   {fmt}: {n_files:,} files")

    pipeline_status['analysis'] = analysis
    print("\n‚úÖ DROID CSV validation completed successfully!")

## 🏗️ 5. RDF Graph Initialization

Initialisierung des RDF-Graphs mit DCA Ontologie-Namespaces, Hinzufügung der Kern-Ontologie-Definitionen und Erstellung von Projekt- und Aktivitäts-Ressourcen.

In [None]:
# =====================================================
# RDF GRAPH INITIALIZATION WITH DCA ONTOLOGY
# =====================================================

# Base URI for all DCA identifiers
DCA_ID_BASE = "http://dca.ethz.ch/id/"

# DCA ontology namespaces (ETH Zurich standard)
# -- classes and properties
DCA = Namespace("http://dca.ethz.ch/ontology#")
# -- individual resources
DCA_ID = Namespace("http://dca.ethz.ch/id/")
# -- technical properties
DCA_TECH = Namespace("http://dca.ethz.ch/tech#")

# International standards (DCTERMS is already imported from rdflib.namespace)
# -- PREMIS: digital preservation metadata
PREMIS = Namespace("http://www.loc.gov/premis/rdf/v3/")
# -- RiC-O: Records in Contexts
RICO = Namespace("https://www.ica.org/standards/RiC/ontology#")

# Utility
OWL = Namespace("http://www.w3.org/2002/07/owl#")

def dca_file_uri_from_md5(md5_hex: Optional[str]) -> Optional[URIRef]:
    """Build the content-based file URI ``<DCA_ID_BASE>file_<first 16 hex chars>``.

    The input is stripped and lower-cased before validation.

    Args:
        md5_hex: Hash as a hex string (DROID format).

    Returns:
        The URIRef, or None when the input is empty, not a string, shorter
        than 16 characters after stripping, or contains non-hex characters.
    """
    if not (md5_hex and isinstance(md5_hex, str)):
        return None

    digest = md5_hex.strip().lower()
    is_hex = re.match(r'^[a-f0-9]+$', digest) is not None
    if len(digest) >= 16 and is_hex:
        # Only the first 16 characters go into the identifier
        return URIRef(DCA_ID_BASE + f"file_{digest[:16]}")
    return None

def dca_file_uri_from_path_fallback(file_path: str) -> URIRef:
    """Build a file URI from the file path when no content hash is available.

    The path is normalised to forward slashes, hashed with SHA-256, and the
    first 16 hex characters form the identifier.

    WARNING: use this only when the DROID hash is unavailable. Path-based
    IDs change whenever the file is moved or renamed.

    Args:
        file_path: Path of the file as reported by DROID.

    Returns:
        URIRef of the form ``<DCA_ID_BASE>file_<16 hex chars>``.
    """
    # '\\' is ONE backslash. The former literal '\\\\' only matched two
    # consecutive backslashes, so ordinary Windows separators were never
    # normalised and the same file got different IDs per platform.
    path_normalized = str(Path(file_path)).replace('\\', '/')
    path_hash = hashlib.sha256(path_normalized.encode('utf-8')).hexdigest()[:16]
    return URIRef(DCA_ID_BASE + f"file_{path_hash}")

def dca_project_uri(project_name: str) -> URIRef:
    """Return the project URI ``<DCA_ID_BASE>project_<sanitized name>``.

    Every character other than ASCII letters, digits, ``_`` and ``-`` is
    replaced by ``_``.

    Example: "WeingutGantenbein" -> dca-id:project_WeingutGantenbein
    """
    # With re.ASCII, \w is exactly [a-zA-Z0-9_]
    uri_safe_name = re.sub(r'[^\w-]', '_', project_name, flags=re.ASCII)
    return URIRef(f"{DCA_ID_BASE}project_{uri_safe_name}")

def dca_activity_uri(activity_name: str) -> URIRef:
    """Return the activity URI ``<DCA_ID_BASE><sanitized name>``.

    Every character other than ASCII letters, digits, ``_`` and ``-`` is
    replaced by ``_``.

    Example: "ArchivingGantenbein2026" -> dca-id:ArchivingGantenbein2026
    """
    # With re.ASCII, \w is exactly [a-zA-Z0-9_]
    uri_safe_name = re.sub(r'[^\w-]', '_', activity_name, flags=re.ASCII)
    return URIRef(f"{DCA_ID_BASE}{uri_safe_name}")

def create_dca_graph() -> Graph:
    """Return an empty RDF graph with the DCA prefix bindings registered."""
    # Prefixes are bound so the Turtle output uses short names
    prefix_bindings = (
        ("dca", DCA),
        ("dca-id", DCA_ID),
        ("dca-tech", DCA_TECH),
        ("premis", PREMIS),
        ("rico", RICO),
        ("dcterms", DCTERMS),
        ("owl", OWL),
        ("xsd", XSD),
    )

    g = Graph()
    for prefix, namespace in prefix_bindings:
        g.bind(prefix, namespace)
    return g

def add_ontology_definitions(g: Graph):
    """Add the DCA ontology header, class and property definitions to ``g``.

    Declares the ontology resource, the classes ``dca:ConstructionProject``
    and ``dca:ArchiveFile``, and the object property ``dca:belongsToProject``.
    """
    def en(text):
        # English-tagged literal
        return Literal(text, lang="en")

    ontology = URIRef("http://dca.ethz.ch/ontology")
    project_cls = DCA.ConstructionProject
    file_cls = DCA.ArchiveFile
    belongs_to = DCA.belongsToProject

    triples = [
        # Ontology declaration
        (ontology, RDF.type, OWL.Ontology),
        (ontology, DCTERMS.created, Literal(datetime.now().strftime("%Y-%m-%d"), datatype=XSD.date)),
        (ontology, DCTERMS.creator, Literal("ETH Zurich - Digital Construction Archive Project")),
        (ontology, DCTERMS.description,
         en("Standards-based ontology for digital construction archives using RiC-O, PREMIS, and Dublin Core")),

        # Class: ConstructionProject
        (project_cls, RDF.type, OWL.Class),
        (project_cls, RDFS.label, en("Construction Project")),
        (project_cls, RDFS.comment,
         en("A construction or architectural project containing digital files")),
        (project_cls, RDFS.subClassOf, RICO.RecordSet),

        # Class: ArchiveFile
        (file_cls, RDF.type, OWL.Class),
        (file_cls, RDFS.label, en("Archive File")),
        (file_cls, RDFS.comment, en("A digital file within the construction archive")),
        (file_cls, RDFS.subClassOf, PREMIS.Object),
        (file_cls, RDFS.subClassOf, RICO.Record),

        # Property: belongsToProject
        (belongs_to, RDF.type, OWL.ObjectProperty),
        (belongs_to, RDFS.label, en("belongs to project")),
        (belongs_to, RDFS.comment,
         en("Indicates that a file belongs to a specific construction project")),
        (belongs_to, RDFS.domain, file_cls),
        (belongs_to, RDFS.range, project_cls),
    ]

    for triple in triples:
        g.add(triple)

def add_project_and_activity(g: Graph, project_name: str, activity_name: str):
    """Add the project, the archiving activity and the team to ``g``.

    Args:
        g: Graph that receives the triples.
        project_name: Used for the project URI and title, and in the
            activity label and description.
        activity_name: Used for the activity URI.

    Returns:
        tuple: ``(project_uri, activity_uri)``.
    """
    project_uri = dca_project_uri(project_name)
    activity_uri = dca_activity_uri(activity_name)
    team_uri = URIRef(DCA_ID_BASE + "DCA_Team")

    activity_label = f"Integrierte Archivierung {project_name} Konstruktionsunterlagen"
    activity_description = (
        f"Systematische digitale Erfassung und Archivierung der Konstruktionsdokumentation des {project_name} Projekts mit DROID+XMP Integration."
    )
    technique = (
        "DROID file identification with PRONOM registry, RDF metadata generation, ExifTool XMP integration, integrierte Pipeline"
    )

    triples = [
        # Project
        (project_uri, RDF.type, DCA.ConstructionProject),
        (project_uri, RDF.type, RICO.RecordSet),
        (project_uri, DCTERMS.title, Literal(project_name)),

        # Activity
        (activity_uri, RDF.type, RICO.Activity),
        (activity_uri, RDFS.label, Literal(activity_label)),
        (activity_uri, DCTERMS.description, Literal(activity_description)),
        (activity_uri, RICO.hasEndDate, Literal("M√§rz 2026")),
        (activity_uri, RICO.occurredAtDate, Literal(datetime.now().isoformat(), datatype=XSD.dateTime)),
        (activity_uri, RICO.technique, Literal(technique)),

        # Team performing the activity
        (team_uri, RDF.type, RICO.Group),
        (team_uri, RDFS.label, Literal("Digital Construction Archive Team, ETH Z√ºrich")),
        (activity_uri, RICO.isOrWasPerformedBy, team_uri),

        # Link project to activity
        (project_uri, RICO.isOrWasDocumentedBy, activity_uri),
    ]
    for triple in triples:
        g.add(triple)

    return project_uri, activity_uri

# Initialize the RDF graph
print("üîÑ Initializing DCA RDF Graph...")
graph = create_dca_graph()

# Add ontology structure
add_ontology_definitions(graph)
print("‚úÖ DCA ontology definitions added")

# Add project and activity
project_uri, activity_uri = add_project_and_activity(graph, PROJECT_NAME, ACTIVITY_NAME)
print(f"‚úÖ Project added: {project_uri}")
print(f"‚úÖ Activity added: {activity_uri}")

print(f"üìä Current graph size: {len(graph)} triples")

# Store in pipeline status
pipeline_status['graph'] = graph
pipeline_status['project_uri'] = project_uri
pipeline_status['activity_uri'] = activity_uri
# Baseline for the "+added triples" figure of the DROID-to-RDF conversion,
# which reads pipeline_status['initial_triples']. Without it the delta
# silently equals the whole graph size.
pipeline_status['initial_triples'] = len(graph)

log_step("RDF Graph Initialization", True, f"Graph initialized with {len(graph)} triples")

## 🔗 6. DROID CSV to RDF Conversion

Verarbeitung jedes DROID-Records zur Erstellung konsistenter MD5-basierter File-URIs, Hinzufügung von PREMIS-Metadaten, Formatinformationen und NextCloud WebDAV-Identifiern.

In [None]:
# =====================================================
# DROID CSV TO RDF CONVERSION
# =====================================================

def safe_literal(value, datatype=None, lang=None):
    """Turn a cell value into an RDF literal, or None for missing values.

    Args:
        value: Value to wrap; NA values and the empty string yield None.
        datatype: Optional datatype passed to ``Literal``.
        lang: Optional language tag passed to ``Literal``.

    Returns:
        A ``Literal`` of ``str(value)``. If building the typed/tagged literal
        raises, a warning is logged and a plain literal is returned instead.
    """
    is_blank = pd.isna(value) or value == ''
    if is_blank:
        return None

    try:
        literal = Literal(str(value), datatype=datatype, lang=lang)
    except Exception as exc:
        logger.warning(f"Literal creation failed for '{value}': {exc}")
        # Fall back to a plain literal without datatype or language
        literal = Literal(str(value))
    return literal

def safe_add_triple(g: Graph, subject, predicate, obj):
    """Add ``(subject, predicate, obj)`` to ``g`` unless a component is None.

    Returns:
        bool: True when the triple was added, False when it was skipped.
    """
    if subject is None or predicate is None or obj is None:
        return False
    g.add((subject, predicate, obj))
    return True

def add_premis_identifier(g: Graph, file_uri: URIRef, id_type: str, value: str):
    """Attach a PREMIS identifier (type + value) to ``file_uri`` via a blank node.

    Nothing is added when ``value`` is empty or NA.

    Args:
        g: Graph that receives the triples.
        file_uri: Subject the identifier belongs to.
        id_type: Stored as ``premis:identifierType``.
        value: Stored (as a string) as ``premis:identifierValue``.
    """
    if value and not pd.isna(value):
        identifier_node = BNode()
        for triple in (
            (file_uri, PREMIS.hasIdentifier, identifier_node),
            (identifier_node, PREMIS.identifierType, Literal(id_type)),
            (identifier_node, PREMIS.identifierValue, Literal(str(value))),
        ):
            g.add(triple)

def process_droid_record(g: Graph, record: pd.Series, project_uri: URIRef) -> Optional[URIRef]:
    """Convert one DROID CSV row into triples describing an archive file.

    The file URI is derived from the content hash when a usable one is
    present, otherwise from the file path. The row's name, path, modification
    time, format name, MIME type, PUID, size and hash are added when present.

    Args:
        g: Graph that receives the triples.
        record: One row of the DROID result table.
        project_uri: Project the file is linked to.

    Returns:
        The file URI, or None when the row has neither a usable hash nor a
        path, or when processing raised (the error is logged as a warning).
    """
    try:
        # Hash column name varies between DROID CSV variants
        md5_hash = None
        for col in ['MD5_HASH', 'HASH', 'md5', 'MD5']:
            if col in record.index and not pd.isna(record.get(col)):
                md5_hash = record[col]
                break

        # First non-empty path column. NaN is truthy, so it must be tested
        # explicitly or it would end up in the identifier as ".../nan".
        file_path = None
        for path_col in ('FILE_PATH', 'PATH'):
            candidate = record.get(path_col)
            if not pd.isna(candidate) and str(candidate).strip():
                file_path = str(candidate)
                break

        # Prefer the content-based URI. A hash that is present but unusable
        # (too short, non-hex) falls back to the path instead of dropping
        # the record.
        file_uri = dca_file_uri_from_md5(md5_hash) if md5_hash else None
        if file_uri is None:
            md5_hash = None  # do not record an unusable hash as identifier
            if file_path is None:
                return None
            file_uri = dca_file_uri_from_path_fallback(file_path)

        # Core classes
        g.add((file_uri, RDF.type, DCA.ArchiveFile))
        g.add((file_uri, RDF.type, PREMIS.Object))
        g.add((file_uri, RDF.type, RICO.Record))

        # Project relationship
        g.add((file_uri, DCA.belongsToProject, project_uri))
        g.add((file_uri, RICO.isOrWasIncludedIn, project_uri))

        # Basic metadata
        if 'NAME' in record.index:
            title_literal = safe_literal(record['NAME'])
            if title_literal:
                g.add((file_uri, DCTERMS.title, title_literal))

        # File path as identifier (WebDAV-style)
        if file_path is not None:
            # Percent-encode the path: spaces and other reserved characters
            # are not allowed in an IRI and break Turtle serialisation.
            # NOTE(review): file_path is appended as reported by DROID; if
            # that is an absolute local path the URL will not resolve -
            # confirm the intended mapping to the WebDAV root.
            encoded_path = quote(file_path, safe='/')
            webdav_url = f"https://nextcloud.ethz.ch/remote.php/dav/files/padrian/DCA/{PROJECT_NAME}/{encoded_path}"
            g.add((file_uri, DCTERMS.identifier, URIRef(webdav_url)))

        # Timestamps: first populated modification-time column wins
        for time_col in ['LAST_MODIFIED', 'MODIFIED', 'DATE_MODIFIED']:
            if time_col in record.index and not pd.isna(record[time_col]):
                modified_literal = safe_literal(record[time_col], datatype=XSD.dateTime)
                if modified_literal:
                    g.add((file_uri, DCTERMS.modified, modified_literal))
                break

        # PREMIS format information
        if 'FORMAT_NAME' in record.index:
            format_literal = safe_literal(record['FORMAT_NAME'])
            if format_literal:
                g.add((file_uri, PREMIS.hasFormatName, format_literal))

        # Format details
        format_notes = []
        if 'MIME_TYPE' in record.index and not pd.isna(record['MIME_TYPE']):
            format_notes.append(f"MIME: {record['MIME_TYPE']}")
        if 'PUID' in record.index and not pd.isna(record['PUID']):
            format_notes.append(f"PRONOM ID: {record['PUID']}")

        for note in format_notes:
            g.add((file_uri, PREMIS.hasFormatNote, Literal(note)))

        # File size; an unparseable size is skipped, it is optional metadata
        if 'SIZE' in record.index and not pd.isna(record['SIZE']):
            try:
                size_val = int(float(record['SIZE']))
                g.add((file_uri, PREMIS.hasSize, Literal(size_val, datatype=XSD.long)))
            except (ValueError, TypeError):
                pass

        # DROID identification method
        g.add((file_uri, PREMIS.hasCreatingApplication, Literal("DROID: Signature")))

        # Store the hash for later XMP matching
        if md5_hash:
            add_premis_identifier(g, file_uri, "MD5 Hash", md5_hash)

        return file_uri

    except Exception as e:
        # Best-effort per record: one bad row must not stop the conversion
        logger.warning(f"Failed to process record: {e}")
        return None

def convert_droid_to_rdf():
    """Convert every DROID record held in ``pipeline_status`` into RDF triples.

    Reads ``graph``, ``droid_df`` and ``project_uri`` from the module-level
    ``pipeline_status`` dict and hands each DataFrame row to
    ``process_droid_record``. A row counts as an error when that helper
    returns a falsy value. Counts, the list of created file URIs (as
    strings) and the graph are written back to ``pipeline_status``.

    Returns:
        bool: ``False`` when a prerequisite is missing or the DataFrame is
        empty; otherwise ``True`` only if the number of failed rows is below
        10 % of the number of successfully processed rows.
    """

    # Skip if prerequisites not met (project_uri is needed for every record)
    if any(key not in pipeline_status for key in ('graph', 'droid_df', 'project_uri')):
        print("‚è≠Ô∏è  Skipping RDF conversion - prerequisites not met")
        return False

    g = pipeline_status['graph']
    droid_df = pipeline_status['droid_df']
    project_uri = pipeline_status['project_uri']

    if droid_df.empty:
        print("‚ùå No DROID data to process")
        return False

    print(f"üîÑ Converting {len(droid_df):,} DROID records to RDF...")

    processed_count = 0
    error_count = 0
    file_uris = []  # Store for later XMP processing

    # Progress tracking
    total_records = len(droid_df)
    batch_size = 1000

    # Count rows by position rather than by index label: after filtering the
    # DataFrame index is no longer 0..n-1 (and need not even be numeric), so
    # the label cannot be used as a progress counter.
    for position, (_, record) in enumerate(droid_df.iterrows(), start=1):
        file_uri = process_droid_record(g, record, project_uri)
        if file_uri:
            processed_count += 1
            file_uris.append(str(file_uri))
        else:
            error_count += 1

        # Progress indicator
        if position % batch_size == 0:
            progress = (position / total_records) * 100
            print(f"   Progress: {position:,} / {total_records:,} records ({progress:.1f}%)")

    # Final validation
    current_triples = len(g)
    added_triples = current_triples - pipeline_status.get('initial_triples', 0)

    print(f"‚úÖ DROID to RDF conversion completed:")
    print(f"   üìä Successfully processed: {processed_count:,} files")
    print(f"   ‚ùå Errors: {error_count:,} files")
    print(f"   üìà Graph size: {current_triples:,} triples (+{added_triples:,})")

    # Store results; create the counter dict if an earlier step did not
    file_counts = pipeline_status.setdefault('file_counts', {})
    file_counts['rdf_files'] = processed_count
    file_counts['conversion_errors'] = error_count
    pipeline_status['file_uris'] = file_uris
    pipeline_status['graph'] = g

    success = error_count < (processed_count * 0.1)  # Less than 10% errors
    log_step("DROID to RDF Conversion", success, f"{processed_count:,} files converted")

    return success

# Execute DROID to RDF conversion
if pipeline_status.get('droid_df') is not None:
    # Record the baseline triple count so the conversion can report how many
    # triples it added. A missing graph is not an error here:
    # convert_droid_to_rdf() detects it and skips with a message.
    if 'graph' in pipeline_status:
        pipeline_status['initial_triples'] = len(pipeline_status['graph'])
    conversion_success = convert_droid_to_rdf()
    pipeline_status['rdf_conversion_success'] = conversion_success
    
    if conversion_success:
        print(f"\nüéØ DROID to RDF conversion completed successfully!")
    else:
        print(f"\n‚ö†Ô∏è  DROID to RDF conversion completed with errors")
else:
    print("‚è≠Ô∏è  Skipping DROID to RDF conversion - no DROID data available")

## 🔍 7. RDF Content Validation

Validierung der generierten RDF mittels SPARQL-Queries, Überprüfung von Triple-Anzahlen, Verifizierung der File-URI-Konsistenz und Sicherstellung, dass alle DROID-Records verarbeitet wurden.

In [None]:
# =====================================================
# RDF CONTENT VALIDATION
# =====================================================

def validate_rdf_content():
    """Validate the RDF graph in ``pipeline_status`` with SPARQL queries.

    Runs a fixed set of count queries against ``pipeline_status['graph']``,
    prints the results and a coverage analysis, and then checks that

    * the number of ``dca:ArchiveFile`` resources equals the ``rdf_files``
      counter stored by the conversion step, and
    * at least 95 % of the files carry a title and an identifier.

    The raw results are stored in ``pipeline_status['validation_results']``
    (a failed query is stored as an ``"Error: ..."`` string) and the verdict
    in ``pipeline_status['validation_passed']``.

    Returns:
        bool: ``True`` when all checks pass, ``False`` otherwise or when no
        graph is available.
    """

    if 'graph' not in pipeline_status:
        print("‚è≠Ô∏è  Skipping RDF validation - no graph available")
        return False

    g = pipeline_status['graph']

    print("üîç Validating RDF content with SPARQL queries...")

    # The counts use DISTINCT ?file so that a file carrying several titles,
    # identifiers or hash nodes is counted once; otherwise coverage could
    # exceed 100 %.
    validation_queries = {
        "Total Files": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile .
            }
        """,

        "Files with Titles": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX dcterms: <http://purl.org/dc/terms/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile ;
                      dcterms:title ?title .
            }
        """,

        "Files with Identifiers": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX dcterms: <http://purl.org/dc/terms/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile ;
                      dcterms:identifier ?identifier .
            }
        """,

        "Files with Format Info": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile ;
                      premis:hasFormatName ?format .
            }
        """,

        "Files with MD5 Hashes": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile .
                ?file premis:hasIdentifier ?id .
                ?id premis:identifierType "MD5 Hash" .
            }
        """,

        "Project Info": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX dcterms: <http://purl.org/dc/terms/>
            SELECT ?project ?title (COUNT(DISTINCT ?file) AS ?fileCount) WHERE {
                ?project a dca:ConstructionProject ;
                         dcterms:title ?title .
                ?file dca:belongsToProject ?project .
            } GROUP BY ?project ?title
        """,

        "Top Formats": """
            PREFIX dca: <http://dca.ethz.ch/ontology#>
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT ?format (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file a dca:ArchiveFile ;
                      premis:hasFormatName ?format .
            } GROUP BY ?format ORDER BY DESC(?count) LIMIT 10
        """
    }

    # Queries whose single result cell is a plain file count
    count_queries = ("Total Files", "Files with Titles", "Files with Identifiers",
                     "Files with Format Info", "Files with MD5 Hashes")

    validation_results = {}

    print("\nüìä RDF Validation Results:")

    for query_name, query_text in validation_queries.items():
        try:
            results = list(g.query(query_text))

            if query_name in count_queries:
                if results:
                    count = int(results[0][0])
                    validation_results[query_name] = count
                    print(f"   ‚úÖ {query_name}: {count:,}")
                else:
                    validation_results[query_name] = 0
                    print(f"   ‚ùå {query_name}: 0")

            elif query_name == "Project Info":
                for row in results:
                    file_count = int(row['fileCount'])
                    print(f"   üìÇ Project: {row['title']} ({file_count:,} files)")
                    validation_results[query_name] = file_count

            elif query_name == "Top Formats":
                print(f"   üè∑Ô∏è  Top File Formats:")
                for i, row in enumerate(results[:5]):
                    # Result rows are tuples, so attribute access ``row.count``
                    # would return the built-in tuple.count method instead of
                    # the ?count binding; look the variable up by name.
                    print(f"      {i+1}. {row['format']}: {int(row['count']):,} files")

        except Exception as e:
            print(f"   ‚ùå {query_name}: Query failed - {e}")
            validation_results[query_name] = f"Error: {e}"

    def _count_of(name):
        """Return the integer result for *name*, or 0 if missing or failed."""
        value = validation_results.get(name, 0)
        return value if isinstance(value, int) else 0

    # Calculate coverage percentages
    total_files = _count_of("Total Files")
    if total_files > 0:
        print(f"\nüìà Metadata Coverage Analysis:")

        coverage_metrics = {
            "Titles": validation_results.get("Files with Titles", 0),
            "Identifiers": validation_results.get("Files with Identifiers", 0),
            "Format Info": validation_results.get("Files with Format Info", 0),
            "MD5 Hashes": validation_results.get("Files with MD5 Hashes", 0)
        }

        for metric, count in coverage_metrics.items():
            if isinstance(count, int):
                coverage = (count / total_files) * 100
                print(f"   {metric}: {coverage:.1f}% ({count:,}/{total_files:,})")

    # Validation checks
    expected_files = pipeline_status.get('file_counts', {}).get('rdf_files', 0)
    actual_files = total_files

    validation_passed = True
    validation_issues = []

    # A query that could not be executed must not silently satisfy a check
    for required in ("Total Files", "Files with Titles", "Files with Identifiers"):
        outcome = validation_results.get(required, 0)
        if not isinstance(outcome, int):
            validation_issues.append(f"{required} could not be determined ({outcome})")
            validation_passed = False

    # Check file count consistency
    if actual_files != expected_files:
        validation_issues.append(f"File count mismatch: expected {expected_files}, got {actual_files}")
        validation_passed = False

    # Check minimum coverage requirements
    min_coverage = 0.95  # 95% minimum coverage
    for metric, count in [("Titles", validation_results.get("Files with Titles", 0)),
                          ("Identifiers", validation_results.get("Files with Identifiers", 0))]:
        if isinstance(count, int) and total_files > 0:
            coverage = count / total_files
            if coverage < min_coverage:
                validation_issues.append(f"{metric} coverage too low: {coverage:.1%} < {min_coverage:.1%}")
                validation_passed = False

    # Show validation summary
    if validation_passed:
        print(f"\n‚úÖ RDF Content Validation PASSED")
        log_step("RDF Content Validation", True, f"{total_files:,} files validated")
    else:
        print(f"\n‚ö†Ô∏è  RDF Content Validation FAILED:")
        for issue in validation_issues:
            print(f"   ‚Ä¢ {issue}")
        log_step("RDF Content Validation", False, f"{len(validation_issues)} issues found")

    pipeline_status['validation_results'] = validation_results
    pipeline_status['validation_passed'] = validation_passed

    return validation_passed

# Execute RDF validation (only after a successful DROID -> RDF conversion)
if pipeline_status.get('rdf_conversion_success'):
    validation_success = validate_rdf_content()
    
    # "\n" (not "\\n") so the messages start on a fresh line instead of
    # printing a literal backslash-n.
    if validation_success:
        print(f"\nüéØ RDF content validation completed successfully!")
    else:
        print(f"\n‚ö†Ô∏è  RDF content validation found issues - review before proceeding")
else:
    print("‚è≠Ô∏è  Skipping RDF validation - RDF conversion not successful")

## 📸 8. ExifTool XMP Data Extraction

Ausführung von ExifTool im Batch-Modus zur Extraktion von XMP-Metadaten aus Bild- und Adobe-Dateien, mit Fokus auf DocumentID, InstanceID und Derivation-Beziehungen.

In [None]:
# =====================================================
# EXIFTOOL XMP DATA EXTRACTION  
# =====================================================

def run_exiftool_json(files: List[str], fast=False) -> List[Dict]:
    """Run ExifTool on *files* and return its JSON output as a list of dicts.

    Only the XMP media-management identifiers, the creator tool and a few
    file-system tags are requested. The call is best-effort: a timeout,
    unparsable output or any failure to start the process is logged as a
    warning and yields an empty list, so one bad batch never aborts a run.

    Args:
        files: Paths to pass to ExifTool. An empty list returns ``[]``
            without starting a process.
        fast: When true, add ExifTool's ``-fast`` option.

    Returns:
        One dict per file as emitted by ``exiftool -json``; ``[]`` on error
        or when ExifTool produced no output.

    NOTE(review): with ``-G1`` ExifTool is expected to report FileName,
    Directory and FileModifyDate under the family-1 group ``System``, so the
    JSON keys would be ``System:...`` rather than ``File:...`` - confirm
    against real output before relying on those keys.
    """
    if not files:
        return []

    # Specific XMP tags we need
    xmp_tags = [
        "XMP-xmpMM:DocumentID",
        "XMP-xmpMM:InstanceID",
        "XMP-xmpMM:OriginalDocumentID",
        "XMP-xmpMM:DerivedFromDocumentID",
        "XMP-xmpMM:DerivedFromInstanceID",
        "XMP-xmp:CreatorTool",
        "File:FileName",
        "File:Directory",
        "File:FileModifyDate"
    ]

    cmd = [exiftool_command, "-a", "-s", "-G1", "-json", "-charset", "filename=UTF8"]
    if fast:
        cmd.append("-fast")
    cmd.append("-m")

    # ExifTool only treats an argument as a tag to extract when it starts
    # with "-"; without the dash each tag name is taken for a (missing)
    # input file and every tag of the real files is extracted instead.
    cmd.extend(f"-{tag}" for tag in xmp_tags)
    cmd.extend(files)

    try:
        # ExifTool writes JSON as UTF-8; decode explicitly instead of relying
        # on the locale encoding, and never fail on a stray byte.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=300,
        )
        output = result.stdout.strip()

        if not output:
            return []

        parsed = json.loads(output)
        return parsed if isinstance(parsed, list) else []

    except subprocess.TimeoutExpired:
        logger.warning("ExifTool timeout - skipping batch")
        return []
    except json.JSONDecodeError as e:
        logger.warning(f"ExifTool JSON parse error: {e}")
        return []
    except Exception as e:
        # Deliberately broad: a missing executable or any other launch
        # failure must only skip this batch, not stop the pipeline.
        logger.warning(f"ExifTool execution error: {e}")
        return []

def find_target_files_from_rdf():
    """Collect image/Adobe files from the RDF graph and map them to local paths.

    Queries ``pipeline_status['graph']`` for ``dca:ArchiveFile`` resources
    whose title ends in one of the target extensions, then converts each
    NextCloud identifier URL into a path below ``files_base_dir`` by taking
    everything after the first ``"<dataset_to_analyze>/"`` segment.

    Returns:
        list[dict]: One entry per convertible file with the keys
        ``file_uri``, ``title``, ``local_path``, ``format`` (``"Unknown"``
        when the graph has no format name) and ``exists`` (whether the local
        path exists). Empty when no graph is available.
    """

    if 'graph' not in pipeline_status:
        return []

    g = pipeline_status['graph']

    # SPARQL query to find target file types from RDF.
    # STRENDS (not CONTAINS) so only a real trailing extension matches;
    # CONTAINS would also select e.g. "notes.aiff" (".ai") or "scan.pdf.bak".
    target_query = """
        PREFIX dca: <http://dca.ethz.ch/ontology#>
        PREFIX dcterms: <http://purl.org/dc/terms/>
        PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
        
        SELECT ?file ?title ?identifier ?format WHERE {
            ?file a dca:ArchiveFile ;
                  dcterms:title ?title ;
                  dcterms:identifier ?identifier .
            OPTIONAL { ?file premis:hasFormatName ?format }
            
            # Filter for image/Adobe files by extension
            FILTER(
                STRENDS(LCASE(?title), ".jpg") ||
                STRENDS(LCASE(?title), ".jpeg") ||
                STRENDS(LCASE(?title), ".tif") ||
                STRENDS(LCASE(?title), ".tiff") ||
                STRENDS(LCASE(?title), ".png") ||
                STRENDS(LCASE(?title), ".psd") ||
                STRENDS(LCASE(?title), ".psb") ||
                STRENDS(LCASE(?title), ".ai") ||
                STRENDS(LCASE(?title), ".pdf") ||
                STRENDS(LCASE(?title), ".indd")
            )
        }
    """

    target_files = []
    dataset_marker = f"{dataset_to_analyze}/"

    for row in g.query(target_query):
        identifier = str(row.identifier)

        # Only NextCloud URLs can be converted to a local path
        if not identifier.startswith("https://nextcloud.ethz.ch/"):
            continue

        # Split at the FIRST occurrence of the dataset segment so that a
        # sub-directory carrying the same name is kept in the relative path.
        _, marker_found, relative_part = identifier.partition(dataset_marker)
        if not marker_found or not relative_part:
            continue

        try:
            local_path = files_base_dir / unquote(relative_part)
            exists_locally = local_path.exists()
        except (OSError, ValueError) as e:
            logger.warning(f"Path conversion failed for {identifier}: {e}")
            continue

        target_files.append({
            'file_uri': str(row.file),
            'title': str(row.title),
            'local_path': str(local_path),
            'format': str(row.format) if row.format else "Unknown",
            'exists': exists_locally
        })

    return target_files

def extract_xmp_metadata():
    """Extract XMP metadata for all locally available target files.

    Finds target files via ``find_target_files_from_rdf``, runs
    ``run_exiftool_json`` on the existing ones in batches of 50 and collects
    one record per ExifTool result. On success the DataFrame, the list of
    existing target files and two counters are stored in ``pipeline_status``.

    Returns:
        pandas.DataFrame: One row per ExifTool result with the columns
        ``SourceFile``, ``DocumentID``, ``InstanceID``,
        ``OriginalDocumentID``, ``DerivedFromDocumentID``,
        ``DerivedFromInstanceID``, ``CreatorTool`` and ``FileModifyDate``.
        An empty DataFrame is returned (and nothing is stored) when there
        are no target files or none exists locally, so callers can always
        test ``.empty``.
    """

    columns = [
        "SourceFile", "DocumentID", "InstanceID", "OriginalDocumentID",
        "DerivedFromDocumentID", "DerivedFromInstanceID", "CreatorTool",
        "FileModifyDate",
    ]

    print("üîç Finding target files for XMP extraction...")
    target_files = find_target_files_from_rdf()

    if not target_files:
        print("‚ùå No target files found for XMP extraction")
        return pd.DataFrame(columns=columns)

    # Filter to existing files only
    existing_files = [f for f in target_files if f['exists']]

    print(f"üìä Target files analysis:")
    print(f"   Total target files: {len(target_files):,}")
    print(f"   Files exist locally: {len(existing_files):,}")
    print(f"   Missing files: {len(target_files) - len(existing_files):,}")

    if not existing_files:
        print("‚ùå No local files available for XMP extraction")
        return pd.DataFrame(columns=columns)

    # Batch processing setup
    BATCH_SIZE = 50  # Smaller batches for ExifTool
    all_xmp_records = []
    total_files = len(existing_files)

    print(f"üîÑ Starting XMP extraction for {total_files:,} files in batches of {BATCH_SIZE}...")

    file_paths = [f['local_path'] for f in existing_files]
    total_batches = math.ceil(total_files / BATCH_SIZE)

    for batch_num, start in enumerate(range(0, total_files, BATCH_SIZE), start=1):
        batch_paths = file_paths[start:start + BATCH_SIZE]

        print(f"   üì¶ Processing batch {batch_num}/{total_batches} ({len(batch_paths)} files)...")

        # Extract XMP metadata for batch
        for xmp_data in run_exiftool_json(batch_paths):
            all_xmp_records.append({
                "SourceFile": xmp_data.get("SourceFile"),
                "DocumentID": xmp_data.get("XMP-xmpMM:DocumentID"),
                "InstanceID": xmp_data.get("XMP-xmpMM:InstanceID"),
                "OriginalDocumentID": xmp_data.get("XMP-xmpMM:OriginalDocumentID"),
                "DerivedFromDocumentID": xmp_data.get("XMP-xmpMM:DerivedFromDocumentID"),
                "DerivedFromInstanceID": xmp_data.get("XMP-xmpMM:DerivedFromInstanceID"),
                "CreatorTool": xmp_data.get("XMP-xmp:CreatorTool"),
                # NOTE(review): ExifTool is run with -G1, where file-system
                # tags are presumably grouped as "System"; accept both
                # spellings so the date is not silently lost - confirm.
                "FileModifyDate": (xmp_data.get("System:FileModifyDate")
                                   or xmp_data.get("File:FileModifyDate")),
            })

        progress = min(start + BATCH_SIZE, total_files)
        print(f"      Progress: {progress:,}/{total_files:,} files processed")

    # Create DataFrame for analysis; fixed columns keep the column lookups
    # below valid even when ExifTool returned nothing.
    xmp_df = pd.DataFrame(all_xmp_records, columns=columns)

    # Analysis of extracted data
    print(f"\n‚úÖ XMP extraction completed:")
    print(f"   üìä Files processed: {len(xmp_df):,}")

    has_doc_id = 0
    if not xmp_df.empty:
        # Count files with XMP metadata
        has_doc_id = int(xmp_df['DocumentID'].notna().sum())
        has_inst_id = int(xmp_df['InstanceID'].notna().sum())
        has_derived = int((xmp_df['DerivedFromDocumentID'].notna() |
                           xmp_df['DerivedFromInstanceID'].notna()).sum())

        print(f"   üîó Files with DocumentID: {has_doc_id:,}")
        print(f"   üÜî Files with InstanceID: {has_inst_id:,}")
        print(f"   üìé Files with derivation info: {has_derived:,}")

        # Creator tool analysis
        if xmp_df['CreatorTool'].notna().sum() > 0:
            creator_tools = xmp_df['CreatorTool'].value_counts().head(5)
            print(f"   üîß Top creator tools:")
            for tool, count in creator_tools.items():
                print(f"      {tool}: {count:,} files")

    # Store results
    pipeline_status['xmp_df'] = xmp_df
    pipeline_status['target_files'] = existing_files
    file_counts = pipeline_status.setdefault('file_counts', {})
    file_counts['xmp_processed'] = len(xmp_df)
    file_counts['xmp_with_metadata'] = has_doc_id

    log_step("XMP Extraction", True, f"{len(xmp_df):,} files processed")

    return xmp_df

# Execute XMP extraction (only when the RDF validation passed)
if pipeline_status.get('validation_passed', False):
    xmp_df = extract_xmp_metadata()
    
    # Guard the type before using .empty: the extraction's early exits may
    # hand back a plain empty list instead of a DataFrame.
    if isinstance(xmp_df, pd.DataFrame) and not xmp_df.empty:
        print(f"\nüéØ XMP extraction completed successfully!")
        print(f"   Ready for integration into RDF graph")
    else:
        print(f"\n‚ö†Ô∏è  XMP extraction completed but no metadata found")
else:
    print("‚è≠Ô∏è  Skipping XMP extraction - RDF validation failed")

## 🔗 9. XMP Data Integration into RDF

Merge von XMP-Metadaten in den bestehenden RDF-Graph mit denselben MD5-basierten File-URIs, Hinzufügung von PREMIS-Identifiern und Erstellung von Derivation-Beziehungen zwischen Dateien.

In [None]:
# =====================================================
# XMP DATA INTEGRATION INTO RDF
# =====================================================

def build_path_to_md5_mapping():
    """Map local file paths to their MD5 hashes using the DROID DataFrame.

    The hash is read from the first of the columns ``MD5_HASH``, ``HASH``,
    ``md5``, ``MD5`` that exists, the path from ``FILE_PATH`` or, if that
    column is absent, ``PATH``. Each key is ``str(files_base_dir / path)``.
    Rows with a missing or empty path or hash are skipped.

    Returns:
        dict: ``{full_path: md5_hash}``; empty when ``pipeline_status`` has
        no ``droid_df`` or the required columns are missing.
    """
    path_to_md5 = {}

    if 'droid_df' not in pipeline_status:
        return path_to_md5

    droid_df = pipeline_status['droid_df']

    # Resolve both columns once instead of per row
    md5_col = next((c for c in ('MD5_HASH', 'HASH', 'md5', 'MD5')
                    if c in droid_df.columns), None)
    path_col = next((c for c in ('FILE_PATH', 'PATH')
                     if c in droid_df.columns), None)

    if md5_col is None or path_col is None:
        return path_to_md5

    for file_path, md5_hash in zip(droid_df[path_col], droid_df[md5_col]):
        # NaN is truthy, so missing values must be rejected explicitly;
        # otherwise joining a NaN path onto files_base_dir raises TypeError.
        if pd.isna(file_path) or pd.isna(md5_hash):
            continue
        if not file_path or not md5_hash:
            continue

        # Create full path
        full_path = str(files_base_dir / str(file_path))
        path_to_md5[full_path] = md5_hash

    return path_to_md5

def get_file_uri_from_path(file_path: str, path_to_md5: Dict[str, str]) -> URIRef:
    """Resolve *file_path* to its file URI.

    When *path_to_md5* holds a non-empty hash for the path, the URI comes
    from ``dca_file_uri_from_md5``; otherwise it comes from
    ``dca_file_uri_from_path_fallback``.
    """
    known_md5 = path_to_md5.get(file_path)

    # No (or empty) hash for this path: use the path-based URI
    if not known_md5:
        return dca_file_uri_from_path_fallback(file_path)

    return dca_file_uri_from_md5(known_md5)

def _xmp_value_present(value):
    """Return True when an XMP cell holds a usable value (not None/NaN/empty)."""
    if value is None:
        return False
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return False
    return bool(value)


def integrate_xmp_into_rdf():
    """Merge the extracted XMP metadata into the RDF graph.

    For every row of ``pipeline_status['xmp_df']`` that has a ``SourceFile``
    the function resolves the file URI via ``get_file_uri_from_path``, types
    it as ``DCA.ArchiveFile`` / ``PREMIS.Object`` / ``RICO.Record``, adds the
    XMP DocumentID, InstanceID and OriginalDocumentID through
    ``add_premis_identifier`` (once per file and value), adds the creator
    tool, and links the file to its parent with ``PREMIS.hasSource`` /
    ``PREMIS.isSourceOf`` when the parent's DocumentID (preferred) or
    InstanceID is found among the extracted files.

    Counters and the graph are written back to ``pipeline_status``.

    Returns:
        bool: ``False`` when the graph or the XMP DataFrame is missing;
        ``True`` when there is no XMP data (nothing to do) or at least one
        file was processed.
    """

    # Prerequisites check
    if ('graph' not in pipeline_status or
        'xmp_df' not in pipeline_status):
        print("‚è≠Ô∏è  Skipping XMP integration - prerequisites not met")
        return False

    g = pipeline_status['graph']
    xmp_df = pipeline_status['xmp_df']

    if xmp_df.empty:
        print("‚ö†Ô∏è  No XMP data to integrate")
        return True  # Not an error, just no data

    print(f"üîÑ Integrating XMP metadata into RDF graph...")

    # Build path-to-MD5 mapping for consistent URIs
    path_to_md5 = build_path_to_md5_mapping()
    print(f"üìã Built MD5 mapping for {len(path_to_md5):,} files")

    # Build indices for derivation matching. NaN is truthy, so presence is
    # tested with _xmp_value_present() to keep NaN out of the indices.
    doc_id_to_path = {}
    inst_id_to_path = {}

    for _, row in xmp_df.iterrows():
        source_file = row['SourceFile']
        if not _xmp_value_present(source_file):
            continue

        if _xmp_value_present(row['DocumentID']):
            doc_id_to_path[row['DocumentID']] = source_file
        if _xmp_value_present(row['InstanceID']):
            inst_id_to_path[row['InstanceID']] = source_file

    print(f"üìá Built indexing: {len(doc_id_to_path)} DocumentIDs, {len(inst_id_to_path)} InstanceIDs")

    # Integration counters
    added_identifiers = 0
    added_derivations = 0
    processed_files = 0
    skipped_duplicates = 0

    # Track processed identifiers per file to avoid duplicates
    processed_per_file = {}

    initial_triples = len(g)

    for _, row in xmp_df.iterrows():
        source_file = row['SourceFile']
        if not _xmp_value_present(source_file):
            continue

        # Get consistent file URI
        file_uri = get_file_uri_from_path(source_file, path_to_md5)
        seen_for_file = processed_per_file.setdefault(str(file_uri), set())

        # Ensure file is properly typed in RDF
        g.add((file_uri, RDF.type, DCA.ArchiveFile))
        g.add((file_uri, RDF.type, PREMIS.Object))
        g.add((file_uri, RDF.type, RICO.Record))

        processed_files += 1

        # Add XMP identifiers (with duplicate prevention)
        xmp_identifiers = [
            ("XMP DocumentID", row['DocumentID']),
            ("XMP InstanceID", row['InstanceID']),
            ("XMP OriginalDocumentID", row['OriginalDocumentID'])
        ]

        for id_type, id_value in xmp_identifiers:
            if not _xmp_value_present(id_value):
                continue
            id_key = f"{id_type}:{id_value}"
            if id_key in seen_for_file:
                skipped_duplicates += 1
                continue
            add_premis_identifier(g, file_uri, id_type, str(id_value))
            seen_for_file.add(id_key)
            added_identifiers += 1

        # Add CreatorTool if available
        if _xmp_value_present(row['CreatorTool']):
            g.add((file_uri, PREMIS.hasCreatingApplication, Literal(str(row['CreatorTool']))))

        # Add derivation relationships
        parent_path = None
        derived_doc = row['DerivedFromDocumentID']
        derived_inst = row['DerivedFromInstanceID']

        # Try DocumentID first, then InstanceID for parent matching
        if _xmp_value_present(derived_doc) and derived_doc in doc_id_to_path:
            parent_path = doc_id_to_path[derived_doc]
        elif _xmp_value_present(derived_inst) and derived_inst in inst_id_to_path:
            parent_path = inst_id_to_path[derived_inst]

        if parent_path:
            parent_uri = get_file_uri_from_path(parent_path, path_to_md5)

            # A file whose "derived from" ID resolves to itself must not be
            # recorded as its own source.
            if parent_uri != file_uri:
                # Add bidirectional derivation relationships
                g.add((file_uri, PREMIS.hasSource, parent_uri))
                g.add((parent_uri, PREMIS.isSourceOf, file_uri))
                added_derivations += 1

    # Final statistics
    final_triples = len(g)
    added_triples = final_triples - initial_triples

    print(f"‚úÖ XMP integration completed:")
    print(f"   üìÅ Files processed: {processed_files:,}")
    print(f"   üîó XMP identifiers added: {added_identifiers:,}")
    print(f"   üìé Derivation relationships: {added_derivations:,}")
    print(f"   üîÑ Duplicate identifiers skipped: {skipped_duplicates:,}")
    print(f"   üìà Triples added: {added_triples:,}")
    print(f"   üìä Total graph size: {final_triples:,} triples")

    # Update pipeline status; create the counter dict if it does not exist
    file_counts = pipeline_status.setdefault('file_counts', {})
    file_counts['xmp_integrated'] = processed_files
    file_counts['xmp_identifiers'] = added_identifiers
    file_counts['derivations'] = added_derivations
    pipeline_status['graph'] = g

    success = added_identifiers > 0 or processed_files > 0
    log_step("XMP Integration", success, f"{processed_files:,} files, {added_identifiers:,} identifiers")

    return success

def validate_xmp_integration():
    """Report how much XMP metadata ended up in the RDF graph.

    Runs three SPARQL counts on ``pipeline_status['graph']`` (files with an
    XMP DocumentID, files with an XMP InstanceID, ``premis:hasSource``
    links), prints them and stores the successful ones in
    ``pipeline_status['xmp_validation']``. A failed query is printed and
    left out of the stored results.

    Returns:
        bool: ``False`` only when no graph is available. Finding no XMP
        metadata is reported but still returns ``True``, because files
        without XMP data are not an error.
    """

    if 'graph' not in pipeline_status:
        return False

    g = pipeline_status['graph']

    print("üîç Validating XMP integration...")

    # XMP-specific validation queries. The "Files with ..." counts use
    # DISTINCT ?file so that a file with several identifier nodes of the
    # same type is counted as one file.
    xmp_queries = {
        "Files with XMP DocumentID": """
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file premis:hasIdentifier ?id .
                ?id premis:identifierType "XMP DocumentID" .
            }
        """,

        "Files with XMP InstanceID": """
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT (COUNT(DISTINCT ?file) AS ?count) WHERE {
                ?file premis:hasIdentifier ?id .
                ?id premis:identifierType "XMP InstanceID" .
            }
        """,

        "Derivation Relationships": """
            PREFIX premis: <http://www.loc.gov/premis/rdf/v3/>
            SELECT (COUNT(*) AS ?count) WHERE {
                ?child premis:hasSource ?parent .
            }
        """
    }

    validation_results = {}

    for query_name, query_text in xmp_queries.items():
        try:
            results = list(g.query(query_text))
            if results:
                count = int(results[0][0])
                validation_results[query_name] = count
                print(f"   ‚úÖ {query_name}: {count:,}")
            else:
                validation_results[query_name] = 0
                print(f"   ‚ùå {query_name}: 0")
        except Exception as e:
            # Best-effort report: a failing query is shown but does not
            # abort the remaining checks.
            print(f"   ‚ùå {query_name}: Query failed - {e}")

    # Store validation results
    pipeline_status['xmp_validation'] = validation_results

    # Check if integration was successful
    has_xmp_data = (validation_results.get("Files with XMP DocumentID", 0) > 0 or
                   validation_results.get("Files with XMP InstanceID", 0) > 0)

    if has_xmp_data:
        print("‚úÖ XMP integration validation PASSED")
    else:
        print("‚ö†Ô∏è  XMP integration validation: No XMP metadata found")

    return True  # Not an error if no XMP data exists

# Execute XMP integration
if pipeline_status.get('xmp_df') is not None:
    integration_success = integrate_xmp_into_rdf()
    # Record the outcome in both cases so later steps can tell "failed"
    # apart from "never ran".
    pipeline_status['xmp_integration_success'] = integration_success
    
    if integration_success:
        validation_success = validate_xmp_integration()
        
        # "\n" (not "\\n") so the message starts on a fresh line instead of
        # printing a literal backslash-n.
        print(f"\nüéØ XMP integration completed successfully!")
    else:
        print(f"\n‚ùå XMP integration failed")
else:
    print("‚è≠Ô∏è  Skipping XMP integration - no XMP data available")

## 💾 10. Final RDF Export & Statistics

Export der angereicherten RDF im Turtle-Format, Generierung umfassender Verarbeitungsstatistiken, Validierung der finalen Ausgabe und Erstellung von Backup-Dateien.

In [None]:
# =====================================================
# FINAL RDF EXPORT & COMPREHENSIVE STATISTICS
# =====================================================

def export_final_rdf():
    """Write the pipeline graph to ``rdf_output_path`` as Turtle and verify it.

    The graph is serialized, written as UTF-8, and then parsed back into a
    fresh ``Graph``; the export counts as successful only when the reloaded
    graph has the same number of triples. The outcome is reported through
    ``log_step``.

    Returns:
        bool: ``True`` when the file was written and the reload check
        matched; ``False`` when no graph exists, the triple counts differ,
        or any exception occurred during export.
    """

    if 'graph' not in pipeline_status:
        print("‚ùå No graph available for export")
        return False

    graph = pipeline_status['graph']

    print("üíæ Exporting final RDF to Turtle format...")

    try:
        # Serialize first, so a serializer failure leaves any existing
        # output file untouched
        serialized = graph.serialize(format='turtle')

        with open(rdf_output_path, 'w', encoding='utf-8') as out_file:
            out_file.write(serialized)

        # Figures for the report and the reload comparison
        bytes_per_mb = 1024 * 1024
        file_size_mb = rdf_output_path.stat().st_size / bytes_per_mb
        triple_count = len(graph)

        print(f"‚úÖ RDF exported successfully:")
        print(f"   üìÅ File: {rdf_output_path}")
        print(f"   üìä Size: {file_size_mb:.2f} MB")
        print(f"   üìà Triples: {triple_count:,}")

        # Round-trip check: parse the written file into a fresh graph
        reloaded = Graph()
        reloaded.parse(rdf_output_path, format='turtle')

        if len(reloaded) != triple_count:
            print(f"   ‚ùå Export validation: Triple count mismatch")
            log_step("RDF Export", False, "Triple count mismatch in validation")
            return False

        print(f"   ‚úÖ Export validation: Load test successful")
        log_step("RDF Export", True, f"{file_size_mb:.2f} MB, {triple_count:,} triples")
        return True

    except Exception as e:
        error_msg = f"RDF export failed: {e}"
        print(f"‚ùå {error_msg}")
        log_step("RDF Export", False, error_msg)
        return False

def generate_pipeline_statistics():
    """Print a report of the pipeline run and return the collected figures.

    Everything is read from the module-level ``pipeline_status`` dict and
    from ``rdf_output_path``. The report covers execution time, step
    success rate, per-stage file counts, graph size and enrichment,
    coverage metrics from ``validation_results`` / ``xmp_validation`` (when
    present), recorded errors, output files and an overall verdict based on
    four critical step names.

    Returns:
        dict: keys ``execution_time`` (timedelta), ``start_time``,
        ``end_time``, ``steps_completed``, ``steps_failed``,
        ``file_counts`` and ``errors``.

    Raises:
        KeyError: if ``pipeline_status`` lacks ``start_time``,
            ``steps_completed``, ``steps_failed`` or ``errors``.
    """

    end_time = datetime.now()
    execution_time = end_time - pipeline_status['start_time']

    stats = {
        'execution_time': execution_time,
        'start_time': pipeline_status['start_time'],
        'end_time': end_time,
        'steps_completed': pipeline_status['steps_completed'],
        'steps_failed': pipeline_status['steps_failed'],
        'file_counts': pipeline_status.get('file_counts', {}),
        'errors': pipeline_status['errors']
    }

    # "\n" (not "\\n") so the header is preceded by a real blank line
    print("\nüìä COMPREHENSIVE PIPELINE STATISTICS")
    print("=" * 60)

    # Execution Summary
    print(f"üïê Execution Time: {execution_time}")
    print(f"üìÖ Started: {stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"üèÅ Completed: {stats['end_time'].strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Step Summary
    total_steps = len(stats['steps_completed']) + len(stats['steps_failed'])
    success_rate = (len(stats['steps_completed']) / total_steps * 100) if total_steps > 0 else 0

    print("üìã Pipeline Steps:")
    print(f"   ‚úÖ Completed: {len(stats['steps_completed'])}")
    print(f"   ‚ùå Failed: {len(stats['steps_failed'])}")
    print(f"   üìà Success Rate: {success_rate:.1f}%")
    print()

    # File Processing Summary
    file_counts = stats['file_counts']
    print("üìÅ File Processing Summary:")

    # (label shown to the user, key in file_counts)
    processing_stages = [
        ("Input Files Detected", "input_files"),
        ("DROID Records Generated", "droid_records"),
        ("Files Converted to RDF", "rdf_files"),
        ("XMP Files Processed", "xmp_processed"),
        ("XMP Files Integrated", "xmp_integrated"),
        ("XMP Identifiers Added", "xmp_identifiers"),
        ("Derivation Relations", "derivations")
    ]

    # Stages with a zero or missing count are omitted from the report
    for label, key in processing_stages:
        count = file_counts.get(key, 0)
        if count > 0:
            print(f"   {label}: {count:,}")

    # Error Processing: rate is relative to converted files plus failures
    conversion_errors = file_counts.get('conversion_errors', 0)
    total_processed = file_counts.get('rdf_files', 0) + conversion_errors
    if total_processed > 0:
        error_rate = (conversion_errors / total_processed) * 100
        print(f"   ‚ùå Conversion Errors: {conversion_errors:,} ({error_rate:.1f}%)")
    print()

    # RDF Statistics
    if 'graph' in pipeline_status:
        g = pipeline_status['graph']
        print("üìä RDF Graph Statistics:")
        print(f"   üìà Total Triples: {len(g):,}")

        # Enrichment = growth relative to the triple count stored under
        # 'initial_triples'. The sign format option prints "+" for growth
        # and "-" for shrinkage, instead of a hard-coded "+" that would
        # render a negative delta as "+-".
        initial_triples = pipeline_status.get('initial_triples', 0)
        if initial_triples > 0:
            added_triples = len(g) - initial_triples
            enrichment = (added_triples / initial_triples) * 100
            print(f"   üìà Enrichment: {enrichment:+.1f}% ({added_triples:+,} triples)")
        print()

    # Quality Metrics
    print("üéØ Quality Metrics:")
    if 'validation_results' in pipeline_status:
        validation = pipeline_status['validation_results']
        total_files = validation.get('Total Files', 0)

        if total_files > 0:
            # (label shown to the user, key in validation_results)
            metrics = [
                ("Title Coverage", "Files with Titles"),
                ("Identifier Coverage", "Files with Identifiers"),
                ("Format Coverage", "Files with Format Info"),
                ("MD5 Hash Coverage", "Files with MD5 Hashes")
            ]

            for metric_name, key in metrics:
                count = validation.get(key, 0)
                # Only integer counts can be turned into a coverage ratio
                if isinstance(count, int):
                    coverage = (count / total_files) * 100
                    print(f"   {metric_name}: {coverage:.1f}% ({count:,}/{total_files:,})")

    # XMP Integration Quality
    if 'xmp_validation' in pipeline_status:
        xmp_val = pipeline_status['xmp_validation']
        xmp_doc_ids = xmp_val.get('Files with XMP DocumentID', 0)
        xmp_inst_ids = xmp_val.get('Files with XMP InstanceID', 0)
        derivations = xmp_val.get('Derivation Relationships', 0)

        if xmp_doc_ids > 0 or xmp_inst_ids > 0:
            print(f"   XMP DocumentIDs: {xmp_doc_ids:,} files")
            print(f"   XMP InstanceIDs: {xmp_inst_ids:,} files")
            print(f"   Derivation Links: {derivations:,} relationships")
    print()

    # Error Summary
    if stats['errors']:
        print("‚ö†Ô∏è  Error Summary:")
        for error in stats['errors']:
            print(f"   ‚Ä¢ {error}")
        print()

    # Output Files
    print("üì§ Output Files:")
    print(f"   üìÑ Final RDF: {rdf_output_path}")
    if rdf_output_path.exists():
        size_mb = rdf_output_path.stat().st_size / (1024 * 1024)
        print(f"   üìä File Size: {size_mb:.2f} MB")

    # Log file: report the most recently modified match in the working dir
    log_files = list(Path('.').glob('dca_pipeline_*.log'))
    if log_files:
        latest_log = max(log_files, key=lambda p: p.stat().st_mtime)
        print(f"   üìã Log File: {latest_log}")
    print()

    # Success Assessment: the verdict depends only on these four steps
    critical_steps = ['Path Validation', 'DROID Analysis', 'RDF Graph Initialization', 'DROID to RDF Conversion']
    missing_steps = [step for step in critical_steps if step not in stats['steps_completed']]

    if not missing_steps:
        print("‚úÖ PIPELINE STATUS: SUCCESS")
        print("   üéØ All critical steps completed successfully")
        print("   üìà Ready for ETH DCA integration")
    else:
        print("‚ö†Ô∏è  PIPELINE STATUS: PARTIAL SUCCESS")
        print(f"   ‚ùå Missing critical steps: {', '.join(missing_steps)}")

    return stats

def create_pipeline_summary(stats=None):
    """Write a plain-text execution summary next to the RDF output file.

    The report is saved as ``pipeline_summary_<YYYYmmdd_HHMMSS>.txt`` in the
    parent directory of ``rdf_output_path`` and lists project name, dataset,
    execution time, completed/total steps, all file counts and any recorded
    errors. Failures while writing are reported on stdout, not raised.

    Args:
        stats: Statistics dict as returned by
            ``generate_pipeline_statistics()``. When omitted (the default),
            the statistics are generated here, which also prints the full
            statistics report.
    """

    if stats is None:
        stats = generate_pipeline_statistics()

    summary_path = rdf_output_path.parent / f"pipeline_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

    try:
        completed = len(stats['steps_completed'])
        total_steps = completed + len(stats['steps_failed'])

        with open(summary_path, 'w', encoding='utf-8') as f:
            # Real newlines ("\n"); the escaped form "\\n" would put a
            # literal backslash-n into the file and leave it on one line.
            f.write("DCA INTEGRATED PIPELINE - EXECUTION SUMMARY\n")
            f.write("=" * 60 + "\n\n")

            f.write(f"Project: {PROJECT_NAME}\n")
            f.write(f"Dataset: {dataset_to_analyze}\n")
            f.write(f"Execution Time: {stats['execution_time']}\n")
            f.write(f"Completed Steps: {completed}/{total_steps}\n\n")

            f.write("FILE PROCESSING:\n")
            for key, value in stats['file_counts'].items():
                # Thousands separators only make sense for numbers; any
                # other value is written as-is rather than aborting the
                # report with a format error.
                if isinstance(value, (int, float)):
                    formatted = f"{value:,}"
                else:
                    formatted = str(value)
                f.write(f"  {key}: {formatted}\n")

            if stats['errors']:
                f.write("\nERRORS:\n")
                for error in stats['errors']:
                    f.write(f"  - {error}\n")

        print(f"üìã Summary report saved: {summary_path}")

    except Exception as e:
        # The summary is a convenience artifact; do not fail the pipeline
        # when it cannot be written.
        print(f"‚ö†Ô∏è  Failed to create summary report: {e}")

# Execute final export and statistics
print("üèÅ Finalizing integrated pipeline...")

export_success = export_final_rdf()
pipeline_status['export_success'] = export_success

if export_success:
    print("\nüéØ RDF Export completed successfully!")

# Generate comprehensive statistics
stats = generate_pipeline_statistics()

# Create summary report
# NOTE: called without arguments, create_pipeline_summary() runs
# generate_pipeline_statistics() itself, so the report is printed again.
create_pipeline_summary()

# Final status: export must have succeeded, no step may have failed and the
# DROID -> RDF conversion step must be recorded as completed.
final_success = (
    export_success and
    len(pipeline_status['steps_failed']) == 0 and
    'DROID to RDF Conversion' in pipeline_status['steps_completed']
)

if final_success:
    print("\nüéä INTEGRATED PIPELINE COMPLETED SUCCESSFULLY!")
    print(f"   üìÑ Final output ready: {rdf_output_path}")
    print("   üîó Ready for ETH DCA integration and workflow visualization")
else:
    print("\n‚ö†Ô∏è  PIPELINE COMPLETED WITH ISSUES")
    print("   üìÑ Check logs and summary for details")

# Report the log file that is really in use. Building the name from
# datetime.now() would print a file name that was never created, because the
# current time differs from the moment logging was configured.
session_logs = [
    handler.baseFilename
    for handler in logging.getLogger().handlers
    if isinstance(handler, logging.FileHandler)
]
if not session_logs:
    # Fallback: newest matching log file in the working directory
    log_candidates = list(Path('.').glob('dca_pipeline_*.log'))
    if log_candidates:
        session_logs = [str(max(log_candidates, key=lambda p: p.stat().st_mtime))]

if session_logs:
    print(f"\nüìã Session log available in: {session_logs[0]}")
else:
    print("\n‚ö†Ô∏è  No session log file found")