## SQL

In [57]:
#!/usr/bin/env python3
"""
infer_types_sqlite.py

Memory-efficient inference of Wikidata types using SQLite.

Usage:
  # Prepare SQLite DB once:
  ./infer_types_sqlite.py --prepare-db types.db instance_of.tsv subclass_of.tsv

  # Then for each batch or single item:
  ./infer_types_sqlite.py --db types.db --infer-all output.tsv

This avoids loading the entire graph in RAM by leveraging SQLite recursive CTEs.
"""
import sqlite3
from collections import defaultdict


def prepare_db(db_path, inst_path, sub_path):
    """Rebuild the ``instance`` and ``subclass`` edge tables from two TSV files.

    Existing ``instance`` / ``subclass`` tables are dropped and recreated, so
    the call always starts from an empty schema. Every non-blank line of each
    input file is stripped and split on tabs into the two column values;
    duplicate pairs are ignored through the tables' UNIQUE constraints.

    Args:
        db_path: Path of the SQLite database file.
        inst_path: UTF-8 TSV file with ``item<TAB>class`` rows (P31 edges).
        sub_path: UTF-8 TSV file with ``subclass<TAB>superclass`` rows
            (P279 edges).

    Raises:
        sqlite3.ProgrammingError: If a non-blank line does not split into
            exactly two fields (the statement expects two bindings).
        OSError: If an input file cannot be opened.
    """
    conn = sqlite3.connect(db_path)
    # try/finally so the connection is released even when an input file is
    # missing or malformed; uncommitted inserts are then rolled back.
    try:
        cur = conn.cursor()
        # Drop existing tables
        cur.execute("DROP TABLE IF EXISTS instance;")
        cur.execute("DROP TABLE IF EXISTS subclass;")
        # Create tables with UNIQUE constraints to avoid duplicate inserts
        cur.execute(
            """
        CREATE TABLE instance(
            item TEXT,
            class TEXT,
            UNIQUE(item, class)
        );
    """
        )
        cur.execute("CREATE INDEX IF NOT EXISTS idx_inst_item ON instance(item);")
        cur.execute(
            """
        CREATE TABLE subclass(
            subclass TEXT,
            superclass TEXT,
            UNIQUE(subclass, superclass)
    );
    """
        )
        cur.execute("CREATE INDEX IF NOT EXISTS idx_sub_sub ON subclass(subclass);")
        conn.commit()

        # Bulk insert P31 edges (instances), ignoring duplicates.
        # The generator streams the file so it is never held in memory at once.
        with open(inst_path, "r", encoding="utf-8") as f:
            cur.executemany(
                "INSERT OR IGNORE INTO instance(item, class) VALUES (?, ?);",
                (line.strip().split("\t") for line in f if line.strip()),
            )

        # Bulk insert P279 edges (subclasses), ignoring duplicates
        with open(sub_path, "r", encoding="utf-8") as f:
            cur.executemany(
                "INSERT OR IGNORE INTO subclass(subclass, superclass) VALUES (?, ?);",
                (line.strip().split("\t") for line in f if line.strip()),
            )

        conn.commit()
    finally:
        conn.close()
    print(f"Database prepared at {db_path}")


def materialize(db_path):
    """Store the transitive closure of ``subclass`` in ``subclass_closure``.

    Creates the ``subclass_closure(subclass, superclass)`` table if it does
    not exist and fills it with every (descendant, ancestor) pair reachable
    over one or more ``subclass`` edges. The recursive CTE uses ``UNION``,
    which drops already-seen pairs and therefore also terminates on cycles.
    Rows already present are kept (``INSERT OR IGNORE``), so repeated calls
    only add missing pairs.

    Args:
        db_path: Path of a SQLite database that already contains the
            ``subclass`` table.

    Raises:
        sqlite3.OperationalError: If the ``subclass`` table is missing.
    """
    # Connect and tune for bulk operations
    conn = sqlite3.connect(db_path)
    # try/finally so a failing statement does not leave the connection open.
    try:
        cur = conn.cursor()
        # Speed up writes and temp storage (trades crash-safety for speed)
        cur.execute("PRAGMA journal_mode = MEMORY;")
        cur.execute("PRAGMA synchronous = OFF;")
        cur.execute("PRAGMA temp_store = MEMORY;")

        # Create the closure table with UNIQUE constraint
        cur.execute(
            """
    CREATE TABLE IF NOT EXISTS subclass_closure (
        subclass   TEXT,
        superclass TEXT,
        UNIQUE(subclass, superclass)
    );
    """
        )
        conn.commit()

        # Ensure raw subclass table is indexed for the CTE
        cur.execute("CREATE INDEX IF NOT EXISTS idx_sub_sub   ON subclass(subclass);")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_sub_super ON subclass(superclass);")
        conn.commit()

        # Populate the closure table using a recursive CTE
        cur.execute(
            """
    INSERT OR IGNORE INTO subclass_closure(subclass, superclass)
    WITH RECURSIVE
      closure(sub, sup) AS (
        SELECT subclass, superclass FROM subclass
        UNION
        SELECT c.sub, s.superclass
          FROM closure AS c
          JOIN subclass AS s
            ON c.sup = s.subclass
      )
    SELECT sub, sup FROM closure;
    """
        )
        conn.commit()

        # Index the closure table for fast lookups
        cur.execute(
            "CREATE INDEX IF NOT EXISTS idx_closure_sub ON subclass_closure(subclass);"
        )
        conn.commit()
    finally:
        conn.close()
    print(f"Transitive closure materialized in 'subclass_closure' in {db_path}")


def infer_all(db_path, items: list[str] | None = None):
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    # Query distinct items
    if items is None:
        cur.execute("SELECT DISTINCT item FROM instance;")
        items = [row[0] for row in cur.fetchall()]

    vals = ",".join(f"('{item}')" for item in items)

    # Recursive CTE: join direct classes and their ancestors
    # query = f"""
    # WITH RECURSIVE
    # items(item) AS (
    #     VALUES {vals}
    # ),
    # initial(item, sup) AS (
    #     SELECT i.item, i.class
    #     FROM instance AS i
    #     JOIN items    AS its ON i.item     = its.item
    #     UNION
    #     SELECT s.subclass, s.superclass
    #     FROM subclass AS s
    #     JOIN items    AS its ON s.subclass = its.item
    # ),
    # closure(item, sup) AS (
    #     SELECT item, sup FROM initial
    #     UNION
    #     SELECT c.item, s.superclass
    #     FROM closure AS c
    #     JOIN subclass AS s ON c.sup = s.subclass
    # )
    # SELECT DISTINCT
    # item,
    # sup AS superclass
    # FROM closure
    # ORDER BY item, superclass;
    # """
    query = f"""
        WITH RECURSIVE
        items(item) AS (
            VALUES {vals}
        ),
        initial(item, sup) AS (
            -- only direct subclass_of edges for each seed
            SELECT s.subclass, s.superclass
            FROM subclass AS s
            JOIN items     AS its ON s.subclass = its.item
        ),
        closure(item, sup) AS (
            -- walk up the P279 chain
            SELECT item, sup FROM initial
            UNION
            SELECT c.item, s.superclass
            FROM closure AS c
            JOIN subclass AS s
                ON c.sup = s.subclass
        )
        SELECT DISTINCT
        item,
        sup     AS superclass
        FROM closure
    """
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute(query)
    results = cur.fetchall()
    conn.close()
    out = {}
    for item, superclass in results:
        if item not in out:
            out[item] = set()
        out[item].add(superclass)
    return out


def linear_infer_all(db_path, items):
    """Look up superclasses in the pre-materialized ``subclass_closure`` table.

    Args:
        db_path: Path of a SQLite database containing ``subclass_closure``.
        items: List of QIDs, e.g. ["Q1","Q2","Q3"].

    Returns:
        dict mapping each requested item -> set(superclasses). Items with no
        row in ``subclass_closure`` map to an empty set.

    Raises:
        sqlite3.OperationalError: If ``subclass_closure`` does not exist.
    """
    if not items:
        return {}
    wanted = list(items)
    # SQLite caps the number of bound parameters per statement (999 in older
    # builds), so large requests are split into several IN (...) queries.
    chunk_size = 900

    out = defaultdict(set)
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        for start in range(0, len(wanted), chunk_size):
            chunk = wanted[start : start + chunk_size]
            # build a parameter placeholder for each item of this chunk
            placeholders = ",".join("?" for _ in chunk)
            sql = f"""
        SELECT subclass, superclass
          FROM subclass_closure
         WHERE subclass IN ({placeholders})
    """
            cur.execute(sql, chunk)
            for sub, sup in cur:
                out[sub].add(sup)
    finally:
        conn.close()

    # ensure every requested item appears (even if it has no superclasses)
    return {item: out.get(item, set()) for item in wanted}


def transitive_closure_from_db_update(db_path, items):
    """Return the transitive superclasses of each item from the SQLite database.

    Uses the pre-materialized ``subclass_closure`` table when it exists and
    otherwise walks the raw ``subclass`` table with a recursive CTE.

    The requested items are loaded into a connection-local temporary table
    through bound parameters rather than interpolated into the SQL text, so
    identifiers containing quotes cannot break or alter the query.

    Args:
        db_path: Path of the SQLite database.
        items: Identifiers to look up; an empty value returns ``{}``.

    Returns:
        dict mapping every requested item to the set of its superclasses
        (an empty set when none are found).
    """
    if not items:
        return {}
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        # TEMP tables are private to this connection and are not written to
        # the main database file.
        cur.execute("CREATE TEMP TABLE seed_items(item TEXT PRIMARY KEY);")
        cur.executemany(
            "INSERT OR IGNORE INTO seed_items(item) VALUES (?);",
            ((item,) for item in items),
        )

        # Check if the fast lookup table exists
        cur.execute(
            """
            SELECT name 
            FROM sqlite_master 
            WHERE type='table' 
            AND name='subclass_closure'
        """
        )
        if cur.fetchone():
            # 1) Fast path: read the materialized closure directly
            query = """
                SELECT subclass, superclass
                FROM subclass_closure
                WHERE subclass IN (SELECT item FROM seed_items)
            """
        else:
            # 2) Fallback: recursive-CTE over the raw subclass edges
            query = """
                WITH RECURSIVE
                initial(item, sup) AS (
                    SELECT s.subclass, s.superclass
                    FROM subclass AS s
                    JOIN seed_items AS its ON s.subclass = its.item
                ),
                closure(item, sup) AS (
                    SELECT item, sup FROM initial
                    UNION
                    SELECT c.item, s.superclass
                    FROM closure AS c
                    JOIN subclass AS s
                        ON c.sup = s.subclass
                )
                SELECT DISTINCT
                    item,
                    sup     AS superclass
                FROM closure
            """
        cur.execute(query)
        out = defaultdict(set)
        for item, superclass in cur:
            out[item].add(superclass)
    finally:
        conn.close()
    # ensure every requested item appears (even if it has no superclasses)
    return {item: out.get(item, set()) for item in items}

In [None]:
# Build the edge tables from the two TSV files in the working directory.
prepare_db(
    db_path="types.db",
    inst_path="instance_of.tsv",
    sub_path="subclass_of.tsv",
)

In [None]:
# Precompute the subclass closure table inside the shared database file.
materialize(db_path="../types-db/types.db")

In [60]:
# Q123 (September)
# P31 (instance of)
#    - Q47018901
# P279 (subclass of)
#    - Q18602249

# Audi:
# P31 (instance of)
#    - wd:Q786820 wd:Q4830453 wd:Q891723

In [61]:
# Superclass closure of a single seed, computed with the recursive CTE.
infer_all("../types-db/types.db", items=["Q786820"])

{'Q786820': {'Q103940464',
  'Q106559804',
  'Q106668099',
  'Q12569864',
  'Q131085629',
  'Q13235160',
  'Q13420330',
  'Q1415187',
  'Q155076',
  'Q16334295',
  'Q16334298',
  'Q1639378',
  'Q167037',
  'Q21980538',
  'Q24229398',
  'Q35120',
  'Q3778211',
  'Q43229',
  'Q4830453',
  'Q488383',
  'Q53617489',
  'Q58778',
  'Q61961344',
  'Q6881511',
  'Q7048977',
  'Q783794',
  'Q830077',
  'Q854457',
  'Q98119401',
  'Q99527517'}}

In [62]:
# Same seed through the closure-table-aware lookup.
transitive_closure_from_db_update(
    db_path="../types-db/types.db",
    items=["Q786820"],
)

{'Q786820': {'Q103940464',
  'Q106559804',
  'Q106668099',
  'Q12569864',
  'Q131085629',
  'Q13235160',
  'Q13420330',
  'Q1415187',
  'Q155076',
  'Q16334295',
  'Q16334298',
  'Q1639378',
  'Q167037',
  'Q21980538',
  'Q24229398',
  'Q35120',
  'Q3778211',
  'Q43229',
  'Q4830453',
  'Q488383',
  'Q53617489',
  'Q58778',
  'Q61961344',
  'Q6881511',
  'Q7048977',
  'Q783794',
  'Q830077',
  'Q854457',
  'Q98119401',
  'Q99527517'}}

In [63]:
# A seed's result must not depend on which other seeds share the batch.
(
    infer_all("../types-db/types.db", items=["Q786820", "Q4830453", "Q891723"])["Q786820"]
    == infer_all("../types-db/types.db", items=["Q786820"])["Q786820"]
)

True

In [64]:
# All three lookup strategies should agree for the same seed.
(
    linear_infer_all(
        "../types-db/types.db", items=["Q786820", "Q4830453", "Q891723"]
    )["Q786820"]
    == infer_all(
        "../types-db/types.db", items=["Q786820", "Q4830453", "Q891723"]
    )["Q786820"]
    == transitive_closure_from_db_update(
        "../types-db/types.db", items=["Q786820"]
    )["Q786820"]
)

True

## Pandas

In [None]:
import pandas as pd

In [None]:
# Load both edge lists; the files have no header row, so columns are named here.
instance_df = pd.read_csv(
    "../types-db/instance_of.tsv",
    names=["item", "class"],
    header=None,
    sep="\t",
)
subclass_df = pd.read_csv(
    "../types-db/subclass_of.tsv",
    names=["subclass", "superclass"],
    header=None,
    sep="\t",
)

In [None]:
# Display the P31 (item, class) frame as the cell output.
instance_df

In [None]:
# Keep only the first occurrence of every (item, class) pair.
instance_df = instance_df[~instance_df.duplicated(subset=["item", "class"])]

In [None]:
# Display the P279 (subclass, superclass) frame as the cell output.
subclass_df

In [None]:
# Keep only the first occurrence of every (subclass, superclass) pair.
subclass_df = subclass_df[~subclass_df.duplicated(subset=["subclass", "superclass"])]

In [None]:
# Collect every class reachable from one entity. The work list is seeded with
#   1) the entity's direct P31 classes and
#   2) its direct P279 superclasses,
# after which only P279 edges are followed upward.
entity = "Q123"
seen = {entity}  # the start entity itself is never visited or reported
result = set()  # every class reached so far
queue = [
    *instance_df.loc[instance_df["item"] == entity, "class"],
    *subclass_df.loc[subclass_df["subclass"] == entity, "superclass"],
]

while queue:
    current = queue.pop()  # taken from the end of the list
    if current not in seen:
        seen.add(current)
        result.add(current)
        # one P279 hop upward from the node just visited
        is_child = subclass_df["subclass"] == current
        parents = subclass_df.loc[is_child, "superclass"].tolist()
        queue += parents

In [None]:
# Display the set of classes collected by the walk above.
result

## SPARQL

In [None]:
from SPARQLWrapper import SPARQLWrapper, JSON

# Endpoint plus a query that, for each seed entity, returns everything
# reachable over zero or more P279 (subclass of) hops.
endpoint_url = "https://query.wikidata.org/sparql"
entity_ids = {"Q20738676"}  # Example entity IDs to query
entity_list = " ".join("wd:" + eid for eid in entity_ids)

query = f"""
SELECT DISTINCT ?item ?superclass WHERE {{
  VALUES ?item {{ {entity_list} }}
  {{ ?item (wdt:P279*) ?superclass. }}
}}
"""


def query_wikidata(sparql_client, query: str):
    """Run one SPARQL query on the given client and return the converted result.

    The query is sent exactly once: this function performs no retry and no
    backoff, so any error raised by the client propagates to the caller.

    Args:
        sparql_client: Client object exposing ``setQuery``,
            ``setReturnFormat`` and ``query`` (a ``SPARQLWrapper`` instance
            in this notebook).
        query: SPARQL query text.

    Returns:
        Whatever ``sparql_client.query().convert()`` yields for the JSON
        return format.
    """
    sparql_client.setQuery(query)
    # Request JSON so that convert() can parse the response body.
    sparql_client.setReturnFormat(JSON)
    return sparql_client.query().convert()


# Set up the SPARQL client
sparql = SPARQLWrapper(endpoint_url)
sparql.addCustomHttpHeader("User-Agent", "WikidataParser/1.0 (belo.fede@outlook.com)")
results = query_wikidata(sparql, query)

# Fold the bindings into {item QID -> set(superclass QIDs)}.
out = {}
for binding in results["results"]["bindings"]:
    # Values are entity URIs; the QID is the last path segment.
    item = binding["item"]["value"].split("/")[-1]
    superclass = binding["superclass"]["value"].split("/")[-1]
    # `P279*` also matches the zero-length path, which binds ?superclass to
    # the item itself. Skip only that self row: testing membership in
    # entity_ids would also drop a real superclass that happens to be
    # another queried entity.
    if superclass == item:
        continue
    out.setdefault(item, set()).add(superclass)

In [None]:
# Compare the SPARQL result with the SQLite result for two entities.
# NOTE(review): requires `out` to have been built with Q123 and Q124 in
# entity_ids - confirm before running.
(
    out["Q123"] == infer_all("../types-db/types.db", ["Q123", "Q124"])["Q123"],
    out["Q124"] == infer_all("../types-db/types.db", ["Q123", "Q124"])["Q124"],
)