Skip to content

Commit

Permalink
Merge 59f9d28 into 0d6030c
Browse files Browse the repository at this point in the history
  • Loading branch information
romancardenas committed Mar 19, 2018
2 parents 0d6030c + 59f9d28 commit 96f7135
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 2 deletions.
44 changes: 42 additions & 2 deletions hbmqtt/broker.py
Expand Up @@ -465,7 +465,8 @@ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterA
subscriptions = subscribe_waiter.result()
return_codes = []
for subscription in subscriptions['topics']:
return_codes.append(self.add_subscription(subscription, client_session))
result = yield from self.add_subscription(subscription, client_session)
return_codes.append(result)
yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
for index, subscription in enumerate(subscriptions['topics']):
if return_codes[index] != 0x80:
Expand Down Expand Up @@ -559,6 +560,41 @@ def authenticate(self, session: Session, listener):
# If all plugins returned True, authentication is success
return auth_result

@asyncio.coroutine
def topic_filtering(self, session: Session, topic):
"""
This method call the topic_filtering method on registered plugins to check that the subscription is allowed.
User is considered allowed if all plugins called return True.
Plugins topic_filtering() method are supposed to return :
- True if MQTT client can be subscribed to the topic
- False if MQTT client is not allowed to subscribe to the topic
- None if topic filtering can't be achieved (then plugin result is then ignored)
:param session:
:param listener:
:param topic: Topic in which the client wants to subscribe
:return:
"""
topic_plugins = None
topic_config = self.config.get('topic-check', None)
if topic_config and topic_config.get('enabled', False):
topic_plugins = topic_config.get('plugins', None)
returns = yield from self.plugins_manager.map_plugin_coro(
"topic_filtering",
session=session,
topic=topic,
filter_plugins=topic_plugins)
topic_result = True
if returns:
for plugin in returns:
res = returns[plugin]
if res is False:
topic_result = False
self.logger.debug("Topic filtering failed due to '%s' plugin result: %s" % (plugin.name, res))
else:
self.logger.debug("'%s' plugin result: %s" % (plugin.name, res))
# If all plugins returned True, authentication is success
return topic_result

def retain_message(self, source_session, topic_name, data, qos=None):
if data is not None and data != b'':
# If retained flag set, store the message for further subscriptions
Expand All @@ -571,6 +607,7 @@ def retain_message(self, source_session, topic_name, data, qos=None):
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
del self._retained_messages[topic_name]

@asyncio.coroutine
def add_subscription(self, subscription, session):
try:
a_filter = subscription[0]
Expand All @@ -582,7 +619,10 @@ def add_subscription(self, subscription, session):
if "/+" not in a_filter and "+/" not in a_filter:
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
return 0x80

# Check if the client is authorised to connect to the topic
permitted = yield from self.topic_filtering(session, topic=a_filter)
if not permitted:
return 0x80
qos = subscription[1]
if 'max-qos' in self.config and qos > self.config['max-qos']:
qos = self.config['max-qos']
Expand Down
37 changes: 37 additions & 0 deletions hbmqtt/plugins/topic_checking.py
@@ -0,0 +1,37 @@
import asyncio


class BaseTopicPlugin:
def __init__(self, context):
self.context = context
try:
self.topic_config = self.context.config['topic-check']
except KeyError:
self.context.logger.warning("'topic-check' section not found in context configuration")

def topic_filtering(self, *args, **kwargs):
if not self.topic_config:
# auth config section not found
self.context.logger.warning("'auth' section not found in context configuration")
return False
return True


class TopicTabooPlugin(BaseTopicPlugin):
def __init__(self, context):
super().__init__(context)
self._taboo = ['prohibited', 'top-secret', 'data/classified']

@asyncio.coroutine
def topic_filtering(self, *args, **kwargs):
filter_result = super().topic_filtering(*args, **kwargs)
if filter_result:
session = kwargs.get('session', None)
topic = kwargs.get('topic', None)
if session.username and topic:
if session.username != 'admin' and topic in self._taboo:
return False
return True
else:
return False
return filter_result
49 changes: 49 additions & 0 deletions samples/broker_taboo.py
@@ -0,0 +1,49 @@
import logging
import asyncio
import os
from hbmqtt.broker import Broker

logger = logging.getLogger(__name__)

config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1883',
},
'ws-mqtt': {
'bind': '127.0.0.1:8080',
'type': 'ws',
'max_connections': 10,
},
},
'sys_interval': 10,
'auth': {
'allow-anonymous': True,
'password-file': os.path.join(os.path.dirname(os.path.realpath(__file__)), "passwd"),
'plugins': [
'auth_file', 'auth_anonymous'
]

},
'topic-check': {
'enabled': True,
'plugins': [
'topic_taboo'
]
}
}

broker = Broker(config)


@asyncio.coroutine
def test_coro():
yield from broker.start()


if __name__ == '__main__':
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()
33 changes: 33 additions & 0 deletions samples/client_publish_taboo.py
@@ -0,0 +1,33 @@
import logging
import asyncio

from hbmqtt.client import MQTTClient, ConnectException


#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#

logger = logging.getLogger(__name__)


@asyncio.coroutine
def test_coro():
try:
C = MQTTClient()
yield from C.connect('mqtt://0.0.0.0:1883')
yield from C.publish('data/classified', b'TOP SECRET', qos=0x01)
yield from C.publish('data/memes', b'REAL FUN', qos=0x01)
logger.info("messages published")
yield from C.disconnect()
except ConnectException as ce:
logger.error("Connection failed: %s" % ce)
asyncio.get_event_loop().stop()


if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
formatter = "%(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
42 changes: 42 additions & 0 deletions samples/client_subscribe_taboo.py
@@ -0,0 +1,42 @@
import logging
import asyncio

from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1


#
# This sample shows how to subscbribe a topic and receive data from incoming messages
# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned
# by the broker.
#

logger = logging.getLogger(__name__)


@asyncio.coroutine
def uptime_coro():
C = MQTTClient()
yield from C.connect('mqtt://test:test@0.0.0.0:1883')
# Subscribe to '$SYS/broker/uptime' with QOS=1
yield from C.subscribe([
('data/memes', QOS_1), # Topic allowed
('data/classified', QOS_1), # Topic forbidden
])
logger.info("Subscribed")
try:
for i in range(1, 100):
message = yield from C.deliver_message()
packet = message.publish_packet
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
logger.info("UnSubscribed")
yield from C.disconnect()
except ClientException as ce:
logger.error("Client exception: %s" % ce)


if __name__ == '__main__':
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -47,6 +47,7 @@
'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin',
'auth_anonymous = hbmqtt.plugins.authentication:AnonymousAuthPlugin',
'auth_file = hbmqtt.plugins.authentication:FileAuthPlugin',
'topic_taboo = hbmqtt.plugins.topic_checking:TopicTabooPlugin',
'broker_sys = hbmqtt.plugins.sys.broker:BrokerSysPlugin',
],
'hbmqtt.client.plugins': [
Expand Down

0 comments on commit 96f7135

Please sign in to comment.