diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index 0964df46fd6cb..a6db4052a9179 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -110,7 +110,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( } std::shared_ptr data; - Status s = client->Create(oid, size, md, md_size, &data); + Status s = client->Create(oid, /*evict_if_full=*/true, size, md, md_size, &data); if (plasma::IsPlasmaObjectExists(s)) { jclass exceptionClass = env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException"); diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index a3cd8e4f27200..d951cf082ca62 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -89,8 +89,8 @@ class TestPlasmaStore : public ::testing::Test { const std::vector& metadata, const std::vector& data, bool release = true) { std::shared_ptr data_buffer; - ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(), - &data_buffer)); + ARROW_CHECK_OK(client.Create(object_id, /*evict_if_full=*/true, data.size(), + metadata.data(), metadata.size(), &data_buffer)); for (size_t i = 0; i < data.size(); i++) { data_buffer->mutable_data()[i] = data[i]; } @@ -121,8 +121,8 @@ TEST_F(TestPlasmaStore, NewSubscriberTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK( - local_client.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(local_client.Create(object_id, /*evict_if_full=*/true, data_size, + metadata, metadata_size, &data)); ARROW_CHECK_OK(local_client.Seal(object_id)); // Test that new subscriber client2 can receive notifications about existing objects. @@ -170,7 +170,8 @@ TEST_F(TestPlasmaStore, BatchNotificationTest) { std::vector data = {"hello", "world!"}; std::vector metadata = {"1", "23"}; - ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, data, metadata)); + ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, /*evict_if_full=*/true, data, + metadata)); ObjectID object_id = random_object_id(); int64_t data_size = 0; @@ -229,10 +230,14 @@ TEST_F(TestPlasmaStore, SetQuotaBasicTest) { // Too big to fit in quota at all std::shared_ptr data_buffer; - ASSERT_FALSE( - client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &data_buffer).ok()); - ASSERT_TRUE( - client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &data_buffer).ok()); + ASSERT_FALSE(client_ + .Create(random_object_id(), /*evict_if_full=*/true, 7 * 1024 * 1024, + {}, 0, &data_buffer) + .ok()); + ASSERT_TRUE(client_ + .Create(random_object_id(), /*evict_if_full=*/true, 4 * 1024 * 1024, {}, + 0, &data_buffer) + .ok()); } TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { @@ -513,7 +518,8 @@ TEST_F(TestPlasmaStore, DeleteTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client_.Seal(object_id)); result = client_.Delete(object_id); @@ -542,9 +548,11 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client_.Seal(object_id2)); // Release the ref count of Create function. ARROW_CHECK_OK(client_.Release(object_id1)); @@ -662,11 +670,13 @@ TEST_F(TestPlasmaStore, MultipleGetTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); data->mutable_data()[0] = 1; ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); data->mutable_data()[0] = 2; ARROW_CHECK_OK(client_.Seal(object_id2)); @@ -683,7 +693,8 @@ TEST_F(TestPlasmaStore, BatchCreateTest) { std::vector data = {"hello", "world"}; std::vector metadata = {"1", "2"}; - ARROW_CHECK_OK(client_.CreateAndSealBatch(object_ids, data, metadata)); + ARROW_CHECK_OK( + client_.CreateAndSealBatch(object_ids, /*evict_if_full=*/true, data, metadata)); std::vector object_buffers; @@ -714,7 +725,8 @@ TEST_F(TestPlasmaStore, AbortTest) { int64_t metadata_size = sizeof(metadata); std::shared_ptr data; uint8_t* data_ptr; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); data_ptr = data->mutable_data(); // Write some data. for (int64_t i = 0; i < data_size / 2; i++) { @@ -759,7 +771,8 @@ TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { // Test the sequence: create -> release -> abort -> ... for (int64_t i = 0; i < loop_times; i++) { std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client_.Release(object_id)); ARROW_CHECK_OK(client_.Abort(object_id)); } @@ -767,7 +780,8 @@ TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { // Test the sequence: create -> seal -> release -> delete -> ... for (int64_t i = 0; i < loop_times; i++) { std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client_.Seal(object_id)); ARROW_CHECK_OK(client_.Release(object_id)); ARROW_CHECK_OK(client_.Delete(object_id)); @@ -789,7 +803,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); ARROW_CHECK_OK(client2_.Seal(object_id)); // Test that the first client can get the object. ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); @@ -800,7 +815,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) { // Test that one client disconnecting does not interfere with the other. // First create object on the second client. object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); // Disconnect the first client. ARROW_CHECK_OK(client_.Disconnect()); // Test that the second client can seal and get the created object. @@ -830,7 +846,8 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data)); if (i % 3 == 0) { // Seal one third of the objects. @@ -910,8 +927,8 @@ TEST_F(TestPlasmaStore, GetGPUTest) { int64_t metadata_size = sizeof(metadata); std::shared_ptr data_buffer; std::shared_ptr gpu_buffer; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, - &data_buffer, kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data_buffer, kGpuDeviceNumber)); ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(data_buffer)); CudaBufferWriter writer(gpu_buffer); ARROW_CHECK_OK(writer.Write(data, data_size)); @@ -940,11 +957,11 @@ TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id1, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data, kGpuDeviceNumber)); ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id2, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data, kGpuDeviceNumber)); ARROW_CHECK_OK(client_.Seal(object_id2)); // Release the ref count of Create function. ARROW_CHECK_OK(client_.Release(object_id1)); @@ -986,7 +1003,8 @@ TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) { ObjectID& object_id = object_ids[i]; std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, 0, 0, + &data, kGpuDeviceNumber)); ARROW_CHECK_OK(client_.Seal(object_id)); ARROW_CHECK_OK(client_.Release(object_id)); } @@ -998,7 +1016,8 @@ TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) { ARROW_CHECK_OK(client_.Delete(object_id)); std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, 0, 0, + &data, kGpuDeviceNumber)); ARROW_CHECK_OK(client_.Seal(object_id)); ARROW_CHECK_OK(client_.Release(object_id)); } @@ -1013,8 +1032,8 @@ TEST_F(TestPlasmaStore, GPUBufferLifetime) { const int64_t data_size = 40; std::shared_ptr create_buff; - ARROW_CHECK_OK( - client_.Create(object_id, data_size, nullptr, 0, &create_buff, kGpuDeviceNumber)); + ARROW_CHECK_OK(client_.Create(object_id, /*evict_if_full=*/true, data_size, nullptr, 0, + &create_buff, kGpuDeviceNumber)); ARROW_CHECK_OK(client_.Seal(object_id)); ARROW_CHECK_OK(client_.Release(object_id)); @@ -1047,8 +1066,8 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); + ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data, kGpuDeviceNumber)); ARROW_CHECK_OK(client2_.Seal(object_id)); // Test that the first client can get the object. ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); @@ -1058,8 +1077,8 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) { // Test that one client disconnecting does not interfere with the other. // First create object on the second client. object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); + ARROW_CHECK_OK(client2_.Create(object_id, /*evict_if_full=*/true, data_size, metadata, + metadata_size, &data, kGpuDeviceNumber)); // Disconnect the first client. ARROW_CHECK_OK(client_.Disconnect()); // Test that the second client can seal and get the created object. diff --git a/cpp/src/plasma/test/external_store_tests.cc b/cpp/src/plasma/test/external_store_tests.cc index 804e9cdc8bfb0..d0fa31acc713a 100644 --- a/cpp/src/plasma/test/external_store_tests.cc +++ b/cpp/src/plasma/test/external_store_tests.cc @@ -105,7 +105,8 @@ TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { // Test for the object being in local Plasma store. // Create and seal the object. - ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata)); + ARROW_CHECK_OK( + client_.CreateAndSeal(object_id, /*evict_if_full=*/true, data, metadata)); // Test that the client can get the object. ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); ASSERT_TRUE(has_object); diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 560c1de10dbda..3011594af190e 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -97,15 +97,18 @@ TEST_F(TestPlasmaSerialization, CreateRequest) { int64_t data_size1 = 42; int64_t metadata_size1 = 11; int device_num1 = 0; - ASSERT_OK(SendCreateRequest(fd, object_id1, data_size1, metadata_size1, device_num1)); + ASSERT_OK(SendCreateRequest(fd, object_id1, /*evict_if_full=*/true, data_size1, + metadata_size1, device_num1)); std::vector data = read_message_from_file(fd, MessageType::PlasmaCreateRequest); ObjectID object_id2; + bool evict_if_full; int64_t data_size2; int64_t metadata_size2; int device_num2; - ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2, - &metadata_size2, &device_num2)); + ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &evict_if_full, + &data_size2, &metadata_size2, &device_num2)); + ASSERT_TRUE(evict_if_full); ASSERT_EQ(data_size1, data_size2); ASSERT_EQ(metadata_size1, metadata_size2); ASSERT_EQ(object_id1, object_id2);