# Data Management
> Core functionality for data ingestion, hashing, and quality checking.

This module handles:
- Data ingestion and storage
- Content hashing and ID generation
- Basic quality checks
- Content provenance tracking


In [None]:
#| default_exp data
#| export
from fastcore.basics import *
from fastcore.test import *
import hashlib
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import json
import numpy as np
from datetime import datetime,timezone


In [None]:
#| hide
from nbdev.showdoc import *

## Taking inspiration from FineWeb
FineWeb uses a multi-stage filtering approach, which we draw on here to design a `DataQualityChecker` class.

The key insights we're implementing from FineWeb are:
1. Multi-stage quality filtering
2. Focus on line-level quality metrics
3. Tracking content reuse and impact
4. Weighted scoring based on historical performance


In [None]:
#| export
class DataQualityChecker:
    """Quality checking pipeline inspired by FineWeb's approach.

    Scores a document with three line-level heuristics (see
    ``check_basic_quality``), combines them into one weighted score and
    accepts the document when that score is at least ``min_quality_score``.
    """
    def __init__(self, min_quality_score: float = 0.12):
        self.min_quality_score = min_quality_score

    def check_basic_quality(self, content: str) -> Dict[str, float]:
        """Basic quality checks similar to FineWeb's base filtering.

        Returns a dict with, in this order:
        - ``punctuation_ratio``: fraction of lines whose stripped text ends in
          ``.``, ``!`` or ``?``
        - ``unique_lines_ratio``: number of distinct (unstripped) lines divided
          by the total number of lines (mirrors FineWeb's line deduplication)
        - ``short_lines_ratio``: fraction of lines shorter than 30 characters
          once stripped (FineWeb removes docs with too many short lines)

        Blank lines count towards every denominator. Empty or whitespace-only
        content gets the worst-case scores (0.0, 0.0, 1.0).
        """
        # str.split always returns at least one element, so a plain
        # `if lines` check can never detect empty input; without this guard
        # "" would score 1.0 on uniqueness and pass the default threshold.
        if not content.strip():
            return {
                'punctuation_ratio': 0.0,
                'unique_lines_ratio': 0.0,
                'short_lines_ratio': 1.0,
            }

        lines = content.split('\n')
        n_lines = len(lines)
        stripped = [line.strip() for line in lines]

        punct_lines = sum(1 for s in stripped if s and s[-1] in '.!?')
        short_lines = sum(1 for s in stripped if len(s) < 30)

        return {
            'punctuation_ratio': punct_lines / n_lines,
            # Deduplication compares raw lines, so differing whitespace makes
            # otherwise identical lines count as distinct.
            'unique_lines_ratio': len(set(lines)) / n_lines,
            'short_lines_ratio': short_lines / n_lines,
        }

    def calculate_quality_score(self, content: str) -> float:
        """Calculate the overall quality score as a weighted sum of the basic checks.

        The weights sum to 1 and every ratio lies in [0, 1], so the score does too.
        """
        scores = self.check_basic_quality(content)

        # Weights could be tuned based on empirical results
        quality_score = (
            0.4 * scores['punctuation_ratio'] +
            0.4 * scores['unique_lines_ratio'] +
            0.2 * (1 - scores['short_lines_ratio'])
        )

        return quality_score

    def is_acceptable(self, content: str) -> Tuple[bool, Dict[str, float]]:
        """Check whether ``content`` meets the minimum quality standard.

        Returns ``(accepted, scores)``. ``scores`` holds ``quality_score``
        followed by the individual ratios from ``check_basic_quality``.
        """
        scores = self.check_basic_quality(content)
        quality_score = self.calculate_quality_score(content)
        return quality_score >= self.min_quality_score, {
            'quality_score': quality_score,
            **scores,
        }


In the same vein, we can implement a citation-like system that tracks how content is reused across training runs, similar to how FineWeb tracks data reuse.

In [None]:
#| export
class ContentProvenance:
    """Tracks content reuse and impact across training runs."""
    def __init__(self):
        self.citation_graph = {}  # content_id -> list of derived_content_ids (not populated by this class yet)
        self.usage_history = {}   # content_id -> list of model_version_ids
        self.quality_history = {} # content_id -> list of quality scores

    def record_usage(self, content_id: str, model_version: str,
                     quality_impact: float):
        """Record that ``content_id`` was used to train ``model_version``.

        Appends ``model_version`` and ``quality_impact`` to the content's usage
        and quality histories, creating both lists on first use.
        """
        self.usage_history.setdefault(content_id, []).append(model_version)
        self.quality_history.setdefault(content_id, []).append(quality_impact)

    def calculate_impact_score(self, content_id: str) -> float:
        """Calculate content impact based on usage and quality history.

        The score is the recency-weighted average of the recorded quality
        impacts, multiplied by ``1 + log1p(number_of_uses)``. Content with no
        recorded usage scores 0.0.
        """
        impacts = self.quality_history.get(content_id)
        if not impacts:
            return 0.0

        # Exponential recency weights: each record weighs e times more than
        # the one before it. Shifting the exponent so the newest record gets
        # weight 1 leaves the normalized average unchanged, but stops
        # np.exp from overflowing to inf (and the average becoming NaN) once a
        # history grows past ~709 records.
        n = len(impacts)
        weights = np.exp(np.arange(n) - (n - 1))
        weighted_avg = np.average(impacts, weights=weights)

        # Boost score based on number of uses
        usage_multiplier = 1 + np.log1p(len(self.usage_history[content_id]))

        return float(weighted_avg * usage_multiplier)


In [None]:
#| test
def test_quality_checker():
    """DataQualityChecker accepts well-punctuated text and rejects short, repetitive text."""
    checker = DataQualityChecker(min_quality_score=0.5)

    # High quality: every line ends in punctuation and no line repeats
    good_content = "This is a well-formatted paragraph.\nIt has proper punctuation.\nAnd good length lines."
    is_good, scores = checker.is_acceptable(good_content)
    test_eq(is_good, True)
    test_gt(scores['quality_score'], 0.5)

    # Low quality: no punctuation, only half the lines distinct, all lines short
    bad_content = "short\nshort\nshort\nno punct\nrepeated\nrepeated"
    is_bad, scores = checker.is_acceptable(bad_content)
    test_eq(is_bad, False)
    test_lt(scores['quality_score'], 0.5)

# Defining a test function does not run it; call it so this cell actually checks something.
test_quality_checker()

def test_provenance():
    """ContentProvenance records usage and turns it into an impact score."""
    prov = ContentProvenance()

    # Record two training runs for the same content
    prov.record_usage("content_1", "model_v1", 0.8)
    prov.record_usage("content_1", "model_v2", 0.9)

    # The recency-weighted average lies between 0.8 and 0.9, and the usage
    # multiplier (1 + log1p(2)) is greater than 1, so the score exceeds 0.8.
    impact = prov.calculate_impact_score("content_1")
    test_gt(impact, 0.8)

    # Content that was never used scores zero
    test_eq(prov.calculate_impact_score("nonexistent"), 0.0)

# Defining a test function does not run it; call it so this cell actually checks something.
test_provenance()


In [None]:
#| export
class ContentStore:
    """Manages storage and retrieval of training data content with quality checking and provenance tracking.

    Each accepted document is written to ``store_path/<sha256 hex digest>`` and
    described by an entry in ``store_path/index.json`` (contributor, UTC
    timestamp, size in characters, quality scores and, once used, an impact
    score).

    NOTE: the ``ContentProvenance`` usage history lives in memory only. After
    the store is re-opened, ``record_usage`` recomputes ``impact_score`` from
    the usages recorded in the current session, replacing the value
    previously saved in the index.
    """
    def __init__(self, store_path: Path, min_quality_score: float = 0.12):
        self.store_path = Path(store_path)
        # parents=True lets a nested store path be created in one call
        self.store_path.mkdir(parents=True, exist_ok=True)
        self.index_file = self.store_path / 'index.json'
        self.quality_checker = DataQualityChecker(min_quality_score=min_quality_score)
        self.provenance = ContentProvenance()
        self._load_index()

    def _load_index(self):
        """Load the content index from disk, creating an empty one if it does not exist yet."""
        if self.index_file.exists():
            self.index = json.loads(self.index_file.read_text(encoding='utf-8'))
        else:
            self.index = {}
            self._save_index()

    def _save_index(self):
        """Save the current index to disk.

        Writes to a sibling temporary file first and then swaps it in with
        ``Path.replace`` (an atomic rename), so an interrupted write cannot
        leave a truncated ``index.json`` behind.
        """
        tmp_file = self.index_file.with_name(self.index_file.name + '.tmp')
        tmp_file.write_text(json.dumps(self.index, indent=2), encoding='utf-8')
        tmp_file.replace(self.index_file)

    def add_content(self, content: str, contributor_id: str) -> Tuple[Optional[str], Dict[str, float]]:
        """
        Add new content to the store if it passes quality checks.

        The content ID is the SHA-256 hex digest of the UTF-8 encoded content,
        so identical content always maps to the same ID. Re-adding known
        content returns its ID without modifying the stored metadata.

        Returns: (content_id, quality_scores), or (None, quality_scores) if rejected
        """
        # Check quality first
        is_acceptable, quality_scores = self.quality_checker.is_acceptable(content)
        if not is_acceptable:
            return None, quality_scores

        data = content.encode('utf-8')
        content_id = hashlib.sha256(data).hexdigest()
        if content_id in self.index:
            return content_id, quality_scores

        self.index[content_id] = {
            'contributor_id': contributor_id,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'size': len(content),
            'quality_scores': quality_scores,
        }

        # Persist exactly the bytes that were hashed. Text-mode I/O translates
        # newlines (e.g. '\r\n' is read back as '\n'), which would make the
        # stored content differ from its ID.
        (self.store_path / content_id).write_bytes(data)
        self._save_index()
        return content_id, quality_scores

    def record_usage(self, content_id: str, model_version: str, quality_impact: float):
        """Record content usage in model training and refresh its impact score in the index.

        Raises ValueError if ``content_id`` is not in the index.
        """
        if content_id not in self.index:
            raise ValueError(f"Content ID {content_id} not found")

        self.provenance.record_usage(content_id, model_version, quality_impact)

        # Update index with latest impact score
        self.index[content_id]['impact_score'] = self.provenance.calculate_impact_score(content_id)
        self._save_index()

    def get_content(self, content_id: str) -> Optional[str]:
        """Retrieve content by ID, or None if the ID is not in the index."""
        if content_id not in self.index:
            return None
        return (self.store_path / content_id).read_bytes().decode('utf-8')

    def get_metadata(self, content_id: str) -> Optional[Dict]:
        """Get all metadata for a content item, or None if the ID is unknown."""
        return self.index.get(content_id)


In [None]:
#| test
@patch
def cleanup(self:ContentStore):
    """Delete every file inside the store directory, then the directory itself.

    Does nothing when the directory is already gone. Expects only plain files
    in the store; a subdirectory would make ``unlink()`` raise.
    """
    root = self.store_path
    if not root.exists():
        return
    for entry in root.glob('*'):
        entry.unlink()
    root.rmdir()

#| test
def test_enhanced_content_store():
    """End-to-end check of ContentStore: acceptance, rejection and impact scoring.

    Works in a ``test_store`` directory under the current working directory
    and always removes it afterwards, even when an assertion fails.
    """
    store = ContentStore(Path('test_store'), min_quality_score=0.5)
    try:
        # Adding high-quality content should be accepted and stored
        good_content = "This is a well-formatted paragraph.\nIt has proper punctuation.\nAnd good length lines."
        content_id, scores = store.add_content(good_content, "test_user_1")
        test_not_none(content_id)
        test_gt(scores['quality_score'], 0.5)

        # Adding low-quality content should be rejected
        bad_content = "short\nshort\nshort\nno punct\nrepeated\nrepeated"
        rejected_id, bad_scores = store.add_content(bad_content, "test_user_2")
        test_none(rejected_id)
        test_lt(bad_scores['quality_score'], 0.5)

        # Usage recording should attach an impact score to the metadata
        store.record_usage(content_id, "model_v1", 0.8)
        store.record_usage(content_id, "model_v2", 0.9)
        metadata = store.get_metadata(content_id)
        test_not_none(metadata.get('impact_score'))
        test_gt(metadata['impact_score'], 0.8)
    finally:
        # Always remove the on-disk store, even if a check above failed
        store.cleanup()

# Defining a test function does not run it; call it so this cell actually checks something.
test_enhanced_content_store()


In [None]:
#| example
# Sample documents: the first should pass the 0.5 threshold, the second should not
good_content = """
This is a well-formatted document with proper sentences.
It contains multiple paragraphs with good structure.
Each line ends with proper punctuation.
"""
bad_content = "short\nno punct\nrepeated\nrepeated"

store = ContentStore(Path('example_store'), min_quality_score=0.5)
try:
    # Try adding high-quality content
    content_id, scores = store.add_content(good_content, "alice")
    print(f"Content quality scores: {scores}")
    if content_id is None:
        # Fail with a clear message instead of a TypeError on content_id[:8]
        raise ValueError("Example content was rejected by the quality checker")
    print(f"Content ID: {content_id[:8]}... (from alice)")

    # Record some usage history
    store.record_usage(content_id, "model_v1", 0.8)
    store.record_usage(content_id, "model_v2", 0.9)

    # Check the metadata
    metadata = store.get_metadata(content_id)
    print("\nContent metadata:")
    print(json.dumps(metadata, indent=2))

    # Try adding low-quality content
    rejected_id, bad_scores = store.add_content(bad_content, "bob")
    print(f"\nRejected content scores: {bad_scores}")
finally:
    # Cleanup runs even if a step above fails, so no stale example_store is left behind
    store.cleanup()


Content quality scores: {'quality_score': 0.68, 'punctuation_ratio': 0.6, 'unique_lines_ratio': 0.8, 'short_lines_ratio': 0.4}
Content ID: 59dc718b... (from alice)

Content metadata:
{
  "contributor_id": "alice",
  "timestamp": "2025-02-16T12:57:55.960288+00:00",
  "size": 151,
  "quality_scores": {
    "quality_score": 0.68,
    "punctuation_ratio": 0.6,
    "unique_lines_ratio": 0.8,
    "short_lines_ratio": 0.4
  },
  "impact_score": 1.8323106826194044
}

Rejected content scores: {'quality_score': 0.30000000000000004, 'punctuation_ratio': 0.0, 'unique_lines_ratio': 0.75, 'short_lines_ratio': 1.0}


In [None]:
#| hide
# Run nbdev's export step for this notebook
import nbdev
nbdev.nbdev_export()