In [2]:
import requests, time

WIKIDATA_SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
HEADERS = {"User-Agent": "LanguageFamilyTreeBot/1.0 (https://example.com)", "Accept": "application/json"}
# API URLs
WIKIPEDIA_API: str = "https://en.wikipedia.org/w/api.php"
WIKIDATA_API: str = "https://www.wikidata.org/wiki/Special:EntityData/{}.json"
SPARQL_API: str = "https://query.wikidata.org/sparql"
WIKIDATA_QUERY_API: str = "https://www.wikidata.org/w/api.php"

MAX_QIDS_PER_CALL = 50  # Wikidata wbgetentities practical limit
MAX_RETRIES = 4
BACKOFF_BASE = 0.8
MAX_NODES = 1500  # safety cap to avoid runaway expansion


def safe_get_json(url: str, *, params: dict, headers: dict | None = None):
    """GET a JSON response with retry & backoff; return None on hard failure."""
    merged_headers = {**HEADERS, **(headers or {})}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, params=params, headers=merged_headers, timeout=20)
            status = resp.status_code
            if status == 429:  # rate limited
                wait = BACKOFF_BASE * attempt * 2
                print(f"Rate limited (429). Sleeping {wait:.2f}s ...")
                time.sleep(wait)
                continue
            if status >= 500:
                wait = BACKOFF_BASE * attempt
                print(f"Server error {status}. Retry {attempt}/{MAX_RETRIES} after {wait:.2f}s")
                time.sleep(wait)
                continue
            if status != 200:
                print(f"Non-200 status {status} for {url} params={params}")
                return None
            text = resp.text.strip()
            if not text:
                wait = BACKOFF_BASE * attempt
                print(f"Empty body. Retry {attempt}/{MAX_RETRIES} after {wait:.2f}s")
                time.sleep(wait)
                continue
            return resp.json()
        except ValueError as ve:  # JSON decode
            wait = BACKOFF_BASE * attempt
            print(f"JSON decode error attempt {attempt}: {ve}. Backing off {wait:.2f}s")
            time.sleep(wait)
        except requests.RequestException as re:
            wait = BACKOFF_BASE * attempt
            print(f"Request error attempt {attempt}: {re}. Backing off {wait:.2f}s")
            time.sleep(wait)
    return None


def chunked(iterable, size):
    it = list(iterable)
    for i in range(0, len(it), size):
        yield it[i:i + size]


def get_language_labels(qids):
    """Batch-fetch labels for a set of Q-ids (returns dict). Robust with retries & chunking."""
    qids = list({q for q in qids if q})
    if not qids:
        return {}
    results = {}
    for group in chunked(qids, MAX_QIDS_PER_CALL):
        params = {
            "action": "wbgetentities",
            "ids": "|".join(group),
            "props": "labels",
            "languages": "en",
            "format": "json",
            "origin": "*",
        }
        data = safe_get_json(WIKIDATA_QUERY_API, params=params)
        if not data:
            for q in group:  # leave unresolved
                results.setdefault(q, q)
            continue
        entities = data.get("entities", {})
        for qid, ent in entities.items():
            label = ent.get("labels", {}).get("en", {}).get("value")
            if label:
                results[qid] = label
            else:
                results.setdefault(qid, qid)
        time.sleep(0.1)  # politeness
    return results

# Simple caches
LABEL_CACHE: dict[str, str] = {}
VALID_QIDS: set[str] = set()
INVALID_QIDS: set[str] = set()


def is_valid_language(qid):
    """Check if a QID represents a valid language, dialect, or language family."""
    if not qid:
        return False
    query = f"""
    SELECT ?class WHERE {{
      wd:{qid} (wdt:P31/wdt:P279*) ?class.
    }}
    """
    response = safe_get_json(WIKIDATA_SPARQL_ENDPOINT, params={'query': query, 'format': 'json'}) or {}
    results = response.get('results', {}).get('bindings', [])
    classes = [r['class']['value'].split('/')[-1] for r in results]
    valid_classes = [
        'Q34770',    # language
        'Q33742',    # natural language
        'Q20162172', # human language
        'Q33384',    # dialect
        'Q25209536', # variety of language
        'Q1288568',  # modern language
        'Q25295',    # language family
        'Q1072694',  # constructed language
        'Q17376908', # language isolate
        'Q11755682', # proto-language
        "Q45762"
    ]
    return any(c in valid_classes for c in classes)


def is_valid_language_cached(qid: str) -> bool:
    if qid in VALID_QIDS:
        return True
    if qid in INVALID_QIDS:
        return False
    ok = is_valid_language(qid)
    (VALID_QIDS if ok else INVALID_QIDS).add(qid)
    return ok


def get_label(qid: str) -> str:
    """Return English label for a QID (falls back to QID)."""
    if not qid:
        return qid
    if qid in LABEL_CACHE:
        return LABEL_CACHE[qid]
    labels = get_language_labels([qid])
    label = labels.get(qid, qid)
    LABEL_CACHE[qid] = label
    return label


def get_wikidata_entity_id(language_name):
    """Return the Wikidata Q-identifier for a language name."""
    try:
        # First try direct page lookup
        params = {
            "action": "query",
            "titles": f"{language_name} language",
            "prop": "pageprops",
            "ppprop": "wikibase_item",
            "format": "json",
        }
        data = safe_get_json(WIKIPEDIA_API, params=params) or {}
        pages = data.get("query", {}).get("pages", {})
        for page in pages.values():
            if "pageprops" in page and "wikibase_item" in page["pageprops"]:
                return page["pageprops"]["wikibase_item"]
        # Search fallback
        params = {
            "action": "query",
            "list": "search",
            "srsearch": f"{language_name} language",
            "srlimit": 1,
            "format": "json",
        }
        data = safe_get_json(WIKIPEDIA_API, params=params) or {}
        search = data.get("query", {}).get("search", [])
        if search:
            page_title = search[0]["title"]
            params = {
                "action": "query",
                "titles": page_title,
                "prop": "pageprops",
                "ppprop": "wikibase_item",
                "format": "json",
            }
            data = safe_get_json(WIKIPEDIA_API, params=params) or {}
            pages = data.get("query", {}).get("pages", {})
            for page in pages.values():
                if "pageprops" in page and "wikibase_item" in page["pageprops"]:
                    return page["pageprops"]["wikibase_item"]
    except Exception as e:
        print(f"Error getting QID for {language_name}: {e}")
    return None


def get_parents(entity_id):
    query = f"""
    SELECT ?parent ?parentLabel WHERE {{
      wd:{entity_id} wdt:P279 ?parent.
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """
    response = safe_get_json(WIKIDATA_SPARQL_ENDPOINT, params={'query': query, 'format': 'json'}) or {}
    results = response.get('results', {}).get('bindings', [])
    return [(r['parent']['value'].split('/')[-1], r['parentLabel']['value']) for r in results]


def get_children_by_p527(entity_id):
    query = f"""
    SELECT ?child ?childLabel WHERE {{
      wd:{entity_id} wdt:P527 ?child.
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """
    response = safe_get_json(WIKIDATA_SPARQL_ENDPOINT, params={'query': query, 'format': 'json'}) or {}
    results = response.get('results', {}).get('bindings', [])
    return [(r['child']['value'].split('/')[-1], r['childLabel']['value']) for r in results]


def get_children(entity_id):
    query = f"""
    SELECT ?child ?childLabel WHERE {{
      ?child wdt:P279 wd:{entity_id}.
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """
    response = safe_get_json(WIKIDATA_SPARQL_ENDPOINT, params={'query': query, 'format': 'json'}) or {}
    results = response.get('results', {}).get('bindings', [])
    return [(r['child']['value'].split('/')[-1], r['childLabel']['value']) for r in results]


def build_language_family_tree(entity_id, depth, current_depth=1, visited=None):
    if visited is None:
        visited = set()
    if current_depth > depth or entity_id in visited:
        return []
    if len(visited) >= MAX_NODES:
        print("Max node cap reached; stopping expansion.")
        return []

    visited.add(entity_id)
    relations: list[tuple[str, str, str]] = []
    current_label = get_label(entity_id)

    # Parents (superclasses). Filter invalid before using.
    parents = get_parents(entity_id)
    if entity_id == "Q1680":  # debug for English
        print("Parents of English:", parents)
    for parent_id, parent_label in parents:
        if is_valid_language_cached(parent_id):
            relations.append((current_label, "Child of", parent_label))
            relations.extend(build_language_family_tree(parent_id, depth, current_depth + 1, visited))

    # Children by P527 (parts/members)
    for child_id, child_label in get_children_by_p527(entity_id):
        if child_id != entity_id and is_valid_language_cached(child_id):
            relations.append((child_label, "Child of", current_label))
            relations.extend(build_language_family_tree(child_id, depth, current_depth + 1, visited))

    # Children by reverse P279 (subclasses)
    for child_id, child_label in get_children(entity_id):
        if child_id != entity_id and child_id not in visited and is_valid_language_cached(child_id):
            relations.append((child_label, "Child of", current_label))
            relations.extend(build_language_family_tree(child_id, depth, current_depth + 1, visited))

    return relations


def get_language_family(language_name, depth):
    entity_id = get_wikidata_entity_id(language_name)
    if not entity_id:
        raise ValueError(f"Language '{language_name}' not found in Wikidata.")
    # Root language is forced valid to ensure at least a starting point
    VALID_QIDS.add(entity_id)
    relations = build_language_family_tree(entity_id, depth)
    unique_relations = list({(r[0], r[1], r[2]) for r in relations})
    formatted_relations = [{"language1": rel[0], "relationship": rel[1], "language2": rel[2]} for rel in unique_relations]
    return formatted_relations

# Example usage
family_tree = get_language_family("English", 2)
print(f"Entries: {len(family_tree)}")
print(family_tree[:10])  # preview first 10

Entries: 214
[{'language1': 'English language in Ukraine', 'relationship': 'Child of', 'language2': 'languages of Ukraine'}, {'language1': 'received pronunciation', 'relationship': 'Child of', 'language2': 'Standard English'}, {'language1': 'Gustavia English', 'relationship': 'Child of', 'language2': 'Caribbean English'}, {'language1': 'Palauan English', 'relationship': 'Child of', 'language2': 'English'}, {'language1': 'Jewish English varieties', 'relationship': 'Child of', 'language2': 'Jewish languages'}, {'language1': 'Quebec English', 'relationship': 'Child of', 'language2': 'Canadian English'}, {'language1': 'Zambian English', 'relationship': 'Child of', 'language2': 'English'}, {'language1': 'Belizean English', 'relationship': 'Child of', 'language2': 'Caribbean English'}, {'language1': 'English in the Netherlands', 'relationship': 'Child of', 'language2': 'English'}, {'language1': 'Pacific Northwest English', 'relationship': 'Child of', 'language2': 'North American English'}]


In [None]:
# (Reserved cell) – helper / scratch space.
# Visualization cells will follow below.


In [3]:
# Build a NetworkX graph from the family_tree list of dicts (language1, relationship, language2)
try:
    import networkx as nx
except ImportError:
    raise SystemExit("Please install networkx: pip install networkx")

from collections import defaultdict, deque

if 'family_tree' not in globals():
    raise RuntimeError("family_tree not defined – run the retrieval cell first.")

def build_graph(relations):
    G = nx.DiGraph()
    for rel in relations:
        l1 = rel["language1"]
        l2 = rel["language2"]
        label = rel["relationship"]
        # Edge direction: parent -> child (reverse of 'Child of')
        if label == "Child of":
            parent, child = l2, l1
        else:
            parent, child = l1, l2
        G.add_node(parent)
        G.add_node(child)
        G.add_edge(parent, child, relationship=label)
    return G

family_graph = build_graph(family_tree)
print(f"Nodes: {family_graph.number_of_nodes()}, Edges: {family_graph.number_of_edges()}")

# Derive depths (heuristic BFS from nodes with no predecessors)
roots = [n for n in family_graph.nodes() if family_graph.in_degree(n) == 0]
node_depth = {n: 0 for n in roots}
for r in roots:
    q = deque([(r, 0)])
    while q:
        node, d = q.popleft()
        for child in family_graph.successors(node):
            if child not in node_depth or d + 1 < node_depth[child]:
                node_depth[child] = d + 1
                q.append((child, d + 1))
print("Depth levels computed (sample):", list(node_depth.items())[:10])

Nodes: 193, Edges: 214
Depth levels computed (sample): [('languages of Ukraine', 0), ('Jewish languages', 0), ('languages of Sweden', 0), ('languages of the Falkland Islands', 0), ('languages of Europe', 0), ('languages of Switzerland', 0), ('languages of Puerto Rico', 0), ('languages of Lebanon', 0), ('Anglo-Frisian', 0), ('languages of Spain', 0)]


In [4]:
# Interactive visualization with PyVis (HTML)
try:
    from pyvis.network import Network
except ImportError:
    raise SystemExit("Please install pyvis: pip install pyvis")

if 'family_graph' not in globals():
    raise RuntimeError("family_graph not defined – run the graph build cell first.")

palette = [
    '#1f77b4', '#2ca02c', '#ff7f0e', '#9467bd', '#8c564b',
    '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'
]

net = Network(height='750px', width='100%', directed=True, notebook=True, bgcolor='#ffffff', font_color='#222222')
net.toggle_physics(True)
net.set_options("""
var options = {
  physics: { stabilization: { iterations: 150 }, barnesHut: { gravitationalConstant: -4000 } },
  edges: { arrows: { to: { enabled: true } }, smooth: { type: 'dynamic' } }
}
""")

for node in family_graph.nodes():
    depth = globals().get('node_depth', {}).get(node, 0)
    color = palette[depth % len(palette)]
    net.add_node(node, label=node, color=color, level=depth)

for u, v, data in family_graph.edges(data=True):
    rel = data.get('relationship', '')
    net.add_edge(u, v, title=rel)

net.show('language_family_tree.html')
print("Generated language_family_tree.html (open in sidebar or a browser).")



JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)

In [6]:
# ASCII tree fallback (choose roots heuristically: nodes with no incoming edges)
from collections import defaultdict

if 'family_graph' not in globals():
    raise RuntimeError("family_graph not defined – run the graph build cell first.")

incoming = defaultdict(int)
for u, v in family_graph.edges():
    incoming[v] += 1

roots = [n for n in family_graph.nodes() if incoming[n] == 0] or list(family_graph.nodes())[:1]

PRINT_LIMIT = 400  # avoid huge console spam
printed = 0

def print_tree(node, prefix="", visited=None):
    global printed
    if printed >= PRINT_LIMIT:
        return
    if visited is None:
        visited = set()
    depth = globals().get('node_depth', {}).get(node, 0)
    print(f"{prefix}{node} [d={depth}]")
    printed += 1
    if node in visited:
        print(prefix + "  (cycle)")
        return
    visited.add(node)
    children = list(family_graph.successors(node))
    for i, child in enumerate(children):
        if printed >= PRINT_LIMIT:
            print("... output truncated ...")
            return
        is_last = i == len(children) - 1
        connector = "└─ " if is_last else "├─ "
        print_tree(child, prefix + connector, visited)

for r in roots:
    if printed >= PRINT_LIMIT:
        break
    print_tree(r)
if printed >= PRINT_LIMIT:
    print(f"Truncated after {PRINT_LIMIT} nodes.")

languages of Ukraine [d=0]
└─ English language in Ukraine [d=1]
Jewish languages [d=0]
└─ Jewish English varieties [d=1]
languages of Sweden [d=0]
└─ English language in Sweden [d=1]
languages of the Falkland Islands [d=0]
└─ Falkland Islands English [d=1]
languages of Europe [d=0]
└─ English language in Europe [d=1]
languages of Switzerland [d=0]
└─ English language in Switzerland [d=1]
languages of Puerto Rico [d=0]
└─ English language in Puerto Rico [d=1]
languages of Lebanon [d=0]
└─ English language in Lebanon [d=1]
Anglo-Frisian [d=0]
└─ Anglic [d=1]
└─ ├─ Yola [d=2]
└─ ├─ Middle English [d=2]
└─ ├─ ├─ Kentish Dialect [d=3]
└─ ├─ ├─ Southern Dialect [d=3]
└─ ├─ ├─ Central Middle English [d=3]
└─ ├─ ├─ Southern Middle English [d=3]
└─ ├─ ├─ East Midland Dialect [d=3]
└─ ├─ ├─ Northern Middle English [d=3]
└─ ├─ ├─ West Midland Dialect [d=3]
└─ ├─ ├─ Late Middle English [d=3]
└─ ├─ ├─ Northern Dialect [d=3]
└─ ├─ ├─ Midland Middle English [d=3]
└─ ├─ ├─ Q1877420 [d=3]
└─ ├─ └─ Earl

In [16]:
from SPARQLWrapper import SPARQLWrapper, JSON

class WikidataLanguageValidator:
    def __init__(self):
        self.sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
        self.valid_types = {
            "language": "wd:Q34770",          # language
            "language_family": "wd:Q25295", # language family
            "dialect": "wd:Q33384"  ,           # dialect
            "extinct_language":"wd:Q38058796",
            "dead_language":"wd:Q45762"
        }

    def validate_qid(self, qid):
        # SPARQL query to check instance of and subclasses for valid types
        query = f"""
        SELECT ?type ?typeLabel WHERE {{
          VALUES ?item {{ wd:{qid} }} 
          {{
            ?item wdt:P31 ?type.
          }} UNION {{
            ?item wdt:P279 ?type.
          }}
          VALUES ?validType {{ { ' '.join(self.valid_types.values())} }}
          FILTER(?type IN (?validType))
          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
        }}
        LIMIT 1
        """
        
        self.sparql.setQuery(query)
        self.sparql.setReturnFormat(JSON)
        results = self.sparql.query().convert()

        if results["results"]["bindings"]:
            type_uri = results["results"]["bindings"][0]["type"]["value"]
            type_label = results["results"]["bindings"][0]["typeLabel"]["value"]

            for key, val in self.valid_types.items():
                if type_uri.endswith(val.split(":")[1]):
                    return True, key

        return False, None

# Example usage:
validator = WikidataLanguageValidator()
valid, classification = validator.validate_qid("Q56395")  # e.g., "English language" QID = Q1860
print(valid, classification)


True extinct_language
