In [1]:
# import libraries
import functools
import re
import time
from typing import List, Dict, Any, Tuple

In [2]:
# Prefixes that steer the conversion:
# - subjects starting with one of `statement_prefix` are skipped as top-level subjects
# - predicates starting with one of `triple_statement_prefix` have their objects
#   followed into those objects' own sections
statement_prefix = ("v:", "s:")
triple_statement_prefix = ("p:", "psv:")


In [3]:
# Sample Turtle document used as the input fixture for the cells below:
# two wd: subjects, one s: statement node and two v: value nodes.
test = """wd:Q43 wdt:P31 wd:Q5 .

wd:Q42 wdt:P31 wd:Q5 ;
  wdt:P569 "1952-03-11T00:00:00Z"^^xsd:dateTime ;
  p:P569 s:q42-D8404CDA-25E4-4334-AF13-A3290BCD9C0F ;
  a wikibase:Statement, wikibase:BestRank .

s:q42-D8404CDA-25E4-4334-AF13-A3290BCD9C0F a wikibase:Statement, wikibase:BestRank,  wikibase-2:BestRank-2k  ;
  wikibase:rank wikibase:NormalRank ;
  ps:P569 "1952-03-11T00:00:00Z"^^xsd:dateTime ;
  psv:P569 v:426df9023763f08b066f4478480f44cd, v:426df9023763f08b066f4478480f44cde;
  prov:wasDerivedFrom ref:355b56329b78db22be549dec34f2570ca61ca056,
                      ref:a02f3a77ddd343e6b88be25696b055f5131c3d64 .
v:426df9023763f08b066f4478480f44cd a wikibase:TimeValue ;
  wikibase:timeValue "1952-03-11T00:00:00Z"^^xsd:dateTime ;
  wikibase:timePrecision "11"^^xsd:integer, "12"^^xsd:integer ;
  wikibase:timeTimezone "0"^^xsd:integer, "2"^^xsd:integer ;
  wikibase:timeCalendarModel <http://www.wikidata.org/entity/Q1985727> .
  
v:426df9023763f08b066f4478480f44cde a wikibase:TimeValue ;
  wikibase:timeValue "1952-03-11T00:00:00Z"^^xsd:dateTime ;
  wikibase:timePrecision "11"^^xsd:integer, "12"^^xsd:integer ;
  wikibase:timeTimezone "0"^^xsd:integer, "2"^^xsd:integer ;
  wikibase:timeCalendarModel <http://www.wikidata.org/entity/Q1985727> .
  """

In [4]:
# Report the size of the sample string: its UTF-8 byte length divided by 1024 twice
print("Size of the string in MB: ", len(test.encode('utf-8')) / 1024 / 1024)

Size of the string in MB:  0.0011949539184570312


In [5]:
def preprocess_ttl(ttl_text: str) -> str:
    """
    Collapse Turtle text onto a single line and drop the final statement terminator.

    Steps, in order:
    1. every run of whitespace (line breaks included) becomes a single space;
    2. a space is inserted after any ';' or ',' that is not already followed by whitespace;
    3. whitespace in front of a '.' is normalised to exactly one space;
    4. surrounding whitespace is stripped and one trailing '.' is removed.

    The substitutions are purely textual: they are applied everywhere in the
    input, including inside quoted literals and <...> IRIs.

    Args:
    ttl_text (str): The input Turtle format text.

    Returns:
    str: Preprocessed text as a single line ("" for empty or whitespace-only input).
    """
    # Collapse all whitespace runs (including newlines) to one space
    preprocessed = re.sub(r'\s+', ' ', ttl_text)
    # Ensure a space after semicolons and commas so later space-splitting separates tokens
    preprocessed = re.sub(r'([;,])(?!\s)', r'\1 ', preprocessed)
    # Normalise any whitespace run before a period to a single space
    preprocessed = re.sub(r'\s+\.', ' .', preprocessed)

    # Strip first: input normally ends with a newline, which would otherwise hide
    # the final '.' from the check below. endswith() is also safe on "".
    preprocessed = preprocessed.strip()
    if preprocessed.endswith("."):
        preprocessed = preprocessed[:-1].rstrip()

    return preprocessed

In [None]:
# Preprocess the sample document; the split_by_sections cell further down reads `preprocessed_ttl`
preprocessed_ttl = preprocess_ttl(test)

In [6]:
def split_by_sections(preprocessed_text: str) -> Dict[str, List[List[str]]]:
    """
    Split preprocessed Turtle text into per-subject lists of tokenised statements.

    The text is cut into sections at each '.' that is followed by whitespace and
    more text. Each section is cut into statements at ';', the first token of the
    section is taken as its subject, and every statement is split on single
    spaces into ``[predicate, object, ...]``. Object tokens keep any trailing ','.

    The splitting is purely textual, so '. ', ';' or spaces inside quoted
    literals are split as well.

    Args:
    preprocessed_text (str): The preprocessed Turtle format text.

    Returns:
    dict: Maps each subject to the list of its statements, each statement being
    a list of tokens. Sections sharing a subject are merged in order of
    appearance. Empty input yields an empty dict.
    """
    # A '.' followed by whitespace and then a non-space character ends a section
    sections = re.split(r'\.\s+(?=[^\s])', preprocessed_text)
    result: Dict[str, List[List[str]]] = {}

    # Drop the terminating '.' of the final section; endswith() is safe on ""
    last_section = sections[-1].rstrip()
    if last_section.endswith("."):
        last_section = last_section[:-1]
    sections[-1] = last_section

    for section in sections:
        stripped_section = section.strip()
        if not stripped_section:
            # Nothing to record for an empty section (e.g. empty input)
            continue

        # Split the section into statements on ';' and trim each one
        statements = [statement.strip() for statement in stripped_section.split(";")]

        # The subject is the first token of the first statement
        subject = statements[0].split(" ")[0]
        # Remove only the leading subject; count=1 keeps later occurrences of the
        # same text (e.g. a predicate equal to the subject) intact
        statements[0] = statements[0].replace(subject + " ", "", 1)

        statement_parts = [statement.split(" ") for statement in statements]

        # Merge rather than overwrite so a subject that appears in several
        # sections keeps all of its statements
        result.setdefault(subject, []).extend(statement_parts)
    return result

In [7]:
# Split the preprocessed text into a {subject: [[predicate, object, ...], ...]} mapping and display it.
# NOTE(review): the saved output of this cell is a NameError because the cell that
# defines `preprocessed_ttl` shows In [None] (never executed) — run the notebook top to bottom.
splitted_by_sections = split_by_sections(preprocessed_ttl)
splitted_by_sections

NameError: name 'preprocessed_ttl' is not defined

In [None]:
import time

def timer_decorator(func):
    """
    Decorator that prints how long each call to ``func`` takes.

    The wrapped function is called with the arguments it was given and its
    return value is passed through unchanged. After a successful call the
    elapsed time is printed as ``Execution time: <seconds> seconds``.

    Args:
    func (callable): The function to time.

    Returns:
    callable: A wrapper with the same name and docstring as ``func``.
    """
    @functools.wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the difference cannot go negative
        # if the system clock is adjusted during the call
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"Execution time: {end_time - start_time} seconds")
        return result
    return wrapper

In [None]:
def recursive_conversion(sections, predicate_chain, index_chain, object, subject):
    """
    Emit output lines for the section of ``object``, following nested references.

    Does nothing when ``object`` has no section of its own. Otherwise, for every
    statement in that section the statement's predicate is added to the end of
    the predicate chain, and each object token (trailing ',' removed) is handled
    by its 1-based position:

    - predicate starts with one of ``triple_statement_prefix``: recurse into
      that object, with the position added to the index chain;
    - otherwise: append ``"{subject} <p1|p2|...>[i1,i2,...] {obj}"`` to the
      module-level ``answer`` list.

    Args:
    sections (dict): Subject -> list of token lists, as built by split_by_sections.
    predicate_chain (list): Predicates followed so far.
    index_chain (list): Object positions (as strings) followed so far.
    object (str): Subject key whose section should be expanded.
    subject (str): Top-level subject written at the start of every output line.

    Returns:
    None. Results are appended to the module-level ``answer`` list.
    """
    if object in sections:
        for entry in sections[object]:
            head = entry[0]
            # Fresh list per statement so the caller's chain is never mutated
            chain = predicate_chain + [head]
            follow = head.startswith(triple_statement_prefix)
            if not follow:
                label = "|".join(chain)

            for position, raw in enumerate(entry[1:], start=1):
                # Objects in a comma-separated list still carry their ','
                target = raw[:-1] if raw.endswith(',') else raw
                path = index_chain + [str(position)]
                if follow:
                    recursive_conversion(sections, chain, path, target, subject)
                else:
                    joined_path = ",".join(path)
                    answer.append(f"{subject} <{label}>[{joined_path}] {target}")


In [None]:
@timer_decorator
def convert_to_new_format(sections: Dict[str, List[List[str]]]) -> str:
    """
    Render the sectioned triples as one line per (subject, predicate path, object).

    Subjects starting with one of ``statement_prefix`` are not emitted as
    top-level subjects. For every other subject, each object token (trailing ','
    removed) of each statement is handled by its 1-based position:

    - predicate starts with one of ``triple_statement_prefix``: the object is
      expanded through recursive_conversion;
    - otherwise: the line ``"{subject} <{predicate}>[{position}] {object}"`` is
      produced.

    Lines are collected in the module-level ``answer`` list, which is reset at
    the start of every call.

    Args:
    sections (dict): Subject -> list of token lists, as built by split_by_sections.

    Returns:
    str: All produced lines joined with newlines.
    """
    global answer
    answer = []

    for subj, rows in sections.items():
        if not subj.startswith(statement_prefix):
            for row in rows:
                head = row[0]
                follow = head.startswith(triple_statement_prefix)

                for position, raw in enumerate(row[1:], start=1):
                    # Objects in a comma-separated list still carry their ','
                    target = raw[:-1] if raw.endswith(',') else raw
                    if follow:
                        recursive_conversion(sections, [head], [str(position)], target, subj)
                    else:
                        answer.append(f"{subj} <{head}>[{position}] {target}")

    return "\n".join(answer)

In [None]:
# Convert once, keep the result for later use, then print it
# (avoids running the timed conversion twice for the same input)
new_format = convert_to_new_format(sections=splitted_by_sections)
print(new_format)


In [None]:
def full_function(ttl_input):
    """
    Run the complete Turtle-to-new-format pipeline on one input string.

    The text goes through preprocess_ttl, then split_by_sections, and the
    resulting sections are rendered by convert_to_new_format.

    Args:
    ttl_input (str): Input text in Turtle format.

    Returns:
    str: Converted text in the new format.
    """
    return convert_to_new_format(split_by_sections(preprocess_ttl(ttl_input)))

In [None]:
# End-to-end run: convert the sample Turtle document and print the result
print(full_function(test))