diff --git a/.gitignore b/.gitignore index d81b6fb..f30f90c 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ mmcv_models/faster_rcnn/faster_rcnn_x101_64x4d_fpn_1x_coco_20200204-833ee192.pth mmcv_models/faster_rcnn/faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth database_export/ database_import/ -safety_backups/ \ No newline at end of file +safety_backups/ +scatter-output.png \ No newline at end of file diff --git a/assets/custom_style.css b/assets/custom_style.css index ae329b6..4d51c1a 100644 --- a/assets/custom_style.css +++ b/assets/custom_style.css @@ -717,4 +717,14 @@ body { .node-information-attribute-header { font-weight: bold; +} + +.node-information-attribute-content { + max-height: 100px; + overflow: scroll; +} + + +.annotation-click-context-feature-container { + padding-bottom: 200px; } \ No newline at end of file diff --git a/assets/cytoscape-graph-style.json b/assets/cytoscape-graph-style.json index 538d82c..f1b23ee 100644 --- a/assets/cytoscape-graph-style.json +++ b/assets/cytoscape-graph-style.json @@ -5,7 +5,7 @@ "content": "data(label)", "text-halign": "center", "text-wrap": "wrap", - "text-max-width": 30, + "text-max-width": 80, "font-size": 14, "font-weight": "bold", "color": "#777", @@ -71,7 +71,7 @@ { "selector": ".TIMESERIES_CLUSTER", "style": { - "background-color": "#1A6152", + "background-color": "#6b449b", "background-image": "https://fonts.gstatic.com/s/i/short-term/release/materialsymbolsrounded/scatter_plot/wght500/48px.svg" } }, @@ -80,7 +80,7 @@ "style": { "curve-style": "bezier", "width": 6, - "line-color": "#1A6152", + "line-color": "#6b449b", "target-arrow-shape": "none", "source-distance-from-node": 10, "target-distance-from-node": 10, @@ -91,13 +91,13 @@ "text-max-width": 300, "font-size": 14, "font-weight": "bold", - "color": "#1A6152" + "color": "#6b449b" } }, { "selector": ".EXTRACTED_KEYWORD", "style": { - "background-color": "#1A6152", + "background-color": "#6b449b", "background-image": "https://fonts.gstatic.com/s/i/short-term/release/materialsymbolsrounded/label/wght500/48px.svg" } }, @@ -105,14 +105,14 @@ "selector": ".KEYWORD_EXTRACTION", "style": { "width": 3, - "line-color": "#1A6152" + "line-color": "#6b449b" } }, { "selector": ".PART_OF_TS_CLUSTER", "style": { "width": 3, - "line-color": "#1A6152" + "line-color": "#6b449b" } }, { diff --git a/backend/annotation_detection/AnnotationDetector.py b/backend/annotation_detection/AnnotationDetector.py new file mode 100644 index 0000000..ad0480c --- /dev/null +++ b/backend/annotation_detection/AnnotationDetector.py @@ -0,0 +1,322 @@ +from __future__ import annotations +from multiprocessing import Queue, Process +import abc +from queue import Empty +from re import A +from threading import Thread +import time +from typing import Dict, List +from backend.exceptions.EnvironmentalVariableNotFoundError import ( + EnvironmentalVariableNotFoundError, +) +from backend.knowledge_graph.dao.BaseNodesDao import BaseNodeDao +from backend.knowledge_graph.dao.TimeseriesNodesDao import TimeseriesNodesDao +from backend.runtime_connections.RuntimeConnectionContainer import ( + RuntimeConnectionContainer, +) +from backend.api.python_endpoints import timeseries_endpoints +import pandas as pd +from backend.specialized_databases.DatabasePersistenceServiceContainer import ( + DatabasePersistenceServiceContainer, +) +from backend.specialized_databases.SpecializedDatabasePersistenceService import ( + SpecializedDatabasePersistenceService, +) +from 
backend.specialized_databases.timeseries.TimeseriesPersistenceService import ( + TimeseriesPersistenceService, +) +from graph_domain.expert_annotations.AnnotationInstanceNode import ( + AnnotationInstanceNodeFlat, +) +from graph_domain.expert_annotations.AnnotationTimeseriesMatcherNode import ( + AnnotationTimeseriesMatcherNodeDeep, + AnnotationTimeseriesMatcherNodeFlat, +) +from graph_domain.main_digital_twin.AssetNode import AssetNodeDeep, AssetNodeFlat +from util.environment_and_configuration import ( + ConfigGroups, + get_configuration, +) +from dateutil import tz +from backend.knowledge_graph.dao.AnnotationNodesDao import AnnotationNodesDao +from datetime import datetime, timedelta +from graph_domain.main_digital_twin.RuntimeConnectionNode import RuntimeConnectionNode +from backend.runtime_connections.TimeseriesInput import TimeseriesInput +from util.environment_and_configuration import ( + get_environment_variable, + get_environment_variable_int, +) +from util.log import logger + + +DATETIME_STRF_FORMAT_CAPTION = "%d.%m.%Y, %H:%M:%S" +DATETIME_STRF_FORMAT_ID = "%Y_%m_%d_%H_%M_%S_%f" + + +class AnnotationDetector(abc.ABC): + """ + Annotation detector scanning the related time-series inputs of one asset for occurances of a annotation instance + """ + + def __init__( + self, + scanned_timeseries_iris: Dict[str, str], + scanned_asset: AssetNodeFlat, + scanned_annotation_instance: AnnotationInstanceNodeFlat, + persistence_services: Dict[str, SpecializedDatabasePersistenceService], + ) -> None: + + self.active = False + self.stop_signal = False + self.last_detection_timestamp = datetime.fromtimestamp(0).astimezone( + tz.gettz(get_configuration(group=ConfigGroups.FRONTEND, key="timezone")) + ) + + # Dict: annotated ts iri -> scanned ts iri + self.scanned_timeseries_iris: Dict[str, str] = scanned_timeseries_iris + self.scanned_asset: AssetNodeFlat = scanned_asset + self.scanned_annotation_instance: AnnotationInstanceNodeFlat = ( + scanned_annotation_instance + ) + self.input_handler_id = ( + f"detector_{self.scanned_asset.iri}_{self.scanned_annotation_instance.iri}" + ) + # Connections + self.annotations_dao: AnnotationNodesDao = AnnotationNodesDao.instance() + self.persistence_services = persistence_services + + # Get original timeseries excerpt: + self.original_ts_dataframes: Dict[str, pd.DataFrame] = dict() + for ts_iri in self.scanned_timeseries_iris.keys(): + service: TimeseriesPersistenceService = self.persistence_services.get( + ts_iri + ) + dataframe = service.read_period_to_dataframe( + iri=ts_iri, + begin_time=self.scanned_annotation_instance.occurance_start_date_time, + end_time=self.scanned_annotation_instance.occurance_end_date_time, + ) + self.original_ts_dataframes[ts_iri] = dataframe + + # Detection precision + self.scanned_timeseries_detection_precisions_relative: Dict[str, float] = dict() + annotations_dao = AnnotationNodesDao.instance() + matcher: AnnotationTimeseriesMatcherNodeFlat + for matcher in annotations_dao.get_matchers_for_annotation_instance( + self.scanned_annotation_instance.iri + ): + ts_iri = annotations_dao.get_matcher_original_annotated_ts(matcher.iri).iri + self.scanned_timeseries_detection_precisions_relative[ + ts_iri + ] = matcher.detection_precision + + # Min max + self.timeseries_min_values_for_original: Dict[str, float] = dict() + self.timeseries_min_values_for_scanned: Dict[str, float] = dict() + self.timeseries_max_values_for_original: Dict[str, float] = dict() + self.timeseries_max_values_for_scanned: Dict[str, float] = dict() + for ts_iri in 
self.scanned_timeseries_iris.keys(): + self.timeseries_max_values_for_original[ + ts_iri + ] = self.persistence_services.get(ts_iri).max_value_for_period(ts_iri) + self.timeseries_min_values_for_original[ + ts_iri + ] = self.persistence_services.get(ts_iri).min_value_for_period(ts_iri) + self.timeseries_min_values_for_scanned[ + self.scanned_timeseries_iris.get(ts_iri) + ] = self.timeseries_min_values_for_original.get(ts_iri) + self.timeseries_max_values_for_scanned[ + self.scanned_timeseries_iris.get(ts_iri) + ] = self.timeseries_max_values_for_original.get(ts_iri) + + self._prepare_original_dataset() + + self.runtime_con_container = RuntimeConnectionContainer.instance() + + @abc.abstractmethod + def _handle_new_reading(self, reading): + pass + + @abc.abstractmethod + def _prepare_original_dataset(self): + pass + + def _detector_loop(self): + active = True + while True: + if ( + self.detector_stop_queue.qsize() > 0 + and self.detector_stop_queue.get() == True + ): + logger.info( + f"Stopping process scanning {self.scanned_asset.caption} for occurances of {self.scanned_annotation_instance.caption}" + ) + return + try: + reading = self.detector_input_queue.get(block=True, timeout=3) + except Empty: + if active and self.detector_active_status_queue.qsize() == 0: + try: + self.detector_active_status_queue.get_nowait() + except Empty: + pass + self.detector_active_status_queue.put(False) + active = False + continue + + self._handle_new_reading(reading) + + if not active and self.detector_active_status_queue.qsize() == 0: + try: + self.detector_active_status_queue.get_nowait() + except Empty: + pass + self.detector_active_status_queue.put(True) + active = False + + @classmethod + def from_annotation_instance_and_asset(cls, instance_iri: str, asset_iri: str): + + basenode_dao = BaseNodeDao.instance() + timeseries_dao = TimeseriesNodesDao.instance() + annotations_dao = AnnotationNodesDao.instance() + + instance: AnnotationInstanceNodeFlat = basenode_dao.get_generic_node( + instance_iri + ) + + asset: AssetNodeFlat = basenode_dao.get_generic_node(asset_iri) + + asset_ts_iris = [ + ts.iri for ts in timeseries_dao.get_timeseries_of_asset(asset_iri) + ] + matched_ts_dict: Dict[str, str] = dict() + for matcher in annotations_dao.get_matchers_for_annotation_instance( + instance_iri + ): + matched_ts_dict[ + annotations_dao.get_matcher_original_annotated_ts(matcher.iri).iri + ] = [ + ts.iri + for ts in annotations_dao.get_matched_ts_for_matcher(matcher.iri) + if ts.iri in asset_ts_iris + ].pop() + + all_relevant_ts_iris = set(matched_ts_dict.keys()).union( + set(matched_ts_dict.values()) + ) + + persistence_services_per_ts: Dict[ + str, SpecializedDatabasePersistenceService + ] = dict() + + for ts_iri in all_relevant_ts_iris: + persistence_services_per_ts[ + ts_iri + ] = timeseries_endpoints.get_related_timeseries_database_service(ts_iri) + + return cls( + scanned_timeseries_iris=matched_ts_dict, + scanned_asset=asset, + scanned_annotation_instance=instance, + persistence_services=persistence_services_per_ts, + ) + + def start_detection(self): + logger.info( + f"Starting detection of {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}" + ) + # Scanner process and inter-process communication: + self.detector_input_queue: Queue = Queue() + self.detector_stop_queue: Queue = Queue() + self.detector_active_status_queue: Queue = Queue() + self.detector_process: Process = Process( + target=self._detector_loop, + ) + self.detector_process.start() + + # Register handlers in order to 
receive new time-series data + rt_con_container = RuntimeConnectionContainer.instance() + for ts_iri in self.scanned_timeseries_iris.values(): + ts_input = rt_con_container.get_timeseries_input_by_iri(ts_iri) + ts_input.register_handler( + self._reading_handler, handler_id=self.input_handler_id + ) + + self.active = True + + def _reading_handler(self, ts_iri, reading_value, reading_time): + self.detector_input_queue.put((ts_iri, reading_value, reading_time)) + + def stop_detection(self): + logger.info( + f"Stopping detection of {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}" + ) + # stop handlers + rt_con_container = RuntimeConnectionContainer.instance() + for ts_iri in self.scanned_timeseries_iris.values(): + ts_input = rt_con_container.get_timeseries_input_by_iri(ts_iri) + ts_input.remove_handler(handler_id=self.input_handler_id) + + # send stop signal + self.detector_stop_queue.put(True) + # wait for stop + self.detector_process.join() + self.active = False + # quit + self.detector_process.terminate() + # quit queues + self.detector_input_queue.close() + del self.detector_input_queue + self.detector_stop_queue.close() + del self.detector_stop_queue + self.detector_active_status_queue.close() + del self.detector_active_status_queue + logger.info( + f"Detection of {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption} stopped succesfully." + ) + + def is_active(self) -> bool: + + # check, if new active message: + if self.detector_active_status_queue.qsize() > 0: + self.active = self.detector_active_status_queue.get() + + return self.active + + def _create_new_detection(self, start_date_time: datetime, end_date_time: datetime): + + id_short = ( + f"detection_of_{self.scanned_annotation_instance.id_short}_on_" + f"{self.scanned_asset.caption}_at_{end_date_time.strftime(DATETIME_STRF_FORMAT_ID)}" + ) + caption = ( + f"Detection of {self.scanned_annotation_instance.caption} on " + f"{self.scanned_asset.caption} at {end_date_time.strftime(DATETIME_STRF_FORMAT_CAPTION)}" + ) + + detection_iri = self.annotations_dao.create_annotation_detection( + id_short=id_short, + start_datetime=start_date_time, + end_datetime=end_date_time, + caption=caption, + ) + + # Relationships to the scanned timeseries: + for matched_ts_iri in self.scanned_timeseries_iris.values(): + self.annotations_dao.create_annotation_detection_timeseries_relationship( + detection_iri=detection_iri, + timeseries_iri=matched_ts_iri, + ) + + # Relationship to the scanned asset: + self.annotations_dao.create_annotation_detection_asset_relationship( + detection_iri=detection_iri, + asset_iri=self.scanned_asset.iri, + ) + + # Relationship to the matched instance + self.annotations_dao.create_annotation_detection_instance_relationship( + detection_iri=detection_iri, + instance_iri=self.scanned_annotation_instance.iri, + ) diff --git a/backend/annotation_detection/AnnotationDetectorContainer.py b/backend/annotation_detection/AnnotationDetectorContainer.py new file mode 100644 index 0000000..f0f6437 --- /dev/null +++ b/backend/annotation_detection/AnnotationDetectorContainer.py @@ -0,0 +1,156 @@ +import time +from typing import Dict, List, Tuple +from backend.annotation_detection.AnnotationDetector import AnnotationDetector +from backend.annotation_detection.EuclidianDistanceAnnotationDetector import ( + EuclidianDistanceAnnotationDetector, +) +from backend.knowledge_graph.dao.AnnotationNodesDao import AnnotationNodesDao +from graph_domain.expert_annotations.AnnotationInstanceNode import ( + 
AnnotationInstanceNodeDeep, + AnnotationInstanceNodeFlat, +) +from graph_domain.main_digital_twin.RuntimeConnectionNode import ( + RuntimeConnectionNode, + RuntimeConnectionTypes, +) +from backend.runtime_connections.TimeseriesInput import TimeseriesInput +from backend.runtime_connections.mqtt.MqttRuntimeConnection import ( + MqttRuntimeConnection, +) +from backend.runtime_connections.mqtt.MqttTimeseriesInput import MqttTimeseriesInput +from backend.runtime_connections.opcua.OpcuaRuntimeConnection import ( + OpcuaRuntimeConnection, +) +from backend.runtime_connections.opcua.OpcuaTimeseriesInput import OpcuaTimeseriesInput +from backend.specialized_databases.DatabasePersistenceServiceContainer import ( + DatabasePersistenceServiceContainer, +) +from backend.specialized_databases.timeseries.influx_db.InfluxDbPersistenceService import ( + InfluxDbPersistenceService, +) +from graph_domain.main_digital_twin.TimeseriesNode import ( + TimeseriesNodeDeep, +) +from backend.runtime_connections.RuntimeConnection import RuntimeConnection +from backend.exceptions.EnvironmentalVariableNotFoundError import ( + EnvironmentalVariableNotFoundError, +) + +from util.log import logger +from threading import Thread +from util.inter_process_cache import memcache + +# Maps node-types to the connection / input classes +RT_CONNECTION_MAPPING = { + RuntimeConnectionTypes.MQTT.value: MqttRuntimeConnection, + RuntimeConnectionTypes.OPC_UA.value: OpcuaRuntimeConnection, +} +RT_INPUT_MAPPING = { + RuntimeConnectionTypes.MQTT.value: MqttTimeseriesInput, + RuntimeConnectionTypes.OPC_UA.value: OpcuaTimeseriesInput, +} + + +class AnnotationDetectorContainer: + """ + Holds and manages all current annotation detector services + """ + + __instance = None + + @classmethod + def instance(cls): + if cls.__instance is None: + cls() + return cls.__instance + + def __init__(self): + if self.__instance is not None: + raise Exception("Singleton instantiated multiple times!") + + AnnotationDetectorContainer.__instance = self + + # Dict: (instance_iri, asset_iri) -> AnnotationDetector + self.detectors: Dict[Tuple[str, str, float], AnnotationDetector] = {} + self._active_detectors_status_thread = None + + def start_active_detectors_status_thread(self): + self._active_detectors_status_thread = Thread( + target=self._active_detectors_write_to_cache_loop + ) + self._active_detectors_status_thread.start() + + def refresh_annotation_detectors(self): + """Refreshes the annotation detectors, creating new ones if available in the graph, or deleting old ones.""" + annotations_dao: AnnotationNodesDao = AnnotationNodesDao.instance() + updated_instance_nodes_flat: List[ + AnnotationInstanceNodeFlat + ] = annotations_dao.get_annotation_instances(only_active_scanned_instances=True) + + updated_detector_tuples: List[Tuple[str, str, float]] = [] + for instance in updated_instance_nodes_flat: + updated_detector_tuples.extend( + [ + # Precision sum used so that the detectors will be reinstantiated if one of the values was changed (sum as to only use one value in the tuple) + ( + instance.iri, + asset.iri, + annotations_dao.get_detection_precision_sum_for_instance( + instance.iri + ), + ) + for asset in annotations_dao.get_scanned_assets_for_annotation_instance( + instance.iri + ) + ] + ) + + # + # Check if detectors have been removed: + # + + removed_detector_tuples = [ + tuple + for tuple in self.detectors.keys() + if tuple not in updated_detector_tuples + ] + + for old_tuple in removed_detector_tuples: + old_detector = self.detectors.get(old_tuple) + 
old_detector.stop_detection() + self.detectors.pop(old_tuple) + del old_detector + + # + # Initialize new detectors + # + new_detector_tuples = [ + tuple + for tuple in updated_detector_tuples + if tuple not in self.detectors.keys() + ] + + for new_tuple in new_detector_tuples: + new_detector = ( + EuclidianDistanceAnnotationDetector.from_annotation_instance_and_asset( + instance_iri=new_tuple[0], asset_iri=new_tuple[1] + ) + ) + + self.detectors[new_tuple] = new_detector + + new_detector.start_detection() + + def get_active_detectors_count(self) -> int: + + return len( + [True for detector in self.detectors.values() if detector.is_active()] + ) + + def _active_detectors_write_to_cache_loop(self): + while True: + memcache.set( + "active_annotation_detectors_count", self.get_active_detectors_count() + ) + + time.sleep(3) diff --git a/backend/annotation_detection/EuclidianDistanceAnnotationDetector.py b/backend/annotation_detection/EuclidianDistanceAnnotationDetector.py new file mode 100644 index 0000000..e395340 --- /dev/null +++ b/backend/annotation_detection/EuclidianDistanceAnnotationDetector.py @@ -0,0 +1,196 @@ +from backend.annotation_detection.AnnotationDetector import AnnotationDetector +import pandas as pd +import numpy as np +from typing import Dict +from util.log import logger +from backend.specialized_databases.timeseries.TimeseriesPersistenceService import ( + TimeseriesPersistenceService, +) +from util.environment_and_configuration import ( + ConfigGroups, + get_configuration, +) +from datetime import datetime, timedelta +from dateutil import tz + +DATETIME_STRF_FORMAT_CAPTION = "%d.%m.%Y, %H:%M:%S" +DATETIME_STRF_FORMAT_ID = "%Y_%m_%d_%H_%M_%S_%f" + + +class EuclidianDistanceAnnotationDetector(AnnotationDetector): + """ + Annotation detector scanning the related time-series inputs of one asset for occurances of a annotation instance + + Based on the Euclidian Distance Similarity Measure + """ + + # override + def _handle_new_reading(self, reading): + reading_iri = reading[0] + value = reading[1] + time = reading[2] + + ts_array: np.array = self.current_ts_arrays.get(reading_iri) + if ts_array is None: + ts_array = np.array([]) + self.current_ts_arrays[reading_iri] = ts_array + + ts_array = np.append(ts_array, [value]) + + if len(ts_array) > self.original_ts_lens_mapped_to_scans.get(reading_iri): + ts_array = np.delete(ts_array, 0) + + self.current_ts_arrays[reading_iri] = ts_array + # print(len(ts_array)) + + if all( + [ + len(current_reading[1]) + == self.original_ts_lens_mapped_to_scans.get(current_reading[0]) + for current_reading in self.current_ts_arrays.items() + ] + ) and len(self.current_ts_arrays.keys()) == len(self.original_ts_arrays.keys()): + # current_combined_array = np.concatenate( + # [ + # self._normalize_array( + # self.current_ts_arrays.get( + # self.scanned_timeseries_iris.get(iri) + # ), + # min_value=self.timeseries_min_values_for_scanned.get(iri), + # max_value=self.timeseries_max_values_for_scanned.get(iri), + # ) + # for iri in self.original_ts_iris_ordered + # ], + # axis=0, + # ) + # Overall euclidian distance: + # euclidian_distance = np.linalg.norm( + # self.original_ts_combined_array_normalized - current_combined_array + # ) + # print( + # f"Matching {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}. 
Euclidian distance: {euclidian_distance}" + # ) + + # Individual euclidian distances: + euclidian_distances: Dict[str, float] = dict() + euclidian_distances_by_len: Dict[str, float] = dict() + for iri in self.original_ts_iris_ordered: + normalized_current = self._normalize_array( + self.current_ts_arrays.get(self.scanned_timeseries_iris.get(iri)), + min_value=self.timeseries_min_values_for_scanned.get(iri), + max_value=self.timeseries_max_values_for_scanned.get(iri), + ) + euclidian_distances[iri] = np.linalg.norm( + self.original_ts_arrays_normalized.get(iri) - normalized_current + ) + euclidian_distances_by_len[iri] = euclidian_distances.get( + iri + ) / self.original_ts_lens.get(iri) + # print( + # f"Euclidian distances for {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}: {', '.join([str(dist) for dist in euclidian_distances.values()])}" + # ) + print( + f"Euclidian distances divided by array-len for {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}: {', '.join([f'{dist[0][-20:-1]}: {str(dist[1])}' for dist in euclidian_distances_by_len.items()])}" + ) + + if all( + [ + dist_pair[1] + < ( + 1 + - self.scanned_timeseries_detection_precisions_relative.get( + dist_pair[0] + ) + ) + for dist_pair in euclidian_distances_by_len.items() + ] + ): + # ignore, if just recently already detected: + now = datetime.now().astimezone( + tz.gettz( + get_configuration(group=ConfigGroups.FRONTEND, key="timezone") + ) + ) + if now > self.last_detection_timestamp + timedelta(seconds=30): + self.last_detection_timestamp = datetime.now().astimezone( + tz.gettz( + get_configuration( + group=ConfigGroups.FRONTEND, key="timezone" + ) + ) + ) + logger.info( + f"Match for: {self.scanned_annotation_instance.caption} on {self.scanned_asset.caption}!" 
+ ) + self._create_new_detection( + start_date_time=now + - ( + self.scanned_annotation_instance.occurance_end_date_time + - self.scanned_annotation_instance.occurance_start_date_time + ), + end_date_time=now, + ) + + # override + def _prepare_original_dataset(self): + + self.original_ts_dataframes_only_values: Dict[str, pd.DataFrame] = dict() + + for ts_df in self.original_ts_dataframes.items(): + self.original_ts_dataframes_only_values[ts_df[0]] = ts_df[1].copy(deep=True) + self.original_ts_dataframes_only_values[ts_df[0]].drop( + columns=["time"], axis=1, inplace=True + ) + + self.original_ts_arrays: Dict[str, np.array] = dict() + self.original_ts_arrays_normalized: Dict[str, np.array] = dict() + self.original_ts_lens: Dict[str, int] = dict() + self.original_ts_lens_mapped_to_scans: Dict[str, int] = dict() + + for ts_df in self.original_ts_dataframes_only_values.items(): + self.original_ts_arrays[ts_df[0]] = ts_df[1]["value"].to_numpy( + copy=True, + ) + self.original_ts_lens[ts_df[0]] = len(self.original_ts_arrays.get(ts_df[0])) + self.original_ts_lens_mapped_to_scans[ + self.scanned_timeseries_iris.get(ts_df[0]) + ] = self.original_ts_lens.get(ts_df[0]) + self.original_ts_arrays_normalized[ts_df[0]] = self._normalize_array( + self.original_ts_arrays.get(ts_df[0]), + min_value=self.timeseries_min_values_for_original.get(ts_df[0]), + max_value=self.timeseries_max_values_for_original.get(ts_df[0]), + ) + + self.original_ts_iris_ordered = [iri for iri in self.original_ts_arrays.keys()] + self.original_ts_iris_ordered.sort() + self.original_ts_combined_array_normalized = np.concatenate( + [ + self.original_ts_arrays_normalized.get(iri) + for iri in self.original_ts_iris_ordered + ], + axis=0, + ) + + # Map the arrays and lengths directly to the iris of the scanned ts: + + self.current_ts_arrays: Dict[str, np.array] = dict() + + # Calculate the threshold to be used for detections: + for ts_iri in self.scanned_timeseries_iris.keys(): + service: TimeseriesPersistenceService = self.persistence_services.get( + ts_iri + ) + dataframe = service.read_period_to_dataframe( + iri=ts_iri, + begin_time=self.scanned_annotation_instance.occurance_start_date_time, + end_time=self.scanned_annotation_instance.occurance_end_date_time, + ) + self.original_ts_dataframes[ts_iri] = dataframe + + def _normalize_array(self, array: np.array, min_value, max_value) -> np.array: + # Subtract min value (get to starting at 0) + non_negative_array = np.subtract(array, min_value) + # Divide through (max-value - min-value) + normalized_array = np.divide(non_negative_array, (max_value - min_value)) + + return normalized_array diff --git a/backend/annotation_detection/__init__.py b/backend/annotation_detection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/api/python_endpoints/status_endpoints.py b/backend/api/python_endpoints/status_endpoints.py index cb71025..7750f24 100644 --- a/backend/api/python_endpoints/status_endpoints.py +++ b/backend/api/python_endpoints/status_endpoints.py @@ -8,6 +8,7 @@ from backend.runtime_connections.RuntimeConnectionContainer import ( RuntimeConnectionContainer, ) +from util.inter_process_cache import memcache BASE_NODE_DAO: BaseNodeDao = BaseNodeDao.instance() ANNOTATIONS_DAO: AnnotationNodesDao = AnnotationNodesDao.instance() @@ -30,7 +31,7 @@ def get_rt_connections_count(): def get_rt_active_connections_count(): - return RT_CON_CONTAINER.get_active_connections_count() + return int(memcache.get("active_runtime_connections_count")) def get_status(): diff --git 
a/backend/api/python_endpoints/timeseries_endpoints.py b/backend/api/python_endpoints/timeseries_endpoints.py index cbd1b9a..769a66f 100644 --- a/backend/api/python_endpoints/timeseries_endpoints.py +++ b/backend/api/python_endpoints/timeseries_endpoints.py @@ -56,7 +56,7 @@ def get_timeseries_current_range( ) -def _get_related_timeseries_database_service(iri: str) -> TimeseriesPersistenceService: +def get_related_timeseries_database_service(iri: str) -> TimeseriesPersistenceService: try: # Get related timeseries-database service: ts_con_node: DatabaseConnectionsDao = ( @@ -97,12 +97,12 @@ def get_timeseries_range( try: # Get related timeseries-database service: ts_service: TimeseriesPersistenceService = ( - _get_related_timeseries_database_service(iri) + get_related_timeseries_database_service(iri) ) # Read the actual measurements: readings_df = ts_service.read_period_to_dataframe( - id_uri=iri, + iri=iri, begin_time=date_time - timedelta(seconds=duration) if duration is not None else None, @@ -130,11 +130,11 @@ def get_timeseries_entries_count( try: # Get related timeseries-database service: ts_service: TimeseriesPersistenceService = ( - _get_related_timeseries_database_service(iri) + get_related_timeseries_database_service(iri) ) return ts_service.count_entries_for_period( - id_uri=iri, + iri=iri, begin_time=date_time - timedelta(seconds=duration) if duration is not None else None, diff --git a/backend/api/rest_endpoints/annotation_endpoints.py b/backend/api/rest_endpoints/annotation_endpoints.py index f572872..68e1b55 100644 --- a/backend/api/rest_endpoints/annotation_endpoints.py +++ b/backend/api/rest_endpoints/annotation_endpoints.py @@ -3,6 +3,10 @@ from dateutil import tz from fastapi import HTTPException from typing import List +from backend.annotation_detection.AnnotationDetectorContainer import ( + AnnotationDetectorContainer, +) +from util.inter_process_cache import memcache from backend.api.api import app from backend.knowledge_graph.dao.BaseNodesDao import BaseNodeDao from graph_domain.expert_annotations.AnnotationDetectionNode import ( @@ -43,6 +47,26 @@ async def post_annotation_definition(definition: AnnotationDefinitionArguments): ) +@app.patch("/annotation/instance/toggle_occurance_scan") +async def patch_annotation_instance_toggle_occurance_scan( + instance_iri: str, active: bool +): + logger.info( + f"Toggling occurance scan to active: {active} for annotation instance: {instance_iri}..." + ) + ANNOTATIONS_DAO.toggle_annotation_instance_occurance_scan(instance_iri, active) + + +@app.patch("/annotation/ts_matcher/detection_precision") +async def patch_annotation_matcher_detection_precision( + matcher_iri: str, detection_precision: float +): + logger.info( + f"Changing scan precision to {detection_precision} for annotation matcher: {matcher_iri}..." + ) + ANNOTATIONS_DAO.change_matcher_precision(matcher_iri, detection_precision) + + class AnnotationInstanceArguments(BaseModel): id_short: str asset_iri: str @@ -185,10 +209,7 @@ def get_annotation_detection_details(detection_iri: None | str = None): ANNOTATIONS_DAO.get_oldest_unconfirmed_detection() ) if detection_node is None: - logger.info( - "Annotation detection details for current detection " - "ordered when there was no new detection!" 
- ) + # not yet open -> skip return None else: detection_node: AnnotationDetectionNodeFlat = BASE_NODE_DAO.get_generic_node( @@ -305,7 +326,7 @@ async def get_annotation_status(): """ status_dict = { "total_annotations_count": ANNOTATIONS_DAO.get_annotation_instance_count(), - "sum_of_scans": "?", # TODO + "sum_of_scans": int(memcache.get("active_annotation_detectors_count")), "unconfirmed_detections": ANNOTATIONS_DAO.get_annotation_detections_count( confirmed=False ), diff --git a/backend/api/rest_endpoints/graph_endpoints.py b/backend/api/rest_endpoints/graph_endpoints.py index ce8249a..dbe8fa4 100644 --- a/backend/api/rest_endpoints/graph_endpoints.py +++ b/backend/api/rest_endpoints/graph_endpoints.py @@ -7,12 +7,12 @@ @app.patch("/node_position") -def update_node_position(iri: str, pos_x: float, pos_y: float): +async def update_node_position(iri: str, pos_x: float, pos_y: float): python_graph_endpoints.update_node_position(iri, pos_x, pos_y) @app.get("/node_details") -def get_node_details(iri: str): +async def get_node_details(iri: str): node = BASE_NODE_DAO.get_generic_node(iri) diff --git a/backend/knowledge_graph/KnowledgeGraphPersistenceService.py b/backend/knowledge_graph/KnowledgeGraphPersistenceService.py index 8d8711e..ddc2811 100644 --- a/backend/knowledge_graph/KnowledgeGraphPersistenceService.py +++ b/backend/knowledge_graph/KnowledgeGraphPersistenceService.py @@ -113,7 +113,7 @@ def graph_push(self, subgraph: Any) -> None: self._connect() def graph_create(self, subgraph: Any) -> None: - """Executes the graph.push command + """Executes the graph.create command Handles connection errors. """ while True: diff --git a/backend/knowledge_graph/dao/AnnotationNodesDao.py b/backend/knowledge_graph/dao/AnnotationNodesDao.py index bb2cae7..645605a 100644 --- a/backend/knowledge_graph/dao/AnnotationNodesDao.py +++ b/backend/knowledge_graph/dao/AnnotationNodesDao.py @@ -1,6 +1,7 @@ from datetime import datetime from typing import List -from py2neo import NodeMatcher, Relationship +from py2neo import NodeMatcher, Relationship, Node +from backend.knowledge_graph.dao.TimeseriesNodesDao import TimeseriesNodesDao from graph_domain.expert_annotations.AnnotationDefinitionNode import ( AnnotationDefinitionNodeFlat, ) @@ -39,6 +40,8 @@ IRI_PREFIX_ANNOTATION_TS_MATCHER = IRI_PREFIX_GLOBAL + "annotations/ts_matchers/" IRI_PREFIX_ANNOTATION_PRE_INDICATOR = IRI_PREFIX_GLOBAL + "annotations/pre_indicators/" +TS_NODES_DAO: TimeseriesNodesDao = TimeseriesNodesDao.instance() + class AnnotationNodesDao(object): """ @@ -92,6 +95,7 @@ def create_annotation_instance( end_datetime: datetime, caption: str | None = None, description: str | None = None, + activate_occurance_scan: bool = True, ) -> str: """Creates a new annotation instance""" iri = IRI_PREFIX_ANNOTATION_INSTANCE + id_short @@ -105,11 +109,26 @@ def create_annotation_instance( creation_date_time=datetime.now(), occurance_start_date_time=start_datetime, occurance_end_date_time=end_datetime, + activate_occurance_scan=activate_occurance_scan, ) self.ps.graph_push(instance) return iri + def toggle_annotation_instance_occurance_scan( + self, instance_iri: str, active: bool + ): + matcher = NodeMatcher(self.ps.graph) + node: Node = matcher.match(iri=instance_iri).first() + node.update(activate_occurance_scan=active) + self.ps.graph_push(node) + + def change_matcher_precision(self, matcher_iri: str, precision: float): + matcher = NodeMatcher(self.ps.graph) + node: Node = matcher.match(iri=matcher_iri).first() + 
node.update(detection_precision=precision) + self.ps.graph_push(node) + def create_annotation_detection( self, id_short: str, @@ -401,7 +420,7 @@ def get_matcher_annotation_instance(self, matcher_iri): return matches.first() @validate_result_nodes - def get_matcher_original_annotated_ts(self, matcher_iri): + def get_matcher_original_annotated_ts(self, matcher_iri) -> TimeseriesNodeFlat: """Returns the timeseries node that the matcher is related to""" matches = self.ps.repo_match(model=TimeseriesNodeFlat).where( "(_)<-[:" @@ -415,6 +434,20 @@ def get_matcher_original_annotated_ts(self, matcher_iri): return matches.first() + @validate_result_nodes + def get_matched_ts_for_matcher(self, matcher_iri) -> List[TimeseriesNodeFlat]: + matches = self.ps.repo_match(model=TimeseriesNodeFlat).where( + "(_)<-[:" + + RelationshipTypes.TS_MATCH.value + + "]-(:" + + NodeTypes.ANNOTATION_TS_MATCHER.value + + ' {iri: "' + + matcher_iri + + '"}) ' + ) + + return matches.all() + @validate_result_nodes def get_annotation_instance_for_definition(self, definition_iri): """Returns the instances related to the given annotation definition""" @@ -430,15 +463,64 @@ def get_annotation_instance_for_definition(self, definition_iri): return matches.all() + @validate_result_nodes + def get_scanned_assets_for_annotation_instance( + self, instance_iri + ) -> List[AssetNodeFlat]: + query = ( + "(_)-[:" + + RelationshipTypes.OCCURANCE_SCAN.value + + "]->(:" + + NodeTypes.ANNOTATION_DEFINITION.value + + ")<-[:" + + RelationshipTypes.INSTANCE_OF.value + + "]-(:" + + NodeTypes.ANNOTATION_INSTANCE.value + + ' {iri: "' + + instance_iri + + '"}) ' + ) + + scanned_assets_matches = self.ps.repo_match(model=AssetNodeFlat).where(query) + + scanned_assets = scanned_assets_matches.all() + + matchers = self.get_matchers_for_annotation_instance(instance_iri) + + # Filter: Only the assets that have a active matching for all matchers of the instance + instance_scanned_assets = [] + for asset in scanned_assets: + skip_asset = False + asset_ts_iris = [ + ts.iri for ts in TS_NODES_DAO.get_timeseries_of_asset(asset.iri) + ] + + for matcher in matchers: + matched_ts_iris = [ + ts.iri for ts in self.get_matched_ts_for_matcher(matcher.iri) + ] + if any([ts not in asset_ts_iris for ts in matched_ts_iris]): + skip_asset = True + break + if not skip_asset: + instance_scanned_assets.append(asset) + + return instance_scanned_assets + def get_annotation_instance_count_for_definition(self, definition_iri): """Returns the instances related to the given annotation definition""" return len(self.get_annotation_instance_for_definition(definition_iri)) @validate_result_nodes - def get_annotation_instances(self): + def get_annotation_instances(self, only_active_scanned_instances: bool = False): matches = self.ps.repo_match(model=AnnotationInstanceNodeFlat) + if only_active_scanned_instances: + matches = matches.where( + "not exists(_.activate_occurance_scan) or _.activate_occurance_scan = true" + ) + return matches.all() def get_annotation_instance_count(self): @@ -560,3 +642,23 @@ def get_annotation_detections_count(self, confirmed: bool = False): return len(self.get_confirmed_annotation_detections()) else: return len(self.get_unconfirmed_annotation_detections()) + + @validate_result_nodes + def get_matchers_for_annotation_instance( + self, instance_iri + ) -> AnnotationTimeseriesMatcherNodeFlat: + matches = self.ps.repo_match(model=AnnotationTimeseriesMatcherNodeFlat).where( + "(_)<-[:" + + RelationshipTypes.DETECTABLE_WITH.value + + "]-(:" + + 
NodeTypes.ANNOTATION_INSTANCE.value + + ' {iri: "' + + instance_iri + + '"})' + ) + + return matches.all() + + def get_detection_precision_sum_for_instance(self, instance_iri): + matchers = self.get_matchers_for_annotation_instance(instance_iri) + return sum([m.detection_precision for m in matchers]) diff --git a/backend/knowledge_graph/dao/TimeseriesNodesDao.py b/backend/knowledge_graph/dao/TimeseriesNodesDao.py index c51352f..8cd221f 100644 --- a/backend/knowledge_graph/dao/TimeseriesNodesDao.py +++ b/backend/knowledge_graph/dao/TimeseriesNodesDao.py @@ -73,6 +73,20 @@ def get_all_timeseries_nodes_flat(self): return timeseries_flat_matches.all() + @validate_result_nodes + def get_timeseries_of_asset(self, asset_iri) -> List[TimeseriesNodeFlat]: + matches = self.ps.repo_match(model=TimeseriesNodeFlat).where( + "(_)<-[:" + + RelationshipTypes.HAS_TIMESERIES.value + + "]-(:" + + NodeTypes.ASSET.value + + ' {iri: "' + + asset_iri + + '"})' + ) + + return matches.all() + @validate_result_nodes def get_all_timeseries_nodes_deep(self): """ @@ -106,7 +120,7 @@ def update_reduced_feature_list(self, iri: str, reduced_feature_list: List): node: Node = matcher.match(iri=iri).first() reduced_feature_list_str = json.dumps(reduced_feature_list) node.update(reduced_feature_list=reduced_feature_list_str) - self.ps.graph_create(node) + self.ps.graph_push(node) def create_ts_cluster( self, iri: str, id_short: str, description: str | None = None diff --git a/backend/runtime_connections/RuntimeConnectionContainer.py b/backend/runtime_connections/RuntimeConnectionContainer.py index e9ced83..2a1f214 100644 --- a/backend/runtime_connections/RuntimeConnectionContainer.py +++ b/backend/runtime_connections/RuntimeConnectionContainer.py @@ -1,8 +1,12 @@ +from threading import Thread +import time from typing import Dict, List +from backend.knowledge_graph.dao.TimeseriesNodesDao import TimeseriesNodesDao from graph_domain.main_digital_twin.RuntimeConnectionNode import ( RuntimeConnectionNode, RuntimeConnectionTypes, ) +from util.inter_process_cache import memcache from graph_domain.main_digital_twin.TimeseriesNode import ( TimeseriesNodeDeep, ) @@ -27,6 +31,7 @@ ) from util.log import logger + # Maps node-types to the connection / input classes RT_CONNECTION_MAPPING = { RuntimeConnectionTypes.MQTT.value: MqttRuntimeConnection, @@ -58,15 +63,24 @@ def __init__(self): RuntimeConnectionContainer.__instance = self self.connections: Dict[str, RuntimeConnection] = {} + self._active_connections_status_thread = None + + def start_active_connections_status_thread(self): + self._active_connections_status_thread = Thread( + target=self._active_connections_write_to_cache_loop + ) + self._active_connections_status_thread.start() - def refresh_connection_inputs_and_handlers( - self, updated_ts_nodes_deep: List[TimeseriesNodeDeep] - ): + def refresh_connection_inputs_and_handlers(self): """Refreshes the inputs and handlers, creating new ones if available in the graph, or deleting old ones. 
Args: timeseries_nodes_deep (List[TimeseriesNodeDeep]): _description_ """ + + timeseries_nodes_dao: TimeseriesNodesDao = TimeseriesNodesDao.instance() + updated_ts_nodes_deep = timeseries_nodes_dao.get_all_timeseries_nodes_deep() + # # Check if ts inputs or connections have been removed: # @@ -160,7 +174,9 @@ def refresh_connection_inputs_and_handlers( ) ) - ts_input.register_handler(handler_method=ts_service.write_measurement) + ts_input.register_handler( + handler_method=ts_service.write_measurement, handler_id=ts_service.iri + ) if not new_connection: self.connections.get(ts_node.runtime_connection.iri).timeseries_inputs[ @@ -206,10 +222,25 @@ def get_all_inputs(self) -> List[TimeseriesInput]: inputs = [] con: RuntimeConnection for con in self.connections.values(): - inputs.extend(con.timeseries_inputs) + inputs.extend(con.timeseries_inputs.values()) return inputs + def get_timeseries_input_by_iri(self, iri: str): + ts_input_list = [ + ts_input for ts_input in self.get_all_inputs() if ts_input.iri == iri + ] + if len(ts_input_list) > 0: + return ts_input_list[0] + def get_active_connections_count(self) -> int: return len([True for con in self.connections.values() if con.is_active()]) + + def _active_connections_write_to_cache_loop(self): + while True: + memcache.set( + "active_runtime_connections_count", self.get_active_connections_count() + ) + + time.sleep(3) diff --git a/backend/runtime_connections/TimeseriesInput.py b/backend/runtime_connections/TimeseriesInput.py index 18286e0..917ac12 100644 --- a/backend/runtime_connections/TimeseriesInput.py +++ b/backend/runtime_connections/TimeseriesInput.py @@ -2,7 +2,7 @@ from ctypes.wintypes import BOOL from datetime import datetime from math import floor -from typing import Tuple +from typing import Tuple, Dict from graph_domain.main_digital_twin.TimeseriesNode import ( TimeseriesNodeFlat, @@ -15,7 +15,7 @@ def __init__( self, iri: str, connection_topic: str, connection_keyword: str, value_type: str ): self._last_reading: Tuple[datetime, float | int | bool | str] = None - self._handlers = [] + self._handlers: Dict[str, callable] = dict() self.iri = iri self.connection_topic = connection_topic self.connection_keyword = connection_keyword @@ -36,14 +36,17 @@ def get_most_current(self) -> Tuple[datetime, int | float | bool | str]: """ return self._last_reading - def register_handler(self, handler_method) -> None: + def register_handler(self, handler_method, handler_id: str) -> None: """ Registers a given handler method to be called whenever a reading is being received :param handler_method: Callable taking three arguments: id_uri: str, value: Any, reading_time: datetime. 
:return: None """ - self._handlers.append(handler_method) + self._handlers[handler_id] = handler_method + + def remove_handler(self, handler_id: str): + self._handlers.pop(handler_id) def handle_reading(self, reading_time, reading_value): """ @@ -73,6 +76,6 @@ def handle_reading(self, reading_time, reading_value): if isinstance(reading_value, str): pass - for handler in self._handlers: + for handler in self._handlers.values(): handler(self.iri, reading_value, reading_time) self._last_reading = reading_time, reading_value diff --git a/backend/specialized_databases/timeseries/TimeseriesPersistenceService.py b/backend/specialized_databases/timeseries/TimeseriesPersistenceService.py index 1ff17bf..3d05aae 100644 --- a/backend/specialized_databases/timeseries/TimeseriesPersistenceService.py +++ b/backend/specialized_databases/timeseries/TimeseriesPersistenceService.py @@ -14,7 +14,7 @@ class TimeseriesPersistenceService(SpecializedDatabasePersistenceService): @abc.abstractmethod def write_measurement( - self, id_uri: str, value: float | bool | str, reading_time: datetime = None + self, iri: str, value: float | bool | str, reading_time: datetime = None ): """ Writes the given value to the standard bucket into the measurement according to the id_uri into a field @@ -30,10 +30,10 @@ def write_measurement( @abc.abstractmethod def read_period_to_dataframe( self, - id_uri: str, + iri: str, begin_time: datetime, end_time: datetime, - aggregation_window_ms: int | None, + aggregation_window_ms: int | None = None, ) -> pd.DataFrame: """ Reads all measurements from the sensor with the given ID in the time period @@ -47,7 +47,7 @@ def read_period_to_dataframe( @abc.abstractmethod def count_entries_for_period( - self, id_uri: str, begin_time: datetime, end_time: datetime + self, iri: str, begin_time: datetime, end_time: datetime ) -> int: """ Counts the measurement entries from the sensor with the given ID in the time period @@ -58,3 +58,9 @@ def count_entries_for_period( :raise IdNotFoundException: if the id_uri is not found """ pass + + @abc.abstractmethod + def max_value_for_period( + self, iri: str, begin_time: datetime = None, end_time: datetime = None + ) -> int: + pass diff --git a/backend/specialized_databases/timeseries/influx_db/InfluxDbPersistenceService.py b/backend/specialized_databases/timeseries/influx_db/InfluxDbPersistenceService.py index bd9ff7b..5cebaba 100644 --- a/backend/specialized_databases/timeseries/influx_db/InfluxDbPersistenceService.py +++ b/backend/specialized_databases/timeseries/influx_db/InfluxDbPersistenceService.py @@ -50,7 +50,7 @@ def __init__(self, **kwargs): # override def write_measurement( - self, id_uri: str, value: float | bool | str, reading_time: datetime = None + self, iri: str, value: float | bool | str, reading_time: datetime = None ): """ Writes the given value to the standard bucket into the measurement according to the id_uri into a field @@ -62,7 +62,7 @@ def write_measurement( :return: """ - record = Point(measurement_name=id_uri).field( + record = Point(measurement_name=iri).field( field=READING_FIELD_NAME, value=value ) if reading_time is not None: @@ -105,10 +105,10 @@ def _timerange_query(self, begin_time: datetime | None, end_time: datetime | Non # override def read_period_to_dataframe( self, - id_uri: str, + iri: str, begin_time: datetime | None, end_time: datetime | None, - aggregation_window_ms: int | None, + aggregation_window_ms: int | None = None, ) -> pd.DataFrame: """ Reads all measurements from the sensor with the given ID in the time period 
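Review note on the persistence-service changes in this file and the InfluxDB implementation below: callers now pass the node IRI via the `iri` keyword (previously `id_uri`), and `aggregation_window_ms` is optional. A minimal sketch of how a caller such as the new AnnotationDetector might use the updated interface — `service` stands in for any concrete `TimeseriesPersistenceService`, and the helper name `load_annotation_excerpt` is invented for illustration:

```python
from datetime import datetime
import pandas as pd


def load_annotation_excerpt(service, ts_iri: str, start: datetime, end: datetime) -> pd.DataFrame:
    # Keyword is now `iri` (was `id_uri`); the aggregation window can be omitted.
    df = service.read_period_to_dataframe(iri=ts_iri, begin_time=start, end_time=end)
    # The new extrema helpers appear to cover the full stored history when no
    # period is passed (assumption based on their optional begin/end arguments).
    ts_min = service.min_value_for_period(ts_iri)
    ts_max = service.max_value_for_period(ts_iri)
    print(f"{ts_iri}: {len(df)} readings, value range [{ts_min}, {ts_max}]")
    return df
```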
@@ -124,7 +124,7 @@ def read_period_to_dataframe( query = ( f'from(bucket: "{self.bucket}") \n' f"{range_query} \n" - f'|> filter(fn: (r) => r["_measurement"] == "{id_uri}") \n' + f'|> filter(fn: (r) => r["_measurement"] == "{iri}") \n' f"|> aggregateWindow(every: {aggregation_window_ms}ms, fn: first, createEmpty: false)\n" f'|> keep(columns: ["_time", "_value"]) \n' '|> rename(columns: {_time: "time", _value: "value"})' @@ -133,7 +133,7 @@ def read_period_to_dataframe( query = ( f'from(bucket: "{self.bucket}") \n' f"{range_query} \n" - f'|> filter(fn: (r) => r["_measurement"] == "{id_uri}") \n' + f'|> filter(fn: (r) => r["_measurement"] == "{iri}") \n' f'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") \n' f'|> keep(columns: ["_time", "{READING_FIELD_NAME}"]) \n' '|> rename(columns: {_time: "time", reading: "value"})' @@ -141,7 +141,8 @@ def read_period_to_dataframe( try: df = self._query_api.query_data_frame(query=query) - + if df.empty: + return pd.DataFrame({"time": [], "value": []}) # Dataframe cleanup df.drop(columns=["result", "table"], axis=1, inplace=True) # df.rename( @@ -160,7 +161,7 @@ def read_period_to_dataframe( # override def count_entries_for_period( - self, id_uri: str, begin_time: datetime, end_time: datetime + self, iri: str, begin_time: datetime, end_time: datetime ) -> int: """ Counts the measurement entries from the sensor with the given ID in the time period @@ -176,7 +177,7 @@ def count_entries_for_period( query = ( f'from(bucket: "{self.bucket}") \n' f"{range_query} \n" - f'|> filter(fn: (r) => r["_measurement"] == "{id_uri}") \n' + f'|> filter(fn: (r) => r["_measurement"] == "{iri}") \n' f'|> count(column: "_value") \n' f'|> keep(columns: ["_value"])' ) @@ -195,6 +196,64 @@ def count_entries_for_period( # Waiting for reconnect... return None + # override + def max_value_for_period( + self, iri: str, begin_time: datetime = None, end_time: datetime = None + ) -> int: + warnings.simplefilter("ignore", MissingPivotFunction) + range_query = self._timerange_query(begin_time, end_time) + + query = ( + f'from(bucket: "{self.bucket}") \n' + f"{range_query} \n" + f'|> filter(fn: (r) => r["_measurement"] == "{iri}") \n' + f'|> max(column: "_value") \n' + f'|> keep(columns: ["_value"])' + ) + + # pylint: disable=W0703 + try: + df: pd.DataFrame = self._query_api.query_data_frame(query=query) + + return int(df["_value"][0]) if not df.empty else 0 + + except KeyError: + # id_uri not found + raise IdNotFoundException + except Exception: + # Using generic exception on purpose, since there are many different ones occuring, that + # Waiting for reconnect... + return None + + # override + def min_value_for_period( + self, iri: str, begin_time: datetime = None, end_time: datetime = None + ) -> int: + warnings.simplefilter("ignore", MissingPivotFunction) + range_query = self._timerange_query(begin_time, end_time) + + query = ( + f'from(bucket: "{self.bucket}") \n' + f"{range_query} \n" + f'|> filter(fn: (r) => r["_measurement"] == "{iri}") \n' + f'|> min(column: "_value") \n' + f'|> keep(columns: ["_value"])' + ) + + # pylint: disable=W0703 + try: + df: pd.DataFrame = self._query_api.query_data_frame(query=query) + + return int(df["_value"][0]) if not df.empty else 0 + + except KeyError: + # id_uri not found + raise IdNotFoundException + except Exception: + # Using generic exception on purpose, since there are many different ones occuring, that + # Waiting for reconnect... 
+ return None + def backup(self, backup_path: str): logger.info("Backing up InfluxDB...") @@ -242,54 +301,3 @@ def restore(self, backup_path: str): ["influx", "restore", backup_path, "--host", self.uri, "-t", self.key] ) logger.info("Finished restoring InfluxDB.") - - # # override - # def stream( - # self, - # id_uri: str, - # begin_time: datetime | None, - # end_time: datetime | None, - # aggregation_window_ms: int | None, - # ) -> pd.DataFrame: - - # self._query_api.query_stream() - - # range_query = self._timerange_query(begin_time, end_time) - - # if isinstance(aggregation_window_ms, int) and aggregation_window_ms != 0: - # query = ( - # f'from(bucket: "{self.bucket}") \n' - # f"{range_query} \n" - # f'|> filter(fn: (r) => r["_measurement"] == "{id_uri}") \n' - # f"|> aggregateWindow(every: {aggregation_window_ms}ms, fn: first, createEmpty: false)\n" - # f'|> keep(columns: ["_time", "_value"]) \n' - # '|> rename(columns: {_time: "time", _value: "value"})' - # ) - # else: - # query = ( - # f'from(bucket: "{self.bucket}") \n' - # f"{range_query} \n" - # f'|> filter(fn: (r) => r["_measurement"] == "{id_uri}") \n' - # f'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") \n' - # f'|> keep(columns: ["_time", "{READING_FIELD_NAME}"]) \n' - # '|> rename(columns: {_time: "time", reading: "value"})' - # ) - - # try: - # df = self._query_api.query_data_frame(query=query) - - # # Dataframe cleanup - # df.drop(columns=["result", "table"], axis=1, inplace=True) - # # df.rename( - # # columns={"_time": "time", READING_FIELD_NAME: "value"}, inplace=True - # # ) - # # df.rename(columns={"_time": "time", "_value": "value"}, inplace=True) - - # return df - - # except KeyError: - # # id_uri not found - # raise IdNotFoundException - # except NewConnectionError: - # # Skip this ts - # return None diff --git a/container-requirements-initializer.sh b/container-requirements-initializer.sh index 1eb964a..f4fb142 100755 --- a/container-requirements-initializer.sh +++ b/container-requirements-initializer.sh @@ -5,6 +5,9 @@ apt-get update apt-get install -y curl wget +# Inter-process communication +apt-get install memcached + # git required for the pke keyphrase extraction library (git pip link) apt-get install -y git diff --git a/documentation/FAQ.md b/documentation/FAQ.md index a89198e..ee29667 100644 --- a/documentation/FAQ.md +++ b/documentation/FAQ.md @@ -8,6 +8,8 @@ > > To temporally fix the problem, press 'Acknowledge Errors' at the Node-RED Dashboard. Note that this Dashboard is not publicly available and can only be reached from within the local network of the factory. +Node-RED Dashboard is available in the local network of the factory at: `1880/ui/` + ### Development with the Devcontainer **The port is already in use when (re-)launching either the backend or frontend.** diff --git a/documentation/sindit-development-guide.md b/documentation/sindit-development-guide.md index c0bdc21..6c2b930 100644 --- a/documentation/sindit-development-guide.md +++ b/documentation/sindit-development-guide.md @@ -39,6 +39,8 @@ This project is run via docker-compose. Run `docker-compose up -d` inside the re At the first start, this will pull the image used for the frontend and backend containers and for the databases. Use a good network, as the images are not small. +To update the images, run `docker-compose pull`. + To stop the containers, run `docker-compose stop`. To destroy the containers but keep the images and data, run `docker-compose down`. 
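Review note on the dt_backend.py diff that follows: uvicorn is now started with `workers=4`, so the REST endpoints no longer share a process with the refresh threads and detectors. That is presumably why connection and detector counts are published through memcached (added above in container-requirements-initializer.sh) via `util.inter_process_cache`. That module is not part of this diff; the sketch below only illustrates the pattern and assumes pymemcache talking to a local memcached on the default port.

```python
# Sketch of util/inter_process_cache.py (NOT included in this diff) — assumes
# pymemcache and a memcached daemon on localhost:11211, matching the new
# apt-get dependency in container-requirements-initializer.sh.
from pymemcache.client.base import Client


class _InterProcessCache:
    """Thin wrapper so every uvicorn worker process reads the same counters."""

    def __init__(self) -> None:
        self._client = Client(("localhost", 11211), connect_timeout=2, timeout=2)

    def set(self, key: str, value) -> None:
        # Store as text; callers such as status_endpoints.py cast back with int().
        self._client.set(key, str(value))

    def get(self, key: str, default: str = "0") -> str:
        raw = self._client.get(key)
        return raw.decode() if raw is not None else default


memcache = _InterProcessCache()
```

With something along these lines in place, `int(memcache.get("active_runtime_connections_count"))` in status_endpoints.py returns the value written by the background thread regardless of which worker process handled the request.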
diff --git a/dt_backend.py b/dt_backend.py index f11801e..3cef3f0 100644 --- a/dt_backend.py +++ b/dt_backend.py @@ -12,6 +12,9 @@ from dateutil import tz from threading import Thread +from backend.annotation_detection.AnnotationDetectorContainer import ( + AnnotationDetectorContainer, +) from util.environment_and_configuration import ( ConfigGroups, get_configuration, @@ -67,6 +70,8 @@ get_environment_variable_int, ) +import util.inter_process_cache as cache + # ############################################################################# # Setup sensor connections and timeseries persistence @@ -89,29 +94,33 @@ def init_database_data_if_not_available(): logger.info("Finished initilization.") -def refresh_ts_inputs(): - timeseries_nodes_dao: TimeseriesNodesDao = TimeseriesNodesDao.instance() +def refresh_workers(): runtime_con_container: RuntimeConnectionContainer = ( RuntimeConnectionContainer.instance() ) - timeseries_deep_nodes = timeseries_nodes_dao.get_all_timeseries_nodes_deep() - - runtime_con_container.refresh_connection_inputs_and_handlers(timeseries_deep_nodes) + detectors_container: AnnotationDetectorContainer = ( + AnnotationDetectorContainer.instance() + ) + runtime_con_container.refresh_connection_inputs_and_handlers() + detectors_container.refresh_annotation_detectors() -def refresh_time_series_thread_loop(): +def refresh_workers_thread_loop(): while True: time.sleep(120) - logger.info("Refreshing time-series inputs and connections...") - refresh_ts_inputs() + logger.info("Refreshing worker services...") + + refresh_workers() - logger.info("Done refreshing time-series inputs and connections.") + logger.info("Done refreshing worker services.") # ############################################################################# # Launch backend # ############################################################################# + + if __name__ == "__main__": logger.info("Initializing Knowledge Graph...") @@ -130,59 +139,24 @@ def refresh_time_series_thread_loop(): ) logger.info("Done initializing specialized databases.") - logger.info("Loading time-series inputs and connections...") - refresh_ts_inputs() - logger.info("Done loading time-series inputs and connections.") + logger.info( + "Loading worker services: time-series inputs and connections as well as annotation detectors..." + ) + refresh_workers() + logger.info("Done loading worker services.") - # Thread checking regulary, if timeseries inputs and runtime-connections have been added / removed - ts_refresh_thread = Thread(target=refresh_time_series_thread_loop) - ts_refresh_thread.start() + # Thread checking regulary, if timeseries inputs, runtime-connections and annotation detectors have been added / removed + workers_refresh_thread = Thread(target=refresh_workers_thread_loop) + workers_refresh_thread.start() # Start cleanup thread deleting obsolete backups: start_storage_cleanup_thread() - # - # TODO: remove this. 
Just for - # - # from backend.knowledge_graph.dao.AnnotationNodesDao import AnnotationNodesDao - # from datetime import datetime - - # annotations_dao: AnnotationNodesDao = AnnotationNodesDao.instance() - - # detection_iri = annotations_dao.create_annotation_detection( - # id_short="test-detection", - # start_datetime=datetime.now().astimezone( - # tz.gettz(get_configuration(group=ConfigGroups.FRONTEND, key="timezone")) - # ) - # - timedelta(minutes=10), - # end_datetime=datetime.now().astimezone( - # tz.gettz(get_configuration(group=ConfigGroups.FRONTEND, key="timezone")) - # ), - # caption="Test Detection", - # ) - - # annotations_dao.create_annotation_detection_timeseries_relationship( - # detection_iri=detection_iri, - # timeseries_iri="www.sintef.no/aas_identifiers/learning_factory/sensors/hbw_actual_pos_vertical", - # ) - # annotations_dao.create_annotation_detection_timeseries_relationship( - # detection_iri=detection_iri, - # timeseries_iri="www.sintef.no/aas_identifiers/learning_factory/sensors/factory_humidity_raw", - # ) - - # annotations_dao.create_annotation_detection_asset_relationship( - # detection_iri=detection_iri, - # asset_iri="www.sintef.no/aas_identifiers/learning_factory/machines/hbw", - # ) - - # annotations_dao.create_annotation_detection_instance_relationship( - # detection_iri=detection_iri, - # instance_iri="www.sintef.no/aas_identifiers/learning_factory/annotations/instances/test_annotation_definition_hbw_2022-09-18T20:30:29.263466", - # ) - - # - # - # + # Start getting the connectivity status for runtime connections + RuntimeConnectionContainer.instance().start_active_connections_status_thread() + # Start getting the connectivity status for runtime connections + + AnnotationDetectorContainer.instance().start_active_detectors_status_thread() # Run fast API # noinspection PyTypeChecker @@ -190,8 +164,6 @@ def refresh_time_series_thread_loop(): "dt_backend:app", host=get_environment_variable("FAST_API_HOST"), port=get_environment_variable_int("FAST_API_PORT"), - # workers=4, - # # TODO: decide whether to introduce inter-process communication - # (e.g. for runtime-connection status) and to activate workers! 
+ workers=4, access_log=False, ) diff --git a/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_callbacks.py b/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_callbacks.py index c0e2289..a2c08d4 100644 --- a/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_callbacks.py +++ b/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_callbacks.py @@ -1,5 +1,14 @@ from datetime import datetime from dash import ctx +from graph_domain.BaseNode import BaseNode +from graph_domain.expert_annotations.AnnotationInstanceNode import ( + AnnotationInstanceNodeDeep, + AnnotationInstanceNodeFlat, +) +from graph_domain.expert_annotations.AnnotationTimeseriesMatcherNode import ( + AnnotationTimeseriesMatcherNodeFlat, +) +from graph_domain.factory_graph_ogm_matches import OGM_CLASS_FOR_NODE_TYPE from util.log import logger from frontend.app import app @@ -95,6 +104,87 @@ def annotation_info(info_open, sidebar_open_classname): return None, None, None, None +@app.callback( + Output("annotation-instance-scan-toggle-container", "is_open"), + Output("annotation-instance-scanning-toggle", "value"), + Input("selected-graph-element-store", "data"), + prevent_initial_call=False, +) +def show_instance_scan_toggle_switch(selected_el_json): + if selected_el_json is None: + return False, [] + + selected_el: GraphSelectedElement = GraphSelectedElement.from_json(selected_el_json) + if selected_el.type == NodeTypes.ANNOTATION_INSTANCE.value: + node_details_json = api_client.get_json("/node_details", iri=selected_el.iri) + node_class = OGM_CLASS_FOR_NODE_TYPE.get(selected_el.type) + node: AnnotationInstanceNodeFlat = node_class.from_dict(node_details_json) + + return True, [True] if node.activate_occurance_scan else [] + else: + return False, [] + + +@app.callback( + Output("annotation-instance-scan-toggled", "data"), + Input("annotation-instance-scanning-toggle", "value"), + State("selected-graph-element-store", "data"), + prevent_initial_call=True, +) +def toggle_instance_scan(toggle_value, selected_el_json): + activated = True in toggle_value + selected_el: GraphSelectedElement = GraphSelectedElement.from_json(selected_el_json) + + api_client.patch( + "/annotation/instance/toggle_occurance_scan", + instance_iri=selected_el.iri, + active=activated, + ) + + return datetime.now() + + +@app.callback( + Output("annotation-matcher-settings-container", "is_open"), + Output("annotation-matcher-precission-slider", "value"), + Input("selected-graph-element-store", "data"), + prevent_initial_call=False, +) +def show_annotation_matcher_settings(selected_el_json): + if selected_el_json is None: + return False, 0 + + selected_el: GraphSelectedElement = GraphSelectedElement.from_json(selected_el_json) + if selected_el.type == NodeTypes.ANNOTATION_TS_MATCHER.value: + node_details_json = api_client.get_json("/node_details", iri=selected_el.iri) + node_class = OGM_CLASS_FOR_NODE_TYPE.get(selected_el.type) + node: AnnotationTimeseriesMatcherNodeFlat = node_class.from_dict( + node_details_json + ) + + return True, node.detection_precision + else: + return False, 0 + + +@app.callback( + Output("annotation-matcher-settings-changed", "data"), + Input("annotation-matcher-precission-slider", "value"), + State("selected-graph-element-store", "data"), + prevent_initial_call=True, +) +def change_matcher_settings(value, selected_el_json): + selected_el: GraphSelectedElement = 
GraphSelectedElement.from_json(selected_el_json) + + api_client.patch( + "/annotation/ts_matcher/detection_precision", + matcher_iri=selected_el.iri, + detection_precision=value, + ) + + return datetime.now() + + ########################################## # Deleting: ########################################## diff --git a/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_layout.py b/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_layout.py index 851d8ff..f14a2da 100644 --- a/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_layout.py +++ b/frontend/left_sidebar/extensions/annotation_detection_extension/annotation_extension_layout.py @@ -44,7 +44,7 @@ def get_layout(): ), html.Div( className="node-information-attribute-header", - children="Sum of Active Scans for New Occurances:", + children="Total Number of Active Detectors:", style={"margin-top": "10px"}, ), html.Div( @@ -74,6 +74,56 @@ ), ], ), + dbc.Collapse( + id="annotation-instance-scan-toggle-container", + className="annotation-click-context-feature-container", + is_open=False, + children=[ + dcc.Store(id="annotation-instance-scan-toggled"), + html.Div( + "Settings for the selected Annotation Instance:", + style={"font-weight": "bold"}, + ), + dbc.Checklist( + id="annotation-instance-scanning-toggle", + options=[ + { + "label": "Activate Scanning for New Occurances", + "value": True, + } + ], + value=[True], + switch=True, + style={"margin-top": "10px"}, + ), + ], + ), + dbc.Collapse( + id="annotation-matcher-settings-container", + className="annotation-click-context-feature-container", + is_open=False, + children=[ + dcc.Store(id="annotation-matcher-settings-changed"), + html.Div( + "Settings for the selected Annotation Timeseries Matcher:", + style={ + "font-weight": "bold", + "padding-bottom": "5px", + }, + ), + html.Div( + "Here, you can adjust how precisely the timeseries will be matched.", + style={"padding-bottom": "5px"}, + ), + dcc.Slider( + 0, + 1, + marks=None, + value=0.5, + id="annotation-matcher-precission-slider", + ), + ], + ), html.Div( id="annotations-buttons-container", className="annotations-bottom-buttons", diff --git a/frontend/left_sidebar/extensions/similarity_pipeline_extension/pipeline_extension_layout.py b/frontend/left_sidebar/extensions/similarity_pipeline_extension/pipeline_extension_layout.py index f711459..4367e85 100644 --- a/frontend/left_sidebar/extensions/similarity_pipeline_extension/pipeline_extension_layout.py +++ b/frontend/left_sidebar/extensions/similarity_pipeline_extension/pipeline_extension_layout.py @@ -15,7 +15,7 @@ def get_layout(): ), dbc.CardBody( html.Div( - "Pipeline status info (and control?) coming soon...", + "This is the place where you will soon be able to view some information about the similarity pipeline (and maybe start it).
Currently, the pipeline has to be executed manually via a script.", ) ), ], diff --git a/frontend/left_sidebar/global_information/global_information_callbacks.py b/frontend/left_sidebar/global_information/global_information_callbacks.py index 25a421b..458cfa6 100644 --- a/frontend/left_sidebar/global_information/global_information_callbacks.py +++ b/frontend/left_sidebar/global_information/global_information_callbacks.py @@ -4,7 +4,6 @@ from dash.dependencies import Input, Output from frontend import api_client from dateutil import tz - from dash import html from frontend.left_sidebar.global_information import global_information_layout diff --git a/frontend/left_sidebar/global_information/global_information_layout.py b/frontend/left_sidebar/global_information/global_information_layout.py index 34aff1e..cc677ab 100644 --- a/frontend/left_sidebar/global_information/global_information_layout.py +++ b/frontend/left_sidebar/global_information/global_information_layout.py @@ -8,7 +8,7 @@ def get_layout(): dbc.CardHeader( id="global-information-container-card", children=[ - html.Div("Connectivity status"), + html.Div("Connectivity Status"), ], ), dbc.CardBody( diff --git a/frontend/left_sidebar/left_sidebar_layout.py b/frontend/left_sidebar/left_sidebar_layout.py index 0af0d62..945216c 100644 --- a/frontend/left_sidebar/left_sidebar_layout.py +++ b/frontend/left_sidebar/left_sidebar_layout.py @@ -96,7 +96,7 @@ def get_layout(): [ visibility_settings_layout.get_layout(), ], - title="Graph visibility settings", + title="Graph Visibility Settings", ), dbc.AccordionItem( [ diff --git a/frontend/left_sidebar/visibility_settings/visibility_settings_callbacks.py b/frontend/left_sidebar/visibility_settings/visibility_settings_callbacks.py index 9944480..653e463 100644 --- a/frontend/left_sidebar/visibility_settings/visibility_settings_callbacks.py +++ b/frontend/left_sidebar/visibility_settings/visibility_settings_callbacks.py @@ -16,7 +16,9 @@ ) from graph_domain.factory_graph_types import ( NODE_TYPE_STRINGS, + PSEUDO_NODE_TYPE_STRINGS, NodeTypes, + PseudoNodeTypes, ) from util.log import logger @@ -135,6 +137,7 @@ def change_graph_visibility_options( for inactive_switch in deactivated_switches: # Hide nodes from that type: + invisibility_styles.append( {"selector": f".{inactive_switch}", "style": {"display": "none"}} ) diff --git a/frontend/main_column/factory_graph/factory_graph_layout.py b/frontend/main_column/factory_graph/factory_graph_layout.py index 3f7a518..f3262c1 100644 --- a/frontend/main_column/factory_graph/factory_graph_layout.py +++ b/frontend/main_column/factory_graph/factory_graph_layout.py @@ -29,7 +29,7 @@ def get_layout(): id="graph-header-row", children=[ dbc.Col( - html.Div("Factory graph"), + html.Div("Factory Graph"), align="center", ), dbc.Col( diff --git a/frontend/right_sidebar/node_information_tab/node_information_layout.py b/frontend/right_sidebar/node_information_tab/node_information_layout.py index 06d71bc..64903ee 100644 --- a/frontend/right_sidebar/node_information_tab/node_information_layout.py +++ b/frontend/right_sidebar/node_information_tab/node_information_layout.py @@ -60,6 +60,26 @@ def get_visualized_attributes_for_node_type(node: BaseNode) -> List[Tuple[str, s attributes_list.append(("Connection Topic", node.connection_topic)) attributes_list.append(("Connection Keyword", node.connection_keyword)) attributes_list.append(("Value Type", node.value_type)) + if node.feature_dict is not None: + attributes_list.append( + ( + "Extracted Features", + ", ".join( + [
f"{feature_tuple[0]}: {feature_tuple[1]}" + for feature_tuple in node.feature_dict.items() + ] + ), + ) + ) + if node.reduced_feature_list is not None: + attributes_list.append( + ( + "PCA-reduced Features", + ", ".join([str(feature) for feature in node.reduced_feature_list]), + ) + ) + elif isinstance(node, SupplementaryFileNodeFlat): attributes_list.append(("File Name", node.file_name)) attributes_list.append(("File Type", node.file_type)) @@ -96,6 +116,9 @@ def get_visualized_attributes_for_node_type(node: BaseNode) -> List[Tuple[str, s node.occurance_end_date_time.strftime(STRF_DATETIME_FORMAT), ) ) + attributes_list.append( + ("Active Scanning for New Occurances", str(node.activate_occurance_scan)) + ) elif isinstance(node, AnnotationPreIndicatorNodeFlat): attributes_list.append( ( @@ -144,7 +167,9 @@ def get_visualized_attributes_for_node_type(node: BaseNode) -> List[Tuple[str, s ) ) elif isinstance(node, AnnotationTimeseriesMatcherNodeFlat): - pass + attributes_list.append( + ("Precision used for Finding new Occurances", str(node.detection_precision)) + ) return attributes_list diff --git a/frontend/right_sidebar/right_sidebar_callbacks.py b/frontend/right_sidebar/right_sidebar_callbacks.py index b657251..764a705 100644 --- a/frontend/right_sidebar/right_sidebar_callbacks.py +++ b/frontend/right_sidebar/right_sidebar_callbacks.py @@ -63,10 +63,17 @@ def change_navigation_tab_content(tab, selected_el): Input("tabs-infos", "active_tab"), Input("annotation-creation-store-step", "data"), State("last-manually-selected-tab", "data"), + Input("annotation-instance-scan-toggled", "modified_timestamp"), + Input("annotation-matcher-settings-changed", "modified_timestamp"), prevent_initial_update=False, ) def add_remove_navigation_tabs( - selected_el_json, active_tab, annotation_creation_step, last_manually_opened_tab + selected_el_json, + active_tab, + annotation_creation_step, + last_manually_opened_tab, + scanning_toggled, + matcher_settings_changed, ): if ctx.triggered_id == "tabs-infos": last_manually_opened_tab = active_tab diff --git a/frontend/right_sidebar/right_sidebar_layout.py b/frontend/right_sidebar/right_sidebar_layout.py index 44a3e58..31352aa 100644 --- a/frontend/right_sidebar/right_sidebar_layout.py +++ b/frontend/right_sidebar/right_sidebar_layout.py @@ -19,7 +19,7 @@ def get_layout(): children=[ dbc.CardHeader( id="right-sidebar-card-header", - children=[html.Div("Selected element")], + children=[html.Div("Selected Element")], ), dbc.CardBody( id="right-sidebar-card-body", diff --git a/graph_domain/expert_annotations/AnnotationInstanceNode.py b/graph_domain/expert_annotations/AnnotationInstanceNode.py index 4e25949..a0b731a 100644 --- a/graph_domain/expert_annotations/AnnotationInstanceNode.py +++ b/graph_domain/expert_annotations/AnnotationInstanceNode.py @@ -44,6 +44,8 @@ class AnnotationInstanceNodeFlat(BaseNode): __primarylabel__ = LABEL # Additional properties: + activate_occurance_scan: bool = Property(default=True) + _creation_date_time: str | datetime = Property(key="creation_date_time") @property diff --git a/graph_domain/expert_annotations/AnnotationTimeseriesMatcherNode.py b/graph_domain/expert_annotations/AnnotationTimeseriesMatcherNode.py index ab94474..26e7161 100644 --- a/graph_domain/expert_annotations/AnnotationTimeseriesMatcherNode.py +++ b/graph_domain/expert_annotations/AnnotationTimeseriesMatcherNode.py @@ -32,6 +32,8 @@ class AnnotationTimeseriesMatcherNodeFlat(BaseNode): # Identifier for the node-type: __primarylabel__ = LABEL + detection_precision: 
float = Property(default=0.5) + + def validate_metamodel_conformance(self): """ Used to validate if the current node (self) and its child elements is conformant to the defined metamodel. diff --git a/graph_domain/factory_graph_types.py b/graph_domain/factory_graph_types.py index 14167c2..a870dd9 100644 --- a/graph_domain/factory_graph_types.py +++ b/graph_domain/factory_graph_types.py @@ -49,67 +49,19 @@ class RelationshipTypes(Enum): class PseudoNodeTypes(Enum): - ASSET_SIMILARITY_PSEUDO_NODE = "ASSET_SIMILARITY_PSEUDO_NODE" + """Pseudo node types are relationship-types that are mapped to a virtual node type + so that they can be selected in the graph visibility settings. + """ + + ASSET_SIMILARITY_PSEUDO_NODE = RelationshipTypes.ASSET_SIMILARITY.value + + +PSEUDO_NODE_TYPE_STRINGS = [nd_type.value for nd_type in PseudoNodeTypes] +NODE_TYPE_STRINGS = [nd_type.value for nd_type in NodeTypes] + PSEUDO_NODE_TYPE_STRINGS -NODE_TYPE_STRINGS = [nd_type.value for nd_type in NodeTypes] + [ - nd_type.value for nd_type in PseudoNodeTypes -] RELATIONSHIP_TYPE_STRINGS = [rl_type.value for rl_type in RelationshipTypes] ELEMENT_TYPE_STRINGS = list(chain(NODE_TYPE_STRINGS, RELATIONSHIP_TYPE_STRINGS)) UNSPECIFIED_LABEL = "UNSPECIFIED" - -RELATIONSHIP_TYPES_FOR_NODE_TYPE = { - NodeTypes.ASSET.value: [ - RelationshipTypes.HAS_TIMESERIES.value, - RelationshipTypes.HAS_SUPPLEMENTARY_FILE.value, - RelationshipTypes.OCCURANCE_SCAN.value, - RelationshipTypes.ANNOTATION.value, - RelationshipTypes.ASSET_SIMILARITY.value, - ], - NodeTypes.TIMESERIES_INPUT.value: [ - RelationshipTypes.HAS_TIMESERIES.value, - RelationshipTypes.HAS_UNIT.value, - RelationshipTypes.RUNTIME_ACCESS.value, - RelationshipTypes.TIMESERIES_DB_ACCESS.value, - RelationshipTypes.PART_OF_TS_CLUSTER.value, - RelationshipTypes.ORIGINAL_ANNOTATED.value, - RelationshipTypes.TS_MATCH.value, - ], - NodeTypes.SUPPLEMENTARY_FILE.value: [ - RelationshipTypes.HAS_SUPPLEMENTARY_FILE.value, - RelationshipTypes.FILE_DB_ACCESS.value, - RelationshipTypes.SECONDARY_FORMAT.value, - RelationshipTypes.KEYWORD_EXTRACTION.value, - ], - NodeTypes.DATABASE_CONNECTION.value: [ - RelationshipTypes.TIMESERIES_DB_ACCESS.value, - RelationshipTypes.FILE_DB_ACCESS.value, - ], - NodeTypes.UNIT.value: [RelationshipTypes.HAS_UNIT.value], - NodeTypes.RUNTIME_CONNECTION.value: [RelationshipTypes.RUNTIME_ACCESS.value], - NodeTypes.TIMESERIES_CLUSTER.value: [RelationshipTypes.PART_OF_TS_CLUSTER.value], - NodeTypes.EXTRACTED_KEYWORD.value: [RelationshipTypes.KEYWORD_EXTRACTION.value], - PseudoNodeTypes.ASSET_SIMILARITY_PSEUDO_NODE.value: [ - RelationshipTypes.ASSET_SIMILARITY.value - ], - NodeTypes.ANNOTATION_DEFINITION.value: [ - RelationshipTypes.INSTANCE_OF.value, - RelationshipTypes.OCCURANCE_SCAN.value, - ], - NodeTypes.ANNOTATION_INSTANCE.value: [ - RelationshipTypes.INSTANCE_OF.value, - RelationshipTypes.ANNOTATION.value, - RelationshipTypes.PRE_INDICATABLE_WITH.value, - RelationshipTypes.DETECTABLE_WITH.value, - ], - NodeTypes.ANNOTATION_PRE_INDICATOR.value: [ - RelationshipTypes.PRE_INDICATABLE_WITH.value, - RelationshipTypes.DETECTABLE_WITH.value, - ], - NodeTypes.ANNOTATION_TS_MATCHER.value: [ - RelationshipTypes.ORIGINAL_ANNOTATED.value, - RelationshipTypes.TS_MATCH.value, - ], -} diff --git a/learning_factory_continuous_ordering.py b/learning_factory_continuous_ordering.py index 56a8cbd..f107015 100644 --- a/learning_factory_continuous_ordering.py +++ b/learning_factory_continuous_ordering.py @@ -103,39 +103,46 @@ def
datachange_notification(self, node: asyncua.Node, val, data): sleep(10) while True: - # random_type = random.choice(PIECE_TYPES) - - # Acknowledge errors, if some occured. This sometimes happens (reason and further error information still to be discovered) - dv = asyncua.ua.DataValue(asyncua.ua.Variant(True, asyncua.ua.VariantType.Boolean)) - # dv.ServerTimestamp = None - # dv.SourceTimestamp = None - acknowledge_error_node.set_value(dv) - - # Get available pieces: - first_available_type = None try: - if last_stock_status is not None: - stock_dict = json.loads(last_stock_status) - for storage in stock_dict["stockItems"]: - if storage["workpiece"]["type"] != "": - first_available_type = storage["workpiece"]["type"] - break - except KeyError: - logger.info("KeyError in MQTT stock message!") - - formatted_time = datetime.now().isoformat()[:-3] + "Z" - if first_available_type is not None: - logger.info(f"Ordering piece ({first_available_type}, {formatted_time})") - - success_info = mqtt_client.publish( - topic="f/o/order", - payload='{"type":"' - + first_available_type - + '","ts":"' - + formatted_time - + '"}', + # random_type = random.choice(PIECE_TYPES) + + # Acknowledge errors, if some occured. This sometimes happens (reason and further error information still to be discovered) + dv = asyncua.ua.DataValue( + asyncua.ua.Variant(True, asyncua.ua.VariantType.Boolean) ) - else: - logger.info(f"No piece available! ({formatted_time})") + # dv.ServerTimestamp = None + # dv.SourceTimestamp = None + acknowledge_error_node.set_value(dv) + + # Get available pieces: + first_available_type = None + try: + if last_stock_status is not None: + stock_dict = json.loads(last_stock_status) + for storage in stock_dict["stockItems"]: + if storage["workpiece"]["type"] != "": + first_available_type = storage["workpiece"]["type"] + break + except KeyError: + logger.info("KeyError in MQTT stock message!") + + formatted_time = datetime.now().isoformat()[:-3] + "Z" + if first_available_type is not None: + logger.info(f"Ordering piece ({first_available_type}, {formatted_time})") + + success_info = mqtt_client.publish( + topic="f/o/order", + payload='{"type":"' + + first_available_type + + '","ts":"' + + formatted_time + + '"}', + ) + else: + logger.info(f"No piece available! ({formatted_time})") + + except Exception as exc: + print(f"Exception occured. Trying to re-establish soon. {exc}") + pass sleep(200) diff --git a/learning_factory_instance/learning_factory.cypher b/learning_factory_instance/learning_factory.cypher index 75566d9..1a209ad 100644 --- a/learning_factory_instance/learning_factory.cypher +++ b/learning_factory_instance/learning_factory.cypher @@ -1,5 +1,5 @@ CREATE (`HBW Actual Pos. Horizontal`:TIMESERIES {id_short: "hbw_actual_pos_horizontal", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/hbw_actual_pos_horizontal", description: "Measures the current actual horizontal position of the High Bay Warehouse", connection_topic: "ns=3;s=\"gtyp_HBW\".\"Horizontal_Axis\".\"di_Actual_Position\"", connection_keyword: "", caption: "HBW Actual Pos. Horizontal", value_type: "INTEGER"})<-[:TS_MATCH]-(`hbw_crane_stuck_01_actual_pos_horizontal_matcher`:ANNOTATION_TS_MATCHER {id_short: "hbw_crane_stuck_01_actual_pos_horizontal_matcher", iri: "www.sintef.no/aas_identifiers/learning_factory/annotations/matchers/identifiers/units/hbw_crane_stuck_01_actual_pos_horizontal_matcher", description: ""})-[:ORIGINAL_ANNOTATION]->(`HBW Actual Pos. 
Horizontal`)-[:HAS_UNIT]->(mm:UNIT {id_short: "mm", iri: "www.sintef.no/sindit/identifiers/units/mm", description: "Distance measurement in millimeters"})<-[:HAS_UNIT]-(`HBW Target Pos. Horizontal`:TIMESERIES {id_short: "hbw_target_pos_horizontal", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/hbw_target_pos_horizontal", description: "Provides the currently targeted horizontal position of the High Bay Warehouse", connection_topic: "ns=3;s=\"gtyp_HBW\".\"Horizontal_Axis\".\"di_Target_Position\"", connection_keyword: "", caption: "HBW Target Pos. Horizontal", value_type: "INTEGER"})<-[:HAS_TIMESERIES]-(HBW:ASSET {id_short: "hbw", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/hbw", description: "Automated high-bay warehouse 24V. Transfer station with conveyor belt, shelf stacker for storing and retrieving special workpiece carriers, 9 storage slots. ", similarity_vec: "[234,232,12,6,234,234]", caption: "HBW"})-[:HAS_TIMESERIES]->(`HBW Actual Pos. Horizontal`)-[:RUNTIME_ACCESS]->(learning_factory_opc_ua:RUNTIME_CONNECTION {id_short: "learning_factory_opc_ua", iri: "www.sintef.no/aas_identifiers/learning_factory/connections/opc_ua", description: "Provides access to timeseries data via OPC UA", type: "OPC_UA", host_environment_variable: "FACTORY_OPC_UA_HOST", port_environment_variable: "FACTORY_OPC_UA_PORT"})<-[:RUNTIME_ACCESS]-(`HBW Target Pos. Horizontal`)-[:TIMESERIES_DB_ACCESS]->(learning_factory_influx_db:DATABASE_CONNECTION {id_short: "learning_factory_influx_db", iri: "www.sintef.no/aas_identifiers/learning_factory/databases/learning_factory_influx_db", description: "Provides access to timeseries data via InfluxDB", type: "INFLUX_DB", host_environment_variable: "INFLUX_DB_HOST", port_environment_variable: "INFLUX_DB_PORT", database: "sindit", group: "sindit", key_environment_variable: "INFLUX_DB_TOKEN", caption: "Learning Factory InfluxDB Connection"})<-[:TIMESERIES_DB_ACCESS]-(`Factory Temperature`:TIMESERIES {id_short: "factory_temperature", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_temperature", description: "temperature compensated", connection_topic: "i/bme680", connection_keyword: "t", caption: "Factory Temperature", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(VGR:ASSET {id_short: "vgr", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/vgr", description: "Vacuum Suction Gripper 24V", similarity_vec: "[232,232,21,9,230,189]", visualization_positioning_x: 100, visualization_positioning_y: 100, caption: "VGR"})<-[:PRODUCT_FLOW]-(HBW)-[:SIMILARITY_MEASURE {similarity_distance: 2}]->(VGR)-[:HAS_SUPPLEMENTARY_FILE]->(:SUPPLEMENTARY_FILE {id_short: "vgr_step_cad", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/vgr_step_cad", description: "CAD model", type: "CAD_STEP", file_name: "vgr.step", caption: "VGR CAD (STEP)"})-[:FILE_DB_ACCESS]->(`learning_factory_minio_s3`:DATABASE_CONNECTION {id_short: "learning_factory_minio_s3", iri: "www.sintef.no/aas_identifiers/learning_factory/databases/learning_factory_minio_s3", description: "Provides access to binary files stored via the Amazon S3 API", type: "S3", host_environment_variable: "MINIO_S3_HOST", port_environment_variable: "MINIO_S3_PORT", database: "sindit", group: "sindit", key_environment_variable: "MINIO_S3_PASSWORD", user_environment_variable: "MINIO_S3_USER", caption: "Learning Factory Minio S3 Connection"})<-[:FILE_DB_ACCESS]-(`HBW Model Image JPG`:SUPPLEMENTARY_FILE {id_short: "hbw_model_image_jpg", iri: 
"www.sintef.no/aas_identifiers/learning_factory/files/hbw_model_image_jpg", description: "Image of the CAD model", type: "IMAGE_JPG", file_name: "hbw_model.jpg", caption: "HBW Model Image JPG"}), -(`HBW Actual Pos. Horizontal`)-[:TIMESERIES_DB_ACCESS]->(learning_factory_influx_db)<-[:TIMESERIES_DB_ACCESS]-(`Factory Temperature Raw`:TIMESERIES {id_short: "factory_temperature", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_temperature_raw", description: "temperature raw", connection_topic: "i/bme680", connection_keyword: "rt", caption: "Factory Temperature Raw", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(VGR)-[:HAS_TIMESERIES]->(`Factory Humidity`:TIMESERIES {id_short: "factory_humidity", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_humidity", description: "relative humidity compensated", connection_topic: "i/bme680", connection_keyword: "h", caption: "Factory Humidity", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(MPO:ASSET {id_short: "mpo", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/mpo", description: "Multi-processing station with kiln 24V. Furnace with pneumatic sliding door. Downstream processing station with pneumatic transfer unit including vacuum gripper, cutter with rotary table and conveyor belt.", similarity_vec: "[234,232,12,6,234,234]", caption: "MPO"})-[:HAS_TIMESERIES]->(`Factory Humidity Raw`:TIMESERIES {id_short: "factory_humidity_raw", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_humidity_raw", description: "relative humidity raw", connection_topic: "i/bme680", connection_keyword: "rh", caption: "Factory Humidity Raw", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(SLD:ASSET {id_short: "sld", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/sld", description: "Sorting line with colour recognition 24V. Detects workpieces of different colors and sorts them via a conveyor belt into the provided storage unit. 
", similarity_vec: "[234,232,12,6,234,234]", caption: "SLD"})-[:HAS_TIMESERIES]->(`Factory Air Pressure`:TIMESERIES {id_short: "factory_air_pressure", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_air_pressure", description: "air pressure", connection_topic: "i/bme680", connection_keyword: "p", caption: "Factory Air Pressure", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(SSC:ASSET {id_short: "ssc", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/ssc", description: "Environmental station with surveillance camera 24V", similarity_vec: "[234,232,12,6,234,234]", caption: "SSC"})-[:HAS_TIMESERIES]->(`Factory Gas Resistance`:TIMESERIES {id_short: "factory_gas_resistance", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_gas_resistance", description: "gas resistance", connection_topic: "i/bme680", connection_keyword: "gr", caption: "Factory Gas Resistance", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(DPS:ASSET {id_short: "dps", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/dps", description: "Input/output station with colour detection and NFC Reader 24V", similarity_vec: "[234,232,12,6,234,234]", caption: "DPS"})-[:HAS_TIMESERIES]->(`Factory Index Air Quality`:TIMESERIES {id_short: "factory_index_air_quality", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_index_air_quality", description: "index air quality 0-500 (0...50:Good, 51...100:Moderate, 101...150:Unhealthy for Sensitive Groups, 151...200:Unhealthy, 201...300:Very Unhealthy, 301...500:Hazardous)", connection_topic: "i/bme680", connection_keyword: "iaq", caption: "Factory Index Air Quality", value_type: "INTEGER"}), +(`HBW Actual Pos. Horizontal`)-[:TIMESERIES_DB_ACCESS]->(learning_factory_influx_db)<-[:TIMESERIES_DB_ACCESS]-(`Factory Temperature Raw`:TIMESERIES {id_short: "factory_temperature_raw", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_temperature_raw", description: "temperature raw", connection_topic: "i/bme680", connection_keyword: "rt", caption: "Factory Temperature Raw", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(VGR)-[:HAS_TIMESERIES]->(`Factory Humidity`:TIMESERIES {id_short: "factory_humidity", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_humidity", description: "relative humidity compensated", connection_topic: "i/bme680", connection_keyword: "h", caption: "Factory Humidity", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(MPO:ASSET {id_short: "mpo", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/mpo", description: "Multi-processing station with kiln 24V. Furnace with pneumatic sliding door. Downstream processing station with pneumatic transfer unit including vacuum gripper, cutter with rotary table and conveyor belt.", similarity_vec: "[234,232,12,6,234,234]", caption: "MPO"})-[:HAS_TIMESERIES]->(`Factory Humidity Raw`:TIMESERIES {id_short: "factory_humidity_raw", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_humidity_raw", description: "relative humidity raw", connection_topic: "i/bme680", connection_keyword: "rh", caption: "Factory Humidity Raw", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(SLD:ASSET {id_short: "sld", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/sld", description: "Sorting line with colour recognition 24V. Detects workpieces of different colors and sorts them via a conveyor belt into the provided storage unit. 
", similarity_vec: "[234,232,12,6,234,234]", caption: "SLD"})-[:HAS_TIMESERIES]->(`Factory Air Pressure`:TIMESERIES {id_short: "factory_air_pressure", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_air_pressure", description: "air pressure", connection_topic: "i/bme680", connection_keyword: "p", caption: "Factory Air Pressure", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(SSC:ASSET {id_short: "ssc", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/ssc", description: "Environmental station with surveillance camera 24V", similarity_vec: "[234,232,12,6,234,234]", caption: "SSC"})-[:HAS_TIMESERIES]->(`Factory Gas Resistance`:TIMESERIES {id_short: "factory_gas_resistance", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_gas_resistance", description: "gas resistance", connection_topic: "i/bme680", connection_keyword: "gr", caption: "Factory Gas Resistance", value_type: "DECIMAL"})<-[:HAS_TIMESERIES]-(DPS:ASSET {id_short: "dps", iri: "www.sintef.no/aas_identifiers/learning_factory/machines/dps", description: "Input/output station with colour detection and NFC Reader 24V", similarity_vec: "[234,232,12,6,234,234]", caption: "DPS"})-[:HAS_TIMESERIES]->(`Factory Index Air Quality`:TIMESERIES {id_short: "factory_index_air_quality", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/factory_index_air_quality", description: "index air quality 0-500 (0...50:Good, 51...100:Moderate, 101...150:Unhealthy for Sensitive Groups, 151...200:Unhealthy, 201...300:Very Unhealthy, 301...500:Hazardous)", connection_topic: "i/bme680", connection_keyword: "iaq", caption: "Factory Index Air Quality", value_type: "INTEGER"}), (hbw_step_cad_stl_conversion:SUPPLEMENTARY_FILE {id_short: "hbw_step_cad_stl_conversion", iri: "www.sintef.no/aas_identifiers/learning_factory/files/hbw_step_cad_stl_conversion", description: "CAD model (Converted automatically to STL for visualization on 2022-07-25T12:10:20.835239. This conversion is not loss-free!)", type: "CAD_STL", file_name: "hbw.step.stl"})<-[:SECONDARY_FORMAT]-(hbw_step_cad:SUPPLEMENTARY_FILE {id_short: "hbw_step_cad", iri: "www.sintef.no/aas_identifiers/learning_factory/files/hbw_step_cad", description: "CAD model", type: "CAD_STEP", file_name: "hbw.step", caption: "HBW CAD (STEP)"})-[:FILE_DB_ACCESS]->(`learning_factory_minio_s3`)<-[:FILE_DB_ACCESS]-(:SUPPLEMENTARY_FILE {id_short: "hbw_user_manual_pdf", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/hbw_user_manual_pdf", description: "User manual", type: "DOCUMENT_PDF", file_name: "hbw.pdf", caption: "HBW user manual"})<-[:HAS_SUPPLEMENTARY_FILE]-(HBW)-[:HAS_TIMESERIES]->(`Factory Temperature`)-[:RUNTIME_ACCESS]->(learning_factory_mqtt:RUNTIME_CONNECTION {id_short: "learning_factory_mqtt", iri: "www.sintef.no/aas_identifiers/learning_factory/connections/mqtt", description: "Provides access to timeseries data via MQTT", type: "MQTT", host_environment_variable: "FACTORY_MQTT_HOST", port_environment_variable: "FACTORY_MQTT_PORT"})<-[:RUNTIME_ACCESS]-(`Factory Temperature Raw`)-[:HAS_UNIT]->(deg_celsius:UNIT {id_short: "deg_celsius", iri: "www.sintef.no/sindit/identifiers/units/deg_celsius", description: "Temperature measurement in degrees celsius"}), (`VGR Actual Pos. 
Horizontal`:TIMESERIES {id_short: "vgr_actual_pos_horizontal", iri: "www.sintef.no/aas_identifiers/learning_factory/sensors/vgr_actual_pos_horizontal", description: "", connection_topic: "ns=3;s=\"gtyp_VGR\".\"horizontal_Axis\".\"di_Actual_Position\"", connection_keyword: "", caption: "VGR Actual Pos. Horizontal", value_type: "INTEGER"})<-[:TS_MATCH]-(`hbw_crane_stuck_01_actual_pos_horizontal_matcher`)<-[:DETECTABLE_WITH]-(:ANNOTATION_PRE_INDICATOR {id_short: "hbw_crane_stuck_01_pre_01", iri: "www.sintef.no/aas_identifiers/learning_factory/annotations/pre_indicators/hbw_crane_stuck_01_pre_01", description: "High Bay Warehouse crane did get stuck in horizontal layer", creation_date_time: "\"2022-06-29T10:49:37.122000000\"", indicator_start_date_time: "\"2022-06-29T10:49:24.122000000\"", indicator_end_date_time: "\"2022-06-29T10:49:27.122000000\"", remove: ""})<-[:PRE_INDICATABLE_WITH]-(`hbw_crane_stuck_01`:ANNOTATION_INSTANCE {id_short: "hbw_crane_stuck_01", iri: "www.sintef.no/aas_identifiers/learning_factory/annotations/instances/hbw_crane_stuck_01", description: "High Bay Warehouse crane did get stuck in horizontal layer", creation_date_time: "\"2022-06-29T10:49:37.122000000\"", occurance_start_date_time: "\"2022-06-29T10:49:31.122000000\"", occurance_end_date_time: "\"2022-06-29T10:49:34.122000000\"", remove: ""})<-[:ANNOTATION]-(HBW)-[:HAS_SUPPLEMENTARY_FILE]->(hbw_step_cad), (`hbw_crane_stuck_01_actual_pos_horizontal_matcher`)<-[:DETECTABLE_WITH]-(`hbw_crane_stuck_01`)-[:INSTANCE_OF]->(crane_stuck:ANNOTATION_DEFINITION {id_short: "crane_stuck", iri: "www.sintef.no/aas_identifiers/learning_factory/annotations/definitions/crane_stuck", description: "A movable crane got physically stuck and could not reach its target position", solution_proposal: "Recalibrate the crane"})<-[:OCCURANCE_SCAN]-(HBW), diff --git a/requirements.txt b/requirements.txt index 0da31eb..bb16f98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,7 @@ python-dotenv~=0.20.0 uvicorn~=0.17.6 gunicorn pyi40aas - +pymemcache # inter-process communication influxdb-client # InfluxDB v2 asyncua~=0.9.94 paho-mqtt~=1.6.1 diff --git a/similarity_pipeline.py b/similarity_pipeline.py index 076f405..c5f9366 100644 --- a/similarity_pipeline.py +++ b/similarity_pipeline.py @@ -1,5 +1,5 @@ -# import similarity_pipeline_1_ts_feature_extraction -# import similarity_pipeline_2_ts_dimensionality_reduction +import similarity_pipeline_1_ts_feature_extraction +import similarity_pipeline_2_ts_dimensionality_reduction import similarity_pipeline_3_ts_clustering import similarity_pipeline_4_pdf_keyword_extraction import similarity_pipeline_5_cad_analysis diff --git a/similarity_pipeline_1_ts_feature_extraction.py b/similarity_pipeline_1_ts_feature_extraction.py index ce93816..f1bd61f 100644 --- a/similarity_pipeline_1_ts_feature_extraction.py +++ b/similarity_pipeline_1_ts_feature_extraction.py @@ -25,7 +25,8 @@ comparison_end_date_time = datetime( year=2022, month=7, day=29, hour=11, minute=0, second=0 ) -comparison_duration = timedelta(hours=20).total_seconds() # ~ 30 sec per timeseries +comparison_duration = timedelta(hours=2).total_seconds() +# comparison_duration = timedelta(hours=20).total_seconds() # ~ 30 sec per timeseries # comparison_duration = timedelta(hours=12).total_seconds() # ~ 10 sec per timeseries @@ -75,6 +76,12 @@ ) logger.info(f"Total entry count: {ts_entry_count}") + if ts_entry_count > 10000: + logger.warning( + f"Skipped extracting features for {timeseries_node.caption} because it has over 10000 entries." 
+ ) + continue + # Cancel, if not int or float if timeseries_node.value_type in [ TimeseriesValueTypes.DECIMAL.value, diff --git a/util/inter_process_cache.py b/util/inter_process_cache.py new file mode 100644 index 0000000..de37d14 --- /dev/null +++ b/util/inter_process_cache.py @@ -0,0 +1,12 @@ +from pymemcache.client import base +import subprocess +from util.log import logger + +memcache = base.Client(("localhost", 11211)) + +try: + memcache.set("test-key", "test-value") +except ConnectionRefusedError: + # Run memcache + logger.info("Starting memcached inter-process cache on port 11211...") + subprocess.Popen(["memcached", "-u", "root"])
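
The new util/inter_process_cache.py module, the pymemcache dependency and the now-activated workers=4 setting together point at one design choice: state such as connectivity status is shared between the uvicorn worker processes through memcached instead of module-level globals. The following is a minimal sketch of how that sharing might look; the key name and the two helper functions are illustrative assumptions and do not appear in the repository:

import json
from typing import Optional

import util.inter_process_cache as cache  # exposes the module-level pymemcache client `memcache`


def publish_connection_status(active_connections: int, total_connections: int) -> None:
    # Writer side (e.g. a status thread running in one worker process):
    # serialize the status and store it under a shared, hypothetical key.
    status = {"active": active_connections, "total": total_connections}
    cache.memcache.set("runtime_connections_status", json.dumps(status))


def read_connection_status() -> Optional[dict]:
    # Reader side (e.g. an API endpoint served by a different worker process).
    # pymemcache returns the stored value as bytes, or None if the key is missing.
    raw = cache.memcache.get("runtime_connections_status")
    return json.loads(raw) if raw is not None else None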