Skip to content

Commit

Permalink
Playing with COV. Actuelly, request can be sent and app will react to…
Browse files Browse the repository at this point in the history
… a change on the network. But next part should be to modify the values of the registered devices points on a subscription to a COV... probably will need to improve the BAC0.points to support COV registration by making a request from there...
  • Loading branch information
ChristianTremblay committed Sep 14, 2020
1 parent 9338d12 commit 84d7985
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 3 deletions.
42 changes: 42 additions & 0 deletions BAC0/core/app/ScriptApplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from bacpypes.app import ApplicationIOController
from bacpypes.pdu import Address
from bacpypes.service.object import ReadWritePropertyMultipleServices
from bacpypes.service.cov import ChangeOfValueServices
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.bvllservice import (
BIPSimple,
Expand All @@ -31,6 +32,8 @@
AnnexJCodec,
UDPMultiplexer,
)
from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU

from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.comm import ApplicationServiceElement, bind, Client
from bacpypes.iocb import IOCB
Expand Down Expand Up @@ -97,6 +100,33 @@ def do_WhoIsRequest(self, apdu):
iocb = IOCB(self.iam_req) # make an IOCB
deferred(self.request_io, iocb)

def do_ConfirmedCOVNotificationRequest(self, apdu):
# look up the process identifier
context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
if not context or apdu.pduSource != context.address:
# this is turned into an ErrorPDU and sent back to the client
raise RuntimeError("services", "unknownSubscription")

# now tell the context object
elements = context.cov_notification(apdu)

# success
response = SimpleAckPDU(context=apdu)

# return the result
self.response(response)
print(elements)

def do_UnconfirmedCOVNotificationRequest(self, apdu):
# look up the process identifier
context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
if not context or apdu.pduSource != context.address:
return

# now tell the context object
elements = context.cov_notification(apdu)
print(elements)


@note_and_log
class BAC0Application(
Expand All @@ -106,6 +136,7 @@ class BAC0Application(
WhoHasIHaveServices,
ReadWritePropertyServices,
ReadWritePropertyMultipleServices,
ChangeOfValueServices,
):
"""
Defines a basic BACnet/IP application to process BACnet requests.
Expand Down Expand Up @@ -179,6 +210,9 @@ def __init__(
self._last_i_am_received = []
self._last_i_have_received = []

# to support CoV
self.subscription_contexts = {}

def close_socket(self):
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
Expand All @@ -199,6 +233,7 @@ class BAC0ForeignDeviceApplication(
WhoHasIHaveServices,
ReadWritePropertyServices,
ReadWritePropertyMultipleServices,
ChangeOfValueServices,
):
"""
Defines a basic BACnet/IP application to process BACnet requests.
Expand Down Expand Up @@ -271,6 +306,9 @@ def __init__(
self._last_i_am_received = []
self._last_i_have_received = []

# to support CoV
self.subscription_contexts = {}

def close_socket(self):
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
Expand All @@ -292,6 +330,7 @@ class BAC0BBMDDeviceApplication(
WhoHasIHaveServices,
ReadWritePropertyServices,
ReadWritePropertyMultipleServices,
ChangeOfValueServices,
):
"""
Defines a basic BACnet/IP application to process BACnet requests.
Expand Down Expand Up @@ -374,6 +413,9 @@ def __init__(
self._last_i_am_received = []
self._last_i_have_received = []

# to support CoV
self.subscription_contexts = {}

def add_peer(self, address):
try:
bdt_address = Address(address)
Expand Down
131 changes: 131 additions & 0 deletions BAC0/core/functions/cov.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU
from bacpypes.iocb import IOCB
from bacpypes.core import deferred
from bacpypes.pdu import Address
from bacpypes.object import get_object_class, get_datatype
from bacpypes.constructeddata import Array
from bacpypes.primitivedata import Tag, ObjectIdentifier, Unsigned

from BAC0.core.io.Read import cast_datatype_from_tag

"""
using cov, we build a "context" which is turned into a subscription being sent to
the destination.
Once the IOCB is over, the callback attached to it will execute (subscription_acknowledged)
and we'll get the answer
"""


class SubscriptionContext:
next_proc_id = 1

def __init__(self, address, objectID, confirmed=None, lifetime=None):
self.address = address
self.subscriberProcessIdentifier = self.next_proc_id
self.next_proc_id += 1
# subscription_contexts[self.subscriberProcessIdentifier] = self
self.monitoredObjectIdentifier = objectID
self.issueConfirmedNotifications = confirmed
self.lifetime = lifetime

def cov_notification(self, apdu):
# make a rash assumption that the property value is going to be
# a single application encoded tag
source = apdu.pduSource
object_changed = apdu.monitoredObjectIdentifier

elements = {
"source": source,
"object_changed": object_changed,
"properties": {},
}
for element in apdu.listOfValues:
prop_id = element.propertyIdentifier
datatype = get_datatype(object_changed[0], prop_id)
value = element.value

if not datatype:
# raise TypeError("unknown datatype")
value = cast_datatype_from_tag(
element.value, object_changed[0], prop_id
)
else:
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (
element.propertyArrayIndex is not None
):
if element.propertyArrayIndex == 0:
value = element.value.cast_out(Unsigned)
else:
value = element.value.cast_out(datatype.subtype)
else:
value = element.value.cast_out(datatype)

elements["properties"][prop_id] = value

return elements


class CoV:
"""
Mixin to support COV registration
"""

def cov_notification(self, apdu):
print("do something here")

def send_cov_subscription(self, request):
self._log.info("Request : {}".format(request))
iocb = IOCB(request)
# iocb.add_callback(self.subscription_acknowledged)
self._log.info("IOCB : {}".format(iocb))

iocb.add_callback(self.subscription_acknowledged)

# pass to the BACnet stack
deferred(self.this_application.request_io, iocb)

# Unconfirmed request...so wait until complete
# iocb.wait() # Wait for BACnet response

def subscription_acknowledged(self, iocb):
if iocb.ioResponse:
self._log.info("Subscription success")

if iocb.ioError:
self._log.error("Subscription failed. {}".format(iocb.ioError))

def cov(self, address, objectID, confirmed=True, lifetime=None):
address = Address(address)
context = self._build_cov_context(
address, objectID, confirmed=confirmed, lifetime=lifetime
)
request = self._build_cov_request(context)
self.send_cov_subscription(request)

def _build_cov_context(self, address, objectID, confirmed=True, lifetime=None):
context = SubscriptionContext(
address=address, objectID=objectID, confirmed=confirmed, lifetime=lifetime
)
self.this_application.subscription_contexts[
context.subscriberProcessIdentifier
] = context
return context

def _build_cov_request(self, context):
request = SubscribeCOVRequest(
subscriberProcessIdentifier=context.subscriberProcessIdentifier,
monitoredObjectIdentifier=context.monitoredObjectIdentifier,
)
request.pduDestination = context.address

# optional parameters
if context.issueConfirmedNotifications is not None:
request.issueConfirmedNotifications = context.issueConfirmedNotifications
if context.lifetime is not None:
request.lifetime = context.lifetime

return request
5 changes: 2 additions & 3 deletions BAC0/scripts/Lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ReadWriteScript(BasicScript,ReadProperty,WriteProperty)
from ..core.functions.TimeSync import TimeSync
from ..core.functions.Reinitialize import Reinitialize
from ..core.functions.DeviceCommunicationControl import DeviceCommunicationControl
from ..core.functions.cov import CoV
from ..core.io.Simulate import Simulation
from ..core.devices.Points import Point
from ..core.devices.Device import RPDeviceConnected, RPMDeviceConnected
Expand Down Expand Up @@ -66,6 +67,7 @@ class Lite(
TimeSync,
Reinitialize,
DeviceCommunicationControl,
CoV,
):
"""
Build a BACnet application to accept read and write requests.
Expand All @@ -76,9 +78,6 @@ class Lite(
:param ip='127.0.0.1': Address must be in the same subnet as the BACnet network
[BBMD and Foreign Device - not supported]
:param bokeh_server: (boolean) If set to false, will prevent Bokeh server
from being started. Can help troubleshoot issues with Bokeh. By default,
set to True.
"""

def __init__(
Expand Down

0 comments on commit 84d7985

Please sign in to comment.