# Text Processing Storage

> Standardized SQLite storage for text processing results with content hashing

In [None]:
#| default_exp storage

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import json
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from cjm_plugin_system.utils.hashing import hash_bytes

## TextProcessRow

A dataclass representing a single row in the standardized `text_jobs` table.

In [None]:
#| export
@dataclass
class TextProcessRow:
    """Record of one text processing job as stored in the `text_jobs` table.

    The three input fields are required; `spans`, `metadata` and `created_at`
    default to None when they are not supplied.
    """
    job_id: str  # Unique job identifier
    input_text: str  # Text that was handed to the processor
    input_hash: str  # Hash of `input_text`, formatted as "algo:hexdigest"
    spans: Optional[List[Dict[str, Any]]] = None  # Processed text spans, if any
    metadata: Optional[Dict[str, Any]] = None  # Processing metadata, if any
    created_at: Optional[float] = None  # Unix timestamp of the row

In [None]:
# Build a TextProcessRow by hand and inspect a few of its fields
sentence_spans = [
    {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
    {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
]
row = TextProcessRow(
    job_id="job_abc123",
    input_text="Hello world. How are you?",
    input_hash="sha256:" + "a" * 64,
    spans=sentence_spans,
    metadata={"processor": "nltk"},
)

print(f"Row: job_id={row.job_id}")
print(f"Input: {row.input_text}")
print(f"Spans: {len(row.spans)} spans")

Row: job_id=job_abc123
Input: Hello world. How are you?
Spans: 2 spans


## TextProcessStorage

Standardized SQLite storage that all text processing plugins should use. It defines the canonical schema for the `text_jobs` table, including a hash of the input text for traceability.

**Schema:**

```sql
CREATE TABLE IF NOT EXISTS text_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    input_text TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    spans JSON,
    metadata JSON,
    created_at REAL NOT NULL
);
```

The `input_hash` column stores a hash of the input text in `"algo:hexdigest"` format, enabling downstream consumers to verify that the source text hasn't changed since processing.

In [None]:
#| export
class TextProcessStorage:
    """Standardized SQLite storage for text processing results.

    Every operation opens a short-lived connection to `db_path`, runs one
    statement inside its own transaction, and closes the connection again,
    so an instance never keeps a database handle open between calls.
    """

    SCHEMA = """
        CREATE TABLE IF NOT EXISTS text_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id TEXT UNIQUE NOT NULL,
            input_text TEXT NOT NULL,
            input_hash TEXT NOT NULL,
            spans JSON,
            metadata JSON,
            created_at REAL NOT NULL
        )
    """

    # NOTE: SQLite already maintains an implicit index for the UNIQUE
    # constraint on job_id; this explicit index is kept so existing databases
    # and any code referencing `INDEX` keep working unchanged.
    INDEX = "CREATE INDEX IF NOT EXISTS idx_text_jobs_job_id ON text_jobs(job_id);"

    # Column list shared by every SELECT so `_to_row` can decode all of them.
    _COLUMNS = "job_id, input_text, input_hash, spans, metadata, created_at"

    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    ):
        """Initialize storage and create table if needed."""
        self.db_path = db_path
        self._execute(self.SCHEMA)
        self._execute(self.INDEX)

    def _execute(
        self,
        sql: str,           # SQL statement to run
        params: tuple = ()  # Values bound to the statement's `?` placeholders
    ) -> List[tuple]:  # All rows produced by the statement (empty for non-queries)
        """Run one statement on a fresh connection and always close it."""
        con = sqlite3.connect(self.db_path)
        try:
            # `with con` commits on success and rolls back on error, but it
            # does not close the connection; the `finally` below does.
            with con:
                return con.execute(sql, params).fetchall()
        finally:
            con.close()

    @staticmethod
    def _to_row(
        record: tuple  # One result tuple in `_COLUMNS` order
    ) -> TextProcessRow:  # Decoded row
        """Convert a raw result tuple into a TextProcessRow."""
        job_id, input_text, input_hash, spans, metadata, created_at = record
        return TextProcessRow(
            job_id=job_id,
            input_text=input_text,
            input_hash=input_hash,
            # NULL (and an empty string) decode to None; JSON text is parsed.
            spans=json.loads(spans) if spans else None,
            metadata=json.loads(metadata) if metadata else None,
            created_at=created_at
        )

    def save(
        self,
        job_id: str,       # Unique job identifier
        input_text: str,   # Original input text
        input_hash: str,   # Hash of input text in "algo:hexdigest" format
        spans: Optional[List[Dict[str, Any]]] = None,  # Processed text spans
        metadata: Optional[Dict[str, Any]] = None       # Processing metadata
    ) -> None:
        """Save a text processing result to the database.

        `created_at` is set to the current time. Because `job_id` is declared
        UNIQUE, inserting a `job_id` that already exists raises
        `sqlite3.IntegrityError`.
        """
        self._execute(
            """INSERT INTO text_jobs
               (job_id, input_text, input_hash, spans, metadata, created_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (
                job_id,
                input_text,
                input_hash,
                # Only None becomes NULL: an empty list/dict is a real result
                # ("nothing found") and must survive the round trip.
                None if spans is None else json.dumps(spans),
                None if metadata is None else json.dumps(metadata),
                time.time()
            )
        )

    def get_by_job_id(
        self,
        job_id: str  # Job identifier to look up
    ) -> Optional[TextProcessRow]:  # Row or None if not found
        """Retrieve a text processing result by job ID."""
        records = self._execute(
            f"SELECT {self._COLUMNS} FROM text_jobs WHERE job_id = ?",
            (job_id,)
        )
        return self._to_row(records[0]) if records else None

    def list_jobs(
        self,
        limit: int = 100  # Maximum number of rows to return
    ) -> List[TextProcessRow]:  # List of text processing rows
        """List text processing jobs ordered by creation time (newest first)."""
        # `id DESC` breaks ties between rows that share a timestamp, so the
        # most recently inserted row still comes first.
        records = self._execute(
            f"""SELECT {self._COLUMNS}
                FROM text_jobs ORDER BY created_at DESC, id DESC LIMIT ?""",
            (limit,)
        )
        return [self._to_row(record) for record in records]

    def verify_input(
        self,
        job_id: str  # Job identifier to verify
    ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        """Verify the stored input text still matches its hash."""
        row = self.get_by_job_id(job_id)
        if row is None:
            return None
        # NOTE(review): the hash is recomputed with `hash_bytes` as called
        # here; a row whose `input_hash` was produced differently would
        # presumably compare unequal -- confirm against `hash_bytes`.
        return hash_bytes(row.input_text.encode()) == row.input_hash

## Testing

In [None]:
import tempfile
import os

# Create storage with temp database.
# NamedTemporaryFile is only used to reserve a unique path: the handle is
# closed right away (the file itself stays because delete=False) so it is not
# left open for the rest of the notebook.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = TextProcessStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")

Storage initialized at: /tmp/tmp62hw1v1n.db


In [None]:
# Persist one processed text together with the hash of its input
test_text = "Hello world. How are you?"
input_hash = hash_bytes(test_text.encode())
job_spans = [
    {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
    {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
]
job_metadata = {"processor": "nltk", "language": "english"}

storage.save(
    job_id="job_test_001",
    input_text=test_text,
    input_hash=input_hash,
    spans=job_spans,
    metadata=job_metadata,
)

print(f"Saved job_test_001")
print(f"Input hash: {input_hash}")

Saved job_test_001
Input hash: sha256:1d473b202b6fea30ab890b153d9d5fa3a79830a7bdb6d662581a95bda1a57866


In [None]:
# Look the saved job up again and check that every field came back intact
row = storage.get_by_job_id("job_test_001")
assert row is not None
assert (row.job_id, row.input_text, row.input_hash) == ("job_test_001", test_text, input_hash)
assert len(row.spans) == 2
assert row.metadata["processor"] == "nltk"
assert row.created_at is not None

span_count = len(row.spans)
hash_preview = row.input_hash[:30]
print(f"Retrieved: {row.job_id}")
print(f"Input: {row.input_text}")
print(f"Spans: {span_count} spans")
print(f"Input hash: {hash_preview}...")

# An unknown job ID yields None
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")

Retrieved: job_test_001
Input: Hello world. How are you?
Spans: 2 spans
Input hash: sha256:1d473b202b6fea30ab890b1...
get_by_job_id returns None for missing job: OK


In [None]:
# Add a second job, then check that list_jobs returns both, newest first
second_text = "Second text."
storage.save(
    job_id="job_test_002",
    input_text=second_text,
    input_hash=hash_bytes(second_text.encode()),
    spans=[{"text": "Second text.", "start_char": 0, "end_char": 12, "label": "sentence"}],
)

jobs = storage.list_jobs()
job_ids = [j.job_id for j in jobs]
assert len(jobs) == 2
assert job_ids[0] == "job_test_002"  # Newest first

print(f"list_jobs returned {len(jobs)} rows: {job_ids}")

list_jobs returned 2 rows: ['job_test_002', 'job_test_001']


In [None]:
# Test input verification.
# verify_input has three outcomes (True / False / None), so identity checks
# are used to tell False apart from None.
assert storage.verify_input("job_test_001") is True
print("verify_input with unchanged text: True")

# Tamper with input text directly in DB.
# `with con` only commits the UPDATE; the connection is closed explicitly so
# no handle on the database file is still open when it is deleted later.
con = sqlite3.connect(tmp_db.name)
try:
    with con:
        con.execute("UPDATE text_jobs SET input_text = 'TAMPERED' WHERE job_id = 'job_test_001'")
finally:
    con.close()

assert storage.verify_input("job_test_001") is False
print("verify_input after tampering: False")

# Missing job returns None
assert storage.verify_input("nonexistent") is None
print("verify_input for missing job: None")

verify_input with unchanged text: True
verify_input after tampering: False
verify_input for missing job: None


In [None]:
# Delete the temporary database file created for the tests above
db_file = tmp_db.name
os.remove(db_file)
print("Cleanup complete")

Cleanup complete


In [None]:
#| hide
# Run nbdev's export step for this notebook.
# NOTE(review): presumably this writes the `#| export` cells to the module
# named by `#| default_exp` at the top -- confirm against the nbdev docs.
import nbdev; nbdev.nbdev_export()