Skip to content

Commit

Permalink
Merge c91d68a into fe51e1b
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuZuD committed Dec 23, 2020
2 parents fe51e1b + c91d68a commit 2e13194
Show file tree
Hide file tree
Showing 8 changed files with 1,834 additions and 5 deletions.
566 changes: 566 additions & 0 deletions asyncua/client/ha/ha_client.py

Large diffs are not rendered by default.

403 changes: 403 additions & 0 deletions asyncua/client/ha/reconciliator.py

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions asyncua/client/ha/utils.py
@@ -0,0 +1,35 @@
import asyncio
import hashlib
import logging
import pickle

from itertools import chain, islice


_logger = logging.getLogger(__name__)


class ClientNotFound(Exception):
pass


async def event_wait(evt, timeout) -> bool:
try:
await asyncio.wait_for(evt.wait(), timeout)
except asyncio.TimeoutError:
pass
return evt.is_set()


def get_digest(conf) -> str:
return hashlib.md5(pickle.dumps(conf)).hexdigest()


def batch(iterable, size):
iterator = iter(iterable)
while True:
try:
batchiter = islice(iterator, size)
yield list(chain([next(batchiter)], batchiter))
except StopIteration:
break
43 changes: 43 additions & 0 deletions asyncua/client/ha/virtual_subscription.py
@@ -0,0 +1,43 @@
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional, Set
from asyncua import ua
from sortedcontainers import SortedDict


TypeSubHandler = Any


@dataclass(frozen=True)
class NodeAttr:
attr: Optional[ua.AttributeIds] = None
queuesize: int = 0


@dataclass
class VirtualSubscription:
period: int
handler: TypeSubHandler
publishing: bool
monitoring: ua.MonitoringMode
# full type: SortedDict[str, NodeAttr]
nodes: SortedDict = field(default_factory=SortedDict)

def subscribe_data_change(
self, nodes: Iterable[str], attr: ua.AttributeIds, queuesize: int
) -> None:
for node in nodes:
self.nodes[node] = NodeAttr(attr, queuesize)

def unsubscribe(self, nodes: Iterable[str]) -> None:
for node in nodes:
if self.nodes.get(node):
self.nodes.pop(node)

def set_monitoring_mode(self, mode: ua.MonitoringMode) -> None:
self.monitoring = mode

def set_publishing_mode(self, mode: bool) -> None:
self.publishing = mode

def get_nodes(self) -> Set[str]:
return set(self.nodes)
103 changes: 103 additions & 0 deletions examples/ha_client-example.py
@@ -0,0 +1,103 @@
import sys
import asyncio
import logging
import time

# import asyncua
sys.path.insert(0, "..")

from asyncua import Server, ua
from asyncua.client.ha.ha_client import HaClient, HaMode, HaConfig


# set up logging
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
# diable logging for the servers
logging.getLogger("asyncua.server").setLevel(logging.WARNING)


class SubHandler:
"""
Basic subscription handler to support datachange_notification.
No need to implement the other handlermethods since the
HA_CLIENT only supports datachange for now.
"""

def datachange_notification(self, node, val, data):
"""
called for every datachange notification from server
"""
print(f"Node: {node} has value: {val}\n")


async def start_servers():
""" Spin up two servers with identical configurations """
ports = [4840, 4841]
urls = []
loop = asyncio.get_event_loop()
for port in ports:
server = Server()
await server.init()
url = f"opc.tcp://0.0.0.0:{port}/freeopcua/server/"
urls.append(url)
server.set_endpoint(url)
server.set_server_name("FreeOpcUa Example Server {port}")
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)

myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 6.7)
await server.start()
loop.create_task(server_var_update(server, myvar))
return urls, myvar


async def server_var_update(server, myvar):
"""
Constantly increment the variable with epoch time
to simulate data notifications.
"""
while True:
await asyncio.sleep(1)
await server.write_attribute_value(myvar.nodeid, ua.DataValue(time.time()))


async def main():
# start the servers
urls, node = await start_servers()

# set up ha_client with the serveur urls
ha_config = HaConfig(
HaMode.WARM,
keepalive_timer=15,
manager_timer=15,
reconciliator_timer=15,
urls=urls,
session_timeout=30
)
ha = HaClient(ha_config)
await ha.start()

publish_interval = 1000
handler = SubHandler()

# subscribe to two nodes
sub1 = await ha.create_subscription(publish_interval, handler)
await ha.subscribe_data_change(sub1, [node])

# Watch the debug log and check what's happening in the background.
# A basic check could be to `iptables -A OUTPUT -p tcp --dport 4840 -j DROP`
# and observe the failover in action
await asyncio.sleep(60)


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -15,7 +15,7 @@
packages=find_packages(),
provides=["asyncua"],
license="GNU Lesser General Public License v3 or later",
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", 'cryptography'],
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "cryptography", "sortedcontainers"],
classifiers=[
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
Expand Down

0 comments on commit 2e13194

Please sign in to comment.