From 048f0c2abeb69322f369f1dbe2e0cf9ba9b4f506 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 00:31:25 -0800 Subject: [PATCH 01/12] Export plasma notification socket. --- cpp/src/plasma/client.cc | 29 +++++++++++++++++------ cpp/src/plasma/client.h | 3 +++ python/pyarrow/_plasma.pyx | 36 +++++++++++++++++++++++++++++ python/pyarrow/tests/test_plasma.py | 16 +++++++++++++ 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index d37b033f8fce..4ed892c36c6f 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -201,6 +201,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this(notification.get()); +Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, + ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size) { + auto object_info = flatbuffers::GetRoot(buffer); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID)); if (object_info->is_deletion()) { @@ -961,6 +961,15 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, return Status::OK(); } +Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size) { + auto notification = ReadMessageAsync(fd); + if (notification == NULL) { + return Status::IOError("Failed to read object notification from Plasma socket"); + } + return DecodeNotification(notification.get(), object_id, data_size, metadata_size); +} + Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, const std::string& manager_socket_name, int release_delay, int num_retries) { @@ -1137,6 +1146,12 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ return impl_->GetNotification(fd, object_id, data_size, metadata_size); } +Status PlasmaClient::DecodeNotification(const uint8_t* buffer, + ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size) { + return impl_->DecodeNotification(buffer, object_id, data_size, metadata_size); +} + Status PlasmaClient::Disconnect() { return impl_->Disconnect(); } Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) { diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 1ad09f5c0673..9e080b7760dc 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -246,6 +246,9 @@ class ARROW_EXPORT PlasmaClient { Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + Status DecodeNotification(const uint8_t* buffer, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size); + /// Disconnect from the local plasma instance, including the local store and /// manager. /// diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 677e768035e1..aa5f0d4ce54d 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -32,6 +32,7 @@ from cpython.pycapsule cimport * import collections import pyarrow import random +import socket from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, @@ -131,6 +132,10 @@ cdef extern from "plasma/client.h" nogil: CStatus Subscribe(int* fd) + CStatus DecodeNotification(const uint8_t* buffer, + CUniqueID* object_id, int64_t* data_size, + int64_t* metadata_size) + CStatus GetNotification(int fd, CUniqueID* object_id, int64_t* data_size, int64_t* metadata_size) @@ -728,6 +733,37 @@ cdef class PlasmaClient: """Subscribe to notifications about sealed objects.""" with nogil: check_status(self.client.get().Subscribe(&self.notification_fd)) + + def get_notification_socket(self): + """ + Get the notification socket. + """ + return socket.socket(fileno=self.notification_fd, family=socket.AF_UNIX) + + def decode_notification(self, const uint8_t* buf): + """ + Get the notification from the buffer. + + Returns + ------- + ObjectID + The object ID of the object that was stored. + int + The data size of the object that was stored. + int + The metadata size of the object that was stored. + """ + + cdef CUniqueID object_id + cdef int64_t data_size + cdef int64_t metadata_size + with nogil: + check_status(self.client.get() + .DecodeNotification(buf, + &object_id, + &data_size, + &metadata_size)) + return ObjectID(object_id.binary()), data_size, metadata_size def get_next_notification(self): """ diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 69b3d9c0166f..7e2ec28e1534 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -742,6 +742,22 @@ def test_subscribe(self): assert data_sizes[j] == recv_dsize assert metadata_sizes[j] == recv_msize + for j in range(i): + self.plasma_client.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client.seal(object_ids[j]) + rsock = self.plasma_client.get_notification_socket() + + # Check that we received notifications for all of the objects. + for j in range(i): + content = rsock.recv(104)[8:] + recv_objid, recv_dsize, recv_msize = ( + self.plasma_client.decode_notification(content)) + assert object_ids[j] == recv_objid + assert data_sizes[j] == recv_dsize + assert metadata_sizes[j] == recv_msize + def test_subscribe_deletions(self): # Subscribe to notifications from the Plasma Store. We use # plasma_client2 to make sure that all used objects will get evicted From 2719d0dc1bda964783206f9f03b573cc6c9a845c Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 00:52:06 -0800 Subject: [PATCH 02/12] lint --- cpp/src/plasma/client.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 4ed892c36c6f..ecedb63080c7 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -201,7 +201,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this(buffer); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); @@ -1146,8 +1146,8 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ return impl_->GetNotification(fd, object_id, data_size, metadata_size); } -Status PlasmaClient::DecodeNotification(const uint8_t* buffer, - ObjectID* object_id, int64_t* data_size, +Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size) { return impl_->DecodeNotification(buffer, object_id, data_size, metadata_size); } From 6005b34a5ae09638a046dbdc35cacc2a3c031474 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 02:38:18 -0800 Subject: [PATCH 03/12] lint --- cpp/src/plasma/client.cc | 3 +-- python/pyarrow/tests/test_plasma.py | 7 +++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index ecedb63080c7..4cfc246aafe9 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -1147,8 +1147,7 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ } Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, - int64_t* data_size, - int64_t* metadata_size) { + int64_t* data_size, int64_t* metadata_size) { return impl_->DecodeNotification(buffer, object_id, data_size, metadata_size); } diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 7e2ec28e1534..2959bdc203ce 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -742,6 +742,11 @@ def test_subscribe(self): assert data_sizes[j] == recv_dsize assert metadata_sizes[j] == recv_msize + # Get notification from socket. + object_ids = [random_object_id() for _ in range(i)] + metadata_sizes = [np.random.randint(1000) for _ in range(i)] + data_sizes = [np.random.randint(1000) for _ in range(i)] + for j in range(i): self.plasma_client.create( object_ids[j], data_sizes[j], @@ -751,6 +756,8 @@ def test_subscribe(self): # Check that we received notifications for all of the objects. for j in range(i): + # Assume the plasma store will not be full, + # so we always get the data size instead of -1. content = rsock.recv(104)[8:] recv_objid, recv_dsize, recv_msize = ( self.plasma_client.decode_notification(content)) From bbf07b94d401a9cf99fa80c6f5a1b6a5157351e2 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 02:56:44 -0800 Subject: [PATCH 04/12] lint --- python/pyarrow/_plasma.pyx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index aa5f0d4ce54d..addd233dbef0 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -733,12 +733,13 @@ cdef class PlasmaClient: """Subscribe to notifications about sealed objects.""" with nogil: check_status(self.client.get().Subscribe(&self.notification_fd)) - + def get_notification_socket(self): """ Get the notification socket. """ - return socket.socket(fileno=self.notification_fd, family=socket.AF_UNIX) + return socket.socket(fileno=self.notification_fd, + family=socket.AF_UNIX) def decode_notification(self, const uint8_t* buf): """ From 8e636ebafbf0f144e0cf94b139c3d95435ce6f56 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 03:22:14 -0800 Subject: [PATCH 05/12] py2 compatibility --- python/pyarrow/_plasma.pyx | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index addd233dbef0..c615234f0926 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -28,6 +28,7 @@ from libcpp.unordered_map cimport unordered_map from libc.stdint cimport int64_t, uint8_t, uintptr_t from cython.operator cimport dereference as deref, preincrement as inc from cpython.pycapsule cimport * +from cpython.version cimport PY_MAJOR_VERSION import collections import pyarrow @@ -738,8 +739,17 @@ cdef class PlasmaClient: """ Get the notification socket. """ - return socket.socket(fileno=self.notification_fd, - family=socket.AF_UNIX) + + if PY_MAJOR_VERSION >= 3: + return socket.socket(fileno=self.notification_fd, + family=socket.AF_UNIX, + type=socket.SOCK_STREAM) + else: + socket_obj = socket.fromfd(self.notification_fd, + family=socket.AF_UNIX, + type=socket.SOCK_STREAM) + return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, + _sock=socket_obj) def decode_notification(self, const uint8_t* buf): """ From 19f6b98524d4566a8761a67f9be2d50eb3056b44 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 03:39:26 -0800 Subject: [PATCH 06/12] py2 compatibility --- python/pyarrow/_plasma.pyx | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index c615234f0926..56afed0bd5fb 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -739,15 +739,13 @@ cdef class PlasmaClient: """ Get the notification socket. """ - if PY_MAJOR_VERSION >= 3: return socket.socket(fileno=self.notification_fd, family=socket.AF_UNIX, type=socket.SOCK_STREAM) else: - socket_obj = socket.fromfd(self.notification_fd, - family=socket.AF_UNIX, - type=socket.SOCK_STREAM) + socket_obj = socket.fromfd(self.notification_fd, socket.AF_UNIX, + socket.SOCK_STREAM) return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, _sock=socket_obj) @@ -764,7 +762,6 @@ cdef class PlasmaClient: int The metadata size of the object that was stored. """ - cdef CUniqueID object_id cdef int64_t data_size cdef int64_t metadata_size From 37b6560131b2f559be5f9342d6584ec12b7f814d Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 04:47:25 -0800 Subject: [PATCH 07/12] fix test --- python/pyarrow/tests/test_plasma.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 2959bdc203ce..216594cc5089 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -742,6 +742,11 @@ def test_subscribe(self): assert data_sizes[j] == recv_dsize assert metadata_sizes[j] == recv_msize + def test_subscribe_socket(self): + # Subscribe to notifications from the Plasma Store. + self.plasma_client.subscribe() + rsock = self.plasma_client.get_notification_socket() + for i in self.SUBSCRIBE_TEST_SIZES: # Get notification from socket. object_ids = [random_object_id() for _ in range(i)] metadata_sizes = [np.random.randint(1000) for _ in range(i)] @@ -752,7 +757,6 @@ def test_subscribe(self): object_ids[j], data_sizes[j], metadata=bytearray(np.random.bytes(metadata_sizes[j]))) self.plasma_client.seal(object_ids[j]) - rsock = self.plasma_client.get_notification_socket() # Check that we received notifications for all of the objects. for j in range(i): From 942a62f8425a3eecd4c65a07cdde735a4716ba64 Mon Sep 17 00:00:00 2001 From: suquark Date: Wed, 21 Nov 2018 05:13:04 -0800 Subject: [PATCH 08/12] fix test --- python/pyarrow/tests/test_plasma.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 216594cc5089..fd61ef0b334c 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -25,6 +25,7 @@ import pytest import random import signal +import struct import subprocess import sys import time @@ -762,7 +763,8 @@ def test_subscribe_socket(self): for j in range(i): # Assume the plasma store will not be full, # so we always get the data size instead of -1. - content = rsock.recv(104)[8:] + msg_len = struct.unpack('L', rsock.recv(8))[0] + content = rsock.recv(msg_len) recv_objid, recv_dsize, recv_msize = ( self.plasma_client.decode_notification(content)) assert object_ids[j] == recv_objid From 19dae23d0e6d287bef84e7e6b9af4ce954d819b8 Mon Sep 17 00:00:00 2001 From: suquark Date: Fri, 30 Nov 2018 16:33:49 -0800 Subject: [PATCH 09/12] use compat --- python/pyarrow/_plasma.pyx | 12 +++--------- python/pyarrow/compat.py | 9 +++++++++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 56afed0bd5fb..e1f19962c98f 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -39,6 +39,7 @@ from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, CFixedSizeBufferWriter, CStatus) +from pyarrow import compat PLASMA_WAIT_TIMEOUT = 2 ** 30 @@ -739,15 +740,8 @@ cdef class PlasmaClient: """ Get the notification socket. """ - if PY_MAJOR_VERSION >= 3: - return socket.socket(fileno=self.notification_fd, - family=socket.AF_UNIX, - type=socket.SOCK_STREAM) - else: - socket_obj = socket.fromfd(self.notification_fd, socket.AF_UNIX, - socket.SOCK_STREAM) - return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, - _sock=socket_obj) + compat.get_socket_from_fd(self.notification_fd, family=socket.AF_UNIX, + type=socket.SOCK_STREAM) def decode_notification(self, const uint8_t* buf): """ diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py index a481db0d53c5..f29f52f6c866 100644 --- a/python/pyarrow/compat.py +++ b/python/pyarrow/compat.py @@ -267,4 +267,13 @@ def import_pytorch_extension(): integer_types = six.integer_types + (np.integer,) + +def get_socket_from_fd(fileno, family, type): + if PY2: + socket_obj = socket.fromfd(fileno, family, type) + return socket.socket(family, type, _sock=socket_obj) + else: + return socket.socket(fileno=fileno, family=family, type=type) + + __all__ = [] From f037c319ee4600fafe1e00bf749fecbee6a0d16b Mon Sep 17 00:00:00 2001 From: suquark Date: Fri, 30 Nov 2018 16:54:40 -0800 Subject: [PATCH 10/12] fix imports --- python/pyarrow/compat.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py index f29f52f6c866..068d5607de81 100644 --- a/python/pyarrow/compat.py +++ b/python/pyarrow/compat.py @@ -25,6 +25,7 @@ import sys import six from six import BytesIO, StringIO, string_types as py_string +import socket PY26 = sys.version_info[:2] == (2, 6) From a6d5981596bff21bd67a93a5ff10cb9c3bc5b0ea Mon Sep 17 00:00:00 2001 From: suquark Date: Fri, 30 Nov 2018 17:16:04 -0800 Subject: [PATCH 11/12] fix a bug. style --- python/pyarrow/_plasma.pyx | 5 +++-- python/pyarrow/tests/test_plasma.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index e1f19962c98f..3211966e3334 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -740,8 +740,9 @@ cdef class PlasmaClient: """ Get the notification socket. """ - compat.get_socket_from_fd(self.notification_fd, family=socket.AF_UNIX, - type=socket.SOCK_STREAM) + return compat.get_socket_from_fd(self.notification_fd, + family=socket.AF_UNIX, + type=socket.SOCK_STREAM) def decode_notification(self, const uint8_t* buf): """ diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index fd61ef0b334c..e3d31b7de199 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -763,7 +763,7 @@ def test_subscribe_socket(self): for j in range(i): # Assume the plasma store will not be full, # so we always get the data size instead of -1. - msg_len = struct.unpack('L', rsock.recv(8))[0] + msg_len, = struct.unpack('L', rsock.recv(8)) content = rsock.recv(msg_len) recv_objid, recv_dsize, recv_msize = ( self.plasma_client.decode_notification(content)) From 0bc89eba5b4a5d176283cdf3f04bbd4d2cb915b2 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 3 Dec 2018 12:52:15 -0800 Subject: [PATCH 12/12] Update _plasma.pyx --- python/pyarrow/_plasma.pyx | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 3211966e3334..2fad09c0549c 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -28,7 +28,6 @@ from libcpp.unordered_map cimport unordered_map from libc.stdint cimport int64_t, uint8_t, uintptr_t from cython.operator cimport dereference as deref, preincrement as inc from cpython.pycapsule cimport * -from cpython.version cimport PY_MAJOR_VERSION import collections import pyarrow