/
pubsubs.py
145 lines (109 loc) · 4.54 KB
/
pubsubs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import sys
from twisted.internet.defer import inlineCallbacks, DeferredList
from twisted.internet import reactor, task
from twisted.internet.endpoints import clientFromString
from twisted.application.internet import ClientService, backoffPolicy
from twisted.logger import (
Logger, LogLevel, globalLogBeginner, textFileLogObserver,
FilteringLogObserver, LogLevelFilterPredicate)
from mqtt.client.factory import MQTTFactory
# ----------------
# Global variables
# ----------------
# Shared filter predicate that holds the per-namespace log levels
logLevelFilterPredicate = LogLevelFilterPredicate(defaultLogLevel=LogLevel.info)
BROKER = "tcp:test.mosquitto.org:1883"
# -----------------
# Utility Functions
# -----------------
def startLogging(console=True, filepath=None):
    '''
    Start the global Twisted logging subsystem.

    An observer writing to stdout is installed when ``console`` is true,
    and an observer appending to ``filepath`` is installed when
    ``filepath`` is neither None nor the empty string. Every observer is
    filtered through the module-level ``logLevelFilterPredicate``.
    '''
    def _filtered(stream):
        # Text observer for `stream`, gated by the shared level predicate.
        return FilteringLogObserver(
            observer=textFileLogObserver(stream),
            predicates=[logLevelFilterPredicate],
        )

    wantsFile = not (filepath is None or filepath == "")
    sinks = []
    if console:
        sinks.append(_filtered(sys.stdout))
    if wantsFile:
        # The file object is handed to the observer and is not closed here.
        sinks.append(_filtered(open(filepath, 'a')))
    globalLogBeginner.beginLoggingTo(sinks)
def setLogLevel(namespace=None, levelStr='info'):
    '''
    Change the log level applied to one logging namespace.

    namespace -- namespace to configure, forwarded unchanged to the
                 module-level ``logLevelFilterPredicate``
    levelStr  -- one of 'critical', 'error', 'warn', 'info', 'debug';
                 resolved with ``LogLevel.levelWithName``
    '''
    logLevelFilterPredicate.setLogLevelForNamespace(
        namespace=namespace,
        level=LogLevel.levelWithName(levelStr),
    )
# ---------------------------------
# MQTT Publisher/Subscriber Service
# ---------------------------------
class MQTTService(ClientService):
    '''
    Client service that publishes to and subscribes on an MQTT broker.

    Each time the underlying ClientService hands over a connected
    protocol, the service hooks its callbacks onto it, starts a periodic
    publisher, performs the MQTT connect and subscribes to a topic.
    When the connection is lost the periodic publisher is stopped and
    the service waits for the next connected protocol.

    Logging goes through a module-level ``log`` object, which must exist
    before any method of this class runs.
    '''

    def __init__(self, endpoint, factory):
        '''
        endpoint -- client endpoint used to reach the broker
        factory  -- protocol factory building the MQTT protocol
        '''
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
        # Current protocol instance; set on every (re)connection.
        self.protocol = None
        # LoopingCall driving publish(); None while no loop is installed.
        self.task = None

    def startService(self):
        '''
        Start the service and arrange for connectToBroker() to run on
        the first connected protocol.
        '''
        log.info("starting MQTT Client Subscriber Service")
        # invoke whenConnected() inherited method
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    def _stopPublishing(self):
        '''
        Stop the periodic publisher if one is currently running.
        '''
        loop = self.task
        # LoopingCall.stop() must only be called on a running loop.
        if loop is not None and loop.running:
            loop.stop()
        self.task = None

    @inlineCallbacks
    def connectToBroker(self, protocol):
        '''
        Connect to the MQTT broker over a freshly connected protocol.

        protocol -- the connected protocol instance delivered by
                    whenConnected()

        Errors raised while connecting or subscribing are logged and
        not propagated.
        '''
        self.protocol = protocol
        self.protocol.onPublish = self.onPublish
        self.protocol.onDisconnection = self.onDisconnection
        self.protocol.setWindowSize(3)
        # Never leave a loop from a previous connection running: without
        # this, each reconnection would add one more periodic publisher.
        self._stopPublishing()
        self.task = task.LoopingCall(self.publish)
        self.task.start(5.0, now=False)
        try:
            yield self.protocol.connect("TwistedMQTT-pubsubs", keepalive=60)
            yield self.subscribe()
        except Exception as e:
            log.error("Connecting to {broker} raised {excp!s}",
                      broker=BROKER, excp=e)
        else:
            log.info("Connected and subscribed to {broker}", broker=BROKER)

    def publish(self):
        '''
        Publish one fixed message on topic "foo/bar/baz1" with QoS 1.

        Returns the deferred produced by the protocol; a failure is
        logged at debug level and then passed on unchanged.
        '''
        def _logFailure(failure):
            log.debug("publisher reported {message}", message=failure.getErrorMessage())
            return failure

        d1 = self.protocol.publish(topic="foo/bar/baz1", qos=1, message="hello world 1")
        d1.addErrback(_logFailure)
        return d1

    def subscribe(self):
        '''
        Subscribe to topic "foo/bar/baz1" requesting QoS 2.

        Returns a deferred that yields True once the broker response has
        been logged; a failure is logged at debug level and passed on
        unchanged.
        '''
        def _logFailure(failure):
            log.debug("subscriber reported {message}", message=failure.getErrorMessage())
            return failure

        def _logGrantedQoS(value):
            log.debug("subscriber response {value!r}", value=value)
            return True

        d1 = self.protocol.subscribe("foo/bar/baz1", 2)
        d1.addCallbacks(_logGrantedQoS, _logFailure)
        return d1

    def onPublish(self, topic, payload, qos, dup, retain, msgId):
        '''
        Callback receiving messages from the publisher; logs the payload.
        '''
        log.debug("msg={payload}", payload=payload)

    def onDisconnection(self, reason):
        '''
        Get notified of disconnections, stop the periodic publisher,
        and get a deferred for a new protocol object (next retry).
        '''
        log.debug("<Connection was lost !> <reason={r}>", r=reason)
        # The protocol is gone; stop publishing until the next connection.
        self._stopPublishing()
        self.whenConnected().addCallback(self.connectToBroker)
# Logger read as a module global by MQTTService and its helpers. It is
# bound at module level (not inside the script guard) so the class also
# works when this file is imported instead of being run directly.
log = Logger()

if __name__ == '__main__':
    # Log to stdout, with debug verbosity for the MQTT library and for
    # this script.
    startLogging()
    setLogLevel(namespace='mqtt', levelStr='debug')
    setLogLevel(namespace='__main__', levelStr='debug')

    # One factory profile covering both the publisher and subscriber roles.
    factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
    myEndpoint = clientFromString(reactor, BROKER)
    serv = MQTTService(myEndpoint, factory)
    serv.startService()
    reactor.run()