# Custom Validation Rules - Advanced Guide

This notebook shows how to create custom validation rules for your specific needs.

## When to Create Custom Rules

- Check brain bank-specific requirements
- Validate data quality metrics
- Enforce naming conventions
- Check for specific file formats
- Validate metadata in JSON files

## Rule Architecture

Every validation rule:
1. Inherits from `ValidationRule`
2. Implements a `check(context)` method
3. Returns a `ValidationResult` from `check()`
4. Can use the `_pass()` and `_fail()` helpers to build that result
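
The contract above can be sketched with plain-Python stand-ins. The classes below are hypothetical illustrations, not the actual `voxelops` implementation; the field names on `ValidationResult` (`rule_name`, `passed`, `message`, `details`) are assumed from how results are read later in this notebook.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ValidationResult:
    """Hypothetical stand-in: the outcome of a single rule."""
    rule_name: str
    passed: bool
    message: str
    severity: str = "error"
    details: dict = field(default_factory=dict)


class ValidationRule:
    """Hypothetical stand-in for the base class described above."""
    name = "base_rule"
    description = ""
    severity = "error"
    phase = "pre"

    def check(self, context: Any) -> ValidationResult:
        raise NotImplementedError

    def _pass(self, message: str, details: Optional[dict] = None) -> ValidationResult:
        # Build a passing result tagged with this rule's name and severity
        return ValidationResult(self.name, True, message, self.severity, details or {})

    def _fail(self, message: str, details: Optional[dict] = None) -> ValidationResult:
        # Build a failing result tagged with this rule's name and severity
        return ValidationResult(self.name, False, message, self.severity, details or {})


class AlwaysPassRule(ValidationRule):
    """Trivial rule used only to show the contract."""
    name = "always_pass"
    description = "Always passes"

    def check(self, context: Any) -> ValidationResult:
        return self._pass("Nothing to check")


result = AlwaysPassRule().check(context=None)
print(result.rule_name, result.passed, result.message)
```

The real base class may carry more state than this sketch, so treat it only as a mental model for the examples that follow.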

In [None]:
from pathlib import Path

from voxelops import QSIPrepInputs, run_procedure
from voxelops.validation.base import ValidationResult, ValidationRule
from voxelops.validation.context import ValidationContext
from voxelops.validation.validators import Validator

## Example 1: Simple Volume Count Rule

Check that a DWI scan has a minimum number of volumes by counting the entries in its `.bval` file:

In [None]:
class MinimumDWIVolumesRule(ValidationRule):
    """Check that DWI scan has minimum number of volumes."""

    name = "minimum_dwi_volumes"
    description = "Verify DWI has at least the minimum number of volumes (default 30)"
    severity = "error"
    phase = "pre"

    def __init__(self, min_volumes=30):
        self.min_volumes = min_volumes

    def check(self, context: ValidationContext) -> ValidationResult:
        """Check minimum volumes in bval file."""
        # Get BIDS directory
        if not context.inputs or not hasattr(context.inputs, 'bids_dir'):
            return self._fail("Cannot determine BIDS directory")

        bids_dir = Path(context.inputs.bids_dir)
        participant_dir = bids_dir / f"sub-{context.participant}"

        if context.session:
            participant_dir = participant_dir / f"ses-{context.session}"

        dwi_dir = participant_dir / "dwi"

        if not dwi_dir.exists():
            return self._fail(f"DWI directory not found: {dwi_dir}")

        # Find bval file
        bval_files = list(dwi_dir.glob("*_dwi.bval"))

        if not bval_files:
            return self._fail("No .bval file found")

        # Read bval file
        bval_file = bval_files[0]
        with open(bval_file) as f:
            bvals = f.read().split()

        volume_count = len(bvals)

        if volume_count < self.min_volumes:
            return self._fail(
                f"Found {volume_count} volumes, minimum is {self.min_volumes}",
                details={
                    "bval_file": str(bval_file),
                    "volume_count": volume_count,
                    "minimum_required": self.min_volumes,
                }
            )

        return self._pass(
            f"DWI has {volume_count} volumes (>= {self.min_volumes})",
            details={
                "bval_file": str(bval_file),
                "volume_count": volume_count,
            }
        )

# Test the rule
rule = MinimumDWIVolumesRule(min_volumes=30)
print(f"Rule: {rule.name}")
print(f"Description: {rule.description}")
print(f"Phase: {rule.phase}")
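
The rule above counts every entry in the `.bval` file, b=0 volumes included, so its count is not the same as the number of diffusion-weighted volumes. As a minimal sketch of separating the two, the helper below is hypothetical (it is not part of `voxelops`), and the `b0_threshold` of 50 is an assumed cutoff you would tune for your acquisition.

```python
import tempfile
from pathlib import Path


def count_bval_volumes(bval_path, b0_threshold=50.0):
    """Return (total volumes, diffusion-weighted volumes) for a .bval file."""
    values = [float(v) for v in Path(bval_path).read_text().split()]
    # Anything at or below the threshold is treated as a b=0 volume
    weighted = [v for v in values if v > b0_threshold]
    return len(values), len(weighted)


with tempfile.TemporaryDirectory() as tmp:
    # Write a small illustrative bval file: two b=0 volumes, four weighted
    bval_file = Path(tmp) / "sub-001_dwi.bval"
    bval_file.write_text("0 1000 1000 0 2000 2000\n")
    total, weighted = count_bval_volumes(bval_file)

print(f"total={total}, diffusion-weighted={weighted}")
```

Writing the fixture into a temporary directory keeps the check runnable without a real dataset.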

## Example 2: JSON Metadata Validation

Check that required metadata fields exist:

In [None]:
import json


class DWIMetadataRule(ValidationRule):
    """Check that DWI JSON sidecar has required fields."""

    name = "dwi_metadata_complete"
    description = "Verify DWI JSON has required metadata fields"
    severity = "warning"  # Warning, not error
    phase = "pre"

    REQUIRED_FIELDS = [
        "EchoTime",
        "RepetitionTime",
        "PhaseEncodingDirection",
        "TotalReadoutTime",
    ]

    def check(self, context: ValidationContext) -> ValidationResult:
        """Check JSON metadata."""
        bids_dir = Path(context.inputs.bids_dir)
        participant_dir = bids_dir / f"sub-{context.participant}"

        if context.session:
            participant_dir = participant_dir / f"ses-{context.session}"

        dwi_dir = participant_dir / "dwi"

        # Find JSON sidecar
        json_files = list(dwi_dir.glob("*_dwi.json"))

        if not json_files:
            return self._fail(
                "No JSON sidecar found",
                details={"directory": str(dwi_dir)}
            )

        json_file = json_files[0]

        # Read JSON
        with open(json_file) as f:
            metadata = json.load(f)

        # Check for required fields
        missing_fields = [
            field for field in self.REQUIRED_FIELDS
            if field not in metadata
        ]

        if missing_fields:
            return self._fail(
                f"Missing metadata fields: {', '.join(missing_fields)}",
                details={
                    "json_file": str(json_file),
                    "missing_fields": missing_fields,
                    "present_fields": list(metadata.keys()),
                }
            )

        return self._pass(
            "All required metadata fields present",
            details={
                "json_file": str(json_file),
                "fields_checked": self.REQUIRED_FIELDS,
            }
        )

# Test the rule
rule = DWIMetadataRule()
print(f"Rule: {rule.name}")
print(f"Severity: {rule.severity}")
print(f"Required fields: {rule.REQUIRED_FIELDS}")
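
The rule above assumes the sidecar parses cleanly. As a hedged sketch of the same field check with malformed JSON handled explicitly, the helper below is hypothetical and runs against temporary files with made-up values rather than a real dataset.

```python
import json
import tempfile
from pathlib import Path

REQUIRED_FIELDS = [
    "EchoTime",
    "RepetitionTime",
    "PhaseEncodingDirection",
    "TotalReadoutTime",
]


def find_missing_fields(json_path, required=REQUIRED_FIELDS):
    """Return required keys absent from a sidecar, or None if it is not valid JSON."""
    try:
        metadata = json.loads(Path(json_path).read_text())
    except json.JSONDecodeError:
        return None
    return [name for name in required if name not in metadata]


with tempfile.TemporaryDirectory() as tmp:
    # Sidecar with only two of the four required fields (illustrative values)
    sidecar = Path(tmp) / "sub-001_dwi.json"
    sidecar.write_text(json.dumps({"EchoTime": 0.089, "RepetitionTime": 3.2}))
    missing = find_missing_fields(sidecar)

    # Sidecar that cannot be parsed at all
    broken = Path(tmp) / "broken_dwi.json"
    broken.write_text("{not valid json")
    unreadable = find_missing_fields(broken)

print(missing)
print(unreadable)
```

Inside a rule, the `None` case could map to its own `_fail()` message so that a corrupt sidecar is reported differently from an incomplete one.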

## Example 3: Creating a Custom Validator

Combine multiple rules into a custom validator:

In [None]:
from voxelops.validation.rules.common import (
    DirectoryExistsRule,
    GlobFilesExistRule,
    ParticipantExistsRule,
)


class CustomQSIPrepValidator(Validator):
    """Custom QSIPrep validator with additional checks."""

    procedure_name = "qsiprep"

    pre_rules = [
        # Standard checks
        DirectoryExistsRule("bids_dir", "BIDS directory"),
        ParticipantExistsRule(),
        GlobFilesExistRule(
            base_dir_attr="bids_dir",
            pattern="dwi/*_dwi.nii.gz",
            min_count=1,
            file_type="DWI files",
        ),

        # Custom checks
        MinimumDWIVolumesRule(min_volumes=30),
        DWIMetadataRule(),
    ]

    post_rules = [
        # Add post-validation rules here
    ]

# Test the custom validator
validator = CustomQSIPrepValidator()
print(f"Procedure: {validator.procedure_name}")
print(f"Pre-rules: {len(validator.pre_rules)}")
print("\nRules:")
for i, rule in enumerate(validator.pre_rules, 1):
    print(f"  {i}. {rule.description} ({rule.severity})")

## Example 4: Using Custom Validator

Use your custom validator with `run_procedure()`:

In [None]:
# Temporarily replace the default validator
from voxelops.procedures import orchestrator

# Save original
original_validator = orchestrator.VALIDATORS["qsiprep"]

# Use custom validator
orchestrator.VALIDATORS["qsiprep"] = CustomQSIPrepValidator()

try:
    # Run with custom validation
    inputs = QSIPrepInputs(
        bids_dir=Path("/data/bids"),
        participant="001",
    )

    result = run_procedure("qsiprep", inputs)

    # Check custom rule results
    if result.pre_validation:
        for r in result.pre_validation.results:
            if r.rule_name in ["minimum_dwi_volumes", "dwi_metadata_complete"]:
                print(f"{r.rule_name}: {'PASSED' if r.passed else 'FAILED'}")
                print(f"  {r.message}")
finally:
    # Restore original
    orchestrator.VALIDATORS["qsiprep"] = original_validator

## Example 5: Conditional Rules

Rules that only run under certain conditions:

In [None]:
class FieldmapRequiredRule(ValidationRule):
    """Check for fieldmaps if distortion correction is enabled."""

    name = "fieldmap_required"
    description = "Verify fieldmap exists when needed"
    severity = "warning"
    phase = "pre"

    def skip_condition(self, context: ValidationContext) -> bool:
        """Skip when synthetic (fieldmap-less) distortion correction is enabled."""
        # With use_syn_sdc set, no acquired fieldmap is needed, so skip the check
        if context.config:
            use_syn = getattr(context.config, 'use_syn_sdc', False)
            if use_syn:  # Using synthetic fieldmap
                return True
        return False

    def check(self, context: ValidationContext) -> ValidationResult:
        """Check for fieldmap files."""
        bids_dir = Path(context.inputs.bids_dir)
        participant_dir = bids_dir / f"sub-{context.participant}"

        if context.session:
            participant_dir = participant_dir / f"ses-{context.session}"

        fmap_dir = participant_dir / "fmap"

        if not fmap_dir.exists():
            return self._fail(
                "Fieldmap directory not found",
                details={"expected_path": str(fmap_dir)}
            )

        # Check for fieldmap files
        fmap_files = list(fmap_dir.glob("*"))

        if not fmap_files:
            return self._fail(
                "No fieldmap files found",
                details={"fmap_dir": str(fmap_dir)}
            )

        return self._pass(
            f"Found {len(fmap_files)} fieldmap file(s)",
            details={
                "fmap_dir": str(fmap_dir),
                "file_count": len(fmap_files),
            }
        )

print("This rule will be skipped if use_syn_sdc=True in config")
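
To make the skip behaviour concrete, here is a minimal sketch of how a runner might honour `skip_condition()` before calling `check()`. Both `DemoFieldmapRule` and `run_rule` are hypothetical stand-ins; how `voxelops` actually invokes and records skipped rules is not shown in this notebook.

```python
from types import SimpleNamespace


class DemoFieldmapRule:
    """Hypothetical minimal rule with the same skip logic as above."""
    name = "fieldmap_required"

    def skip_condition(self, context):
        # Skip only when a config exists and requests synthetic SDC
        return bool(context.config and getattr(context.config, "use_syn_sdc", False))

    def check(self, context):
        return "checked"


def run_rule(rule, context):
    """Hypothetical runner: report 'skipped' without calling check()."""
    if rule.skip_condition(context):
        return "skipped"
    return rule.check(context)


# Mock contexts exposing only the attribute this rule reads
with_syn = SimpleNamespace(config=SimpleNamespace(use_syn_sdc=True))
without_syn = SimpleNamespace(config=SimpleNamespace(use_syn_sdc=False))
no_config = SimpleNamespace(config=None)

outcomes = [run_rule(DemoFieldmapRule(), ctx) for ctx in (with_syn, without_syn, no_config)]
print(outcomes)
```

Note that a missing config falls through to `check()`, which matches the `return False` default in the rule above.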

## Example 6: Post-validation Rule

Check outputs after procedure completes:

In [None]:
class HTMLReportExistsRule(ValidationRule):
    """Check that QSIPrep HTML report was generated."""

    name = "html_report_exists"
    description = "Verify QSIPrep HTML report was created"
    severity = "warning"
    phase = "post"  # Post-validation

    def check(self, context: ValidationContext) -> ValidationResult:
        """Check for HTML report."""
        if not context.expected_outputs:
            return self._fail("No expected outputs defined")

        if not hasattr(context.expected_outputs, 'qsiprep_dir'):
            return self._fail("QSIPrep directory not in outputs")

        qsiprep_dir = Path(context.expected_outputs.qsiprep_dir)

        # Look for HTML report
        html_pattern = f"sub-{context.participant}*.html"
        html_files = list(qsiprep_dir.glob(html_pattern))

        if not html_files:
            return self._fail(
                f"HTML report not found (pattern: {html_pattern})",
                details={
                    "qsiprep_dir": str(qsiprep_dir),
                    "pattern": html_pattern,
                }
            )

        html_file = html_files[0]

        # Check file size (a real report should be larger than 0.1 MB)
        file_size_mb = html_file.stat().st_size / (1024 * 1024)

        if file_size_mb < 0.1:
            return self._fail(
                f"HTML report suspiciously small: {file_size_mb:.2f}MB",
                details={
                    "html_file": str(html_file),
                    "size_mb": file_size_mb,
                }
            )

        return self._pass(
            f"HTML report found: {html_file.name} ({file_size_mb:.2f}MB)",
            details={
                "html_file": str(html_file),
                "size_mb": file_size_mb,
            }
        )

print("Post-validation rules check outputs after procedure completes")

## Example 7: Rule with External Dependencies

Rules can use external tools:

In [None]:
import subprocess


class BIDSValidatorRule(ValidationRule):
    """Run BIDS validator on the dataset."""

    name = "bids_validator"
    description = "Verify BIDS compliance with bids-validator"
    severity = "warning"  # Warning since it requires external tool
    phase = "pre"

    def check(self, context: ValidationContext) -> ValidationResult:
        """Run bids-validator."""
        bids_dir = Path(context.inputs.bids_dir)

        try:
            # Run bids-validator
            result = subprocess.run(
                ["bids-validator", str(bids_dir), "--json"],
                capture_output=True,
                text=True,
                timeout=60,
            )

            # Parse JSON output
            import json
            validation_result = json.loads(result.stdout)

            errors = validation_result.get('issues', {}).get('errors', [])
            warnings = validation_result.get('issues', {}).get('warnings', [])

            if errors:
                return self._fail(
                    f"BIDS validation failed with {len(errors)} error(s)",
                    details={
                        "bids_dir": str(bids_dir),
                        "error_count": len(errors),
                        "warning_count": len(warnings),
                        # errors is non-empty inside this branch
                        "first_error": errors[0],
                    }
                )

            return self._pass(
                f"BIDS validation passed ({len(warnings)} warning(s))",
                details={
                    "bids_dir": str(bids_dir),
                    "warning_count": len(warnings),
                }
            )

        except FileNotFoundError:
            return self._fail(
                "bids-validator not installed",
                details={"suggestion": "npm install -g bids-validator"}
            )
        except Exception as e:
            return self._fail(
                f"BIDS validation error: {str(e)}",
                details={"error": str(e)}
            )

print("This rule requires bids-validator to be installed")
print("Install: npm install -g bids-validator")
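
Parsing the tool's output can be separated from running it, which makes that logic testable without `bids-validator` installed. The helper below is a hypothetical sketch; the sample payload is made up and only mirrors the `issues.errors` and `issues.warnings` keys that the rule above reads.

```python
import json


def summarize_issues(stdout):
    """Return (error_count, warning_count), or None if stdout is not valid JSON."""
    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError:
        return None
    issues = payload.get("issues", {})
    return len(issues.get("errors", [])), len(issues.get("warnings", []))


# Illustrative payload only, not captured from a real run
sample = '{"issues": {"errors": [{"key": "EXAMPLE_ERROR"}], "warnings": []}}'

counts = summarize_issues(sample)
empty = summarize_issues("")
print(counts)
print(empty)
```

Returning `None` for unparseable output lets the rule report a tool failure separately from a dataset that fails validation.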

## Best Practices

### Rule Design

1. **Single Responsibility**: Each rule checks one thing
2. **Clear Messages**: Error messages should be actionable
3. **Include Details**: Add context in `details` dict
4. **Appropriate Severity**:
   - `error`: Must fix, validation fails
   - `warning`: Should fix, but validation passes
   - `info`: Informational only
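
One plausible reading of these severity levels, sketched with hypothetical names: only a failed rule with severity `error` makes the overall validation fail. How `voxelops` actually aggregates results is not shown in this notebook, so treat this as an illustration of the intent rather than the library's logic.

```python
from dataclasses import dataclass


@dataclass
class Outcome:
    """Hypothetical stand-in for a single rule result."""
    rule_name: str
    passed: bool
    severity: str


def overall_passed(outcomes):
    """Validation fails only if some rule with severity 'error' failed."""
    return not any(o.severity == "error" and not o.passed for o in outcomes)


only_warning_failed = [
    Outcome("minimum_dwi_volumes", True, "error"),
    Outcome("dwi_metadata_complete", False, "warning"),
]
error_failed = [
    Outcome("minimum_dwi_volumes", False, "error"),
    Outcome("dwi_metadata_complete", True, "warning"),
]

print(overall_passed(only_warning_failed))
print(overall_passed(error_failed))
```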

### Performance

1. **Skip Conditions**: Use `skip_condition()` to avoid unnecessary checks
2. **Early Returns**: Fail fast if prerequisites aren't met
3. **Cache Expensive Operations**: Store results in context if needed
4. **Avoid External Calls**: Unless absolutely necessary

### Testing

1. **Test Rules Independently**: Create mock contexts
2. **Test Both Pass and Fail**: Ensure both paths work
3. **Check Details**: Verify useful info is included
4. **Test Skip Conditions**: Ensure rules skip when appropriate
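
A mock context can be as small as a `SimpleNamespace` carrying the attributes the rules read. The sketch below assumes the rules only touch `inputs.bids_dir`, `participant`, `session`, and `config`, as in the examples above; `make_context` and `dwi_dir_for` are hypothetical helpers, and the directory tree is built in a temporary folder.

```python
import tempfile
from pathlib import Path
from types import SimpleNamespace


def make_context(bids_dir, participant, session=None):
    """Build a mock context exposing only the attributes the examples read."""
    return SimpleNamespace(
        inputs=SimpleNamespace(bids_dir=bids_dir),
        participant=participant,
        session=session,
        config=None,
    )


def dwi_dir_for(context):
    """Resolve the DWI directory the same way the example rules do."""
    base = Path(context.inputs.bids_dir) / f"sub-{context.participant}"
    if context.session:
        base = base / f"ses-{context.session}"
    return base / "dwi"


with tempfile.TemporaryDirectory() as tmp:
    # Fake BIDS tree containing a single participant and session
    (Path(tmp) / "sub-001" / "ses-01" / "dwi").mkdir(parents=True)

    found = dwi_dir_for(make_context(tmp, "001", session="01")).exists()
    missing = dwi_dir_for(make_context(tmp, "002")).exists()

print(found, missing)
```

With `voxelops` installed, the same mock could be passed to a rule's `check()` to cover both the pass and fail paths, provided the rule reads no other context attributes.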

## Summary

Custom validation rules allow you to:
- Enforce brain bank-specific requirements
- Add quality checks beyond basic file existence
- Integrate with external validation tools
- Create reusable validation logic

The framework is designed to be extended, so use these examples as templates for your own rules.