From 768c053e9c2b70ee345fdde5ca9ce40d014cac31 Mon Sep 17 00:00:00 2001 From: davebshow Date: Tue, 23 Aug 2016 16:33:29 -0400 Subject: [PATCH 1/2] Implemented side effect interface for gremlin-ppython --- .../python/TraversalSourceGenerator.groovy | 4 + .../driver/driver_remote_connection.py | 163 +++++++++++++----- .../driver/remote_connection.py | 1 - .../gremlin_python/process/traversal.py | 4 + 4 files changed, 126 insertions(+), 46 deletions(-) diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy index 742c60914da..84da1b7ae63 100644 --- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy +++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy @@ -81,6 +81,10 @@ class Traversal(object): if self.last_traverser.bulk <= 0: self.last_traverser = None return object + def sideEffects(self): + if self.traversers is None: + self.traversal_strategies.apply_strategies(self) + return self.side_effects def toList(self): return list(iter(self)) def toSet(self): diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index 1b42af48e7e..d7ddc895d83 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -1,4 +1,4 @@ -''' +""" Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -15,7 +15,7 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -''' +""" import json import uuid from tornado import gen @@ -32,10 +32,14 @@ class GremlinServerError(Exception): pass +def parse_traverser(result): + return Traverser(result["@value"], result["@value"]["bulk"]["@value"]) + + class DriverRemoteConnection(RemoteConnection): """Remote connection to the Gremlin Server""" - def __init__(self, url, traversal_source, loop=None, username='', password=''): + def __init__(self, url, traversal_source, loop=None, username="", password=""): super(DriverRemoteConnection, self).__init__(url, traversal_source) if loop is None: self._loop = ioloop.IOLoop.current() @@ -46,13 +50,15 @@ def __init__(self, url, traversal_source, loop=None, username='', password=''): def submit(self, bytecode, op="bytecode", - processor="traversal", - session=None): - traversers = self._loop.run_sync(lambda: self._submit(bytecode, op, processor, session)) - return RemoteTraversal(iter(traversers), {}) + processor="traversal"): + request_id = str(uuid.uuid4()) + traversers = self._loop.run_sync(lambda: self.submit_bytecode( + bytecode, request_id)) + return RemoteTraversal(iter(traversers), + SideEffectManager(self, request_id)) @gen.coroutine - def _submit(self, bytecode, op, processor, session): + def submit_bytecode(self, bytecode, request_id): """ Submit bytecode to Gremlin Server @@ -61,42 +67,100 @@ def _submit(self, bytecode, op, processor, session): "gremlin-groovy" by default :param str op: Gremlin Server op argument. "eval" by default. :param str processor: Gremlin Server processor argument. "" by default. - :param str session: Session id (optional). Typically a uuid :returns: :py:class:`Response` object """ - request_id = str(uuid.uuid4()) - message = self._prepare_message(bytecode, op, processor, session, request_id) + print(request_id) + message = self._get_bytecode_message(bytecode, request_id) + traversers = yield self._execute_message(message, parse_traverser) + raise gen.Return(traversers) + + @gen.coroutine + def submit_keys(self, request_id): + message = self._get_keys_message(request_id) + resp_parser = lambda result: result + keys = yield self._execute_message(message, resp_parser) + raise gen.Return(keys) + + @gen.coroutine + def submit_gather(self, request_id, key): + message = self._get_gather_message(request_id, key) + side_effects = yield self._execute_message(message, parse_traverser) + raise gen.Return(side_effects) + + @gen.coroutine + def _execute_message(self, message, resp_parser): if self._ws.protocol is None: self._ws = yield websocket.websocket_connect(self.url) self._ws.write_message(message, binary=True) - resp = Response(self._ws, processor, session, self._username, self._password) - traversers = [] + resp = Response(self._ws, self._username, self._password, resp_parser) + results = [] while True: msg = yield resp.receive() if msg is None: break - traversers += msg - raise gen.Return(traversers) + results += msg + raise gen.Return(results) def close(self): """Close underlying connection and mark as closed.""" self._ws.close() - def _prepare_message(self, bytecode, op, processor, session, request_id): + def _get_bytecode_message(self, bytecode, request_id): message = { - "requestId": request_id, - "op": op, - "processor": processor, + "requestId": { + "@type": "gremlin:uuid", + "@value": request_id + }, + "op": "bytecode", + "processor": "traversal", "args": { "gremlin": GraphSONWriter.writeObject(bytecode), - "aliases": {'g': self.traversal_source} + "aliases": {"g": self.traversal_source} } } - message = self._finalize_message(message, processor, session) + message = self._finalize_message(message) return message - def _authenticate(self, username, password, processor, session): + def _get_keys_message(self, request_id): + message = { + "requestId": { + "@type": "gremlin:uuid", + "@value": str(uuid.uuid4()) + }, + "op": "keys", + "processor": "traversal", + "args": { + "sideEffect": { + "@type": "gremlin:uuid", + "@value": request_id + } + } + } + message = self._finalize_message(message) + return message + + def _get_gather_message(self, request_id, key): + message = { + "requestId": { + "@type": "gremlin:uuid", + "@value": str(uuid.uuid4()) + }, + "op": "gather", + "processor": "traversal", + "args": { + "sideEffect": { + "@type": "gremlin:uuid", + "@value": request_id + }, + "sideEffectKey": key, + "aliases": {"g": self.traversal_source} + } + } + message = self._finalize_message(message) + return message + + def _authenticate(self, username, password, processor): auth = b"".join([b"\x00", username.encode("utf-8"), b"\x00", password.encode("utf-8")]) message = { @@ -107,36 +171,27 @@ def _authenticate(self, username, password, processor, session): "sasl": base64.b64encode(auth).decode() } } - message = self._finalize_message(message, processor, session) + message = self._finalize_message(message) self._ws.send_message(message, binary=True) - def _finalize_message(self, message, processor, session): - if processor == "session": - if session is None: - raise RuntimeError("session processor requires a session id") - else: - message["args"].update({"session": session}) + def _finalize_message(self, message): message = json.dumps(message) - return self._set_message_header(message, "application/json") + return self._set_message_header(message) @staticmethod - def _set_message_header(message, mime_type): - if mime_type == "application/json": - mime_len = b"\x10" - mime_type = b"application/json" - else: - raise ValueError("Unknown mime type.") + def _set_message_header(message): + mime_type = b"application/vnd.gremlin-v2.0+json" + mime_len = b"\x21" return b"".join([mime_len, mime_type, message.encode("utf-8")]) class Response: - def __init__(self, ws, processor, session, username, password): + def __init__(self, ws, username, password, resp_parser): self._ws = ws self._closed = False - self._processor = processor - self._session = session self._username = username self._password = password + self._resp_parser = resp_parser @gen.coroutine def receive(self): @@ -144,24 +199,42 @@ def receive(self): return data = yield self._ws.read_message() message = json.loads(data) - status_code = message['status']['code'] + status_code = message["status"]["code"] data = message["result"]["data"] msg = message["status"]["message"] if status_code == 407: - self._authenticate(self._username, self._password, self._processor, - self._session) + self._authenticate(self._username, self._password, self._processor) traversers = yield self.receive() elif status_code == 204: self._closed = True return elif status_code in [200, 206]: - traversers = [] + results = [] for result in data: - traversers.append(Traverser(result['value'], result['bulk'])) + results.append(self._resp_parser(result)) if status_code == 200: self._closed = True else: self._closed = True raise GremlinServerError( "{0}: {1}".format(status_code, msg)) - raise gen.Return(traversers) + raise gen.Return(results) + + +class SideEffectManager(object): + + def __init__(self, remote_connection, request_id): + self._remote_connection = remote_connection + self._request_id = request_id + self._loop = self._remote_connection._loop + + def keys(self): + keys = self._loop.run_sync( + lambda: self._remote_connection.submit_keys(self._request_id)) + return keys + + def get(self, key): + side_effects = self._loop.run_sync( + lambda: self._remote_connection.submit_gather(self._request_id, key) + ) + return side_effects diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py index 43fed530d99..dee05804d66 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py @@ -61,4 +61,3 @@ def apply(self, traversal): remote_traversal = self.remote_connection.submit(traversal.bytecode) traversal.side_effects = remote_traversal.side_effects traversal.traversers = remote_traversal.traversers - return diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py index 2531efb14be..4e93f9b9102 100644 --- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py +++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py @@ -43,6 +43,10 @@ def __next__(self): if self.last_traverser.bulk <= 0: self.last_traverser = None return object + def sideEffects(self): + if self.traversers is None: + self.traversal_strategies.apply_strategies(self) + return self.side_effects def toList(self): return list(iter(self)) def toSet(self): From 8b0927378ef1f929035b14a0b4340065578ea3fb Mon Sep 17 00:00:00 2001 From: davebshow Date: Tue, 23 Aug 2016 16:37:53 -0400 Subject: [PATCH 2/2] removed print statement --- .../jython/gremlin_python/driver/driver_remote_connection.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index d7ddc895d83..fa479207fb7 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -70,7 +70,6 @@ def submit_bytecode(self, bytecode, request_id): :returns: :py:class:`Response` object """ - print(request_id) message = self._get_bytecode_message(bytecode, request_id) traversers = yield self._execute_message(message, parse_traverser) raise gen.Return(traversers)