In [79]:
import xml.etree.ElementTree as ET
import xmltodict
import requests
import uuid
import sys
import re
import os

from rdflib import Graph
from uuid import UUID

from urllib.parse import urlparse
import boto3
import botocore.session
from botocore.config import Config
from botocore.session import get_session
from graph_notebook.neptune.client import ClientBuilder
from graph_notebook.configuration.generate_config import AuthModeEnum

# Predicate written between a class instance's UUID and its PDS class id
# (used by print_parent).
TYPE = "rdf:type"
# Prefix prepended to every attribute name written by print_children.
PDS4_TYPE = "pds4:"

In [80]:
# Get class id
def getClassID(class_label, parent_label, endpoint_url=None, access_key=None, secret_key=None):
    """Look up a class identifier through the Neptune SPARQL endpoint.

    A first query asks for subjects whose ``rdfs:label`` contains
    ``class_label`` and whose ``rdfs:domain`` contains ``parent_label``.
    When that returns nothing, a second query asks for any ``rdfs:Class``
    whose label contains ``class_label``.

    Args:
        class_label: Text the subject's ``rdfs:label`` must contain.
        parent_label: Text the subject's ``rdfs:domain`` must contain.
        endpoint_url: Optional endpoint URL passed to the botocore client.
        access_key: Optional AWS access key id passed to the botocore client.
        secret_key: Optional AWS secret access key passed to the botocore client.

    Returns:
        The last ``/``-separated segment of the selected subject value.

    Raises:
        IndexError: If neither query returns a binding.
    """
    # Use access key and secret key to access Neptune DB
    try:
        session = get_session()
        builder = session.create_client(
            's3', region_name="us-west-2", endpoint_url=endpoint_url,
            aws_access_key_id=access_key, aws_secret_access_key=secret_key)
        # NOTE(review): botocore clients do not appear to expose build(), so
        # this branch presumably always ends up in the handler below - confirm.
        client = builder.build()
    except Exception:
        # Catch Exception (not a bare except) so KeyboardInterrupt and
        # SystemExit still propagate while the IAM-based fallback is kept.
        builder = ClientBuilder() \
            .with_host("pds-en-registry-graph.cluster-cibj64l3k87g.us-west-2.neptune.amazonaws.com") \
            .with_port("8182") \
            .with_region("us-west-2") \
            .with_tls("true") \
            .with_sparql_path("sparql")

        builder = builder.with_iam(get_session())
        client = builder.build()

    query = f"SELECT DISTINCT ?s where {{?s rdfs:label ?label. FILTER contains(?label, \"{class_label}\"). ?s rdfs:domain ?domain. FILTER contains(?domain, \"{parent_label}\").}}"
    bindings = client.sparql(query).json()["results"]["bindings"]

    if bindings:
        # Previously a successful primary query fell off the end of the
        # function and returned None; return its first match instead.
        subject = bindings[0]["s"]["value"]
    else:
        # The label/domain query returned nothing: fall back to a label-only
        # lookup restricted to rdfs:Class.
        query = f"SELECT DISTINCT ?s where {{?s rdfs:label ?label. FILTER contains(?label, \"{class_label}\"). ?s rdf:type rdfs:Class.}}"
        bindings = client.sparql(query).json()["results"]["bindings"]
        if not bindings:
            raise IndexError(
                "no class found for label %r (parent %r)" % (class_label, parent_label))
        # NOTE(review): the original always read the second binding (index 1);
        # that choice is kept when it exists, otherwise the only binding is
        # used instead of failing. Confirm why index 1 is preferred.
        subject = bindings[1 if len(bindings) > 1 else 0]["s"]["value"]

    # Parse for Class id
    return subject.split('/')[-1]

In [81]:
def createUniqueID():
    """Return a freshly generated random (version 4) UUID object."""
    new_id = uuid.uuid4()
    return new_id

In [82]:
# Get Class and Parent ID
def get_im_object_id(child):
    """Build "Kid:Parent" identifiers for the title-cased sub-elements of ``child``.

    Args:
        child: An XML element whose direct sub-elements are inspected.

    Returns:
        A list with one ``"<kid name>:<child name>"`` string for every direct
        sub-element whose extracted name satisfies ``str.istitle()``.
    """
    child_list = []
    # Iterate the element directly: Element.getchildren() was removed in
    # Python 3.9, while iteration yields the same direct sub-elements.
    for kid in child:
        kid_name = getText(str(kid))
        if kid_name.istitle():
            child_list.append(kid_name + ':' + getText(str(child)))
    return child_list

In [83]:
# Get Product Bundle to work
def getText(tag):
    """Return the run of word characters that follows the first '}' in ``tag``.

    For a namespaced tag such as ``{namespace}author_list`` this yields
    ``author_list``. An AttributeError is raised when ``tag`` contains no
    '}' followed by a word character.
    """
    return re.search(r'}(\w+)', tag).group(1)

In [84]:
def print_parent(class_name, class_uuid, ids, rdf_file):
    """Write the ``<uuid> rdf:type <class id>`` line for one class instance.

    Args:
        class_name: Namespaced tag of the class; its local name is extracted
            with getText.
        class_uuid: Identifier written as the subject of the line.
        ids: Candidate ``"<kid>:<parent>"`` strings (may be empty or None).
        rdf_file: Writable text file object that receives the line.

    Raises:
        LookupError: If no class id could be resolved for ``class_name``.
    """
    class_label = getText(class_name)
    matches = getString(class_label, ids)

    # Take the parent id from the first matching "<kid>:<parent>" entry.
    # No ids, no match, or an entry without exactly one ':' means "no parent".
    parent_id = None
    if matches:
        parts = matches[0].split(':')
        if len(parts) == 2:
            parent_id = parts[1]

    class_pds_id = None
    if parent_id is not None:
        try:
            class_pds_id = getClassID(class_label, parent_id)
        except Exception:
            # Best-effort: a failed parent-scoped lookup is retried below
            # without a parent, as the original bare except did. Unlike a
            # bare except, KeyboardInterrupt/SystemExit are not swallowed.
            class_pds_id = None

    if class_pds_id is None:
        class_pds_id = getClassID(class_label, "None")

    if class_pds_id is None:
        # Fail with a clear message instead of a TypeError from str + None.
        raise LookupError("could not resolve a class id for %r" % (class_label,))

    rdf_file.write(str(class_uuid) + ' ' + TYPE + ' ' + class_pds_id + "\n")

In [85]:
def escapeReservedChars(data):
    """Return ``data`` with a backslash inserted before every reserved character.

    Characters outside the reserved set are copied through unchanged.
    """
    reserved = ('~', '.', '-', '!', '$', '&', '\'', '(', ')', '*', '+',
                ',', ';', '=', '/', '?', '#', '@', '%', '_')
    return ''.join(
        '\\' + char if char in reserved else char
        for char in data
    )

In [86]:
# Return string that contains the given substring
def getString(substring, ids):
    """Return the entries of ``ids`` in which ``substring`` is found.

    ``substring`` is applied as a regular-expression pattern via re.search.
    When ``ids`` is empty (or otherwise falsy) the function returns None.
    """
    if not ids:
        return None
    return list(filter(lambda candidate: re.search(substring, candidate), ids))

In [87]:
def print_children(class_name, class_uuid, children, rdf_file, ids=None):
    """Write one class instance and all of its attribute values to ``rdf_file``.

    The class line is written by print_parent; each value in ``children`` is
    then written on its own tab-indented line as ``pds4:<name> "<value>"``.

    Args:
        class_name: Namespaced tag of the class being written.
        class_uuid: Identifier of this class instance.
        children: Mapping of namespaced tag -> list of values.
        rdf_file: Writable text file object that receives the output.
        ids: Optional ``"<kid>:<parent>"`` strings passed on to print_parent.
    """
    # Use None as the default instead of a shared mutable list.
    if ids is None:
        ids = []

    print_parent(class_name, class_uuid, ids, rdf_file)
    for key, value in children.items():
        for data in value:
            # Format output
            text = getText(key)
            text = text.replace('\t', '').replace('\n', '')

            # An element without text content is stored as None; write it as
            # an empty string instead of crashing in escapeReservedChars.
            if data is None:
                data = ""

            # Escape all data
            data = escapeReservedChars(data)

            # TODO: Only add strings around name/label and comment/description
            rdf_file.write("\t" + PDS4_TYPE + text + ' ' + "\"" + data + "\"" + "\n")

In [88]:
# Initialize dictionary
def initializeDict(children_dict, childTag):
    """Ensure ``children_dict`` has a list under ``childTag``.

    An existing entry is left untouched; a missing one is created as an
    empty list. The dictionary is modified in place and nothing is returned.
    """
    children_dict.setdefault(childTag, [])

In [89]:
# Get Classes and Attributes (Children)
def get_children(parent, rdf_file):
    """Recursively collect the classes and attributes found under ``parent``.

    Elements that have sub-elements are treated as classes: each gets a new
    UUID, is written to ``rdf_file`` via print_children, and is recorded in
    the result by that UUID. Elements without sub-elements are treated as
    attributes and recorded by their text.

    Args:
        parent: Iterable of XML elements (an element or a list of elements).
        rdf_file: Writable text file object passed through to print_children.

    Returns:
        A dict mapping each element tag to the list of values collected for
        it (UUID strings for classes, element text for attributes).
    """
    children_dict = {}
    for child in parent:
        # list(child) replaces Element.getchildren(), which was removed in
        # Python 3.9; both give the element's direct sub-elements.
        grandchildren = list(child)

        # This is a class
        if grandchildren:
            uniqueId = createUniqueID() # Get UUID for class
            ids = get_im_object_id(child) # Get parent class id

            # Remove duplicates from list, keeping first-seen order so the
            # result is the same on every run (set order is not).
            ids = list(dict.fromkeys(ids))

            my_children = get_children(grandchildren, rdf_file)

            # Check for child.tag not in children_dict
            initializeDict(children_dict, child.tag)

            # Append class
            children_dict[child.tag].append(str(uniqueId))
            print_children(child.tag, uniqueId, my_children, rdf_file, ids)
        else:
            # Check for child.tag not in children_dict
            initializeDict(children_dict, child.tag)

            # Append attributes
            children_dict[child.tag].append(child.text)

    return children_dict

In [90]:
xml_url = input("Enter XML URL: ")

# Base name for the output file: last path segment without its extension.
xml_url_name = xml_url.split('/')[-1]
xml_url_name = xml_url_name.rsplit('.', 1)[0]

# Fail early on HTTP errors or a stalled connection instead of trying to
# parse an error page as XML.
response = requests.get(xml_url, timeout=30)
response.raise_for_status()

# Parse the raw bytes so the XML parser applies the document's own encoding
# declaration rather than the encoding requests guessed for .text.
tree = ET.fromstring(response.content)

Enter XML URL: https://pds-imaging.jpl.nasa.gov/data/nsyt/insight_cameras/bundle_9.0.xml


In [91]:
# Output in rdf form
# The context manager closes the file even if writing fails part-way.
# NOTE(review): mode "a" appends, so re-running this cell adds a second copy
# of the output to the same .ttl file - confirm that this is intended.
with open((xml_url_name + ".ttl"), "a") as rdf_file:
    my_children = get_children(tree, rdf_file)
    print_children(tree.tag, createUniqueID(), my_children, rdf_file)

In [35]:
import unittest

In [64]:
class TestNotebook(unittest.TestCase):
    """Unit tests for the helper functions defined in this notebook."""

    def test_initialize_dict(self):
        """A missing key is created with an empty list."""
        temp_dict = {}
        temp_child_tag = "Child"
        initializeDict(temp_dict, temp_child_tag)
        self.assertEqual(temp_dict.get('Child'), [])

    def test_get_string(self):
        """Only the ids containing the substring are returned."""
        temp_ids = ["Child:Parent", "Team:Person"]
        temp_substring = "Child"
        result = getString(temp_substring, temp_ids)
        self.assertEqual(result, ["Child:Parent"])

    def test_escape_reserved_chars(self):
        """Reserved characters are prefixed with a backslash."""
        temp_string = "~team,me$"
        # Raw string: "\~", "\," and "\$" are not valid escape sequences, so
        # a plain literal triggers an invalid-escape warning.
        correct_string = r"\~team\,me\$"
        result = escapeReservedChars(temp_string)
        self.assertEqual(result, correct_string)

    def test_get_text(self):
        """The local name is extracted from an Element repr."""
        temp_string = "<Element '{http://pds.nasa.gov/pds4/pds/v1}author_list' at 0x7f20b7ecc2c8>"
        correct_string = "author_list"
        result = getText(temp_string)
        self.assertEqual(result, correct_string)

    def test_get_Class_ID(self):
        """The class id is resolved for a known label/parent pair.

        This test calls getClassID without stubbing it, so it needs access to
        the SPARQL endpoint that getClassID queries.
        """
        temp_class_label = "Science_Facets"
        temp_parent_label = "Primary_Result_Summary"
        correct_string = "urn:nasa:pds:0001_nasa_pds_1:pds:product_components:pds:science_facets::1.0"
        result = getClassID(temp_class_label, temp_parent_label)
        self.assertEqual(result, correct_string)

In [65]:
# Run the tests inside the notebook: argv=[''] keeps unittest from reading
# the kernel's own command-line arguments, and exit=False stops it from
# calling sys.exit() when the run finishes.
unittest.main(
    argv=[''],
    exit=False,
    verbosity=2,
)

test_escape_reserved_chars (__main__.TestNotebook) ... ok
ok
test_get_string (__main__.TestNotebook) ... ok
test_get_text (__main__.TestNotebook) ... ok
test_initialize_dict (__main__.TestNotebook) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.297s

OK


<unittest.main.TestProgram at 0x7f20b68eb828>