# Data Versioning Example

This notebook demonstrates how to use the Data Version Control System to version datasets, track lineage, and detect drift in the Mental Health Risk Assessment System.

## Setup

In [None]:
import inspect
import sys
from datetime import datetime

import numpy as np
import pandas as pd

sys.path.append('..')  # must run before the `src.*` imports below

from src.ds.data_versioning import DataVersionControl
from src.ds.storage import FileSystemStorage
from src.database.connection import get_db_connection

## Initialize Data Version Control

In [None]:
# File-system storage backend (base path ../data/versions) and database handle
storage = FileSystemStorage(
    base_path="../data/versions",
)
db = get_db_connection()

# The version-control object receives both collaborators as keyword arguments
dvc = DataVersionControl(
    storage_backend=storage,
    db_connection=db,
)

print("✓ Data Version Control initialized")

## Create Sample Dataset

In [None]:
# Generate synthetic patient assessment data (January 2024)
np.random.seed(42)  # fixed seed so re-running the cell reproduces the same frame
n_samples = 500

raw_data = pd.DataFrame({
    'patient_id': range(1, n_samples + 1),
    'age': np.random.randint(18, 80, n_samples),
    'gender': np.random.choice(['M', 'F', 'Other'], n_samples),
    # Scores are created as float so NaN can be injected below without an
    # implicit int64 -> float64 upcast (deprecated in recent pandas).
    'phq9_score': np.random.randint(0, 27, n_samples).astype(float),
    'gad7_score': np.random.randint(0, 21, n_samples).astype(float),
    # Lowercase 'h' is the hourly alias; uppercase 'H' is deprecated since pandas 2.2.
    'assessment_date': pd.date_range('2024-01-01', periods=n_samples, freq='h')
})

# Add some missing values. replace=False guarantees distinct rows, so exactly
# 20 PHQ-9 and 15 GAD-7 values are blanked (sampling with replacement could
# pick the same row twice and blank fewer).
raw_data.loc[np.random.choice(raw_data.index, 20, replace=False), 'phq9_score'] = np.nan
raw_data.loc[np.random.choice(raw_data.index, 15, replace=False), 'gad7_score'] = np.nan

print(f"Raw dataset shape: {raw_data.shape}")
print(f"Missing values: {raw_data.isnull().sum().sum()}")
raw_data.head()

## Register Initial Dataset Version

In [None]:
# Metadata passed along with the raw snapshot
raw_metadata = {
    "collection_period": "2024-01",
    "num_facilities": 5,
    "data_quality": "raw",
}

# Register the raw frame under the "patient_assessments" dataset name
raw_version = dvc.register_dataset(
    dataset=raw_data,
    dataset_name="patient_assessments",
    source="emr_system",
    metadata=raw_metadata,
)

# Echo the fields exposed by the returned version object
print(f"✓ Registered dataset version: {raw_version.version}")
print(f"  Version ID: {raw_version.version_id}")
print(f"  Rows: {raw_version.num_rows}")
print(f"  Columns: {raw_version.num_columns}")
print(f"  Created: {raw_version.created_at}")

## View Dataset Statistics

In [None]:
# Show the schema and summary statistics attached to the raw version
print("Dataset Statistics:")
print(f"\nSchema:")
schema = raw_version.schema  # iterated as (column, dtype) pairs
for col, dtype in schema.items():
    print(f"  {col}: {dtype}")

print(f"\nSummary Statistics:")
stats = raw_version.statistics
# .get() supplies a default when a statistic is absent from the mapping
print(f"  Missing values: {stats.get('missing_values', {})}")
print(f"  Duplicate rows: {stats.get('duplicate_rows', 0)}")

## Data Cleaning and Transformation

In [None]:
# Clean the data
def clean_data(df):
    """Clean patient assessment data.

    Works on a copy, so ``df`` itself is left unmodified.

    Steps, in order:
      1. Drop rows whose ``patient_id`` is missing.
      2. Fill missing ``phq9_score`` / ``gad7_score`` values with that
         column's median (computed after step 1).
      3. Drop fully duplicated rows.

    Args:
        df: DataFrame with ``patient_id``, ``phq9_score`` and ``gad7_score``
            columns.

    Returns:
        A new, cleaned DataFrame.
    """
    cleaned = df.copy()

    # Remove rows with missing patient_id
    cleaned = cleaned.dropna(subset=['patient_id'])

    # Impute missing scores with median. Assign the result back instead of
    # calling fillna(inplace=True) on the selected column: the chained in-place
    # form warns in pandas 2.x and has no effect under Copy-on-Write.
    for score_col in ('phq9_score', 'gad7_score'):
        cleaned[score_col] = cleaned[score_col].fillna(cleaned[score_col].median())

    # Remove duplicates
    cleaned = cleaned.drop_duplicates()

    return cleaned

# Run the cleaning function on the raw frame; clean_data works on a copy,
# so raw_data stays available for later comparisons.
cleaned_data = clean_data(raw_data)

# Report the resulting shape and the total number of missing cells left
print(f"Cleaned dataset shape: {cleaned_data.shape}")
print(f"Missing values: {cleaned_data.isnull().sum().sum()}")

In [None]:
# Metadata for the cleaned snapshot; parent_version links back to the raw version's id
cleaned_metadata = {
    "parent_version": str(raw_version.version_id),
    "cleaning_steps": ["imputation", "deduplication"],
    "data_quality": "cleaned",
}

# Register the cleaned frame under the same dataset name as the raw one
cleaned_version = dvc.register_dataset(
    dataset=cleaned_data,
    dataset_name="patient_assessments",
    source="cleaning_pipeline",
    metadata=cleaned_metadata,
)

print(f"✓ Registered cleaned version: {cleaned_version.version}")

## Track Data Lineage

In [None]:
# Track the transformation from raw to cleaned.
# `inspect` comes from the notebook's import cell rather than being imported
# mid-notebook; inspect.getsource() supplies the source text of clean_data,
# which is passed as the transformation_code argument.
dvc.track_transformation(
    input_version_id=str(raw_version.version_id),
    output_version_id=str(cleaned_version.version_id),
    transformation_code=inspect.getsource(clean_data),
    transformation_type="cleaning"
)

print("✓ Transformation tracked")

In [None]:
# Ask for the upstream side of the lineage, starting from the cleaned version
cleaned_version_id = str(cleaned_version.version_id)
upstream = dvc.get_lineage(
    dataset_version_id=cleaned_version_id,
    direction="upstream",
)

# One line per ancestor returned by the query
print(f"Upstream lineage for {cleaned_version.version}:")
for ancestor in upstream:
    print(f"  ← {ancestor.dataset_name} v{ancestor.version} ({ancestor.source})")

## Feature Engineering

In [None]:
# Create features
def engineer_features(df):
    """Engineer features for modeling.

    Adds three columns to a copy of ``df`` (the input is not modified):

    * ``age_group``: ``age`` binned into (0, 25], (25, 45], (45, 65],
      (65, 100].
    * ``severity_score``: equal-weight average of ``phq9_score`` and
      ``gad7_score``.
    * ``risk_category``: ``severity_score`` binned into [0, 10], (10, 20],
      (20, 50].

    Args:
        df: DataFrame with ``age``, ``phq9_score`` and ``gad7_score`` columns.

    Returns:
        A new DataFrame with the three extra columns. Values outside the bin
        edges, or missing inputs, produce NaN in the categorical columns.
    """
    features = df.copy()

    # Age groups
    features['age_group'] = pd.cut(
        features['age'],
        bins=[0, 25, 45, 65, 100],
        labels=['young', 'adult', 'middle_aged', 'senior']
    )

    # Composite severity score
    features['severity_score'] = (
        features['phq9_score'] * 0.5 +
        features['gad7_score'] * 0.5
    )

    # Risk category. include_lowest=True closes the first bin on the left so a
    # severity of exactly 0 (both scores 0) is labelled 'low' instead of NaN.
    features['risk_category'] = pd.cut(
        features['severity_score'],
        bins=[0, 10, 20, 50],
        labels=['low', 'medium', 'high'],
        include_lowest=True
    )

    return features

# Derive the feature frame from the cleaned data; engineer_features works on
# a copy, so cleaned_data keeps its original columns.
feature_data = engineer_features(cleaned_data)

# The set difference lists exactly the columns the feature step added
print(f"Feature dataset shape: {feature_data.shape}")
print(f"New columns: {set(feature_data.columns) - set(cleaned_data.columns)}")
feature_data.head()

In [None]:
# Metadata for the feature snapshot; parent_version links back to the cleaned version's id
feature_metadata = {
    "parent_version": str(cleaned_version.version_id),
    "features_added": ["age_group", "severity_score", "risk_category"],
    "data_quality": "features",
}

# Register the engineered frame under the shared dataset name
feature_version = dvc.register_dataset(
    dataset=feature_data,
    dataset_name="patient_assessments",
    source="feature_engineering",
    metadata=feature_metadata,
)

# Record the cleaned -> features step, passing the source text of engineer_features
dvc.track_transformation(
    input_version_id=str(cleaned_version.version_id),
    output_version_id=str(feature_version.version_id),
    transformation_code=inspect.getsource(engineer_features),
    transformation_type="feature_engineering",
)

print(f"✓ Registered feature version: {feature_version.version}")

## List All Versions

In [None]:
# List all versions of the dataset
versions = dvc.list_versions("patient_assessments")

# Print a short summary per version; 'unknown' is shown when a version's
# metadata has no 'data_quality' key.
print(f"Dataset versions ({len(versions)}):")
for v in versions:
    print(f"\n{v.version} ({v.source})")
    print(f"  Created: {v.created_at}")
    print(f"  Shape: {v.num_rows} rows × {v.num_columns} columns")
    print(f"  Quality: {v.metadata.get('data_quality', 'unknown')}")

## Retrieve Specific Version

In [None]:
# Retrieve the cleaned version by dataset name + version label.
# get_dataset is unpacked into two values: the data and its version record.
retrieved_df, retrieved_version = dvc.get_dataset(
    dataset_name="patient_assessments",
    version=cleaned_version.version
)

# Confirm what came back: version label, shape, and total missing cells
print(f"Retrieved version: {retrieved_version.version}")
print(f"Shape: {retrieved_df.shape}")
print(f"Missing values: {retrieved_df.isnull().sum().sum()}")
retrieved_df.head()

## Drift Detection

In [None]:
# Simulate new data with drift (February 2024)
np.random.seed(100)  # separate fixed seed so this batch is reproducible too
n_new = 500

# Introduce drift: higher scores, different age distribution
new_data = pd.DataFrame({
    # IDs continue where the January batch stopped
    'patient_id': range(n_samples + 1, n_samples + n_new + 1),
    'age': np.random.randint(25, 70, n_new),  # Narrower age range
    'gender': np.random.choice(['M', 'F', 'Other'], n_new),
    'phq9_score': np.random.randint(5, 27, n_new),  # Higher scores
    'gad7_score': np.random.randint(3, 21, n_new),  # Higher scores
    # Lowercase 'h' is the hourly alias; uppercase 'H' is deprecated since pandas 2.2.
    'assessment_date': pd.date_range('2024-02-01', periods=n_new, freq='h')
})

# Compare means against the raw January data (mean() skips its missing values)
print(f"New dataset shape: {new_data.shape}")
print(f"\nScore distributions:")
print(f"  PHQ-9 mean: {new_data['phq9_score'].mean():.2f} (was {raw_data['phq9_score'].mean():.2f})")
print(f"  GAD-7 mean: {new_data['gad7_score'].mean():.2f} (was {raw_data['gad7_score'].mean():.2f})")

In [None]:
# Metadata for the February snapshot (same keys as the January raw registration)
new_metadata = {
    "collection_period": "2024-02",
    "num_facilities": 5,
    "data_quality": "raw",
}

# Register the drifted frame under the shared dataset name
new_version = dvc.register_dataset(
    dataset=new_data,
    dataset_name="patient_assessments",
    source="emr_system",
    metadata=new_metadata,
)

print(f"✓ Registered new version: {new_version.version}")

In [None]:
# Compare the January raw version with the February version
drift_report = dvc.detect_drift(
    dataset_version_id1=str(raw_version.version_id),
    dataset_version_id2=str(new_version.version_id),
)

# Headline numbers
print("Drift Detection Report")
print("=" * 50)
print(f"Drift detected: {drift_report.drift_detected}")
print(f"Overall drift score: {drift_report.drift_score:.4f}")

# Per-feature scores, largest first; anything above 0.1 is flagged as HIGH
print(f"\nFeature-level drift:")
ranked_drifts = sorted(
    drift_report.feature_drifts.items(),
    key=lambda pair: pair[1],
    reverse=True,
)
for feature, score in ranked_drifts:
    if score > 0.1:
        status = "⚠️ HIGH"
    else:
        status = "✓ low"
    print(f"  {feature}: {score:.4f} {status}")

In [None]:
# Per-feature statistical test results from the drift report
print("\nStatistical Tests:")
for feature, test_result in drift_report.statistical_tests.items():
    print(f"\n{feature}:")
    print(f"  Test: {test_result['test_name']}")
    print(f"  Statistic: {test_result['statistic']:.4f}")
    print(f"  P-value: {test_result['p_value']:.4f}")
    # 0.05 is the significance cut-off used for the verdict line
    verdict = (
        f"  Result: Significant drift detected (p < 0.05)"
        if test_result['p_value'] < 0.05
        else f"  Result: No significant drift (p >= 0.05)"
    )
    print(verdict)

In [None]:
# View recommendations attached to the drift report, numbered from 1
print("\nRecommendations:")
for i, rec in enumerate(drift_report.recommendations, 1):
    print(f"{i}. {rec}")

## Visualize Data Lineage

In [None]:
# Query the upstream lineage, starting from the feature version
feature_version_id = str(feature_version.version_id)
upstream_lineage = dvc.get_lineage(
    dataset_version_id=feature_version_id,
    direction="upstream",
)

# Text rendering: the feature version first, then each ancestor under an arrow
print("Data Lineage (upstream):")
print(f"\n{feature_version.version} ({feature_version.source})")
for ancestor in upstream_lineage:
    print(f"  ↑")
    print(f"  {ancestor.version} ({ancestor.source})")

## Summary

This notebook demonstrated:

1. **Registering datasets** with metadata
2. **Tracking transformations** through the pipeline
3. **Querying lineage** to understand data provenance
4. **Detecting drift** between dataset versions
5. **Retrieving specific versions** for reproducibility
6. **Viewing statistics** automatically computed for each version

### Key Benefits

- **Reproducibility**: Retrieve exact dataset versions used in experiments
- **Traceability**: Track how data was transformed
- **Quality**: Monitor data drift and quality over time
- **Collaboration**: Share versioned datasets across the team

### Next Steps

- Integrate with experiment tracking
- Set up automated drift monitoring
- Implement data quality gates in pipelines
- Archive old versions to manage storage