# core

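> Core data model for Scrapebook: content-addressed artifacts, operations that record their inputs and results with provenance, and recorders that create them.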

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

We begin with some helper functions for decoding hex strings, hashing data, and building JSONPath expressions.

In [None]:
#| export
from __future__ import annotations

import hashlib
import json
import os
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional, Set, Tuple, Union

from jsonpath_ng import parse as jsonpath_parse

_HEX_RE = re.compile(r'^[0-9a-fA-F]*$')

def hex_to_bytes(h: str) -> bytes:
    """
    Decode a hexadecimal string into raw bytes.

    A leading ``0x``/``0X`` prefix is allowed, as are spaces, underscores,
    hyphens and colons used to group digits; all of these are discarded
    before decoding. An empty string decodes to ``b""``.

    Raises:
        TypeError: if ``h`` is not a ``str``.
        ValueError: if the cleaned string has an odd number of characters
            or contains a non-hex character.
    """
    if not isinstance(h, str):
        raise TypeError("hex_to_bytes() expects a str")

    cleaned = h.strip()
    if cleaned[:2] in ("0x", "0X"):
        cleaned = cleaned[2:]
    # Delete every digit-group separator in a single pass.
    cleaned = cleaned.translate(str.maketrans("", "", " _-:"))

    digit_count = len(cleaned)
    if digit_count % 2:
        raise ValueError(f"hex string has odd length ({digit_count}): {h!r}")
    if _HEX_RE.fullmatch(cleaned) is None:
        raise ValueError(f"invalid hex string: {h!r}")

    return bytes.fromhex(cleaned)

def sha256_hex(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()

def path_for_key(key: str) -> str:
    """
    Build a JSONPath selecting ``key`` on the root object, in bracket notation.

    The key is wrapped in double quotes. Backslashes and double quotes inside
    the key are backslash-escaped so that a JSONPath lexer which treats ``\\``
    as an escape character (as jsonpath_ng's quoted-string lexer does) reads
    the original key back unchanged.

    >>> path_for_key('content')
    '$["content"]'
    """
    # Escape backslashes first, so the backslashes added for quotes are not
    # themselves doubled.
    safe = key.replace('\\', '\\\\').replace('"', '\\"')
    return f'$["{safe}"]'

def extend_path(base: str, key_or_index: Union[str, int]) -> str:
    """
    Append one segment to the JSONPath ``base`` using bracket notation.

    - ``str``: a quoted field selector, escaped the same way as ``path_for_key``.
    - ``int``: an index selector ``[n]``.

    Raises:
        TypeError: for any other type, including ``bool``. ``bool`` is an
            ``int`` subclass and would otherwise render as ``[True]``, which
            is not a valid index.
    """
    if isinstance(key_or_index, str):
        seg = path_for_key(key_or_index)[1:]  # drop the leading '$' -> '["key"]'
        return f"{base}{seg}"
    if isinstance(key_or_index, int) and not isinstance(key_or_index, bool):
        return f"{base}[{key_or_index}]"
    raise TypeError("Path extension must be str (field) or int (index).")

def hash_json(data: Any) -> bytes:
    """
    Return the raw SHA-256 digest of ``data``'s canonical JSON encoding.

    "Canonical" here means compact separators and sorted object keys, so
    values that are equal as JSON hash the same regardless of dict order.
    """
    canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
    payload = canonical.encode('utf-8')
    return hashlib.sha256(payload).digest()

In [None]:
#| export

@dataclass(frozen=True)
class OperationHeader:
    """
    Immutable metadata describing one operation.

    The header's digest is hashed together with the operation's encoded
    inputs and results to form the operation ID (see ``op_id``).
    """
    kind: str  # operation kind, e.g. 'obs' or 'trans'
    task: str  # what was done, e.g. 'fetch_http', 'parse_article'
    tool: str  # what did it, e.g. 'requests', 'my_scraper_1'
    output_type: str  # type of the results, e.g. 'http_response', 'article'
    event_uuid: Optional[bytes]  # optional event identifier as raw bytes (hex-encoded when hashed)
    timestamp: Optional[str]  # optional timestamp string, e.g. '2024-01-01T12:00:00Z'
    meta: Any  # arbitrary JSON-serializable metadata

    def sha256(self) -> bytes:
        """
        Return a deterministic SHA-256 digest of every header field.

        The fields are hashed as a canonical JSON *list*, so their order is
        part of the hash. ``event_uuid`` is hex-encoded because raw bytes
        are not JSON-serializable.
        """
        uuid_field = self.event_uuid.hex() if self.event_uuid is not None else None
        ordered_fields = [
            self.kind,
            self.task,
            self.tool,
            self.output_type,
            uuid_field,
            self.timestamp,
            self.meta,
        ]
        return hash_json(ordered_fields)

    def op_id(self, inputs, outputs) -> bytes:
        """
        Return the operation ID for this header with the given encoded
        inputs and outputs.

        The ID is the hash of the header digest (as hex) together with both.
        """
        header_digest_hex = self.sha256().hex()
        return hash_json([header_digest_hex, inputs, outputs])

In [None]:
#| export

# -------- JSON leaf tags --------
# In encoded JSON, references are single-key dicts keyed by one of these tags.
# Recorder._encode writes them; Scrapebook.decode_json turns them back into handles.
OP_TAG  = "$op"          # {"$op": {"id": "<op-id-hex>", "path": "<jsonpath>"}}
ART_TAG = "$artifact"    # {"$artifact": "<sha256-hex>"}

# Values for OperationHeader.kind.
OBS_KIND = "obs"  # observation: Recorder attaches a random event_uuid and a timestamp
TRANS_KIND = "trans"  # transformation: Recorder leaves event_uuid and timestamp as None

class Artifact:
    """
    Lazy handle to bytes stored in a book under their SHA-256 digest.

    The bytes are fetched from the book on first access and then cached.
    Two handles compare equal (and hash equally) when they refer to the same
    digest in the same book, so sets of artifacts de-duplicate by content
    rather than by object identity.
    """
    __slots__ = ("book", "sha256", "_cache")

    def __init__(self, book: "Scrapebook", sha256: bytes):
        self.book = book
        self.sha256 = sha256  # raw digest bytes (not hex)
        self._cache: Optional[bytes] = None

    def bytes(self) -> bytes:
        """Return the artifact's bytes, fetching them from the book once."""
        if self._cache is None:
            self._cache = self.book.fetch_artifact_bytes_sha256(self.sha256)
        return self._cache

    def value(self) -> bytes:
        """Alias of ``bytes()``, mirroring ``Operation.value()``."""
        return self.bytes()

    def produced_by(self) -> Set[Operation]:
        """
        Return the set of operations in the book which produce this artifact.
        """
        op_ids = self.book.fetch_artifact_produced_by(self.sha256)
        return {Operation(self.book, op_id) for op_id in op_ids}

    def consumed_by(self) -> Set[Operation]:
        """
        Return the set of operations in the book which directly consume this artifact.
        """
        op_ids = self.book.fetch_artifact_consumed_by(self.sha256)
        return {Operation(self.book, op_id) for op_id in op_ids}

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Artifact):
            return NotImplemented
        return self.book is other.book and self.sha256 == other.sha256

    def __hash__(self) -> int:
        # Consistent with __eq__: equal handles share a digest.
        return hash(self.sha256)

    def __repr__(self) -> str:
        return f"<Artifact {self.sha256.hex()}>"


class Operation:
    """
    Lazy handle to an operation stored in a book, identified by its ID.

    The header is fetched on first access. Decoded results and inputs are
    cached together with the artifacts / dependencies collected while
    decoding them. Handles compare equal (and hash equally) when they refer
    to the same ID in the same book, so sets of operations de-duplicate.
    """
    __slots__ = (
        "book", "id", "_header",
        "_results_cache",
        "_artifacts_produced_cache",
        "_inputs_cache",
        "_artifacts_consumed_cache", "_deps_cache",
    )

    def __init__(self, book: "Scrapebook", op_id: bytes):
        self.book = book
        self.id = op_id
        self._header: Optional[OperationHeader] = None

        # Results caches. `_artifacts_produced_cache` doubles as the "loaded"
        # flag, because the decoded results themselves may legitimately be None.
        self._results_cache: Optional[Any] = None
        self._artifacts_produced_cache: Optional[Set[Artifact]] = None

        # Input caches. `_deps_cache` doubles as the "loaded" flag.
        self._inputs_cache: Optional[Any] = None
        self._artifacts_consumed_cache: Optional[Set[Artifact]] = None
        self._deps_cache: Optional[Set[Operation]] = None

    # --- identity ---
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Operation):
            return NotImplemented
        return self.book is other.book and self.id == other.id

    def __hash__(self) -> int:
        # Consistent with __eq__: equal handles share an ID.
        return hash(self.id)

    # --- header (lazy) ---
    @property
    def header(self) -> OperationHeader:
        if self._header is None:
            self._header = self.book.fetch_operation_header(self.id)
        return self._header

    @property
    def kind(self) -> str: return self.header.kind
    @property
    def task(self) -> str: return self.header.task
    @property
    def tool(self) -> str: return self.header.tool
    @property
    def output_type(self) -> str: return self.header.output_type
    @property
    def event_uuid(self) -> Optional[bytes]: return self.header.event_uuid
    @property
    def timestamp(self) -> Optional[str]: return self.header.timestamp
    @property
    def meta(self) -> Any: return self.header.meta

    # --- result-path handles ---
    def __getitem__(self, key: str) -> "OperationResult":
        """Return a handle to the top-level field ``key`` of the results."""
        return self.at(path_for_key(key))

    def at(self, path: str) -> "OperationResult":
        """Return a handle to the results at the given JSONPath."""
        return OperationResult(self, path)

    # --- results JSON (raw / decoded + deps) ---
    def results_json(self) -> Any:
        """
        Get the raw JSON of an operation's results.

        This is _not_ cached; see results() for the cached version
        """
        return self.book.fetch_results_json(self.id)

    def _ensure_results(self) -> Any:
        """Fetch and decode the results once; later calls return the cache."""
        if self._artifacts_produced_cache is None:
            raw = self.book.fetch_results_json(self.id)
            arts: Set[Artifact] = set()
            decoded = self.book.decode_json(raw, arts=arts)
            self._results_cache = decoded
            # Set the "loaded" flag last, so a failed decode is retried.
            self._artifacts_produced_cache = arts
        return self._results_cache

    def results(self) -> Any:
        """
        Get the decoded (cached) results of an operation.
        """
        return self._ensure_results()

    def artifacts_produced(self) -> Set[Artifact]:
        """
        Return the set of artifacts referenced by this operation's results.
        """
        if self._artifacts_produced_cache is None:
            self._ensure_results()
        return self._artifacts_produced_cache or set()

    # --- inputs JSON (raw / decoded + deps) ---
    def inputs_json(self) -> Any:
        """
        Get the raw JSON of an operation's inputs.

        This is _not_ cached; see inputs() for the cached version
        """
        return self.book.fetch_inputs_json(self.id)

    def _ensure_inputs(self) -> Any:
        """Fetch and decode the inputs once; later calls return the cache."""
        if self._deps_cache is None:
            raw = self.book.fetch_inputs_json(self.id)
            arts: Set[Artifact] = set()
            ops: Set[Operation] = set()
            decoded = self.book.decode_json(raw, ops=ops, arts=arts)
            self._inputs_cache = decoded
            self._artifacts_consumed_cache = arts
            # Set the "loaded" flag last, so a failed decode is retried.
            self._deps_cache = ops
        return self._inputs_cache

    def inputs(self) -> Any:
        """
        Get an operation's decoded (cached) inputs.
        """
        return self._ensure_inputs()

    def artifacts_consumed(self) -> Set[Artifact]:
        """
        Get the artifacts directly referenced by an operation's inputs.
        """
        if self._artifacts_consumed_cache is None:
            self._ensure_inputs()
        return self._artifacts_consumed_cache or set()

    def deps(self) -> Set[Operation]:
        """
        Get the operations referenced by this operation's inputs.
        """
        if self._deps_cache is None:
            self._ensure_inputs()
        return self._deps_cache or set()

    def used_by(self) -> Set[Operation]:
        """
        Return the operations in the book that consume an artifact produced
        by this operation.

        NOTE(review): inputs that reference this operation's results through
        an op reference (e.g. ``op["content"]``) are recorded as dependencies,
        not as artifact consumers, so they are not found by this lookup. The
        Scrapebook API has no reverse-dependency query to cover that case.
        """
        users: Set[Operation] = set()
        for art in self.artifacts_produced():
            for op_id in self.book.fetch_artifact_consumed_by(art.sha256):
                users.add(Operation(self.book, op_id))
        return users

    # --- JSON leaf builders for constructing inputs ---
    def op_ref(self) -> Dict[str, Any]:
        """Return a JSON leaf referencing this operation's whole results ("$")."""
        return {OP_TAG: {"id": self.id.hex(), "path": "$"}}

    def out(self, path: str) -> Dict[str, Any]:
        """Return a JSON leaf referencing this operation's results at ``path``."""
        return {OP_TAG: {"id": self.id.hex(), "path": path}}

    # --- values via decoded cache ---
    def value(self) -> Any:
        """Alias of ``results()``."""
        return self.results()

    def validate(self) -> None:
        """
        Check that this operation is well-formed.

        Raises:
            AssertionError: if recomputing the ID from the stored header,
                inputs and results does not reproduce ``self.id``.
            ValueError: raised by ``decode_json`` if a stored reference is
                malformed, or if the results contain an operation reference
                (which is not allowed).
        """
        inputs = self.inputs_json()
        results = self.results_json()
        # Explicit raise instead of `assert`, so the check survives `python -O`.
        if self.header.op_id(inputs, results) != self.id:
            raise AssertionError(f"Operation ID mismatch for {self.id.hex()}")
        # Decoding checks reference syntax. Passing ops=None for the results
        # makes decode_json reject any operation reference found there.
        self.book.decode_json(inputs, ops=set(), arts=set())
        self.book.decode_json(results, arts=set())

    def __repr__(self) -> str:
        return f"<Operation {self.id.hex()}>"


@dataclass(frozen=True)
class OperationResult:
    """
    Reference to a location inside one operation's results.

    Pairs an Operation with a JSONPath string. Indexing with a field name
    or an integer returns a new reference with the path extended by one
    segment, e.g. ``op["a"]["b"][0]``.
    """
    op: Operation
    path: str

    def __getitem__(self, key_or_index: Union[str, int]) -> "OperationResult":
        """Return a new reference whose path is extended by one field or index."""
        extended = extend_path(self.path, key_or_index)
        return OperationResult(self.op, extended)

    def value(self) -> Any:
        """
        Evaluate the JSONPath against the operation's **decoded, cached** results.

        Returns None when nothing matches, the bare value when exactly one
        location matches, and a list of values when several match.
        """
        decoded = self.op._ensure_results()
        found = jsonpath_parse(self.path).find(decoded)
        values = [match.value for match in found]
        count = len(values)
        if count == 0:
            return None
        return values[0] if count == 1 else values

    # JSON leaf form, matching Operation.op_ref / Operation.out
    def op_ref(self) -> Dict[str, Any]:
        """Encode this reference as an ``{OP_TAG: {"id": ..., "path": ...}}`` JSON leaf."""
        target = {"id": self.op.id.hex(), "path": self.path}
        return {OP_TAG: target}

    def __repr__(self) -> str:
        op_hex = self.op.id.hex()
        return f"<OperationResult id={op_hex} path={self.path!r}>"


# -------- Recorder (fixed config) --------
class Recorder:
    """
    Records operations into a book with a fixed (kind, task, tool, output_type).

    Each call to ``record`` encodes the given inputs and results into JSON,
    stores any bytes as artifacts, and persists a new operation.
    """

    __slots__ = ("book", "kind", "task", "tool", "output_type")

    def __init__(self, book: "Scrapebook", *, kind: str, task: str, tool: str, output_type: str):
        self.book = book
        self.kind = kind
        self.task = task
        self.tool = tool
        self.output_type = output_type

    def record(self, inputs: Any, results: Any, meta: Any = None) -> Operation:
        """
        Encode ``inputs`` and ``results``, persist them as a new operation, and
        return a handle to it.

        Inputs may reference other operations, either as Operation /
        OperationResult handles or as pre-encoded ``OP_TAG`` leaves; these are
        collected as dependencies. Results must not reference operations.
        Bytes-like values anywhere are stored as artifacts.

        Raises:
            ValueError: if the results reference an operation, or if a
                pre-encoded reference leaf is malformed.
        """
        deps: Set[bytes] = set()
        input_arts: Set[bytes] = set()
        output_arts: Set[bytes] = set()
        # Inputs may reference operations; their IDs are collected into deps.
        enc_inputs = self._encode(inputs, ops=deps, arts=input_arts)
        # Results must not reference operations (ops=None makes _encode raise).
        enc_results = self._encode(results, ops=None, arts=output_arts)

        # Observations get a random event ID and a timestamp. Both are hashed
        # into the header, so observing the same thing twice yields two
        # distinct operation IDs.
        if self.kind == OBS_KIND:
            event_uuid = os.urandom(32)
            # Timezone-aware UTC, so timestamps are unambiguous across machines.
            timestamp = datetime.now(timezone.utc).isoformat()
        else:
            event_uuid = None
            timestamp = None

        header = OperationHeader(
            kind=self.kind,
            task=self.task,
            tool=self.tool,
            output_type=self.output_type,
            event_uuid=event_uuid,
            timestamp=timestamp,
            meta=meta
        )

        op_id = self.book.persist_operation(
            header=header,
            inputs_json=enc_inputs,
            results_json=enc_results,
            deps=deps,
            input_arts=input_arts,
            output_arts=output_arts,
        )
        return Operation(self.book, op_id)

    def _encode(self, obj: Any, *,
        ops: Optional[Set[bytes]], arts: Optional[Set[bytes]]
    ) -> Any:
        """
        Recursively convert ``obj`` into storable JSON.

        - Operation / OperationResult handles become ``OP_TAG`` leaves, and the
          operation ID is added to ``ops``.
        - Artifact handles and bytes-like values become ``ART_TAG`` leaves
          (bytes are stored in the book first), and the digest is added to ``arts``.
        - Dicts that already are ``OP_TAG`` / ``ART_TAG`` leaves (e.g. built by
          ``Operation.op_ref()`` / ``Operation.out()``) are kept, and the
          reference they carry is collected just like a handle's would be.
        - Other dicts, lists and tuples are encoded element-wise (tuples become
          lists); anything else is returned unchanged.

        Passing ``ops=None`` / ``arts=None`` forbids that kind of reference:
        encountering one raises ValueError.
        """
        # op refs
        if isinstance(obj, OperationResult):
            if ops is None:
                raise ValueError("Operation references are not allowed here (ops=None).")
            ops.add(obj.op.id)
            return obj.op_ref()
        if isinstance(obj, Operation):
            if ops is None:
                raise ValueError("Operation references are not allowed here (ops=None).")
            ops.add(obj.id)
            return obj.op_ref()
        # artifacts
        if isinstance(obj, Artifact):
            if arts is None:
                raise ValueError("Artifact references are not allowed here (arts=None).")
            arts.add(obj.sha256)
            return {ART_TAG: obj.sha256.hex()}
        if isinstance(obj, (bytes, bytearray, memoryview)):
            # Check before storing, so a forbidden artifact is not written.
            if arts is None:
                raise ValueError("Artifact references are not allowed here (arts=None).")
            sha256 = self.book.put_artifact_sha256(bytes(obj))  # raw digest bytes
            arts.add(sha256)
            return {ART_TAG: sha256.hex()}

        # containers / primitives
        if isinstance(obj, dict):
            # Pre-encoded reference leaves must still register their reference;
            # otherwise the book's dependency / artifact indexes miss them.
            # Checked in the same order as Scrapebook.decode_json.
            if OP_TAG in obj:
                if ops is None:
                    raise ValueError("Operation references are not allowed here (ops=None).")
                try:
                    ops.add(hex_to_bytes(obj[OP_TAG]["id"]))
                except (KeyError, TypeError, ValueError) as err:
                    raise ValueError(f"Invalid operation reference: {obj!r}") from err
            elif ART_TAG in obj:
                if arts is None:
                    raise ValueError("Artifact references are not allowed here (arts=None).")
                try:
                    arts.add(hex_to_bytes(obj[ART_TAG]))
                except (TypeError, ValueError) as err:
                    raise ValueError(f"Invalid artifact reference: {obj!r}") from err
            return {k: self._encode(v, ops=ops, arts=arts) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self._encode(v, ops=ops, arts=arts) for v in obj]
        return obj

class Scrapebook(ABC):
    """
    Abstract storage backend for operations and artifacts.

    Subclasses implement persistence and lookup (the abstract methods). This
    base class provides recorder factories, decoding of stored JSON back into
    handles, and whole-book validation.
    """
    def recorder(self, *, kind: str, task: str, tool: str, output_type: str) -> Recorder:
        """Return a Recorder for this book with the given fixed header fields."""
        return Recorder(self, kind=kind, task=task, tool=tool, output_type=output_type)

    def obs_recorder(self, *, task: str, tool: str, output_type: str) -> Recorder:
        """Return a Recorder for observations (kind=OBS_KIND)."""
        return self.recorder(kind=OBS_KIND, task=task, tool=tool, output_type=output_type)

    def trans_recorder(self, *, task: str, tool: str, output_type: str) -> Recorder:
        """Return a Recorder for transformations (kind=TRANS_KIND)."""
        return self.recorder(kind=TRANS_KIND, task=task, tool=tool, output_type=output_type)

    @abstractmethod
    def persist_operation(
        self,
        *,
        header: OperationHeader,
        inputs_json: Any,
        results_json: Any,
        deps: Set[bytes],
        input_arts: Set[bytes],
        output_arts: Set[bytes],
    ) -> bytes:
        """
        Persist an operation into the database

        Returns its ID, which is a deterministic SHA-256 hash of the operation's contents.
        """

    @abstractmethod
    def fetch_operation_header(self, op_id: bytes) -> OperationHeader:
        """
        Fetch an operation header by its ID.
        """

    @abstractmethod
    def fetch_results_json(self, op_id: bytes) -> Any:
        """
        Fetch the raw JSON of an operation's results by its ID
        """

    @abstractmethod
    def fetch_inputs_json(self, op_id: bytes) -> Any:
        """
        Fetch the raw JSON of an operation's inputs by its ID
        """

    @abstractmethod
    def put_artifact_sha256(self, data: bytes) -> bytes:
        """
        Store bytes content-addressed by SHA-256 and return the hash as bytes.
        """

    @abstractmethod
    def fetch_artifact_bytes_sha256(self, sha256: bytes) -> bytes:
        """
        Given the hash of a string of bytes, return the original bytes if known.
        """

    @abstractmethod
    def fetch_artifact_produced_by(self, sha256: bytes) -> Iterable[bytes]:
        """
        Given the hash of an artifact, return the set of operation IDs that produce it.
        """

    @abstractmethod
    def fetch_artifact_consumed_by(self, sha256: bytes) -> Iterable[bytes]:
        """
        Given the hash of an artifact, return the set of operation IDs that consume it.
        """

    @abstractmethod
    def fetch_op_ids(self) -> Iterable[bytes]:
        """
        Get an iterator over all operation IDs in the book.
        """

    def decode_json(
            self, node: Any, *,
            ops: Optional[Set[Operation]] = None,
            arts: Optional[Set[Artifact]] = None,
        ) -> Any:
        """
        Decode stored JSON back into handles, collecting references along the way.

        - ``{OP_TAG: {"id": <hex>, "path": <jsonpath>}}`` decodes to an
          OperationResult (``path`` defaults to ``"$"``), and the referenced
          Operation handle is added to ``ops``. If ``ops`` is None, operation
          references are forbidden.
        - ``{ART_TAG: <hex>}`` decodes to an Artifact handle, which is added to
          ``arts``. If ``arts`` is None, artifact references are forbidden.
        - Other dicts and lists are decoded recursively; anything else is
          returned unchanged.

        Returns: decoded_tree

        Raises:
            ValueError: for a forbidden or malformed reference.
        """
        if isinstance(node, dict):
            if OP_TAG in node:
                if ops is None:
                    raise ValueError("Operation references are not allowed here (ops=None).")
                try:
                    spec = node[OP_TAG]
                    op = Operation(self, hex_to_bytes(spec["id"]))
                    path = spec.get("path", "$")
                except (AttributeError, KeyError, TypeError, ValueError) as err:
                    raise ValueError(f"Invalid operation reference: {node!r}") from err
                if not isinstance(path, str):
                    raise ValueError(f"Invalid operation reference: {node!r}")
                ops.add(op)
                return OperationResult(op, path)
            if ART_TAG in node:
                if arts is None:
                    raise ValueError("Artifact references are not allowed here (arts=None).")
                try:
                    sha = hex_to_bytes(node[ART_TAG])
                except (TypeError, ValueError) as err:
                    raise ValueError(f"Invalid artifact reference: {node!r}") from err
                art = Artifact(self, sha)
                arts.add(art)
                return art
            return {k: self.decode_json(v, ops=ops, arts=arts) for k, v in node.items()}
        if isinstance(node, list):
            return [self.decode_json(v, ops=ops, arts=arts) for v in node]
        return node

    def validate(self) -> int:
        """
        Validate every operation in this book via ``Operation.validate``.

        Returns the number of operations checked. The first failure
        propagates as an exception.
        """
        validated = 0
        for op_id in self.fetch_op_ids():
            Operation(self, op_id).validate()
            validated += 1
        return validated

In [None]:
# Render the API documentation for OperationResult.value
show_doc(OperationResult.value)

---

[source](https://github.com/imbrem/scrapebook/blob/main/scrapebook/core.py#L298){target="_blank" style="float:right; font-size:smaller"}

### OperationResult.value

>      OperationResult.value ()

*Evaluate JSONPath on the op's **decoded cached** results.
- 0 matches -> None
- 1 match   -> the value
- >1 matches-> list of values*

In [None]:
# Render the API documentation for Operation.inputs
show_doc(Operation.inputs)

---

[source](https://github.com/imbrem/scrapebook/blob/main/scrapebook/core.py#L239){target="_blank" style="float:right; font-size:smaller"}

### Operation.inputs

>      Operation.inputs ()

*Get an operation's inputs*

In [None]:
# Render the API documentation for Operation.results
show_doc(Operation.results)

---

[source](https://github.com/imbrem/scrapebook/blob/main/scrapebook/core.py#L205){target="_blank" style="float:right; font-size:smaller"}

### Operation.results

>      Operation.results ()

*Get the results of an operation*

In [None]:
class ScrapebookDict(Scrapebook):
    """
    In-memory Scrapebook that keeps operations, artifacts and indexes in dicts.

    Nothing is persisted beyond the lifetime of the object.
    """
    def __init__(self):
        self.ops = dict()          # op_id -> {"header": ..., "input": ..., "result": ...}
        self.arts = dict()         # artifact digest -> stored bytes
        self.deps = dict()         # op_id -> IDs of operations its inputs reference
        self.op_used_by = dict()   # op_id -> IDs of operations whose inputs reference it
        self.produced_by = dict()  # artifact digest -> IDs of operations producing it
        self.consumed_by = dict()  # artifact digest -> IDs of operations consuming it

    # persistence
    def persist_operation(
        self,
        *,
        header: OperationHeader,
        inputs_json: Any,
        results_json: Any,
        deps: Set[bytes],
        input_arts: Set[bytes],
        output_arts: Set[bytes]
    ) -> bytes:
        """
        Store an operation and update the reverse indexes; return its ID.

        The ID is computed from the header and encoded inputs/results, so
        persisting an identical operation again maps to the same entry.
        """
        op_id = header.op_id(inputs_json, results_json)
        self.ops[op_id] = {"header": header, "input": inputs_json, "result": results_json}
        for dep in deps:
            self.op_used_by.setdefault(dep, set()).add(op_id)
            self.deps.setdefault(op_id, set()).add(dep)
        for art in input_arts:
            self.consumed_by.setdefault(art, set()).add(op_id)
        for art in output_arts:
            self.produced_by.setdefault(art, set()).add(op_id)
        return op_id

    def fetch_operation_header(self, op_id: bytes) -> OperationHeader:
        return self.ops[op_id]["header"]

    def fetch_results_json(self, op_id: bytes) -> Any:
        return self.ops[op_id]["result"]

    def fetch_inputs_json(self, op_id: bytes) -> Any:
        return self.ops[op_id]["input"]

    def fetch_op_ids(self) -> Iterable[bytes]:
        # Snapshot, so callers may record new operations while iterating.
        return list(self.ops)

    def put_artifact_sha256(self, data: bytes) -> bytes:
        """Store bytes content-addressed by SHA-256 and return the raw digest bytes."""
        digest = hashlib.sha256(data).digest()
        self.arts[digest] = data
        return digest

    def fetch_artifact_bytes_sha256(self, sha256: bytes) -> bytes:
        """Return the stored bytes for ``sha256``; raises KeyError if unknown."""
        return self.arts[sha256]

    def fetch_artifact_produced_by(self, sha256: bytes) -> Iterable[bytes]:
        """
        Given the hash of an artifact, return the set of operation IDs that produce it.

        A copy is returned, so callers cannot mutate the internal index.
        """
        return set(self.produced_by.get(sha256, ()))

    def fetch_artifact_consumed_by(self, sha256: bytes) -> Iterable[bytes]:
        """
        Given the hash of an artifact, return the set of operation IDs that consume it.

        A copy is returned, so callers cannot mutate the internal index.
        """
        return set(self.consumed_by.get(sha256, ()))
        

In [None]:
# An in-memory book for the examples below
book = ScrapebookDict()

In [None]:
# Recorder for observations: each recorded operation gets a random event_uuid and a timestamp
scrapes = book.obs_recorder(task="scrape_site", tool="demo", output_type="http_response")

In [None]:
# The bytes under "content" are stored as an artifact and replaced by an artifact reference
op = scrapes.record({"url": "http://example.com"}, {"status": 200, "content": b"hello"})
op.validate()

In [None]:
# NOTE(review): op.id is an operation ID, but produced_by is keyed by artifact digest,
# so this lookup returns an empty set. To find the producers of the "content" artifact,
# use book.fetch_artifact_produced_by(op["content"].value().sha256)
book.fetch_artifact_produced_by(op.id)

set()

In [None]:
# Header fields are exposed as properties on the Operation handle
(op, op.kind, op.task, op.tool, op.output_type, op.event_uuid.hex(), op.timestamp, op.meta)

(<Operation ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784>,
 'obs',
 'scrape_site',
 'demo',
 'http_response',
 '0321dd21d2eabb8da0ea55383fabc053e4bccf7dc2f87c1de29404f0eee5b466',
 '2025-10-05T02:58:41.209504',
 None)

In [None]:
# Raw stored JSON: the artifact appears as a {"$artifact": "<sha256-hex>"} leaf
op.results_json()

{'status': 200,
 'content': {'$artifact': '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'}}

In [None]:
# Decoded results: artifact leaves become lazy Artifact handles
op.results()

{'status': 200,
 'content': <Artifact 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824>}

In [None]:
# Index into the results, resolve the Artifact handle, then load its bytes
op["content"].value().bytes()

b'hello'

In [None]:
# Indexing alone only builds a reference (Operation + JSONPath); nothing is evaluated yet
op["content"]

<OperationResult id=ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784 path='$["content"]'>

In [None]:
# Recorder for transformations: no event_uuid or timestamp is attached
articles = book.trans_recorder(task="parse_article", tool="demo2", output_type="article")

In [None]:
# Passing op["content"] as an input records op as a dependency of op2
op2 = articles.record({"content" : op["content"], "mode": "cool"}, "good article")

In [None]:
# For a transformation, event_uuid and timestamp are None
(
    op2, op2.kind, op2.task, op2.tool, op2.output_type, op2.event_uuid, 
    op2.timestamp, op2.meta
)

(<Operation d37127eba9c227c45f73d26c34d4305f90ceff4a0f253577e809a32bbc243cc8>,
 'trans',
 'parse_article',
 'demo2',
 'article',
 None,
 None,
 None)

In [None]:
# value() returns the decoded results
op2.value()

'good article'

In [None]:
# Decoded inputs: the stored op reference comes back as an OperationResult
op2.inputs()

{'content': <OperationResult id=ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784 path='$["content"]'>,
 'mode': 'cool'}

In [None]:
# Operations referenced by op2's inputs
[dep for dep in op2.deps()]

[<Operation ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784>]

In [None]:
# Inputs can reference parts of results (op["content"], op["status"]) or a whole operation (op2)
op3 = articles.record(
    {"content" : op["content"], "mode": op2, "status": op["status"]}, 
    "better article"
)

In [None]:
# Handle of the newly recorded operation
op3

<Operation f60c4785522a91181c5110336408465d2591b9f34a59b231ec2e5deea6b48377>

In [None]:
# op3's decoded results
op3.value()

'better article'

In [None]:
# Operations referenced by op3's inputs (op is referenced twice: via "content" and "status")
[dep for dep in op3.deps()]

[<Operation ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784>,
 <Operation d37127eba9c227c45f73d26c34d4305f90ceff4a0f253577e809a32bbc243cc8>,
 <Operation ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784>]

In [None]:
# The first operation again, for comparison with the deps above
op

<Operation ae67f785a7e39b117161a13fdd2d6a5139f3e42d8bea6f9e3b264e853a45d784>

In [None]:
# Re-check op: its recomputed ID must match, and its results must not reference operations
op.validate()

In [None]:
# Artifacts collected while decoding op's results (the stored "content" bytes)
op.artifacts_produced()

{<Artifact 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824>}

In [None]:
# Validate every operation in the book; returns how many were checked
book.validate()

3

In [None]:
#| hide
# Write the `#| export` cells of this notebook out to the library module
import nbdev; nbdev.nbdev_export()