# Experimental Study Notebook

To-do: use RDFox instead of rdflib so that implications (in particular, owl:sameAs) are taken into account.


In [None]:
from __future__ import annotations

import pathlib
import sys

# Current working directory at the time this cell runs; the "output" and
# "ontologies" folders used further down are resolved relative to it.
NOTEBOOK_DIR = pathlib.Path.cwd()
# Guard on a module-level flag so that re-running this cell does not insert
# the same entry into sys.path a second time.
if '_NB_SYS_PATH_ADJUSTED' not in globals():
    # Put the notebook directory first on sys.path (presumably so the local
    # `modules` package imported below can be found — confirm).
    sys.path.insert(0, str(NOTEBOOK_DIR))
    _NB_SYS_PATH_ADJUSTED = True

import pandas as pd
import re
import rdflib

from modules import file_utils
from modules import potential
from modules import rdf_utils
from modules import queries
from modules.surprise_score import SelectionCriteria, SelectionMode
from modules.query_runner import QueryRunner

print(sys.version)


In [None]:

def print_results_stats(results_df: pd.DataFrame) -> None:
    """Print how many rows have an empty ``surprising_concepts`` cell.

    A cell counts as empty when it is ``None``, a missing-value scalar
    (NaN, ``pd.NA``, ...), a blank/whitespace-only string, or any sized
    container (list, tuple, set, numpy array, ...) of length zero.

    Args:
        results_df: Frame with a ``surprising_concepts`` column. A frame with
            zero rows is accepted even if the column is absent.

    Raises:
        KeyError: If the frame has rows but no ``surprising_concepts`` column.
    """
    def _is_empty(value) -> bool:
        if value is None:
            return True
        if isinstance(value, str):
            return value.strip() == ""
        if hasattr(value, "__len__"):
            # Measure containers by length instead of passing them to
            # pd.isna(): for array-likes pd.isna() returns an array whose
            # truth value is ambiguous and raises.
            try:
                return len(value) == 0
            except TypeError:
                # Unsized objects that still expose __len__ (e.g. 0-d arrays)
                # fall through to the scalar missing-value check.
                pass
        return bool(pd.isna(value))

    if len(results_df) == 0:
        # A frame built from no rows has no columns at all; report zero
        # instead of failing on the column lookup.
        empty_surprising = 0
    else:
        empty_surprising = int(results_df['surprising_concepts'].apply(_is_empty).sum())
    print(f"Empty surprising_concepts rows: {empty_surprising} out of {len(results_df)}")


def _strip_prefix(items):
    """Return sorted concept identifiers without the core namespace prefix."""
    return [concept.replace("https://www.foom.com/core#", "") for concept in sorted(items)]


def _extract_proposition_number(label: str) -> int | None:
    """Extract the first integer found in a proposition label, if any."""
    if not isinstance(label, str):
        return None
    match = re.search(r"\d+", label)
    if match is None:
        return None
    try:
        return int(match.group(0))
    except ValueError:
        return None


def _ensure_runner(graph: rdflib.Graph | None, runner: QueryRunner | None) -> QueryRunner:
    """Return a QueryRunner, creating one from the supplied graph when missing."""
    if graph is None:
        raise ValueError("A graph instance is required to run analyse().")
    return runner or QueryRunner(graph)


def _fetch_proposition_types(runner: QueryRunner) -> dict[int, str]:
    """Map proposition numbers to type labels using the proposition-types query.

    The number is taken from the first integer in each row's
    ``proposition_pref_label``. Rows without a number, or whose
    ``proposition_type_pref_label`` is not a string, are skipped. When a
    number occurs more than once, the last row wins.
    """
    frame = runner.fetch(queries.find_proposition_types())
    types_by_number: dict[int, str] = {}
    if not frame.empty:
        for _index, record in frame.iterrows():
            type_label = record.get("proposition_type_pref_label")
            number = _extract_proposition_number(record.get("proposition_pref_label"))
            if number is None or not isinstance(type_label, str):
                continue
            types_by_number[number] = type_label
    return types_by_number


def _compute_proposition_row(
    graph: rdflib.Graph,
    proposition: int,
    *,
    history_weights: tuple[float, float, float, float],
    history_selection: SelectionCriteria,
    cooccurrence_selection: SelectionCriteria,
    runner: QueryRunner,
    type_selection: bool,
    proposition_type: str,
    verbose: bool = False,
):
    """Analyse a single proposition and package the result two ways.

    Calls ``potential.main`` for the proposition, drops falsy concepts, and
    sorts and prefix-strips the rest via ``_strip_prefix``.

    Returns:
        A ``(row_dict, output_row)`` pair. ``row_dict`` is keyed by column name
        for the dataframe. ``output_row`` is a tuple with the same values in the
        same order, except that the proposition is rendered as a string.
    """
    if verbose:
        print(f"Analysing proposition {proposition}")

    analysis_options = {
        "history_weights": history_weights,
        "history_selection": history_selection,
        "cooccurrence_selection": cooccurrence_selection,
        "runner": runner,
        "type_selection": type_selection,
    }
    background_concepts, surprising = potential.main(graph, proposition, **analysis_options)

    # filter(None, ...) keeps only truthy concepts.
    background_list = _strip_prefix(filter(None, background_concepts))
    surprising_list = _strip_prefix(filter(None, surprising))

    background_count = len(background_list)
    surprising_count = len(surprising_list)
    if background_count > 0:
        ratio_surprising = surprising_count / background_count
    else:
        # No background concepts: report a zero ratio rather than dividing by zero.
        ratio_surprising = 0.0

    row_dict = {
        "proposition": proposition,
        "background_concepts": background_list,
        "surprising_concepts": surprising_list,
        "proposition_type": proposition_type,
        "number_of_background_concepts": background_count,
        "number_of_surprising_concepts": surprising_count,
        "ratio_surprising_over_background": ratio_surprising,
    }

    # Same values in the same column order; only the proposition is stringified.
    output_row = (
        str(proposition),
        *(value for key, value in row_dict.items() if key != "proposition"),
    )
    return row_dict, output_row


def _format_value(value: float) -> str:
    """Format numeric values for filenames, trimming redundant zeroes."""
    if isinstance(value, float):
        return f"{value:.4f}".rstrip("0").rstrip(".")
    return str(value)


def _build_run_description(
    history_selection: SelectionCriteria,
    cooccurrence_selection: SelectionCriteria,
    history_weights: tuple[float, float, float, float],
    type_selection: bool,
) -> str:
    """Build the ``__``-separated token that identifies a run's settings.

    The token has four parts, in order: history selection (mode and value),
    co-occurrence selection (mode and value), the history weights, and a
    ``type`` / ``no-type`` flag.
    """
    history_token = "history-{}-{}".format(
        history_selection.mode.value,
        _format_value(history_selection.value),
    )
    cooccurrence_token = "coocc-{}-{}".format(
        cooccurrence_selection.mode.value,
        _format_value(cooccurrence_selection.value),
    )
    weights_token = "weights-" + "-".join(map(_format_value, history_weights))
    if type_selection:
        type_token = "type"
    else:
        type_token = "no-type"
    return "__".join((history_token, cooccurrence_token, weights_token, type_token))


def _write_outputs(output_rows, description: str) -> None:
    """Hand the collected rows to ``output_df`` for persistence.

    The target is ``<NOTEBOOK_DIR>/output/analyses_<description>``; the output
    directory is created if it does not exist. Any file extension or
    timestamp is presumably added by ``output_df`` — confirm.
    """
    column_names = [
        "proposition",
        "background_concepts",
        "surprising_concepts",
        "proposition_type",
        "number_of_background_concepts",
        "number_of_surprising_concepts",
        "ratio_surprising_over_background",
    ]
    target_dir = NOTEBOOK_DIR / "output"
    target_dir.mkdir(parents=True, exist_ok=True)
    target = str(target_dir / f"analyses_{description}")
    # NOTE(review): `output_df` is neither defined nor imported in the visible
    # cells — presumably it lives in `file_utils`; confirm and qualify the call.
    output_df(output_rows, filename=target, columns=column_names)


def analyse(
    upper_proposition_number: int,
    history_weights: tuple[float, float, float, float],
    history_selection: SelectionCriteria,
    cooccurrence_selection: SelectionCriteria,
    verbose: bool = False,
    graph: rdflib.Graph | None = None,
    runner: QueryRunner | None = None,
    type_selection: bool = False,
) -> pd.DataFrame:
    """Analyse propositions 1..upper_proposition_number and return the results.

    For each proposition number in that inclusive range, one row is computed
    with ``_compute_proposition_row``. The rows are then written out through
    ``_write_outputs`` under a name derived from the run settings, summarised
    on stdout, and returned as a dataframe with one row per proposition.

    Raises:
        ValueError: If *graph* is None (raised by ``_ensure_runner``).
    """
    runner = _ensure_runner(graph, runner)
    # Type labels are looked up once and reused for every proposition.
    proposition_types = _fetch_proposition_types(runner)

    computed = [
        _compute_proposition_row(
            graph,
            number,
            history_weights=history_weights,
            history_selection=history_selection,
            cooccurrence_selection=cooccurrence_selection,
            runner=runner,
            type_selection=type_selection,
            # Propositions without a known type get an empty label.
            proposition_type=proposition_types.get(number, ""),
            verbose=verbose,
        )
        for number in range(1, upper_proposition_number + 1)
    ]
    # Split each (dict, tuple) pair: dicts feed the dataframe, tuples the export.
    analyses = [row_dict for row_dict, _ in computed]
    output_rows = [output_row for _, output_row in computed]

    _write_outputs(
        output_rows,
        _build_run_description(
            history_selection,
            cooccurrence_selection,
            history_weights,
            type_selection,
        ),
    )

    results_df = pd.DataFrame(analyses)
    print_results_stats(results_df)
    return results_df


In [None]:
# Ask file_utils.latest_file for a file in the notebook's "ontologies" folder,
# passing the name fragment "ontology_" and the extension "ttl" (how "latest"
# is decided is defined in file_utils, not visible here).
INPUT_TTL = file_utils.latest_file(folder=NOTEBOOK_DIR / "ontologies", filename_fragment="ontology_", extension="ttl")
# NOTE(review): `load_graph` is neither defined nor imported in the visible
# cells, while `rdf_utils` is imported but never used — presumably this should
# be `rdf_utils.load_graph`; confirm.
graph = load_graph(INPUT_TTL)
# One runner bound to the loaded graph, passed to the analysis cells below.
runner = QueryRunner(graph)

In [None]:
# Run 1: TOP_FRACTION selection with value 1.0 for both history and
# co-occurrence, four history weights, and type_selection left at its
# default (False).
DEFAULT_HISTORY_SELECTION = SelectionCriteria(SelectionMode.TOP_FRACTION, 1 / 1)
DEFAULT_COOCCURRENCE_SELECTION = SelectionCriteria(SelectionMode.TOP_FRACTION, 1 / 1)
DEFAULT_HISTORY_WEIGHTS = (6 / 9, 1 / 9, 1 / 9, 1 / 9)

# Analyse propositions 1 through 48; analyse() also writes the rows out via
# _write_outputs and prints the empty-row statistic.
results_df = analyse(
    upper_proposition_number=48,
    history_selection=DEFAULT_HISTORY_SELECTION,
    cooccurrence_selection=DEFAULT_COOCCURRENCE_SELECTION,
    history_weights=DEFAULT_HISTORY_WEIGHTS,
    verbose=True,
    graph=graph,
    runner=runner,
)
# Bare expression so the notebook displays the dataframe as the cell output.
results_df

In [None]:
# Run 2: same selection criteria as the previous cell, but with
# type_selection=True. This cell rebinds the DEFAULT_* names and results_df.
DEFAULT_HISTORY_SELECTION = SelectionCriteria(SelectionMode.TOP_FRACTION, 1 / 1)
DEFAULT_COOCCURRENCE_SELECTION = SelectionCriteria(SelectionMode.TOP_FRACTION, 1 / 1)
# NOTE(review): only three weights are given here, whereas analyse() annotates
# history_weights as a 4-tuple and the previous cell passes four — confirm
# that potential.main accepts three weights when type_selection is enabled.
DEFAULT_HISTORY_WEIGHTS = (6 / 9, 1 / 9, 2 / 9)

results_df = analyse(
    upper_proposition_number=48,
    history_selection=DEFAULT_HISTORY_SELECTION,
    cooccurrence_selection=DEFAULT_COOCCURRENCE_SELECTION,
    history_weights=DEFAULT_HISTORY_WEIGHTS,
    verbose=True,
    graph=graph,
    runner=runner,
    type_selection=True
)
# Bare expression so the notebook displays the dataframe as the cell output.
results_df