From 4d11f1ee58c77b9e1f5b65b847684db4e5f6db00 Mon Sep 17 00:00:00 2001 From: Sergei S Date: Sun, 31 Oct 2021 01:41:05 +0200 Subject: [PATCH] psrt socket buf --- lib/eva/notify.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/lib/eva/notify.py b/lib/eva/notify.py index 2242cf78..51011656 100644 --- a/lib/eva/notify.py +++ b/lib/eva/notify.py @@ -9,10 +9,7 @@ import msgpack import requests import paho.mqtt.client as mqtt -try: - import psrt -except: - pass +import psrt import time from queue import Queue import glob @@ -2297,7 +2294,7 @@ def __init__(self, if self.proto == 'mqtt': self.mq = mqtt.Client() elif self.proto == 'psrt': - self.mq = psrt.Client() + self.mq = psrt.Client(buf_size=self.socket_buf_size) self.mq.timeout = self.get_timeout() else: raise RuntimeError(f'Invalid protocol: {self.proto}') @@ -3297,9 +3294,11 @@ def __init__(self, ping_interval=None, subscribe_all=False, timestamp_enabled=True, + socket_buf_size=10_000, ca_certs=None): if port is None: port = 2883 + self.socket_buf_size = socket_buf_size super().__init__(notifier_id=notifier_id, host=host, port=port, @@ -3330,7 +3329,15 @@ def __init__(self, def set_prop(self, prop, value): if prop in self.disabled_props: return False - return super().set_prop(prop, value) + elif prop == 'socket_buf_size': + try: + v = int(value) + except: + return False + self.socket_buf_size = v + return True + else: + return super().set_prop(prop, value) def serialize(self, props=False): d = super().serialize(props=props) @@ -3339,6 +3346,7 @@ def serialize(self, props=False): del d[p] except KeyError: pass + d['socket_buf_size'] = self.socket_buf_size return d @@ -4111,6 +4119,7 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True): bulk_topic = ncfg.get('bulk_topic') bulk_subscribe = ncfg.get('bulk_subscribe') bulk_compress = ncfg.get('bulk_compress', False) + socket_buf_size = ncfg.get('socket_buf_size', False) n = PSRTNotifier(notifier_id, host=host, port=port, @@ -4130,6 +4139,7 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True): bulk_topic=bulk_topic, bulk_subscribe=bulk_subscribe, bulk_compress=bulk_compress, + socket_buf_size=socket_buf_size, ca_certs=ca_certs) elif ncfg['type'] == 'udp': interval = ncfg.get('interval')