In [3]:
import pymortar
import glob
URL = "http://beta-api.mortardata.org"
c = pymortar.Client(URL)

## Anands Example Code

In [8]:
# Example Queries

# query AHU, supply air temp sensor, and downstream zone, doesnt work
query1 = """SELECT ?ahu ?sat ?zone WHERE {
    ?ahu a brick:AHU .
    ?ahu brick:hasPart?/brick:hasPoint ?sat .
    ?sat a brick:Supply_Air_Temperature_Sensor .
    ?ahu brick:feeds+ ?zone .
    ?zone a brick:HVAC_Zone
}"""

# damper position commands in VAVs, doesnt work
query2 = """SELECT ?vav ?cmd WHERE {
    ?vav   a  brick:VAV .
    ?vav   brick:hasPart?/brick:hasPoint ?cmd .
    ?cmd a brick:Damper_Position_Command
}"""

# all air temperature sensors
query3 = """SELECT ?ats WHERE {
    ?ats  a  brick:Air_Temperature_Sensor .
}"""

#c.sparql(query3)

In [25]:
g = brickschema.Graph(load_brick=True)
query = """SELECT ?afs ?afsp ?vav WHERE  {
    ?afs    a       brick:Air_Flow_Sensor .
    ?afsp   a       brick:Air_Flow_Setpoint .
    ?afs    brick:isPointOf ?vav .
    ?afsp   brick:isPointOf ?vav .
    ?vav    a   brick:VAV
}"""
g.query(query)

<rdflib.plugins.sparql.processor.SPARQLResult at 0x127a92280>

In [None]:
brickschema.Graph(load_brick=True)

In [11]:
def extract_triples(query): 
    triples = []
    where_clause = query.split('{')[1].split('}')[0]
    for where_line in where_clause.split('.'):
        line = where_line.strip()
        triple = []
        count = 0
        for i in line.split(' '):
            i = i.strip()
            
            if i!= "" and i!= " " and i!= "\n":
                if count == 0:
                    triple.append(i)
                elif count == 1:
                    triple.append(i)
                elif count == 2:
                    triple.append(i)
                count+=1
        triples.append(triple)
    return triples

In [12]:
query = """SELECT ?afs ?afsp ?vav WHERE  {
    ?afs    a       brick:Air_Flow_Sensor .
    ?afsp   a       brick:Air_Flow_Setpoint .
    ?afs    brick:isPointOf ?vav .
    ?afsp   brick:isPointOf ?vav .
    ?vav    a   brick:VAV
}"""
extract_triples(query)

[['?afs', 'a', 'brick:Air_Flow_Sensor'],
 ['?afsp', 'a', 'brick:Air_Flow_Setpoint'],
 ['?afs', 'brick:isPointOf', '?vav'],
 ['?afsp', 'brick:isPointOf', '?vav'],
 ['?vav', 'a', 'brick:VAV']]

## 1.1.1.1 ApplyRule_UpperClass() 

			Input: A triple pattern t (?x, type, C), Brick.ttl
			Output: A relaxed triple t’ such that if C is a subclass of C1, 
			t’= (?x, type, C1) 

In [199]:

def ApplyRule_UpperClass(query, level, file_path):
    """
    Inputs:
        Query Input: "SELECT ?x WHERE{ ?x rdf:type brick:C}"
        Level = specifies how many upper classes to be extracted, level = 1, level = 2, etc.. or 'max'
        file_path = path to Brick ontology ttl file
    
    Output: A relaxed triple(s) such that if C is a subclass of C1, "{ ?x rdf:type brick:C1}"
    """
    # Parsing query to obtain brick:C
    parse_query = query.split(' ')[-1].split('}')[0]
    parse_front = query.split(' b')[0]
    #print(parse_query)
    
    # Reading
    f=open(file_path)
    lines=f.readlines()
    
    # Initializing query output list
    output = []
    
    # Loops until Cmax is Alarm, Command, Parameter, Sensor, Setpoint, Status
    Cmax_list = ["brick:Alarm", "brick:Command", "brick:Parameter",
                 "brick:Sensor", "brick:Setpoint", "Brick:Status"]
        
    with open(file_path) as file:
        for num, line in enumerate(file):
            if re.search(parse_query, line) and re.search('owl:Class', line):
                #print(line)
                sub_lines = lines[num:]
                        
                matching = [sub_lines[num_1+1] for num_1, s in enumerate(sub_lines) if 'subClassOf' in s]
                upper_1  = "_".join(re.findall("[a-zA-Z]+", matching[0]))
                upper_1 = upper_1.replace('_', ':', 1)
                
                upper_C = parse_front + ' '+upper_1+"}"
                output.append(upper_C)
                
                # Update Query
                parse_query = upper_1
                        
                if parse_query in Cmax_list:
                    break
    print("Input Query:")
    print(query)
    print('\n')
    print("UpperClass Output:")
    
    if level == 'max':
        for i in output:
            print(i)
    
    else:
        if level > len(output):
            print("Level input greater than highest possible UpperClass, set level = 'max' ")
        else:
            for i in output[:level]:
                print(i)
    
    return output

In [200]:
query = "{?sensor  rdf:type brick:Chilled_Water_Pump_Differential_Pressure_Deadband_Setpoint}"
level = 'max'
file_path = 'Brick.ttl'

test = ApplyRule_UpperClass(query, level, file_path)

Input Query:
{?sensor  rdf:type brick:Chilled_Water_Pump_Differential_Pressure_Deadband_Setpoint}


UpperClass Output:
{?sensor  rdf:type brick:Chilled_Water_Differential_Pressure_Deadband_Setpoint}
{?sensor  rdf:type brick:Chilled_Water_Differential_Pressure_Setpoint}
{?sensor  rdf:type brick:Differential_Pressure_Setpoint}
{?sensor  rdf:type brick:Pressure_Setpoint}


In [201]:
query = "{?sensor  rdf:type brick:CO2_Sensor}"
level = 'max'
file_path = 'Brick.ttl'

test = ApplyRule_UpperClass(query, level, file_path)

Input Query:
{?sensor  rdf:type brick:CO2_Sensor}


UpperClass Output:
{?sensor  rdf:type brick:Particulate_Matter_Sensor}
{?sensor  rdf:type brick:Sensor}
