Merge pull request #40 from SINTEF-9012/learning-factory
Learning factory
tiptr committed Sep 20, 2022
2 parents 1adc1df + 1a16e71 commit 365bb3e
Showing 17 changed files with 283 additions and 178 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
@@ -9,5 +9,5 @@
# can either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once).
disable=C0103, C0114, CO115, C0116, E1101
disable=C0103, C0114, C0115, C0116, E1101
# E1101: there are a lot of false positives when enabled.
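For reference: C0103 (invalid-name), C0114 (missing-module-docstring), C0115 (missing-class-docstring) and C0116 (missing-function-docstring) are pylint's naming and docstring checks. The old entry CO115 (with the letter O) is not a valid message id, so the missing-class-docstring warning was never actually suppressed before this fix.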
12 changes: 6 additions & 6 deletions backend/api/python_endpoints/file_endpoints.py
@@ -27,10 +27,6 @@
)
from backend.knowledge_graph.dao.AssetNodesDao import AssetsDao


DB_SERVICE_CONTAINER: DatabasePersistenceServiceContainer = (
DatabasePersistenceServiceContainer.instance()
)
DB_CON_NODE_DAO: DatabaseConnectionsDao = DatabaseConnectionsDao.instance()
SUPPL_FILE_DAO: SupplementaryFileNodesDao = SupplementaryFileNodesDao.instance()

@@ -53,7 +49,9 @@ def get_supplementary_file_stream(iri: str):
return None

file_service: FilesPersistenceService = (
DB_SERVICE_CONTAINER.get_persistence_service(file_con_node.iri)
DatabasePersistenceServiceContainer.instance().get_persistence_service(
file_con_node.iri
)
)

return file_service.stream_file(
@@ -82,7 +80,9 @@ def get_supplementary_file_temporary_link(iri: str):
return None

file_service: FilesPersistenceService = (
DB_SERVICE_CONTAINER.get_persistence_service(file_con_node.iri)
DatabasePersistenceServiceContainer.instance().get_persistence_service(
file_con_node.iri
)
)

# Create the temporary redirect link:
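This change (mirrored in timeseries_endpoints.py below) removes the module-level DB_SERVICE_CONTAINER constant and resolves the container at call time instead, so the singleton no longer needs to exist when the module is imported. A minimal sketch of the pattern, assuming a lazily created singleton; the container internals below are illustrative, not the repository's implementation:

class DatabasePersistenceServiceContainer:
    """Sketch of a lazily created singleton container."""

    _instance = None

    @classmethod
    def instance(cls) -> "DatabasePersistenceServiceContainer":
        # Created on first access rather than at import time, so importing
        # an endpoint module no longer forces a database connection.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def get_persistence_service(self, iri: str):
        # Look up the persistence service registered for the
        # database-connection node identified by this IRI.
        ...

Calling DatabasePersistenceServiceContainer.instance().get_persistence_service(...) inside each endpoint trades one extra lookup per request for a cleaner import order.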
7 changes: 3 additions & 4 deletions backend/api/python_endpoints/timeseries_endpoints.py
@@ -20,9 +20,6 @@
)


DB_SERVICE_CONTAINER: DatabasePersistenceServiceContainer = (
DatabasePersistenceServiceContainer.instance()
)
DB_CON_NODE_DAO: DatabaseConnectionsDao = DatabaseConnectionsDao.instance()

TIMESERIES_NODES_DAO: TimeseriesNodesDao = TimeseriesNodesDao.instance()
@@ -70,7 +67,9 @@ def _get_related_timeseries_database_service(iri: str) -> TimeseriesPersistenceS
return pd.DataFrame(columns=["time", "value"])

ts_service: TimeseriesPersistenceService = (
DB_SERVICE_CONTAINER.get_persistence_service(ts_con_node.iri)
DatabasePersistenceServiceContainer.instance().get_persistence_service(
ts_con_node.iri
)
)
return ts_service
except IdNotFoundException as exc:
24 changes: 1 addition & 23 deletions backend/api/rest_endpoints/file_endpoints.py
@@ -1,39 +1,17 @@
import json
from datetime import datetime, timedelta
import pandas as pd
from fastapi.responses import StreamingResponse
from fastapi.responses import RedirectResponse
from fastapi.responses import PlainTextResponse

from backend.api.api import app
from backend.exceptions.IdNotFoundException import IdNotFoundException
from backend.knowledge_graph.KnowledgeGraphPersistenceService import (
KnowledgeGraphPersistenceService,
)

from backend.knowledge_graph.dao.DatabaseConnectionsDao import DatabaseConnectionsDao
from backend.knowledge_graph.dao.SupplementaryFileNodesDao import (
SupplementaryFileNodesDao,
)
from backend.specialized_databases.DatabasePersistenceServiceContainer import (
DatabasePersistenceServiceContainer,
)
from backend.specialized_databases.files.FilesPersistenceService import (
FilesPersistenceService,
)
from backend.specialized_databases.timeseries.TimeseriesPersistenceService import (
TimeseriesPersistenceService,
)
from backend.specialized_databases.timeseries.influx_db.InfluxDbPersistenceService import (
InfluxDbPersistenceService,
)
from backend.knowledge_graph.dao.AssetNodesDao import AssetsDao

import backend.api.python_endpoints.file_endpoints as python_file_endpoints


DB_SERVICE_CONTAINER: DatabasePersistenceServiceContainer = (
DatabasePersistenceServiceContainer.instance()
)
DB_CON_NODE_DAO: DatabaseConnectionsDao = DatabaseConnectionsDao.instance()
SUPPL_FILE_DAO: SupplementaryFileNodesDao = SupplementaryFileNodesDao.instance()

22 changes: 3 additions & 19 deletions backend/api/rest_endpoints/timeseries_endpoints.py
@@ -1,29 +1,13 @@
import json
from datetime import datetime, timedelta
import pandas as pd
from datetime import datetime

from backend.api.api import app
from backend.exceptions.IdNotFoundException import IdNotFoundException
from backend.knowledge_graph.KnowledgeGraphPersistenceService import (
KnowledgeGraphPersistenceService,
)

from backend.knowledge_graph.dao.DatabaseConnectionsDao import DatabaseConnectionsDao
from backend.knowledge_graph.dao.TimeseriesNodesDao import TimeseriesNodesDao
from backend.specialized_databases.DatabasePersistenceServiceContainer import (
DatabasePersistenceServiceContainer,
)
from backend.specialized_databases.timeseries.TimeseriesPersistenceService import (
TimeseriesPersistenceService,
)
from backend.specialized_databases.timeseries.influx_db.InfluxDbPersistenceService import (
InfluxDbPersistenceService,
)

import backend.api.python_endpoints.timeseries_endpoints as python_timeseries_endpoints


DB_SERVICE_CONTAINER: DatabasePersistenceServiceContainer = (
DatabasePersistenceServiceContainer.instance()
)
DB_CON_NODE_DAO: DatabaseConnectionsDao = DatabaseConnectionsDao.instance()
TIMESERIES_NODES_DAO: TimeseriesNodesDao = TimeseriesNodesDao.instance()

27 changes: 18 additions & 9 deletions backend/knowledge_graph/KnowledgeGraphPersistenceService.py
@@ -33,19 +33,28 @@ def _connect(self):
while not self.connected:
try:
print("Connecting to Neo4J...")
self._graph = py2neo.Graph(
get_environment_variable(key="NEO4J_DB_HOST", optional=False),
name=get_environment_variable(key="NEO4J_DB_NAME", optional=False),
)

self._repo = ogm.Repository(
get_environment_variable(key="NEO4J_DB_HOST", optional=False),
name=get_environment_variable(key="NEO4J_DB_NAME", optional=False),
host = get_environment_variable(key="NEO4J_DB_HOST", optional=False)
graph_name = get_environment_variable(
key="NEO4J_DB_NAME", optional=False
)
user_name = get_environment_variable(key="NEO4J_DB_USER", optional=True)
pw = get_environment_variable(key="NEO4J_DB_PW", optional=True)
if user_name is not None and pw is not None:
auth = (user_name, pw)
elif user_name is not None:
auth = (user_name, None)
else:
auth = None

self._graph = py2neo.Graph(host, name=graph_name, auth=auth)

self._repo = ogm.Repository(host, name=graph_name, auth=auth)
print("Successfully connected to Neo4J!")
self.connected = True
except py2neo.ConnectionUnavailable:
print("Neo4J graph unavailable! Trying again in 10 seconds...")
print(
"Neo4J graph unavailable or Authentication invalid! Trying again in 10 seconds..."
)
time.sleep(10)

def graph_run(self, cypher: any) -> Cursor:
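The connection routine now reads the optional NEO4J_DB_USER and NEO4J_DB_PW variables and hands them to py2neo as an auth tuple, falling back to an unauthenticated connection when they are absent. A standalone sketch of the same fallback, using os.environ directly instead of the repository's get_environment_variable helper:

import os
from typing import Optional, Tuple


def resolve_neo4j_auth() -> Optional[Tuple[str, Optional[str]]]:
    # Both variables set -> full credentials; only the user set ->
    # user-only tuple; neither set -> no authentication at all.
    user = os.environ.get("NEO4J_DB_USER")
    pw = os.environ.get("NEO4J_DB_PW")
    if user is not None and pw is not None:
        return (user, pw)
    if user is not None:
        return (user, None)
    return None


# Usage, e.g.: py2neo.Graph(host, name=graph_name, auth=resolve_neo4j_auth())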
17 changes: 13 additions & 4 deletions backend/runtime_connections/RuntimeConnection.py
@@ -1,4 +1,5 @@
import abc
from typing import Dict

from graph_domain.main_digital_twin.RuntimeConnectionNode import RuntimeConnectionNode
from backend.runtime_connections.TimeseriesInput import TimeseriesInput
@@ -52,7 +53,7 @@ def __init__(
else None
)

self.timeseries_inputs = []
self.timeseries_inputs: Dict[str, TimeseriesInput] = dict()

@classmethod
def from_runtime_connection_node(cls, node: RuntimeConnectionNode):
@@ -64,12 +65,20 @@ def from_runtime_connection_node(cls, node: RuntimeConnectionNode):
key_environment_variable=node.key_environment_variable,
)

def add_input(self, input: TimeseriesInput):
self.timeseries_inputs.append(input)

@abc.abstractmethod
def start_connection(self):
pass

def is_active(self) -> bool:
return self.active

def remove_ts_input(self, iri: str):
del self.timeseries_inputs[iri]

@abc.abstractmethod
def disconnect(self):
"""Disconnects and prepares for deletion"""

@abc.abstractmethod
def add_ts_input(self, ts_input: TimeseriesInput):
pass
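RuntimeConnection now keeps its inputs in a dict keyed by IRI, which is what lets remove_ts_input(iri) detach a single input, and it declares disconnect and add_ts_input as abstract methods. A hypothetical minimal subclass, only to illustrate the new abstract surface; DummyRuntimeConnection is not a class from the repository:

from backend.runtime_connections.RuntimeConnection import RuntimeConnection
from backend.runtime_connections.TimeseriesInput import TimeseriesInput


class DummyRuntimeConnection(RuntimeConnection):
    """Illustrative subclass with no real transport behind it."""

    def start_connection(self):
        # A real subclass would open its actual client session here.
        self.active = True

    def disconnect(self):
        # Disconnects and prepares for deletion, per the abstract contract.
        self.active = False

    def add_ts_input(self, ts_input: TimeseriesInput):
        # Inputs are keyed by IRI so remove_ts_input(iri) can drop them again.
        self.timeseries_inputs[ts_input.iri] = ts_input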
124 changes: 89 additions & 35 deletions backend/runtime_connections/RuntimeConnectionContainer.py
@@ -5,7 +5,6 @@
)
from graph_domain.main_digital_twin.TimeseriesNode import (
TimeseriesNodeDeep,
TimeseriesNodeFlat,
)
from backend.runtime_connections.RuntimeConnection import RuntimeConnection

@@ -57,50 +56,99 @@ def __init__(self):

self.connections: Dict[str, RuntimeConnection] = {}

def get_runtime_connection(self, iri: str):
return self.connections.get(iri)
def refresh_connection_inputs_and_handlers(
self, updated_ts_nodes_deep: List[TimeseriesNodeDeep]
):
"""Refreshes the inputs and handlers, creating new ones if available in the graph, or deleting old ones.
def register_runtime_connection(self, iri: str, connection: RuntimeConnection):
self.connections[iri] = connection
Args:
updated_ts_nodes_deep (List[TimeseriesNodeDeep]): refreshed deep timeseries nodes from the knowledge graph
"""
#
# Check if ts inputs or connections have been removed:
#
updated_connection_iri_list = set(
[ts_node.runtime_connection.iri for ts_node in updated_ts_nodes_deep]
)
updated_ts_input_iri_list = set(
[ts_node.iri for ts_node in updated_ts_nodes_deep]
)

removed_connections = [
con
for con in self.connections.values()
if con.iri not in updated_connection_iri_list
]
for con in removed_connections:
# Whole connection was removed!
# Unregister all inputs and the connection
con.disconnect()
self.connections.pop(con.iri)
del con

removed_inputs = []
for con in self.connections.values():
removed_inputs.extend(
[
(ts_input, con)
for ts_input in con.timeseries_inputs.values()
if ts_input.iri not in updated_ts_input_iri_list
]
)

def initialize_connections_inputs_and_handlers(
self, timeseries_nodes: List[TimeseriesNodeDeep]
):
# Prepare nodes to avoid redundant connections:
ts_input: TimeseriesInput
con: RuntimeConnection
for (ts_input, con) in removed_inputs:
con.remove_ts_input(ts_input.iri)

#
# Initialize new ts inputs and connections
#
old_ts_connection_iris = [con.iri for con in self.connections.values()]
old_ts_input_iris = set()
for con in self.connections.values():
for ts in con.timeseries_inputs.values():
old_ts_input_iris.add(ts.iri)

# Get connection nodes
# Prepare nodes to avoid redundant connections:
# Get current connection nodes
# Dict to remove duplicate connections (multiple inputs for one connection each having a different child...)
connection_nodes: Dict[str, RuntimeConnectionNode] = {}
ts_inputs_for_connection_iri: Dict[str, List[TimeseriesInput]] = {}
# db_con_iri_for_input: Dict[str, str] = {}
new_connection_nodes: Dict[str, RuntimeConnectionNode] = {}
new_ts_inputs_per_connection: Dict[str, List[TimeseriesInput]] = {}

ts_node: TimeseriesNodeDeep
for ts_node in timeseries_nodes:
# Add the connection node, if new
connection_nodes[
ts_node.runtime_connection.iri
] = ts_node.runtime_connection
for ts_node in updated_ts_nodes_deep:
if ts_node.iri in old_ts_input_iris:
# Node already exists
continue

new_connection = False
if ts_node.runtime_connection.iri not in old_ts_connection_iris:
# Add the connection node (dict so that it is not created multiple times)
new_connection_nodes[
ts_node.runtime_connection.iri
] = ts_node.runtime_connection
new_connection = True

# Create the timeseries input
input_class: TimeseriesInput = RT_INPUT_MAPPING.get(
ts_node.runtime_connection.type
)

ts_input: TimeseriesInput = input_class.from_timeseries_node(ts_node)

# Add the input to the list of its connection
ts_inputs_list = ts_inputs_for_connection_iri.get(
ts_inputs_dict = new_ts_inputs_per_connection.get(
ts_node.runtime_connection.iri
)

if ts_inputs_list is None:
ts_inputs_list = []
if ts_inputs_dict is None:
ts_inputs_dict = dict()

ts_inputs_list.append(ts_input)
ts_inputs_dict[ts_input.iri] = ts_input

ts_inputs_for_connection_iri[
new_ts_inputs_per_connection[
ts_node.runtime_connection.iri
] = ts_inputs_list
] = ts_inputs_dict

# Get the persistence handler method and activate it
ts_service: InfluxDbPersistenceService = (
Expand All @@ -111,8 +159,14 @@ def initialize_connections_inputs_and_handlers(

ts_input.register_handler(handler_method=ts_service.write_measurement)

if not new_connection:
self.connections.get(ts_node.runtime_connection.iri).timeseries_inputs[
ts_node.iri
] = ts_input

# Add new connections
rt_con_node: RuntimeConnectionNode
for rt_con_node in connection_nodes.values():
for rt_con_node in new_connection_nodes.values():

# Create actual connections:
input_class = RT_CONNECTION_MAPPING.get(rt_con_node.type)
Expand All @@ -124,18 +178,18 @@ def initialize_connections_inputs_and_handlers(
self.connections[rt_con_node.iri] = rt_connection

# Link the inputs to its connections:
rt_connection.timeseries_inputs = ts_inputs_for_connection_iri.get(
rt_connection.timeseries_inputs = new_ts_inputs_per_connection.get(
rt_con_node.iri
)
# Start the connection

def start_connections(self):
"""
Starts all connections
:return:
"""
connection: RuntimeConnection
for connection in self.connections.values():
connection.start_connection()
rt_connection.start_connection()

def get_runtime_connection(self, iri: str):
return self.connections.get(iri)

def register_runtime_connection(self, iri: str, connection: RuntimeConnection):
self.connections[iri] = connection

def get_all_inputs(self) -> List[TimeseriesInput]:
"""
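The old initialize_connections_inputs_and_handlers only ever built connections; the new refresh_connection_inputs_and_handlers also diffs the running state against the updated graph, disconnecting connections and detaching single inputs that have disappeared before wiring up anything new. The bookkeeping reduces to set arithmetic; a self-contained sketch with made-up IRIs:

# connection IRI -> IRIs of the timeseries inputs currently attached to it
current = {"con-a": {"ts-1", "ts-2"}, "con-b": {"ts-3"}}
# (connection IRI, input IRI) pairs reported by the refreshed graph
updated = {("con-a", "ts-1"), ("con-a", "ts-4")}

updated_connections = {con for con, _ in updated}
updated_inputs = {ts for _, ts in updated}

# Whole connection gone from the graph: disconnect it and drop all its inputs.
removed_connections = set(current) - updated_connections  # {'con-b'}

# Connection survives but one of its inputs vanished: detach just that input.
removed_inputs = {
    (con, ts)
    for con, inputs in current.items()
    if con in updated_connections
    for ts in inputs
    if ts not in updated_inputs
}  # {('con-a', 'ts-2')}

# Input not seen before: create it (and, if needed, its connection).
old_inputs = {ts for inputs in current.values() for ts in inputs}
new_inputs = {(con, ts) for con, ts in updated if ts not in old_inputs}  # {('con-a', 'ts-4')}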