Skip to content

Commit

Permalink
kwargs added to constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
RolefH committed Feb 4, 2020
1 parent 100b86e commit 9def18c
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 43 deletions.
4 changes: 2 additions & 2 deletions package/docs/source/conf.py
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.1'
version = '0.2'
# The full version, including alpha/beta/rc tags.
release = '0.1.1'
release = '0.2.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion package/streamsx/mqtt/__init__.py
Expand Up @@ -41,7 +41,7 @@
"""


__version__='0.1.1'
__version__='0.2.0'

__all__ = ['MQTTSink', 'MQTTSource']
from streamsx.mqtt._mqtt import MQTTSink, MQTTSource
Expand Down
55 changes: 50 additions & 5 deletions package/streamsx/mqtt/_mqtt.py
Expand Up @@ -26,7 +26,7 @@ class MQTTComposite(object):
_APP_CONFIG_PROP_NAME_FOR_PASSWORD = 'password'
_APP_CONFIG_PROP_NAME_FOR_USERNAME = 'username'

def __init__(self):
def __init__(self, **options):
self._vm_arg = None
self._app_config_name = None
self._username = None
Expand All @@ -45,6 +45,40 @@ def __init__(self):
self._command_timeout_millis = None
self._client_id = None
self._ssl_debug = False
if 'vm_arg' in options:
self.vm_arg = options.get('vm_arg')
if 'ssl_debug' in options:
self.ssl_debug = options.get('ssl_debug')
if 'app_config_name' in options:
self.app_config_name = options.get('app_config_name')
if 'username' in options:
self.username = options.get('username')
if 'password' in options:
self.password = options.get('password')
if 'trusted_certs' in options:
self.trusted_certs = options.get('trusted_certs')
if 'truststore' in options:
self.truststore = options.get('truststore')
if 'truststore_password' in options:
self.truststore_password = options.get('truststore_password')
if 'client_cert' in options:
self.client_cert = options.get('client_cert')
if 'client_private_key' in options:
self.client_private_key = options.get('client_private_key')
if 'keystore' in options:
self.keystore = options.get('keystore')
if 'keystore_password' in options:
self.keystore_password = options.get('keystore_password')
if 'ssl_protocol' in options:
self.ssl_protocol = options.get('ssl_protocol')
if 'reconnection_bound' in options:
self.reconnection_bound = options.get('reconnection_bound')
if 'keep_alive_seconds' in options:
self.keep_alive_seconds = options.get('keep_alive_seconds')
if 'command_timeout_millis' in options:
self.command_timeout_millis = options.get('command_timeout_millis')
if 'client_id' in options:
self.client_id = options.get('client_id')

@property
def ssl_debug(self):
Expand Down Expand Up @@ -463,9 +497,10 @@ class MQTTSink(MQTTComposite, AbstractSink):
topic_attribute_name(str): The name of a tuple attribute denoting the destination topic.
Mutually exclusive with ``topic``.
data_attribute_name(str): The name of the tuple attribute containing the message data to be published. ``data`` is assumed as default.
**options(kwargs): optional parameters as keyword arguments
"""
def __init__(self, server_uri, topic=None, topic_attribute_name=None, data_attribute_name = None):
MQTTComposite.__init__(self)
def __init__(self, server_uri, topic=None, topic_attribute_name=None, data_attribute_name=None, **options):
MQTTComposite.__init__(self, **options)
AbstractSink.__init__(self)
if not topic and not topic_attribute_name:
raise ValueError('One of topic or topic_attribute_name is required')
Expand All @@ -480,6 +515,11 @@ def __init__(self, server_uri, topic=None, topic_attribute_name=None, data_attri
self._topic_attribute_name = topic_attribute_name
self._data_attribute_name = data_attribute_name
self._qos = None
if 'qos' in options:
self.qos = options.get('qos')
if 'retain' in options:
self.retain = options.get('retain')


def create_spl_params(self, topology) -> dict:
spl_params = MQTTComposite.create_spl_params(self, topology)
Expand Down Expand Up @@ -557,10 +597,11 @@ class MQTTSource(MQTTComposite, AbstractSource):
schema: The schema of the created stream
data_attribute_name(str): The name of the tuple attribute containing the message data. ``data`` is assumed as default.
topic_attribute_name(str): The name of a tuple attribute denoting the source topic of received messages.
**options(kwargs): optional parameters as keyword arguments
"""

def __init__(self, server_uri, topics, schema, data_attribute_name=None, topic_attribute_name=None):
MQTTComposite.__init__(self)
def __init__(self, server_uri, topics, schema, data_attribute_name=None, topic_attribute_name=None, **options):
MQTTComposite.__init__(self, **options)
AbstractSource.__init__(self)
if not topics:
raise ValueError(topics)
Expand All @@ -575,6 +616,10 @@ def __init__(self, server_uri, topics, schema, data_attribute_name=None, topic_a
self._data_attribute_name = data_attribute_name
self._qos = None
self._message_queue_size = 500
if 'qos' in options:
self.qos = options.get('qos')
if 'message_queue_size' in options:
self.message_queue_size = options.get('message_queue_size')

@property
def qos(self):
Expand Down
168 changes: 133 additions & 35 deletions package/streamsx/mqtt/tests/test_mqtt.py
Expand Up @@ -22,47 +22,15 @@ def cloud_creds_env_var():
os.environ['IOT_SERVICE_CREDENTIALS']
except KeyError:
result = False

return result


class MqttDataTuple(typing.NamedTuple):
topic_name: str
data: str


class Test(unittest.TestCase):

@classmethod
def setUpClass(self):
print (str(self))
self.mqtt_toolkit_home = os.environ["MQTT_TOOLKIT_HOME"]

def _build_only(self, name, topo):
result = streamsx.topology.context.submit(ContextTypes.TOOLKIT, topo.graph) # creates tk* directory
print(name + ' (TOOLKIT):' + str(result))
assert(result.return_code == 0)
result = streamsx.topology.context.submit(ContextTypes.BUNDLE, topo.graph) # creates sab file
print(name + ' (BUNDLE):' + str(result))
assert(result.return_code == 0)

def _get_app_config(self):
creds_file = pathlib.Path.cwd().joinpath('streamsx','mqtt', 'tests', 'appConfig.json')
with open(creds_file) as data_file:
credentials = json.load(data_file)

# credentials = json.loads('{"userID" : "user", "password" : "xxx", "serverURI" : "xxx" }')
return credentials

def _get_device_config(self):
creds_file = pathlib.Path.cwd().joinpath('streamsx','mqtt', 'tests', 'deviceConfig.json')
with open(creds_file) as data_file:
credentials = json.load(data_file)
return credentials

def _create_stream(self, topo):
s = topo.source([("test",'{"id":"testid1"}'),("test",'{"id":"testid2"}'),("test",'{"id":"testid3"}')])
return s.map(lambda x : {'topic_name':x[0],'data':x[1]}, schema=MqttDataTuple)

class TestParams(unittest.TestCase):
def test_bad_param(self):
print ('\n---------'+str(self))
# constructor tests
Expand Down Expand Up @@ -110,7 +78,138 @@ def test_bad_param(self):
sink.keep_alive_seconds = -1
with self.assertRaises(ValueError):
sink.command_timeout_millis = -1

def test_options_kwargs_MQTTSink(self):
print ('\n---------'+str(self))
sink = MQTTSink(server_uri='tcp://server:1833',
topic='topic1',
data_attribute_name='data',
#kwargs
vm_arg = ["-Xmx1G"],
ssl_debug = True,
reconnection_bound = 5,
qos = 2,
trusted_certs = ['cert1', 'cert2'],
truststore = "/truststore",
truststore_password = "trustpasswd",
client_cert = 'client_cert',
client_private_key = 'private_key',
keystore = "/keystore",
keystore_password = "keypasswd",
ssl_protocol = 'TLSv1.2',
app_config_name = "abbconf",
client_id = "client-IDsink",
command_timeout_millis = 47,
keep_alive_seconds = 3,
password = "passw0rd",
username = "rolef",
retain = True)
self.assertEqual(sink.server_uri, 'tcp://server:1833')
self.assertEqual(sink._topic, 'topic1')
self.assertEqual(sink._data_attribute_name, 'data')
self.assertEqual(sink.reconnection_bound, 5)
self.assertEqual(sink.ssl_debug, True)
self.assertListEqual(sink.vm_arg, ["-Xmx1G"])
self.assertEqual(sink.qos, 2)
self.assertListEqual(sink.trusted_certs, ['cert1', 'cert2'])
self.assertEqual(sink.truststore, '/truststore')
self.assertEqual(sink.truststore_password, 'trustpasswd')
self.assertEqual(sink.client_cert, 'client_cert')
self.assertEqual(sink.client_private_key, 'private_key')
self.assertEqual(sink.keystore, '/keystore')
self.assertEqual(sink.keystore_password, 'keypasswd')
self.assertEqual(sink.ssl_protocol, 'TLSv1.2')
self.assertEqual(sink.app_config_name, 'abbconf')
self.assertEqual(sink.client_id, 'client-IDsink')
self.assertEqual(sink.command_timeout_millis, 47)
self.assertEqual(sink.keep_alive_seconds, 3)
self.assertEqual(sink.password, 'passw0rd')
self.assertEqual(sink.username, 'rolef')
self.assertEqual(sink.retain, True)

def test_options_kwargs_MQTTSource(self):
print ('\n---------'+str(self))
src = MQTTSource(server_uri='tcp://server:1833',
topics=['topic1', 'topic2'],
schema=[MqttDataTuple],
data_attribute_name='data',
#kwargs
vm_arg = ["-Xmx1G"],
ssl_debug = True,
reconnection_bound = 5,
qos = [1, 2],
message_queue_size = 122,
trusted_certs = ['cert1', 'cert2'],
truststore = "/truststore",
truststore_password = "trustpasswd",
client_cert = 'client_cert',
client_private_key = 'private_key',
keystore = "/keystore",
keystore_password = "keypasswd",
ssl_protocol = 'TLSv1.2',
app_config_name = "abbconf",
client_id = "client-IDsink",
command_timeout_millis = 47,
keep_alive_seconds = 3,
password = "passw0rd",
username = "rolef")
self.assertEqual(src.server_uri, 'tcp://server:1833')
self.assertListEqual(src._topics, ['topic1', 'topic2'])
self.assertEqual(src._data_attribute_name, 'data')
self.assertEqual(src.reconnection_bound, 5)
self.assertEqual(src.ssl_debug, True)
self.assertListEqual(src.vm_arg, ["-Xmx1G"])
self.assertListEqual(src.qos, [1,2])
self.assertListEqual(src.trusted_certs, ['cert1', 'cert2'])
self.assertEqual(src.truststore, '/truststore')
self.assertEqual(src.truststore_password, 'trustpasswd')
self.assertEqual(src.client_cert, 'client_cert')
self.assertEqual(src.client_private_key, 'private_key')
self.assertEqual(src.keystore, '/keystore')
self.assertEqual(src.keystore_password, 'keypasswd')
self.assertEqual(src.ssl_protocol, 'TLSv1.2')
self.assertEqual(src.app_config_name, 'abbconf')
self.assertEqual(src.client_id, 'client-IDsink')
self.assertEqual(src.command_timeout_millis, 47)
self.assertEqual(src.keep_alive_seconds, 3)
self.assertEqual(src.password, 'passw0rd')
self.assertEqual(src.username, 'rolef')
self.assertEqual(src.message_queue_size, 122)


class Test(unittest.TestCase):

@classmethod
def setUpClass(self):
print (str(self))
self.mqtt_toolkit_home = os.environ["MQTT_TOOLKIT_HOME"]

def _build_only(self, name, topo):
result = streamsx.topology.context.submit(ContextTypes.TOOLKIT, topo.graph) # creates tk* directory
print(name + ' (TOOLKIT):' + str(result))
assert(result.return_code == 0)
result = streamsx.topology.context.submit(ContextTypes.BUNDLE, topo.graph) # creates sab file
print(name + ' (BUNDLE):' + str(result))
assert(result.return_code == 0)

def _get_app_config(self):
creds_file = pathlib.Path.cwd().joinpath('streamsx','mqtt', 'tests', 'appConfig.json')
with open(creds_file) as data_file:
credentials = json.load(data_file)

# credentials = json.loads('{"userID" : "user", "password" : "xxx", "serverURI" : "xxx" }')
return credentials

def _get_device_config(self):
creds_file = pathlib.Path.cwd().joinpath('streamsx','mqtt', 'tests', 'deviceConfig.json')
with open(creds_file) as data_file:
credentials = json.load(data_file)
return credentials

def _create_stream(self, topo):
s = topo.source([("test",'{"id":"testid1"}'),("test",'{"id":"testid2"}'),("test",'{"id":"testid3"}')])
return s.map(lambda x : {'topic_name':x[0],'data':x[1]}, schema=MqttDataTuple)


def test_compile_MQTTSource(self):
print ('\n---------'+str(self))
Expand Down Expand Up @@ -141,7 +240,6 @@ def test_compile_MQTTSource(self):
# build only
self._build_only(name, topo)


def test_compile_MQTTSink(self):
print ('\n---------'+str(self))
name = 'test_MQTTSink'
Expand Down

0 comments on commit 9def18c

Please sign in to comment.