# model_sender_custom.py
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from py_utils.wait.BooleanWaitHandler import BooleanWaitHandler
from amlip_py.node.ModelManagerSenderNode import ModelManagerSenderNode, ModelReplier
from amlip_py.types.AmlipIdDataType import AmlipIdDataType
from amlip_py.types.ModelReplyDataType import ModelReplyDataType
from amlip_py.types.ModelRequestDataType import ModelRequestDataType
# Domain ID passed to the node created in main()
DOMAIN_ID = 166
# Wait handler shared between main(), which blocks on it, and
# CustomModelReplier.fetch_model(), which opens it after building a reply.
# NOTE(review): the meaning of the (True, False) constructor arguments is not
# visible in this file - confirm against py_utils.
waiter = BooleanWaitHandler(True, False)
class CustomModelReplier(ModelReplier):
    """Replier that answers a model request with its upper-cased text."""

    def fetch_model(self, request: ModelRequestDataType) -> ModelReplyDataType:
        """Build the reply for ``request`` and open the module-level waiter.

        The reply wraps the upper-cased string form of the request. Both the
        request and the reply are printed before the waiter is opened, which
        releases the ``waiter.wait()`` call in ``main()``.

        :param request: model request received by the node.
        :return: reply holding the upper-cased request text.
        """
        uppercased = request.to_string().upper()
        reply = ModelReplyDataType(uppercased)

        report = (
            f'Model request received from client\n'
            f' request: {request.to_string()}\n'
            f' reply: {reply.to_string()}'
        )
        print(report)

        # Signal main() that a request has been served.
        waiter.open()
        return reply
def main():
    """Run the manual Model Manager Sender test from start to finish.

    Creates a ModelManagerSenderNode with a fixed identifier in DOMAIN_ID,
    publishes one statistics entry, starts the node with a CustomModelReplier
    listener, blocks on the module-level waiter and finally stops the node.
    """
    # Fixed identifier for this node
    node_id = AmlipIdDataType('ModelManagerSender')
    node_id.set_id([10, 20, 30, 40])

    # Create node
    print('Starting Manual Test Model Manager Sender Node Py execution. Creating Node...')
    sender = ModelManagerSenderNode(id=node_id, domain=DOMAIN_ID)
    print(f'Node created: {sender.get_id()}. '
          'Already processing models.')

    sender.publish_statistics('ModelManagerSenderStatistics', 'hello world')

    replier = CustomModelReplier()
    sender.start(listener=replier)

    # Block until the replier opens the waiter
    waiter.wait()

    sender.stop()

    print('Finishing Manual Test Model Manager Sender Node Py execution.')
# Run the manual test only when this file is executed as a script
if __name__ == '__main__':
    main()