diff --git a/BAC0/core/app/ScriptApplication.py b/BAC0/core/app/ScriptApplication.py index 83a19743..5bce2508 100644 --- a/BAC0/core/app/ScriptApplication.py +++ b/BAC0/core/app/ScriptApplication.py @@ -23,6 +23,7 @@ from bacpypes.app import ApplicationIOController from bacpypes.pdu import Address from bacpypes.service.object import ReadWritePropertyMultipleServices +from bacpypes.service.cov import ChangeOfValueServices from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement from bacpypes.bvllservice import ( BIPSimple, @@ -31,6 +32,8 @@ AnnexJCodec, UDPMultiplexer, ) +from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU + from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint from bacpypes.comm import ApplicationServiceElement, bind, Client from bacpypes.iocb import IOCB @@ -97,6 +100,33 @@ def do_WhoIsRequest(self, apdu): iocb = IOCB(self.iam_req) # make an IOCB deferred(self.request_io, iocb) + def do_ConfirmedCOVNotificationRequest(self, apdu): + # look up the process identifier + context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None) + if not context or apdu.pduSource != context.address: + # this is turned into an ErrorPDU and sent back to the client + raise RuntimeError("services", "unknownSubscription") + + # now tell the context object + elements = context.cov_notification(apdu) + + # success + response = SimpleAckPDU(context=apdu) + + # return the result + self.response(response) + print(elements) + + def do_UnconfirmedCOVNotificationRequest(self, apdu): + # look up the process identifier + context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None) + if not context or apdu.pduSource != context.address: + return + + # now tell the context object + elements = context.cov_notification(apdu) + print(elements) + @note_and_log class BAC0Application( @@ -106,6 +136,7 @@ class BAC0Application( WhoHasIHaveServices, ReadWritePropertyServices, ReadWritePropertyMultipleServices, + ChangeOfValueServices, ): """ Defines a basic BACnet/IP application to process BACnet requests. @@ -179,6 +210,9 @@ def __init__( self._last_i_am_received = [] self._last_i_have_received = [] + # to support CoV + self.subscription_contexts = {} + def close_socket(self): # pass to the multiplexer, then down to the sockets self.mux.close_socket() @@ -199,6 +233,7 @@ class BAC0ForeignDeviceApplication( WhoHasIHaveServices, ReadWritePropertyServices, ReadWritePropertyMultipleServices, + ChangeOfValueServices, ): """ Defines a basic BACnet/IP application to process BACnet requests. @@ -271,6 +306,9 @@ def __init__( self._last_i_am_received = [] self._last_i_have_received = [] + # to support CoV + self.subscription_contexts = {} + def close_socket(self): # pass to the multiplexer, then down to the sockets self.mux.close_socket() @@ -292,6 +330,7 @@ class BAC0BBMDDeviceApplication( WhoHasIHaveServices, ReadWritePropertyServices, ReadWritePropertyMultipleServices, + ChangeOfValueServices, ): """ Defines a basic BACnet/IP application to process BACnet requests. @@ -374,6 +413,9 @@ def __init__( self._last_i_am_received = [] self._last_i_have_received = [] + # to support CoV + self.subscription_contexts = {} + def add_peer(self, address): try: bdt_address = Address(address) diff --git a/BAC0/core/functions/cov.py b/BAC0/core/functions/cov.py new file mode 100644 index 00000000..ebb0c5c0 --- /dev/null +++ b/BAC0/core/functions/cov.py @@ -0,0 +1,131 @@ +from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU +from bacpypes.iocb import IOCB +from bacpypes.core import deferred +from bacpypes.pdu import Address +from bacpypes.object import get_object_class, get_datatype +from bacpypes.constructeddata import Array +from bacpypes.primitivedata import Tag, ObjectIdentifier, Unsigned + +from BAC0.core.io.Read import cast_datatype_from_tag + +""" + +using cov, we build a "context" which is turned into a subscription being sent to +the destination. + +Once the IOCB is over, the callback attached to it will execute (subscription_acknowledged) +and we'll get the answer + +""" + + +class SubscriptionContext: + next_proc_id = 1 + + def __init__(self, address, objectID, confirmed=None, lifetime=None): + self.address = address + self.subscriberProcessIdentifier = self.next_proc_id + self.next_proc_id += 1 + # subscription_contexts[self.subscriberProcessIdentifier] = self + self.monitoredObjectIdentifier = objectID + self.issueConfirmedNotifications = confirmed + self.lifetime = lifetime + + def cov_notification(self, apdu): + # make a rash assumption that the property value is going to be + # a single application encoded tag + source = apdu.pduSource + object_changed = apdu.monitoredObjectIdentifier + + elements = { + "source": source, + "object_changed": object_changed, + "properties": {}, + } + for element in apdu.listOfValues: + prop_id = element.propertyIdentifier + datatype = get_datatype(object_changed[0], prop_id) + value = element.value + + if not datatype: + # raise TypeError("unknown datatype") + value = cast_datatype_from_tag( + element.value, object_changed[0], prop_id + ) + else: + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and ( + element.propertyArrayIndex is not None + ): + if element.propertyArrayIndex == 0: + value = element.value.cast_out(Unsigned) + else: + value = element.value.cast_out(datatype.subtype) + else: + value = element.value.cast_out(datatype) + + elements["properties"][prop_id] = value + + return elements + + +class CoV: + """ + Mixin to support COV registration + """ + + def cov_notification(self, apdu): + print("do something here") + + def send_cov_subscription(self, request): + self._log.info("Request : {}".format(request)) + iocb = IOCB(request) + # iocb.add_callback(self.subscription_acknowledged) + self._log.info("IOCB : {}".format(iocb)) + + iocb.add_callback(self.subscription_acknowledged) + + # pass to the BACnet stack + deferred(self.this_application.request_io, iocb) + + # Unconfirmed request...so wait until complete + # iocb.wait() # Wait for BACnet response + + def subscription_acknowledged(self, iocb): + if iocb.ioResponse: + self._log.info("Subscription success") + + if iocb.ioError: + self._log.error("Subscription failed. {}".format(iocb.ioError)) + + def cov(self, address, objectID, confirmed=True, lifetime=None): + address = Address(address) + context = self._build_cov_context( + address, objectID, confirmed=confirmed, lifetime=lifetime + ) + request = self._build_cov_request(context) + self.send_cov_subscription(request) + + def _build_cov_context(self, address, objectID, confirmed=True, lifetime=None): + context = SubscriptionContext( + address=address, objectID=objectID, confirmed=confirmed, lifetime=lifetime + ) + self.this_application.subscription_contexts[ + context.subscriberProcessIdentifier + ] = context + return context + + def _build_cov_request(self, context): + request = SubscribeCOVRequest( + subscriberProcessIdentifier=context.subscriberProcessIdentifier, + monitoredObjectIdentifier=context.monitoredObjectIdentifier, + ) + request.pduDestination = context.address + + # optional parameters + if context.issueConfirmedNotifications is not None: + request.issueConfirmedNotifications = context.issueConfirmedNotifications + if context.lifetime is not None: + request.lifetime = context.lifetime + + return request diff --git a/BAC0/scripts/Lite.py b/BAC0/scripts/Lite.py index eaf0b54b..690546ee 100755 --- a/BAC0/scripts/Lite.py +++ b/BAC0/scripts/Lite.py @@ -37,6 +37,7 @@ class ReadWriteScript(BasicScript,ReadProperty,WriteProperty) from ..core.functions.TimeSync import TimeSync from ..core.functions.Reinitialize import Reinitialize from ..core.functions.DeviceCommunicationControl import DeviceCommunicationControl +from ..core.functions.cov import CoV from ..core.io.Simulate import Simulation from ..core.devices.Points import Point from ..core.devices.Device import RPDeviceConnected, RPMDeviceConnected @@ -66,6 +67,7 @@ class Lite( TimeSync, Reinitialize, DeviceCommunicationControl, + CoV, ): """ Build a BACnet application to accept read and write requests. @@ -76,9 +78,6 @@ class Lite( :param ip='127.0.0.1': Address must be in the same subnet as the BACnet network [BBMD and Foreign Device - not supported] - :param bokeh_server: (boolean) If set to false, will prevent Bokeh server - from being started. Can help troubleshoot issues with Bokeh. By default, - set to True. """ def __init__(