Merge pull request #21 from AstraZeneca/dg_html_report_improvements
HTML report improvements
dgeleta committed Jun 8, 2022
2 parents d34dcad + fbf9a7b commit 714dd82
Showing 156 changed files with 18,064 additions and 1,320 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/main.yaml
@@ -11,7 +11,7 @@ on:
 
 jobs:
   build:
-    runs-on: ubuntu-latest
+    runs-on: macos-latest
     steps:
     - uses: actions/checkout@v2
     - uses: actions/setup-python@v1
@@ -31,8 +31,8 @@ jobs:
         pip install pytest
         pip install tox
         pip install nbconvert
-        pip install markupsafe==2.0.1
-        python setup.py install
+        pip install markupsafe==2.0.1
+        pip install -r requirements.txt
     - name: Run code quality tests
       run: |
         tox -e flake8
11 changes: 5 additions & 6 deletions README.md
@@ -43,12 +43,11 @@ If you find *OntoMerger* useful in your work or research, please consider adding
 ```bibtex
 @article{ontomerger,
   arxivId = {???},
-  author = {Geleta, David and Rozemberczki, Benedek and Nikolov, Andriy and O'Donoghue, Mark and Gogleva, Anna and Tamma, Valentina},
-  month = {may},
-  title = {{OntoMerger: An Ontology Alignment Library for
-           Creating Minimal and Connected Domain Knowledge
-           Sub-graphs.}},
-  url = {http://arxiv.org/abs/???},
+  author = {Geleta, David and Nikolov, Andriy and O'Donoghue, Mark and Rozemberczki, Benedek
+            and Gogleva, Anna and Tamma, Valentina and Payne, Terry R.},
+  month = {june},
+  title = {{OntoMerger: An Ontology Integration Library for Deduplicating and Connecting Knowledge Graph Nodes.}},
+  url = {https://arxiv.org/abs/2206.02238},
   year = {2022}
 }
 ```
14 changes: 7 additions & 7 deletions data/data_prepper.py
@@ -9,7 +9,7 @@
 import pandas as pd
 from pandas import DataFrame
 
-from onto_merger.analyser import analysis_util
+from onto_merger.analyser import analysis_utils
 from onto_merger.data.constants import (
     COLUMN_DEFAULT_ID,
     COLUMN_PROVENANCE,
@@ -33,7 +33,7 @@
 
 def prune_nodes_by_namespace(nodes_raw: DataFrame,
                              node_namespaces_to_remove: List[str]) -> DataFrame:
-    default_id_ns_column_name = analysis_util.get_namespace_column_name_for_column(COLUMN_DEFAULT_ID)
+    default_id_ns_column_name = analysis_utils.get_namespace_column_name_for_column(COLUMN_DEFAULT_ID)
     nodes_updated = nodes_raw.query(
         expr=f"{default_id_ns_column_name} != @node_nss",
         local_dict={"node_nss": node_namespaces_to_remove},
@@ -49,8 +49,8 @@ def prune_nodes_by_namespace(nodes_raw: DataFrame,
 
 def prune_edges_by_namespace(edges_raw: DataFrame,
                              node_namespaces_to_remove: List[str]) -> DataFrame:
-    src_ns_column_name = analysis_util.get_namespace_column_name_for_column(COLUMN_SOURCE_ID)
-    trg_ns_column_name = analysis_util.get_namespace_column_name_for_column(COLUMN_TARGET_ID)
+    src_ns_column_name = analysis_utils.get_namespace_column_name_for_column(COLUMN_SOURCE_ID)
+    trg_ns_column_name = analysis_utils.get_namespace_column_name_for_column(COLUMN_TARGET_ID)
     edges_pruned = edges_raw.query(
         expr=f"{src_ns_column_name} != @node_nss & {trg_ns_column_name} != @node_nss",
         local_dict={"node_nss": node_namespaces_to_remove},
@@ -86,7 +86,7 @@ def produce_example_data_set(raw_input_path: str = "bikg_2022-02-28-4.27.0_disea
 
     # (1) raw input load data
     data_manager_raw_input = DataManager(project_folder_path=raw_input_path, clear_output_directory=False)
-    raw_input_data: List[NamedTable] = analysis_util.add_namespace_column_to_loaded_tables(
+    raw_input_data: List[NamedTable] = analysis_utils.add_namespace_column_to_loaded_tables(
         data_manager_raw_input.load_input_tables()
     )
     data_repo: DataRepository = DataRepository()
@@ -153,7 +153,7 @@ def produce_test_data_set(raw_path: str = "bikg_2022-02-28-4.27.0_disease",
                           example_data_set_path: str = "bikg_disease") -> None:
     # load example
     data_manager_example_input = DataManager(project_folder_path=example_data_set_path, clear_output_directory=False)
-    raw_input_data: List[NamedTable] = analysis_util.add_namespace_column_to_loaded_tables(
+    raw_input_data: List[NamedTable] = analysis_utils.add_namespace_column_to_loaded_tables(
         data_manager_example_input.load_input_tables()
    )
     data_repo: DataRepository = DataRepository()
@@ -181,7 +181,7 @@ def produce_test_data_set(raw_path: str = "bikg_2022-02-28-4.27.0_disease",
         local_dict={"node_ids": nodes_to_keep},
         inplace=False
     )
-    default_id_ns_column_name = analysis_util.get_namespace_column_name_for_column(COLUMN_DEFAULT_ID)
+    default_id_ns_column_name = analysis_utils.get_namespace_column_name_for_column(COLUMN_DEFAULT_ID)
     namespaces = list(set(nodes_some_to_keep[default_id_ns_column_name].tolist()))
     print(f"nodes_updated_to_keep = {len(nodes_updated_to_keep):,d}")
     print(f"nodes_some_to_keep = {len(nodes_some_to_keep):,d} | namespaces = {namespaces}")
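Aside on the query pattern above: inside `DataFrame.query`, pandas gives `==`/`!=` special list semantics, so comparing a column against a list-valued local variable behaves like `not in`. A minimal, self-contained sketch (hypothetical data, not code from this commit):

```python
import pandas as pd

# Hypothetical node table with a pre-computed namespace column, mirroring
# what analysis_utils.get_namespace_column_name_for_column points at.
nodes = pd.DataFrame({
    "default_id": ["MONDO:0001", "DOID:0002", "MESH:0003"],
    "namespace_default_id": ["MONDO", "DOID", "MESH"],
})
node_namespaces_to_remove = ["DOID", "MESH"]

# In query(), comparing a column to a list with != acts as 'not in',
# so this keeps only rows whose namespace is outside the removal list.
nodes_pruned = nodes.query(
    expr="namespace_default_id != @node_nss",
    local_dict={"node_nss": node_namespaces_to_remove},
    inplace=False,
)
print(nodes_pruned)  # only the MONDO row remains
```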
3 changes: 1 addition & 2 deletions dev_doc_gen.sh
@@ -5,5 +5,4 @@ rm -rf build
 cd docs
 make html
 cd ..
-open -a "Google Chrome" build/html/index.html
-#open -a "Google Chrome" build/html/notes/data_testing.html
+open -a "Google Chrome" build/html/index.html
143 changes: 86 additions & 57 deletions onto_merger/alignment/alignment_manager.py
@@ -1,25 +1,27 @@
"""Alignment process runner and helper methods."""

import dataclasses
from typing import List, Tuple

import pandas as pd
from pandas import DataFrame

from onto_merger.alignment import mapping_utils, merge_utils
from onto_merger.analyser import analysis_util
from onto_merger.analyser import analysis_utils
from onto_merger.data.constants import (
COLUMN_MAPPING_TYPE_GROUP,
COLUMN_NAMESPACE,
COLUMN_PROVENANCE,
COLUMN_RELATION,
COLUMN_SOURCE_ID_ALIGNED_TO,
COLUMN_STEP_COUNTER,
MAPPING_TYPE_GROUP_EQV,
MAPPING_TYPE_GROUP_XREF,
ONTO_MERGER,
RELATION_MERGE,
SCHEMA_ALIGNMENT_STEPS_TABLE,
SCHEMA_HIERARCHY_EDGE_TABLE,
TABLE_ALIGNMENT_STEPS_REPORT,
SCHEMA_MERGE_TABLE_WITH_META_DATA,
TABLE_MAPPINGS,
TABLE_MAPPINGS_FOR_INPUT_NODES,
TABLE_MAPPINGS_OBSOLETE_TO_CURRENT,
TABLE_MAPPINGS_UPDATED,
TABLE_MERGES_WITH_META_DATA,
Expand All @@ -32,6 +34,7 @@
AlignmentStep,
DataRepository,
NamedTable,
convert_alignment_steps_to_named_table,
)
from onto_merger.logger.log import get_logger

@@ -42,10 +45,10 @@ class AlignmentManager:
     """Alignment process pipeline."""
 
     def __init__(
-        self,
-        alignment_config: AlignmentConfig,
-        data_repo: DataRepository,
-        data_manager: DataManager,
+            self,
+            alignment_config: AlignmentConfig,
+            data_repo: DataRepository,
+            data_manager: DataManager,
     ):
         """Initialise the AlignmentManager class.
@@ -74,7 +77,7 @@ def align_nodes(self) -> Tuple[DataRepository, List[str]]:
         """
         # prepare for alignment
         self._preprocess_mappings()
-        source_alignment_order = produce_source_alignment_priority_order(
+        source_alignment_order = _produce_source_alignment_priority_order(
             seed_ontology_name=self._alignment_config.base_config.seed_ontology_name,
             nodes=self._data_repo_input.get(TABLE_NODES).dataframe,
         )
@@ -85,15 +88,27 @@
             sources_to_align=source_alignment_order,
             mapping_type_group_name=MAPPING_TYPE_GROUP_EQV,
             mapping_types=self._alignment_config.mapping_type_groups.equivalence,
+            start_step=0,
         )
 
         # (2) use the weaker relations: database reference
         self._align_sources(
             sources_to_align=source_alignment_order,
             mapping_type_group_name=MAPPING_TYPE_GROUP_XREF,
             mapping_types=self._alignment_config.mapping_type_groups.database_reference,
+            start_step=len(source_alignment_order)
         )
 
+        # remove self merges
+        all_merges = self._data_repo_output.get(TABLE_MERGES_WITH_META_DATA).dataframe
+        filtered_merges = pd.concat([
+            all_merges,
+            self._data_repo_output.get("SEED_MERGES").dataframe]).drop_duplicates(keep=False)
+        self._data_repo_output.update(
+            table=NamedTable(TABLE_MERGES_WITH_META_DATA, filtered_merges)
+        )
+        logger.info(f"Filtered out seed self merges ({len(all_merges):,d} -> {len(filtered_merges):,d})")
+
         # save meta data
         self._data_repo_output.update(
             table=convert_alignment_steps_to_named_table(alignment_steps=self._alignment_steps)
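Aside on the "remove self merges" block above: it subtracts the seed self-merges from the full merge table without a join key by concatenating the two frames and dropping every duplicated row (`keep=False`), a key-less anti-join. A minimal sketch with made-up rows (it assumes each seed row occurs exactly once in the full table; a seed row absent from it would also survive the drop):

```python
import pandas as pd

all_merges = pd.DataFrame({
    "source_id": ["MONDO:1", "MONDO:2", "DOID:9"],
    "target_id": ["MONDO:1", "MONDO:2", "MONDO:7"],
})
seed_merges = pd.DataFrame({  # self merges: source equals target
    "source_id": ["MONDO:1", "MONDO:2"],
    "target_id": ["MONDO:1", "MONDO:2"],
})

# Each seed row occurs twice in the concatenation, so keep=False removes
# both copies, leaving only the rows unique to all_merges.
filtered_merges = pd.concat([all_merges, seed_merges]).drop_duplicates(keep=False)
print(filtered_merges)  # just the DOID:9 -> MONDO:7 merge
```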
@@ -102,10 +117,11 @@ def align_nodes(self) -> Tuple[DataRepository, List[str]]:
         return self._data_repo_output, source_alignment_order
 
     def _align_sources(
-        self,
-        sources_to_align: List[str],
-        mapping_type_group_name: str,
-        mapping_types: List[str],
+            self,
+            sources_to_align: List[str],
+            mapping_type_group_name: str,
+            mapping_types: List[str],
+            start_step: int,
     ) -> None:
         """Run the alignment for each source according to the priority order, for a given mapping type group.
@@ -118,10 +134,10 @@
         :return:
         """
         for source_id in sources_to_align:
-            step_counter = sources_to_align.index(source_id) + 1
+            step_counter = start_step + sources_to_align.index(source_id) + 1
             logger.info(
                 f"* * * * * SOURCE: {source_id} | STEP: {step_counter} of "
-                + f"{len(sources_to_align)} | MAPPING: {mapping_type_group_name} "
+                + f"{len(sources_to_align) * 2} | MAPPING: {mapping_type_group_name} "
                 + "* * * * *"
             )
             (merges_for_source, alignment_step) = self._align_nodes_to_source(
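Aside on the renumbering above: with `start_step` offsetting the counter, the equivalence pass takes steps 1..N and the database-reference pass steps N+1..2N over the same source order, which is why the logged total becomes `len(sources_to_align) * 2`. A toy illustration with hypothetical sources:

```python
sources_to_align = ["MONDO", "DOID", "MESH"]

for group_name, start_step in [("eqv", 0), ("xref", len(sources_to_align))]:
    for source_id in sources_to_align:
        step_counter = start_step + sources_to_align.index(source_id) + 1
        print(f"STEP: {step_counter} of {len(sources_to_align) * 2} | "
              f"SOURCE: {source_id} | MAPPING: {group_name}")
# prints steps 1..6 of 6
```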
@@ -133,11 +149,11 @@ def _align_sources(
             self._store_results_from_alignment_step(merges_for_source=merges_for_source, alignment_step=alignment_step)
 
     def _align_nodes_to_source(
-        self,
-        source_id: str,
-        step_counter: int,
-        mapping_type_group_name: str,
-        mapping_types: List[str],
+            self,
+            source_id: str,
+            step_counter: int,
+            mapping_type_group_name: str,
+            mapping_types: List[str],
     ) -> Tuple[NamedTable, AlignmentStep]:
         """Perform an alignment step to a source.
@@ -147,15 +163,15 @@ def _align_nodes_to_source(
         :param mapping_types: The mapping types in the given type group.
         :return: The merge named table for the step, and the step meta data dataclass.
         """
-        unmapped_nodes = mapping_utils.produce_table_unmapped_nodes(
+        unmapped_nodes = merge_utils.produce_table_unmapped_nodes(
             nodes=self._data_repo_input.get(TABLE_NODES).dataframe,
             merges=self._data_repo_output.get(TABLE_MERGES_WITH_META_DATA).dataframe,
         )
 
         # (1) get mappings for NS
         mappings_for_ns = mapping_utils.get_mappings_for_namespace(
             namespace=source_id,
-            edges=self._data_repo_output.get(TABLE_MAPPINGS_UPDATED).dataframe,
+            edges=self._data_repo_output.get(TABLE_MAPPINGS_FOR_INPUT_NODES).dataframe,
         )
         alignment_step = AlignmentStep(
             mapping_type_group=mapping_type_group_name,
@@ -229,14 +245,6 @@ def _preprocess_mappings(self) -> None:
             permitted_mapping_relations=self._alignment_config.mapping_type_groups.equivalence,
             mappings=mappings_obsolete_to_current_node_id,
         )
-        mappings_obsolete_to_current_node_id_merge_strength[COLUMN_RELATION] = RELATION_MERGE
-        mappings_obsolete_to_current_node_id_merge_strength[COLUMN_PROVENANCE] = ONTO_MERGER
-        self._data_repo_output.update(
-            table=NamedTable(
-                name=TABLE_MAPPINGS_OBSOLETE_TO_CURRENT,
-                dataframe=mappings_obsolete_to_current_node_id_merge_strength[SCHEMA_HIERARCHY_EDGE_TABLE],
-            )
-        )
 
         # get the mappings without the internal code reassignment and update
         # any obsolete node IDs
@@ -246,6 +254,40 @@
         )
         self._data_repo_output.update(table=NamedTable(name=TABLE_MAPPINGS_UPDATED, dataframe=mappings_updated))
 
+        # mappings that cover input nodes
+        mappings_for_input_nodes = mapping_utils.filter_mappings_for_input_node_set(
+            input_nodes=self._data_repo_input.get(TABLE_NODES).dataframe,
+            mappings=mappings_updated,
+        )
+        self._data_repo_output.update(
+            table=NamedTable(name=TABLE_MAPPINGS_FOR_INPUT_NODES, dataframe=mappings_for_input_nodes)
+        )
+
+        #
+        mappings_obsolete_to_current_node_id_applicable = mapping_utils.get_nodes_with_updated_node_ids(
+            nodes=self._data_repo_input.get(TABLE_NODES).dataframe,
+            mappings_obsolete_to_current_node_id=mappings_obsolete_to_current_node_id_merge_strength,
+        )
+        mappings_obsolete_to_current_node_id_applicable[COLUMN_RELATION] = RELATION_MERGE
+        mappings_obsolete_to_current_node_id_applicable[COLUMN_PROVENANCE] = ONTO_MERGER
+        self._data_repo_output.update(
+            table=NamedTable(
+                name=TABLE_MAPPINGS_OBSOLETE_TO_CURRENT,
+                dataframe=mappings_obsolete_to_current_node_id_applicable[SCHEMA_HIERARCHY_EDGE_TABLE],
+            )
+        )
+
+        mappings_obsolete_to_current_node_id_applicable[COLUMN_STEP_COUNTER] = 0
+        mappings_obsolete_to_current_node_id_applicable[COLUMN_SOURCE_ID_ALIGNED_TO] = "INTERNAL"
+        mappings_obsolete_to_current_node_id_applicable[COLUMN_MAPPING_TYPE_GROUP] = MAPPING_TYPE_GROUP_EQV
+        self._data_repo_output.update(
+            table=DataManager.merge_tables_of_same_type(
+                tables=[NamedTable(TABLE_MERGES_WITH_META_DATA,
+                                   mappings_obsolete_to_current_node_id_applicable[SCHEMA_MERGE_TABLE_WITH_META_DATA]),
+                        self._data_repo_output.get(TABLE_MERGES_WITH_META_DATA)]
+            )
+        )
+
         logger.info("Finished pre-processing mappings.")
 
     def _create_initial_step(self, mapping_type_group_name: str) -> None:
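Aside on the new block above: assigning a scalar to a DataFrame column broadcasts it to every row, which is how the internal obsolete-to-current merges get their constant meta-data before being folded into the merge table. A sketch with assumed constant values (the real values live in onto_merger.data.constants and may differ):

```python
import pandas as pd

merges = pd.DataFrame({
    "source_id": ["MONDO:111"],  # obsolete ID
    "target_id": ["MONDO:222"],  # current ID
})

# Scalar assignment broadcasts to all rows; the literals are assumptions.
merges["relation"] = "merge"                  # RELATION_MERGE
merges["provenance"] = "onto_merger"          # ONTO_MERGER
merges["step_counter"] = 0                    # pre-alignment, hence step 0
merges["source_id_aligned_to"] = "INTERNAL"
merges["mapping_type_group"] = "equivalence"  # MAPPING_TYPE_GROUP_EQV
print(merges)
```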
@@ -265,17 +307,20 @@ def _create_initial_step(self, mapping_type_group_name: str) -> None:
             nodes_obsolete=self._data_repo_input.get(TABLE_NODES_OBSOLETE).dataframe,
         )
         self._data_repo_output.update(table=self_merges_for_seed_nodes)
+        self._data_repo_output.update(table=NamedTable("SEED_MERGES", self_merges_for_seed_nodes.dataframe))
 
         # record start step meta data
+        step = AlignmentStep(
+            mapping_type_group=mapping_type_group_name,
+            source="INITIALISATION",
+            step_counter=0,
+            count_unmapped_nodes=(len(self._data_repo_input.get(TABLE_NODES).dataframe)),
+        )
+        step.count_mappings = len(self_merges_for_seed_nodes.dataframe)
+        step.count_merged_nodes = step.count_mappings
+        step.task_finished()
         self._alignment_steps.append(
-            AlignmentStep(
-                mapping_type_group=mapping_type_group_name,
-                source="START",
-                step_counter=0,
-                count_unmapped_nodes=(
-                    len(self._data_repo_input.get(TABLE_NODES).dataframe) - len(self_merges_for_seed_nodes.dataframe)
-                ),
-            )
+            step
         )
def _store_results_from_alignment_step(self, merges_for_source: NamedTable, alignment_step: AlignmentStep) -> None:
Expand All @@ -285,6 +330,7 @@ def _store_results_from_alignment_step(self, merges_for_source: NamedTable, alig
:param alignment_step: The alignment step meta data.
:return:
"""
alignment_step.task_finished()
self._alignment_steps.append(alignment_step)
self._data_repo_output.update(
table=DataManager.merge_tables_of_same_type(
@@ -293,7 +339,7 @@ def _store_results_from_alignment_step(self, merges_for_source: NamedTable, alig
             )
         )
 
 
-def produce_source_alignment_priority_order(seed_ontology_name: str, nodes: DataFrame) -> List[str]:
+def _produce_source_alignment_priority_order(seed_ontology_name: str, nodes: DataFrame) -> List[str]:
     """Produce the alignment process source priority order.
 
     The alignment order is produced by putting the seed ontology as first (this
@@ -306,25 +352,8 @@ def produce_source_alignment_priority_order(seed_ontology_name: str, nodes: Data
     """
     priority_order = [seed_ontology_name]
     ontology_namespaces = list(
-        analysis_util.produce_table_node_namespace_distribution(node_table=nodes)[COLUMN_NAMESPACE]
+        analysis_utils.produce_table_node_namespace_distribution(node_table=nodes)[COLUMN_NAMESPACE]
     )
     ontology_namespaces.remove(seed_ontology_name)
     priority_order.extend(ontology_namespaces)
     return priority_order
-
-
-def convert_alignment_steps_to_named_table(
-    alignment_steps: List[AlignmentStep],
-) -> NamedTable:
-    """Convert the list of AlignmentStep dataclasses to a named table.
-
-    :param alignment_steps: The list of AlignmentStep dataclasses.
-    :return: The AlignmentStep report dataframe wrapped as a named table.
-    """
-    return NamedTable(
-        TABLE_ALIGNMENT_STEPS_REPORT,
-        pd.DataFrame(
-            [dataclasses.astuple(alignment_step) for alignment_step in alignment_steps],
-            columns=SCHEMA_ALIGNMENT_STEPS_TABLE,
-        ),
-    )
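Aside on the function removed above (it now arrives via the import added at the top of the file, alongside AlignmentStep and NamedTable; the source module name is collapsed in the diff): dumping dataclass instances into a DataFrame with `dataclasses.astuple` depends on the column list matching the dataclass field order. A self-contained sketch with a hypothetical stand-in dataclass:

```python
import dataclasses
from typing import List

import pandas as pd


@dataclasses.dataclass
class Step:  # hypothetical stand-in for AlignmentStep
    source: str
    step_counter: int
    count_mappings: int = 0


steps: List[Step] = [Step("MONDO", 1, 120), Step("DOID", 2, 45)]

# astuple emits fields in declaration order, so the columns list must
# follow the same order (the role SCHEMA_ALIGNMENT_STEPS_TABLE plays above).
report = pd.DataFrame(
    [dataclasses.astuple(step) for step in steps],
    columns=["source", "step_counter", "count_mappings"],
)
print(report)
```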