In [37]:
from gp.distantsupervision.datasets.make_autolabel import (
    make_autolabel,
    AutoLabelDataActorArgs,
)
import serde.json
from sm_datasets import Datasets
from sm.dataset import Dataset
from gp.actors.data import KGName
from gp.distantsupervision.datasets.make_autolabel import (
    AutoLabeledTable,
)
from sm.inputs.prelude import EntityIdWithScore

In [38]:
from libactor.cache import IdentObj
from experiments.config import DATA_DIR, LIBACTOR_STORAGE_DIR
from experiments.dag import create_data_actor
from libactor.storage import GlobalStorage

# Point libactor's global storage at the experiment storage dir; this runs
# before create_data_actor() (presumably the actors rely on it — confirm).
GlobalStorage.init(LIBACTOR_STORAGE_DIR)
data_actor = create_data_actor()
# Wikidata KG database used by every cell below.
kgdb = data_actor.db_actor.kgdbs[KGName.Wikidata]
# kgdb wrapped in an IdentObj keyed by its args key; actors below take this
# wrapped form (presumably so their caches can key on it).
ident_kgdb = IdentObj(kgdb.args.get_key(), kgdb)

In [39]:
# Root of the dataset collection, and the raw dataset that gets auto-labeled.
autolabel_datadir = DATA_DIR / "datasets" / "wiki-20230620"
raw_dataset_dir = autolabel_datadir / "wt-limited-easy-sp51"

# Every output of this notebook goes under auto-label/<raw dataset name>.
workdir = autolabel_datadir / "auto-label" / raw_dataset_dir.name
workdir.mkdir(exist_ok=True, parents=True)

## Prepare the dataset

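1. Load the dataset.
2. Fix the entity redirections (replace redirected KG entity ids with their redirect targets).

The paths were set up in the previous cell; now load the raw dataset and fix its redirections.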

In [None]:
from sm_datasets import Datasets
from sm.dataset import Dataset

# Load the raw tables first, then rewrite redirected KG ids using the
# label / property / redirection caches of the Wikidata DB.
examples = Dataset(raw_dataset_dir).load()
pydb = kgdb.pydb
examples = Datasets().fix_redirection(
    examples,
    pydb.entity_labels.cache(),
    pydb.props.cache(),
    pydb.entity_redirections.cache(),
    kgdb.kgns,
)

[32m2025-05-07 12:44:21.267[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0m:[36m_fix_redirections[0m:[36m271[0m - [34m[1mRedirect entity: Q21400319 to Q18625791[0m
[32m2025-05-07 12:44:21.304[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0m:[36m_fix_redirections[0m:[36m271[0m - [34m[1mRedirect entity: Q206343 to Q124007617[0m
[32m2025-05-07 12:44:21.410[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0m:[36m_fix_redirections[0m:[36m271[0m - [34m[1mRedirect entity: Q111949857 to None[0m
[32m2025-05-07 12:44:21.627[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0m:[36m_fix_redirections[0m:[36m271[0m - [34m[1mRedirect entity: Q19754830 to Q1466375[0m
[32m2025-05-07 12:44:21.628[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0m:[36m_fix_redirections[0m:[36m271[0m - [34m[1mRedirect entity: Q19754830 to Q1466375[0m
[32m2025-05-07 12:44:21.726[0m | [34m[1mDEBUG   [0m | [36msm_datasets.datasets[0

Save the dataset with its redirections fixed, so later cells can reload it without redoing the fix.

In [None]:
# Persist the redirect-fixed dataset in compressed batches of 1000 examples.
fixed_redirection_dir = workdir / "fixed-redirection"
Dataset(fixed_redirection_dir).save(examples, batch_size=1000, batch_compressed=True)

## Auto label CTA

Make the auto-label dataset.

In [40]:
# Reload the redirect-fixed dataset written by the previous section.
fixed_redirection_dir = workdir / "fixed-redirection"
examples = Dataset(fixed_redirection_dir).load()

At this point I realized that we already have the `s01_make_dataset_v2` notebook. So instead of writing a new one, I skipped the next cells; they may not run correctly, and the last recorded run failed with a `ModuleNotFoundError`. Run the `s01_make_dataset_v2` notebook instead to produce the `autolabel.json` file.

In [5]:
import orjson
import serde.json
from dataclasses import asdict
from gp.distantsupervision.datasets.make_autolabel import (
    CombinedFilterArgs,
    LabelV1Args,
    LabelV2Args,
    TransformV1Args,
)
from libactor.misc._misc import _orjson_default

# Auto-labeling configuration: filter columns, transform, then label column
# types (top-1 type with score threshold 0.8, header/type agreement from
# type_to_clusters.json).
args = AutoLabelDataActorArgs(
    skip_non_unique_mention=True,
    skip_column_with_no_type=True,
    filter_method="filter_combined",
    filter_combined=CombinedFilterArgs(
        header_col_type=None,
    ),
    transform_method="transform_v2",
    # NOTE(review): transform_v2 is configured with TransformV1Args — confirm
    # this is the intended args class for the v2 transform.
    transform_v2=TransformV1Args(),
    label_method="label_v2",
    label_v2=LabelV2Args(
        base_labeler=LabelV1Args(
            topk=1,
            threshold=0.8,
            include_similar_score=True,
        ),
        type_header_agreement_file=autolabel_datadir / "type_to_clusters.json",
        norm_name_fn="gp.distantsupervision.datasets.make_dataset_helper.NamingHelper.make_dataset_v2_norm_fn",
    ),
)

# Store the exact args next to the outputs so the run can be reproduced.
# (Plain string: the original f-string had no placeholders.)
args_workdir = workdir / "auto-labeled-v1"
args_workdir.mkdir(parents=True, exist_ok=True)
serde.json.ser(
    asdict(args), args_workdir / "args.json", indent=2, orjson_default=_orjson_default
)

In [7]:
# Run the auto-labeler over `examples` with 4 parallel jobs, using
# args_workdir as its working directory.
labeled_examples = make_autolabel(
    examples,
    kgdb,
    args=args,
    workdir=args_workdir,
    n_jobs=4,
)

ModuleNotFoundError: No module named 'gp.distantsupervision.datasets'

From here on, we assume you have already run the `s01_make_dataset_v2` notebook and have the `autolabel.json` file. Load it and use it to build the semantic descriptions.

In [41]:
# CTA labels produced by `s01_make_dataset_v2`, keyed by example id.
autolabel_file = raw_dataset_dir / "autolabel.json"
id2cta = serde.json.deser(autolabel_file)
len(id2cta)

20290

In [42]:
has_cta_exs = [ex for ex in examples if ex.id in id2cta]

# example id -> AutoLabeledTable holding the entity columns and their scored types.
auto_labeled_tables: dict[str, AutoLabeledTable] = {}
for ex in has_cta_exs:
    cta_record = id2cta[ex.id]
    # only the first item of each "entity_columns" entry (the column index) is kept
    column_indices = [entry[0] for entry in cta_record["entity_columns"]]
    column_types = [
        list(map(EntityIdWithScore.from_dict, scored_types))
        for scored_types in cta_record["entity_column_types"]
    ]
    auto_labeled_tables[ex.id] = AutoLabeledTable(
        table=ex.table,
        entity_columns=column_indices,
        entity_column_types=column_types,
    )

len(auto_labeled_tables)

20290

## Auto-label CPA

In [43]:
from gp.actors.sm.can_graph import CanGraphActor, CanGraphActorArgs
from datasets.fingerprint import Hasher

In [44]:
from typing import Sequence
from gp.actors.data import GPExample
from libactor.cache import IdentObj
from sm.dataset import Example, FullTable

exs = has_cta_exs

# Wrap each example in an IdentObj keyed by its content hash (datasets' Hasher).
ident_exs = []
for example in exs:
    ident_exs.append(IdentObj(key=Hasher.hash(example), value=example))

In [45]:
import orjson
from gp.actors.el.canreg import CanRegActor, CanRegActorArgs
from gp.actors.el.cangen import CanGenActor, CanGenActorArgs
from gp.entity_linking.candidate_recognition import OracleCanReg
from gp.entity_linking.candidate_generation.oracle_model import CanGenOracleMethod
from sm.misc.funcs import get_classpath

ontology = kgdb.ontology
cangen = CanGenActor(CanGenActorArgs(clspath=get_classpath(CanGenOracleMethod)))

# Candidate entities per example, generated with CanGenOracleMethod and
# restricted to the entity columns found by CTA. The column list is passed as
# an IdentObj keyed by its JSON encoding.
table_can_ents = []
for ident_ex in ident_exs:
    entity_columns = auto_labeled_tables[ident_ex.value.id].entity_columns
    table_can_ents.append(
        cangen.forward(
            ident_ex,
            IdentObj(key=orjson.dumps(entity_columns).decode(), value=entity_columns),
            ident_kgdb,
        )
    )

In [48]:
from tqdm import tqdm

# Build the candidate graph of every example in a single batched call
# (this replaced an earlier per-example `cangraph_actor.invoke` loop).
cangraph_actor = CanGraphActor(CanGraphActorArgs(topk=4))
cangraphs = cangraph_actor.batch_invoke(
    ident_exs,
    table_can_ents,
    ident_kgdb,
)

20290it [00:34, 589.49it/s]


Extracting candidate graphs: ...
Extracting candidate graphs: 282.105 seconds
Caching candidate graphs: ...
Caching candidate graphs: 1.029 seconds


In [49]:
# Only `serde.json` is imported elsewhere in this notebook; import the pickle
# submodule explicitly so this cell also works on a fresh kernel.
import serde.pickle

serde.pickle.ser(cangraphs, raw_dataset_dir / "cangraphs.pkl")

In [51]:
from gp_core.algorithms import CanGraphExtractedResult
import polars as pl
from sm.misc.funcs import group_by
from sm.misc.prelude import itemgetter

DF_FIELD_FREQ = "freq"
DF_FIELD_NUM_UNMATCH_LINKS = "num_unmatch_links"
DF_FIELD_MAX_NUM_POS_ENT_LINKS = "max_num_pos_ent_links"


def _unmatch_ratio(edge: dict) -> float:
    """Unmatched links divided by the max number of possible entity links of an edge record."""
    return edge[DF_FIELD_NUM_UNMATCH_LINKS] / edge[DF_FIELD_MAX_NUM_POS_ENT_LINKS]


def _edge_score(edge: dict, nrows: int) -> float:
    """Ranking score of an edge record: 2 * freq / nrows - unmatch ratio (higher is better)."""
    return edge[DF_FIELD_FREQ] * 2 / nrows - _unmatch_ratio(edge)


def pred_rels(
    ex: Example[FullTable],
    cangraph: CanGraphExtractedResult,
    min_freq_over_row: float,
    max_unmatch_over_ent_row: float,
):
    """Predict column-pair relations (CPA) of a table from its candidate graph.

    An edge record of ``cangraph.edgedf`` is kept when
    ``freq / nrows >= min_freq_over_row`` and
    ``num_unmatch_links / max_num_pos_ent_links <= max_unmatch_over_ent_row``.
    Kept records are grouped by (source, target) node; pairs whose target is
    not a column node are skipped. For each column pair and each property
    (``outedge``), only the record with the highest ``_edge_score`` is kept.

    Args:
        ex: the example whose table is being labeled.
        cangraph: the candidate graph extracted for ``ex``.
        min_freq_over_row: minimum fraction of rows an edge must cover.
        max_unmatch_over_ent_row: maximum allowed unmatch ratio.

    Returns:
        ``None`` if the table has no rows or no record passes the filter.
        Otherwise a list of ``{"source": col, "target": col, "edges":
        [{"prop", "freq", "unmatch_percent"}, ...]}`` dicts. The list is empty
        when no surviving record connects two column nodes.
    """
    nrows = ex.table.table.nrows()
    if nrows == 0:
        # Nothing to normalise by; also avoids a ZeroDivisionError below.
        return None

    exrel = cangraph.edgedf.filter(
        ((pl.col(DF_FIELD_FREQ) / nrows) >= min_freq_over_row)
        & (
            (
                pl.col(DF_FIELD_NUM_UNMATCH_LINKS)
                / pl.col(DF_FIELD_MAX_NUM_POS_ENT_LINKS)
            )
            <= max_unmatch_over_ent_row
        )
    )
    edge_records = exrel.to_dicts()
    if len(edge_records) == 0:
        return None

    pair2edges = group_by(edge_records, itemgetter("source", "target"))
    # (source column, target column) -> [(property, freq / nrows, unmatch ratio), ...]
    col2label: dict[tuple[int, int], list[tuple[str, float, float]]] = {}
    for (uid, vid), edges in pair2edges.items():
        u = cangraph.nodes[uid]
        v = cangraph.nodes[vid]
        assert u.is_column()
        if not v.is_column():
            continue

        # Best record per property. The records are visited in descending score
        # order, and sorted() stays stable with reverse=True, so the first record
        # seen for a property is its best one (ties keep the earliest record).
        top_edges = {}
        for e in sorted(edges, key=lambda rec: _edge_score(rec, nrows), reverse=True):
            outedge = e["outedge"]
            if outedge not in top_edges:
                top_edges[outedge] = (
                    cangraph.edges[outedge],
                    e[DF_FIELD_FREQ] / nrows,
                    _unmatch_ratio(e),
                )

        col2label[u.try_as_column().column, v.try_as_column().column] = list(
            top_edges.values()
        )

    return [
        {
            "source": sci,
            "target": tci,
            "edges": [
                {"prop": prop, "freq": freq, "unmatch_percent": unmatch}
                for prop, freq, unmatch in edges
            ],
        }
        for (sci, tci), edges in col2label.items()
    ]


pred_rels(exs[3], cangraphs[3], 0.5, 0.5)

[{'source': 0,
  'target': 2,
  'edges': [{'prop': 'P2046',
    'freq': 0.8312181053360749,
    'unmatch_percent': 0.14}]},
 {'source': 0,
  'target': 1,
  'edges': [{'prop': 'P131', 'freq': 0.92, 'unmatch_percent': 0.02}]},
 {'source': 1,
  'target': 0,
  'edges': [{'prop': 'P150', 'freq': 0.96, 'unmatch_percent': 0.0}]}]

In [52]:
# CPA prediction for every CTA-labeled example; the value is None when no
# candidate edge passes the 0.5 / 0.5 thresholds.
autolabel_edges = {
    example.id: pred_rels(example, example_cangraph, 0.5, 0.5)
    for example, example_cangraph in zip(exs, cangraphs)
}

In [53]:
autolabel_edges_path = raw_dataset_dir / "autolabel_edges.json"
serde.json.ser(autolabel_edges, autolabel_edges_path, indent=2)

## Save auto CTA and CPA into a single semantic description

In [57]:
# Reload the CPA predictions, dropping examples with no predicted relation (null).
auto_labeled_cpa_tables = {}
for eid, edges in serde.json.deser(raw_dataset_dir / "autolabel_edges.json").items():
    if edges is None:
        continue
    auto_labeled_cpa_tables[eid] = edges
len(auto_labeled_cpa_tables)

3614

We further filter the CPA labels, keeping only properties with `freq >= 0.7` and `unmatch_percent <= 0.3`. Column pairs and tables left with no property are dropped.

In [60]:
# Stricter thresholds than the 0.5 / 0.5 used when predicting relations.
# "freq" is the fraction of rows an edge covers; "unmatch_percent" is its
# unmatched-links ratio.
MIN_CPA_FREQ = 0.7
MAX_CPA_UNMATCH = 0.3

filtered_auto_labeled_cpa_tables = {}
for eid, edges in auto_labeled_cpa_tables.items():
    filtered_edges = []
    for edge in edges:
        kept = [
            e
            for e in edge["edges"]
            if e["freq"] >= MIN_CPA_FREQ and e["unmatch_percent"] <= MAX_CPA_UNMATCH
        ]
        if kept:
            filtered_edges.append(
                {"source": edge["source"], "target": edge["target"], "edges": kept}
            )
    # Drop tables where no column pair keeps any property.
    if filtered_edges:
        filtered_auto_labeled_cpa_tables[eid] = filtered_edges
len(filtered_auto_labeled_cpa_tables)

2476

In [66]:
entity_redirections = kgdb.pydb.entity_redirections.cache()
entity_labels = kgdb.pydb.entity_labels.cache()


def get_entity_id(id: str):
    """Return the id that `id` redirects to, or `id` unchanged if it has no redirection.

    NOTE(review): sm_datasets logged a redirect to None earlier in this
    notebook; if the cache stores such entries, this returns None — confirm.
    """
    return entity_redirections[id] if id in entity_redirections else id


def get_entity_label(id: str):
    """Format `id` as "<label> (<id>)" using the KG label cache (lookup fails if the id has no label)."""
    label = entity_labels[id]
    return f"{label} ({id})"

In [70]:
# Sanity check: no example has a semantic model yet, so the next cell starts clean.
assert all(len(ex.sms) == 0 for ex in examples)

In [80]:
from rdflib import RDFS
import sm.outputs as O

kgns = kgdb.kgns

# Mirrors the labeler threshold (threshold=0.8) in the auto-label args; every
# loaded column type is asserted to meet it.
CTA_MIN_SCORE = 0.8


def build_semantic_model(ex, cta, cpa):
    """Build a semantic model for `ex` from its auto-labeled CTA and CPA.

    - One DataNode per column that is a CTA entity column or an endpoint of a
      CPA relation.
    - For each entity column, one ClassNode per predicted type, linked to the
      column's DataNode with `rdfs:label`. The first type's class node is the
      column's parent, used as the source of CPA relations.
    - For each CPA relation, one edge per kept property, from the source
      column's parent class node to the target column's DataNode.

    Args:
        ex: the example whose table is annotated.
        cta: the AutoLabeledTable of `ex` (entity columns and their types).
        cpa: filtered CPA relations of `ex`: a list of
            {"source", "target", "edges": [{"prop", ...}]} dicts (may be empty).

    Returns:
        The constructed `O.SemanticModel` (it may have no edges).
    """
    sem_model = O.SemanticModel()

    selected_columns = set(cta.entity_columns)
    selected_columns.update(rel["source"] for rel in cpa)
    selected_columns.update(rel["target"] for rel in cpa)

    col2id = {}
    for col in ex.table.table.columns:
        if col.index in selected_columns:
            col2id[col.index] = sem_model.add_node(
                O.DataNode(col.index, col.clean_multiline_name or "")
            )

    # column index -> node id of the class node of its first type
    col2parentid = {}
    for col, coltypes in zip(cta.entity_columns, cta.entity_column_types):
        assert all(ctype.score >= CTA_MIN_SCORE for ctype in coltypes)

        for i, coltype in enumerate(coltypes):
            clsid = get_entity_id(coltype.id)
            abs_uri = kgns.id_to_uri(clsid)
            uid = sem_model.add_node(
                O.ClassNode(
                    abs_uri=abs_uri,
                    rel_uri=kgns.get_rel_uri(abs_uri),
                    readable_label=get_entity_label(clsid),
                )
            )
            sem_model.add_edge(
                O.Edge(
                    source=uid,
                    target=col2id[col],
                    abs_uri=str(RDFS.label),
                    rel_uri="rdfs:label",
                )
            )
            if i == 0:
                col2parentid[col] = uid

    for rel in cpa:
        # A CPA source must be an entity column that received at least one type.
        assert rel["source"] in col2parentid
        uid = col2parentid[rel["source"]]
        # NOTE(review): the target is always the column's DataNode, even when the
        # target column has its own class node — confirm this is intended.
        vid = col2id[rel["target"]]
        for prop in rel["edges"]:
            abs_uri = kgns.id_to_uri(prop["prop"])
            sem_model.add_edge(
                O.Edge(
                    source=uid,
                    target=vid,
                    abs_uri=abs_uri,
                    rel_uri=kgns.get_rel_uri(abs_uri),
                    readable_label=get_entity_label(prop["prop"]),
                )
            )

    return sem_model


for ex in examples:
    if ex.id not in auto_labeled_tables:
        # no cta meaning no cpa
        continue
    sem_model = build_semantic_model(
        ex,
        auto_labeled_tables[ex.id],
        filtered_auto_labeled_cpa_tables.get(ex.id, []),
    )
    if sem_model.num_edges() > 0:
        ex.sms = [sem_model]
    else:
        # Invariant checked here: a model without edges must also have no nodes.
        assert sem_model.num_nodes() == 0

sum(1 for ex in examples if len(ex.sms) > 0)

20290


In [79]:
examples_with_sm = [ex for ex in examples if len(ex.sms) > 0]
# Render one labeled example's semantic model (index 3) for a visual spot check.
examples_with_sm[3].sms[0].print(env="notebook")

HTML(value='<pre>\n00.\t<span style="background: #b7eb8f; color: black; padding: 2px; border-radius: 3px;">[3]…

In [82]:
# Save only the examples that received a semantic model.
output_dir = workdir / f"{raw_dataset_dir.name}-v1"
examples_with_sm = [ex for ex in examples if len(ex.sms) > 0]
Dataset(output_dir).save(examples_with_sm, batch_compressed=True)