Skip to content

Commit 9ecbd39

Browse files
committed
Make several modifications to executor logic
Change asyncio "join" method; construct "delayed_backgrd_task" method; various other minor changes
1 parent 8ff97d8 commit 9ecbd39

File tree

5 files changed

+84
-79
lines changed

5 files changed

+84
-79
lines changed

graphql_subscriptions/executors/asyncio.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,41 +60,33 @@ def ws_send(ws, msg):
6060
def ws_recv(ws):
6161
return ws.recv()
6262

63-
@staticmethod
64-
def sleep(time):
65-
return asyncio.sleep(time)
66-
67-
@staticmethod
68-
@asyncio.coroutine
69-
def set_timeout(callback, period):
70-
while True:
71-
callback()
72-
yield from asyncio.sleep(period)
63+
def sleep(self, time):
64+
return self.loop.run_until_complete(asyncio.sleep(time))
7365

7466
@staticmethod
7567
def kill(future):
7668
future.cancel()
7769

78-
@staticmethod
79-
def join(future, timeout):
80-
return asyncio.wait_for(future, timeout=timeout)
70+
def join(self, future, timeout=None):
71+
return self.loop.run_until_complete(asyncio.wait_for(future,
72+
timeout=timeout))
8173

82-
def join_all(self):
83-
while self.futures:
84-
futures = self.futures
85-
self.futures = []
86-
asyncio.wait(futures)
87-
return futures
74+
@staticmethod
75+
@asyncio.coroutine
76+
def delayed_backgrd_task(func, callback=None, period=0, *arg, **kwargs):
77+
while True:
78+
if func:
79+
result = func(*arg, **kwargs)
80+
if asyncio.iscoroutine(result):
81+
msg = yield from result
82+
if callback:
83+
callback(msg)
84+
if callback:
85+
callback(result)
86+
yield from asyncio.sleep(period)
8887

8988
def execute(self, fn, *args, **kwargs):
9089
coro = fn(*args, **kwargs)
9190
future = ensure_future(coro, loop=self.loop)
9291
self.futures.append(future)
9392
return future
94-
95-
def execute_and_call_callback(self, fn, callback, *args, **kwargs):
96-
coro = fn(*args, **kwargs)
97-
future = ensure_future(coro, loop=self.loop)
98-
self.futures.append(future)
99-
future.add_done_callback(callback)
100-
return future

graphql_subscriptions/executors/gevent.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def sleep(time):
4040
gevent.sleep(time)
4141

4242
@staticmethod
43-
def set_timeout(callback, period):
43+
def delayed_backgrd_task(callback, period):
4444
while True:
4545
callback()
4646
gevent.sleep(period)
@@ -53,11 +53,6 @@ def kill(greenlet):
5353
def join(greenlet):
5454
greenlet.join()
5555

56-
def join_all(self):
57-
joined_greenlets = gevent.joinall(self.greenlets)
58-
self.greenlets = []
59-
return joined_greenlets
60-
6156
def execute(self, fn, *args, **kwargs):
6257
greenlet = gevent.spawn(fn, *args, **kwargs)
6358
self.greenlets.append(greenlet)

graphql_subscriptions/subscription_manager/pubsub.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(self,
3636
self.pubsub = self.redis.pubsub()
3737

3838
self.executor = executor()
39-
self.get_message_task = None
39+
self.backgrd_task = None
4040

4141
self.subscriptions = {}
4242
self.sub_id_counter = 1
@@ -57,8 +57,8 @@ def subscribe(self, trigger_name, on_message_handler, options):
5757
self.subscriptions[self.sub_id_counter] = [
5858
trigger_name, on_message_handler
5959
]
60-
if not self.get_message_task:
61-
self.get_message_task = self.executor.execute(
60+
if not self.backgrd_task:
61+
self.backgrd_task = self.executor.execute(
6262
self.wait_and_get_message)
6363
return Promise.resolve(self.sub_id_counter)
6464

@@ -73,17 +73,30 @@ def unsubscribe(self, sub_id):
7373
self.executor.execute(self.pubsub.unsubscribe, trigger_name)
7474

7575
if not self.subscriptions:
76-
self.get_message_task = self.executor.kill(self.get_message_task)
76+
self.backgrd_task = self.executor.kill(self.backgrd_task)
7777

78-
def wait_and_get_message(self):
78+
def _wait_and_get_message_async(self):
79+
self.executor.execute(self.executor.delayed_backgrd_task,
80+
self.pubsub.get_message,
81+
self.handle_message,
82+
.001,
83+
ignore_subscribe_messages=True)
84+
85+
def _wait_and_get_message_sync(self):
7986
while True:
80-
# import pytest; pytest.set_trace()
8187
message = self.pubsub.get_message(ignore_subscribe_messages=True)
8288
if message:
8389
self.handle_message(message)
8490
self.executor.sleep(.001)
8591

92+
def wait_and_get_message(self):
93+
if hasattr(self.executor, 'loop'):
94+
self._wait_and_get_message_async()
95+
else:
96+
self._wait_and_get_message_sync()
97+
8698
def handle_message(self, message):
99+
87100
if isinstance(message['channel'], bytes):
88101
channel = message['channel'].decode()
89102

graphql_subscriptions/subscription_transport_ws/server.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ def __init__(self,
1919
on_connect=None,
2020
on_disconnect=None):
2121

22-
assert subscription_manager, "Must provide\
23-
'subscription_manager' to websocket app constructor"
22+
assert subscription_manager, ("Must provide\
23+
'subscription_manager' to websocket app constructor")
2424

2525
self.ws = websocket
2626
self.subscription_manager = subscription_manager
@@ -54,9 +54,11 @@ def _handle_async(self):
5454

5555
while True:
5656
try:
57-
self.executor.execute_and_call_callback(self.executor.ws_recv,
58-
self._on_message,
59-
self.ws)
57+
self.executor.execute(self.executor.delayed_backgrd_task,
58+
self.executor.ws_recv,
59+
self.on_message,
60+
0,
61+
self.ws)
6062
except self.executor.error:
6163
self.on_close()
6264
break
@@ -86,9 +88,11 @@ def keep_alive_callback():
8688
self.executor.kill(keep_alive_timer)
8789

8890
if self.keep_alive:
89-
keep_alive_timer = self.executor.execute(self.executor.set_timeout,
90-
keep_alive_callback,
91-
self.keep_alive)
91+
keep_alive_timer = self.executor.execute(
92+
self.executor.delayed_backgrd_task,
93+
None,
94+
keep_alive_callback,
95+
self.keep_alive)
9296

9397
def on_close(self):
9498
for sub_id in list(self.connection_subscriptions.keys()):
@@ -98,13 +102,12 @@ def on_close(self):
98102
if self.on_disconnect:
99103
self.on_disconnect(self.ws)
100104

101-
def _on_message(self, future):
102-
self.on_message(future.result())
103-
104105
def on_message(self, msg):
105106

106107
if msg is None:
107108
return
109+
elif hasattr(msg, 'result'): # check if future from asyncio
110+
msg = msg.result()
108111

109112
non_local = {'on_init_resolve': None, 'on_init_reject': None}
110113

@@ -184,8 +187,9 @@ def subscription_start_promise_handler(init_result):
184187

185188
def promised_params_handler(params):
186189
if not isinstance(params, dict):
187-
error = 'Invalid params returned from\
188-
OnSubscribe! Return value must be an dict'
190+
error = ('Invalid params returned from'
191+
'OnSubscribe! Return value must be an'
192+
'dict')
189193

190194
self.send_subscription_fail(
191195
sub_id, {'errors': [{

tests/test_subscription_manager.py

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
from graphql_subscriptions import RedisPubsub, SubscriptionManager
1212
from graphql_subscriptions.subscription_manager.validation import (
1313
SubscriptionHasSingleRootField)
14-
from graphql_subscriptions.executors.gevent import GeventExecutor
14+
# from graphql_subscriptions.executors.gevent import GeventExecutor
15+
from graphql_subscriptions.executors.asyncio import AsyncioExecutor
1516

1617

17-
@pytest.fixture(params=[GeventExecutor])
18+
@pytest.fixture(params=[AsyncioExecutor])
1819
def executor(request):
1920
return request.param
2021

@@ -31,13 +32,13 @@ def test_pubsub_subscribe_and_publish(pubsub, executor, test_input, expected):
3132
def message_callback(message):
3233
try:
3334
assert message == expected
34-
executor.kill(pubsub.get_message_task)
35+
executor.kill(pubsub.backgrd_task)
3536
except AssertionError as e:
3637
sys.exit(e)
3738

3839
def publish_callback(sub_id):
3940
assert pubsub.publish('a', test_input)
40-
executor.join(pubsub.get_message_task)
41+
executor.join(pubsub.backgrd_task)
4142

4243
p1 = pubsub.subscribe('a', message_callback, {})
4344
p2 = p1.then(publish_callback)
@@ -52,7 +53,7 @@ def unsubscribe_publish_callback(sub_id):
5253
pubsub.unsubscribe(sub_id)
5354
assert pubsub.publish('a', 'test')
5455
try:
55-
executor.join(pubsub.get_message_task)
56+
executor.join(pubsub.backgrd_task)
5657
except AttributeError:
5758
return
5859

@@ -190,14 +191,14 @@ def test_subscribe_with_valid_query_and_return_root_value(sub_mgr, executor):
190191
def callback(e, payload):
191192
try:
192193
assert payload.data.get('testSubscription') == 'good'
193-
executor.kill(sub_mgr.pubsub.get_message_task)
194+
executor.kill(sub_mgr.pubsub.backgrd_task)
194195
except AssertionError as e:
195196
sys.exit(e)
196197

197198
def publish_and_unsubscribe_handler(sub_id):
198199
sub_mgr.publish('testSubscription', 'good')
199200
# pytest.set_trace()
200-
executor.join(sub_mgr.pubsub.get_message_task)
201+
executor.join(sub_mgr.pubsub.backgrd_task)
201202
sub_mgr.unsubscribe(sub_id)
202203

203204
p1 = sub_mgr.subscribe(query, 'X', callback, {}, {}, None, None)
@@ -218,14 +219,14 @@ def callback(err, payload):
218219
assert True
219220
else:
220221
assert payload.data.get('testFilter') == 'good_filter'
221-
executor.kill(sub_mgr.pubsub.get_message_task)
222+
executor.kill(sub_mgr.pubsub.backgrd_task)
222223
except AssertionError as e:
223224
sys.exit(e)
224225

225226
def publish_and_unsubscribe_handler(sub_id):
226227
sub_mgr.publish('filter_1', {'filterBoolean': False})
227228
sub_mgr.publish('filter_1', {'filterBoolean': True})
228-
executor.join(sub_mgr.pubsub.get_message_task)
229+
executor.join(sub_mgr.pubsub.backgrd_task)
229230
sub_mgr.unsubscribe(sub_id)
230231

231232
p1 = sub_mgr.subscribe(query, 'Filter1', callback, {'filterBoolean': True},
@@ -247,15 +248,15 @@ def callback(err, payload):
247248
assert True
248249
else:
249250
assert payload.data.get('testFilter') == 'good_filter'
250-
executor.kill(sub_mgr.pubsub.get_message_task)
251+
executor.kill(sub_mgr.pubsub.backgrd_task)
251252
except AssertionError as e:
252253
sys.exit(e)
253254

254255
def publish_and_unsubscribe_handler(sub_id):
255256
sub_mgr.publish('filter_2', {'filterBoolean': False})
256257
sub_mgr.publish('filter_2', {'filterBoolean': True})
257258
try:
258-
executor.join(sub_mgr.pubsub.get_message_task)
259+
executor.join(sub_mgr.pubsub.backgrd_task)
259260
except:
260261
raise
261262
sub_mgr.unsubscribe(sub_id)
@@ -286,13 +287,13 @@ def callback(err, payload):
286287
except AssertionError as e:
287288
sys.exit(e)
288289
if non_local['trigger_count'] == 2:
289-
executor.kill(sub_mgr.pubsub.get_message_task)
290+
executor.kill(sub_mgr.pubsub.backgrd_task)
290291

291292
def publish_and_unsubscribe_handler(sub_id):
292293
sub_mgr.publish('not_a_trigger', {'filterBoolean': False})
293294
sub_mgr.publish('trigger_1', {'filterBoolean': True})
294295
sub_mgr.publish('trigger_2', {'filterBoolean': True})
295-
executor.join(sub_mgr.pubsub.get_message_task)
296+
executor.join(sub_mgr.pubsub.backgrd_task)
296297
sub_mgr.unsubscribe(sub_id)
297298

298299
p1 = sub_mgr.subscribe(query, 'multiTrigger', callback,
@@ -343,7 +344,7 @@ def unsubscribe_and_publish_handler(sub_id):
343344
sub_mgr.unsubscribe(sub_id)
344345
sub_mgr.publish('testSubscription', 'good')
345346
try:
346-
executor.join(sub_mgr.pubsub.get_message_task)
347+
executor.join(sub_mgr.pubsub.backgrd_task)
347348
except AttributeError:
348349
return
349350

@@ -393,17 +394,17 @@ def test_calls_the_error_callback_if_there_is_an_execution_error(
393394
def callback(err, payload):
394395
try:
395396
assert payload is None
396-
assert err.message == 'Variable "$uga" of required type\
397-
"Boolean!" was not provided.'
397+
assert err.message == ('Variable "$uga" of required type'
398+
'"Boolean!" was not provided.')
398399

399-
executor.kill(sub_mgr.pubsub.get_message_task)
400+
executor.kill(sub_mgr.pubsub.backgrd_task)
400401
except AssertionError as e:
401402
sys.exit(e)
402403

403404
def unsubscribe_and_publish_handler(sub_id):
404405
sub_mgr.publish('testSubscription', 'good')
405406
try:
406-
executor.join(sub_mgr.pubsub.get_message_task)
407+
executor.join(sub_mgr.pubsub.backgrd_task)
407408
except AttributeError:
408409
return
409410
sub_mgr.unsubscribe(sub_id)
@@ -427,14 +428,14 @@ def callback(err, payload):
427428
try:
428429
assert err is None
429430
assert payload.data.get('testContext') == 'trigger'
430-
executor.kill(sub_mgr.pubsub.get_message_task)
431+
executor.kill(sub_mgr.pubsub.backgrd_task)
431432
except AssertionError as e:
432433
sys.exit(e)
433434

434435
def unsubscribe_and_publish_handler(sub_id):
435436
sub_mgr.publish('context_trigger', 'ignored')
436437
try:
437-
executor.join(sub_mgr.pubsub.get_message_task)
438+
executor.join(sub_mgr.pubsub.backgrd_task)
438439
except AttributeError:
439440
return
440441
sub_mgr.unsubscribe(sub_id)
@@ -459,14 +460,14 @@ def callback(err, payload):
459460
try:
460461
assert payload is None
461462
assert str(err) == 'context error'
462-
executor.kill(sub_mgr.pubsub.get_message_task)
463+
executor.kill(sub_mgr.pubsub.backgrd_task)
463464
except AssertionError as e:
464465
sys.exit(e)
465466

466467
def unsubscribe_and_publish_handler(sub_id):
467468
sub_mgr.publish('context_trigger', 'ignored')
468469
try:
469-
executor.join(sub_mgr.pubsub.get_message_task)
470+
executor.join(sub_mgr.pubsub.backgrd_task)
470471
except AttributeError:
471472
return
472473
sub_mgr.unsubscribe(sub_id)
@@ -529,16 +530,16 @@ def test_should_not_allow_inline_fragments(validation_schema):
529530
errors = validate(validation_schema,
530531
parse(sub), [SubscriptionHasSingleRootField])
531532
assert len(errors) == 1
532-
assert errors[0].message == 'Apollo subscriptions do not support\
533-
fragments on the root field'
533+
assert errors[0].message == ('Apollo subscriptions do not support'
534+
'fragments on the root field')
534535

535536

536537
def test_should_not_allow_fragments(validation_schema):
537-
sub = 'subscription S5{ ...testFragment }\
538-
fragment testFragment on Subscription{ test2 }'
538+
sub = ('subscription S5{ ...testFragment }'
539+
'fragment testFragment on Subscription{ test2 }')
539540

540541
errors = validate(validation_schema,
541542
parse(sub), [SubscriptionHasSingleRootField])
542543
assert len(errors) == 1
543-
assert errors[0].message == 'Apollo subscriptions do not support\
544-
fragments on the root field'
544+
assert errors[0].message == ('Apollo subscriptions do not support'
545+
'fragments on the root field')

0 commit comments

Comments
 (0)