Skip to content

Commit

Permalink
psrt socket buf
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Oct 30, 2021
1 parent 1544dcb commit 4d11f1e
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions lib/eva/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import msgpack
import requests
import paho.mqtt.client as mqtt
try:
import psrt
except:
pass
import psrt
import time
from queue import Queue
import glob
Expand Down Expand Up @@ -2297,7 +2294,7 @@ def __init__(self,
if self.proto == 'mqtt':
self.mq = mqtt.Client()
elif self.proto == 'psrt':
self.mq = psrt.Client()
self.mq = psrt.Client(buf_size=self.socket_buf_size)
self.mq.timeout = self.get_timeout()
else:
raise RuntimeError(f'Invalid protocol: {self.proto}')
Expand Down Expand Up @@ -3297,9 +3294,11 @@ def __init__(self,
ping_interval=None,
subscribe_all=False,
timestamp_enabled=True,
socket_buf_size=10_000,
ca_certs=None):
if port is None:
port = 2883
self.socket_buf_size = socket_buf_size
super().__init__(notifier_id=notifier_id,
host=host,
port=port,
Expand Down Expand Up @@ -3330,7 +3329,15 @@ def __init__(self,
def set_prop(self, prop, value):
if prop in self.disabled_props:
return False
return super().set_prop(prop, value)
elif prop == 'socket_buf_size':
try:
v = int(value)
except:
return False
self.socket_buf_size = v
return True
else:
return super().set_prop(prop, value)

def serialize(self, props=False):
d = super().serialize(props=props)
Expand All @@ -3339,6 +3346,7 @@ def serialize(self, props=False):
del d[p]
except KeyError:
pass
d['socket_buf_size'] = self.socket_buf_size
return d


Expand Down Expand Up @@ -4111,6 +4119,7 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True):
bulk_topic = ncfg.get('bulk_topic')
bulk_subscribe = ncfg.get('bulk_subscribe')
bulk_compress = ncfg.get('bulk_compress', False)
socket_buf_size = ncfg.get('socket_buf_size', False)
n = PSRTNotifier(notifier_id,
host=host,
port=port,
Expand All @@ -4130,6 +4139,7 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True):
bulk_topic=bulk_topic,
bulk_subscribe=bulk_subscribe,
bulk_compress=bulk_compress,
socket_buf_size=socket_buf_size,
ca_certs=ca_certs)
elif ncfg['type'] == 'udp':
interval = ncfg.get('interval')
Expand Down

0 comments on commit 4d11f1e

Please sign in to comment.