Skip to content

Commit bc2ac81

Browse files
committed
Concurrency improvements: Don’t use the main reactor thread for MQTT message processing. Use a different thread pool instead.
1 parent 83e34b6 commit bc2ac81

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

CHANGES.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ in progress
77
===========
88

99

10+
.. _kotori-0.13.0:
11+
12+
2017-03-20 0.13.0
13+
=================
14+
- Concurrency improvements: Don’t use the main reactor thread for MQTT message processing. Use a different thread pool instead.
15+
16+
1017
.. _kotori-0.12.3:
1118

1219
2017-03-20 0.12.3

kotori/daq/services/mig.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import time
44
import json
55
from bunch import Bunch
6+
from kotori.thimble import Thimble
67
from twisted.logger import Logger, LogLevel
78
from twisted.internet import reactor, threads
89
from twisted.internet.task import LoopingCall
910
from twisted.application.service import MultiService
11+
from twisted.python.threadpool import ThreadPool
1012
from kotori.configuration import read_list
1113
from kotori.daq.services import MultiServiceMixin
1214
from kotori.daq.intercom.mqtt import MqttAdapter
@@ -43,6 +45,10 @@ def setupService(self):
4345

4446
self.registerService(self.mqtt_service)
4547

48+
# Perform MQTT message processing using a different thread pool
49+
self.threadpool = ThreadPool()
50+
self.thimble = Thimble(reactor, self.threadpool, self, ["process_message"])
51+
4652
def startService(self):
4753
self.setupService()
4854
self.log(log.info, u'Starting')
@@ -66,7 +72,11 @@ def mqtt_receive(self, topic=None, payload=None, **kwargs):
6672
#return self.process_message(topic, payload, **kwargs)
6773

6874
# Asynchronous message processing
69-
deferred = threads.deferToThread(self.process_message, topic, payload, **kwargs)
75+
#deferred = threads.deferToThread(self.process_message, topic, payload, **kwargs)
76+
77+
# Asynchronous message processing using different thread pool
78+
deferred = self.thimble.process_message(topic, payload, **kwargs)
79+
7080
deferred.addErrback(self.mqtt_receive_error, topic)
7181
return deferred
7282

kotori/thimble.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Implementation of a Twisted-friendly thread pool wrapper."""
2+
from functools import partial
3+
from twisted.internet.threads import deferToThreadPool
4+
from twisted.internet.defer import fail
5+
from twisted.internet.error import ReactorNotRunning
6+
7+
# Shamelessly stolen from:
8+
# https://github.com/lvh/thimble
9+
# https://pypi.python.org/pypi/thimble
10+
11+
class Thimble(object):
12+
13+
"""A Twisted thread-pool wrapper for a blocking API."""
14+
15+
def __init__(self, reactor, pool, wrapped, blocking_methods):
16+
"""Initialize a :class:`Thimble`.
17+
:param reactor: The reactor that will handle events.
18+
:type reactor: :class:`twisted.internet.interfaces.IReactorThreads` and
19+
:class:`twisted.internet.interfaces.IReactorCore`. Pretty much any
20+
real reactor implementation will do.
21+
:param pool: The thread pool to defer to.
22+
:type pool: :class:`twisted.python.threadpool.ThreadPool`
23+
:param wrapped: The blocking implementation being wrapped.
24+
:param blocking_methods: The names of the methods that will be wrapped
25+
and executed in the thread pool.
26+
:type blocking_methods: ``list`` of native ``str``
27+
"""
28+
self._reactor = reactor
29+
self._pool = pool
30+
self._wrapped = wrapped
31+
self._blocking_methods = blocking_methods
32+
33+
def _deferToThreadPool(self, f, *args, **kwargs):
34+
"""Defer execution of ``f(*args, **kwargs)`` to the thread pool.
35+
This returns a deferred which will callback with the result of
36+
that expression, or errback with a failure wrapping the raised
37+
exception.
38+
"""
39+
if self._pool.joined:
40+
return fail(
41+
ReactorNotRunning("This thimble's threadpool already stopped.")
42+
)
43+
if not self._pool.started:
44+
self._pool.start()
45+
self._reactor.addSystemEventTrigger(
46+
'during', 'shutdown', self._pool.stop)
47+
48+
return deferToThreadPool(self._reactor, self._pool, f, *args, **kwargs)
49+
50+
def __getattr__(self, attr):
51+
"""Get and maybe wraps an attribute from the wrapped object.
52+
If the attribute is blocking, it will be wrapped so that
53+
calling it will return a Deferred and the actual function will
54+
be ran in a thread pool.
55+
"""
56+
value = getattr(self._wrapped, attr)
57+
58+
if attr in self._blocking_methods:
59+
value = partial(self._deferToThreadPool, value)
60+
61+
return value

0 commit comments

Comments
 (0)