From 1b2654ff6aecb186eb5fd689dc5373fb92fb3c5f Mon Sep 17 00:00:00 2001 From: "Christian Tremblay, ing" Date: Thu, 24 Sep 2020 23:42:35 -0400 Subject: [PATCH] This fix some issues with trendlogs I had... including sorting by index when calling history. Also, trendloags can be added to Bokeh... but considered analog... to be continued --- BAC0/core/devices/Device.py | 6 +- BAC0/core/devices/Trends.py | 90 +++++++++++++++++++++------- BAC0/core/io/Read.py | 69 +++++++++++++++++---- BAC0/core/proprietary_objects/jci.py | 1 + BAC0/tasks/TaskManager.py | 6 +- BAC0/web/BokehRenderer.py | 4 +- 6 files changed, 138 insertions(+), 38 deletions(-) diff --git a/BAC0/core/devices/Device.py b/BAC0/core/devices/Device.py index ad55b83a..6ec25db0 100755 --- a/BAC0/core/devices/Device.py +++ b/BAC0/core/devices/Device.py @@ -674,10 +674,14 @@ def _trendlogs(self): yield trendlog @property - def trendlogs(self): + def trendlogs_names(self): for each in self._trendlogs(): yield each.properties.object_name + @property + def trendlogs(self): + return list(self._trendlogs()) + def _findTrend(self, name): for trend in self._trendlogs(): if trend.properties.object_name == name: diff --git a/BAC0/core/devices/Trends.py b/BAC0/core/devices/Trends.py index 71e2883b..3997251b 100755 --- a/BAC0/core/devices/Trends.py +++ b/BAC0/core/devices/Trends.py @@ -12,6 +12,8 @@ from datetime import datetime from collections import namedtuple import time +from itertools import islice + # --- 3rd party modules --- try: @@ -27,6 +29,7 @@ _PANDAS = False from bacpypes.object import TrendLogObject +from bacpypes.primitivedata import Date, Time # --- this application's modules --- from ...tasks.Poll import SimplePoll as Poll @@ -52,6 +55,7 @@ def __init__(self): self.buffer_size = 0 self.record_count = 0 self.total_record_count = 0 + self.log_interval = 0 self.description = None self.statusFlags = None self.status_flags = { @@ -60,14 +64,20 @@ def __init__(self): "overridden": False, "out_of_service": False, } - + self._history_components = {"index": [], "logdatum": [], "status": []} self._df = None + self.type = "TrendLog" + self.units_state = "None" def __repr__(self): return "{} | Descr : {} | Record count : {}".format( self.object_name, self.description, self.record_count ) + @property + def name(self): + return self.object_name + @note_and_log class TrendLog(TrendLogProperties): @@ -81,39 +91,64 @@ def __init__( self.properties = TrendLogProperties() self.properties.device = device self.properties.oid = OID + self.update_properties() + + if read_log_on_creation: + self.read_log_buffer() + self._last_index = 0 + + def update_properties(self): try: - self.properties.object_name, self.properties.description, self.properties.record_count, self.properties.buffer_size, self.properties.total_record_count, self.properties.log_device_object_property, self.properties.statusFlags = self.properties.device.properties.network.readMultiple( - "{addr} trendLog {oid} objectName description recordCount bufferSize totalRecordCount logDeviceObjectProperty statusFlags".format( + self.properties.object_name, self.properties.description, self.properties.record_count, self.properties.buffer_size, self.properties.total_record_count, self.properties.log_device_object_property, self.properties.statusFlags, self.properties.log_interval = self.properties.device.properties.network.readMultiple( + "{addr} trendLog {oid} objectName description recordCount bufferSize totalRecordCount logDeviceObjectProperty statusFlags logInterval".format( addr=self.properties.device.properties.address, oid=str(self.properties.oid), ) ) - - if read_log_on_creation: - self.read_log_buffer() except Exception as error: raise Exception("Problem reading trendLog informations: {}".format(error)) + def _total_record_count(self): + self.properties.total_record_count = self.properties.device.properties.network.read( + "{addr} trendLog {oid} totalRecordCount".format( + addr=self.properties.device.properties.address, + oid=str(self.properties.oid), + ) + ) + return self.properties.total_record_count + def read_log_buffer(self): - try: - _log_buffer = self.properties.device.properties.network.readRange( + RECORDS = 10 + log_buffer = set() + _actual_index = self._total_record_count() + start = max(_actual_index - self.properties.record_count, self._last_index) + 1 + _count = _actual_index - start + steps = int(_count / 10) + int(_count % 10) + + self._log.debug("Reading log : {} {} {}".format(start, _count, steps)) + + _from = start + for each in range(steps): + range_params = ("s", _from, Date("1979-01-01"), Time("00:00"), RECORDS) + _chunk = self.properties.device.properties.network.readRange( "{} trendLog {} logBuffer".format( self.properties.device.properties.address, str(self.properties.oid) - ) + ), + range_params=range_params, ) - self.create_dataframe(_log_buffer) - except Exception as error: - raise Exception("Problem reading buffer: {}".format(error)) + _from += len(_chunk) + for chunk in _chunk: + log_buffer.add(chunk) + + self._last_index = _from + self.create_dataframe(log_buffer) def create_dataframe(self, log_buffer): - index = [] - logdatum = [] - status = [] for each in log_buffer: year, month, day, dow = each.timestamp.date year = year + 1900 hours, minutes, seconds, ms = each.timestamp.time - index.append( + self.properties._history_components["index"].append( pd.to_datetime( "{}-{}-{} {}:{}:{}.{}".format( year, month, day, hours, minutes, seconds, ms @@ -121,11 +156,19 @@ def create_dataframe(self, log_buffer): format="%Y-%m-%d %H:%M:%S.%f", ) ) - logdatum.append(each.logDatum.dict_contents()) - status.append(each.statusFlags) + self.properties._history_components["logdatum"].append( + each.logDatum.dict_contents() + ) + self.properties._history_components["status"].append(each.statusFlags) if _PANDAS: - df = pd.DataFrame({"index": index, "logdatum": logdatum, "status": status}) + df = pd.DataFrame( + { + "index": self.properties._history_components["index"], + "logdatum": self.properties._history_components["logdatum"], + "status": self.properties._history_components["status"], + } + ) df = df.set_index("index") df["choice"] = df["logdatum"].apply(lambda x: list(x.keys())[0]) df[self.properties.object_name] = df["logdatum"].apply( @@ -134,18 +177,19 @@ def create_dataframe(self, log_buffer): self.properties._df = df else: - self.properties._history_components = (index, logdatum, status) + # self.properties._history_components = (self.index, self.logdatum, self.status) self._log.warning( "Pandas not installed. Treating histories as simple list." ) @property def history(self): + self.read_log_buffer() if not _PANDAS: return dict( zip( - self.properties._history_components[0], - self.properties._history_components[1], + self.properties._history_components["index"], + self.properties._history_components["logdatum"], ) ) objectType, objectAddress = ( @@ -171,7 +215,7 @@ def history(self): serie.states = "analog" serie.description = self.properties.description serie.datatype = objectType - return serie + return serie.sort_index() def chart(self, remove=False): """ diff --git a/BAC0/core/io/Read.py b/BAC0/core/io/Read.py index 58b4b940..f5b01f6a 100644 --- a/BAC0/core/io/Read.py +++ b/BAC0/core/io/Read.py @@ -38,14 +38,18 @@ def readMultiple() AbortPDU, ) -from bacpypes.basetypes import PropertyIdentifier +from bacpypes.basetypes import PropertyIdentifier, DateTime from bacpypes.apdu import ( ReadPropertyMultipleACK, ReadPropertyACK, ReadRangeRequest, ReadRangeACK, + Range, + RangeByPosition, + RangeBySequenceNumber, + RangeByTime, ) -from bacpypes.primitivedata import Tag, ObjectIdentifier, Unsigned +from bacpypes.primitivedata import Tag, ObjectIdentifier, Unsigned, Date, Time from bacpypes.constructeddata import Array from bacpypes.iocb import IOCB, TimeoutError from bacpypes.core import deferred @@ -506,8 +510,11 @@ def build_rpm_request(self, args, vendor_id=0): request.pduDestination = Address(addr) return request - def build_rrange_request(self, args, arr_index=None, vendor_id=0, bacoid=None): + def build_rrange_request( + self, args, range_params=None, arr_index=None, vendor_id=0, bacoid=None + ): addr, obj_type, obj_inst, prop_id = args[:4] + vendor_id = vendor_id bacoid = bacoid @@ -529,13 +536,44 @@ def build_rrange_request(self, args, arr_index=None, vendor_id=0, bacoid=None): objectIdentifier=(obj_type, obj_inst), propertyIdentifier=prop_id ) request.pduDestination = Address(addr) + if range_params is not None: + range_type, first, date, time, count = range_params + if range_type == "p": + rbp = RangeByPosition(referenceIndex=int(first), count=int(count)) + request.range = Range(byPosition=rbp) + elif range_type == "s": + rbs = RangeBySequenceNumber( + referenceSequenceNumber=int(first), count=int(count) + ) + request.range = Range(bySequenceNumber=rbs) + elif range_type == "t": + rbt = RangeByTime( + referenceTime=DateTime( + date=Date(date).value, time=Time(time).value + ), + count=int(count), + ) + request.range = Range(byTime=rbt) + elif range_type == "x": + # should be missing required parameter + request.range = Range() + else: + raise ValueError("unknown range type: %r" % (range_type,)) if len(args) == 5: request.propertyArrayIndex = int(args[4]) self._log.debug("{:<20} {!r}".format("REQUEST", request)) return request - def readRange(self, args, arr_index=None, vendor_id=0, bacoid=None, timeout=10): + def readRange( + self, + args, + range_params=None, + arr_index=None, + vendor_id=0, + bacoid=None, + timeout=10, + ): """ Build a ReadProperty request, wait for the answer and return the value @@ -557,18 +595,21 @@ def readRange(self, args, arr_index=None, vendor_id=0, bacoid=None, timeout=10): args_split = args.split() - self.log_title("Read range", args_split) + self.log_title("Read range ", args_split) vendor_id = vendor_id bacoid = bacoid try: # build ReadProperty request - iocb = IOCB( - self.build_rrange_request( - args_split, arr_index=arr_index, vendor_id=vendor_id, bacoid=bacoid - ) + request = self.build_rrange_request( + args_split, + range_params=range_params, + arr_index=arr_index, + vendor_id=vendor_id, + bacoid=bacoid, ) + iocb = IOCB(request) iocb.set_timeout(timeout) # pass to the BACnet stack deferred(self.this_application.request_io, iocb) @@ -602,7 +643,15 @@ def readRange(self, args, arr_index=None, vendor_id=0, bacoid=None, timeout=10): apdu.propertyIdentifier, ) - value = apdu.itemData[0].cast_out(datatype) + try: + value = apdu.itemData.cast_out(datatype) + except TypeError as error: + self._log.error( + "Problem casting value : {} | Datatype : {} | error : {}".format( + apdu.itemData, datatype, error + ) + ) + return apdu self._log.debug("{:<20} {:<20}".format("value", "datatype")) self._log.debug("{!r:<20} {!r:<20}".format(value, datatype)) diff --git a/BAC0/core/proprietary_objects/jci.py b/BAC0/core/proprietary_objects/jci.py index 1e773042..66e93780 100644 --- a/BAC0/core/proprietary_objects/jci.py +++ b/BAC0/core/proprietary_objects/jci.py @@ -170,4 +170,5 @@ def tec_short_point_list(): ("analogOutput", 86914), ("analogOutput", 86915), ("multiStateValue", 6), + ("trendLog", 101010), ] diff --git a/BAC0/tasks/TaskManager.py b/BAC0/tasks/TaskManager.py index e674046b..b30329ce 100644 --- a/BAC0/tasks/TaskManager.py +++ b/BAC0/tasks/TaskManager.py @@ -82,7 +82,7 @@ def stopAllTasks(cls): cls.enable = False while cls.manager.is_alive(): pass - #cls.enable = False + # cls.enable = False cls.clean_tasklist() return True @@ -97,8 +97,8 @@ def start_service(cls): def stop_service(cls): cls._log.info("Stopping TaskManager") cls.enable = False - #time.sleep(1) - #cls.manager.join() + # time.sleep(1) + # cls.manager.join() @classmethod def clean_tasklist(cls): diff --git a/BAC0/web/BokehRenderer.py b/BAC0/web/BokehRenderer.py index 4745b95c..ee2df7ea 100644 --- a/BAC0/web/BokehRenderer.py +++ b/BAC0/web/BokehRenderer.py @@ -42,6 +42,7 @@ from ..tasks.RecurringTask import RecurringTask from ..core.utils.notes import note_and_log from ..core.devices.Virtuals import VirtualPoint +from ..core.devices.Trends import TrendLog @note_and_log @@ -198,7 +199,7 @@ def update_glyphs(self): name = "{}/{}".format( point.properties.device.properties.name, point.properties.name ) - if "analog" in point.properties.type: + if "analog" in point.properties.type or "TrendLog" in point.properties.type: self.analog_queue.put( (name, point.properties.description, point.properties.units_state) ) @@ -676,6 +677,7 @@ def generate_selection(self): except KeyError: pass options = list(device.points_name) + options.extend(list(device.trendlogs_names)) mc = MultiChoice( value=_cache, options=options, title=device.properties.name )