# IWTC Raw Source Indexing

This notebook executes the raw source indexing workflow defined in:

- `docs/raw_source_indexing_design.md`

It is intended for hands-on execution and experimentation. Conceptual scope, responsibilities, and workflow design are defined in the linked design document.

This notebook operates on a single world repository.

A minimal example of `world_repository.yml` is provided in this repository
under:

- `data/config_examples/world_repository.yml`

You may copy and adapt that example for your own world repository.
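
As a rough orientation, the mapping below sketches the shape that `yaml.safe_load` might return for a descriptor, using only the key names that Phase 1b reads. It is not a copy of the shipped example file: every path and type value is a hypothetical placeholder, and `missing_required` is an illustrative helper, not part of the notebook.

```python
# Hypothetical parsed descriptor (what yaml.safe_load would hand back).
# Key names mirror those read in Phase 1b; all values are placeholders.
example_descriptor = {
    "world_root": "/path/to/your/world",  # must be absolute, "~" is rejected
    "sources": {
        "read_paths": [
            "notes/misc",  # plain string: source without a declared type
            {"path": "_local/auto_transcripts", "type": "auto_transcripts"},
        ],
    },
    "working_drafts": {"path": "_local/machine_wip"},  # required
    "indexes": {"path": "_meta/indexes"},  # optional block
    "vocabulary": {  # optional block
        "entities": "_meta/indexes/vocab_entities.csv",
        "aliases": "_meta/indexes/vocab_aliases.csv",
        "author_aliases": "_meta/indexes/vocab_author_aliases.csv",  # optional
        "player_character_map": "_meta/indexes/vocab_map_player_character.csv",  # optional
    },
}


def missing_required(descriptor, overriding=False):
    """Return the names of required entries absent from a parsed descriptor."""
    missing = []
    if not descriptor.get("world_root"):
        missing.append("world_root")
    drafts = descriptor.get("working_drafts")
    if not (isinstance(drafts, dict) and drafts.get("path")):
        missing.append("working_drafts.path")
    sources = descriptor.get("sources")
    read_paths = sources.get("read_paths") if isinstance(sources, dict) else None
    # read_paths is only required when the run is not using override paths
    if not overriding and read_paths is None:
        missing.append("sources.read_paths")
    return missing


print(missing_required(example_descriptor))
print(missing_required({"world_root": "/x"}))
```

Relative paths in the descriptor are resolved against `world_root` in Phase 1c, so only `world_root` itself needs to be absolute.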

## Phase 0: Parameters

This notebook operates on a **campaign world repository** and produces draft, machine-generated indexes for human review.

In this phase, you tell the notebook **which world it is operating on** and **how broad this run should be**.

At a high level:
- You point the notebook at a world descriptor file that explains how your world’s files are organized.
- You can optionally restrict this run to specific files or folders if you are working on a subset of material.
- You choose whether to review discovered files interactively or process everything automatically.

You do **not** need to understand internal data structures or file parsing to set these parameters.  
The goal is simply to answer: *“What world am I indexing, and how much of it do I want to work on right now?”*

The code cell below contains inline comments explaining each parameter in concrete terms.

**IMPORTANT:** Resulting indexes are, by design, not authoritative. They are machine-generated drafts intended for human review.

In [30]:
# Phase 0: Parameters
LAST_PHASE_RUN = "0"

# Absolute path to the world_repository.yml descriptor.
WORLD_REPOSITORY_DESCRIPTOR = (
    "/Users/charissophia/obsidian/Iron Wolf Trading Company/_meta/descriptors/world_repository.yml"
)

# Optional override: use these paths instead of descriptor sources for this run.
# Examples:
#   OVERRIDE_PATHS = "/Users/you/path/to/file_or_dir"
#   OVERRIDE_PATHS = ["/Users/you/path/a", "/Users/you/path/b"]
OVERRIDE_PATHS = None

# Selection behavior:
#   "PROMPT" -> list candidates and prompt for selection
#   "ALL"    -> select all candidates
SOURCE_MODE = "ALL"

# Internal run metadata (do not edit)
from datetime import datetime
print(f"Notebook run initialized at: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
del datetime

Notebook run initialized at: 2026-02-10 17:13


## Phase 1: Load and validate world descriptor

Before this notebook can safely read or write anything, it must be confident that it understands the **structure of the world repository**.

In this phase, the notebook:

- Loads the world repository descriptor file you provided
- Confirms that it is readable and structurally valid
- Extracts only the information this notebook needs
- Verifies that referenced paths actually exist and are usable

This phase answers a single question:

**“Can I trust this descriptor enough to proceed?”**

If the answer is *no*, the notebook will stop with clear, actionable error messages explaining what needs to be fixed in the descriptor file.  
Nothing is modified, created, or scanned until this check succeeds.

This phase does **not** interpret world lore, indexing rules, or heuristics.  
It only establishes that the filesystem layout described by the world is coherent and usable.
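
The path handling in this phase can be summarized in a small, self-contained sketch. The helper name `resolve_under_root` is hypothetical and the temporary directory stands in for a real world root; the logic follows the rules described above (no `~`, relative paths resolved against the world root, display paths shown relative to it when possible).

```python
import tempfile
from pathlib import Path


def resolve_under_root(raw, world_root):
    """Resolve a descriptor path against world_root.

    Returns (absolute_path, display_path). display_path is relative to
    world_root when the path lies inside it, otherwise the absolute path.
    """
    if str(raw).startswith("~"):
        raise ValueError(f"'~' is not allowed: {raw}")
    p = Path(raw)
    if not p.is_absolute():
        p = world_root / p
    p = p.resolve()
    try:
        display = str(p.relative_to(world_root))
    except ValueError:
        # Path lies outside world_root: fall back to the absolute form
        display = str(p)
    return p, display


# Demonstration against a throwaway directory (stand-in for a world root)
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / "drafts").mkdir()
    inside, shown = resolve_under_root("drafts", root)
    print(inside.is_dir(), shown)
```

Resolving the root before comparing matters on systems where the temporary or home directory is reached through a symlink; otherwise `relative_to` can fail for paths that are in fact inside the root.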

In [31]:
# Phase 1a: Load and parse world repository descriptor
LAST_PHASE_RUN = "1a"

from pathlib import Path
import yaml

# Locate descriptor file
descriptor_path = Path(WORLD_REPOSITORY_DESCRIPTOR)

if not descriptor_path.exists():
    raise FileNotFoundError(
        "World repository descriptor file was not found.\n"
        f"Path provided:\n  {descriptor_path}\n\n"
        "What to do:\n"
        "- Confirm the file exists at this location or fix the Parameters cell\n"
        "- If you just edited the Parameters cell, rerun Phase 0 and then rerun this cell\n"
    )

# Read and parse YAML
try:
    with descriptor_path.open("r", encoding="utf-8") as f:
        world_repo = yaml.safe_load(f)
except Exception as exc:
    # Chain the original exception so the underlying cause stays visible in the traceback
    raise ValueError(
        "The world repository descriptor could not be read.\n"
        "This usually indicates a YAML formatting problem.\n\n"
        f"File:\n  {descriptor_path}\n\n"
        "What to do:\n"
        "- Compare the file against the example world_repository.yml\n"
        "- Paste the contents into https://www.yamllint.com/\n"
        "- Fix any reported issues, save the file, and rerun this cell"
    ) from exc

# Validate basic structure
if not isinstance(world_repo, dict):
    raise ValueError(
        "The world repository descriptor was read, but its structure is not usable.\n"
        "The file must be a YAML mapping (top-level `name: value` entries).\n\n"
        "What to do:\n"
        "- Compare the file against the example world_repository.yml\n"
        "- Ensure it uses clear `name: value` lines\n"
        "- Fix the file and rerun this cell"
    )

print(f"World repository descriptor loaded successfully: {descriptor_path.name}")

# cleanup: remove local variables
del f, yaml, descriptor_path

World repository descriptor loaded successfully: world_repository.yml


In [32]:
# Phase 1b: Extract only the information this notebook needs (presence + run intent)
LAST_PHASE_RUN = "1b"

errors = []

# ---- OVERRIDE_PATHS normalization (shape only) ----
override_list = None

if OVERRIDE_PATHS:
    if isinstance(OVERRIDE_PATHS, str):
        override_list = [OVERRIDE_PATHS]
    elif isinstance(OVERRIDE_PATHS, list):
        bad = [x for x in OVERRIDE_PATHS if not isinstance(x, str) or not x]
        if bad:
            errors.append("OVERRIDE_PATHS must be a path string or a list of non-empty path strings.")
        else:
            override_list = OVERRIDE_PATHS
        del bad
    else:
        errors.append("OVERRIDE_PATHS must be None, a path string, or a list of path strings.")

SOURCE_ORIGIN = "override_paths" if override_list else "descriptor"

# ---- extract descriptor blocks (presence only) ----
WORLD_ROOT_RAW = world_repo.get("world_root")

read_paths = None
sources = world_repo.get("sources")
if isinstance(sources, dict):
    read_paths = sources.get("read_paths")

drafts = world_repo.get("working_drafts")
DRAFTS_RAW = drafts.get("path") if isinstance(drafts, dict) else None

indexes = world_repo.get("indexes")  # optional
INDEXES_RAW = indexes.get("path") if isinstance(indexes, dict) else None

vocab = world_repo.get("vocabulary")
ENTITIES_RAW = vocab.get("entities") if isinstance(vocab, dict) else None
ALIASES_RAW = vocab.get("aliases") if isinstance(vocab, dict) else None
AUTHORS_RAW = vocab.get("author_aliases") if isinstance(vocab, dict) else None  # optional
PC_MAP_RAW = vocab.get("player_character_map") if isinstance(vocab, dict) else None  # optional

# ---- required entries ----
if not WORLD_ROOT_RAW:
    errors.append("Missing required entry: world_root")

if not DRAFTS_RAW:
    errors.append("Missing required entry: working_drafts.path")

# indexes optional (may not exist yet)
# but if declared it must have a path
if indexes is not None and not INDEXES_RAW:
    errors.append("indexes is declared but missing indexes.path")

# vocab files optional (may not exist on first run)
# but paths must be present if declared
if vocab is not None:
    if ENTITIES_RAW is None:
        errors.append("vocabulary.entities is declared but missing a path")
    if ALIASES_RAW is None:
        errors.append("vocabulary.aliases is declared but missing a path")

# sources.read_paths required only when not overriding
if SOURCE_ORIGIN == "descriptor":
    if read_paths is None:
        errors.append("Missing required entry: sources.read_paths")
    elif not isinstance(read_paths, list):
        errors.append("sources.read_paths must be a YAML list")
else:
    # override mode: read_paths optional, but if present must be a list
    if read_paths is not None and not isinstance(read_paths, list):
        errors.append("sources.read_paths must be a YAML list")

if errors:
    raise ValueError(
        "World repository + parameters are missing required entries or have invalid values:\n- "
        + "\n- ".join(errors)
        + "\n\nWhat to do:\n"
          "- Fix OVERRIDE_PATHS in Phase 0 (if set)\n"
          "- Edit your world_repository.yml\n"
          "- Save and rerun Phase 1a, then rerun this cell\n"
          "\nNote: This check only confirms entries and shapes. Filesystem usability is validated in Phase 1c."
    )

# ---- build PATHS_RAW (flat list for Phase 1c validation) ----
PATHS_RAW = [
    {"tag": "world_root", "raw": WORLD_ROOT_RAW},
    {"tag": "drafts",     "raw": DRAFTS_RAW},
]

if INDEXES_RAW:
    PATHS_RAW.append({"tag": "indexes", "raw": INDEXES_RAW})

if ENTITIES_RAW:
    PATHS_RAW.append({"tag": "entities", "raw": ENTITIES_RAW})

if ALIASES_RAW:
    PATHS_RAW.append({"tag": "aliases", "raw": ALIASES_RAW})

if AUTHORS_RAW:
    PATHS_RAW.append({"tag": "authors", "raw": AUTHORS_RAW})

if PC_MAP_RAW:
    PATHS_RAW.append({"tag": "pc_map", "raw": PC_MAP_RAW})

# run source paths
if SOURCE_ORIGIN == "override_paths":
    for p in override_list:
        PATHS_RAW.append({"tag": "src", "raw": p})
else:
    for entry in read_paths:
        if isinstance(entry, str):
            PATHS_RAW.append({"tag": "src", "raw": entry})
        elif isinstance(entry, dict):
            p = entry.get("path")
            if not p:
                raise ValueError("sources.read_paths contains a mapping entry missing 'path'.")
            PATHS_RAW.append({"tag": "src", "raw": p})
            del p
        else:
            raise ValueError("sources.read_paths entries must be a path string or a {path,type} mapping.")

# source types (descriptor-declared typing only)
if isinstance(read_paths, list):
    for entry in read_paths:
        if isinstance(entry, dict):
            p = entry.get("path")
            if not p:
                raise ValueError("sources.read_paths contains a mapping entry missing 'path'.")
            PATHS_RAW.append({
                "tag": "src_type",
                "raw": p,
                "source_type": entry.get("type") or "unknown",
            })
            del p

print("Phase 1b OK: required entries present and parameters are coherent.")
print(f"SOURCE_ORIGIN: {SOURCE_ORIGIN}")
print(f"PATHS_RAW entries: {len(PATHS_RAW)}")

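# ---- clean up locals ----
# `entry` and `p` are only bound if the loops above ran (for example, `entry` never
# exists in override mode without sources.read_paths), so drop them defensively.
for _name in ("entry", "p"):
    globals().pop(_name, None)
del _name
del errors, override_list, sources, drafts, indexes, vocab, read_paths, WORLD_ROOT_RAW, world_repo
del DRAFTS_RAW, INDEXES_RAW, ENTITIES_RAW, ALIASES_RAW, AUTHORS_RAW, PC_MAP_RAW, OVERRIDE_PATHS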

Phase 1b OK: required entries present and parameters are coherent.
SOURCE_ORIGIN: descriptor
PATHS_RAW entries: 15


In [33]:
# Phase 1c: Validate paths and publish contract variables (simple, no RUN_SOURCE_DIRS)
LAST_PHASE_RUN = "1c"

errors = []

# Published outputs (initialized up front)
WORLD_ROOT = None
WORKING_DRAFTS_PATH = None
WORKING_DRAFTS_RELPATH = None

INDEXES_PATH = None
INDEXES_RELPATH = None

VOCAB_ENTITIES_PATH = None
VOCAB_ENTITIES_RELPATH = None
VOCAB_ALIASES_PATH = None
VOCAB_ALIASES_RELPATH = None
VOCAB_AUTHORS_PATH = None
VOCAB_AUTHORS_RELPATH = None
VOCAB_PC_MAP_PATH = None
VOCAB_PC_MAP_RELPATH = None

SOURCE_TYPES_MAP = {}   # resolved Path (file or dir) -> source_type
RUN_SOURCE_PATHS = []   # list[Path] (files or dirs)

# world_root (1b guarantees presence/uniqueness)
WORLD_ROOT = Path(next(x["raw"] for x in PATHS_RAW if x.get("tag") == "world_root"))

if str(WORLD_ROOT).startswith("~"):
    errors.append("world_root: '~' is not allowed. Use a full absolute path.")
elif not WORLD_ROOT.is_absolute():
    errors.append("world_root must be an absolute path (starts with / on macOS/Linux, or C:\\ on Windows).")
elif not WORLD_ROOT.is_dir():
    errors.append(f"world_root must be an existing directory: {WORLD_ROOT}")
else:
    WORLD_ROOT = WORLD_ROOT.resolve()

if errors:
    raise ValueError("Descriptor path validation failed:\n- " + "\n- ".join(errors))

for item in PATHS_RAW:
    tag = item.get("tag")

    if tag != "world_root" and tag in ("drafts", "src", "src_type", "indexes", "entities", "aliases", "authors", "pc_map"):
        raw = item.get("raw")
        
        if not raw:
            errors.append(f"{tag}: missing path value.")
        
        else:
            p = Path(raw)
            rel = None
        
            if str(p).startswith("~"):
                errors.append(f"{tag}: '~' is not allowed: {raw}")
            else:
                if not p.is_absolute():
                    p = WORLD_ROOT / p
                p = p.resolve()

                try:
                    rel = str(p.relative_to(WORLD_ROOT))
                except Exception:
                    rel = str(p)

            # Required existence
            if tag in ("drafts", "src") and not p.exists():
                errors.append(f"{tag}: path does not exist: {p}")

            # If it's a directory but tag requires file
            if p.exists() and p.is_dir() and tag in ("entities", "aliases", "authors", "pc_map"):
                errors.append(f"{tag}: {p} must be a file")

            # If it's a file but tag requires directory
            # (src and src_type may be files or directories, so they are not checked here)
            if p.exists() and p.is_file() and tag in ("drafts", "indexes"):
                errors.append(f"{tag}: {p} must be a directory")

            # Publish / collect
            if tag == "drafts" and p.exists() and p.is_dir():
                WORKING_DRAFTS_PATH = p
                WORKING_DRAFTS_RELPATH = rel

            elif tag == "indexes" and p.exists() and p.is_dir():
                INDEXES_PATH = p
                INDEXES_RELPATH = rel

            elif tag == "src" and p.exists():
                RUN_SOURCE_PATHS.append(p)

            elif tag == "src_type" and p.exists():
                SOURCE_TYPES_MAP[p] = item.get("source_type") or "unknown"

            elif tag == "entities" and (not p.exists() or p.is_file()):
                VOCAB_ENTITIES_PATH = p
                VOCAB_ENTITIES_RELPATH = rel
            
            elif tag == "aliases" and (not p.exists() or p.is_file()):
                VOCAB_ALIASES_PATH = p
                VOCAB_ALIASES_RELPATH = rel
            
            elif tag == "authors" and (not p.exists() or p.is_file()):
                VOCAB_AUTHORS_PATH = p
                VOCAB_AUTHORS_RELPATH = rel
            
            elif tag == "pc_map" and (not p.exists() or p.is_file()):
                VOCAB_PC_MAP_PATH = p
                VOCAB_PC_MAP_RELPATH = rel

if WORKING_DRAFTS_PATH is None:
    errors.append("drafts: required working drafts directory was not validated (missing or invalid).")

if len(RUN_SOURCE_PATHS) == 0:
    errors.append("src: no valid source paths were provided for this run.")

if errors:
    raise ValueError(
        "Descriptor path validation failed:\n- "
        + "\n- ".join(errors)
        + f"\n\nFix entries in: {Path(WORLD_REPOSITORY_DESCRIPTOR).name}\n"
          "Then rerun Phase 1a, Phase 1b, and this cell."
    )

# drafts write probe
probe = WORKING_DRAFTS_PATH / ".iwtc_tools_write_probe.tmp"
try:
    probe.write_text("test", encoding="utf-8")
finally:
    if probe.exists():
        probe.unlink()

print("Descriptor paths are usable for this notebook.")
print(f"world_root: {WORLD_ROOT}")
print(f"working_drafts: {WORKING_DRAFTS_RELPATH}")
print(f"indexes: {INDEXES_RELPATH if INDEXES_RELPATH else 'none'}")

print(f"vocab.entities: {VOCAB_ENTITIES_RELPATH} (exists={VOCAB_ENTITIES_PATH.exists() if VOCAB_ENTITIES_PATH else False})")
print(f"vocab.aliases: {VOCAB_ALIASES_RELPATH} (exists={VOCAB_ALIASES_PATH.exists() if VOCAB_ALIASES_PATH else False})")
print(f"vocab.authors: {VOCAB_AUTHORS_RELPATH} (exists={VOCAB_AUTHORS_PATH.exists() if VOCAB_AUTHORS_PATH else False})")
print(f"vocab.pc_map: {VOCAB_PC_MAP_RELPATH} (exists={VOCAB_PC_MAP_PATH.exists() if VOCAB_PC_MAP_PATH else False})")
print(f"typed source paths: {len(SOURCE_TYPES_MAP)}")
print(f"run source paths: {len(RUN_SOURCE_PATHS)}")

# clean up locals
del errors, item, tag, raw, p, probe, PATHS_RAW
del Path, rel

Descriptor paths are usable for this notebook.
world_root: /Users/charissophia/obsidian/Iron Wolf Trading Company
working_drafts: _local/machine_wip
indexes: none
vocab.entities: _meta/indexes/vocab_entities.csv (exists=True)
vocab.aliases: _meta/indexes/vocab_aliases.csv (exists=True)
vocab.authors: _meta/indexes/vocab_author_aliases.csv (exists=True)
vocab.pc_map: _meta/indexes/vocab_map_player_character.csv (exists=True)
typed source paths: 4
run source paths: 5


## Phase 2: Discover source files

Before this notebook can index or analyze anything, it must determine **which files are available to work with**.

In this phase, the notebook:
- Determines which source locations to use (either override paths you provided, or the repository’s declared sources)
- Recursively scans those locations for supported file types
- Groups discovered files by directory for human-readable review
- Establishes a stable ordering
- Associates each file with its declared source type (if available)
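
The discovery cell in this notebook prints counts by source type. The directory grouping mentioned in the list above could be sketched as follows; `group_by_directory` and the sample paths are hypothetical and only illustrate the idea.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_by_directory(paths):
    """Group file paths by parent directory, in a stable case-insensitive order."""
    groups = defaultdict(list)
    for p in sorted(paths, key=lambda x: x.as_posix().lower()):
        groups[p.parent].append(p.name)
    return dict(groups)


# Made-up sample paths, not taken from any real repository
example = [
    PurePosixPath("world/notes/b.md"),
    PurePosixPath("world/transcripts/s2.txt"),
    PurePosixPath("world/notes/A.txt"),
]

for directory, names in group_by_directory(example).items():
    print(f"{directory}/  ({len(names)} files)")
    for name in names:
        print(f"    {name}")
```

Sorting before grouping keeps both the directory order and the file order within each directory reproducible across runs.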

This phase answers a single question:

**“What source files are available for processing right now?”**

If no supported files are found, the notebook will stop and explain why.  
Nothing is read, modified, or written during discovery.

Depending on your configuration:
- If SOURCE_MODE is "PROMPT", you will be prompted to choose which files to process
- Otherwise, all discovered files will be selected without prompting

This phase does **not** read file contents, interpret text, or apply chunking rules.  
It only establishes the complete, concrete list of candidate files that later phases may operate on.
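
In "PROMPT" mode, the selection string accepts comma-separated indexes and inclusive ranges such as `0,2,5-8`. A minimal sketch of that parsing as a standalone function is shown here; the name `parse_selection` is an assumption for illustration, and the rules mirror the ones used in the cell below.

```python
def parse_selection(raw, count):
    """Turn a string like '0,2,5-8' into sorted indexes valid for `count` items."""
    raw = raw.strip().lower()
    if raw == "all":
        return list(range(count))

    picked = set()
    for part in (x.strip() for x in raw.split(",")):
        if not part:
            continue  # tolerate stray commas
        if "-" in part:
            a, b = (x.strip() for x in part.split("-", 1))
            if not (a.isdigit() and b.isdigit()):
                raise ValueError(f"Bad range: '{part}'")
            picked.update(range(int(a), int(b) + 1))  # ranges are inclusive
        else:
            if not part.isdigit():
                raise ValueError(f"Bad index: '{part}'")
            picked.add(int(part))

    # isdigit() already rules out negatives, so only the upper bound needs checking
    bad = [i for i in sorted(picked) if i >= count]
    if bad:
        raise ValueError(f"Invalid selection indexes: {bad}")
    return sorted(picked)


print(parse_selection("0,2,5-8", 10))  # [0, 2, 5, 6, 7, 8]
```

Collecting indexes in a set means that overlapping entries such as `3,2-4` select each file only once.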

In [5]:
# Phase 2: Expand RUN_SOURCE_PATHS into concrete files + resolve source_type
LAST_PHASE_RUN = "2"

from pathlib import Path

# Output
SOURCE_FILES = []

# Eligible extensions (must match Phase 3 readers: .md, .txt, .docx)
ALLOWED_EXTS = {".md", ".txt", ".docx"}

# 2a) Expand dirs/files -> candidate file paths
candidates = []

for p in RUN_SOURCE_PATHS:
    if p.is_file():
        if (p.suffix.lower() in ALLOWED_EXTS) and (not p.name.startswith(".")):
            candidates.append(p)

    if p.is_dir():
        for f in p.rglob("*"):
            if f.is_file():
                if (f.suffix.lower() in ALLOWED_EXTS) and (not f.name.startswith(".")):
                    candidates.append(f)

# De-dupe + stable order
candidates = sorted(set(candidates), key=lambda x: x.as_posix().lower())

if len(candidates) == 0:
    raise ValueError("Phase 2: No eligible source files found under RUN_SOURCE_PATHS.")

# 2b) Resolve source_type (exact path match, else nearest parent match, else unknown)
typed = []
for f in candidates:
    st = next(
        (SOURCE_TYPES_MAP[p] for p in ([f] + list(f.parents)) if p in SOURCE_TYPES_MAP),
        "unknown",
    )
    typed.append((f, st))

# quick counts by type
counts = {}
for _, st in typed:
    counts[st] = counts.get(st, 0) + 1

print("Candidate files by source_type:")
for k in sorted(counts, key=lambda x: (-counts[x], x)):
    print(f"{counts[k]:>6}  {k}")
del counts, k

# 2c) Selection by SOURCE_MODE
selected = []

if SOURCE_MODE == "ALL":
    selected = typed

else:
    print("\nCandidate files:")
    for i, (f, st) in enumerate(typed):
        try:
            show = f.relative_to(WORLD_ROOT)
        except Exception:
            show = f
        print(f"{i:>4}  {st:<18}  {show}")
        del show

    raw = input("\nSelect by index (e.g., 0,2,5-8) or 'all': ").strip().lower()

    if raw == "all":
        selected = typed
    else:
        idxs = set()
        parts = [x.strip() for x in raw.split(",") if x.strip()]

        for part in parts:
            if "-" in part:
                a, b = [x.strip() for x in part.split("-", 1)]
                if not (a.isdigit() and b.isdigit()):
                    raise ValueError(f"Bad range: '{part}'")
                for j in range(int(a), int(b) + 1):
                    idxs.add(j)
                del a, b
            else:
                if not part.isdigit():
                    raise ValueError(f"Bad index: '{part}'")
                idxs.add(int(part))

        bad = [j for j in sorted(idxs) if j < 0 or j >= len(typed)]
        if bad:
            raise ValueError(f"Invalid selection indexes: {bad}")

        selected = [typed[j] for j in sorted(idxs)]

        del idxs, parts, part, bad, j

    del raw

# 2d) Build SOURCE_FILES records (include relpath)
for i, (f, st) in enumerate(selected, start=1):
    try:
        rel = str(f.resolve().relative_to(WORLD_ROOT))
    except Exception:
        rel = str(f)

    SOURCE_FILES.append({
        "source_id": f"src_{i:06d}",
        "path": f,
        "relpath": rel,
        "source_type": st,
        "ext": f.suffix.lower(),
    })

print(f"\nPhase 2 OK: selected {len(SOURCE_FILES)} source files (of {len(candidates)} candidates).")

# clean up locals
del ALLOWED_EXTS, candidates, typed, selected, p, f, st, i, rel
del Path
# clean up 
del RUN_SOURCE_PATHS, SOURCE_TYPES_MAP

Candidate files by source_type:
   105  auto_transcripts
    11  planning_notes
     9  pbp_transcripts
     5  session_notes

Phase 2 OK: selected 130 source files (of 130 candidates).


## Phase 3: Normalize selected inputs

In this phase, the notebook converts the **selected source files** into a consistent, machine-usable form.

Different file formats (Markdown, plain text, Word documents) store text differently.  
Before any indexing, chunking, or analysis can occur, those differences must be removed.

In this phase, the notebook:

- Opens each selected file using a format-appropriate reader
- Extracts raw textual content without interpretation
- Preserves line order exactly as it appears in the source file
- Represents each file as an ordered sequence of text lines
- Records minimal metadata needed to trace each line back to its source file

This phase performs the task:

**“Create a uniform, trustworthy representation of the selected sources.”**

This phase performs **no chunking, interpretation, or transformation** of content.
Text is preserved exactly as read (including blank lines and formatting), and files are never modified on disk.

The output of this phase is a normalized in-memory representation of each selected file, suitable for later chunking and indexing steps.
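
For plain-text and Markdown sources, the normalization amounts to reading the file and splitting it into lines. The sketch below illustrates that behavior with a throwaway file; `read_lines` is a hypothetical helper, and `.docx` handling is left out because it depends on the third-party `python-docx` package.

```python
import tempfile
from pathlib import Path


def read_lines(path):
    """Read a UTF-8 text file as an ordered list of lines (blank lines become '')."""
    text = path.read_text(encoding="utf-8", errors="replace")
    return text.splitlines()


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.txt"
    sample.write_text("first\n\nthird\n", encoding="utf-8")
    lines = read_lines(sample)

print(lines)  # ['first', '', 'third']
```

Two details of `str.splitlines()` are worth keeping in mind: line terminators are dropped, and a final newline does not produce a trailing empty string. It also splits on a few less common boundaries (such as form feed), so "exactly as read" holds for line content and order rather than for the original terminator bytes.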

In [6]:
# Phase 3: Load selected sources into memory as raw lines
# - Preserves original line order
# - Preserves blank lines as empty strings
# - Performs no interpretation or chunking
LAST_PHASE_RUN = "3"

from pathlib import Path
import docx  # python-docx

LOADED_SOURCES = []

for source_id, item in enumerate(SOURCE_FILES):
    path = Path(item["path"])
    relpath = item.get("relpath", str(path))
    source_type = item.get("source_type", "unknown")

    suffix = path.suffix.lower()

    if suffix in (".md", ".txt"):
        file_type = suffix.lstrip(".")
        text = path.read_text(encoding="utf-8", errors="replace")
        lines = text.splitlines()
        del text  # text is no longer needed
    elif suffix == ".docx":
        file_type = "docx"
        doc = docx.Document(str(path))
        lines = [p.text for p in doc.paragraphs]  # blank paragraphs preserved as ""
        del doc
    else:
        raise ValueError(
            f"Unsupported file type for source_id={source_id}: {path}"
        )

    LOADED_SOURCES.append(
        {
            "source_id": source_id,
            "path": path,
            "relpath": relpath,
            "source_type": source_type,
            "file_type": file_type,
            "lines": lines,
        }
    )

print(f"Loaded sources: {len(LOADED_SOURCES)}")
for s in LOADED_SOURCES:
    print(
        f" - [{s['source_id']}] {s['file_type']}: {s['relpath']}  "
        f"[{s['source_type']}]  ({len(s['lines'])} lines)"
    )

del item, source_id, path, relpath, source_type, suffix, file_type, lines, s
# clean up variables that have served their purpose
del SOURCE_FILES, SOURCE_MODE

Loaded sources: 130
 - [0] txt: _local/auto_transcripts/caravan tales session 1.txt  [auto_transcripts]  (3473 lines)
 - [1] txt: _local/auto_transcripts/caravan tales session 2.txt  [auto_transcripts]  (2827 lines)
 - [2] txt: _local/auto_transcripts/iwtc session 000.txt  [auto_transcripts]  (2599 lines)
 - [3] txt: _local/auto_transcripts/iwtc session 001.5.txt  [auto_transcripts]  (1561 lines)
 - [4] txt: _local/auto_transcripts/iwtc session 001.txt  [auto_transcripts]  (4192 lines)
 - [5] txt: _local/auto_transcripts/iwtc session 002.txt  [auto_transcripts]  (2543 lines)
 - [6] txt: _local/auto_transcripts/iwtc session 003.txt  [auto_transcripts]  (4853 lines)
 - [7] txt: _local/auto_transcripts/iwtc session 007.txt  [auto_transcripts]  (2033 lines)
 - [8] txt: _local/auto_transcripts/iwtc session 008.txt  [auto_transcripts]  (2341 lines)
 - [9] txt: _local/auto_transcripts/iwtc session 010.txt  [auto_transcripts]  (3363 lines)
 - [10] txt: _local/auto_transcripts/iwtc session 011.

## Phase 4: Data profiling

In this phase, the notebook examines the **normalized source data** produced in Phase 3 in order to understand its real structural characteristics.

At this point, all selected files have already been converted into a consistent in-memory representation:  
each source is represented as an ordered sequence of text lines, preserved exactly as read.

Before any chunking or indexing rules can be proposed, it is necessary to **observe how the data actually appears** after normalization.

In this phase, the notebook:

- Inspects line-level structure across normalized sources
- Examines differences between file formats (e.g., `.txt`, `.md`, `.docx`) as they appear post-normalization
- Identifies patterns such as blank lines, paragraph boundaries, headings, or artifacts
- Produces human-readable summaries and previews for inspection
- Supports interactive exploration of specific sources by ID

This phase performs the task:

**“Understand the shape and structure of the normalized data.”**

This phase performs **no chunking, segmentation, or interpretation** of content.
It does not define rules, thresholds, or heuristics.
Its sole purpose is to inform later design decisions by grounding them in observed data rather than assumptions.

The output of this phase is **human insight**, not transformed data.
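
The per-file profile used in this phase is a five-number summary of non-blank line lengths. The sketch below swaps in the standard library's `statistics.quantiles` for the `numpy.percentile` call used in the profiling cell, so that it runs without extra dependencies; with `method="inclusive"` the quartiles should correspond to NumPy's default linear interpolation. The function name and sample lines are assumptions for illustration.

```python
from statistics import median, quantiles


def five_number_summary(lines):
    """Five-number summary of non-blank line lengths, or None if every line is blank."""
    lengths = sorted(len(line) for line in lines if line)
    if not lengths:
        return None
    if len(lengths) == 1:
        # quantiles() needs at least two data points
        q1 = q3 = float(lengths[0])
    else:
        q1, _, q3 = quantiles(lengths, n=4, method="inclusive")
    return {
        "min": lengths[0],
        "q1": q1,
        "median": median(lengths),
        "q3": q3,
        "max": lengths[-1],
    }


sample_lines = ["a", "", "bb", "ccc", "dddd", "eeeee"]
print(five_number_summary(sample_lines))
```

Blank lines are excluded before summarizing, which keeps a file with many separators from dragging its minimum and lower quartile to zero.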

In [None]:
# Phase 4a: File-level line length profiling and charts
# Observational only — no persistence, no mutation
LAST_PHASE_RUN = "4a"

from collections import defaultdict
from statistics import median  # only the median is used in this cell
import matplotlib.pyplot as plt
import numpy as np

if not LOADED_SOURCES:
    raise ValueError("No loaded sources found. Run Phase 3 first.")

# --- collect per-file 5-number summaries ---
file_profiles = []

for src in LOADED_SOURCES:
    lengths = sorted(len(l) for l in src["lines"] if l)

    if not lengths:
        continue

    q1 = np.percentile(lengths, 25)
    q3 = np.percentile(lengths, 75)

    file_profiles.append({
        "source_type": src["source_type"],
        "min": lengths[0],
        "q1": q1,
        "median": median(lengths),
        "q3": q3,
        "max": lengths[-1],
    })

# --- group by source_type ---
by_type = defaultdict(list)
for p in file_profiles:
    by_type[p["source_type"]].append(p)

# --- Box plots: distribution of medians by source type ---
plt.figure(figsize=(10, 6))

labels = []
data = []

for source_type in sorted(by_type.keys()):
    labels.append(source_type)
    data.append([p["median"] for p in by_type[source_type]])

plt.boxplot(data, tick_labels=labels, showfliers=True)
plt.title("Distribution of per-file median line lengths by source type")
plt.ylabel("Median line length")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()
plt.show()

# --- Box plots: distribution of max line lengths by source type ---
plt.figure(figsize=(10, 6))

labels = []
data = []

for source_type in sorted(by_type.keys()):
    labels.append(source_type)
    data.append([p["max"] for p in by_type[source_type]])

plt.boxplot(data, tick_labels=labels, showfliers=True)
plt.title("Distribution of per-file max line lengths by source type")
plt.ylabel("Max line length")
plt.xticks(rotation=30, ha="right")
plt.tight_layout()
plt.show()

# --- Scatter: median vs max (outlier detector) ---
plt.figure(figsize=(8, 6))

for source_type in sorted(by_type.keys()):
    medians = [p["median"] for p in by_type[source_type]]
    maxes = [p["max"] for p in by_type[source_type]]
    plt.scatter(medians, maxes, label=source_type, alpha=0.7)

plt.xlabel("Median line length (per file)")
plt.ylabel("Max line length (per file)")
plt.title("Median vs Max line length per file")
plt.legend()
plt.tight_layout()
plt.show()

# cleanup
del src, lengths, q1, q3, p
del labels, data, medians, maxes
del file_profiles, by_type, source_type

In [None]:
# Phase 4b: Sanity check: preview a loaded source by ID
LAST_PHASE_RUN = "4b"

PREVIEW_SOURCE_ID = 120
PREVIEW_MAX_LINES = 50

s = next((x for x in LOADED_SOURCES if x["source_id"] == PREVIEW_SOURCE_ID), None)
if s is None:
    available = sorted(x["source_id"] for x in LOADED_SOURCES)
    raise ValueError(
        f"No loaded source found with source_id={PREVIEW_SOURCE_ID}. "
        f"Available source_id values: {available}"
    )

print(f"[{s['source_id']}] {s['file_type']}: {s['relpath']}  [{s['source_type']}]")
print("")

for i, line in enumerate(s["lines"][:PREVIEW_MAX_LINES], start=1):
    print(f"{i:>4}: {line}")

# clean up local variables
del i, line, s
del PREVIEW_SOURCE_ID, PREVIEW_MAX_LINES

In [None]:
# Phase 4c: deciling line lengths for a specific source type
LAST_PHASE_RUN = "4c"

import numpy as np
import pandas as pd

rows = []

for src in LOADED_SOURCES:
    if src["source_type"] != "session_notes":
        continue

    path = src["path"]
    lines = src["lines"]

    if not lines:
        continue

    # Extract last 3 characters before ".txt"
    stem = path.stem
    file_id = stem[-3:]

    line_lengths = [len(line) for line in lines]
    deciles = np.percentile(line_lengths, range(0, 101, 10))

    row = {"file": file_id}
    for p, value in zip(range(0, 101, 10), deciles):
        row[f"p{p}"] = value

    rows.append(row)

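df_auto_deciles = pd.DataFrame(rows).sort_values("file").reset_index(drop=True)
# A bare expression only renders when it is the last statement in a cell, so display explicitly
display(df_auto_deciles)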

# clean up local variables
del rows, src, path, lines, stem, file_id, line_lengths
del deciles, row, p, value, df_auto_deciles

In [None]:
# Phase 4d: Scan session_notes for "session" header signal
LAST_PHASE_RUN = "4d"

import re

SESSION_WORD_REGEX = re.compile(r"\bsession\b", re.IGNORECASE)

total_hits = 0
files_with_hits = 0

print('Scanning session_notes for word "session"...\n')

for src in LOADED_SOURCES:
    if src.get("source_type") != "session_notes":
        continue

    path = src["path"]
    lines = src["lines"]

    file_hits = 0

    for idx, line in enumerate(lines, start=1):
        if SESSION_WORD_REGEX.search(line):
            if file_hits == 0:
                files_with_hits += 1
                print(f"\nFILE: {path.name}")
            file_hits += 1
            total_hits += 1
            print(f"  L{idx}: {line}")

    if file_hits:
        print(f"  -> hits in file: {file_hits}")

print("\nSummary:")
print(f"Files scanned: {sum(1 for s in LOADED_SOURCES if s.get('source_type') == 'session_notes')}")
print(f"Files with hits: {files_with_hits}")
print(f"Total 'session' hits: {total_hits}")

# cleanup
del SESSION_WORD_REGEX, total_hits, files_with_hits, src, path, lines, idx, line, file_hits

### Phase 4: Data Profiling & Structural Findings

This phase focused on **observing and understanding the real structure of raw source data** before proposing any chunking or indexing rules.

No transformation, normalization, or chunking was performed in this phase.  
All conclusions are based on direct inspection of normalized line-level data produced in Phase 3.

The purpose of Phase 4 is to ensure that any future chunking rules are grounded in **how the data is actually written**, not assumptions.

---

### What Was Analyzed

All available source files were loaded and profiled:

- `OVERRIDE_PATHS = None`
- `SOURCE_MODE = "ALL"`
- Total files analyzed: **125**
- Formats included:
  - `.txt`
  - `.md`
  - `.docx`

For each file, line-length distributions were analyzed using:
- minimum
- deciles / quantiles
- median
- maximum
- standard deviation

Analysis was grouped by **source_type**, not file format.
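
As a rough illustration of the profile described above (not the notebook's actual Phase 4 code), the same statistics can be computed per file with the standard library. The helper name `profile_line_lengths` and the sample lines are hypothetical.

```python
import statistics

def profile_line_lengths(lines):
    """Summarize line lengths for one file (hypothetical helper)."""
    lengths = [len(line) for line in lines]
    # quantiles(n=10) returns the 9 interior cut points (p10..p90);
    # "inclusive" treats the data as the full population, not a sample.
    deciles = statistics.quantiles(lengths, n=10, method="inclusive")
    return {
        "min": min(lengths),
        "deciles": deciles,
        "median": statistics.median(lengths),
        "max": max(lengths),
        "stdev": statistics.pstdev(lengths),
    }

# Made-up lines shaped like an auto transcript (blank, timestamp, text).
sample_lines = ["", "00:01", "We head north at dawn.", "", "00:07", "Agreed."]
profile = profile_line_lengths(sample_lines)
print(profile["min"], profile["median"], profile["max"])
```

Grouping these per-file profiles by `source_type` rather than by file extension is what makes the patterns in the next section visible.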

---

### Key Findings by Source Type

#### 1. auto_transcripts

**Observed structure**
- Files follow a rigid, repeating pattern:
  - One blank line
  - Timestamp line
  - One line of spoken text
- Approximately 50% timestamps, 50% dialogue
- Dialogue lines consistently range ~35–175 characters
- Extremely low variance across files

**Implications**
- This source is highly regular and predictable
- Structural units are already implicit in the data
- Line-level alternation is meaningful and should be preserved
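
A minimal sketch of how the blank / timestamp / text alternation could be read as pairs, assuming the rigid pattern described above holds. The function name `pair_timestamps` and the sample lines are hypothetical; the timestamp pattern mirrors the `auto_ts` header regex used in Phase 5.

```python
import re

# Same shape as the Phase 5 "auto_ts" header pattern: a line holding only a time.
TIMESTAMP_RE = re.compile(r"^\s*\d{1,2}:\d{2}(?::\d{2})?\s*$")

def pair_timestamps(lines):
    """Return (timestamp, text) pairs from blank / timestamp / text triples."""
    pairs = []
    current_ts = None
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue  # blank separator line
        if TIMESTAMP_RE.match(line):
            current_ts = stripped
        elif current_ts is not None:
            pairs.append((current_ts, stripped))
            current_ts = None
        # Text with no preceding timestamp is skipped in this sketch.
    return pairs

sample = ["", "0:01", "We head north at dawn.", "", "0:07", "Agreed."]
pairs = pair_timestamps(sample)
print(pairs)
```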

---

#### 2. pbp_transcripts

**Observed structure**
- High structural variability
- Lines include:
  - speaker names
  - dialogue
  - emotes
  - blank lines
  - single-character markers (e.g., "-")
- Markdown formatting appears in some files but not others
- Date/time stamps are inconsistent and not always reliable
- Line-length distributions show a long tail:
  - Many very short lines
  - Occasional very long narrative blocks

**Implications**
- Speaker attribution matters
- Sequence matters more than timestamps
- Chunking must tolerate mixed formatting
- Line length alone is not a reliable delimiter

---

#### 3. planning_notes (docx-heavy)

**Observed structure**
- Formatting information from `.docx` is not preserved
- Blank lines consistently separate conceptual blocks
- Many blocks consist of a **single line**, often serving as:
  - section headers
  - category labels
- Lines are generally short; content is list-like
- Long lines are rare but present

**Implications**
- Blank lines are the primary structural signal
- Single-line blocks likely represent major sections
- The first line of a block often labels the block
- Formatting inference must be conservative
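
To make the blank-line signal concrete, here is a small sketch that groups lines into blank-line-delimited blocks and picks out the single-line blocks that likely act as labels. The helper name `split_blank_line_blocks` and the sample content are hypothetical, not part of the notebook.

```python
def split_blank_line_blocks(lines):
    """Group lines into blocks separated by one or more blank lines.

    Returns (start_line, block_lines) tuples with 1-based start lines.
    """
    blocks = []
    current = []
    start = None
    for idx, line in enumerate(lines, start=1):
        if line.strip():
            if not current:
                start = idx
            current.append(line)
        elif current:
            blocks.append((start, current))
            current = []
    if current:
        blocks.append((start, current))
    return blocks

# Made-up planning notes: two single-line labels around a short list.
sample = ["NPCs", "", "Quartermaster", "Harbor pilot", "", "", "Open questions"]
blocks = split_blank_line_blocks(sample)
single_line_blocks = [block for _, block in blocks if len(block) == 1]
print(blocks)
```

Treating single-line blocks only as *candidate* section labels keeps the inference conservative, in line with the last implication above.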

---

#### 4. session_notes

**Observed structure**
- Files often contain multiple sessions
- Session headers:
  - usually include a session number and a date
  - punctuation and exact format vary
- Blank lines separate complete thoughts
- Line-length distribution:
  - heavy use of blank lines
  - short median lines
  - very long tail (summaries, pasted narrative)
- One file is expected to produce many logical documents

**Implications**
- Document-level segmentation is required
- Blank lines represent paragraph boundaries
- Chunking must occur *within* session boundaries
- Over-splitting is safer than under-splitting
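
The document-level segmentation described above can be sketched as follows. This is an illustration under simplifying assumptions: the header pattern here only accepts "Session" followed by a number, which is narrower than the real variation noted above, and `split_sessions` and the sample notes are hypothetical.

```python
import re

# Simplified stand-in for a session header: optional markup, "Session", a number.
SESSION_HEADER_RE = re.compile(r"^\s*[#*>\-_\s]*session\s+\d+", re.IGNORECASE)

def split_sessions(lines):
    """Return a list of (header_line, body_lines), one entry per session."""
    sessions = []
    for line in lines:
        if SESSION_HEADER_RE.match(line):
            sessions.append((line, []))
        elif sessions:
            sessions[-1][1].append(line)
        # Lines before the first header (preamble) are ignored in this sketch.
    return sessions

sample = [
    "Campaign notes",
    "## Session 12 - 2025-07-03",
    "Arrived at the harbor.",
    "",
    "Met the quartermaster.",
    "Session 13: 7/10/25",
    "Left at dawn.",
]
sessions = split_sessions(sample)
print(len(sessions))
```

Each session body can then be split further on blank lines, so that paragraph-level chunks never cross a session boundary.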

---

### Cross-Cutting Observations

- Blank lines are a **strong structural signal** across all human-authored sources
- Line length alone is insufficient for chunking decisions
- File format does not reliably indicate structure
- Source type is the most important determinant of chunking behavior
- Preserving original order and spacing is critical

---

### Resulting v0 Structural Contracts (Conceptual)

These are **conceptual contracts**, not implementations.

| Source Type      | Document Boundary | Atomic Unit                    |
|------------------|-------------------|--------------------------------|
| auto_transcripts | File              | Timestamp + text pair          |
| pbp_transcripts  | File              | Speaker / dialogue block       |
| planning_notes   | File              | Blank-line-delimited block     |
| session_notes    | Session header    | Blank-line-delimited paragraph |

These contracts will guide future chunking design but are not yet codified.

## Phase 5: Chunking (v0)

In this phase, the notebook defines and applies **v0 chunking rules** to the normalized sources produced in Phase 3.

Chunking is the first structural transformation step: it groups contiguous lines into **chunks** that will later be indexed.

v0 is intentionally simple:

- A **new chunk begins** whenever a line matches any known *header pattern*
- All following lines belong to that chunk until the next header line
- Chunk boundaries preserve original line order and line numbers
- Chunking is designed for iterative refinement; anomalies are expected and will usually be handled by fixing the source file and rerunning

This phase produces:

- `CHUNKS_V0`: a list of chunk dictionaries suitable for later indexing steps

This phase performs **no semantic interpretation** and does not modify any source files on disk.
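
The v0 rule can be seen on a toy input before reading the full cell. The sketch below uses a single markdown-heading pattern as a stand-in for the real ordered header list, and `chunk_by_headers` and the sample lines are hypothetical.

```python
import re

# Stand-in for the ordered header list: one markdown-heading pattern only.
HEADER_RE = re.compile(r"^\s*#{1,6}\s+\S")

def chunk_by_headers(lines):
    """Start a new chunk at every header line; keep 1-based line spans."""
    chunks = []
    current = {"start_line": 1, "header_kind": "preamble", "lines": []}
    for idx, line in enumerate(lines, start=1):
        if HEADER_RE.match(line):
            # Flush the chunk collected so far (including any preamble).
            if current["lines"]:
                current["end_line"] = idx - 1
                chunks.append(current)
            current = {"start_line": idx, "header_kind": "md_heading", "lines": [line]}
        else:
            current["lines"].append(line)
    if current["lines"]:
        current["end_line"] = len(lines)
        chunks.append(current)
    return chunks

sample = ["intro text", "# Harbor", "ships", "", "# Market", "stalls"]
chunks = chunk_by_headers(sample)
spans = [(c["header_kind"], c["start_line"], c["end_line"]) for c in chunks]
print(spans)
```

Text before the first header becomes a `preamble` chunk rather than being dropped, which matches the literal-fidelity goal stated above.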

In [7]:
# Phase 5: Chunking v0 - consolidated header-driven chunking
# Input:  LOADED_SOURCES (list[dict])
# Output: CHUNKS_V0 (list[dict])
LAST_PHASE_RUN = "5"

import re
from pathlib import Path

TIME_LIKE_REGEX = r"\d{1,2}:\d{2}(?::\d{2})?(?:\s*[AP]M)?"

# Header regexes are evaluated in order; first match wins.
# Ordering reflects specificity and expected frequency.
HEADER_REGEXES = [
    # auto_transcripts (most frequent, most rigid)
    ("auto_ts", re.compile(r"^\s*\d{1,2}:\d{2}(?::\d{2})?\s*$")),

    # pbp_transcripts (Discord-style headers, sometimes numbered like "2. ### ...")
    ("pbp_hash", re.compile(rf"^\s*(?:\d+\.\s*)?(?:[*-]\s*)?###\s+.*{TIME_LIKE_REGEX}.*$")),

    # pbp_transcripts (forum-style quoted bold header)
    ("pbp_forum", re.compile(rf"^\s*>?\s*\*\*.*{TIME_LIKE_REGEX}.*\*\*\s*$")),

    # session_notes (lines beginning with optional format codes and "Session", omitting "Session notes")
    ("session", re.compile(r"^\s*(?:\d+\.\s*)?(?:[>#*_\-\s]+)?session\s+(?!notes\b)\S.*$", re.IGNORECASE)),

    # planning_notes (lines beginning with markdown headers)
    ("md_heading", re.compile(r"^\s*(?:[*\-]\s*)?#{1,6}\s+\S.*$")),
]

CHUNKS_V0 = []
chunk_global_id = 1  # global (not per-file) to allow stable cross-file references

world_root_resolved = Path(WORLD_ROOT).resolve()

for src in LOADED_SOURCES:
    source_id = src["source_id"]
    path = src["path"]
    relpath = src.get("relpath", str(path))
    source_type = src.get("source_type", "unknown")
    file_type = src.get("file_type", "unknown")
    lines = src["lines"]

    current_kind = "preamble"
    current_lines = []
    chunk_start_line = 1

    for idx, line in enumerate(lines, start=1):
        matched_kind = next(
            (kind for kind, header_regex in HEADER_REGEXES if header_regex.match(line)),
            None,
        )

        if matched_kind:
            # Flush what we have so far (literal fidelity: keep preamble content too)
            if current_lines:
                CHUNKS_V0.append(
                    {
                        "chunk_id": chunk_global_id,
                        "source_id": source_id,
                        "source_type": source_type,
                        "file_type": file_type,
                        "path": path,
                        "relpath": relpath,
                        "start_line": chunk_start_line,
                        "end_line": idx - 1,
                        "header_kind": current_kind,
                        "lines": list(current_lines),
                    }
                )
                chunk_global_id += 1

            # Start a new chunk at this header line (header is included)
            current_kind = matched_kind
            current_lines = [line]
            chunk_start_line = idx

        else:
            current_lines.append(line)

    # Flush final chunk (including files with no headers)
    if current_lines:
        CHUNKS_V0.append(
            {
                "chunk_id": chunk_global_id,
                "source_id": source_id,
                "source_type": source_type,
                "file_type": file_type,
                "path": path,
                "relpath": relpath,
                "start_line": chunk_start_line,
                "end_line": len(lines),
                "header_kind": current_kind,
                "lines": list(current_lines),
            }
        )
        chunk_global_id += 1

print(f"Chunked v0: {len(CHUNKS_V0)} chunks from {len(LOADED_SOURCES)} files.")

# Cleanup locals (keep CHUNKS_V0)
del TIME_LIKE_REGEX, HEADER_REGEXES, chunk_global_id
del src, source_id, path, relpath, source_type, file_type, lines
del current_kind, current_lines, chunk_start_line, idx, line, matched_kind
del world_root_resolved
del Path

Chunked v0: 170131 chunks from 130 files.


## Phase 6: Vocabulary bootstrap (candidate proper nouns)

In this phase, the notebook generates a first-pass list of candidate proper nouns and named entities from the v0 chunks.

Goal:
- Produce a ranked candidate list of names and terms worth adding to the world vocabulary.
- Provide evidence snippets so a human can confirm canon spellings and create aliases.

Approach (v0):
- Exclude `auto_transcripts` to avoid overwhelming the candidate list with conversational noise.
- Extract:
  - Multiword Title Case phrases (e.g., "Temple of the Bronze Flame")
  - Single-word proper nouns (e.g., "Dhassa", "Killeth")
  - PbP authors (e.g., "CroweTheDualityKing")
- Aggregate counts by:
  - total mentions
  - chunks mentioned
  - files mentioned
  - source_type distribution
- Capture a small number of evidence snippets per candidate.

This phase does not resolve ambiguity (e.g., "temple" vs "monastery", or "Lia" vs "Liavarah").
It produces the evidence needed to curate aliases and canonical forms in a later step.
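
As a simplified sketch of the multiword extraction idea (not the notebook's exact Phase 6b code), a run of Title Case words may contain lowercase connectors but must hold at least two capitalized words. The function name `title_case_phrases`, the shortened connector set, and the sample sentence are assumptions made for illustration.

```python
import re

WORD_RE = re.compile(r"[A-Za-z][A-Za-z0-9'\-]*")
TITLE_RE = re.compile(r"^[A-Z][a-z][A-Za-z'\-]*$")
CONNECTORS = {"of", "the", "and"}  # shortened list for the example

def title_case_phrases(text):
    """Return multiword Title Case phrases, allowing connectors inside a run."""
    words = WORD_RE.findall(text)
    phrases = []
    i = 0
    while i < len(words):
        if not TITLE_RE.match(words[i]):
            i += 1
            continue
        parts = [words[i]]
        j = i + 1
        while j < len(words) and (TITLE_RE.match(words[j]) or words[j] in CONNECTORS):
            parts.append(words[j])
            j += 1
        # Trim connectors left dangling at the end of the run.
        while parts[-1] in CONNECTORS:
            parts.pop()
        if sum(1 for p in parts if TITLE_RE.match(p)) >= 2:
            phrases.append(" ".join(parts))
        i = j
    return phrases

text = "They reached the Temple of the Bronze Flame before Dhassa and the others."
phrases = title_case_phrases(text)
print(phrases)
```

Single capitalized words such as "Dhassa" are deliberately left to the separate single-word pass, which applies its own stop list.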


In [11]:
# Phase 6a: Vocabulary bootstrap setup (v0)
# Input:  CHUNKS_V0
# Output: shared config + aggregate structures for Phase 6b/6c
LAST_PHASE_RUN = "6a"

import re
from collections import defaultdict, Counter
from pathlib import Path
import pandas as pd

from datetime import datetime
RUN_STAMP_6 = datetime.now().strftime("%Y%m%d_%H%M%S")
del datetime

# Content candidate scan excludes noisy sources (keeps current behavior).
EXCLUDE_SOURCE_TYPES_CONTENT = {"auto_transcripts"}

# Header author scan should NOT filter on source_type.
# It keys exclusively off header_kind == "pbp_hash".

CONNECTORS = {
    "of", "the", "and", "to", "in", "at", "on", "for", "from", "with", "by", "a", "an",
}

STOP_SINGLE = {
    "I", "A", "An", "The", "And", "Or", "But", "We", "You", "He", "She", "They",
    "This", "That", "These", "Those", "It", "Its", "Our", "My", "Your", "His", "Her",
    "Session", "Sessions",
    "No","Yes","What","Why","How","When","Where","Who","Whom","Which",
    "As","If","At","In","On","Not","With","Without","Within","For","From","To","Of",
    "And","Or","But","So","Then","Than","Now","Just","Only","Also","Still","Even",
    "There","Here","This","That","These","Those","It","Its","We","You","He","She","They",
    "Do","Does","Did","Can","Could","Will","Would","Shall","Should","May","Might","Must",
    "All","One","Well","Go","Ah","After","Oh","Let","AM","PM"
}

STOP_TOKENS = {"##", "#", "###", ">", "*", "-", "_", "`"}

MAX_EVIDENCE_PER_CANDIDATE = 5
MAX_SNIPPET_CHARS = 160

# Regexes (content scan)
WORD_REGEX = re.compile(r"[A-Za-z][A-Za-z0-9'\-]*")
TITLE_WORD_REGEX = re.compile(r"^[A-Z][a-z][A-Za-z'\-]*$")
ACRONYM_REGEX = re.compile(r"^[A-Z]{2,8}$")

# Regex (pbp_hash header author scan)
# Example: * ### **Shworn** **7/3/25, 6:12 PM**
PBP_HASH_HEADER_REGEX = re.compile(
    r"^\s*(?:\d+\.\s*)?(?:[*-]\s*)?###\s+\*\*(?P<author>[^*]+)\*\*\s+\*\*(?P<ts>[^*]+)\*\*\s*$"
)

# Aggregates for content candidate vocab
mentions = Counter()
chunks_mentioned = defaultdict(set)
files_mentioned = defaultdict(set)
by_source_type = defaultdict(Counter)
evidence = defaultdict(list)

# Aggregates for pbp header authors
header_mentions = Counter()          # author -> total header occurrences
header_chunks = defaultdict(set)     # author -> set(chunk_id)
header_files = defaultdict(set)      # author -> set(path)
header_evidence = defaultdict(list)  # author -> evidence dicts

print("Phase 6a ready.")

Phase 6a ready.


In [12]:
# Phase 6b: Vocabulary bootstrap scans (v0)
# Input:  CHUNKS_V0 and Phase 6a globals
# Output: populated aggregates for Phase 6c
#
# Design:
# - Header scan: does NOT filter by source_type. It keys only off header_kind == "pbp_hash".
# - Content scan: filters by source_type (exclude auto_transcripts) via included_chunks_content.
LAST_PHASE_RUN = "6b"

# -----------------------
# 6b.0: Select chunks for content scan
# -----------------------
included_chunks_content = [
    c for c in CHUNKS_V0
    if c.get("source_type") not in EXCLUDE_SOURCE_TYPES_CONTENT
]

# -----------------------
# 6b.1: PbP header author scan (no source_type filtering)
# -----------------------
for chunk in CHUNKS_V0:
    lines = chunk.get("lines", [])
    header_kind = chunk.get("header_kind")

    if lines and header_kind == "pbp_hash":
        header_line = (lines[0] or "").strip()

        if header_line:
            m = PBP_HASH_HEADER_REGEX.match(header_line)
            if m:
                author = " ".join((m.group("author") or "").split())
                ts = " ".join((m.group("ts") or "").split())

                if author:
                    chunk_id = chunk.get("chunk_id")
                    source_type = chunk.get("source_type", "unknown")
                    path = str(chunk.get("path", ""))

                    header_mentions[author] += 1
                    header_chunks[author].add(chunk_id)
                    header_files[author].add(path)

                    if len(header_evidence[author]) < MAX_EVIDENCE_PER_CANDIDATE:
                        header_evidence[author].append(
                            {
                                "chunk_id": chunk_id,
                                "source_type": source_type,
                                "path": path,
                                "start_line": chunk.get("start_line"),
                                "end_line": chunk.get("end_line"),
                                "header": header_line,
                                "author": author,
                                "timestamp": ts,
                            }
                        )

                    del chunk_id, source_type, path
                del author, ts
            del m

# -----------------------
# 6b.2: Content candidate scan (exclude auto_transcripts by source_type)
# -----------------------
for chunk in included_chunks_content:
    lines = chunk.get("lines", [])
    if lines:
        header_kind = chunk.get("header_kind")

        # Split header from content (instead of dropping header entirely)
        is_header_chunk = bool(header_kind in {"pbp_hash", "pbp_forum", "session"} and lines)
        content_lines = lines[1:] if is_header_chunk else lines

        # Collapse all chunk content text to single-space whitespace
        concat_text = " ".join(" ".join(content_lines).split())

        if concat_text:
            chunk_id = chunk.get("chunk_id")
            source_type = chunk.get("source_type", "unknown")
            path = str(chunk.get("path", ""))

            # Evidence snippet (reuse for all candidates found in this chunk)
            if len(concat_text) > MAX_SNIPPET_CHARS:
                snippet = concat_text[: MAX_SNIPPET_CHARS - 3] + "..."
            else:
                snippet = concat_text

            words = WORD_REGEX.findall(concat_text)
            candidates = []

            # ---- extract Title Case vocabs ----
            i = 0
            n = len(words)

            while i < n:
                w = words[i]
                if TITLE_WORD_REGEX.match(w) or ACRONYM_REGEX.match(w):
                    parts = [w]
                    cap_count = 1
                    j = i + 1

                    while j < n:
                        wj = words[j]
                        wj_lower = wj.lower()

                        if TITLE_WORD_REGEX.match(wj) or ACRONYM_REGEX.match(wj):
                            parts.append(wj)
                            cap_count += 1
                            j += 1
                        elif wj_lower in CONNECTORS:
                            parts.append(wj_lower)
                            j += 1
                        else:
                            break

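                    # Drop connectors left dangling at the end of the run
                    # (e.g., "Iron Wolf of the" -> "Iron Wolf").
                    while parts[-1] in CONNECTORS:
                        parts.pop()

                    if cap_count >= 2:
                        candidates.append(" ".join(parts))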

                    i = j
                    
                    del parts, cap_count, j
                else:
                    i += 1
                del w
            del i,n

            # ---- extract single-word propers ----
            if words:
                first_word = words[0]
                tail_set = set(words[1:])

                for w in words:
                    if (
                        (TITLE_WORD_REGEX.match(w) or ACRONYM_REGEX.match(w))
                        and w not in STOP_TOKENS
                        and w not in STOP_SINGLE
                        and (w != first_word or w in tail_set)
                    ):
                        candidates.append(w)
                del w

            # Record aggregates
            if candidates:
                seen_this_chunk = set()

                # Count mentions (raw frequency)
                for cand in candidates:
                    mentions[cand] += 1
                    seen_this_chunk.add(cand)

                # Per-chunk / per-file coverage + by_source_type + evidence
                for cand in seen_this_chunk:
                    chunks_mentioned[cand].add(chunk_id)
                    files_mentioned[cand].add(path)
                    by_source_type[cand][source_type] += 1

                    if len(evidence[cand]) < MAX_EVIDENCE_PER_CANDIDATE:
                        evidence[cand].append(
                            {
                                "chunk_id": chunk_id,
                                "source_type": source_type,
                                "path": path,
                                "start_line": chunk.get("start_line"),
                                "end_line": chunk.get("end_line"),
                                "snippet": snippet,
                            }
                        )
                del cand

print(f"Content chunks scanned: {len(included_chunks_content)} (excluded: {sorted(EXCLUDE_SOURCE_TYPES_CONTENT)})")
print(f"Content candidates found: {len(mentions)}")
print(f"PbP header authors found: {len(header_mentions)}")

Content chunks scanned: 1486 (excluded: ['auto_transcripts'])
Content candidates found: 16655
PbP header authors found: 6


In [13]:
# Phase 6c: Materialize outputs + cleanup (v0)
# Input:  aggregates from 6a/6b, WORKING_DRAFTS_PATH, WORLD_ROOT (for relpaths in outputs)
# Output: CANDIDATE_VOCAB + written files
LAST_PHASE_RUN = "6c"

# Resolve WORLD_ROOT once for relpath output
world_root_resolved = Path(WORLD_ROOT).resolve()

# ---- build CANDIDATE_VOCAB ----
rows = []
for cand, total in mentions.most_common():
    rows.append(
        {
            "candidate": cand,
            "mentions_total": total,
            "chunks_mentioned": len(chunks_mentioned[cand]),
            "files_mentioned": len(files_mentioned[cand]),
            "source_types": dict(by_source_type[cand]),
        }
    )

CANDIDATE_VOCAB = pd.DataFrame(rows)
if not CANDIDATE_VOCAB.empty:
    CANDIDATE_VOCAB = (
        CANDIDATE_VOCAB
        .sort_values(
            by=["files_mentioned", "chunks_mentioned", "mentions_total"],
            ascending=[False, False, False],
        )
        .reset_index(drop=True)
    )

print(f"Excluded source types (content scan): {sorted(EXCLUDE_SOURCE_TYPES_CONTENT)}")
print(f"Content chunks scanned: {len(included_chunks_content)}")
print(f"Candidates found: {len(CANDIDATE_VOCAB)}")
display(CANDIDATE_VOCAB.head(25))

# ---- build HEADER_AUTHORS_V0 ----
header_rows = []
for author, total in header_mentions.most_common():
    header_rows.append(
        {
            "author": author,
            "mentions_total": total,
            "chunks_mentioned": len(header_chunks[author]),
            "files_mentioned": len(header_files[author]),
        }
    )

HEADER_AUTHORS_V0 = pd.DataFrame(header_rows)
if not HEADER_AUTHORS_V0.empty:
    HEADER_AUTHORS_V0 = (
        HEADER_AUTHORS_V0
        .sort_values(
            by=["files_mentioned", "chunks_mentioned", "mentions_total"],
            ascending=[False, False, False],
        )
        .reset_index(drop=True)
    )

print(f"PbP header authors found: {len(HEADER_AUTHORS_V0)}")
display(HEADER_AUTHORS_V0.head(25))

# ---- write outputs ----
out_dir = Path(WORKING_DRAFTS_PATH)
out_dir.mkdir(parents=True, exist_ok=True)

# vocab outputs (same naming convention as before)
vocab_csv_path = out_dir / f"candidate_vocab_v0_excluding_auto_transcripts_{RUN_STAMP_6}.csv"
CANDIDATE_VOCAB.to_csv(vocab_csv_path, index=False, encoding="utf-8")
try:
    vocab_csv_rel = str(vocab_csv_path.resolve().relative_to(world_root_resolved))
except Exception:
    vocab_csv_rel = str(vocab_csv_path)
print(f"Wrote: {vocab_csv_rel}")

TOP_N = 150
vocab_md_path = out_dir / f"candidate_vocab_v0_evidence_top{TOP_N}_{RUN_STAMP_6}.md"
with vocab_md_path.open("w", encoding="utf-8") as f:
    f.write("# Candidate vocab evidence (v0, excluding auto_transcripts)\n\n")
    f.write("For human review. Ranked by file/chunk coverage.\n\n")

    for i, row in CANDIDATE_VOCAB.head(TOP_N).iterrows():
        cand = row["candidate"]
        f.write(f"## {i+1}. {cand}\n\n")
        f.write(f"- mentions_total: {row['mentions_total']}\n")
        f.write(f"- chunks_mentioned: {row['chunks_mentioned']}\n")
        f.write(f"- files_mentioned: {row['files_mentioned']}\n")
        f.write(f"- source_types: {row['source_types']}\n\n")

        for ev in evidence.get(cand, []):
            p = ev["path"]
            try:
                rel = str(Path(p).resolve().relative_to(world_root_resolved))
            except Exception:
                rel = p

            f.write(
                f"- chunk {ev['chunk_id']} [{ev['source_type']}] "
                f"{rel} L{ev['start_line']}-L{ev['end_line']}\n"
            )
            f.write(f"  - {ev['snippet']}\n")
            del p, rel

        f.write("\n")
    del i,row

try:
    vocab_md_rel = str(vocab_md_path.resolve().relative_to(world_root_resolved))
except Exception:
    vocab_md_rel = str(vocab_md_path)
print(f"Wrote: {vocab_md_rel}")

# pbp header author outputs (new)
authors_csv_path = out_dir / f"candidate_pbp_authors_v0_{RUN_STAMP_6}.csv"
HEADER_AUTHORS_V0.to_csv(authors_csv_path, index=False, encoding="utf-8")
try:
    authors_csv_rel = str(authors_csv_path.resolve().relative_to(world_root_resolved))
except Exception:
    authors_csv_rel = str(authors_csv_path)
print(f"Wrote: {authors_csv_rel}")

authors_md_path = out_dir / f"candidate_pbp_authors_v0_evidence_{RUN_STAMP_6}.md"
with authors_md_path.open("w", encoding="utf-8") as f:
    f.write("# Candidate PbP authors from headers (v0)\n\n")
    f.write("For human review. Extracted from pbp_hash header lines.\n\n")

    for i, row in HEADER_AUTHORS_V0.iterrows():
        author = row["author"]
        f.write(f"## {i+1}. {author}\n\n")
        f.write(f"- mentions_total: {row['mentions_total']}\n")
        f.write(f"- chunks_mentioned: {row['chunks_mentioned']}\n")
        f.write(f"- files_mentioned: {row['files_mentioned']}\n\n")

        for ev in header_evidence.get(author, []):
            p = ev["path"]
            try:
                rel = str(Path(p).resolve().relative_to(world_root_resolved))
            except Exception:
                rel = p

            f.write(
                f"- chunk {ev['chunk_id']} [{ev['source_type']}] "
                f"{rel} L{ev['start_line']}-L{ev['end_line']}\n"
            )
            f.write(f"  - {ev['header']}\n")
            del ev, p, rel

        f.write("\n")
        
    del i, row

try:
    authors_md_rel = str(authors_md_path.resolve().relative_to(world_root_resolved))
except Exception:
    authors_md_rel = str(authors_md_path)
print(f"Wrote: {authors_md_rel}")

# ---- cleanup locals (keep CANDIDATE_VOCAB, HEADER_AUTHORS_V0) ----
# clean up imports
del re, defaultdict, Counter, Path, pd

# clean up config variables
del RUN_STAMP_6, EXCLUDE_SOURCE_TYPES_CONTENT
del CONNECTORS, STOP_SINGLE, STOP_TOKENS
del MAX_EVIDENCE_PER_CANDIDATE, MAX_SNIPPET_CHARS
del WORD_REGEX, TITLE_WORD_REGEX, ACRONYM_REGEX, PBP_HASH_HEADER_REGEX
del mentions, chunks_mentioned, files_mentioned, by_source_type, evidence
del header_mentions, header_chunks, header_files, header_evidence

# clean up working variables
del included_chunks_content, chunk, lines, header_kind, is_header_chunk
del content_lines, concat_text, chunk_id, source_type, path, snippet
del words, candidates, header_line, first_word, tail_set, seen_this_chunk
del world_root_resolved, cand, rows, total, header_rows, author, out_dir
del vocab_csv_path, vocab_csv_rel, TOP_N, vocab_md_path, vocab_md_rel
del authors_csv_path, authors_csv_rel, authors_md_path, authors_md_rel
del wj, wj_lower, f
# CANDIDATE_VOCAB and HEADER_AUTHORS_V0 are kept as this phase's outputs.

Excluded source types (content scan): ['auto_transcripts']
Content chunks scanned: 1486
Candidates found: 16655


|   | candidate | mentions_total | chunks_mentioned | files_mentioned | source_types |
|---|-----------|----------------|------------------|-----------------|--------------|
| 0 | Henry     | 832 | 293 | 20 | {'pbp_transcripts': 157, 'planning_notes': 21,... |
| 1 | Alivyre   | 535 | 233 | 20 | {'pbp_transcripts': 111, 'planning_notes': 11,... |
| 2 | Faeryne   | 847 | 239 | 19 | {'pbp_transcripts': 93, 'planning_notes': 9, '... |
| 3 | Lia       | 405 | 169 | 19 | {'pbp_transcripts': 86, 'planning_notes': 7, '... |
| 4 | Victor    | 724 | 338 | 18 | {'pbp_transcripts': 283, 'planning_notes': 13,... |
| 5 | Crafthold | 337 | 142 | 17 | {'pbp_transcripts': 32, 'planning_notes': 9, '... |
| 6 | Luminia   | 285 | 133 | 14 | {'pbp_transcripts': 49, 'planning_notes': 2, '... |
| 7 | Elysia    | 74  | 58  | 14 | {'pbp_transcripts': 45, 'planning_notes': 9, '... |
| 8 | Urgulk    | 167 | 54  | 13 | {'pbp_transcripts': 25, 'planning_notes': 3, '... |
| 9 | Dhassa    | 80  | 32  | 13 | {'pbp_transcripts': 5, 'planning_notes': 8, 's... |


PbP header authors found: 6


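|   | author              | mentions_total | chunks_mentioned | files_mentioned |
|---|---------------------|----------------|------------------|-----------------|
| 0 | Charis              | 229 | 229 | 6 |
| 1 | CroweTheDualityKing | 193 | 193 | 6 |
| 2 | Bysickle            | 57  | 57  | 6 |
| 3 | Kalina Hitana       | 42  | 42  | 6 |
| 4 | Shworn              | 28  | 28  | 3 |
| 5 | Lia                 | 8   | 8   | 2 |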


Wrote: _local/machine_wip/candidate_vocab_v0_excluding_auto_transcripts_20260210_160247.csv
Wrote: _local/machine_wip/candidate_vocab_v0_evidence_top150_20260210_160247.md
Wrote: _local/machine_wip/candidate_pbp_authors_v0_20260210_160247.csv
Wrote: _local/machine_wip/candidate_pbp_authors_v0_evidence_20260210_160247.md


## Phase 7: Vocabulary-based linking

In this phase, the notebook applies **curated vocabulary files** to the chunked source data in order to link known identifiers to specific regions of text.

At this point, the source material has already been normalized and segmented into `CHUNKS_V0`. Each chunk represents a bounded region of text with stable source metadata and preserved line structure.

Phase 7 introduces the first **interpretive pass** over the corpus, but it is intentionally conservative. Only explicitly defined vocabulary entries are considered, and no new identifiers are inferred or generated.

This phase answers two closely related questions:

- **Where do known entities appear in the corpus?**
- **Who authored PbP content, where that information is explicitly available?**

### What this phase does

In this phase, the notebook:

- Loads authoritative vocabulary files (entities, aliases, author mappings)
- Normalizes vocabulary into a unified matching table
- Applies boundary-aware matching to chunk text
- Links canonical entities and approved aliases to their occurrences
- Extracts PbP authorship from header metadata
- Records each match with stable source and positional context

No semantic interpretation occurs here. All matches are literal and vocabulary-driven.
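
One way to picture boundary-aware matching is with regex lookarounds that refuse a match when the surface form is embedded in a longer word. This is a sketch under assumptions, not the notebook's Phase 7 implementation: the entity ids, the vocabulary rows, and the names `build_pattern` and `find_mentions` are all hypothetical.

```python
import re

# Hypothetical vocabulary rows: (entity_id, surface form). Aliases share an id.
vocab = [
    ("ent_lia", "Lia"),
    ("ent_lia", "Liavarah"),
    ("ent_temple", "Temple of the Bronze Flame"),
]

def build_pattern(surface, case_insensitive=True):
    # (?<!\w) and (?!\w) keep "Lia" from matching inside "Liavarah" or "Julia".
    flags = re.IGNORECASE if case_insensitive else 0
    return re.compile(rf"(?<!\w){re.escape(surface)}(?!\w)", flags)

def find_mentions(text, vocab_rows):
    """Return (entity_id, matched_text, start_offset) tuples in text order."""
    mentions = []
    for entity_id, surface in vocab_rows:
        for m in build_pattern(surface).finditer(text):
            mentions.append((entity_id, m.group(0), m.start()))
    return sorted(mentions, key=lambda row: row[2])

text = "Liavarah told Julia that Lia would wait at the temple of the bronze flame."
mentions = find_mentions(text, vocab)
for entity_id, matched, start in mentions:
    print(entity_id, repr(matched), start)
```

Because every match is tied to a vocabulary row, the short form "Lia" and the long form "Liavarah" resolve to the same entity only when a curated alias says so.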

### Outputs

Phase 7 produces structured reference tables:

- **Entity mentions**: where known entities appear in chunked text
- **Author mentions**: which player authored which PbP chunks, when available

These outputs form a **reference index**, suitable for later aggregation, querying, attribution analysis, or graph construction.

In [43]:
# Phase 7a: Linking preparation (v0)
# Output:
#   - entities_df
#   - aliases_df (may be empty)
#   - author_aliases_df (may be empty)
#   - pc_map_df (may be empty)
#   - VOCAB_DF (canonical + aliases, regex-ready)
#
# Notes:
# - CSV schemas are human-facing and may contain extra columns.
# - We normalize to semantic column names via *_COLS maps below.
# - Any columns not mapped are intentionally ignored.
LAST_PHASE_RUN = "7a"

from datetime import datetime
RUN_STAMP_7 = datetime.now().strftime("%Y%m%d_%H%M%S")
del datetime

import re
from pathlib import Path
import pandas as pd

CASE_INSENSITIVE = True

# ------------------------------------------------------------------
# Semantic column mappings for vocab CSVs
# Each semantic field -> acceptable column names in the CSV
# Any other CSV columns are ignored on purpose.
# ------------------------------------------------------------------
ENTITY_COLS = {
    "entity_id": ["entity_id", "id"],
    "canonical": ["canonical", "canonical_name", "name"],
}
ALIAS_COLS = {
    "entity_id": ["entity_id", "id"],
    "alias": ["alias", "alt", "alternate"],
}
AUTHOR_ALIAS_COLS = {
    "author": ["author", "discord_name", "handle"],
    "player_entity_id": ["player_entity_id", "player", "player_id"],
    "ambig_char_id": ["ambig_char_id", "ambiguous_character", "ambig_character"],
}
PC_MAP_COLS = {
    "player_entity_id": ["player_entity_id", "player", "player_id"],
    "char_entity_id": ["char_entity_id", "character_entity_id", "character"],
}

# ------------------------------------------------------------------
# Helper function to handle CSV column names
# ------------------------------------------------------------------
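def _normalize_vocab_csv(df, col_map, label):
    """Rename recognized CSV columns to their semantic names and drop the rest.

    col_map maps each semantic field to its acceptable CSV column names
    (first match wins). label is used only in warning messages.
    Returns a DataFrame holding only the semantic columns that were found.
    """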
    if df is None or df.empty:
        return pd.DataFrame(columns=list(col_map.keys()))

    # Build rename map: first matching option wins
    rename = {}
    for semantic, options in col_map.items():
        found = next((c for c in options if c in df.columns), None)
        if found:
            rename[found] = semantic

    # If there are rows but we couldn't resolve any semantic columns, warn loudly
    if len(df) > 0 and not rename:
        header_line = ", ".join(list(df.columns))
        expected = {k: v for k, v in col_map.items()}
        print(
            f"WARNING [{label}]: CSV has rows but none of the expected columns were found.\n"
            f"  CSV columns: {header_line}\n"
            f"  Expected (semantic -> acceptable names): {expected}"
        )
        return pd.DataFrame(columns=list(col_map.keys()))

    out = df.rename(columns=rename)

    # Warn on missing semantic fields (but keep what we have)
    missing_semantic = [k for k in col_map.keys() if k not in out.columns]
    if len(df) > 0 and missing_semantic:
        print(
            f"WARNING [{label}]: missing semantic columns after normalization: {missing_semantic}\n"
            f"  CSV columns: {list(df.columns)}\n"
            f"  Using: {list(out.columns)}"
        )

    # Keep only the semantic columns that exist (stable order)
    keep = [k for k in col_map.keys() if k in out.columns]
    return out[keep].copy()

# ------------------------------------------------------------------
# Validate required globals
# ------------------------------------------------------------------
if "CHUNKS_V0" not in globals() or not CHUNKS_V0:
    raise ValueError("CHUNKS_V0 is missing or empty. Rerun Phase 5.")

required_paths = {"VOCAB_ENTITIES_PATH", "WORKING_DRAFTS_PATH"}
missing = [k for k in required_paths if k not in globals() or not globals()[k]]
if missing:
    raise ValueError(f"Missing required descriptor paths: {missing}")

# Optional paths (do not overwrite globals)
vocab_aliases_path = globals().get("VOCAB_ALIASES_PATH")
vocab_authors_path = globals().get("VOCAB_AUTHORS_PATH")
vocab_pc_map_path = globals().get("VOCAB_PC_MAP_PATH")

# Relpaths (expected to be published by Phase 1c; fall back to "none")
entities_rel = globals().get("VOCAB_ENTITIES_RELPATH", "none")
aliases_rel = globals().get("VOCAB_ALIASES_RELPATH", "none")
authors_rel = globals().get("VOCAB_AUTHORS_RELPATH", "none")
pc_map_rel = globals().get("VOCAB_PC_MAP_RELPATH", "none")

# ------------------------------------------------------------------
# Load entities (required) + normalize schema
# ------------------------------------------------------------------
entities_path = Path(VOCAB_ENTITIES_PATH)
if not entities_path.exists():
    raise ValueError(f"Entities vocab file not found: {entities_path}")

entities_raw = pd.read_csv(entities_path, dtype=str).fillna("")
entities_df = _normalize_vocab_csv(entities_raw, ENTITY_COLS, "entities")

if entities_df.empty:
    raise ValueError("Entities CSV did not produce any usable rows after normalization.")

# ------------------------------------------------------------------
# Load aliases (optional) + normalize schema
# ------------------------------------------------------------------
aliases_df = pd.DataFrame(columns=list(ALIAS_COLS.keys()))
if vocab_aliases_path:
    p = Path(vocab_aliases_path)
    if p.exists():
        aliases_raw = pd.read_csv(p, dtype=str).fillna("")
        aliases_df = _normalize_vocab_csv(aliases_raw, ALIAS_COLS, "aliases")
    del p

# ------------------------------------------------------------------
# Load author aliases (optional) + normalize schema
# ------------------------------------------------------------------
author_aliases_df = pd.DataFrame(columns=list(AUTHOR_ALIAS_COLS.keys()))
if vocab_authors_path:
    p = Path(vocab_authors_path)
    if p.exists():
        authors_raw = pd.read_csv(p, dtype=str).fillna("")
        author_aliases_df = _normalize_vocab_csv(authors_raw, AUTHOR_ALIAS_COLS, "author_aliases")
        # Author strings are whitespace-normalized later, when Phase 7c builds its lookup.
    del p

# ------------------------------------------------------------------
# Load player-character map (optional) + normalize schema
# ------------------------------------------------------------------
pc_map_df = pd.DataFrame(columns=list(PC_MAP_COLS.keys()))
if vocab_pc_map_path:
    p = Path(vocab_pc_map_path)
    if p.exists():
        pc_raw = pd.read_csv(p, dtype=str).fillna("")
        pc_map_df = _normalize_vocab_csv(pc_raw, PC_MAP_COLS, "pc_map")
    del p

# ------------------------------------------------------------------
# Build VOCAB_DF (entities + aliases only)
# ------------------------------------------------------------------
rows = []

for _, r in entities_df.iterrows():
    if r.get("entity_id", "") and r.get("canonical", ""):
        rows.append(
            {
                "vocab": r["canonical"],
                "entity_id": r["entity_id"],
                "canonical": r["canonical"],
                "match_kind": "canonical",
            }
        )

if not aliases_df.empty:
    canon_by_id = dict(zip(entities_df["entity_id"], entities_df["canonical"]))
    for _, r in aliases_df.iterrows():
        if r.get("entity_id", "") and r.get("alias", ""):
            rows.append(
                {
                    "vocab": r["alias"],
                    "entity_id": r["entity_id"],
                    "canonical": canon_by_id.get(r["entity_id"], ""),
                    "match_kind": "alias",
                }
            )
    del canon_by_id

VOCAB_DF = pd.DataFrame(rows).drop_duplicates(subset=["vocab", "entity_id"]).reset_index(drop=True)
if VOCAB_DF.empty:
    raise ValueError("No vocabs available for linking.")

VOCAB_DF["vocab_len"] = VOCAB_DF["vocab"].str.len()
VOCAB_DF = VOCAB_DF.sort_values(["vocab_len", "vocab"], ascending=[False, True]).reset_index(drop=True)

# Compile regex patterns (boundary-aware). Avoid backslashes inside f-string expressions.
flags = re.IGNORECASE if CASE_INSENSITIVE else 0
patterns = []
for vocab in VOCAB_DF["vocab"]:
    esc = re.escape(vocab)
    esc_ws = esc.replace(r"\ ", r"\s+")
    patterns.append(re.compile(rf"(?<!\w){esc_ws}(?!\w)", flags))
VOCAB_DF["pattern"] = patterns

print(f"Loaded entities: {len(entities_df)} [{entities_rel}]")
print(f"Loaded aliases: {len(aliases_df)} [{aliases_rel}]")
print(f"Loaded author aliases: {len(author_aliases_df)} [{authors_rel}]")
print(f"Loaded PC map rows: {len(pc_map_df)} [{pc_map_rel}]")
print(f"Search vocabs: {len(VOCAB_DF)} (longest-first)")

# Cleanup locals (keep *_df, VOCAB_DF, RUN_STAMP_7, CASE_INSENSITIVE)
del required_paths, missing
del vocab_aliases_path, vocab_authors_path, vocab_pc_map_path
del entities_path, entities_raw
del rows, r, vocab, patterns, flags, esc, esc_ws
del entities_rel, aliases_rel, authors_rel, pc_map_rel
del ENTITY_COLS, ALIAS_COLS, AUTHOR_ALIAS_COLS, PC_MAP_COLS
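# The *_raw frames exist only when the matching optional file was loaded,
# so remove them without assuming they are defined.
for _name in ("aliases_raw", "authors_raw", "pc_raw"):
    globals().pop(_name, None)
del _name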

Loaded entities: 176 [_meta/indexes/vocab_entities.csv]
Loaded aliases: 87 [_meta/indexes/vocab_aliases.csv]
Loaded author aliases: 6 [_meta/indexes/vocab_author_aliases.csv]
Loaded PC map rows: 42 [_meta/indexes/vocab_map_player_character.csv]
Search vocabs: 251 (longest-first)
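
The pattern compilation in Phase 7a can be tried in isolation. The sketch below is a minimal, standalone version of the same idea; the helper name `compile_vocab_pattern` and the sample strings are illustrative assumptions, not part of the notebook.

```python
import re


def compile_vocab_pattern(vocab, case_insensitive=True):
    """Compile a boundary-aware pattern for one vocab term (hypothetical helper)."""
    esc = re.escape(vocab)
    # re.escape renders a literal space as "\ "; accept any whitespace run instead.
    esc_ws = esc.replace(r"\ ", r"\s+")
    flags = re.IGNORECASE if case_insensitive else 0
    # The lookarounds reject matches that are glued to a neighbouring word character.
    return re.compile(rf"(?<!\w){esc_ws}(?!\w)", flags)


pat = compile_vocab_pattern("Iron Wolf")
print(bool(pat.search("the iron   wolf caravan")))  # extra spaces and case are tolerated
print(bool(pat.search("the Iron Wolfhound")))       # "Wolf" runs into "hound", so no match
```

Lookarounds are used here rather than `\b` so that vocab terms beginning or ending in punctuation still get a sensible boundary check.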


In [44]:
# Phase 7b: Entity linking (v0)
# Input:  CHUNKS_V0, VOCAB_DF, RUN_STAMP_7, WORKING_DRAFTS_PATH (+ WORKING_DRAFTS_RELPATH for the log line)
# Output: ENTITY_MENTIONS_V0 (DataFrame) + CSV written to WORKING_DRAFTS_PATH
LAST_PHASE_RUN = "7b"

EXCLUDE_SOURCE_TYPES = {"auto_transcripts"}
MAX_SNIPPET_CHARS = 220

vocabs = VOCAB_DF["vocab"].tolist()
entity_ids = VOCAB_DF["entity_id"].tolist()
canonicals = VOCAB_DF["canonical"].tolist()
match_kinds = VOCAB_DF["match_kind"].tolist()
patterns = VOCAB_DF["pattern"].tolist()

rows = []
mention_id = 1

for chunk in CHUNKS_V0:
    source_type = chunk.get("source_type", "unknown")
    if source_type in EXCLUDE_SOURCE_TYPES:
        continue

    chunk_id = chunk.get("chunk_id")
    source_id = chunk.get("source_id")
    path = chunk.get("path")
    relpath = chunk.get("relpath", "")  # <- use what Phase 2/3/5 already computed

    lines = chunk.get("lines", [])
    header_kind = chunk.get("header_kind")

    # Drop pbp/session header line (metadata, not narrative)
    if header_kind in {"pbp_hash", "pbp_forum", "session"} and lines:
        content_lines = lines[1:]
        content_start_line = (chunk.get("start_line") or 1) + 1
    else:
        content_lines = lines
        content_start_line = chunk.get("start_line") or 1

    concat_text = " ".join(" ".join(content_lines).split())
    if not concat_text:
        continue

    if len(concat_text) > MAX_SNIPPET_CHARS:
        snippet = concat_text[: MAX_SNIPPET_CHARS - 3] + "..."
    else:
        snippet = concat_text

    # Longest-first matching with span masking
    consumed = [False] * len(concat_text)

    for vocab, entity_id, canonical, match_kind, pat in zip(
        vocabs, entity_ids, canonicals, match_kinds, patterns
    ):
        hits = [(m.start(), m.end()) for m in pat.finditer(concat_text)]
        if not hits:
            continue

        kept = []
        for a, b in hits:
            if a < 0 or b <= a:
                continue
            if any(consumed[a:b]):
                continue
            kept.append((a, b))

        if not kept:
            continue

        for a, b in kept:
            for k in range(a, b):
                consumed[k] = True

        rows.append(
            {
                "mention_id": mention_id,
                "entity_id": entity_id,
                "canonical": canonical,
                "matched_vocab": vocab,
                "match_kind": match_kind,
                "match_count_in_chunk": len(kept),
        
                "chunk_id": chunk_id,
                "source_id": source_id,
                "source_type": source_type,
                "path": str(path),
                "relpath": relpath,
                "chunk_start_line": chunk.get("start_line") or 1,
                "chunk_end_line": chunk.get("end_line"),
                "header_kind": header_kind,
        
                "content_start_line": content_start_line,
                "snippet": snippet,
            }
        )
        mention_id += 1

ENTITY_MENTIONS_V0 = pd.DataFrame(rows)

print(f"Chunks scanned: {sum(1 for c in CHUNKS_V0 if c.get('source_type') not in EXCLUDE_SOURCE_TYPES)}")
print(f"Entity mentions (rows): {len(ENTITY_MENTIONS_V0)}")
display(ENTITY_MENTIONS_V0.head(5))

out_dir = Path(WORKING_DRAFTS_PATH)
out_dir.mkdir(parents=True, exist_ok=True)

csv_name = f"entity_mentions_v0_{RUN_STAMP_7}.csv"
csv_path = out_dir / csv_name
ENTITY_MENTIONS_V0.to_csv(csv_path, index=False, encoding="utf-8")

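# WORKING_DRAFTS_RELPATH is optional; fall back to the absolute path (as Phase 7c does).
drafts_rel = globals().get("WORKING_DRAFTS_RELPATH", str(out_dir))
print(f"Wrote: {drafts_rel.rstrip('/')}/{csv_name}")
del drafts_rel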

# Cleanup locals (keep ENTITY_MENTIONS_V0)
del EXCLUDE_SOURCE_TYPES, MAX_SNIPPET_CHARS
del vocabs, entity_ids, canonicals, match_kinds, patterns
del rows, mention_id
del chunk, source_type, chunk_id, source_id, path, relpath, lines, header_kind
del content_lines, content_start_line, concat_text, snippet, consumed
del vocab, entity_id, canonical, match_kind, pat, hits, kept, a, b, k
del out_dir, csv_name, csv_path

Chunks scanned: 1486
Entity mentions (rows): 5362


Unnamed: 0,mention_id,entity_id,canonical,matched_vocab,match_kind,match_count_in_chunk,chunk_id,source_id,source_type,path,relpath,chunk_start_line,chunk_end_line,header_kind,content_start_line,snippet
0,1,faction_spencer,Spencer,Spencer,canonical,1,168646,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,pbp_hash,2,"* ""although a Spencer is a greater threat i st..."
1,2,person_shworn,Shworn Sleepsong,Shworn,alias,1,168646,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,pbp_hash,2,"* ""although a Spencer is a greater threat i st..."
2,3,person_victor,Victor D Evernight,Victor,alias,1,168646,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,pbp_hash,2,"* ""although a Spencer is a greater threat i st..."
3,4,person_piers,Piers Ashton,Piers,alias,1,168646,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,pbp_hash,2,"* ""although a Spencer is a greater threat i st..."
4,5,person_shworn,Shworn Sleepsong,Shworn,alias,2,168647,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,6,10,pbp_hash,7,"* *Henry turns to his brother, letting out a s..."


Wrote: _local/machine_wip/entity_mentions_v0_20260210_185146.csv
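
The longest-first matching with span masking used in Phase 7b is easier to see on a toy input. The following is a minimal sketch, not the notebook's code: the function `link_longest_first`, the sample sentence, and the `family_sleepsong` id are made up for illustration.

```python
import re


def link_longest_first(text, vocab_to_entity):
    """Return (vocab, entity_id, hit_count) tuples, trying longer vocabs first."""
    consumed = [False] * len(text)
    results = []
    # Longest vocab first, ties broken alphabetically (same ordering idea as VOCAB_DF).
    for vocab in sorted(vocab_to_entity, key=lambda v: (-len(v), v)):
        pat = re.compile(rf"(?<!\w){re.escape(vocab)}(?!\w)", re.IGNORECASE)
        # Keep only hits that do not touch characters claimed by a longer vocab.
        kept = [
            (m.start(), m.end())
            for m in pat.finditer(text)
            if not any(consumed[m.start():m.end()])
        ]
        for a, b in kept:
            consumed[a:b] = [True] * (b - a)
        if kept:
            results.append((vocab, vocab_to_entity[vocab], len(kept)))
    return results


toy_vocab = {
    "Henry Sleepsong": "person_henry",
    "Henry": "person_henry",
    "Sleepsong": "family_sleepsong",  # hypothetical id, for illustration only
}
toy_text = "Henry Sleepsong nodded, and Henry smiled."
for row in link_longest_first(toy_text, toy_vocab):
    print(row)
```

In this toy run the full name claims its span first, so the bare "Sleepsong" inside it is not counted again, while the second, standalone "Henry" still is.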


In [46]:
# Phase 7c: PbP authorship linking (v0)
# Input:
#   - CHUNKS_V0 (list[dict]) with header_kind + lines + relpath/path
#   - author_aliases_df (author -> player_entity_id, optional ambig_char_id)
#   - entities_df (entity_id -> canonical)
#   - RUN_STAMP_7, WORKING_DRAFTS_PATH (+ WORKING_DRAFTS_RELPATH if present)
# Output:
#   - AUTHOR_MENTIONS_V0 (DataFrame)
#   - CSV written to WORKING_DRAFTS_PATH
LAST_PHASE_RUN = "7c"

EXCLUDE_SOURCE_TYPES = {"auto_transcripts"}

# Discord-style PbP header (as chunked in Phase 5): "* ### **Author** **timestamp**"
PBP_HASH_HEADER_REGEX = re.compile(
    r"^\s*(?:\d+\.\s*)?(?:[*-]\s*)?###\s+\*\*(?P<author>[^*]+?)\*\*\s+\*\*(?P<ts>[^*]+?)\*\*\s*$"
)

# --- lookups ---
player_canon_by_id = dict(zip(entities_df["entity_id"], entities_df["canonical"]))

author_to_player = {}

if "author_aliases_df" in globals() and isinstance(author_aliases_df, pd.DataFrame) and not author_aliases_df.empty:
    # tolerate missing cols gracefully (but 7a should have normalized these already)
    if {"author", "player_entity_id"} <= set(author_aliases_df.columns):
        for _, r in author_aliases_df.iterrows():
            a = " ".join(str(r.get("author", "")).split())
            p = " ".join(str(r.get("player_entity_id", "")).split())
            if a and p:
                author_to_player[a] = p
            del a, p
        del r

# --- link headers -> players ---
rows = []
mention_id = 1

for chunk in CHUNKS_V0:
    source_type = chunk.get("source_type", "unknown")
    if source_type in EXCLUDE_SOURCE_TYPES:
        continue

    if chunk.get("header_kind") != "pbp_hash":
        continue

    lines = chunk.get("lines") or []
    if not lines:
        continue

    header_line = str(lines[0]).strip()
    if not header_line:
        continue

    m = PBP_HASH_HEADER_REGEX.match(header_line)
    if not m:
        # header_kind says pbp_hash but regex didn't match; skip (or could emit an error row later)
        continue

    author = " ".join(m.group("author").split())

    player_entity_id = author_to_player.get(author, "")
    canonical = player_canon_by_id.get(player_entity_id, "") if player_entity_id else ""

    match_kind = "author_header" if player_entity_id else "author_header_unmapped"

    rows.append(
        {
            "mention_id": mention_id,
            "player_entity_id": player_entity_id,
            "canonical": canonical,
            "matched_vocab": author,
            "match_kind": match_kind,
            "match_count_in_chunk": 1,
            "chunk_id": chunk.get("chunk_id"),
            "source_id": chunk.get("source_id"),
            "source_type": source_type,
            "path": str(chunk.get("path", "")),
            "relpath": str(chunk.get("relpath", "")),
            "chunk_start_line": chunk.get("start_line") or 1,
            "chunk_end_line": chunk.get("end_line"),
            "header_kind": chunk.get("header_kind"),
            "snippet": header_line,
        }
    )
    mention_id += 1

AUTHOR_MENTIONS_V0 = pd.DataFrame(rows)

print(f"PbP chunks scanned: {sum(1 for c in CHUNKS_V0 if c.get('source_type') not in EXCLUDE_SOURCE_TYPES and c.get('header_kind') == 'pbp_hash')}")
print(f"Author mentions (rows): {len(AUTHOR_MENTIONS_V0)}")
display(AUTHOR_MENTIONS_V0.head(5))

out_dir = Path(WORKING_DRAFTS_PATH)
out_dir.mkdir(parents=True, exist_ok=True)

csv_name = f"author_mentions_v0_{RUN_STAMP_7}.csv"
csv_path = out_dir / csv_name
AUTHOR_MENTIONS_V0.to_csv(csv_path, index=False, encoding="utf-8")

drafts_rel = globals().get("WORKING_DRAFTS_RELPATH", str(out_dir))
print(f"Wrote: {drafts_rel.rstrip('/')}/{csv_name}")

# Cleanup locals (keep AUTHOR_MENTIONS_V0)
del RUN_STAMP_7, EXCLUDE_SOURCE_TYPES, PBP_HASH_HEADER_REGEX, CASE_INSENSITIVE
del player_canon_by_id, author_to_player
del rows, mention_id, chunk, source_type, lines, header_line
del m, author, player_entity_id, canonical, match_kind
del out_dir, csv_name, csv_path, drafts_rel
del aliases_df, author_aliases_df, entities_df, pc_map_df
del re, pd, Path

PbP chunks scanned: 860
Author mentions (rows): 557


Unnamed: 0,mention_id,player_entity_id,canonical,matched_vocab,match_kind,match_count_in_chunk,chunk_id,source_id,source_type,path,relpath,chunk_start_line,chunk_end_line,header_kind,snippet
0,1,player_venric,Venric,Shworn,author_header,1,168646,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,pbp_hash,"* ### **Shworn** **7/3/25, 6:12 PM**"
1,2,player_bysickle,Bysickle,Bysickle,author_header,1,168647,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,6,10,pbp_hash,"* ### **Bysickle** **7/3/25, 6:46 PM**"
2,3,player_charis,Charis,Charis,author_header,1,168648,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,11,15,pbp_hash,"* ### **Charis** **7/3/25, 6:53 PM**"
3,4,player_crowe,Crowe,CroweTheDualityKing,author_header,1,168649,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,16,20,pbp_hash,"* ### **CroweTheDualityKing** **7/3/25, 6:54 PM**"
4,5,player_charis,Charis,Charis,author_header,1,168650,105,pbp_transcripts,/Users/charissophia/obsidian/Iron Wolf Trading...,_local/pbp_transcripts/PbP10 - The Second Camp.md,21,25,pbp_hash,"* ### **Charis** **7/3/25, 6:58 PM**"


Wrote: _local/machine_wip/author_mentions_v0_20260210_185146.csv
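
Phase 7c skips a chunk silently when its first line does not match the header regex, so it can help to test the regex on individual lines. This is a small sketch under assumptions: `parse_header` is a hypothetical helper, and the mapping holds a single author taken from the recorded output above.

```python
import re

# Same shape as the Phase 7c header regex: optional list marker, "###", bold author, bold timestamp.
HEADER = re.compile(
    r"^\s*(?:\d+\.\s*)?(?:[*-]\s*)?###\s+\*\*(?P<author>[^*]+?)\*\*\s+\*\*(?P<ts>[^*]+?)\*\*\s*$"
)


def parse_header(line, author_to_player):
    """Return a dict describing the header, or None when the line is not a header."""
    m = HEADER.match(line.strip())
    if not m:
        return None
    author = " ".join(m.group("author").split())  # collapse internal whitespace
    player = author_to_player.get(author, "")
    return {
        "author": author,
        "timestamp": m.group("ts").strip(),
        "player_entity_id": player,
        "match_kind": "author_header" if player else "author_header_unmapped",
    }


mapping = {"Shworn": "player_venric"}
print(parse_header("* ### **Shworn** **7/3/25, 6:12 PM**", mapping))
print(parse_header("plain narrative line", mapping))
```

Counting the `None` results per file is one way to find headers that the regex does not yet cover.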


## Phase 8: Index materialization and persistence

In this phase, the notebook transitions from **mention detection** to **index construction**.

At the end of Phase 7, the corpus has been fully scanned against curated vocabularies, producing structured mention tables for both entities and authors. These tables precisely record *where* known concepts and speakers appear, but they are still raw observations rather than navigable indexes.

Phase 8 is responsible for:
- Transforming mention tables into stable, queryable index structures
- Aggregating mentions across chunks, files, and source types
- Normalizing outputs for long-term storage
- Writing durable index artifacts suitable for reuse outside the notebook

This phase answers the question:

**“How do we turn detected mentions into usable indexes?”**

Phase 8 does **not** introduce new interpretation, inference, or vocabulary. It operates entirely on the outputs of earlier phases, focusing on organization, aggregation, and persistence.

The outputs of this phase are intended to be:
- Stable across notebook runs
- Decoupled from in-memory state
- Suitable for downstream querying, visualization, and graph construction
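
The aggregation step can be pictured without pandas. The sketch below groups a handful of invented mention rows by entity and collapses them into one index row each; the tuples and file names are illustrative assumptions, and the notebook itself performs this with `groupby` on the mention DataFrames.

```python
from collections import defaultdict

# Hypothetical mention rows: (entity_id, chunk_id, relpath).
mentions = [
    ("person_victor", 1, "a.md"),
    ("person_victor", 2, "a.md"),
    ("person_victor", 3, "b.md"),
    ("person_henry", 2, "a.md"),
    ("person_victor", 1, "a.md"),  # same chunk matched again through another vocab
]

chunks_by_entity = defaultdict(set)
files_by_entity = defaultdict(set)
for entity_id, chunk_id, relpath in mentions:
    chunks_by_entity[entity_id].add(chunk_id)
    files_by_entity[entity_id].add(relpath)

# One row per entity: pipe-delimited chunk ids plus distinct chunk and file counts.
index_rows = [
    {
        "entity_id": entity_id,
        "chunk_ids": "|".join(str(c) for c in sorted(chunks)),
        "chunk_count": len(chunks),
        "file_count": len(files_by_entity[entity_id]),
    }
    for entity_id, chunks in chunks_by_entity.items()
]
# Most-referenced entities first, ties broken by id.
index_rows.sort(key=lambda r: (-r["chunk_count"], r["entity_id"]))

for row in index_rows:
    print(row)
```

Repeated mentions of the same entity in the same chunk collapse into a single entry, which is what separates an index row from the raw mention rows.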

In [51]:
# Phase 8: Build index artifacts (v0) - simplified
# Output (written to WORKING_DRAFTS_PATH only):
#   - INDEX_ENTITY_TO_CHUNKS_V0 (entity_id -> chunks/files)
#   - INDEX_CHUNK_TO_ENTITIES_V0 (chunk_id -> entities)
#   - INDEX_PLAYER_TO_CHUNKS_V0 (player_entity_id -> chunks/files) [if AUTHOR_MENTIONS_V0 present]
#   - SOURCE_FILES_DF (source_id/relpath/source_type)
LAST_PHASE_RUN = "8"

from pathlib import Path
import pandas as pd

# ------------------------------------------------------------------
# Validate inputs (presence only)
# ------------------------------------------------------------------
if "ENTITY_MENTIONS_V0" not in globals() or ENTITY_MENTIONS_V0 is None or ENTITY_MENTIONS_V0.empty:
    raise ValueError("ENTITY_MENTIONS_V0 is missing or empty. Run Phase 7 first.")

if "CHUNKS_V0" not in globals() or not CHUNKS_V0:
    raise ValueError("CHUNKS_V0 is missing or empty. Run Phase 5 first.")

has_author_mentions = ("AUTHOR_MENTIONS_V0" in globals()) and (AUTHOR_MENTIONS_V0 is not None) and (not AUTHOR_MENTIONS_V0.empty)

if "WORKING_DRAFTS_PATH" not in globals() or not WORKING_DRAFTS_PATH:
    raise ValueError("WORKING_DRAFTS_PATH is missing. Rerun Phase 1.")

# ------------------------------------------------------------------
# CHUNKS_DF -> SOURCE_FILES_DF (file = source file: source_id + relpath)
# ------------------------------------------------------------------
CHUNKS_DF = pd.DataFrame(CHUNKS_V0)

SOURCE_FILES_DF = (
    CHUNKS_DF[["source_id", "relpath", "source_type"]]
    .drop_duplicates()
    .reset_index(drop=True)
)

# ------------------------------------------------------------------
# Small inline helper
# ------------------------------------------------------------------
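def _uniq_sorted(series):
    # Unique, non-empty values as strings, sorted lexicographically.
    # Numeric ids are therefore ordered as text (e.g. "10" sorts before "9").
    vals = [x for x in series.dropna().astype(str).tolist() if x != ""]
    return sorted(set(vals))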

# ------------------------------------------------------------------
# 8a) Entity -> chunks/files
# ------------------------------------------------------------------
INDEX_ENTITY_TO_CHUNKS_V0 = (
    ENTITY_MENTIONS_V0.groupby(["entity_id", "canonical"], dropna=False)
      .agg(
          chunk_ids=("chunk_id", lambda s: "|".join(_uniq_sorted(s))),
          chunk_count=("chunk_id", lambda s: len(set(s.tolist()))),
          file_relpaths=("relpath", lambda s: "|".join(_uniq_sorted(s))),
          file_count=("relpath", lambda s: len(set([x for x in s.tolist() if pd.notna(x) and str(x) != ""]))),
      )
      .reset_index()
      .sort_values(["chunk_count", "file_count", "entity_id"], ascending=[False, False, True])
      .reset_index(drop=True)
)

# ------------------------------------------------------------------
# 8b) Chunk -> entities
# ------------------------------------------------------------------
INDEX_CHUNK_TO_ENTITIES_V0 = (
    ENTITY_MENTIONS_V0.groupby(
        ["chunk_id", "source_id", "source_type", "relpath", "chunk_start_line", "chunk_end_line"],
        dropna=False
    )
    .agg(
        entity_ids=("entity_id", lambda s: "|".join(_uniq_sorted(s))),
        canonicals=("canonical", lambda s: "|".join(_uniq_sorted(s))),
        entity_count=("entity_id", lambda s: len(set([x for x in s.tolist() if pd.notna(x) and str(x) != ""]))),
        matched_vocabs=("matched_vocab", lambda s: "|".join(_uniq_sorted(s))),
        match_kinds=("match_kind", lambda s: "|".join(_uniq_sorted(s))),
    )
    .reset_index()
    .sort_values(["chunk_id"], ascending=[True])
    .reset_index(drop=True)
)

# ------------------------------------------------------------------
# 8c) Player -> chunks/files (PbP authorship)
# ------------------------------------------------------------------
INDEX_PLAYER_TO_CHUNKS_V0 = pd.DataFrame()
if has_author_mentions:
    INDEX_PLAYER_TO_CHUNKS_V0 = (
        AUTHOR_MENTIONS_V0.groupby(["player_entity_id", "canonical"], dropna=False)
          .agg(
              chunk_ids=("chunk_id", lambda s: "|".join(_uniq_sorted(s))),
              chunk_count=("chunk_id", lambda s: len(set(s.tolist()))),
              file_relpaths=("relpath", lambda s: "|".join(_uniq_sorted(s))),
              file_count=("relpath", lambda s: len(set([x for x in s.tolist() if pd.notna(x) and str(x) != ""]))),
          )
          .reset_index()
          .sort_values(["chunk_count", "file_count", "player_entity_id"], ascending=[False, False, True])
          .reset_index(drop=True)
    )

# ------------------------------------------------------------------
# 8d) Write artifacts to WORKING_DRAFTS_PATH
# ------------------------------------------------------------------
out_dir = Path(WORKING_DRAFTS_PATH)
out_dir.mkdir(parents=True, exist_ok=True)
out_rel = globals().get("WORKING_DRAFTS_RELPATH", str(out_dir)).rstrip("/")

INDEX_ENTITY_TO_CHUNKS_V0.to_csv(out_dir / "index_entity_to_chunks_v0.csv", index=False, encoding="utf-8")
INDEX_CHUNK_TO_ENTITIES_V0.to_csv(out_dir / "index_chunk_to_entities_v0.csv", index=False, encoding="utf-8")
SOURCE_FILES_DF.to_csv(out_dir / "index_source_files_v0.csv", index=False, encoding="utf-8")

if has_author_mentions:
    INDEX_PLAYER_TO_CHUNKS_V0.to_csv(out_dir / "index_player_to_chunks_v0.csv", index=False, encoding="utf-8")

print("Phase 8 wrote index artifacts:")
print(f" - {out_rel}/index_entity_to_chunks_v0.csv")
print(f" - {out_rel}/index_chunk_to_entities_v0.csv")
print(f" - {out_rel}/index_source_files_v0.csv")
if has_author_mentions:
    print(f" - {out_rel}/index_player_to_chunks_v0.csv")

display(INDEX_ENTITY_TO_CHUNKS_V0.head(5))
display(INDEX_CHUNK_TO_ENTITIES_V0.head(5))
if has_author_mentions:
    display(INDEX_PLAYER_TO_CHUNKS_V0.head(5))

print("\nPhase 8 complete: index artifacts generated in working_drafts.\n")

print("Next steps:")
print("1) Review the generated CSV files in:")
print(f"   {out_rel}")

print("\n   Files created:")
print("   - index_entity_to_chunks_v0.csv")
print("   - index_chunk_to_entities_v0.csv")
if has_author_mentions:
    print("   - index_player_to_chunks_v0.csv")

print("\n2) If satisfied, move (or copy) these files into your canonical indexes directory, e.g.:")
print("   _meta/indexes/")

print("\n3) Update your world_repository.yml to declare the indexes block, for example:")
print("""
indexes:
  # Canonical machine-generated index artifacts.
  # These are reproducible but intended to be version-controlled.
  # Relative paths are resolved under world_root unless absolute.
  path: _meta/indexes
""")

print("\n4) Commit the index files and updated descriptor to version control.")

print("\nThis notebook does NOT modify canonical data. All artifacts were written to working_drafts.")
print("After committing indexes, you may start a new notebook that consumes them.")

# Cleanup locals (keep index DataFrames)
del out_dir, out_rel, has_author_mentions
del CHUNKS_DF, SOURCE_FILES_DF
del Path, pd

Phase 8 wrote index artifacts:
 - _local/machine_wip/index_entity_to_chunks_v0.csv
 - _local/machine_wip/index_chunk_to_entities_v0.csv
 - _local/machine_wip/index_source_files_v0.csv
 - _local/machine_wip/index_player_to_chunks_v0.csv


Unnamed: 0,entity_id,canonical,chunk_ids,chunk_count,file_relpaths,file_count
0,person_victor,Victor D Evernight,168646|168650|168653|168654|168657|168658|1686...,391,_local/pbp_transcripts/PbP10 - The Second Camp...,18
1,person_henry,Henry Sleepsong,168647|168673|168675|168682|168684|168687|1686...,327,_local/pbp_transcripts/PbP10 - The Second Camp...,20
2,person_alivyre,Alivyre Dawntracker,168648|168654|168665|168687|168688|168689|1686...,264,_local/pbp_transcripts/PbP10 - The Second Camp...,20
3,person_faeryne,Faeryne,168660|168661|168662|168665|168677|168683|1686...,255,_local/pbp_transcripts/PbP10 - The Second Camp...,19
4,org_party,Party,168706|168710|168742|168748|168768|168771|1687...,167,_local/pbp_transcripts/PbP10 - The Second Camp...,21


Unnamed: 0,chunk_id,source_id,source_type,relpath,chunk_start_line,chunk_end_line,entity_ids,canonicals,entity_count,matched_vocabs,match_kinds
0,168646,105,pbp_transcripts,_local/pbp_transcripts/PbP10 - The Second Camp.md,1,5,faction_spencer|person_piers|person_shworn|per...,Piers Ashton|Shworn Sleepsong|Spencer|Victor D...,4,Piers|Shworn|Spencer|Victor,alias|canonical
1,168647,105,pbp_transcripts,_local/pbp_transcripts/PbP10 - The Second Camp.md,6,10,person_henry|person_shworn,Henry Sleepsong|Shworn Sleepsong,2,Henry|Shworn,alias
2,168648,105,pbp_transcripts,_local/pbp_transcripts/PbP10 - The Second Camp.md,11,15,person_alivyre,Alivyre Dawntracker,1,Alivyre,alias
3,168649,105,pbp_transcripts,_local/pbp_transcripts/PbP10 - The Second Camp.md,16,20,person_piers,Piers Ashton,1,Piers,alias
4,168650,105,pbp_transcripts,_local/pbp_transcripts/PbP10 - The Second Camp.md,21,25,person_piers|person_victor,Piers Ashton|Victor D Evernight,2,Piers|Victor,alias


Unnamed: 0,player_entity_id,canonical,chunk_ids,chunk_count,file_relpaths,file_count
0,player_charis,Charis,168648|168650|168654|168655|168656|168658|1686...,229,_local/pbp_transcripts/PbP10 - The Second Camp...,6
1,player_crowe,Crowe,168649|168652|168657|168663|168680|168697|1686...,193,_local/pbp_transcripts/PbP10 - The Second Camp...,6
2,player_bysickle,Bysickle,168647|168675|168681|168683|168689|168695|1687...,57,_local/pbp_transcripts/PbP10 - The Second Camp...,6
3,player_kalina,Kalina,168653|168659|168661|168722|168724|168726|1687...,42,_local/pbp_transcripts/PbP10 - The Second Camp...,6
4,player_venric,Venric,168646|168651|168666|168668|168670|168672|1686...,28,_local/pbp_transcripts/PbP10 - The Second Camp...,3



Phase 8 complete: index artifacts generated in working_drafts.

Next steps:
1) Review the generated CSV files in:
   _local/machine_wip

   Files created:
   - index_entity_to_chunks_v0.csv
   - index_chunk_to_entities_v0.csv
   - index_player_to_chunks_v0.csv

2) If satisfied, move (or copy) these files into your canonical indexes directory, e.g.:
   _meta/indexes/

3) Update your world_repository.yml to declare the indexes block, for example:

indexes:
  # Canonical machine-generated index artifacts.
  # These are reproducible but intended to be version-controlled.
  # Relative paths are resolved under world_root unless absolute.
  path: _meta/indexes


4) Commit the index files and updated descriptor to version control.

This notebook does NOT modify canonical data. All artifacts were written to working_drafts.
After committing indexes, you may start a new notebook that consumes them.
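
A later notebook might read the entity index back and split the pipe-delimited columns. The sketch below uses only the standard library and an in-memory sample; the two rows, the ids `person_a` / `person_b`, and the helper `load_entity_index` are invented for illustration, and only the column names follow `index_entity_to_chunks_v0.csv`.

```python
import csv
import io

# Hypothetical two-row excerpt in the shape of index_entity_to_chunks_v0.csv.
sample_csv = """entity_id,canonical,chunk_ids,chunk_count,file_relpaths,file_count
person_a,Person A,101|102|105,3,notes/one.md|notes/two.md,2
person_b,Person B,102,1,notes/one.md,1
"""


def load_entity_index(handle):
    """Map entity_id -> list of chunk ids by splitting the pipe-delimited column."""
    index = {}
    for row in csv.DictReader(handle):
        index[row["entity_id"]] = [c for c in row["chunk_ids"].split("|") if c]
    return index


entity_index = load_entity_index(io.StringIO(sample_csv))
# Chunks in which both entities appear.
shared = set(entity_index["person_a"]) & set(entity_index["person_b"])
print(sorted(shared))
```

For a real file, pass an open file handle instead of `io.StringIO`. Splitting `file_relpaths` the same way is only safe if no path contains a `|` character.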
