# Load data

For now, we restrict the data to a single named-entity category (`PERSON`).

In [2]:
import logging

import dedupe

# Make dedupe's own log records visible in the notebook: lower the logger
# threshold to DEBUG, drop whatever handlers are currently attached and attach
# a single fresh stream handler (re-running the cell does not stack handlers).
dedupe_logger = logging.getLogger(dedupe.__name__)
dedupe_logger.setLevel(logging.DEBUG)
dedupe_logger.handlers = []
dedupe_logger.addHandler(logging.StreamHandler())

In [3]:
import neo4j

# Async driver pointing at a local Neo4j instance; no credentials are passed.
neo4j_driver = neo4j.AsyncGraphDatabase.driver("neo4j://localhost:7687")

In [4]:
# Address of the local Datashare instance and URL prefix of the project; the
# project URL is used as the base of the per-document debug links.
DATASHARE_BASE_URL = "http://localhost:8080"
DATASHARE_PROJECT_URL = f"{DATASHARE_BASE_URL}/#/d/local-datashare"

In [5]:
from typing import AsyncGenerator, Optional


async def retrieve_ne(
        session: neo4j.AsyncSession,
        ne_category: str,
        *,
        limit: Optional[int] = None,
) -> AsyncGenerator[neo4j.Record, None]:
    """Stream named entities of one category together with their documents.

    Each yielded record exposes three keys: ``ne`` (the named entity node),
    ``doc`` (a document the entity points to) and ``rootDoc`` (the document
    reached from ``doc`` through a ``HAS_PARENT`` relationship, or ``None``
    when there is none).

    :param session: open async Neo4j session used to run the query
    :param ne_category: node label added to ``NamedEntity`` in the match
        clause. It is interpolated into the query text (labels cannot be
        passed as query parameters), so it must come from trusted code.
    :param limit: maximum number of rows to return; ``None`` means no limit
    """
    # The relationship type needs the leading colon: a bare ``[HAS_PARENT]``
    # would declare a relationship *variable* and match any relationship type
    query = f"""MATCH (ne:NamedEntity:{ne_category})-[rel]->(doc:Document)
OPTIONAL MATCH (doc)-[:HAS_PARENT]->(rootDoc:Document)
RETURN ne, doc, rootDoc
"""
    parameters = {}
    # Compare to None so that an explicit limit of 0 is honoured
    if limit is not None:
        query = f"""{query}
LIMIT $limit
"""
        parameters["limit"] = limit
    res = await session.run(query, parameters)
    async for rec in res:
        yield rec

In [6]:
import itertools
import string
from neo4j_app.core.utils.pydantic import to_lower_camel
from neo4j_app.constants import (
    DOC_CONTENT_TYPE,
    DOC_DIRNAME,
    DOC_ID,
    DOC_PATH,
    DOC_ROOT_ID,
    NE_MENTION_NORM,
)
from typing import Dict, Optional

# Column names of the flat named-entity records. Document-level columns are
# derived from the document property names, prefixed with "doc_" and passed
# through to_lower_camel.
NE_DOC_ID = to_lower_camel(f"doc_{DOC_ID}")
NE_DOC_DIR_NAME = to_lower_camel(f"doc_{DOC_DIRNAME}")
NE_DOC_FILENAME = "docFilename"
NE_DOC_CONTENT_TYPE = to_lower_camel(f"doc_{DOC_CONTENT_TYPE}")
NE_DOC_ROOT_ID = to_lower_camel(f"doc_{DOC_ROOT_ID}")
# Columns kept only to ease manual inspection of the records
NE_DEBUG_DOC_URL = "debugDocUrl"
NE_DEBUG_FILENAME = "debugFilename"

# Columns of the exported CSV dataset, in order
NE_FIELDNAMES = [
    NE_MENTION_NORM,
    NE_DOC_ID,
    NE_DOC_DIR_NAME,
    NE_DOC_FILENAME,
    NE_DOC_CONTENT_TYPE,
    NE_DOC_ROOT_ID,
    NE_DEBUG_DOC_URL,
    NE_DEBUG_FILENAME,
]

In [7]:
# Translation table mapping every ASCII punctuation character to a space
_TRANSLATE_PUNCT = str.maketrans(dict.fromkeys(string.punctuation, " "))


# TODO: refine this violent preprocessing...


def _replace_double_white_spaces(s: str) -> str:
    """Collapse every run of consecutive spaces in ``s`` into a single space."""
    while "  " in s:
        s = s.replace("  ", " ")
    return s


def preprocess_filename(filename: str) -> Optional[str]:
    """Normalize a file name for comparison.

    Punctuation is replaced by spaces, runs of spaces are collapsed, and the
    result is lower-cased and stripped.

    :param filename: raw file name (last path component)
    :return: the normalized name, or ``None`` when nothing is left after
        normalization (empty input, or only punctuation and spaces)
    """
    filename = filename.translate(_TRANSLATE_PUNCT)
    filename = _replace_double_white_spaces(filename)
    filename = filename.lower().strip()
    # Test for emptiness only once stripped, and never call str methods on
    # None
    return filename or None


def preprocess_dirname(dirname: str) -> str:
    """Turn a ``/``-separated directory path into space-separated components.

    Empty components (leading, trailing or doubled slashes) are dropped.
    """
    components = filter(None, dirname.split("/"))
    return " ".join(components)


def doc_url(project_url: str, *, doc_id: str, root_id: Optional[str]) -> str:
    """Build the URL of a document inside a project.

    The URL has the shape ``<project_url>/<root id>/<doc id>``; when the
    document has no root, its own ID is used in the root position.
    """
    if root_id is None:
        parent_id = doc_id
    else:
        parent_id = root_id
    return f"{project_url}/{parent_id}/{doc_id}"


def neo4j_to_record(record: neo4j.Record, project_url: str) -> Dict:
    """Flatten a ``ne`` / ``doc`` / ``rootDoc`` database record into a dict.

    The returned dict holds the mention norm of the entity, the ID,
    preprocessed directory name, preprocessed file name and content type of
    its document, the ID of the root document (``None`` when ``rootDoc`` is
    ``None``), plus two debug entries: the document URL under ``project_url``
    and the raw, unprocessed file name.
    """
    entity = record["ne"]
    mention_norm = entity[NE_MENTION_NORM]

    document = record["doc"]
    document_id = document[DOC_ID]
    dirname = preprocess_dirname(document[DOC_DIRNAME])
    # The file name is whatever follows the last slash of the path
    raw_filename = document[DOC_PATH].rpartition("/")[2]
    filename = preprocess_filename(raw_filename)
    content_type = document[DOC_CONTENT_TYPE]

    root_document = record["rootDoc"]
    root_id = None if root_document is None else root_document[DOC_ID]

    return {
        NE_MENTION_NORM: mention_norm,
        NE_DOC_ID: document_id,
        NE_DOC_DIR_NAME: dirname,
        NE_DOC_FILENAME: filename,
        NE_DOC_CONTENT_TYPE: content_type,
        NE_DOC_ROOT_ID: root_id,
        # Debug
        NE_DEBUG_DOC_URL: doc_url(project_url, doc_id=document_id, root_id=root_id),
        NE_DEBUG_FILENAME: raw_filename,
    }

In [8]:
import csv
from typing import AsyncIterable, Iterable, List, TextIO


async def write_dataset(
        records: AsyncIterable[Dict], fieldnames: List[str], dataset_f: TextIO
):
    """Write ``records`` as CSV into ``dataset_f``.

    A header row made of ``fieldnames`` is written first, then one row per
    record, in the order the asynchronous iterable produces them.
    """
    csv_writer = csv.DictWriter(dataset_f, fieldnames=fieldnames)
    csv_writer.writeheader()
    write_row = csv_writer.writerow
    async for row in records:
        write_row(row)

In [9]:
from neo4j_app import ROOT_DIR

# Every artefact of this notebook lives under <ROOT_DIR>/data
DATA_PATH = ROOT_DIR.joinpath("data")
# Flat CSV export of the named-entity records
records_path = DATA_PATH / "person_records.csv"
# Labelled training pairs, reloaded when present and rewritten after labelling
training_set_path = DATA_PATH / "person_training.csv"
# Settings of the trained deduper
trained_model_path = DATA_PATH / "person_model.pickle"
clusters_path = DATA_PATH / "person_clusters.csv"
# One excluded record ID per line
excluded_set_path = DATA_PATH / "excluded.txt"

In [10]:
# TODO: remove the limit of training data
NUM_SAMPLES = None

async with neo4j_driver.session() as sess:
    if not records_path.exists():
        with records_path.open("w") as df:
            recs = (
                neo4j_to_record(rec, project_url=DATASHARE_PROJECT_URL)
                async for rec in retrieve_ne(
                sess, ne_category="PERSON", limit=NUM_SAMPLES
            )
            )
            await write_dataset(recs, fieldnames=NE_FIELDNAMES, dataset_f=df)

In [10]:
def person_fields(
        *,
        all_mentions: Iterable[str],
        all_dirnames: Iterable[str],
        all_categories: Iterable[str],
        inside_docs: bool,
) -> List[Dict]:
    """Build the list of field definitions used to compare person records.

    :param all_mentions: corpus given to the ``Text`` field on mention norms
    :param all_dirnames: corpus given to the ``Text`` field on directory names
    :param all_categories: values given to the ``Categorical`` content type
        field
    :param inside_docs: when ``True`` only the mention norm fields are
        returned; otherwise the document-level fields are appended
    :return: a new list of field definition dicts
    """
    # TODO: add the number of documents of the same type in the doc as feature ?
    mention_fields = [
        # The mention norm is declared as String (char-level similarity), Text
        # (word-level similarity) and Name
        {"field": NE_MENTION_NORM, "type": "String"},
        {"field": NE_MENTION_NORM, "type": "Text", "corpus": all_mentions},
        {"field": NE_MENTION_NORM, "type": "Name"},
    ]
    if inside_docs:
        return mention_fields

    document_fields = [
        # Exact match on IDs, the root ID may be missing
        {"field": NE_DOC_ID, "type": "Exact"},
        {"field": NE_DOC_ROOT_ID, "type": "Exact", "has missing": True},
        # Finite set of values for categories
        {
            "field": NE_DOC_CONTENT_TYPE,
            "type": "Categorical",
            "categories": all_categories,
        },
        # Exact match on the preprocessed file name
        {"field": NE_DOC_FILENAME, "type": "Exact"},
        # We hope that some directory names will have word level similarities
        {"field": NE_DOC_DIR_NAME, "type": "Text", "corpus": all_dirnames},
    ]
    return mention_fields + document_fields

In [11]:
from typing import Set

In [12]:
def get_filenames(records: Iterable[Dict]) -> Set[str]:
    """Return the distinct document file names found in ``records``."""
    return {record[NE_DOC_FILENAME] for record in records}

In [13]:
def get_mentions(records: Iterable[Dict]) -> Set[str]:
    """Return the distinct mention norms found in ``records``."""
    return {record[NE_MENTION_NORM] for record in records}

In [14]:
def get_dirnames(records: Iterable[Dict]) -> Set[str]:
    """Return the distinct document directory names found in ``records``."""
    return {record[NE_DOC_DIR_NAME] for record in records}

In [15]:
def get_categories(records: Iterable[Dict]) -> Set[str]:
    """Return the distinct document content types found in ``records``."""
    return {record[NE_DOC_CONTENT_TYPE] for record in records}

# Training

In [16]:
from neo4j_app.ml.utils import filtering_console_label
from neo4j_app.ml.graph_dedupe import DocumentGraphDedupe, read_records
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def yield_none():
    """Context manager that does nothing and binds ``None`` to its target.

    Stands in for an opened file when there is no file to open.
    """
    yield


def run_training(
        data_path: Path,
        *,
        excluded_path: Path,
        model_path: Path,
        training_path: Path,
        sample_size: int,
        doc_id_column: str,
        id_column: str,
        recall: float,
) -> dedupe.Dedupe:
    """Label, train and persist a deduper on the records stored in a CSV file.

    Steps: load the excluded IDs, read the records, prepare training (reusing
    previously saved training pairs when available), run the labelling step,
    persist the updated exclusions and training pairs, train, then save the
    model settings.

    :param data_path: CSV file holding the records
    :param excluded_path: text file with one excluded record ID per line; it
        may not exist yet, and is (re)written with the updated exclusions
    :param model_path: file the trained settings are written to (binary)
    :param training_path: file holding labelled training pairs; read when it
        exists, always rewritten after labelling
    :param sample_size: sample size forwarded to ``prepare_training``
    :param doc_id_column: record key given to the deduper as ``doc_key``
    :param id_column: record key used to identify records to exclude
    :param recall: recall forwarded to ``train``
    :return: the trained deduper
    """
    # On a first run there is no exclusion file yet: start from an empty set,
    # the same way a missing training file is tolerated below
    invalid_ids = set()
    if excluded_path.exists():
        with excluded_path.open() as f:
            stripped_lines = (line.strip() for line in f)
            invalid_ids = {i for i in stripped_lines if i}
    with data_path.open() as f:
        records = read_records(f, id_column=id_column, invalid_ids=invalid_ids)

    field = person_fields(
        all_mentions=get_mentions(records.values()),
        all_dirnames=get_dirnames(records.values()),
        all_categories=get_categories(records.values()),
        inside_docs=True,
    )
    # TODO: we should modify dedupe to be able to pass arguments to the
    #  classifier, here max_iter is too low to allow for convergence
    deduper = DocumentGraphDedupe(doc_key=doc_id_column, variable_definition=field)

    # Reuse previously labelled pairs when present, otherwise pass None
    training_file_cm = yield_none
    if training_path.exists():
        training_file_cm = training_path.open
    with training_file_cm() as training_file:
        deduper.prepare_training(records, training_file, sample_size)

    # The labelling step returns the records that must be excluded from now on
    invalid = filtering_console_label(deduper, id_column=id_column)
    invalid_ids.update(rec[id_column] for rec in invalid)
    # Sorted so that the file content is stable from one run to the next
    excluded_path.write_text("\n".join(sorted(invalid_ids)))

    with training_path.open("w") as f:
        deduper.write_training(f)

    deduper.train(recall=recall)
    with model_path.open("wb") as f:
        deduper.write_settings(f)

    return deduper

In [None]:
# Number of samples forwarded to the deduper's training preparation
# TODO: increase
training_sample_size = 50000
# Recall targeted when training
target_recall = 0.8
# TODO: increase
# NOTE(review): it is unclear whether the TODO above targets the recall or the
#  sample size - confirm
# Runs the whole labelling + training pipeline on the exported person records
model = run_training(
    records_path,
    excluded_path=excluded_set_path,
    model_path=trained_model_path,
    training_path=training_set_path,
    sample_size=training_sample_size,
    doc_id_column=NE_DOC_ID,
    id_column=NE_MENTION_NORM,
    recall=target_recall,
)