In [None]:
from datetime import datetime
from operator import itemgetter
from pathlib import Path

import spacy
from rich.console import Console

from spacy_model_manager.lib import (
    SPACY_MODELS,
    get_installed_model_version,
    install_spacy_model,
)

import blacktape
from blacktape.db import db_init, db_session
from blacktape.lib import chunks, ENTITY, PATTERN
from blacktape.models import EntityMatch, PatternMatch
from blacktape.pipeline import Pipeline

In [None]:
# Shared rich console; used further down to show a status message while jobs are submitted
console = Console()

In [None]:
# Display the version of the imported blacktape package
blacktape.__version__

### Test Data

In [None]:
# Sample input text; the path is relative to the notebook's working directory
test_file = Path("..") / "tests" / "data" / "Crime_and_Punishment.txt"

### Chunking Options

In [None]:
# spaCy language model used for sentence detection, and the exact version of it we want
model = SPACY_MODELS.en_core_web_sm
version = "3.2.0"

In [None]:
# The model has to be present in the current environment, at the requested version,
# before it can be used; install it only when the installed version differs
installed_version = get_installed_model_version(model)
if installed_version != version:
    install_spacy_model(model, version)

In [None]:
# Load model with the dependency parser switched off
nlp = spacy.load(model, disable=["parser"])
# Turn on the "senter" component instead — presumably so that sentence boundaries
# are still detected without the parser (NOTE(review): confirm against the spaCy docs)
nlp.enable_pipe("senter")

In [None]:
# Keyword arguments forwarded to chunks() when the file is split
chunking_options = dict(
    nlp=nlp,
    # Value in characters (not bytes) to stay below spaCy's max doc size
    # of 1_000_000 characters by default
    max_chunk_size=10_000,
)

# Keyword arguments used every time the text file is opened, so that chunking
# and validation read exactly the same characters
text_file_open_options = dict(
    encoding="UTF-8",
    errors="ignore",
    newline="",  # To preserve line endings
)

### Target Entities

In [None]:
# Labels of the entity types the NER jobs should report
target_entities = {
    "PERSON",
    "ORG",
}

### Feed chunks into a pipeline

In [None]:
%%time

# The pipeline is free to run a different spaCy model from the one used for chunking
with Pipeline(spacy_model=model) as pipeline:

    matches = []

    with console.status("[bold green]Chunking file and submitting jobs...") as status:

        file_path = str(test_file.resolve())
        processed = 0  # Total length (in characters) of the chunks submitted so far

        for chunk in chunks(test_file, **chunking_options, **text_file_open_options):

            # Queue an NER extraction job; base_offset is where this chunk starts in the file
            pipeline.submit_ner_job(
                chunk,
                target_entities,
                base_offset=processed,
                file=file_path,
            )

            processed += len(chunk)

    # Collect match results as they become available
    for result in pipeline.results():

        for match in result:
            # Turn the chunk-relative offset into an offset in the whole document
            match["offset"] += match.pop("base_offset")

            matches.append(match)

### Writing match results to a DB

In [None]:
SQLITE3_FILENAME_TEMPLATE = "{}_{}.sqlite3"

# sqlite3 files are collected in ./results under the current working directory
out_dir = Path.cwd() / "results"
out_dir.mkdir(exist_ok=True)

# One DB file per pipeline run: input file name plus a compact timestamp
# (ISO 8601 to the second with "-" and ":" removed, e.g. 20220131T093000)
run_timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
db_file = out_dir / SQLITE3_FILENAME_TEMPLATE.format(test_file.name, run_timestamp)

# DB setup
Session = db_init(db_file)

# Build one model object per match and add them all to the session
with db_session(Session) as session:

    match_objects = []

    for match in matches:

        # Work on a copy: popping "type" from the original dict would break
        # a second run of this cell on its own
        fields = dict(match)
        match_type = fields.pop("type")

        if match_type == ENTITY:
            match_objects.append(EntityMatch(**fields))

        elif match_type == PATTERN:
            match_objects.append(PatternMatch(**fields))

    session.add_all(match_objects)

### Validate match results

In [None]:
# Load the entire text in memory, opened with the same options that were used
# for chunking so that character offsets line up
with test_file.open(mode="r", **text_file_open_options) as f:
    full_text = f.read()

for match in matches:
    # View match results from the pipeline
    for key, value in match.items():
        print(f"{key}:\t\t{value}")

    # Check what's in the original text at that offset
    start = match["offset"]
    end = start + len(match["text"])
    expected = full_text[start:end]

    print(f"expected:\t{expected}")

    # A plain assert stops the cell on the first mismatch (AssertionError, as before);
    # the message says which offset disagreed and what was found there
    assert match["text"] == expected, (
        f"text at offset {start} is {expected!r}, but the match reports {match['text']!r}"
    )
    print("✅")

    print("=======")