In [39]:
import json
from tqdm.notebook import tqdm
import csv
from rdflib import *
import re
from datetime import datetime
from typing import List, Tuple, Union, Optional
import requests

In [2]:
# Open files
with open('ml_output.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

In [45]:
# Create the graph
g = ConjunctiveGraph()

kb = Namespace("http://example.org/data/")
g.bind("kb", kb)

sebi = Namespace("http://example.org/ontology/")
g.bind("sebi", sebi)

hico = Namespace("https://w3id.org/hico#")
g.bind("hico", hico)

dct = Namespace("http://purl.org/dc/terms/")
g.bind("dct", dct)

time = Namespace("https://www.w3.org/TR/owl-time/")
g.bind = ("time", time)

prov = Namespace("http://www.w3.org/ns/prov#")
g.bind = ("prov", prov)

wd = Namespace("http://www.wikidata.org/entity/")
g.bind = ("wd", wd)

factual_data = URIRef("http://example.org/factual_data")

In [6]:
# wikidata alignment via nl label
def wikidata_alignment_via_label(label):
    url = "https://www.wikidata.org/w/api.php"

    params = {
      "action": "wbsearchentities",
      "format": "json",
      "language": "en",
      "search": label
    }

    response = requests.get(url, params=params)
    data = response.json()

    if 'search' in data and data['search']:
        wikidata_id = data['search'][0]['id']
        return wikidata_id
    else:
          return None

In [7]:
# date semiautomatic checker
def parse_date(date_str: str) -> Optional[Tuple[int, int]]:
    try:
        year = int(date_str)
        return year, year
    except:
        # if months
        months = [
            "January",
            "February",
            "March",
            "April",
            "May",
            "June",
            "July",
            "August",
            "September",
            "October",
            "November",
            "December",
        ]
        if any(month.lower() in date_str.lower() for month in months):
            year_match = re.findall(r"(\d+)", date_str)
            if year_match:
                return int(year_match[-1]), int(year_match[-1])

        if "cent" in date_str:
            century_match = re.match(r"(\d+)(st|nd|rd|th) cent", date_str)
            if century_match:
                century = int(century_match.group(1))
                return (century - 1) * 100 + 1, (century) * 100

        if "early" in date_str:
            century_match = re.match(r"early (\d+)(st|nd|rd|th) century", date_str)
            if century_match:
                century = int(century_match.group(1))
                return (century - 1) * 100 + 1, (century - 1) * 100 + 33
            decade_match = re.match(r"early (\d{4})s", date_str)
            if decade_match:
                decade = int(decade_match.group(1))
                return decade, decade + 9

        elif "late" in date_str:
            century_match = re.match(r"late (\d+)(st|nd|rd|th) century", date_str)
            if century_match:
                century = int(century_match.group(1))
                return (century - 1) * 100 + 67, century * 100
            decade_match = re.match(r"late (\d{4})s", date_str)
            if decade_match:
                decade = int(decade_match.group(1))
                return decade + 7, decade + 9

        elif "around" in date_str or "about" in date_str:
            year_match = re.match(r"(around|about) (\d+)", date_str)
            if year_match:
                year = int(year_match.group(2))
                return year - 5, year + 5

        elif "between" in date_str:
            between_match = re.match(r"between (\d+) and (\d+)", date_str)
            if between_match:
                return int(between_match.group(1)), int(between_match.group(2))

        elif "before" in date_str:
            before_match = re.match(r"before (\d+)", date_str)
            if before_match:
                year = int(before_match.group(2))
                return year - 5, year

        elif "-" in date_str:
            year_range_match = re.match(r"(\d+)[-/](\d+)", date_str)
            if year_range_match:
                return int(year_range_match.group(1)), int(year_range_match.group(2))

        else:
            beginning_input = input("beginning of this timespan")
            end_input = input("end of this timespan")
            return int(beginning_input), int(end_input)

In [8]:
def string_to_id(string):
    # Convert to lowercase
    id_str = string.lower()

    # Remove special characters, replace spaces with dashes
    id_str = re.sub(r'[^a-z0-9\-]+', '-', id_str)

    # Remove leading and trailing dashes
    id_str = id_str.strip('-')

    return id_str

In [47]:
for doc in tqdm(data):
    
    # Document basic metadata
    doc_id = doc["Page ID"]
    source = doc["Source"]  
    page_url = doc["Page URL"]
    doc_description = doc["Document description"]
    g.add((URIRef(f"{kb}{doc_id}"),dct.description,Literal(doc_description.replace("\n", ""), datatype=XSD.string),factual_data,))

    # Claims metadata
    for author, author_data in doc["Entities"].items():
        opinion_info = author_data["opinion_info"]
        if opinion_info.get("is_expressing_opinion") == True:
            # Opinions
            claim_id = f"{doc_id}-{string_to_id(author)}"
            claim_uri = URIRef(f"{kb}{claim_id}")
            g.add((claim_uri,prov.wasQuotedFrom,Literal(page_url, datatype=XSD.anyURI),factual_data))
            opinion = opinion_info.get("opinion_evaluation", {})
            g.add(
                (
                    URIRef(f"{kb}{doc_id}"),
                    RDF.type,
                    URIRef(f"{sebi}{opinion.capitalize()}"),
                    claim_uri,
                )
            )
            g.add((claim_uri, RDF.type, hico.InterpretationAct, factual_data))

            opinion_document_author = opinion_info.get("opinion_document_author", {})
            opinion_document_date = opinion_info.get("opinion_document_date", {})
            opinion_document_location = opinion_info.get(
                "opinion_document_location", {}
            )

            # Opinion dates
            if opinion_document_date != "null":
                for single_date in opinion_document_date:
                    print(single_date)
                    date = parse_date(single_date)
                    if date is not None:
                        if date[0] == date[1]:
                            date_uri = URIRef(f"{kb}{date[0]}")
                            g.add(
                                (
                                    date_uri,
                                    RDFS.label,
                                    Literal(date[0], datatype=XSD.string),
                                    factual_data,
                                )
                            )
                            g.add(
                                (URIRef(f"{kb}{doc_id}"), dct.date, date_uri, claim_uri)
                            )
                        else:
                            date_uri = URIRef(f"{kb}{date[0]}-{date[1]}")
                            g.add(
                                (
                                    date_uri,
                                    RDFS.label,
                                    Literal(
                                        f"{date[0]}-{date[1]}", datatype=XSD.string
                                    ),
                                    factual_data,
                                )
                            )
                            g.add(
                                (URIRef(f"{kb}{doc_id}"), dct.date, date_uri, claim_uri)
                            )

                        g.add(
                            (
                                date_uri,
                                time.beginning,
                                Literal(date[0], datatype=XSD.gYear),
                                factual_data,
                            )
                        )
                        g.add(
                            (
                                date_uri,
                                time.end,
                                Literal(date[1], datatype=XSD.gYear),
                                factual_data,
                            )
                        )

            # Opinion authors
            for doc_author in opinion_document_author:
                g.add(
                    (
                        URIRef(f"{kb}{doc_id}"),
                        dct.creator,
                        URIRef(f"{kb}{string_to_id(doc_author)}"),
                        claim_uri,
                    )
                )
                g.add(
                    (
                        URIRef(f"{kb}{string_to_id(doc_author)}"),
                        RDFS.label,
                        Literal(doc_author, datatype=XSD.string),
                        factual_data,
                    )
                )

            # Opinion location
            for location in opinion_document_location:
                wd_location = wikidata_alignment_via_label(location)
                if wd_location != None:
                    g.add(( URIRef(f"{kb}{string_to_id(location)}"), OWL.sameAs, URIRef(f'{wd}{wd_location}'), factual_data))
                g.add((URIRef(f"{kb}{doc_id}"), dct.coverage, URIRef(f"{kb}{string_to_id(location)}"), claim_uri))
                g.add(( URIRef(f"{kb}{string_to_id(location)}"), RDFS.label, Literal(location, datatype=XSD.string), factual_data))

            # Evidences supporting the opinions
            evidence_set = opinion_info.get("opinion_evidence_provided", [])
            if evidence_set != None:
                num = 1
                for evidence in evidence_set:
                    evidence_uri = URIRef(f"{kb}{claim_id}-{num}")
                    g.add((evidence_uri, sebi.support, claim_uri, factual_data))
                    for evaluation, evaluation_score in evidence.items():
                        if evaluation != "feature":
                            g.add((evidence_uri, sebi.evaluate, URIRef(f"{sebi}{evaluation}"), factual_data))
                            g.add((evidence_uri,sebi.hasEvaluationScore,URIRef(f"{sebi}{evaluation_score}"),factual_data))
                        else:
                            feature_URI = URIRef(f"{sebi}{evidence['feature'].replace(' ', '_')}")
                            g.add((evidence_uri, sebi.assess, feature_URI, factual_data))
                        num += 1

            # Criterion
            criterion_set = opinion_info.get("opinion_specific_perspective", [])
            if criterion_set != None:
                for criterion in criterion_set:
                    g.add(
                        (
                            claim_uri,
                            hico.hasInterpretationCriterion,
                            URIRef(f"{sebi}{string_to_id(criterion)}"),
                            factual_data,
                        )
                    )

            # Claim author
            author_id = string_to_id(author)
            author_uri = URIRef(f"{kb}{author_id}")
            g.add(
                (
                    author_uri,
                    RDFS.label,
                    Literal(author, datatype=XSD.string),
                    factual_data,
                )
            )
            g.add((claim_uri, prov.wasAttributedTo, author_uri, factual_data))
            
            # Bibliography
            
            

  0%|          | 0/61 [00:00<?, ?it/s]

972
837
14th century
March 4, 840
14th century
14th century
13th century
14th century
8th century
after 1704
beginning of this timespan1704
end of this timespan1730
1985
1729
late 19th century
shortly before 1944
after 1935
beginning of this timespan1935
end of this timespan1950
after September 1902
1797
since about 1950
20th century
1782
around the year 1400
20th century
1955
1913
20th-century
1913
20th century
1913
1913
1879
between 1833 and 1842
1834-1835
late 14th century
18th century
18th century
early 1820s
late 4th century
330
reign of Julian the Apostate
beginning of this timespan361
end of this timespan363
closely in the region of AD 395
beginning of this timespan375
end of this timespan415
after the defeat of Magnentius
beginning of this timespan353
end of this timespan375
361-380s
Edo period
beginning of this timespan1603
end of this timespan1868
early eighth century CE
1170
1156
during the lifetime of Diarmaid MacMurchada
beginning of this timespan1110
end of this timespan1

In [48]:
i=1
with open('references.tsv', mode='r', newline='', encoding='utf-8') as file:
    tsv_reader = csv.DictReader(file, delimiter='\t')

    for reference in tqdm(tsv_reader):
        for doc in data:
            doc_id = doc["Page ID"]
            if doc["Page ID"] == reference['Page ID']:
                for author, author_data in doc["Entities"].items():
                    claim_id = f"{doc_id}-{string_to_id(author)}"
                    claim_uri = URIRef(f"{kb}{claim_id}")
                    opinion_info = author_data["opinion_info"]
                    if opinion_info.get("is_expressing_opinion") == True:
                        info_claims = author_data.get("claims", [])
                        for info_claim in info_claims:
                            refs = info_claim.get('bibliography', [])
                            if refs != []:
                                for ref in refs:
                                    if ref in reference['Reference number']:
                                        ref_uri = URIRef(f'{kb}ref-{i}')
                                        g.add((claim_uri, prov.wasDerivedFrom, ref_uri, factual_data))
                                        g.add((ref_uri, dct.description, Literal(reference['Reference'], datatype=XSD.string), factual_data))  
        i+=1

0it [00:00, ?it/s]

In [49]:
output_graph = g.serialize(destination='output_graph.trig', format='trig')