## Extract sub-KG
1. add all superclasses
2. relations among them
3. add transitivity relations

In [None]:
import os
import json
import re
import time
import random
import socket
from SPARQLWrapper import SPARQLWrapper, JSON
from urllib.error import HTTPError
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
from tqdm import tqdm

# Global socket timeout for HTTP operations
socket.setdefaulttimeout(30)

# Constants
OUTPUT_DIR          = 'kg'
INPUT_FILE          = 'oven_entity.txt'
QID_PATTERN         = re.compile(r"^Q\d+$")
ANCESTOR_CHUNK_SIZE = 20    # QIDs per batch for ancestor fetch
RELATION_CHUNK_SIZE = 50    # Entities per batch for relation fetch
TRANS_CHUNK_SIZE    = 50    # Entities per batch for transitive property fetch
MAX_WORKERS         = 8     # Parallel threads
MAX_RETRIES         = 10     # Retry attempts
BASE_BACKOFF        = 2     # Base seconds for exponential backoff

# List of truly transitive properties to include
TRANSITIVE_PROPS = [
    'P31', 'P279',  # instance/subclass
    'P131',        # located in the administrative entity
    'P361',        # part of
    'P171',        # parent taxon
    'P155', 'P156',# follows / followed by
    'P1365', 'P1366', # see also / has edition
    'P3729', 'P3730', # first/last appearance
    'P5135', 'P5136'  # category topics
]

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Read and validate input QIDs
with open(INPUT_FILE) as f:
    raw_qids = [line.strip() for line in f if line.strip()]
input_qids = [qid for qid in dict.fromkeys(raw_qids) if QID_PATTERN.match(qid)]
if not input_qids:
    raise ValueError("No valid QIDs found in input file.")

# Helper: split list into chunks
def chunk_list(lst, size):
    for i in range(0, len(lst), size):
        yield lst[i:i+size]

# Helper: perform SPARQL query with retry/backoff and timeout
def sparql_query(client, query):
    client.setQuery(query)
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return client.query().convert()
        except (HTTPError, socket.timeout) as e:
            code = e.code if isinstance(e, HTTPError) else 'timeout'
            if attempt == MAX_RETRIES:
                tqdm.write(f"[{code}] Failed after {attempt} attempts: {e}")
                raise
            backoff = BASE_BACKOFF * (2 ** (attempt - 1)) + random.random()
            tqdm.write(f"[{code}] Retrying in {backoff:.1f}s...")
            time.sleep(backoff)

# 1️⃣ Batch + threaded ancestor fetching
#    Only fetch direct subclass(*) or one-step instanceOf then subclass(*)
all_entities = set(input_qids)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for chunk in chunk_list(input_qids, ANCESTOR_CHUNK_SIZE):
        vals = ' '.join(f'wd:{qid}' for qid in chunk)
        # Use union of subclassOf* or instanceOf followed by subclassOf*
        query = f"""
SELECT DISTINCT ?ancestor WHERE {{
  VALUES ?item {{ {vals} }}
  ?item (wdt:P279*|wdt:P31/wdt:P279*) ?ancestor .
}}"""
        client = SPARQLWrapper('https://query.wikidata.org/sparql')
        client.setReturnFormat(JSON)
        futures[executor.submit(sparql_query, client, query)] = chunk
    for fut in tqdm(as_completed(futures), total=len(futures), desc='Ancestor batches'):
        try:
            res = fut.result(timeout=60)
            for b in res['results']['bindings']:
                qid = b['ancestor']['value'].split('/')[-1]
                if QID_PATTERN.match(qid):
                    all_entities.add(qid)
        except Exception as e:
            tqdm.write(f"Ancestor chunk error {futures[fut]}: {e}")

# Write entities
with open(os.path.join(OUTPUT_DIR, 'entity.txt'), 'w') as f:
    for qid in tqdm(sorted(all_entities), desc='Writing entity.txt'):
        f.write(qid + '\n')

# 2️⃣ Prepare for deduplicated triplet collection
triplets_out = {}
triplets_in  = {}
triplets_out_seen = {}
triplets_in_seen  = {}
relation_ids = set()

# Helper: add a triplet if not already seen
def add_triplet(out_dict, seen_dict, subj, prop, obj):
    seen = seen_dict.setdefault(subj, set())
    if (subj, prop, obj) not in seen:
        seen.add((subj, prop, obj))
        out_dict.setdefault(subj, []).append([subj, prop, obj])

# 2️⃣ Batch + threaded direct relation fetching
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for chunk in chunk_list(sorted(all_entities), RELATION_CHUNK_SIZE):
        vals = ' '.join(f'wd:{e}' for e in chunk)
        query = f"""
SELECT ?s ?p ?o WHERE {{
  VALUES ?s {{ {vals} }}
  ?s ?prop ?o .
  ?p wikibase:directClaim ?prop .
}}"""
        client = SPARQLWrapper('https://query.wikidata.org/sparql')
        client.setReturnFormat(JSON)
        futures[executor.submit(sparql_query, client, query)] = chunk
    for fut in tqdm(as_completed(futures), total=len(futures), desc='Relation batches'):
        try:
            res = fut.result(timeout=60)
            for b in res['results']['bindings']:
                s = b['s']['value'].split('/')[-1]
                p = b['p']['value'].split('/')[-1]
                o = b['o']['value'].split('/')[-1]
                if s in all_entities and o in all_entities:
                    add_triplet(triplets_out, triplets_out_seen, s, p, o)
                    add_triplet(triplets_in,  triplets_in_seen,  o, p, s)
                    relation_ids.add(p)
        except Exception as e:
            tqdm.write(f"Relation chunk error {futures[fut]}: {e}")

# 3️⃣ Batch fetch all transitive property edges (deduped)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for prop in TRANSITIVE_PROPS:
        for chunk in chunk_list(sorted(all_entities), TRANS_CHUNK_SIZE):
            vals = ' '.join(f'wd:{e}' for e in chunk)
            # Build path: P31 => one-step instanceOf then subclassOf*, P279 => subclassOf+, others => prop+
            if prop == 'P31':
                path_clause = '?s wdt:P31/wdt:P279* ?o .'
            elif prop == 'P279':
                path_clause = '?s wdt:P279+ ?o .'
            else:
                path_clause = f'?s wdt:{prop}+ ?o .'
            query = f"""
SELECT ?s ?o WHERE {{
  VALUES ?s {{ {vals} }}
  {path_clause}
}}"""
            client = SPARQLWrapper('https://query.wikidata.org/sparql')
            client.setReturnFormat(JSON)
            futures[executor.submit(sparql_query, client, query)] = (prop, chunk)
    for fut in tqdm(as_completed(futures), total=len(futures), desc='Transitive batches'):
        prop, chunk = futures[fut]
        try:
            res = fut.result(timeout=60)
            for b in res['results']['bindings']:
                s = b['s']['value'].split('/')[-1]
                o = b['o']['value'].split('/')[-1]
                if s in all_entities and o in all_entities:
                    add_triplet(triplets_out, triplets_out_seen, s, prop, o)
                    add_triplet(triplets_in,  triplets_in_seen,  o, prop, s)
                    relation_ids.add(prop)
        except Exception as e:
            tqdm.write(f"Transitive '{prop}' chunk error {chunk}: {e}")

# 4️⃣ Write head-centric triplets
with open(os.path.join(OUTPUT_DIR, 'triplet_h.json'), 'w') as fh:
    for k, trips in tqdm(triplets_out.items(), desc='Writing triplet_h.json'):
        fh.write(json.dumps({'key': k, 'triplets': trips}, ensure_ascii=False) + '\n')

# 5️⃣ Write tail-centric triplets
with open(os.path.join(OUTPUT_DIR, 'triplet_t.json'), 'w') as ft:
    for k, trips in tqdm(triplets_in.items(), desc='Writing triplet_t.json'):
        ft.write(json.dumps({'key': k, 'triplets': trips}, ensure_ascii=False) + '\n')

# 6️⃣ Write relation IDs
with open(os.path.join(OUTPUT_DIR, 'relation.txt'), 'w') as fr:
    for pid in tqdm(sorted(relation_ids), desc='Writing relation.txt'):
        fr.write(pid + '\n')

print('Extraction complete with adjusted ancestor handling.')

Fetching ancestors:   0%|          | 0/20549 [00:00<?, ?it/s]


URLError: <urlopen error [Errno 61] Connection refused>

## Query the relation description

In [7]:
import os
import json
import time
import random
from SPARQLWrapper import SPARQLWrapper, JSON
from urllib.error import HTTPError
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration
INPUT_FILE = 'kg/relation.txt'
OUTPUT_FILE = 'kg/relation_descriptions.jsonl'
SPARQL_ENDPOINT = 'https://query.wikidata.org/sparql'
SLEEP_BETWEEN = 0.1                # seconds between queries to respect endpoint
MAX_RETRIES = 5                    # retry attempts for 403/405 errors
BASE_BACKOFF = 2                   # base seconds for exponential backoff
MAX_WORKERS = 5                    # parallel threads for fetching

# Read all property IDs
with open(INPUT_FILE, 'r') as f:
    pids = [line.strip() for line in f if line.strip()]

# Function to fetch description with retry on 403/405
def get_description(pid):
    sparql = SPARQLWrapper(SPARQL_ENDPOINT)
    sparql.setReturnFormat(JSON)
    query = f"""
SELECT ?desc WHERE {{
  wd:{pid} schema:description ?desc .
  FILTER(LANG(?desc) = 'en')
}} LIMIT 1
"""
    sparql.setQuery(query)

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            result = sparql.query().convert()
            bindings = result.get('results', {}).get('bindings', [])
            return bindings[0]['desc']['value'] if bindings else ''
        except HTTPError as e:
            code = getattr(e, 'code', None)
            if code in (403, 405) and attempt < MAX_RETRIES:
                backoff = BASE_BACKOFF * (2 ** (attempt - 1)) + random.random()
                tqdm.write(f"[{code}] Forbidden for {pid}, retry {attempt}/{MAX_RETRIES} in {backoff:.1f}s...")
                time.sleep(backoff)
                continue
            raise

# Ensure output directory exists
os.makedirs(os.path.dirname(OUTPUT_FILE) or '.', exist_ok=True)

# Fetch descriptions in parallel and write to file
with open(OUTPUT_FILE, 'w', encoding='utf-8') as out:
    futures = {}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for pid in pids:
            futures[executor.submit(get_description, pid)] = pid
        for fut in tqdm(as_completed(futures), total=len(pids), desc='Fetching descriptions'):
            pid = futures[fut]
            try:
                desc = fut.result()
            except Exception as e:
                tqdm.write(f"Error fetching {pid}: {e}")
                desc = ''
            record = {'p_id': pid, 'description': desc}
            out.write(json.dumps(record, ensure_ascii=False) + '\n')
            time.sleep(SLEEP_BETWEEN)

print(f"Written descriptions for {len(pids)} properties to {OUTPUT_FILE}")

Fetching descriptions:  20%|█▉        | 100/501 [00:32<02:33,  2.61it/s]

[403] Forbidden for P1408, retry 1/5 in 2.9s...


Fetching descriptions:  21%|██        | 105/501 [00:34<02:02,  3.23it/s]

[403] Forbidden for P1427, retry 1/5 in 2.5s...


Fetching descriptions:  21%|██        | 105/501 [00:34<02:02,  3.23it/s]

[403] Forbidden for P1431, retry 1/5 in 2.8s...


Fetching descriptions:  22%|██▏       | 112/501 [00:37<02:04,  3.13it/s]

Error fetching P1454: HTTP Error 429: Too Many Requests


Fetching descriptions:  23%|██▎       | 113/501 [00:37<02:45,  2.34it/s]

Error fetching P1427: HTTP Error 429: Too Many Requests


Fetching descriptions:  23%|██▎       | 116/501 [00:38<02:03,  3.11it/s]

[403] Forbidden for P1431, retry 2/5 in 4.2s...


Fetching descriptions:  23%|██▎       | 117/501 [00:39<01:45,  3.62it/s]

[403] Forbidden for P1531, retry 1/5 in 2.2s...


Fetching descriptions:  24%|██▍       | 121/501 [00:41<03:13,  1.96it/s]

[403] Forbidden for P155, retry 1/5 in 2.9s...


Fetching descriptions:  25%|██▍       | 123/501 [00:42<03:19,  1.90it/s]

Error fetching P1557: HTTP Error 429: Too Many Requests


Fetching descriptions:  26%|██▌       | 128/501 [00:43<01:39,  3.75it/s]

Error fetching P1571: HTTP Error 429: Too Many Requests


Fetching descriptions:  26%|██▋       | 132/501 [00:44<01:36,  3.84it/s]

Error fetching P1589: HTTP Error 429: Too Many Requests


Fetching descriptions:  27%|██▋       | 133/501 [00:45<02:34,  2.38it/s]

Error fetching P155: HTTP Error 429: Too Many Requests


Fetching descriptions:  27%|██▋       | 135/501 [00:45<02:05,  2.92it/s]

Error fetching P16: HTTP Error 429: Too Many Requests


Fetching descriptions:  30%|██▉       | 150/501 [00:50<01:58,  2.96it/s]

Error fetching P1716: HTTP Error 429: Too Many Requests


Fetching descriptions:  31%|███       | 156/501 [00:52<01:42,  3.37it/s]

Error fetching P179: HTTP Error 429: Too Many Requests
[403] Forbidden for P180, retry 1/5 in 2.4s...


Fetching descriptions:  32%|███▏      | 161/501 [00:53<01:33,  3.65it/s]

Error fetching P1880: HTTP Error 429: Too Many Requests


Fetching descriptions:  33%|███▎      | 167/501 [00:55<01:43,  3.22it/s]

[403] Forbidden for P199, retry 1/5 in 2.1s...


Fetching descriptions:  34%|███▎      | 168/501 [00:56<02:36,  2.13it/s]

Error fetching P180: HTTP Error 429: Too Many Requests
Error fetching P1990: HTTP Error 429: Too Many Requests


Fetching descriptions:  35%|███▌      | 177/501 [00:59<02:01,  2.67it/s]

Error fetching P2079: HTTP Error 429: Too Many Requests


Fetching descriptions:  36%|███▋      | 182/501 [01:00<01:54,  2.78it/s]

Error fetching P2175: HTTP Error 429: Too Many Requests


Fetching descriptions:  37%|███▋      | 183/501 [01:01<01:41,  3.14it/s]

Error fetching P2176: HTTP Error 429: Too Many Requests


Fetching descriptions:  37%|███▋      | 184/501 [01:01<01:20,  3.95it/s]

[403] Forbidden for P22, retry 1/5 in 2.3s...


Fetching descriptions:  38%|███▊      | 191/501 [01:03<01:57,  2.63it/s]

[403] Forbidden for P2388, retry 1/5 in 2.4s...


Fetching descriptions:  42%|████▏     | 210/501 [01:09<00:57,  5.09it/s]

[403] Forbidden for P2632, retry 1/5 in 2.4s...


Fetching descriptions:  42%|████▏     | 210/501 [01:10<00:57,  5.09it/s]

[403] Forbidden for P2647, retry 1/5 in 2.8s...


Fetching descriptions:  43%|████▎     | 215/501 [01:11<01:57,  2.42it/s]

[403] Forbidden for P2674, retry 1/5 in 2.9s...


Fetching descriptions:  45%|████▍     | 223/501 [01:15<02:24,  1.93it/s]

[403] Forbidden for P277, retry 1/5 in 3.0s...


Fetching descriptions:  45%|████▌     | 227/501 [01:16<01:21,  3.37it/s]

[403] Forbidden for P2821, retry 1/5 in 2.9s...
[403] Forbidden for P2822, retry 1/5 in 2.5s...


Fetching descriptions:  47%|████▋     | 233/501 [01:18<01:36,  2.77it/s]

[403] Forbidden for P277, retry 2/5 in 4.4s...


Fetching descriptions:  47%|████▋     | 237/501 [01:21<02:17,  1.92it/s]

[403] Forbidden for P2821, retry 2/5 in 4.6s...


Fetching descriptions:  51%|█████     | 256/501 [01:29<01:47,  2.27it/s]

[403] Forbidden for P3137, retry 1/5 in 2.5s...


Fetching descriptions:  53%|█████▎    | 266/501 [01:32<00:55,  4.25it/s]

[403] Forbidden for P3137, retry 2/5 in 4.1s...


Fetching descriptions:  55%|█████▍    | 274/501 [01:34<01:00,  3.74it/s]

[403] Forbidden for P360, retry 1/5 in 2.1s...


Fetching descriptions:  56%|█████▌    | 279/501 [01:37<01:32,  2.40it/s]

[403] Forbidden for P3712, retry 1/5 in 2.3s...


Fetching descriptions:  56%|█████▋    | 282/501 [01:38<01:31,  2.39it/s]

[403] Forbidden for P3730, retry 1/5 in 2.0s...


Fetching descriptions:  58%|█████▊    | 290/501 [01:40<01:07,  3.11it/s]

[403] Forbidden for P3938, retry 1/5 in 2.9s...


Fetching descriptions:  62%|██████▏   | 312/501 [01:48<01:31,  2.06it/s]

[403] Forbidden for P427, retry 1/5 in 2.6s...


Fetching descriptions:  63%|██████▎   | 314/501 [01:48<00:59,  3.14it/s]

[403] Forbidden for P4424, retry 1/5 in 2.3s...


Fetching descriptions:  64%|██████▍   | 320/501 [01:50<00:59,  3.06it/s]

[403] Forbidden for P457, retry 1/5 in 2.2s...


Fetching descriptions:  76%|███████▌  | 381/501 [02:10<00:24,  4.94it/s]

[403] Forbidden for P607, retry 1/5 in 2.5s...


Fetching descriptions:  83%|████████▎ | 415/501 [02:20<00:30,  2.79it/s]

[403] Forbidden for P710, retry 1/5 in 3.0s...


Fetching descriptions:  84%|████████▍ | 420/501 [02:22<00:27,  2.99it/s]

[403] Forbidden for P737, retry 1/5 in 2.1s...


Fetching descriptions:  84%|████████▍ | 422/501 [02:22<00:24,  3.26it/s]

[403] Forbidden for P747, retry 1/5 in 2.8s...


Fetching descriptions:  86%|████████▌ | 429/501 [02:26<00:31,  2.25it/s]

[403] Forbidden for P737, retry 2/5 in 4.2s...


Fetching descriptions:  86%|████████▋ | 433/501 [02:27<00:25,  2.68it/s]

[403] Forbidden for P787, retry 1/5 in 3.0s...


Fetching descriptions:  98%|█████████▊| 489/501 [02:44<00:04,  2.97it/s]

[403] Forbidden for P924, retry 1/5 in 2.1s...


Fetching descriptions: 100%|██████████| 501/501 [02:47<00:00,  2.99it/s]

Written descriptions for 501 properties to kg/relation_descriptions.jsonl





## merge two relation files

In [8]:
import json

INPUT_FILE_1 = 'kg/relation_descriptions.jsonl'
INPUT_FILE_2 = 'kg/relation_descriptions2.jsonl'
OUTPUT_FILE = 'kg/merged_relation_descriptions.jsonl'

def load_descriptions(path):
    """从 JSONL 文件中加载 {p_id: description} 映射。"""
    descs = {}
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                rec = json.loads(line)
                pid = rec.get('p_id')
                desc = rec.get('description', '')
                if pid:
                    # 如果同一个 p_id 在两个文件中都出现，以第二个文件中的为准：
                    descs[pid] = desc
            except json.JSONDecodeError:
                continue
    return descs

def merge_and_write(path1, path2, out_path):
    # 1. 读取两个文件
    descs1 = load_descriptions(path1)
    descs2 = load_descriptions(path2)

    # 2. 合并：先放入 descs1，再用 descs2 覆盖（如果有重复）
    merged = {**descs1, **descs2}

    # 3. 写回 JSONL
    with open(out_path, 'w', encoding='utf-8') as out:
        for pid, desc in merged.items():
            record = {'p_id': pid, 'description': desc}
            out.write(json.dumps(record, ensure_ascii=False) + '\n')

    print(f"Merged {len(descs1)} + {len(descs2)} → {len(merged)} unique records into {out_path}")

if __name__ == '__main__':
    merge_and_write(INPUT_FILE_1, INPUT_FILE_2, OUTPUT_FILE)

Merged 501 + 825 → 955 unique records into kg/merged_relation_descriptions.jsonl


In [1]:
import json

# 输入文件路径
input_file = '/home/zho1rng/oven_train/dataset/wikidata_subgraph_v1/triplet_t.jsonl'
output_file = '/home/zho1rng/oven_train/dataset/wikidata_subgraph_v1/triplet_t1.jsonl'

# 翻转 triplets 的 head 和 tail
def reverse_triplets(data):
    reversed_triplets = []
    for head, relation, tail in data['triplets']:
        reversed_triplets.append([tail, relation, head])
    return {'key': data['key'], 'triplets': reversed_triplets}

# 读取输入文件并翻转triplets
with open(input_file, 'r', encoding='utf-8') as infile, \
     open(output_file, 'w', encoding='utf-8') as outfile:
    
    for line in infile:
        data = json.loads(line.strip())
        reversed_data = reverse_triplets(data)
        outfile.write(json.dumps(reversed_data, ensure_ascii=False) + '\n')

print(f"Reversed triplets have been saved to {output_file}")


Reversed triplets have been saved to /home/zho1rng/oven_train/dataset/wikidata_subgraph_v1/triplet_t1.jsonl
