In [1]:
import pandas as pd
import re
import json
from rdflib import Graph, URIRef, RDFS, RDF, OWL, Literal, BNode

def get_local_name(uri):
    if '#' in uri:
        return uri.split('#')[-1]
    else:
        return uri.rsplit('/', 1)[-1]

def is_valid_label(label):
    if len(label) >= 32 and re.match(r'^[A-Za-z0-9]+$', label):
        return False
    return True

def literal_to_string(literal):
    if isinstance(literal, Literal):
        return str(literal.value)
    else:
        return str(literal)

def process_anonymous_class(bnode, graph):
    # This function processes an anonymous class and returns a description of it
    anon_class_info = {}
    for p, o in graph.predicate_objects(bnode):
        if p == OWL.onProperty:
            anon_class_info['onProperty'] = str(o)
        elif p == OWL.someValuesFrom:
            anon_class_info['someValuesFrom'] = str(o)
        elif p == OWL.allValuesFrom:
            anon_class_info['allValuesFrom'] = str(o)
        elif p == OWL.onClass:
            anon_class_info['onClass'] = str(o)
        elif p == OWL.qualifiedCardinality:
            anon_class_info['qualifiedCardinality'] = str(o)
        elif p == OWL.intersectionOf:
            # Handle intersectionOf
            anon_class_info['intersectionOf'] = []
            for item in graph.items(o):
                if isinstance(item, URIRef):
                    anon_class_info['intersectionOf'].append(str(item))
                elif isinstance(item, BNode):
                    anon_class_info['intersectionOf'].append(process_anonymous_class(item, graph))
        elif p == RDF.type:
            # skip type
            pass
        else:
            # other properties
            anon_class_info[str(p)] = str(o)
    return anon_class_info

def process_class(s, graph):
    class_info = {'URI': str(s)}
    
    # Get label
    labels = list(graph.objects(s, RDFS.label))
    if labels:
        label_str = literal_to_string(labels[0])
        if not is_valid_label(label_str):
            label_str = get_local_name(str(s))
    else:
        label_str = get_local_name(str(s))
    class_info['Label'] = label_str

    # Get comment
    comments = list(graph.objects(s, RDFS.comment))
    if comments:
        comment_str = ' '.join([literal_to_string(c) for c in comments])
        class_info['Comment'] = comment_str
    else:
        class_info['Comment'] = ''

    # Get superclasses
    superclasses = []
    for o in graph.objects(s, RDFS.subClassOf):
        if isinstance(o, URIRef):
            superclasses.append(str(o))
        elif isinstance(o, BNode):
            superclasses.append(process_anonymous_class(o, graph))
        else:
            superclasses.append("[Unknown superclass type]")
    class_info['Superclasses'] = superclasses

    # Get equivalent classes
    equivalent_classes = []
    for o in graph.objects(s, OWL.equivalentClass):
        if isinstance(o, URIRef):
            equivalent_classes.append(str(o))
        elif isinstance(o, BNode):
            equivalent_classes.append(process_anonymous_class(o, graph))
        else:
            equivalent_classes.append("[Unknown equivalent class type]")
    if equivalent_classes:
        class_info['EquivalentClasses'] = equivalent_classes

    # Optionally, process other attributes here

    return class_info

rdf_file_path = 'BEO (Building Energy Ontology).rdf'
g = Graph()
g.parse(rdf_file_path, format="xml")

excluded_classes = [
    "http://www.w3.org/2002/07/owl#Thing",
    "https://www.auto.tuwien.ac.at/downloads/thinkhome/ontology/EnergyResourceOntology.owl#EnergyConsumerFacility",
    "http://energy.linkeddata.es/em-kpi/ontology#EnergyConsumer",
    "http://energy.linkeddata.es/em-kpi/ontology#GeneratingUnit",
    "http://energy.linkeddata.es/em-kpi/ontology#PowerDeliveryUnit",
    "http://energy.linkeddata.es/em-kpi/ontology#PowerSystemResource",
    "https://www.auto.tuwien.ac.at/downloads/thinkhome/ontology/EnergyResourceOntology.owl#EnergyProducerFacility"
]

included_classes = [
    "https://saref.etsi.org/core/Measurement",
]

class_list = []

for s in g.subjects(RDF.type, OWL.Class):
    s_str = str(s)
    if s_str in excluded_classes:
        continue

    class_info = process_class(s, g)
    class_list.append(class_info)

df_classes = pd.DataFrame(class_list)

# Sort by label
df_classes_sorted = df_classes.sort_values(by=["Label"])

pattern = r'^N[0-9a-f]{32}$'

# Filter out the rows where 'Label' matches the pattern
df_classes_sorted = df_classes_sorted[
    ~df_classes_sorted['Label'].str.match(pattern)
].reset_index(drop=True)

# For CSV, flatten the complex fields
def flatten_field(field):
    if isinstance(field, list):
        return json.dumps(field, ensure_ascii=False)
    else:
        return field

df_classes_sorted['Superclasses'] = df_classes_sorted['Superclasses'].apply(flatten_field)
df_classes_sorted['EquivalentClasses'] = df_classes_sorted.get('EquivalentClasses', '').apply(flatten_field)

df_classes_sorted.to_csv('results/data.csv', index=False, encoding='utf-8')

# For JSON, save as is
df_classes_sorted.to_json('results/classes.json', orient='records', force_ascii=False)



OSError: Cannot save file into a non-existent directory: 'results'