Skip to content

Commit

Permalink
ARROW-8072 [Plasma] Add const for plasma protocol
Browse files Browse the repository at this point in the history
See https://issues.apache.org/jira/browse/ARROW-8072. This could also be a blocker for later refactoring like windows integration, since the new interface will just use const instead, and this confused the reviewers in the past (the git diff failed to present the difference correctly because some changes mixed with the 'const' prefix changes).

I also convert "@<!-- -->note" to "\note". I missed it in a previous PR.

Closes #6579 from suquark/add_const

Authored-by: Siyuan <suquark@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
suquark authored and wesm committed Mar 11, 2020
1 parent 08f8bff commit f06da31
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cpp/src/plasma/eviction_policy.h
Expand Up @@ -174,7 +174,7 @@ class EvictionPolicy {
/// called, the eviction policy will assume that the objects chosen to be
/// evicted will in fact be evicted from the Plasma store by the caller.
///
/// @note This method is not part of the API. It is exposed in the header file
/// \note This method is not part of the API. It is exposed in the header file
/// only for testing.
///
/// \param num_bytes_required The number of bytes of space to try to free up.
Expand Down
68 changes: 35 additions & 33 deletions cpp/src/plasma/protocol.cc
Expand Up @@ -20,7 +20,6 @@
#include <utility>

#include "flatbuffers/flatbuffers.h"

#include "plasma/common.h"
#include "plasma/io.h"
#include "plasma/plasma_generated.h"
Expand Down Expand Up @@ -136,7 +135,7 @@ Status SendSetOptionsRequest(int sock, const std::string& client_name,
return PlasmaSend(sock, MessageType::PlasmaSetOptionsRequest, &fbb, message);
}

Status ReadSetOptionsRequest(uint8_t* data, size_t size, std::string* client_name,
Status ReadSetOptionsRequest(const uint8_t* data, size_t size, std::string* client_name,
int64_t* output_memory_quota) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsRequest>(data);
Expand All @@ -152,7 +151,7 @@ Status SendSetOptionsReply(int sock, PlasmaError error) {
return PlasmaSend(sock, MessageType::PlasmaSetOptionsReply, &fbb, message);
}

Status ReadSetOptionsReply(uint8_t* data, size_t size) {
Status ReadSetOptionsReply(const uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -173,7 +172,8 @@ Status SendGetDebugStringReply(int sock, const std::string& debug_string) {
return PlasmaSend(sock, MessageType::PlasmaGetDebugStringReply, &fbb, message);
}

Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_string) {
Status ReadGetDebugStringReply(const uint8_t* data, size_t size,
std::string* debug_string) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetDebugStringReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -192,7 +192,7 @@ Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message);
}

Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadCreateRequest(const uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, int64_t* data_size, int64_t* metadata_size,
int* device_num) {
DCHECK(data);
Expand Down Expand Up @@ -239,7 +239,7 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
return PlasmaSend(sock, MessageType::PlasmaCreateReply, &fbb, message);
}

Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadCreateReply(const uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
Expand Down Expand Up @@ -276,7 +276,7 @@ Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message);
}

Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadCreateAndSealRequest(const uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, std::string* object_data,
std::string* metadata, std::string* digest) {
DCHECK(data);
Expand Down Expand Up @@ -307,7 +307,7 @@ Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& obje
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message);
}

Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
Status ReadCreateAndSealBatchRequest(const uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids,
bool* evict_if_full,
std::vector<std::string>* object_data,
Expand Down Expand Up @@ -341,7 +341,7 @@ Status SendCreateAndSealReply(int sock, PlasmaError error) {
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealReply, &fbb, message);
}

Status ReadCreateAndSealReply(uint8_t* data, size_t size) {
Status ReadCreateAndSealReply(const uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -355,7 +355,7 @@ Status SendCreateAndSealBatchReply(int sock, PlasmaError error) {
return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchReply, &fbb, message);
}

Status ReadCreateAndSealBatchReply(uint8_t* data, size_t size) {
Status ReadCreateAndSealBatchReply(const uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -368,7 +368,7 @@ Status SendAbortRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message);
}

Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadAbortRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -382,7 +382,7 @@ Status SendAbortReply(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message);
}

Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadAbortReply(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -399,7 +399,7 @@ Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest)
return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message);
}

Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadSealRequest(const uint8_t* data, size_t size, ObjectID* object_id,
std::string* digest) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
Expand All @@ -417,7 +417,7 @@ Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) {
return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message);
}

Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadSealReply(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -434,7 +434,7 @@ Status SendReleaseRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message);
}

Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadReleaseRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -449,7 +449,7 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) {
return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message);
}

Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadReleaseReply(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -467,7 +467,8 @@ Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
}

Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
Status ReadDeleteRequest(const uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids) {
using fb::PlasmaDeleteRequest;

DCHECK(data);
Expand All @@ -493,7 +494,8 @@ Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
}

Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
Status ReadDeleteReply(const uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids,
std::vector<PlasmaError>* errors) {
using fb::PlasmaDeleteReply;

Expand All @@ -520,7 +522,7 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message);
}

Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
Status ReadContainsRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -535,7 +537,7 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message);
}

Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadContainsReply(const uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data);
Expand All @@ -553,7 +555,7 @@ Status SendListRequest(int sock) {
return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
}

Status ReadListRequest(uint8_t* data, size_t size) { return Status::OK(); }
Status ReadListRequest(const uint8_t* data, size_t size) { return Status::OK(); }

Status SendListReply(int sock, const ObjectTable& objects) {
flatbuffers::FlatBufferBuilder fbb;
Expand All @@ -575,7 +577,7 @@ Status SendListReply(int sock, const ObjectTable& objects) {
return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
}

Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) {
Status ReadListReply(const uint8_t* data, size_t size, ObjectTable* objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -602,15 +604,15 @@ Status SendConnectRequest(int sock) {
return PlasmaSend(sock, MessageType::PlasmaConnectRequest, &fbb, message);
}

Status ReadConnectRequest(uint8_t* data) { return Status::OK(); }
Status ReadConnectRequest(const uint8_t* data) { return Status::OK(); }

Status SendConnectReply(int sock, int64_t memory_capacity) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaConnectReply(fbb, memory_capacity);
return PlasmaSend(sock, MessageType::PlasmaConnectReply, &fbb, message);
}

Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
Status ReadConnectReply(const uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaConnectReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -626,7 +628,7 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType::PlasmaEvictRequest, &fbb, message);
}

Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
Status ReadEvictRequest(const uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaEvictRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -640,7 +642,7 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType::PlasmaEvictReply, &fbb, message);
}

Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
Status ReadEvictReply(const uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaEvictReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -658,7 +660,7 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
return PlasmaSend(sock, MessageType::PlasmaGetRequest, &fbb, message);
}

Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
Status ReadGetRequest(const uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
int64_t* timeout_ms) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
Expand Down Expand Up @@ -702,7 +704,7 @@ Status SendGetReply(int sock, ObjectID object_ids[],
return PlasmaSend(sock, MessageType::PlasmaGetReply, &fbb, message);
}

Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
Status ReadGetReply(const uint8_t* data, size_t size, ObjectID object_ids[],
PlasmaObject plasma_objects[], int64_t num_objects,
std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
DCHECK(data);
Expand Down Expand Up @@ -757,8 +759,8 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message);
}

Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
int* port) {
Status ReadDataRequest(const uint8_t* data, size_t size, ObjectID* object_id,
char** address, int* port) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand All @@ -777,7 +779,7 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message);
}

Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
Status ReadDataReply(const uint8_t* data, size_t size, ObjectID* object_id,
int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data);
Expand All @@ -799,7 +801,7 @@ Status SendRefreshLRURequest(int sock, const std::vector<ObjectID>& object_ids)
return PlasmaSend(sock, MessageType::PlasmaRefreshLRURequest, &fbb, message);
}

Status ReadRefreshLRURequest(uint8_t* data, size_t size,
Status ReadRefreshLRURequest(const uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRURequest>(data);
Expand All @@ -817,7 +819,7 @@ Status SendRefreshLRUReply(int sock) {
return PlasmaSend(sock, MessageType::PlasmaRefreshLRUReply, &fbb, message);
}

Status ReadRefreshLRUReply(uint8_t* data, size_t size) {
Status ReadRefreshLRUReply(const uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRUReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
Expand Down

0 comments on commit f06da31

Please sign in to comment.