In [1]:
from rdflib import Graph, plugin
from rdflib.serializer import Serializer
import json

In [2]:
g = Graph().parse('ebu3b_brick.ttl', format='ttl')

In [3]:
print(len(g))

14571


In [4]:
import rdflib
import json
from rdflib import Dataset, Graph, URIRef, Literal, Namespace, BNode

In [7]:
def wot(device):
    ## input(device): dict of devices
    href_set =     {
        "href": "brick_setup",
        "rel": "belongsTo",
        "mediaType": "application/td"
    }
    
    for dev in device:
        wot = {}
        wot["@context"] = "https://www.w3.org/2022/wot/td/v1.1"
        wot["id"] = "urn:dev:wot:com:" + dev
        wot["title"] = dev
        wot["type"] = device[dev][0]
        ## add security content for wot format
        wot["securityDefinitions"] = {}
        wot["securityDefinitions"]["basic_sc"] = {}
        wot["securityDefinitions"]["nosec_sc"] = {}
        basic_sc = {}
        nosec_sc = {}
        basic_sc["scheme"] = "basic"
        basic_sc["in"] = "header"
        nosec_sc["scheme"] = "nosec"
        wot["securityDefinitions"]["basic_sc"] = basic_sc
        wot["securityDefinitions"]["nosec_sc"] = nosec_sc
        wot["security"] = "nosec_sc"
        ## add properties
        if "ZN_T" in dev or "ZNT_SP" in dev:
            wot["properties"] = {}
            prop1 = {}
            prop1["default"] = 30
            prop1["minimum"] = 10
            prop1["maximum"] = 120
            prop1["type"] = "integer"
            prop1["title"] = "temperature"
            wot["properties"]["temperature"] = prop1
        elif "MTR" in dev:
            wot["properties"] = {}
            prop2 = {}
            prop2["default"] = 1
            prop2["minimum"] = 0.00001
            prop2["maximum"] = 200
            prop2["type"] = "integer"
            prop2["title"] = "power"
            wot["properties"]["power"] = prop2
        elif "VAV" in dev:
            wot["properties"] = {}
            prop3 = {}
            prop3["default"] = 0
            prop3["minimum"] = 0
            prop3["maximum"] = 100
            prop3["type"] = "integer"
            prop3["title"] = "fan_speed"
            wot["properties"]["fan_speed"] = prop3
            prop4 = {}
            prop4["default"] = 0
            prop4["minimum"] = 0
            prop4["maximum"] = 100
            prop4["type"] = "integer"
            prop4["title"] = "energy_consumed"
            wot["properties"]["energy_consumed"] = prop4
        elif "SF" in dev:
            wot["properties"] = {}
            prop3 = {}
            prop3["default"] = 0
            prop3["minimum"] = 0
            prop3["maximum"] = 100
            prop3["type"] = "integer"
            prop3["title"] = "fan_speed"
            wot["properties"]["fan_speed"] = prop3
        elif "jason_loc" in dev:
            wot["properties"] = {}
            prop5 = {}
            prop5["default"] = ""
            prop5["type"] = "string"
            prop5["title"] = "person_location"
            wot["properties"]["person_location"] = prop5
        elif "AHU" in dev:
            wot["properties"] = {}
            prop3 = {}
            prop3["default"] = 0
            prop3["minimum"] = 0
            prop3["maximum"] = 100
            prop3["type"] = "integer"
            prop3["title"] = "fan_speed"
            wot["properties"]["fan_speed"] = prop3
            prop4 = {}
            prop4["default"] = 0
            prop4["minimum"] = 0
            prop4["maximum"] = 100
            prop4["type"] = "integer"
            prop4["title"] = "energy_consumed"
            wot["properties"]["energy_consumed"] = prop4
        
        ## actions for VAV
        if "VAV" in dev:
            wot["actions"] = {}
            act1 = {}
            act1["description"] = "switch on VAV"
            act1["safe"] = "false"
            act1["idempotent"] = "false"
            wot["actions"]["switch_on"] = act1
            act2= {}
            act2["description"] = "switch off VAV"
            act2["safe"] = "false"
            act2["idempotent"] = "false"
            wot["actions"]["switch_off"] = act2
            act3= {}
            act3["description"] = "turn up fan speed"
            act3["safe"] = "false"
            act3["idempotent"] = "false"
            wot["actions"]["turn_up"] = act3
            act4= {}
            act4["description"] = "turn down fan speed"
            act4["safe"] = "false"
            act4["idempotent"] = "false"
            wot["actions"]["turn_down"] = act4
            
        ## actions for SF
        elif "SF" in dev:
            wot["actions"] = {}
            act1 = {}
            act1["description"] = "switch on SF"
            act1["safe"] = "false"
            act1["idempotent"] = "false"
            wot["actions"]["switch_on"] = act1
            act2= {}
            act2["description"] = "switch off SF"
            act2["safe"] = "false"
            act2["idempotent"] = "false"
            wot["actions"]["switch_off"] = act2
            act3= {}
            act3["description"] = "turn up fan speed"
            act3["safe"] = "false"
            act3["idempotent"] = "false"
            wot["actions"]["turn_up"] = act3
            act4= {}
            act4["description"] = "turn down fan speed"
            act4["safe"] = "false"
            act4["idempotent"] = "false"
            wot["actions"]["turn_down"] = act4
            
        ## actions for AHU
        elif "AHU" in dev:
            wot["actions"] = {}
            act1 = {}
            act1["description"] = "switch on AHU"
            act1["safe"] = "false"
            act1["idempotent"] = "false"
            wot["actions"]["switch_on"] = act1
            act2= {}
            act2["description"] = "switch off AHU"
            act2["safe"] = "false"
            act2["idempotent"] = "false"
            wot["actions"]["switch_off"] = act2
            act3= {}
            act3["description"] = "turn up fan speed"
            act3["safe"] = "false"
            act3["idempotent"] = "false"
            wot["actions"]["turn_up"] = act3
            act4= {}
            act4["description"] = "turn down fan speed"
            act4["safe"] = "false"
            act4["idempotent"] = "false"
            wot["actions"]["turn_down"] = act4
        wot["links"] = []
        wot["links"].append(href_set)
        if len(device[dev]) > 1:
            for idx in range(1, len(device[dev])):
                href_sub = {}
                href_sub["href"] = "urn:dev:wot:com:" + device[dev][idx][1]
                href_sub["rel"] = device[dev][idx][0]
                href_sub["mediaType"] = "application/td"
                wot["links"].append(href_sub)
        dev = dev.replace("ebu3b:", "")
        with open(dev + '.json', 'w') as json_file:
            json.dump(wot, json_file, indent = 2)


In [8]:
dev_list = {}
temp = []
cnt = 0
## convert URIRef type to list type
for s, p, o in g:
    for x in [s, p, o]:
        #print(s, p, o)
        if isinstance(x, URIRef):
            dev = g.qname(x) 
        else:
            dev = " "
        temp.append(dev)
        cnt += 1
        if cnt % 3 == 0:
            inx = cnt // 3
            dev_list[inx] = temp
            temp = []
        #print(cnt)
        #print(g.qname(x) if isinstance(x, URIRef) else x, end=" ")
dev_total = {}
dev_re = []
for dev in dev_list:
    if dev_list[dev][0] not in dev_re:
        dev_re.append(dev_list[dev][0])
        dev_total[dev_list[dev][0]] = []
    if dev_list[dev][1] == 'rdf:type':
        dev_total[dev_list[dev][0]].append(dev_list[dev][2])
for dev in dev_list:
    if dev_list[dev][1] != 'rdf:type':
        dev_total[dev_list[dev][0]].append((dev_list[dev][1], dev_list[dev][2]))

wot(dev_total)

