Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3116: [Plasma] Add "ls" to object store #2470

Closed
wants to merge 22 commits into from
Closed
11 changes: 11 additions & 0 deletions cpp/src/plasma/client.cc
Expand Up @@ -184,6 +184,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

Status Contains(const ObjectID& object_id, bool* has_object);

Status List(ObjectTable* objects);

Status Abort(const ObjectID& object_id);

Status Seal(const ObjectID& object_id);
Expand Down Expand Up @@ -705,6 +707,13 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
return Status::OK();
}

Status PlasmaClient::Impl::List(ObjectTable* objects) {
RETURN_NOT_OK(SendListRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
return ReadListReply(buffer.data(), buffer.size(), objects);
}

static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
Expand Down Expand Up @@ -1057,6 +1066,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
return impl_->Contains(object_id, has_object);
}

Status PlasmaClient::List(ObjectTable* objects) { return impl_->List(objects); }

Status PlasmaClient::Abort(const ObjectID& object_id) { return impl_->Abort(object_id); }

Status PlasmaClient::Seal(const ObjectID& object_id) { return impl_->Seal(object_id); }
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/plasma/client.h
Expand Up @@ -151,6 +151,21 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Contains(const ObjectID& object_id, bool* has_object);

/// List all the objects in the object store.
///
/// This API is experimental and might change in the future.
///
/// \param[out] objects ObjectTable of objects in the store. For each entry
/// in the map, the following fields are available:
/// - metadata_size: Size of the object metadata in bytes
/// - data_size: Size of the object data in bytes
/// - ref_count: Number of clients referencing the object buffer
/// - create_time: Unix timestamp of the object creation
/// - construct_duration: Object creation time in seconds
/// - state: Is the object still being created or already sealed?
/// \return The return status.
Status List(ObjectTable* objects);

/// Abort an unsealed object in the object store. If the abort succeeds, then
/// it will be as if the object was never created at all. The unsealed object
/// must have only a single reference (the one that would have been removed by
Expand Down
52 changes: 52 additions & 0 deletions cpp/src/plasma/common.h
Expand Up @@ -18,13 +18,17 @@
#ifndef PLASMA_COMMON_H
#define PLASMA_COMMON_H

#include <stddef.h>

#include <cstring>
#include <memory>
#include <string>
// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
// and get rid of the next three lines:
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <unordered_map>

#include "plasma/compat.h"

Expand Down Expand Up @@ -83,6 +87,54 @@ struct ObjectRequest {
ObjectLocation location;
};

enum class ObjectState : int {
/// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1,
/// Object is sealed and stored in the local Plasma Store.
PLASMA_SEALED
};

/// This type is used by the Plasma store. It is here because it is exposed to
/// the eviction policy.
struct ObjectTableEntry {
ObjectTableEntry();

~ObjectTableEntry();

/// Memory mapped file containing the object.
int fd;
/// Device number.
int device_num;
/// Size of the underlying map.
int64_t map_size;
/// Offset from the base of the mmap.
ptrdiff_t offset;
/// Pointer to the object data. Needed to free the object.
uint8_t* pointer;
/// Size of the object in bytes.
int64_t data_size;
/// Size of the object metadata in bytes.
int64_t metadata_size;
#ifdef PLASMA_GPU
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
/// Number of clients currently using this object.
int ref_count;
/// Unix epoch of when this object was created.
int64_t create_time;
/// How long creation of this object took.
int64_t construct_duration;

/// The state of the object, e.g., whether it is open or sealed.
ObjectState state;
/// The digest of the object. Used to see if two objects are the same.
unsigned char digest[kDigestSize];
};

/// Mapping from ObjectIDs to information about the object.
typedef std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> ObjectTable;

/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
/// by making it possible to pass a context object through dlmalloc.
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/plasma/format/common.fbs
Expand Up @@ -25,11 +25,14 @@ table ObjectInfo {
data_size: long;
// Number of bytes the metadata of this object occupies in memory.
metadata_size: long;
// Number of clients using the objects.
ref_count: int;
// Unix epoch of when this object was created.
create_time: long;
// How long creation of this object took.
construct_duration: long;
// Hash of the object content.
// Hash of the object content. If the object is not sealed yet this is
// an empty string.
digest: string;
// Specifies if this object was deleted or added.
is_deletion: bool;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/plasma/format/plasma.fbs
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

include "common.fbs";

// Plasma protocol specification
namespace plasma.flatbuf;

Expand Down Expand Up @@ -44,6 +46,9 @@ enum MessageType:long {
// See if the store contains an object (will be deprecated).
PlasmaContainsRequest,
PlasmaContainsReply,
// List all objects in the store.
PlasmaListRequest,
PlasmaListReply,
// Get information for a newly connecting client.
PlasmaConnectRequest,
PlasmaConnectReply,
Expand Down Expand Up @@ -257,6 +262,13 @@ table PlasmaContainsReply {
has_object: int;
}

table PlasmaListRequest {
}

table PlasmaListReply {
objects: [ObjectInfo];
}

// PlasmaConnect is used by a plasma client the first time it connects with the
// store. This is not really necessary, but is used to get some information
// about the store such as its memory capacity.
Expand Down
43 changes: 1 addition & 42 deletions cpp/src/plasma/plasma.h
Expand Up @@ -95,58 +95,17 @@ struct PlasmaObject {
int device_num;
};

enum class ObjectState : int {
/// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1,
/// Object is sealed and stored in the local Plasma Store.
PLASMA_SEALED
};

enum class ObjectStatus : int {
/// The object was not found.
OBJECT_NOT_FOUND = 0,
/// The object was found.
OBJECT_FOUND = 1
};

/// This type is used by the Plasma store. It is here because it is exposed to
/// the eviction policy.
struct ObjectTableEntry {
ObjectTableEntry();

~ObjectTableEntry();

/// Memory mapped file containing the object.
int fd;
/// Device number.
int device_num;
/// Size of the underlying map.
int64_t map_size;
/// Offset from the base of the mmap.
ptrdiff_t offset;
/// Pointer to the object data. Needed to free the object.
uint8_t* pointer;
/// Size of the object in bytes.
int64_t data_size;
/// Size of the object metadata in bytes.
int64_t metadata_size;
#ifdef PLASMA_GPU
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
/// Number of clients currently using this object.
int ref_count;

/// The state of the object, e.g., whether it is open or sealed.
ObjectState state;
/// The digest of the object. Used to see if two objects are the same.
unsigned char digest[kDigestSize];
};

/// The plasma store information that is exposed to the eviction policy.
struct PlasmaStoreInfo {
/// Objects that are in the Plasma store.
std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>> objects;
ObjectTable objects;
/// The amount of memory (in bytes) that we allow to be allocated in the
/// store.
int64_t memory_capacity;
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/plasma/protocol.cc
Expand Up @@ -17,6 +17,8 @@

#include "plasma/protocol.h"

#include <utility>

#include "flatbuffers/flatbuffers.h"
#include "plasma/plasma_generated.h"

Expand Down Expand Up @@ -410,6 +412,53 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
return Status::OK();
}

// List messages.

Status SendListRequest(int sock) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaListRequest(fbb);
return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
}

Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); }

Status SendListReply(int sock, const ObjectTable& objects) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos;
for (auto const& entry : objects) {
auto digest = entry.second->state == ObjectState::PLASMA_CREATED
? fbb.CreateString("")
: fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
kDigestSize);
auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()),
entry.second->data_size, entry.second->metadata_size,
entry.second->ref_count, entry.second->create_time,
entry.second->construct_duration, digest);
object_infos.push_back(info);
}
auto message = fb::CreatePlasmaListReply(fbb, fbb.CreateVector(object_infos));
return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
}

Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
for (auto const& object : *message->objects()) {
ObjectID object_id = ObjectID::from_binary(object->object_id()->str());
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
entry->data_size = object->data_size();
entry->metadata_size = object->metadata_size();
entry->ref_count = object->ref_count();
entry->create_time = object->create_time();
entry->construct_duration = object->construct_duration();
entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED
: ObjectState::PLASMA_SEALED;
(*objects)[object_id] = std::move(entry);
}
return Status::OK();
}

// Connect messages.

Status SendConnectRequest(int sock) {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/plasma/protocol.h
Expand Up @@ -141,6 +141,16 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object);

/* Plasma List message functions. */

Status SendListRequest(int sock);

Status ReadListRequest(uint8_t* data, size_t size);

Status SendListReply(int sock, const ObjectTable& objects);

Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects);

/* Plasma Connect message functions. */

Status SendConnectRequest(int sock);
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/plasma/store.cc
Expand Up @@ -43,6 +43,7 @@
#include <sys/un.h>
#include <unistd.h>

#include <ctime>
#include <deque>
#include <memory>
#include <string>
Expand Down Expand Up @@ -214,6 +215,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
entry->offset = offset;
entry->state = ObjectState::PLASMA_CREATED;
entry->device_num = device_num;
entry->create_time = std::time(nullptr);
entry->construct_duration = -1;
#ifdef PLASMA_GPU
if (device_num != 0) {
DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle));
Expand Down Expand Up @@ -445,6 +448,8 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[])
entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
// Set object construction duration.
entry->construct_duration = std::time(nullptr) - entry->create_time;
// Inform all subscribers that a new object has been sealed.
ObjectInfoT info;
info.object_id = object_id.binary();
Expand Down Expand Up @@ -784,6 +789,10 @@ Status PlasmaStore::ProcessMessage(Client* client) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
} break;
case fb::MessageType::PlasmaListRequest: {
RETURN_NOT_OK(ReadListRequest(input, input_size));
HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
} break;
case fb::MessageType::PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
Expand Down