Skip to content

Commit

Permalink
Implement tagval parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
vzahradnik committed Sep 4, 2023
1 parent f7a5563 commit 77bb786
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 42 deletions.
2 changes: 1 addition & 1 deletion tcmenu/__init__.py
Expand Up @@ -6,4 +6,4 @@
"__version__",
]

__version__ = "4.1.1"
__version__ = "4.1.2"
9 changes: 9 additions & 0 deletions tcmenu/remote/commands/dialog_mode.py
Expand Up @@ -15,3 +15,12 @@ class DialogMode(Enum):

""" Perform the following action on the dialog. """
ACTION = auto()

@staticmethod
def from_string(mode: str) -> "DialogMode":
if mode == "S":
return DialogMode.SHOW
elif mode == "H":
return DialogMode.HIDE
else:
return DialogMode.ACTION
7 changes: 7 additions & 0 deletions tcmenu/remote/commands/menu_button_type.py
Expand Up @@ -32,3 +32,10 @@ def button_name(self) -> str:
@property
def type_value(self) -> int:
return self._value_.type_value

@staticmethod
def from_id(button_id: int):
for button_type in MenuButtonType:
if button_type.type_value == button_id:
return button_type
return MenuButtonType.NONE
14 changes: 11 additions & 3 deletions tcmenu/remote/commands/menu_heartbeat_command.py
Expand Up @@ -10,9 +10,9 @@
class MenuHeartbeatCommand(MenuCommand):
# noinspection PyArgumentList
class HeartbeatMode(Enum):
START = auto()
NORMAL = auto()
END = auto()
NORMAL = 0
START = 1
END = 2

heartbeat_interval: int

Expand All @@ -22,5 +22,13 @@ class HeartbeatMode(Enum):
def command_type(self) -> MessageField:
return MenuCommandType.HEARTBEAT.message_field

@staticmethod
def from_id(heartbeat_id: int):
return (
MenuHeartbeatCommand.HeartbeatMode(heartbeat_id)
if heartbeat_id in (item.value for item in MenuHeartbeatCommand.HeartbeatMode)
else MenuHeartbeatCommand.HeartbeatMode.NORMAL
)

def __repr__(self):
return "MenuHeartbeatCommand{}"
6 changes: 3 additions & 3 deletions tcmenu/remote/protocol/configurable_protocol_converter.py
Expand Up @@ -82,7 +82,7 @@ def add_raw_out_processor(self, field, processor, clazz):

def from_channel(self, buffer: io.BytesIO) -> MenuCommand:
proto_id = buffer.read(1)[0]
protocol = CommandProtocol(proto_id).value
protocol = CommandProtocol(proto_id)

msg_type = self._get_msg_type_from_buffer(buffer)
cmd_type = MessageField.from_id(msg_type)
Expand All @@ -94,11 +94,11 @@ def from_channel(self, buffer: io.BytesIO) -> MenuCommand:
parser = TagValTextParser(buffer)
logging.debug(f"Protocol convert in: {parser}")
return self._tag_val_incoming_parsers[cmd_type](parser)
elif protocol == CommandProtocol.RAW_BIN_PROTOCOL and cmd_type in self._raw_incoming_parsers:
elif protocol.value == CommandProtocol.RAW_BIN_PROTOCOL and cmd_type in self._raw_incoming_parsers:
length = int.from_bytes(buffer.read(4), byteorder="big")
return self._raw_incoming_parsers[cmd_type](buffer, length)
else:
raise TcProtocolException(f"Unknown protocol used in message {protocol}")
raise TcProtocolException(f"Unknown protocol used in message: {protocol.name}")

def to_channel(self, buffer: io.BytesIO, command: MenuCommand) -> None:
raw_processor = self._raw_output_writers.get(command.command_type)
Expand Down
10 changes: 5 additions & 5 deletions tcmenu/remote/protocol/message_field.py
Expand Up @@ -8,12 +8,12 @@ class MessageField:

second_byte: str

_ALL_FIELDS_MAP: ClassVar[dict[str, "MessageField"]] = dict()
_ALL_FIELDS_DICT: ClassVar[dict[str, "MessageField"]] = {}

def __post_init__(self):
if self.id in MessageField._ALL_FIELDS_MAP.keys():
if self.id in MessageField._ALL_FIELDS_DICT.keys():
raise ValueError(f"Duplicate key: {self.id}")
MessageField._ALL_FIELDS_MAP[self.id] = self
MessageField._ALL_FIELDS_DICT[self.id] = self

@property
def high(self) -> str:
Expand All @@ -29,8 +29,8 @@ def id(self) -> str:

@staticmethod
def from_id(field_id: str) -> "MessageField":
if field_id in MessageField._ALL_FIELDS_MAP.keys():
return MessageField._ALL_FIELDS_MAP[field_id]
if field_id in MessageField._ALL_FIELDS_DICT.keys():
return MessageField._ALL_FIELDS_DICT[field_id]
else:
raise ValueError("An unknown message type was generated.")

Expand Down
41 changes: 31 additions & 10 deletions tcmenu/remote/protocol/tag_val_menu_command_processors.py
@@ -1,5 +1,7 @@
import io
import uuid

from tcmenu.remote.commands.command_factory import CommandFactory
from tcmenu.remote.commands.dialog_mode import DialogMode
from tcmenu.remote.commands.menu_acknowledgement_command import MenuAcknowledgementCommand
from tcmenu.remote.commands.menu_boot_commands import (
Expand All @@ -24,6 +26,9 @@
from tcmenu.remote.commands.menu_heartbeat_command import MenuHeartbeatCommand
from tcmenu.remote.commands.menu_join_command import MenuJoinCommand
from tcmenu.remote.commands.menu_pairing_command import MenuPairingCommand
from tcmenu.remote.protocol.correlation_id import CorrelationId
from tcmenu.remote.protocol.protocol_util import ProtocolUtil
from tcmenu.remote.protocol.tag_val_menu_fields import TagValMenuFields
from tcmenu.remote.protocol.tag_val_text_parser import TagValTextParser


Expand Down Expand Up @@ -172,11 +177,25 @@ def add_handlers_to_protocol(proto: "ConfigurableProtocolConverter"):

@staticmethod
def _process_join(parser: TagValTextParser) -> MenuCommand:
pass
uuid_str = parser.get_value(TagValMenuFields.KEY_UUID_FIELD.value, "")
uuid_val = uuid.UUID(uuid_str) if len(uuid_str) > 0 else uuid.uuid4()

return MenuJoinCommand(
my_name=parser.get_value(TagValMenuFields.KEY_NAME_FIELD.value),
api_version=parser.get_value_as_int(TagValMenuFields.KEY_VER_FIELD.value),
platform=ProtocolUtil.from_key_to_api_platform(
parser.get_value_as_int(TagValMenuFields.KEY_PLATFORM_ID.value)
),
app_uuid=uuid_val,
serial_number=parser.get_value_as_int(TagValMenuFields.KEY_SERIAL_NO.value, 0),
)

@staticmethod
def _process_heartbeat(parser: TagValTextParser) -> MenuCommand:
pass
return CommandFactory.new_heartbeat_command(
frequency=parser.get_value_as_int(TagValMenuFields.HB_FREQUENCY_FIELD.value, 10000),
mode=MenuHeartbeatCommand.from_id(parser.get_value_as_int(TagValMenuFields.HB_MODE_FIELD.value, 0)),
)

@staticmethod
def _process_bootstrap(parser: TagValTextParser) -> MenuCommand:
Expand Down Expand Up @@ -240,15 +259,17 @@ def _process_pairing_request(parser: TagValTextParser) -> MenuCommand:

@staticmethod
def _process_dialog_update(parser: TagValTextParser) -> MenuCommand:
pass
cor = parser.get_value(TagValMenuFields.KEY_CORRELATION_FIELD.value, "")
correlation_id = CorrelationId.EMPTY_CORRELATION if len(cor) == 0 else CorrelationId.from_string(cor)

@staticmethod
def _as_dialog_mode(mode: str) -> DialogMode:
pass

@staticmethod
def _as_button(req: int) -> MenuButtonType:
pass
return CommandFactory.new_dialog_command(
mode=DialogMode.from_string(parser.get_value(TagValMenuFields.KEY_MODE_FIELD.value)),
header=parser.get_value(TagValMenuFields.KEY_HEADER_FIELD.value, ""),
message=parser.get_value(TagValMenuFields.KEY_BUFFER_FIELD.value, ""),
button1=MenuButtonType.from_id(parser.get_value_as_int(TagValMenuFields.KEY_BUTTON1_FIELD.value, 0)),
button2=MenuButtonType.from_id(parser.get_value_as_int(TagValMenuFields.KEY_BUTTON2_FIELD.value, 0)),
correlation_id=correlation_id,
)

@staticmethod
def _write_join(buffer: io.StringIO, command: MenuCommand) -> None:
Expand Down
7 changes: 7 additions & 0 deletions test/remote/commands/test_dialog_mode.py
Expand Up @@ -10,3 +10,10 @@ def test_enum_values():
def test_enum_uniqueness():
values = [mode.value for mode in DialogMode]
assert len(values) == len(set(values)), "Enum values are not unique"


def test_dialog_mode_from_string():
assert DialogMode.SHOW == DialogMode.from_string("S")
assert DialogMode.HIDE == DialogMode.from_string("H")
assert DialogMode.ACTION == DialogMode.from_string("")
assert DialogMode.ACTION == DialogMode.from_string("AA")
10 changes: 10 additions & 0 deletions test/remote/commands/test_menu_button_type.py
Expand Up @@ -13,3 +13,13 @@ def test_button_not_set():
assert type(button.button_name) == str
assert len(button.button_name) == 0
assert type(button.type_value) == int


def test_button_type_from_id():
assert MenuButtonType.OK == MenuButtonType.from_id(0)
assert MenuButtonType.ACCEPT == MenuButtonType.from_id(1)
assert MenuButtonType.CANCEL == MenuButtonType.from_id(2)
assert MenuButtonType.CLOSE == MenuButtonType.from_id(3)
assert MenuButtonType.NONE == MenuButtonType.from_id(4)
assert MenuButtonType.NONE == MenuButtonType.from_id(5)
assert MenuButtonType.NONE == MenuButtonType.from_id(100)
7 changes: 7 additions & 0 deletions test/remote/commands/test_menu_heartbeat_command.py
Expand Up @@ -9,3 +9,10 @@ def test_menu_heartbeat_command():
assert command.mode == MenuHeartbeatCommand.HeartbeatMode.START
assert isinstance(command.command_type, MessageField) is True
assert command.command_type.id == "HB"


def test_menu_heartbeat_command_from_id():
assert MenuHeartbeatCommand.HeartbeatMode.START == MenuHeartbeatCommand.from_id(1)
assert MenuHeartbeatCommand.HeartbeatMode.END == MenuHeartbeatCommand.from_id(2)
assert MenuHeartbeatCommand.HeartbeatMode.NORMAL == MenuHeartbeatCommand.from_id(0)
assert MenuHeartbeatCommand.HeartbeatMode.NORMAL == MenuHeartbeatCommand.from_id(100)
21 changes: 18 additions & 3 deletions test/remote/protocol/test_message_field.py
Expand Up @@ -3,8 +3,25 @@
from tcmenu.remote.protocol.message_field import MessageField


# noinspection PyProtectedMember
@pytest.fixture(autouse=True)
def message_field_fixture():
"""
This code temporarily changes internal dictionary of a MessageField.
We have to ensure that original entries are set back to avoid
test failures.
"""
# Setup
entries: dict[str, "MessageField"] = MessageField._ALL_FIELDS_DICT.copy()
MessageField._ALL_FIELDS_DICT.clear()

yield

# Teardown
MessageField._ALL_FIELDS_DICT = entries


def test_message_field():
MessageField._ALL_FIELDS_MAP.clear()
message = MessageField("R", "V")

assert message.first_byte is "R"
Expand All @@ -16,15 +33,13 @@ def test_message_field():


def test_message_field_duplicate_entry():
MessageField._ALL_FIELDS_MAP.clear()
MessageField("A", "A")

with pytest.raises(ValueError):
MessageField("A", "A")


def test_message_from_id():
MessageField._ALL_FIELDS_MAP.clear()
MessageField("A", "A")

message = MessageField.from_id("AA")
Expand Down
94 changes: 77 additions & 17 deletions test/remote/protocol/test_tag_val_menu_command_protocol.py
@@ -1,5 +1,13 @@
import io
import uuid

from tcmenu.remote.commands.dialog_mode import DialogMode
from tcmenu.remote.commands.menu_button_type import MenuButtonType
from tcmenu.remote.commands.menu_command_type import MenuCommandType
from tcmenu.remote.commands.menu_dialog_command import MenuDialogCommand
from tcmenu.remote.commands.menu_heartbeat_command import MenuHeartbeatCommand
from tcmenu.remote.commands.menu_join_command import MenuJoinCommand
from tcmenu.remote.protocol.api_platform import ApiPlatform
from tcmenu.remote.protocol.command_protocol import CommandProtocol
from tcmenu.remote.protocol.configurable_protocol_converter import ConfigurableProtocolConverter
from tcmenu.remote.protocol.message_field import MessageField
Expand All @@ -8,23 +16,75 @@


def test_receive_join_command():
# command = protocol.from_channel(
# to_buffer(
# MenuCommandType.JOIN.message_field,
# "NM=IoTdevice|UU=07cd8bc6-734d-43da-84e7-6084990becfc|US=987654321|VE=1223|PF=1|\u0002",
# )
# )

# assert type(command) is MenuJoinCommand
# # noinspection PyTypeChecker
# join: MenuJoinCommand = command
# assert "07cd8bc6-734d-43da-84e7-6084990becfc" == str(join.app_uuid)
# assert "IoTdevice" == join.my_name
# assert 1223 == join.api_version
# assert 987654321 == join
# assert ApiPlatform.JAVA_API == join.platform
# assert MenuCommandType.JOIN.message_field == join.command_type
pass
command = protocol.from_channel(
to_buffer(
MenuCommandType.JOIN.message_field,
"NM=IoTdevice|UU=07cd8bc6-734d-43da-84e7-6084990becfc|US=987654321|VE=1223|PF=1|\u0002",
)
)

assert type(command) is MenuJoinCommand
# noinspection PyTypeChecker
join: MenuJoinCommand = command
assert "07cd8bc6-734d-43da-84e7-6084990becfc" == str(join.app_uuid)
assert "IoTdevice" == join.my_name
assert 1223 == join.api_version
assert 987654321 == join.serial_number
assert ApiPlatform.JAVA_API == join.platform
assert MenuCommandType.JOIN.message_field == join.command_type


def test_receive_join_command_with_no_uuid_and_sn():
command = protocol.from_channel(
to_buffer(
MenuCommandType.JOIN.message_field,
"NM=IoTdevice|VE=1223|PF=1|\u0002",
)
)

assert type(command) is MenuJoinCommand
# noinspection PyTypeChecker
join: MenuJoinCommand = command
assert isinstance(join.app_uuid, uuid.UUID)
assert "IoTdevice" == join.my_name
assert 1223 == join.api_version
assert 0 == join.serial_number
assert ApiPlatform.JAVA_API == join.platform
assert MenuCommandType.JOIN.message_field == join.command_type


def test_receive_dialog_command():
command = protocol.from_channel(
to_buffer(
MenuCommandType.DIALOG_UPDATE.message_field,
"MO=S|HF=Hello\\||BU=Buffer\\=|B1=0|B2=4|\u0002",
)
)

assert type(command) is MenuDialogCommand
# noinspection PyTypeChecker
dlg: MenuDialogCommand = command
assert DialogMode.SHOW == dlg.dialog_mode
assert "Hello|" == dlg.header
assert "Buffer=" == dlg.buffer
assert MenuButtonType.OK, dlg.button1
assert MenuButtonType.NONE, dlg.button2


def test_receive_heartbeat_command():
command = protocol.from_channel(
to_buffer(
MenuCommandType.HEARTBEAT.message_field,
"\u0002",
)
)

assert type(command) is MenuHeartbeatCommand
# noinspection PyTypeChecker
hb: MenuHeartbeatCommand = command
assert MenuCommandType.HEARTBEAT.message_field == hb.command_type
assert hb.mode == MenuHeartbeatCommand.HeartbeatMode.NORMAL
assert hb.heartbeat_interval == 10000


def to_buffer(message_type: MessageField, s: str) -> io.BytesIO:
Expand Down

0 comments on commit 77bb786

Please sign in to comment.