/
z_queryable.py
94 lines (82 loc) · 3.17 KB
/
z_queryable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#
# Copyright (c) 2022 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
import sys
import time
import argparse
import json
import zenoh
from zenoh import config, Sample, Value
# --- Command line argument parsing --- --- --- --- --- ---
def _build_parser():
    """Create the argument parser for the queryable example.

    The options cover the session setup (mode, endpoints to connect to or
    listen on, optional configuration file) and the queryable itself (key
    expression, reply value, completeness flag).
    """
    cli = argparse.ArgumentParser(
        prog='z_queryable',
        description='zenoh queryable example',
    )

    # Session options.
    cli.add_argument(
        '--mode', '-m',
        dest='mode',
        type=str,
        choices=['peer', 'client'],
        help='The zenoh session mode.',
    )
    # Endpoint options may be repeated; each occurrence is appended to a list.
    cli.add_argument(
        '--connect', '-e',
        dest='connect',
        type=str,
        action='append',
        metavar='ENDPOINT',
        help='Endpoints to connect to.',
    )
    cli.add_argument(
        '--listen', '-l',
        dest='listen',
        type=str,
        action='append',
        metavar='ENDPOINT',
        help='Endpoints to listen on.',
    )

    # Queryable options.
    cli.add_argument(
        '--key', '-k',
        dest='key',
        type=str,
        default='demo/example/zenoh-python-queryable',
        help='The key expression matching queries to reply to.',
    )
    cli.add_argument(
        '--value', '-v',
        dest='value',
        type=str,
        default='Queryable from Python!',
        help='The value to reply to queries.',
    )
    cli.add_argument(
        '--complete',
        dest='complete',
        action='store_true',
        default=False,
        help='Declare the queryable as complete w.r.t. the key expression.',
    )

    # Optional configuration file.
    cli.add_argument(
        '--config', '-c',
        dest='config',
        type=str,
        metavar='FILE',
        help='A configuration file.',
    )
    return cli


parser = _build_parser()
args = parser.parse_args()
# Start from the configuration file when one was given, otherwise from a
# default configuration.
if args.config is not None:
    conf = zenoh.Config.from_file(args.config)
else:
    conf = zenoh.Config()

# Command-line options that were actually supplied are JSON-encoded and
# inserted into the configuration under their respective keys.
if args.mode is not None:
    conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
if args.connect is not None:
    conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
if args.listen is not None:
    conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))

# Module-level settings used when declaring the queryable and when replying.
key, value, complete = args.key, args.value, args.complete
# Zenoh code --- --- --- --- --- --- --- --- --- --- ---
def queryable_callback(query):
    """Log a received query and reply to it.

    Prints the query's selector, followed by the payload of the query's
    value when the query carries one, then replies with a ``Sample`` built
    from the module-level ``key`` and ``value``.
    """
    message = f">> [Queryable ] Received Query '{query.selector}'"
    if query.value is not None:
        message += f" with value: {query.value.payload}"
    print(message)

    query.reply(Sample(key, value))
# Initiate logging.
zenoh.init_logger()

print("Opening session...")
session = zenoh.open(conf)
try:
    print("Declaring Queryable on '{}'...".format(key))
    queryable = session.declare_queryable(key, queryable_callback, complete)
    try:
        print("Enter 'q' to quit...")
        c = '\0'
        while c != 'q':
            c = sys.stdin.read(1)
            if c == '':
                # An empty read means stdin is at end-of-file; pause before
                # polling again instead of spinning.
                time.sleep(1)
    finally:
        # Undeclare even when the wait loop is interrupted (e.g. Ctrl-C).
        queryable.undeclare()
finally:
    # Close the session on every exit path, including a failed declaration.
    session.close()