UIS Delta subscriptions & GraphQL null stripping back-end #118

Merged · 3 commits · Jun 18, 2020
92 changes: 47 additions & 45 deletions cylc/uiserver/data_store_mgr.py
@@ -33,18 +33,18 @@
 """

 import asyncio
-import logging

 from concurrent.futures import ThreadPoolExecutor
+from copy import deepcopy
 from functools import partial
+import logging
 from time import sleep

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network.scan import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
 from cylc.flow.data_store_mgr import (
-    EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
-    DELTAS_MAP, apply_delta, generate_checksum
+    EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
+    apply_delta, generate_checksum, create_delta_store
 )
from .workflows_mgr import workflow_request

@@ -62,10 +62,10 @@ def __init__(self, workflows_mgr):
         self.workflows_mgr = workflows_mgr
         self.data = {}
         self.w_subs = {}
-        self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
-        self.topics.add(b'shutdown')
+        self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
         self.loop = None
         self.executors = {}
+        self.delta_queues = {}

     async def sync_workflow(self, w_id, *args, **kwargs):
         """Run data store sync with workflow services.

@@ -80,6 +80,8 @@ async def sync_workflow(self, w_id, *args, **kwargs):
         if w_id in self.w_subs:
             return

+        self.delta_queues[w_id] = {}
+
         # Might be options other than threads to achieve
         # non-blocking subscriptions, but this works.
         self.executors[w_id] = ThreadPoolExecutor()
@@ -95,6 +97,8 @@ def purge_workflow(self, w_id):
         del self.w_subs[w_id]
         if w_id in self.data:
             del self.data[w_id]
+        if w_id in self.delta_queues:
+            del self.delta_queues[w_id]
         self.executors[w_id].shutdown(wait=True)
         del self.executors[w_id]
@@ -140,26 +144,33 @@ def update_workflow_data(self, topic, delta, w_id):
                 loop_cnt += 1
                 continue
         if topic == 'shutdown':
+            self.delta_store_to_queues(w_id, topic, delta)
             self.workflows_mgr.stopping.add(w_id)
             self.w_subs[w_id].stop()
             return
-        delta_time = getattr(
-            delta, 'time', getattr(delta, 'last_updated', 0.0))
-        # If the workflow has reloaded recreate the data
-        # otherwise apply the delta if it's newer than the previously applied.
-        if delta.reloaded:
-            if topic == WORKFLOW:
-                self.data[w_id][topic].CopyFrom(delta)
-            else:
-                self.data[w_id][topic] = {
-                    ele.id: ele
-                    for ele in delta.deltas
-                }
-            self.data[w_id]['delta_times'][topic] = delta_time
-        elif delta_time >= self.data[w_id]['delta_times'][topic]:
-            apply_delta(topic, delta, self.data[w_id])
-            self.data[w_id]['delta_times'][topic] = delta_time
-            self.reconcile_update(topic, delta, w_id)
+        for field, sub_delta in delta.ListFields():
+            delta_time = getattr(sub_delta, 'time', 0.0)
+            # If the workflow has reloaded clear the data before
+            # delta application.
+            if sub_delta.reloaded:
+                if field.name == WORKFLOW:
+                    self.data[w_id][field.name].Clear()
+                else:
+                    self.data[w_id][field.name].clear()
Comment on lines +155 to +158

Member: What's going on here, is there a difference between Clear and clear?

Member (Author): One's a Protobuf element (i.e. PbWorkflow), so uses Clear; the other(s) a dictionary.
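
A minimal illustration of the distinction (variable names assumed, not taken from the diff):

```python
# Protobuf messages expose Clear() (capital C); plain dicts expose clear().
workflow = PbWorkflow()              # protobuf-generated message element
task_proxies = {'task1': 'proxy1'}   # plain dict keyed by element id

workflow.Clear()      # protobuf API: reset every field on the message
task_proxies.clear()  # builtin dict API: drop all entries
```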

+                self.data[w_id]['delta_times'][field.name] = 0.0
+            # Apply the delta if newer than the previously applied.
+            if delta_time >= self.data[w_id]['delta_times'][field.name]:
+                apply_delta(field.name, sub_delta, self.data[w_id])
+                self.data[w_id]['delta_times'][field.name] = delta_time
+                self.reconcile_update(field.name, sub_delta, w_id)
+
+        self.delta_store_to_queues(w_id, topic, delta)
+
+    def delta_store_to_queues(self, w_id, topic, delta):
+        # Queue delta for graphql subscription resolving
+        if self.delta_queues[w_id]:
+            delta_store = create_delta_store(delta, w_id)
+            for delta_queue in self.delta_queues[w_id].values():
+                delta_queue.put((w_id, topic, delta_store))
     def reconcile_update(self, topic, delta, w_id):
         """Reconcile local with workflow data-store.
@@ -195,13 +206,12 @@ def reconcile_update(self, topic, delta, w_id):
         try:
             _, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
         except asyncio.TimeoutError:
-            logger.info(
+            logger.debug(
                 f'The reconcile update coroutine {w_id} {topic} '
                 'took too long, cancelling the subscription/sync.'
             )
             future.cancel()
             self.workflows_mgr.stopping.add(w_id)
             self.w_subs[w_id].stop()
         except Exception as exc:
             logger.exception(exc)
         else:
@@ -238,26 +248,18 @@ async def entire_workflow_update(self, ids=None):
             if not ids or kwargs['req_context'] in ids:
                 gathers += (workflow_request(**kwargs),)
         items = await asyncio.gather(*gathers)
-        new_data = {}
         for w_id, result in items:
             if result is not None and result != MSG_TIMEOUT:
                 pb_data = PB_METHOD_MAP[req_method]()
                 pb_data.ParseFromString(result)
-                new_data[w_id] = {
-                    EDGES: {e.id: e for e in getattr(pb_data, EDGES)},
-                    FAMILIES: {f.id: f for f in getattr(pb_data, FAMILIES)},
-                    FAMILY_PROXIES: {
-                        n.id: n
-                        for n in getattr(pb_data, FAMILY_PROXIES)},
-                    JOBS: {j.id: j for j in getattr(pb_data, JOBS)},
-                    TASKS: {t.id: t for t in getattr(pb_data, TASKS)},
-                    TASK_PROXIES: {
-                        n.id: n
-                        for n in getattr(pb_data, TASK_PROXIES)},
-                    WORKFLOW: getattr(pb_data, WORKFLOW),
-                    'delta_times': {
-                        topic: getattr(pb_data, WORKFLOW).last_updated
-                        for topic in DELTAS_MAP.keys()}
-                }
-
-        self.data.update(new_data)
+                new_data = deepcopy(DATA_TEMPLATE)
+                for field, value in pb_data.ListFields():
+                    if field.name == WORKFLOW:
+                        new_data[field.name].CopyFrom(value)
+                        new_data['delta_times'] = {
+                            key: value.last_updated
+                            for key in DATA_TEMPLATE
+                        }
+                        continue
+                    new_data[field.name] = {n.id: n for n in value}
+                self.data[w_id] = new_data
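
The new `delta_store_to_queues` above fans each incoming delta out to every queue registered for the workflow. A sketch of how a subscriber might register and drain one of these queues — the queue type, key, and helper name are assumptions, not part of this PR:

```python
import queue
import uuid


def iter_deltas(data_store_mgr, w_id):
    # Hypothetical helper: yield the (w_id, topic, delta_store) tuples
    # that DataStoreMgr.delta_store_to_queues() puts on each queue.
    sub_id = uuid.uuid4()
    delta_queue = queue.Queue()
    data_store_mgr.delta_queues[w_id][sub_id] = delta_queue
    try:
        while True:
            # Blocks until update_workflow_data() queues the next delta.
            yield delta_queue.get()
    finally:
        # Deregister so the fan-out (and purge_workflow) skip this consumer.
        data_store_mgr.delta_queues[w_id].pop(sub_id, None)
```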
10 changes: 10 additions & 0 deletions cylc/uiserver/handlers.py
@@ -117,6 +117,16 @@ def context(self):
     def prepare(self):
         super().prepare()

+    async def execute(self, *args, **kwargs):
+        # Use our own backend; TornadoGraphQLHandler already does validation.
+        return await self.schema.execute(
+            *args,
+            backend=self.backend,
+            variable_values=kwargs.get('variables'),
+            validate=False,
+            **kwargs,
+        )


class SubscriptionHandler(websocket.WebSocketHandler):

35 changes: 25 additions & 10 deletions cylc/uiserver/main.py
@@ -28,6 +28,9 @@

 from tornado import web, ioloop

+from cylc.flow.network.graphql import (
+    CylcGraphQLBackend, IgnoreFieldMiddleware
+)
from cylc.flow.network.schema import schema
from graphene_tornado.tornado_graphql_handler import TornadoGraphQLHandler
from jupyterhub.services.auth import HubOAuthCallbackHandler
@@ -76,7 +79,7 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
         self.workflows_mgr = WorkflowsManager(self)
         self.data_store_mgr = DataStoreMgr(self.workflows_mgr)
         self.resolvers = Resolvers(
-            self.data_store_mgr.data,
+            self.data_store_mgr,
             workflows_mgr=self.workflows_mgr)

     def _create_static_handler(
@@ -140,6 +143,8 @@ def _create_graphql_handler(
             clazz,
             schema=schema,
             resolvers=self.resolvers,
+            backend=CylcGraphQLBackend(),
+            middleware=[IgnoreFieldMiddleware],
             **kwargs
         )

@@ -151,7 +156,11 @@ def _make_app(self, debug: bool):
         """
         logger.info(self._static)
         # subscription/websockets server
-        subscription_server = TornadoSubscriptionServer(schema)
+        subscription_server = TornadoSubscriptionServer(
+            schema,
+            backend=CylcGraphQLBackend(),
+            middleware=[IgnoreFieldMiddleware],
+        )
         return MyApplication(
             static_path=self._static,
             debug=debug,
@@ -166,14 +175,20 @@ def _make_app(self, debug: bool):
             self._create_handler("userprofile",
                                  UserProfileHandler),
             # graphql handlers
-            self._create_graphql_handler("graphql",
-                                         UIServerGraphQLHandler),
-            self._create_graphql_handler("graphql/batch",
-                                         UIServerGraphQLHandler,
-                                         batch=True),
-            self._create_graphql_handler("graphql/graphiql",
-                                         GraphiQLHandler,
-                                         graphiql=True),
+            self._create_graphql_handler(
+                "graphql",
+                UIServerGraphQLHandler,
+            ),
+            self._create_graphql_handler(
+                "graphql/batch",
+                UIServerGraphQLHandler,
+                batch=True
+            ),
+            self._create_graphql_handler(
+                "graphql/graphiql",
+                GraphiQLHandler,
+                graphiql=True
+            ),
             # subscription/websockets handler
             self._create_handler("subscriptions",
                                  SubscriptionHandler,
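
`CylcGraphQLBackend` and `IgnoreFieldMiddleware` come from cylc-flow and provide the null-stripping behaviour named in the PR title. As a rough sketch of the shape a graphql-core 2 middleware takes (an illustration, not the cylc-flow source):

```python
class ExampleFieldMiddleware:
    # Illustrative graphql-core 2 middleware: every field resolution
    # passes through resolve(), which may transform or skip the field.

    def resolve(self, next_, root, info, **args):
        # A null-stripping middleware could inspect `info` here and
        # short-circuit fields the client did not request; this sketch
        # simply delegates to the next resolver unchanged.
        return next_(root, info, **args)
```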
34 changes: 31 additions & 3 deletions cylc/uiserver/websockets/tornado.py
@@ -4,12 +4,13 @@
 # The file was copied from this revision:
 # https://github.com/graphql-python/graphql-ws/blob/cf560b9a5d18d4a3908dc2cfe2199766cc988fef/graphql_ws/tornado.py

-from inspect import isawaitable
+from inspect import isawaitable, isclass

 from asyncio import create_task, gather, wait, shield, sleep
 from asyncio.queues import QueueEmpty
 from tornado.websocket import WebSocketClosedError
 from graphql.execution.executors.asyncio import AsyncioExecutor
+from graphql.execution.middleware import MiddlewareManager
 from graphql_ws.base import ConnectionClosedException, BaseConnectionContext, BaseSubscriptionServer
 from graphql_ws.observable_aiter import setup_observable_extension
 from graphql_ws.constants import (
@@ -46,14 +47,41 @@ async def close(self, code):


 class TornadoSubscriptionServer(BaseSubscriptionServer):
-    def __init__(self, schema, keep_alive=True, loop=None):
+    def __init__(self, schema, keep_alive=True, loop=None, backend=None, middleware=None):
         self.loop = loop
+        self.backend = backend or None
+        self.middleware = middleware
         super().__init__(schema, keep_alive)

+    @staticmethod
+    def instantiate_middleware(middlewares):
+        for middleware in middlewares:
+            if isclass(middleware):
+                yield middleware()
+                continue
+            yield middleware

     def get_graphql_params(self, *args, **kwargs):
         params = super(TornadoSubscriptionServer,
                        self).get_graphql_params(*args, **kwargs)
-        return dict(params, return_promise=True, executor=AsyncioExecutor(loop=self.loop))
+        # If middleware are instantiated here (optional), they will
+        # be local/private to each subscription.
+        if self.middleware is not None:
+            middleware = list(
+                self.instantiate_middleware(self.middleware)
+            )
+        else:
+            middleware = self.middleware
+        return dict(
+            params,
+            return_promise=True,
+            executor=AsyncioExecutor(loop=self.loop),
+            backend=self.backend,
+            middleware=MiddlewareManager(
+                *middleware,
+                wrap_in_promise=False
+            ),
+        )

     async def _handle(self, ws, request_context):
         connection_context = TornadoConnectionContext(ws, request_context)
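
Usage mirrors `main.py` above: because `IgnoreFieldMiddleware` is passed as a class, `instantiate_middleware` builds a fresh instance per subscription, keeping any per-subscription state private:

```python
# Wiring as in main.py of this PR.
subscription_server = TornadoSubscriptionServer(
    schema,
    backend=CylcGraphQLBackend(),
    middleware=[IgnoreFieldMiddleware],  # class: instantiated per subscription
)
```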