# Acquirium - WaterTap - MQTT Integration

## Introduction

We built a single pump simulation using WaterTap. The pump model requires six input variables. Two of these variables, pump efficiency and pump pressure delta, are fixed. The remaining four variables are simulated as if they were produced by real sensors and are continuously published to MQTT.
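
As a rough illustration of the simulated sensor side, the sketch below generates one round of readings and serializes them as MQTT-style messages. The topic names, the JSON payload layout, the noise level, and the `publish` callback are all assumptions made for this example, not the actual simulator. The baseline values are taken from the sample output shown later in this document. In a real setup, `publish` could be the publish method of an MQTT client.

```python
import json
import random
from datetime import datetime, timezone
from typing import Callable

# Hypothetical topics and baseline values for the four simulated inputs.
BASELINES = {
    "pump1/in/temperature": 286.7,
    "pump1/in/pressure": 101325.0,
    "pump1/in/flow_mass_water": 1.0,
    "pump1/in/flow_mass_seawater": 0.1,
}


def simulate_readings(rng: random.Random, noise: float = 0.001) -> dict:
    """Return one reading per topic, perturbed by a small relative noise."""
    return {
        topic: base * (1.0 + rng.uniform(-noise, noise))
        for topic, base in BASELINES.items()
    }


def publish_readings(
    publish: Callable[[str, str], object],
    readings: dict,
    timestamp: datetime,
) -> None:
    """Serialize each reading as JSON and hand it to the publish callback."""
    for topic, value in readings.items():
        payload = json.dumps({"time": timestamp.isoformat(), "value": value})
        publish(topic, payload)


# Collect the messages in a list instead of sending them to a broker.
sent = []
publish_readings(
    lambda topic, payload: sent.append((topic, payload)),
    simulate_readings(random.Random(0)),
    datetime(2023, 1, 1, 11, 20, tzinfo=timezone.utc),
)
```

Passing the publish function in as a callback keeps the sketch independent of any particular MQTT client library.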

We implemented a script that subscribes to MQTT and uses the incoming values as inputs to the WaterTap model. Since all six required variables are provided, the degrees of freedom of the model are reduced to zero. As a result, the model operates as a simulation rather than an optimization problem.
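
The bookkeeping behind "zero degrees of freedom" can be sketched as follows. This is a simplification that only counts required inputs without a value; WaterTap itself determines degrees of freedom from the model's variables and constraints. The input names, the `InputCollector` class, and the two fixed values are hypothetical placeholders.

```python
from dataclasses import dataclass, field

# Hypothetical names for the six pump inputs; the fixed values are placeholders.
FIXED_INPUTS = {"efficiency": 0.8, "pressure_delta": 4.0e5}
STREAMED_INPUTS = ("temperature", "pressure", "flow_mass_water", "flow_mass_seawater")


@dataclass
class InputCollector:
    """Keeps the latest value per input and tracks how many are still missing."""

    values: dict = field(default_factory=lambda: dict(FIXED_INPUTS))

    def update(self, name: str, value: float) -> None:
        """Store the most recent value received for a streamed input."""
        if name not in STREAMED_INPUTS:
            raise KeyError(f"unexpected input: {name}")
        self.values[name] = value

    @property
    def degrees_of_freedom(self) -> int:
        """Number of required inputs that do not have a value yet."""
        required = len(FIXED_INPUTS) + len(STREAMED_INPUTS)
        return required - len(self.values)

    @property
    def ready(self) -> bool:
        """True once every input is specified, so the model can be simulated."""
        return self.degrees_of_freedom == 0


collector = InputCollector()
dof_before = collector.degrees_of_freedom  # only the two fixed inputs are set
for name, value in [
    ("temperature", 286.7),
    ("pressure", 101325.0),
    ("flow_mass_water", 1.0),
    ("flow_mass_seawater", 0.1),
]:
    collector.update(name, value)
```

Once `collector.ready` is true, every input is specified and a solve would have nothing left to optimize.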

After running the simulation, we extract the pump work computed by WaterTap and publish it back to MQTT. This output behaves like a soft sensor: its value is derived from a physics-based model rather than from a physical measurement.
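
To make the soft sensor idea concrete, here is a minimal sketch that derives pump work from measured and fixed inputs using the textbook relation: work = volumetric flow × pressure rise / efficiency. It is a simplified stand-in, not the WaterTap model, which relies on a property package for the fluid. The constant density and all input values below are assumptions chosen for illustration.

```python
def pump_work(
    flow_mass: float,
    density: float,
    pressure_delta: float,
    efficiency: float,
) -> float:
    """Mechanical pump work in W: volumetric flow * pressure rise / efficiency."""
    if not 0.0 < efficiency <= 1.0:
        raise ValueError("efficiency must be in (0, 1]")
    if density <= 0.0:
        raise ValueError("density must be positive")
    flow_vol = flow_mass / density  # m^3/s from kg/s and kg/m^3
    return flow_vol * pressure_delta / efficiency


# Placeholder inputs, for illustration only.
work = pump_work(flow_mass=1.1, density=1000.0, pressure_delta=4.0e5, efficiency=0.8)
```

The returned value plays the role of the derived reading that would be published back to MQTT.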

Below is an overview of the system architecture:
<div style="text-align: center;">

![overview](./figs/watertap_process.png)

</div>

We then modeled this system using a knowledge graph based on the WaTr ontology. The knowledge graph captures the structure of the pump, its associated variables, and the relationships between physical components, sensor streams, and computed quantities. This semantic representation allows us to reason over both the system topology and the available data streams, which enables flexible discovery, querying, and integration of sensor measurements and model-derived values.

<div style="text-align: center;">

![knowledge graph model](./figs/kg_model.png)

</div>

The properties store all the information required for data retrieval, along with additional metadata that describes the values produced by each sensor. This metadata specifies the physical meaning of the measurement, its unit, the medium or substance being measured, and whether the value originates from a physical sensor or from a model-derived computation.

<div style="text-align: center;">

![property metadata](./figs/property.png)

</div>
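
One way to picture this metadata is as a flat record per property. The sketch below is hypothetical: the `PropertyMetadata` class, its field names, and the `select` helper are not part of Acquirium or the ontology. The property names and the mass flow unit come from the example output later in this document, while the unit of `PumpWork` is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PropertyMetadata:
    """Hypothetical, flattened view of the metadata attached to one property."""

    name: str
    quantity_kind: str
    unit: str
    substance: Optional[str]
    model_derived: bool


PROPERTIES = [
    PropertyMetadata("Pump1-in-flow-mass-water", "mass flow rate",
                     "kilogram per second", "water", False),
    PropertyMetadata("Pump1-in-flow-mass-seawater", "mass flow rate",
                     "kilogram per second", "seawater", False),
    # Unit assumed for illustration; the source does not state it.
    PropertyMetadata("PumpWork", "power", "watt", None, True),
]


def select(properties, *, unit=None, model_derived=None):
    """Return the names of properties matching every filter that is given."""
    names = []
    for prop in properties:
        if unit is not None and prop.unit != unit:
            continue
        if model_derived is not None and prop.model_derived != model_derived:
            continue
        names.append(prop.name)
    return names


flow_names = select(PROPERTIES, unit="kilogram per second")
soft_sensor_names = select(PROPERTIES, model_derived=True)
```

Keeping the origin flag next to the unit lets a consumer separate soft sensors from physical ones with the same kind of filter.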

Given a knowledge graph model, Acquirium can connect to MQTT and subscribe to the corresponding data streams. From Acquirium’s perspective, it does not matter whether the values are simulated or produced by real sensors; it simply consumes the data published on MQTT. In addition to MQTT, Acquirium can also connect to other data sources such as files, databases, and external services, providing a unified interface for accessing heterogeneous data streams. 

Acquirium stores both the data and its associated metadata in its own database, enabling efficient access, querying, and reuse in future analyses and applications.

<div style="text-align: center;">

![Acquirium data pull](./figs/acquirium_pull.png)

</div>

## Acquirium Code Example

The following cells show how to retrieve data using Acquirium:

#### Initial Steps

In [1]:
# Package imports

from datetime import datetime, timedelta
from acquirium import Acquirium
from acquirium.Client.query import Query
from acquirium.Internals.internals_namespaces import (
    HAS_UNIT,
    HAS_MEDIUM,
    HAS_QUANTITY_KIND,
    OF_SUBSTANCE,
    S223,
    WATR,
    UNIT,
)


# Print a message framed by separator lines
def banner(msg: str) -> None:
    print("\n" + "=" * 80)
    print(msg)
    print("=" * 80 + "\n")


In [2]:
# Initial setup: create an Acquirium instance
banner("1) Create Acquirium session")
acq = Acquirium(
    server_url="localhost",
    server_port=8000,
    use_ssl=False,
    lexicon_path="../ontologies/lexicon.json",
)


1) Create Acquirium session



In [3]:
# Add a graph to Acquirium.
# This automatically connects the data sources to the database and ingests the time series data.
# Because our data sources are MQTT streams, Acquirium subscribes to them and starts pulling data.
banner("2) Add a graph to acquirium")
acq.insert_graph("../deployments/WATERTAP/models/watertap-simple-pipe-model-with-ext-refs.ttl")



2) Add a graph to acquirium



#### Querying

In [4]:
banner("3) Start a new query and add an entity node")

# Create a query object.
# The _class text is matched against the ontology classes.
q1: Query = acq.find_entity(_class="inlet connection pt", alias="pump-in")
q1.show_query_graph()  # show the current query graph
q1.metadata_head()  # show metadata for the nodes in the current query graph


3) Start a new query and add an entity node

QUERY GRAPH

Nodes:
  0 [pump-in]  class=http://data.ashrae.org/standard223#InletConnectionPoint

Edges:

Data nodes: (none)

Current pointer: pump-in



{'columns': ['v0'], 'rows': [['urn:ex/Pump1-in']]}

In [5]:
# Find a node related to the first node
q1: Query = q1.find_related(_class="pump", alias="pump", _from="pump-in", hops=1)
q1.show_query_graph()
q1.metadata_head()

QUERY GRAPH

Nodes:
  0 [pump-in]  class=http://data.ashrae.org/standard223#InletConnectionPoint
  1 [pump]  class=urn:nawi-water-ontology#Pump

Edges:
  pump-in --(*, hops=1)--> pump

Data nodes: (none)

Current pointer: pump



{'columns': ['v0', 'v1'], 'rows': [['urn:ex/Pump1-in', 'urn:ex/Pump1']]}
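
The `hops` argument suggests a hop-limited graph search. The sketch below shows one way such a search can work on a plain adjacency list, assuming `hops` is an upper bound on path length. It does not reflect how Acquirium implements `find_related`, and the `urn:ex/Pump1-out` node is hypothetical.

```python
from collections import deque

# Hypothetical adjacency list; only Pump1-in and Pump1 appear in the output above.
GRAPH = {
    "urn:ex/Pump1-in": ["urn:ex/Pump1"],
    "urn:ex/Pump1": ["urn:ex/Pump1-out"],
    "urn:ex/Pump1-out": [],
}


def within_hops(graph, start, hops):
    """Return nodes reachable from start in at most `hops` edges, excluding start."""
    seen = {start}
    found = []
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == hops:
            continue  # do not expand beyond the hop limit
        for neighbor in graph.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                found.append(neighbor)
                queue.append((neighbor, depth + 1))
    return found


one_hop = within_hops(GRAPH, "urn:ex/Pump1-in", 1)
two_hops = within_hops(GRAPH, "urn:ex/Pump1-in", 2)
```

A breadth-first search visits nodes in order of distance, so the hop limit can be enforced by simply not expanding nodes at the maximum depth.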

#### Data Retrieval

In [7]:
q2: Query = acq.find_all_data()
q2.show_query_graph()
q2.metadata_head()
a = q2.data_head()

QUERY GRAPH

Nodes:
  0 [0] [DATA]  class=*

Edges:

Data nodes:
  0 [0]  filters={}

Current pointer: 0



shape: (10, 6)
┌────────────────┬────────────────┬────────────────┬───────────────┬───────────────┬───────────────┐
│ time           ┆ PumpWork       ┆ Pump1-in-tempe ┆ Pump1-in-flow ┆ Pump1-in-flow ┆ Pump1-in-pres │
│ ---            ┆ ---            ┆ rature         ┆ -mass-seawate ┆ -mass-water   ┆ sure          │
│ datetime[μs,   ┆ str            ┆ ---            ┆ r             ┆ ---           ┆ ---           │
│ UTC]           ┆                ┆ str            ┆ ---           ┆ str           ┆ str           │
│                ┆                ┆                ┆ str           ┆               ┆               │
╞════════════════╪════════════════╪════════════════╪═══════════════╪═══════════════╪═══════════════╡
│ 2023-01-01     ┆ 642.6348924007 ┆ 286.7065465305 ┆ 0.1           ┆ 1.0           ┆ 101325.0      │
│ 11:20:00 UTC   ┆ 63             ┆ 472            ┆               ┆               ┆               │
│ 2023-01-01     ┆ 642.6563831923 ┆ 286.8355400505 ┆ 0.1           ┆ 1.0    

#### Apply Filters

In [8]:
q3: Query = q2.filter_by_unit(unit="kilogram per second")
q3.show_query_graph()
q3.metadata_head()
b = q3.data_head()

QUERY GRAPH

Nodes:
  0 [0] [DATA]  class=*

Edges:

Data nodes:
  0 [0]  filters={http://qudt.org/schema/qudt/hasUnit=http://qudt.org/vocab/unit/KiloGM-PER-SEC}}

Current pointer: 0



shape: (10, 3)
┌─────────────────────────┬──────────────────────────┬─────────────────────────────┐
│ time                    ┆ Pump1-in-flow-mass-water ┆ Pump1-in-flow-mass-seawater │
│ ---                     ┆ ---                      ┆ ---                         │
│ datetime[μs, UTC]       ┆ str                      ┆ str                         │
╞═════════════════════════╪══════════════════════════╪═════════════════════════════╡
│ 2023-01-01 11:20:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 11:30:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 11:40:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 11:50:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 12:00:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 12:10:00 UTC ┆ 1.0                      ┆ 0.1                         │
│ 2023-01-01 12:20:00 UTC ┆ 1.0                   

In [10]:
import time

from IPython.display import clear_output

# Print the latest values every 5 seconds; interrupt the kernel to stop.
while True:
    a = q2.latest_data()
    print(a)
    time.sleep(5)
    clear_output(wait=True)

shape: (1, 6)
┌────────────────┬────────────────┬────────────────┬───────────────┬───────────────┬───────────────┐
│ time           ┆ PumpWork       ┆ Pump1-in-tempe ┆ Pump1-in-flow ┆ Pump1-in-flow ┆ Pump1-in-pres │
│ ---            ┆ ---            ┆ rature         ┆ -mass-seawate ┆ -mass-water   ┆ sure          │
│ datetime[μs,   ┆ str            ┆ ---            ┆ r             ┆ ---           ┆ ---           │
│ UTC]           ┆                ┆ str            ┆ ---           ┆ str           ┆ str           │
│                ┆                ┆                ┆ str           ┆               ┆               │
╞════════════════╪════════════════╪════════════════╪═══════════════╪═══════════════╪═══════════════╡
│ 2023-01-02     ┆ 642.7841414763 ┆ 287.5928477822 ┆ 0.1           ┆ 1.0           ┆ 101325.0      │
│ 12:30:00 UTC   ┆ 573            ┆ 3645           ┆               ┆               ┆               │
└────────────────┴────────────────┴────────────────┴───────────────┴─────────

KeyboardInterrupt: 
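
The loop above runs until the kernel is interrupted. A bounded variant can be sketched as below. The `poll` helper and its parameters are hypothetical, and a simple counter stands in for `q2.latest_data()` so the example runs without an Acquirium server.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll(
    fetch: Callable[[], T],
    handle: Callable[[T], None],
    *,
    interval: float,
    max_polls: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call fetch() up to max_polls times, pausing `interval` seconds between calls."""
    count = 0
    for i in range(max_polls):
        handle(fetch())
        count += 1
        if i < max_polls - 1:  # no need to wait after the final poll
            sleep(interval)
    return count


# Stand-in for q2.latest_data(): a counter instead of a live query.
readings = iter(range(100))
collected = []
polls = poll(
    lambda: next(readings),
    collected.append,
    interval=5.0,
    max_polls=3,
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
```

In the notebook, `fetch` could be `q2.latest_data` and `handle` could be `print`, with the default `time.sleep` left in place.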