In [None]:
# Install dependencies for this notebook.
# NOTE(review): the cells below also import mwapi, mwparserfromhell and pandas,
# which this command does not install -- confirm they come from elsewhere.
# NOTE(review): nothing visible imports "pyparse"; presumably "pyparsing" (the
# parser library rdflib builds on) was meant -- confirm.
!pip install rdflib pyparse

In [None]:
# imports
from rdflib.plugins.sparql.parser import parseQuery
import mwapi
from mwapi.errors import APIError
import mwparserfromhell as parser
import re
import pandas as pd
import logging

# constants
# Names (without the "Template:" prefix) of the templates whose transclusions
# are crawled for SPARQL queries.
templates = [
    "Wikidata list",
    "SPARQL",
    "SPARQL2",
    "SPARQL5",
    "SPARQL Inline",
    "Wdquery",
    "Complex constraint"
]

# Regex alternation matching the opening of any template above: "{{", optional
# whitespace, the name with either case for its first letter, optional
# whitespace, then the "|" that starts the parameter list.
# - raw strings keep "\s" and "\|" from being read as (invalid) string escapes;
# - the first-letter class holds only the two letters: "[w|W]" would also
#   accept a literal "|" as the first character;
# - re.escape() keeps any regex metacharacter in a template name literal.
template_regex_string = "|".join(
    r"\{\{\s*["
    + re.escape(t[0].lower())
    + re.escape(t[0].upper())
    + "]"
    + re.escape(t[1:])
    + r"\s*\|"
    for t in templates
)

# Base URLs ("https://<entry>") of every wiki to crawl.
wikis = set()

# wikis.txt is read one entry per line (presumably bare hostnames -- the code
# only prefixes each entry with "https://").
with open('wikis.txt', 'r') as f:
    for line in f:
        # strip() instead of line[:-1]: slicing chops a real character off the
        # last line when the file has no trailing newline, and leaves "\r"
        # behind for CRLF files.
        host = line.strip()
        if host:  # blank lines would otherwise add the bogus URL "https://"
            wikis.add(f'https://{host}')

# Always included, whether or not they are listed in wikis.txt.
big_wikis = [
    'https://en.wikipedia.org',
    'https://fr.wikipedia.org',
    'https://de.wikipedia.org',
    'https://ja.wikipedia.org',
    'https://ru.wikipedia.org',
    'https://pt.wikipedia.org',
    'https://it.wikipedia.org',
    'https://zh.wikipedia.org',
    'https://fa.wikipedia.org',
    'https://ar.wikipedia.org',
    'https://commons.wikimedia.org',
    'https://wikidata.org',
    'https://mediawiki.org'
]

wikis.update(big_wikis)

# Route log records at DEBUG level and above into example.log (UTF-8), then
# create the logger used by the functions below.
logging.basicConfig(
    level=logging.DEBUG,
    filename='example.log',
    encoding='utf-8',
)
logger = logging.getLogger(__name__)

# helper functions
def is_sparql_query_valid(query):
    """Return True if ``query`` parses as SPARQL, False otherwise.

    The check is purely syntactic: the query is handed to rdflib's
    ``parseQuery`` and never executed.

    Args:
        query: the query text to check.

    Returns:
        bool: True when parsing succeeds, False when it raises.
    """
    try:
        parseQuery(query)
    except Exception:
        # Any parser failure means "invalid". Catching Exception rather than
        # using a bare except lets KeyboardInterrupt / SystemExit propagate,
        # so a long validation run can still be interrupted.
        return False
    return True
    
def get_transcluded_pages(session, template):
    """Return the titles of all pages that transclude ``Template:<template>``.

    Args:
        session: object whose ``get(**params)`` runs a MediaWiki API query and,
            with ``continuation=True``, yields successive response batches
            (an ``mwapi.Session`` elsewhere in this file).
        template: template name without the ``Template:`` prefix.

    Returns:
        list[str]: page titles in the order the API returned them.

    Raises:
        ValueError: if an ``APIError`` is raised while the batches are read;
            the original error is chained as ``__cause__``.
    """
    continued = session.get(
        formatversion=2,
        action='query',
        prop='transcludedin',
        titles=f"Template:{template}",
        continuation=True
    )

    pages = []
    try:
        for portion in continued:
            if 'query' not in portion:
                logger.error("MediaWiki returned empty result batch.")
                continue
            for page in portion['query']['pages']:
                try:
                    for transcluded in page['transcludedin']:
                        pages.append(transcluded["title"])
                except (KeyError, TypeError):
                    # Best effort: a page entry without 'transcludedin' (or
                    # with a malformed entry) is skipped. Only lookup errors
                    # are swallowed, not every exception.
                    continue
    except APIError as error:
        # One formatted message instead of a two-element args tuple, chained
        # so the original API error stays visible in the traceback.
        raise ValueError(f"MediaWiki returned an error: {error}") from error

    return pages

def extract_sparql(session, p, t):
    """Return the first-parameter values of every template on page ``p`` that mentions ``t``.

    Args:
        session: object whose ``get(**params)`` runs a MediaWiki API query.
        p: title of the page to fetch.
        t: template name; a template is kept when ``t`` occurs anywhere in its
            wikitext.

    Returns:
        list[str]: for each kept template, the text after the first "=" of its
        first "|"-separated parameter. Templates without parameters, or whose
        first parameter has no "=", are skipped. The value is still cut at the
        next "|" and may keep the template's closing "}}".

    Raises:
        KeyError / IndexError: if the API response lacks the expected
            page / revision / slot structure.
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=p
    )

    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)
    # Named `matching` so the module-level `templates` list is not shadowed.
    matching = [tpl for tpl in wikitext.filter_templates() if t in tpl]
    if t == "Wikidata list":
        # The end marker also contains "Wikidata list" but carries no query.
        matching = [tpl for tpl in matching if tpl != "{{Wikidata list end}}"]

    out = []
    for tpl in matching:
        pieces = str(tpl).split("|")
        if len(pieces) < 2:
            continue  # no parameters at all
        # partition() splits on the FIRST "=" only; split("=")[1] stopped at
        # the second "=" and truncated queries such as "FILTER(?a = 1)".
        _, sep, value = pieces[1].partition("=")
        if not sep:
            continue  # positional first parameter: no "name=value" to read
        out.append(value)

    return out

def check_templates(template):
    """Tell whether ``template`` contains any name from the module-level ``templates`` list."""
    return any(name in template for name in templates)

def split_string_and_extract_preceding(s, delimiter, max_chars=300):
    """Return the text immediately before the first occurrence of ``delimiter`` in ``s``.

    Args:
        s: text to search (anything supporting ``find`` and slicing like ``str``).
        delimiter: substring to locate.
        max_chars: maximum number of characters to return (default 300, the
            window the previous implementation hard-coded).

    Returns:
        str: up to ``max_chars`` characters ending right where the first
        ``delimiter`` starts; ``""`` when ``delimiter`` does not occur, since
        there is then no preceding text.
    """
    # Only the first occurrence was ever returned, so a single find() replaces
    # splitting the whole string and re-locating every segment.
    split_point = s.find(delimiter)
    if split_point == -1:
        return ""
    # max(0, ...) keeps the slice start from going negative near the beginning.
    return s[max(0, split_point - max_chars):split_point]

def get_sparql_and_surrounding(title):
    """Fetch page ``title`` and extract each embedded SPARQL query with its context.

    The API session is read from the module-level name ``session``; the
    signature only takes the title.

    Args:
        title: title of the page to fetch.

    Returns:
        A list of dicts with keys ``title``, ``lede`` (first 250 characters of
        the page, or the ``label`` parameter for "complex constraint"
        templates), ``preceding_text``, ``query`` and ``results``. There is one
        dict for a ``{{query page`` page, otherwise one per tracked template
        ("sparql label" templates are skipped). Returns ``None`` when the page
        has neither a ``{{query page`` marker nor any tracked template.

    Raises:
        KeyError / IndexError: if the API response or a template lacks the
            expected structure (e.g. no ``query=`` parameter).
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=title
    )
    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)
    page_text = str(wikitext)
    page_lede = page_text[:250]

    # Tracked templates, minus the "Wikidata list" helper markers, which match
    # the name filter but carry no query.
    helper_markers = {
        "{{Wikidata list end}}",
        "{{Wikidata list header}}",
        "{{Wikidata list menu}}",
        "{{Wikidata list documentation}}",
    }
    wikitext_templates = [
        wt for wt in wikitext.filter_templates()
        if check_templates(wt) and str(wt) not in helper_markers
    ]

    if '{{query page' in page_text:
        query = re.split(r"query\s*=\s*", page_text)[1].split("|")[0]
        if not is_sparql_query_valid(query):
            logger.info(f'invalid query: {query}')
        # Return the record here: previously it was appended and then control
        # fell through to the final `return None`, discarding it.
        return [{"title": title, "lede": page_lede, 'preceding_text': None, 'query': query, 'results': None}]

    if not wikitext_templates:
        return None

    out = []
    for wt in wikitext_templates:
        wt_text = str(wt)
        wt_lower = wt_text.lower()
        lede = page_lede
        text = split_string_and_extract_preceding(page_text, wt_text)
        results = None
        if "wdquery" in wt_lower:
            query = re.split(r"query\s*=\s*", wt_text)[1].split("|")[0]
        elif "complex constraint" in wt_lower:
            lede = re.split(r"label\s*=\s*", wt_text)[1].split("|")[0]
            text = re.split(r"description\s*=\s*", wt_text)[1].split("|")[0]
            query = re.split(r"sparql\s*=\s*", wt_text)[1].split("|")[0]
        elif "wikidata list" in wt_lower:
            # The rendered table sits between this template and the NEXT end
            # marker. Searching from after the template (not from the start of
            # the page) keeps later lists on the same page from being paired
            # with the first end marker, and keeps "|}" inside the template
            # itself from being taken for the table end.
            template_start = page_text.find(wt_text)
            body_start = template_start + len(wt_text) if template_start != -1 else 0
            end_marker = re.compile(r"\{\{wikidata list end\}\}", re.IGNORECASE).search(page_text, body_start)
            body = page_text[body_start:end_marker.start()] if end_marker else page_text[body_start:]
            results = ""
            table_start = body.find("{|")
            if table_start != -1:
                table_end = body.find("|}", table_start)
                results = body[table_start:table_end] if table_end != -1 else body[table_start:]
            sparql_value = re.split(r"sparql\s*=\s*", wt_text, maxsplit=1)[1]
            query = sparql_value.split("|")[0]
            if not is_sparql_query_valid(query):
                # The query itself may contain "|"; retry cutting only at a
                # "|" that starts a new line (the next parameter).
                query = sparql_value.split("\n|")[0]
        elif "doc example" in wt_lower:
            # "\|" must be escaped: unescaped it is regex alternation, which
            # made element [1] the lone "|" between "{{SPARQL" and "query=".
            query = re.split(r"content=\s*<pre>\s*\{\{SPARQL\s*\|\s*query=", wt_text)[1]
            # Drop the closing </pre> and everything after it; the generic
            # clean-up below then removes the inner template's "}}".
            query = re.split(r"\s*</pre>", query, maxsplit=1)[0]
        elif "sparql label" in wt_lower:
            continue
        else:
            # SPARQL / SPARQL2 / SPARQL5 / SPARQL Inline: first parameter.
            query = wt_text.split("|")[1].split("=", 1)[1]
            if not is_sparql_query_valid(query):
                query = re.split(r"query\s*=\s*", wt_text, maxsplit=1)[1]

        # Strip the template's closing braces and undo the "{{!}}" pipe escape.
        if query.endswith("\n}}"):
            query = query[:-3]
        if query.endswith("}}"):
            query = query[:-2]
        query = query.replace("{{!}}", "|")
        if not is_sparql_query_valid(query):
            logger.info(f'invalid query: {query}')

        out.append({"title": title, "lede": lede, 'preceding_text': text, 'query': query, 'results': results})
    return out

# main function
def main():
    """Crawl every wiki in ``wikis`` and pickle all extracted SPARQL queries.

    For each wiki, collects the pages transcluding any name in ``templates``,
    extracts the queries from each page, tags every record with its wiki URL
    under ``project``, adds a ``validity`` column and writes the frame to
    'wikidata-sparql-templates-bug-fixes.pkl'. Pages whose extraction raises
    are counted and skipped.
    """
    # get_sparql_and_surrounding() reads the API session from the module-level
    # name `session`. Binding it only as a local of main() made that lookup
    # fail with NameError for every page, silently counted as a "failure".
    global session

    columns = ['project', 'title', 'lede', 'preceding_text', 'query', 'results']
    # Records are gathered in a list and turned into a frame once at the end;
    # pd.concat inside the loop re-copied the whole frame for every page. The
    # resulting index is a plain 0..N-1 range.
    records = []
    for w in wikis:
        fail_ctr = 0
        logger.info(w)
        session = mwapi.Session(w, user_agent="htriedman sparql corpus bot")
        all_pages = set()
        for t in templates:
            pages = get_transcluded_pages(session, t)
            logger.info(f'template {t} occurs {len(pages)} times on {w}')
            all_pages.update(pages)
        logger.info(f'there are a total of {len(all_pages)} sparql-related pages on {w}')
        for page_number, p in enumerate(all_pages):
            if page_number % 500 == 0:
                logger.info(f'templates seen: {page_number}')
            try:
                out = get_sparql_and_surrounding(p)
            except Exception:
                # Best effort: one unparsable page must not stop the crawl,
                # but the traceback is kept in the log.
                fail_ctr += 1
                logger.debug('failed to process %r on %s', p, w, exc_info=True)
                if fail_ctr % 50 == 0:
                    logger.info(f'failures: {fail_ctr}')
                continue
            if out is None:
                continue
            # `out` is a list of dicts: iterate the records themselves.
            # `out[i]` with `i` being a record raised TypeError on every page
            # that had results, so nothing was ever collected.
            for record in out:
                record['project'] = w
            records.extend(out)

    df = pd.DataFrame(records, columns=columns)
    df['validity'] = df['query'].map(is_sparql_query_valid)
    df.to_pickle('wikidata-sparql-templates-bug-fixes.pkl')


if __name__ == "__main__":
    main()

In [None]:
from rdflib.plugins.sparql.parser import parseQuery
import mwapi
from mwapi.errors import APIError
import mwparserfromhell as parser
import re
import pandas as pd

## Check on functionality of queries

In [None]:
def is_sparql_query_valid(query):
    """Return True if ``query`` parses as SPARQL, False otherwise.

    The check is purely syntactic: the query is handed to rdflib's
    ``parseQuery`` and never executed.

    Args:
        query: the query text to check.

    Returns:
        bool: True when parsing succeeds, False when it raises.
    """
    try:
        parseQuery(query)
    except Exception:
        # Any parser failure means "invalid". Catching Exception rather than
        # using a bare except lets KeyboardInterrupt / SystemExit propagate,
        # so a long validation run can still be interrupted.
        return False
    return True

In [None]:
# Load a previously pickled result frame.
# NOTE(review): unpickling can execute arbitrary code -- only load files you
# produced yourself.
# NOTE(review): the visible cells write 'wikidata-sparql-templates-bug-fixes.pkl',
# not this file name -- confirm which file is intended here.
df = pd.read_pickle('wikidata-sparql-templates.pkl')

In [None]:
# Keep only the first 2000 rows and renumber the index from 0.
# The cell overwrites `df`, so the full frame must be reloaded to get it back.
df = df[:2000].reset_index(drop=True)

In [None]:
# Add a boolean 'validity' column: True where the row's query parses as SPARQL.
df['validity'] = df['query'].map(is_sparql_query_valid)

In [None]:
def get_transcluded_pages(session, template):
    """Return the titles of all pages that transclude ``Template:<template>``.

    Args:
        session: object whose ``get(**params)`` runs a MediaWiki API query and,
            with ``continuation=True``, yields successive response batches
            (an ``mwapi.Session`` elsewhere in this notebook).
        template: template name without the ``Template:`` prefix.

    Returns:
        list[str]: page titles in the order the API returned them.

    Raises:
        ValueError: if an ``APIError`` is raised while the batches are read;
            the original error is chained as ``__cause__``.
    """
    continued = session.get(
        formatversion=2,
        action='query',
        prop='transcludedin',
        titles=f"Template:{template}",
        continuation=True
    )

    pages = []
    try:
        for portion in continued:
            if 'query' not in portion:
                print("MediaWiki returned empty result batch.")
                continue
            for page in portion['query']['pages']:
                try:
                    for transcluded in page['transcludedin']:
                        pages.append(transcluded["title"])
                except (KeyError, TypeError):
                    # Best effort: a page entry without 'transcludedin' (or
                    # with a malformed entry) is skipped. Only lookup errors
                    # are swallowed, not every exception.
                    continue
    except APIError as error:
        # One formatted message instead of a two-element args tuple, chained
        # so the original API error stays visible in the traceback.
        raise ValueError(f"MediaWiki returned an error: {error}") from error

    return pages

In [None]:
def extract_sparql(session, p, t):
    """Return the first-parameter values of every template on page ``p`` that mentions ``t``.

    Args:
        session: object whose ``get(**params)`` runs a MediaWiki API query.
        p: title of the page to fetch.
        t: template name; a template is kept when ``t`` occurs anywhere in its
            wikitext.

    Returns:
        list[str]: for each kept template, the text after the first "=" of its
        first "|"-separated parameter. Templates without parameters, or whose
        first parameter has no "=", are skipped. The value is still cut at the
        next "|" and may keep the template's closing "}}".

    Raises:
        KeyError / IndexError: if the API response lacks the expected
            page / revision / slot structure.
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=p
    )

    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)
    # Named `matching` so the module-level `templates` list is not shadowed.
    matching = [tpl for tpl in wikitext.filter_templates() if t in tpl]
    if t == "Wikidata list":
        # The end marker also contains "Wikidata list" but carries no query.
        matching = [tpl for tpl in matching if tpl != "{{Wikidata list end}}"]

    out = []
    for tpl in matching:
        pieces = str(tpl).split("|")
        if len(pieces) < 2:
            continue  # no parameters at all
        # partition() splits on the FIRST "=" only; split("=")[1] stopped at
        # the second "=" and truncated queries such as "FILTER(?a = 1)".
        _, sep, value = pieces[1].partition("=")
        if not sep:
            continue  # positional first parameter: no "name=value" to read
        out.append(value)

    return out

In [None]:
def check_templates(template):
    """Tell whether ``template`` contains any name from the module-level ``templates`` list."""
    return any(name in template for name in templates)

def split_string_and_extract_preceding(s, delimiter, max_chars=300):
    """Return the text immediately before the first occurrence of ``delimiter`` in ``s``.

    Args:
        s: text to search (anything supporting ``find`` and slicing like ``str``).
        delimiter: substring to locate.
        max_chars: maximum number of characters to return (default 300, the
            window the previous implementation hard-coded).

    Returns:
        str: up to ``max_chars`` characters ending right where the first
        ``delimiter`` starts; ``""`` when ``delimiter`` does not occur, since
        there is then no preceding text.
    """
    # Only the first occurrence was ever returned, so a single find() replaces
    # splitting the whole string and re-locating every segment.
    split_point = s.find(delimiter)
    if split_point == -1:
        return ""
    # max(0, ...) keeps the slice start from going negative near the beginning.
    return s[max(0, split_point - max_chars):split_point]

In [None]:
def get_sparql_and_surrounding(title):
    """Fetch page ``title`` and extract each embedded SPARQL query with its context.

    The API session is read from the notebook-level name ``session``; the
    signature only takes the title.

    Args:
        title: title of the page to fetch.

    Returns:
        A list of dicts with keys ``title``, ``lede`` (first 250 characters of
        the page, or the ``label`` parameter for "complex constraint"
        templates), ``preceding_text``, ``query`` and ``results``. There is one
        dict for a ``{{query page`` page, otherwise one per tracked template
        ("sparql label" templates are skipped). Returns ``None`` when the page
        has neither a ``{{query page`` marker nor any tracked template.

    Raises:
        KeyError / IndexError: if the API response or a template lacks the
            expected structure (e.g. no ``query=`` parameter).
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=title
    )
    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)
    page_text = str(wikitext)
    page_lede = page_text[:250]

    # Tracked templates, minus the "Wikidata list" helper markers, which match
    # the name filter but carry no query.
    helper_markers = {
        "{{Wikidata list end}}",
        "{{Wikidata list header}}",
        "{{Wikidata list menu}}",
        "{{Wikidata list documentation}}",
    }
    wikitext_templates = [
        wt for wt in wikitext.filter_templates()
        if check_templates(wt) and str(wt) not in helper_markers
    ]

    if '{{query page' in page_text:
        query = re.split(r"query\s*=\s*", page_text)[1].split("|")[0]
        # Return the record here: previously it was appended and then control
        # fell through to the final `return None`, discarding it.
        return [{"title": title, "lede": page_lede, 'preceding_text': None, 'query': query, 'results': None}]

    if not wikitext_templates:
        return None

    out = []
    for wt in wikitext_templates:
        wt_text = str(wt)
        wt_lower = wt_text.lower()
        lede = page_lede
        text = split_string_and_extract_preceding(page_text, wt_text)
        results = None
        if "wdquery" in wt_lower:
            query = re.split(r"query\s*=\s*", wt_text)[1].split("|")[0]
        elif "complex constraint" in wt_lower:
            lede = re.split(r"label\s*=\s*", wt_text)[1].split("|")[0]
            text = re.split(r"description\s*=\s*", wt_text)[1].split("|")[0]
            query = re.split(r"sparql\s*=\s*", wt_text)[1].split("|")[0]
        elif "wikidata list" in wt_lower:
            # The rendered table sits between this template and the NEXT end
            # marker. Searching from after the template (not from the start of
            # the page) keeps later lists on the same page from being paired
            # with the first end marker, and keeps "|}" inside the template
            # itself from being taken for the table end.
            template_start = page_text.find(wt_text)
            body_start = template_start + len(wt_text) if template_start != -1 else 0
            end_marker = re.compile(r"\{\{wikidata list end\}\}", re.IGNORECASE).search(page_text, body_start)
            body = page_text[body_start:end_marker.start()] if end_marker else page_text[body_start:]
            results = ""
            table_start = body.find("{|")
            if table_start != -1:
                table_end = body.find("|}", table_start)
                results = body[table_start:table_end] if table_end != -1 else body[table_start:]
            # Try progressively looser ways of cutting the sparql value off
            # from the following parameters; stop at the first cut that
            # parses. If none parses, the last attempt is kept.
            sparql_value = re.split(r"sparql\s*=\s*", wt_text, maxsplit=1)[1]
            for splitter in (r"\|section", r"\|", r"\s+\|"):
                query = re.split(splitter, sparql_value)[0]
                if is_sparql_query_valid(query):
                    break
        elif "doc example" in wt_lower:
            # "\|" must be escaped: unescaped it is regex alternation, which
            # made element [1] the lone "|" between "{{SPARQL" and "query=".
            query = re.split(r"content=\s*<pre>\s*\{\{SPARQL\s*\|\s*query=", wt_text)[1]
            # Drop the closing </pre> and everything after it; the generic
            # clean-up below then removes the inner template's "}}".
            query = re.split(r"\s*</pre>", query, maxsplit=1)[0]
        elif "sparql label" in wt_lower:
            continue
        else:
            # SPARQL / SPARQL2 / SPARQL5 / SPARQL Inline: first parameter.
            query = wt_text.split("|")[1].split("=", 1)[1]
            if not is_sparql_query_valid(query):
                query = re.split(r"query\s*=\s*", wt_text, maxsplit=1)[1]

        # Strip the template's closing braces and undo the "{{!}}" pipe escape.
        if query.endswith("\n}}"):
            query = query[:-3]
        if query.endswith("}}"):
            query = query[:-2]
        query = query.replace("{{!}}", "|")
        if not is_sparql_query_valid(query):
            print(f'invalid query: {query}')

        out.append({"title": title, "lede": lede, 'preceding_text': text, 'query': query, 'results': results})
    return out

In [None]:
# Names (without the "Template:" prefix) of the templates whose transclusions
# are crawled for SPARQL queries.
templates = [
    "Wikidata list",
    "SPARQL",
    "SPARQL2",
    "SPARQL5",
    "SPARQL Inline",
    "Wdquery",
    "Complex constraint"
]

# Regex alternation matching the opening of any template above: "{{", optional
# whitespace, the name with either case for its first letter, optional
# whitespace, then the "|" that starts the parameter list.
# - raw strings keep "\s" and "\|" from being read as (invalid) string escapes;
# - the first-letter class holds only the two letters: "[w|W]" would also
#   accept a literal "|" as the first character;
# - re.escape() keeps any regex metacharacter in a template name literal.
template_regex_string = "|".join(
    r"\{\{\s*["
    + re.escape(t[0].lower())
    + re.escape(t[0].upper())
    + "]"
    + re.escape(t[1:])
    + r"\s*\|"
    for t in templates
)

# Base URLs ("https://<entry>") of every wiki to crawl.
wikis = set()

# wikis.txt is read one entry per line (presumably bare hostnames -- the code
# only prefixes each entry with "https://").
with open('wikis.txt', 'r') as f:
    for line in f:
        # strip() instead of line[:-1]: slicing chops a real character off the
        # last line when the file has no trailing newline, and leaves "\r"
        # behind for CRLF files.
        host = line.strip()
        if host:  # blank lines would otherwise add the bogus URL "https://"
            wikis.add(f'https://{host}')

# Always included, whether or not they are listed in wikis.txt.
big_wikis = [
    'https://en.wikipedia.org',
    'https://fr.wikipedia.org',
    'https://de.wikipedia.org',
    'https://ja.wikipedia.org',
    'https://ru.wikipedia.org',
    'https://pt.wikipedia.org',
    'https://it.wikipedia.org',
    'https://zh.wikipedia.org',
    'https://fa.wikipedia.org',
    'https://ar.wikipedia.org',
    'https://commons.wikimedia.org',
    'https://wikidata.org',
    'https://mediawiki.org'
]

wikis.update(big_wikis)

In [None]:
# Empty result table that the crawl cell appends to, one row per extracted query.
df = pd.DataFrame(
    columns=[
        'project',
        'title',
        'lede',
        'preceding_text',
        'query',
        'results',
    ]
)

In [None]:
# NOTE(review): this rebinds `wikis` from the full set built earlier to a
# single wiki, so every later cell that iterates `wikis` only touches
# ts.wikipedia.org -- presumably a debugging override; confirm before a full run.
wikis = ['https://ts.wikipedia.org']

In [None]:
# Crawl every wiki in `wikis`: collect the pages that transclude a tracked
# template, extract their queries, and append the records to `df`.
# Records are gathered in a list and concatenated once at the end instead of
# re-copying `df` for every page.
records = []
for w in wikis:
    fail_ctr = 0
    print(w)
    # get_sparql_and_surrounding() reads this notebook-level `session`.
    session = mwapi.Session(w, user_agent="htriedman sparql corpus bot")
    all_pages = set()
    for t in templates:
        pages = get_transcluded_pages(session, t)
        print(f'template {t} occurs {len(pages)} times on {w}')
        all_pages.update(pages)
    print(f'there are a total of {len(all_pages)} sparql-related pages on {w}')
    for page_number, p in enumerate(all_pages):
        if page_number % 500 == 0:
            print(f'templates seen: {page_number}')
        try:
            out = get_sparql_and_surrounding(p)
        except Exception:
            # Best effort: one unparsable page must not stop the crawl.
            # Catching Exception (not a bare except) keeps the cell
            # interruptible from the keyboard.
            fail_ctr += 1
            if fail_ctr % 50 == 0:
                print(f'failures: {fail_ctr}')
            continue
        if out is None:
            continue
        # `out` is a list of dicts: iterate the records themselves. `out[i]`
        # with `i` being a record raised TypeError on every page that had
        # results, so nothing was ever added to `df`.
        for record in out:
            record['project'] = w
        records.extend(out)

if records:
    df = pd.concat([df, pd.DataFrame(records)], ignore_index=True)

In [None]:
# Scratch cell: prints a hard-coded sample -- a SPARQL query (with "{{!}}" pipe
# escapes) followed by a "|columns=" parameter and a closing "}}".
# NOTE(review): presumably a template value used while debugging the
# query-splitting logic; nothing is assigned, so later cells cannot depend on it.
print('SELECT\n  ?number_of_authors\n  ?number_of_works\n  ?item\n  (REPLACE(STR(?item), ".*Q", "Q") AS ?qid)\n  (CONCAT( "[[toolforge:scholia/organization/", ?qid , "|", ENCODE_FOR_URI(?number_of_works), "]] / ", "[[toolforge:scholia/organization/", ?qid , "/missing|📖]]") AS ?scholia)\n\nWITH {  \n  SELECT\n    ?item\n    (COUNT(DISTINCT ?author) AS ?number_of_authors)\n    (COUNT(DISTINCT ?work) AS ?number_of_works)\n  WHERE {\n    ?item wdt:P17 wd:Q1006 .\n    ?author wdt:P108 {{!}} wdt:P463 {{!}} wdt:P1416/wdt:P361* ?item .\n    ?work wdt:P50 ?author .\n  }\n  GROUP BY ?item \n} AS %results\nWHERE {\n  INCLUDE %results\n  SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en,da,es,fr,jp,nl,no,ru,sv,zh". }\n}\nORDER BY DESC(?number_of_authors) DESC(?number_of_works) ?item ?qid ?scholia\n\n|columns=label:Organization,?number_of_authors:Number of authors,?scholia:Publications (known/missing)\n}}')

In [1]:
# Installs the pickle5 package (the recorded output shows a cp37 wheel).
# NOTE(review): no visible cell imports pickle5 -- confirm it is still needed.
!pip3 install pickle5

Collecting pickle5
  Downloading pickle5-0.0.12-cp37-cp37m-macosx_10_9_x86_64.whl.metadata (2.0 kB)
Downloading pickle5-0.0.12-cp37-cp37m-macosx_10_9_x86_64.whl (124 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.0/125.0 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: pickle5
Successfully installed pickle5-0.0.12


In [None]:
# NOTE(review): `d` is not defined in any visible cell; on a fresh kernel this
# raises NameError -- presumably leftover scratch (maybe a truncated `df`).
d

In [None]:
# Check whether the string `s` parses as SPARQL.
# NOTE(review): `s` is not defined in any visible cell -- this depends on
# hidden kernel state; confirm where it comes from.
is_sparql_query_valid(s)

In [None]:
# Show the part of `s` before the first blank line followed by "|".
# NOTE(review): `s` is not defined in any visible cell -- hidden kernel state.
s.split('\n\n|')[0]

In [None]:
# Show the first element of `out`.
# NOTE(review): `out` is presumably whatever the crawl cell's last call to
# get_sparql_and_surrounding() left behind; it may be None or empty -- the
# result depends on execution order.
out[0]

In [None]:
# Add (or overwrite) the boolean 'validity' column: True where the row's query
# parses as SPARQL.
df['validity'] = df['query'].map(is_sparql_query_valid)

In [None]:
# Display the result frame, now including the 'validity' column.
df

In [None]:
# failure-prone wikis: commons, cswiki, cawiki, nowiki

In [None]:
# Display the result frame again before saving it.
df

In [None]:
# Persist the current frame; the file is written relative to the kernel's
# working directory and replaces any existing file of that name.
df.to_pickle('wikidata-sparql-templates-bug-fixes.pkl')