Skip to content

Commit

Permalink
add on_set and on_get callbacks to modbus, resolves AdvancedClimateSy…
Browse files Browse the repository at this point in the history
  • Loading branch information
brainelectronics committed Jan 3, 2023
1 parent 245469f commit d8fce04
Showing 1 changed file with 143 additions and 44 deletions.
187 changes: 143 additions & 44 deletions umodbus/modbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .common import Request

# typing not natively supported on MicroPython
from .typing import dict_keys, List, Optional, Union
from .typing import Callable, dict_keys, List, Optional, Union


class Modbus(object):
Expand Down Expand Up @@ -106,8 +106,7 @@ def process(self) -> bool:

def _create_response(self,
request: Request,
reg_type: str) -> Union[bool, int,
List[bool], List[int]]:
reg_type: str) -> Union[List[bool], List[int]]:
"""
Create a response.
Expand All @@ -117,13 +116,15 @@ def _create_response(self,
:type reg_type: str
:returns: Values of this register
:rtype: Union[bool, int, List[int], List[bool]]
:rtype: Union[List[bool], List[int]]
"""
data = []
if type(self._register_dict[reg_type][request.register_addr]) is list:
data = self._register_dict[reg_type][request.register_addr]
data = self._register_dict[reg_type][request.register_addr]['val']
else:
data = [self._register_dict[reg_type][request.register_addr]]
data = [
self._register_dict[reg_type][request.register_addr]['val']
]

return data[:request.quantity]

Expand All @@ -136,9 +137,15 @@ def _process_read_access(self, request: Request, reg_type: str) -> None:
:param reg_type: The register type
:type reg_type: str
"""
if request.register_addr in self._register_dict[reg_type]:
address = request.register_addr

if address in self._register_dict[reg_type]:
vals = self._create_response(request=request, reg_type=reg_type)
request.send_response(vals)

if self._register_dict[reg_type][address].get('on_get_cb', 0):
_cb = self._register_dict[reg_type][address]['on_get_cb']
_cb(reg_type=reg_type, address=address, val=vals)
else:
request.send_exception(Const.ILLEGAL_DATA_ADDRESS)

Expand Down Expand Up @@ -192,23 +199,42 @@ def _process_write_access(self, request: Request, reg_type: str) -> None:
self._set_changed_register(reg_type=reg_type,
address=address,
value=val)
if self._register_dict[reg_type][address].get('on_set_cb', 0):
_cb = self._register_dict[reg_type][address]['on_set_cb']
_cb(reg_type=reg_type, address=address, val=val)
else:
request.send_exception(Const.ILLEGAL_DATA_ADDRESS)

def add_coil(self,
address: int,
value: Union[bool, List[bool]] = False) -> None:
value: Union[bool, List[bool]] = False,
on_set_cb: Callable[[str, int, Union[List[bool], List[int]]],
None] = None,
on_get_cb: Callable[[str, int, Union[List[bool], List[int]]],
None] = None) -> None:
"""
Add a coil to the modbus register dictionary.
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[bool, List[bool]], optional
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[bool, List[bool]], optional
:param on_set_cb: Callback on setting the coil
:type on_set_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
:param on_get_cb: Callback on getting the coil
:type on_get_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
"""
self._set_reg_in_dict(reg_type='COILS',
address=address,
value=value)
value=value,
on_set_cb=on_set_cb,
on_get_cb=on_get_cb)

def remove_coil(self, address: int) -> Union[None, bool, List[bool]]:
"""
Expand Down Expand Up @@ -260,18 +286,28 @@ def coils(self) -> dict_keys:
"""
return self._get_regs_of_dict(reg_type='COILS')

def add_hreg(self, address: int, value: Union[int, List[int]] = 0) -> None:
def add_hreg(self,
address: int,
value: Union[int, List[int]] = 0,
on_set_cb: Callable[[str, int, List[int]], None] = None,
on_get_cb: Callable[[str, int, List[int]], None] = None) -> None:
"""
Add a holding register to the modbus register dictionary.
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[int, List[int]], optional
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[int, List[int]], optional
:param on_set_cb: Callback on setting the holding register
:type on_set_cb: Callable[[str, int, List[int]], None]
:param on_get_cb: Callback on getting the holding register
:type on_get_cb: Callable[[str, int, List[int]], None]
"""
self._set_reg_in_dict(reg_type='HREGS',
address=address,
value=value)
value=value,
on_set_cb=on_set_cb,
on_get_cb=on_get_cb)

def remove_hreg(self, address: int) -> Union[None, int, List[int]]:
"""
Expand Down Expand Up @@ -323,18 +359,26 @@ def hregs(self) -> dict_keys:

def add_ist(self,
address: int,
value: Union[bool, List[bool]] = False) -> None:
value: Union[bool, List[bool]] = False,
on_get_cb: Callable[[str, int, Union[List[bool], List[int]]],
None] = None) -> None:
"""
Add a discrete input register to the modbus register dictionary.
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: bool or list of bool, optional
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: bool or list of bool, optional
:param on_get_cb: Callback on getting the discrete input register
:type on_get_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
"""
self._set_reg_in_dict(reg_type='ISTS',
address=address,
value=value)
value=value,
on_get_cb=on_get_cb)

def remove_ist(self, address: int) -> Union[None, bool, List[bool]]:
"""
Expand Down Expand Up @@ -384,18 +428,28 @@ def ists(self) -> dict_keys:
"""
return self._get_regs_of_dict(reg_type='ISTS')

def add_ireg(self, address: int, value: Union[int, List[int]] = 0) -> None:
def add_ireg(self,
address: int,
value: Union[int, List[int]] = 0,
on_get_cb: Callable[[str, int, Union[List[bool], List[int]]],
None] = None) -> None:
"""
Add an input register to the modbus register dictionary.
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[int, List[int]], optional
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[int, List[int]], optional
:param on_get_cb: Callback on getting the input register
:type on_get_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
"""
self._set_reg_in_dict(reg_type='IREGS',
address=address,
value=value)
value=value,
on_get_cb=on_get_cb)

def remove_ireg(self, address: int) -> Union[None, int, List[int]]:
"""
Expand Down Expand Up @@ -448,16 +502,32 @@ def iregs(self) -> dict_keys:
def _set_reg_in_dict(self,
reg_type: str,
address: int,
value: Union[bool, int, List[bool], List[int]]) -> None:
value: Union[bool, int, List[bool], List[int]],
on_set_cb: Callable[[str, int, Union[List[bool],
List[int]]],
None] = None,
on_get_cb: Callable[[str, int, Union[List[bool],
List[int]]],
None] = None) -> None:
"""
Set the register value in the dictionary of registers.
:param reg_type: The register type
:type reg_type: str
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[bool, int, List[bool], List[int]]
:param reg_type: The register type
:type reg_type: str
:param address: The address (ID) of the register
:type address: int
:param value: The default value
:type value: Union[bool, int, List[bool], List[int]]
:param on_set_cb: Callback on setting the register
:type on_get_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
:param on_get_cb: Callback on getting the register
:type on_get_cb: Callable[
[str, int, Union[List[bool], List[int]]],
None
]
:raise KeyError: No register at specified address found
:returns: Register value
Expand All @@ -467,7 +537,27 @@ def _set_reg_in_dict(self,
raise KeyError('{} is not a valid register type of {}'.
format(reg_type, self._available_register_types))

self._register_dict[reg_type][address] = value
data = {'val': value}

# if the register exists already in the register dict a "set_*"
# function might have called this functions
if address in self._register_dict[reg_type]:
# try to get the (already) registered callback function from the
# register dict of this address with the this time call function
# parameter callback value as fallback
on_set_cb = self._register_dict[reg_type][address].get('on_set_cb',
on_set_cb)

on_get_cb = self._register_dict[reg_type][address].get('on_get_cb',
on_get_cb)

if callable(on_set_cb):
data['on_set_cb'] = on_set_cb

if callable(on_get_cb):
data['on_get_cb'] = on_get_cb

self._register_dict[reg_type][address] = data

def _remove_reg_from_dict(self,
reg_type: str,
Expand Down Expand Up @@ -510,7 +600,7 @@ def _get_reg_in_dict(self,
format(reg_type, self._available_register_types))

if address in self._register_dict[reg_type]:
return self._register_dict[reg_type][address]
return self._register_dict[reg_type][address]['val']
else:
raise KeyError('No {} available for the register address {}'.
format(reg_type, address))
Expand Down Expand Up @@ -656,18 +746,27 @@ def setup_registers(self,
else:
value = val['val']

on_set_cb = val.get('on_set_cb', None)
on_get_cb = val.get('on_get_cb', None)

if reg_type == 'COILS':
self.add_coil(address=address,
value=value)
value=value,
on_set_cb=on_set_cb,
on_get_cb=on_get_cb)
elif reg_type == 'HREGS':
self.add_hreg(address=address,
value=value)
value=value,
on_set_cb=on_set_cb,
on_get_cb=on_get_cb)
elif reg_type == 'ISTS':
self.add_ist(address=address,
value=value)
value=value,
on_get_cb=on_get_cb) # only getter
elif reg_type == 'IREGS':
self.add_ireg(address=address,
value=value)
value=value,
on_get_cb=on_get_cb) # only getter
else:
# invalid register type
pass
Expand Down

0 comments on commit d8fce04

Please sign in to comment.