In [None]:
import requests
from benchmark_src.dataset_creation.wikidata_hierarchies import wikidata_utils
from tqdm import tqdm
from collections import defaultdict
import time
import json


In [None]:
initial_superclass = ("Q6999", "Astronomical Object")

# Walk the subclass tree one level at a time, starting from the root class.
# `to_process` is the current level; `all_subclasses` accumulates every entry
# returned by `wikidata_utils.get_subclasses` that has not been recorded yet.
all_subclasses = set()
to_process = [initial_superclass]

while to_process:
    # Entries reported for this level that are not already recorded.
    new_classes = {
        child
        for parent in tqdm(to_process)
        for child in wikidata_utils.get_subclasses(parent[0])
        if child not in all_subclasses
    }
    print(f"Found {len(new_classes)} new subclasses: {new_classes}")
    all_subclasses |= new_classes
    # The newly found entries form the next level to expand.
    to_process = new_classes
print(f"Found {len(all_subclasses)} subclasses of {initial_superclass}")

In [None]:

initial_superclass = ("Q6999", "Astronomical Object")

all_subclasses = set()  # every subclass entry discovered so far
to_process = [initial_superclass]  # current level of the traversal

# Maps each discovered subclass id to the ids it was reported under:
# {subclass_id: [superclass_ids]}
subclass_hierarchy = {}

while len(to_process) > 0:
    new_classes = set()
    for parent in tqdm(to_process):
        parent_qid = parent[0]
        for child in wikidata_utils.get_subclasses(parent_qid):
            # Record each parent once per child, in first-seen order.
            known_parents = subclass_hierarchy.setdefault(child[0], [])
            if parent_qid not in known_parents:
                known_parents.append(parent_qid)

            # Only entries not recorded yet are expanded on the next level.
            if child not in all_subclasses:
                new_classes.add(child)

    all_subclasses |= new_classes
    to_process = new_classes

print(f"Found {len(all_subclasses)} subclasses of {initial_superclass[1]}")
print("Hierarchy example:", dict(list(subclass_hierarchy.items())[:5]))

In [None]:
# Number of subclass ids that have at least one recorded superclass.
len(subclass_hierarchy)

In [None]:
from collections import defaultdict
import requests
import time

def collect_items_of_subclass(entity_qids, url="https://query.wikidata.org/sparql"):
    """
    Collect all items whose P31 value is one of the class ids in entity_qids.

    All classes are queried in a single SPARQL request first. If that request
    fails (HTTP/network error, timeout, or a response body that is not valid
    JSON), every class is retried on its own with a result cap of 20000 rows;
    classes that fail again are reported and skipped.

    Args:
        entity_qids: iterable of Wikidata class ids without the "wd:" prefix.
        url: SPARQL endpoint to query.

    Returns:
        (items_by_subclass, items_labels):
            items_by_subclass: defaultdict(list), item id -> matched class ids
            items_labels: dict, item id -> label requested in English
        Items for which `wikidata_utils.is_wikidata_id(label)` is true are
        left out of both mappings.
    """
    # Materialise once so the input can be iterated again by the fallback.
    entity_qids = list(entity_qids)
    items_by_subclass = defaultdict(list)
    items_labels = {}

    def query_wikidata(subclass_list, limit=None):
        """Send the SPARQL query for subclass_list; return parsed JSON or None on failure."""
        values_str = ' '.join('wd:' + qid for qid in subclass_list)
        sparql_str = f"""
        SELECT DISTINCT ?s ?sLabel ?subclass
        WHERE {{
            ?s wdt:P31 ?subclass .
            SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
            VALUES ?subclass {{ {values_str} }}
        }}
        """
        if limit is not None:
            sparql_str += f" LIMIT {limit}"
        try:
            res = requests.get(
                url,
                params={'format': 'json', 'query': sparql_str},
                headers={
                    'User-Agent': 'CollectIdProperties/0.0',
                    'Accept': 'application/sparql-results+json'
                },
                timeout=300
            )
            res.raise_for_status()  # Raises for 4xx/5xx
            return res.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that cannot be decoded as JSON.
            print(f"Query failed for subclasses {subclass_list}: {e}")
            return None

    def add_rows(data):
        """Fold the result bindings of one response into the two output mappings."""
        for row in data['results']['bindings']:
            # Entity values are URIs; the id is the last path segment.
            item_qid = row['s']['value'].split("/")[-1]
            subclass_qid = row['subclass']['value'].split("/")[-1]
            item_label = row['sLabel']['value']
            if not wikidata_utils.is_wikidata_id(item_label):
                items_by_subclass[item_qid].append(subclass_qid)
                items_labels[item_qid] = item_label

    if entity_qids:
        # First try querying all classes at once.
        data = query_wikidata(entity_qids)
        if data is not None:
            add_rows(data)
        else:
            # The combined query failed: retry each class individually.
            print("Retrying each subclass individually...")
            for qid in entity_qids:
                time.sleep(0.1)  # small delay to avoid hammering the server
                single_data = query_wikidata([qid], limit=20000)
                if single_data is None:
                    print(f"Failed again for subclass {qid}, skipping.")
                    continue
                add_rows(single_data)
    # With no class ids there is nothing to ask for, so no request is sent.

    print(f"Collected {len(items_labels)} items for {len(entity_qids)} subclasses.")
    return items_by_subclass, items_labels


In [None]:
def collect_all_instances(ent_list):
    """
    Collect the instances of every class id in ent_list and save them as JSON.

    The ids are passed to `collect_items_of_subclass` in batches of 25 with a
    short pause between batches. An item returned by several batches keeps the
    union of its classes (each class listed once, in first-seen order).

    Class ids that appear in results without having been queried yet are
    queued for a further round; the loop ends when a round adds none.

    Files written to the current working directory:
      - items_by_subclass.json: item id -> list of class ids
      - items_per_subclass.json: class id -> item ids, restricted to items
        that have exactly one class
      - items_labels.json: item id -> label

    Args:
        ent_list: iterable of Wikidata class ids.

    Returns:
        (all_output, labels): item id -> list of class ids, and item id -> label.
    """
    wikidata_url = "https://query.wikidata.org/sparql"
    ent_list = list(ent_list)  # slicing below needs a sequence
    finished_entities = set(ent_list)
    n_batch = 25

    labels = {}
    all_output = {}
    while ent_list:
        print(f"Processing {len(ent_list)} entities, {n_batch} at a time")
        new_entities = []
        for i in tqdm(range(0, len(ent_list), n_batch)):
            # force the query to wait a bit to avoid hitting query limits
            time.sleep(0.25)
            output, out_labels = collect_items_of_subclass(entity_qids=ent_list[i:i + n_batch], url=wikidata_url)
            labels.update(out_labels)
            for item_qid, class_qids in output.items():
                # Merge rather than overwrite: the same item can come back in
                # several batches, each contributing different classes.
                merged_classes = all_output.setdefault(item_qid, [])
                for class_qid in class_qids:
                    if class_qid not in merged_classes:
                        merged_classes.append(class_qid)
                    if class_qid not in finished_entities:
                        finished_entities.add(class_qid)
                        new_entities.append(class_qid)

        print(f"{len(new_entities)} new entries added to process.")
        # Each id is appended at most once, so the list is already duplicate-free.
        ent_list = new_entities

    # Save results
    with open('items_by_subclass.json', 'w') as f:
        json.dump(all_output, f)

    # Inverse index, restricted to items that belong to exactly one class.
    items_per_subclass = {}
    for item_qid, class_qids in all_output.items():
        if len(class_qids) == 1:
            items_per_subclass.setdefault(class_qids[0], []).append(item_qid)

    with open("items_per_subclass.json", "w") as file:
        json.dump(items_per_subclass, file, indent=2)

    with open('items_labels.json', 'w') as f:
        json.dump(labels, f)
    return all_output, labels


In [None]:
# all_subclasses is a set of (qid, label) entries. Sort the ids so the batches
# sent to Wikidata are composed the same way on every run; iterating the set
# directly gives an order that can change between kernels.
subclass_qids = sorted(qid for qid, label in all_subclasses)

print(len(subclass_qids), "subclasses to process.")


items, labels = collect_all_instances(subclass_qids)
print(f"Collected {len(items)} items across {len(subclass_qids)} subclasses.")


# Show a few entries only; the full mapping is saved to items_by_subclass.json
# by collect_all_instances and can be very large.
dict(list(items.items())[:5])

In [None]:
# More descriptive aliases for the collected results:
# item id -> list of class ids, and item id -> label.
items_to_subclasses = items
items_to_labels = labels

In [None]:
# Preview a few entries instead of rendering the whole mapping in the output.
dict(list(items_to_labels.items())[:5])

In [None]:
# Preview a few entries instead of rendering the whole mapping in the output.
dict(list(items_to_subclasses.items())[:5])

In [None]:
# One entry per (item, class) pair, so a class appears once for every item
# that uses it. Items whose label is itself a Wikidata id are left out.
all_classes_used_in_items = [
    class_qid
    for item_id, class_qids in items_to_subclasses.items()
    if not wikidata_utils.is_wikidata_id(items_to_labels[item_id])
    for class_qid in class_qids
]

print(f"Total class usages in items: {len(all_classes_used_in_items)}")

In [None]:
# Also include the direct superclasses of every class that is used by at least
# one item, so the level above the used classes is represented as well.
# NOTE: this cell extends all_classes_used_in_items; re-run the cell that
# builds that list before re-running this one.
used_class_ids = set(all_classes_used_in_items)  # set => constant-time membership tests
superclass_genres_to_add = set()
for genre_id in used_class_ids:
    # `genre_id` is a class id here; ids without a hierarchy entry contribute nothing.
    superclasses = subclass_hierarchy.get(genre_id, [])
    for superclass in superclasses:
        if superclass not in used_class_ids:
            superclass_genres_to_add.add(superclass)

print(f"Adding {len(superclass_genres_to_add)} additional genres that are used as superclasses")
# Sorted so the appended order (and anything derived from it) is reproducible.
all_classes_used_in_items = list(all_classes_used_in_items) + sorted(superclass_genres_to_add)
# The list holds one entry per usage, so count distinct ids for this message.
print(len(set(all_classes_used_in_items)), "total unique subclasses after adding superclasses.")
        

In [None]:
# Lookup table: class id -> label, built from the (id, label) entries in
# all_subclasses, plus an explicit entry for the root class.
subclass_id_to_label_lookup = {
    **dict(all_subclasses),
    "Q6999": "astronomical object",
}

In [None]:
from collections import Counter


# Build one record per class id (held in `genre_id`): label, URL, number of
# items using it, and its direct superclasses with their labels.
used_classes_counter = Counter(all_classes_used_in_items)
used_classes_info_dict = {}
seen_class_labels = []
for genre_id, used_count in used_classes_counter.items():
    if genre_id not in subclass_hierarchy:
        # No superclasses recorded for this id: report it and leave it out.
        print(f"{genre_id} {subclass_id_to_label_lookup[genre_id]} not found")
        continue

    superclasses_with_labels = [
        (subclass_id_to_label_lookup[parent_id], parent_id)
        for parent_id in subclass_hierarchy[genre_id]
    ]

    genre_label = subclass_id_to_label_lookup[genre_id]
    if genre_label in seen_class_labels:
        print(f"have duplicate label {subclass_id_to_label_lookup[genre_id], genre_id} ")
    seen_class_labels.append(genre_label)

    used_classes_info_dict[genre_id] = {
        "label": genre_label,
        "QID": genre_id,
        "url": f"https://www.wikidata.org/wiki/{genre_id}",
        # Classes that were only added as superclasses are not used by any item.
        "used_count": 0 if genre_id in superclass_genres_to_add else used_count,
        "superclasses": superclasses_with_labels,
    }

# Most-used classes first; ties keep their insertion order.
used_classes_info_dict = dict(
    sorted(
        used_classes_info_dict.items(),
        key=lambda entry: entry[1]["used_count"],
        reverse=True,
    )
)


with open("astronomical_objects_instances_used_classes_info.json", "w") as file:
    json.dump(used_classes_info_dict, file)