Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang committed Mar 3, 2020
1 parent 2889274 commit 62b2f63
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create(
}

std::shared_ptr<Buffer> data;
Status s = client->Create(oid, size, md, md_size, &data);
Status s = client->Create(oid, /*evict_if_full=*/true, size, md, md_size, &data);
if (plasma::IsPlasmaObjectExists(s)) {
jclass exceptionClass =
env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException");
Expand Down
89 changes: 54 additions & 35 deletions cpp/src/plasma/test/client_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class TestPlasmaStore : public ::testing::Test {
const std::vector<uint8_t>& metadata,
const std::vector<uint8_t>& data, bool release = true) {
std::shared_ptr<Buffer> data_buffer;
ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(),
&data_buffer));
ARROW_CHECK_OK(client.Create(object_id, /*evict_if_full=*/true, data.size(),
metadata.data(), metadata.size(), &data_buffer));
for (size_t i = 0; i < data.size(); i++) {
data_buffer->mutable_data()[i] = data[i];
}
Expand Down Expand Up @@ -121,8 +121,8 @@ TEST_F(TestPlasmaStore, NewSubscriberTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(
local_client.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(local_client.Create(object_id, /*evict_if_full=*/true, data_size,
metadata, metadata_size, &data));
ARROW_CHECK_OK(local_client.Seal(object_id));

// Test that new subscriber client2 can receive notifications about existing objects.
Expand Down Expand Up @@ -170,7 +170,8 @@ TEST_F(TestPlasmaStore, BatchNotificationTest) {

std::vector<std::string> data = {"hello", "world!"};
std::vector<std::string> metadata = {"1", "23"};
ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, data, metadata));
ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, /*evict_if_full=*/true, data,
metadata));

ObjectID object_id = random_object_id();
int64_t data_size = 0;
Expand Down Expand Up @@ -229,10 +230,14 @@ TEST_F(TestPlasmaStore, SetQuotaBasicTest) {

// Too big to fit in quota at all
std::shared_ptr<Buffer> data_buffer;
ASSERT_FALSE(
client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &data_buffer).ok());
ASSERT_TRUE(
client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &data_buffer).ok());
ASSERT_FALSE(client_
.Create(random_object_id(), /*evict_if_full=*/true, 7 * 1024 * 1024,
{}, 0, &data_buffer)
.ok());
ASSERT_TRUE(client_
.Create(random_object_id(), /*evict_if_full=*/true, 4 * 1024 * 1024, {},
0, &data_buffer)
.ok());
}

TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) {
Expand Down Expand Up @@ -513,7 +518,8 @@ TEST_F(TestPlasmaStore, DeleteTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));

result = client_.Delete(object_id);
Expand Down Expand Up @@ -542,9 +548,11 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id1));
ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id2));
// Release the ref count of Create function.
ARROW_CHECK_OK(client_.Release(object_id1));
Expand Down Expand Up @@ -662,11 +670,13 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
data->mutable_data()[0] = 1;
ARROW_CHECK_OK(client_.Seal(object_id1));

ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
data->mutable_data()[0] = 2;
ARROW_CHECK_OK(client_.Seal(object_id2));

Expand All @@ -683,7 +693,8 @@ TEST_F(TestPlasmaStore, BatchCreateTest) {
std::vector<std::string> data = {"hello", "world"};
std::vector<std::string> metadata = {"1", "2"};

ARROW_CHECK_OK(client_.CreateAndSealBatch(object_ids, data, metadata));
ARROW_CHECK_OK(
client_.CreateAndSealBatch(object_ids, /*evict_if_full=*/true, data, metadata));

std::vector<ObjectBuffer> object_buffers;

Expand Down Expand Up @@ -714,7 +725,8 @@ TEST_F(TestPlasmaStore, AbortTest) {
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
uint8_t* data_ptr;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
data_ptr = data->mutable_data();
// Write some data.
for (int64_t i = 0; i < data_size / 2; i++) {
Expand Down Expand Up @@ -759,15 +771,17 @@ TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) {
// Test the sequence: create -> release -> abort -> ...
for (int64_t i = 0; i < loop_times; i++) {
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client_.Release(object_id));
ARROW_CHECK_OK(client_.Abort(object_id));
}

// Test the sequence: create -> seal -> release -> delete -> ...
for (int64_t i = 0; i < loop_times; i++) {
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
ARROW_CHECK_OK(client_.Delete(object_id));
Expand All @@ -789,7 +803,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
Expand All @@ -800,7 +815,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
// Test that one client disconnecting does not interfere with the other.
// First create object on the second client.
object_id = random_object_id();
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));
// Disconnect the first client.
ARROW_CHECK_OK(client_.Disconnect());
// Test that the second client can seal and get the created object.
Expand Down Expand Up @@ -830,7 +846,8 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data));

if (i % 3 == 0) {
// Seal one third of the objects.
Expand Down Expand Up @@ -910,8 +927,8 @@ TEST_F(TestPlasmaStore, GetGPUTest) {
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data_buffer;
std::shared_ptr<CudaBuffer> gpu_buffer;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data_buffer, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data_buffer, kGpuDeviceNumber));
ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(data_buffer));
CudaBufferWriter writer(gpu_buffer);
ARROW_CHECK_OK(writer.Write(data, data_size));
Expand Down Expand Up @@ -940,11 +957,11 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data,
kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Seal(object_id1));
ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data,
kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Seal(object_id2));
// Release the ref count of Create function.
ARROW_CHECK_OK(client_.Release(object_id1));
Expand Down Expand Up @@ -986,7 +1003,8 @@ TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {
ObjectID& object_id = object_ids[i];

std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, 0, 0,
&data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
}
Expand All @@ -998,7 +1016,8 @@ TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) {
ARROW_CHECK_OK(client_.Delete(object_id));

std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, 0, 0,
&data, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
}
Expand All @@ -1013,8 +1032,8 @@ TEST_F(TestPlasmaStore, GPUBufferLifetime) {
const int64_t data_size = 40;

std::shared_ptr<Buffer> create_buff;
ARROW_CHECK_OK(
client_.Create(object_id, data_size, nullptr, 0, &create_buff, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, nullptr, 0,
&create_buff, kGpuDeviceNumber));
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));

Expand Down Expand Up @@ -1047,8 +1066,8 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data,
kGpuDeviceNumber));
ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data, kGpuDeviceNumber));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
Expand All @@ -1058,8 +1077,8 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
// Test that one client disconnecting does not interfere with the other.
// First create object on the second client.
object_id = random_object_id();
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data,
kGpuDeviceNumber));
ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata,
metadata_size, &data, kGpuDeviceNumber));
// Disconnect the first client.
ARROW_CHECK_OK(client_.Disconnect());
// Test that the second client can seal and get the created object.
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/plasma/test/external_store_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ TEST_F(TestPlasmaStoreWithExternal, EvictionTest) {

// Test for the object being in local Plasma store.
// Create and seal the object.
ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata));
ARROW_CHECK_OK(
client_.CreateAndSeal(object_id, /*evict_if_full=*/true, data, metadata));
// Test that the client can get the object.
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_TRUE(has_object);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/plasma/test/serialization_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,18 @@ TEST_F(TestPlasmaSerialization, CreateRequest) {
int64_t data_size1 = 42;
int64_t metadata_size1 = 11;
int device_num1 = 0;
ASSERT_OK(SendCreateRequest(fd, object_id1, data_size1, metadata_size1, device_num1));
ASSERT_OK(SendCreateRequest(fd, object_id1, /*evict_if_full=*/true, data_size1,
metadata_size1, device_num1));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaCreateRequest);
ObjectID object_id2;
bool evict_if_full;
int64_t data_size2;
int64_t metadata_size2;
int device_num2;
ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2,
&metadata_size2, &device_num2));
ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &evict_if_full,
&data_size2, &metadata_size2, &device_num2));
ASSERT_TRUE(evict_if_full);
ASSERT_EQ(data_size1, data_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
ASSERT_EQ(object_id1, object_id2);
Expand Down

0 comments on commit 62b2f63

Please sign in to comment.