Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cpp/src/arrow/gpu/cuda_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
/// memory is page-locked (using cudaHostRegister).
Status GetDeviceAddress(uint8_t* addr, uint8_t** devaddr);

/// \brief Release CUDA memory on GPU device for this context
/// \param[in] device_ptr the address of the device buffer to release
/// \param[in] nbytes size of the buffer in bytes
/// \return Status
Status Free(void* device_ptr, int64_t nbytes);

private:
CudaContext();

Expand All @@ -143,7 +149,6 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
Status CopyDeviceToDevice(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst,
const void* src, int64_t nbytes);
Status Free(void* device_ptr, int64_t nbytes);

class CudaContextImpl;
std::unique_ptr<CudaContextImpl> impl_;
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ Status PlasmaStore::AllocateCudaMemory(
// The IPC handle will keep the buffer memory alive
return cuda_buffer->ExportForIpc(out_ipc_handle);
}

// Release a CUDA buffer of `size` bytes at `pointer`. The context is looked
// up with index `device_num - 1`, and the buffer is freed through it.
// Any failure from the lookup or the free is returned to the caller.
Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) {
  std::shared_ptr<CudaContext> cuda_context;
  const int context_index = device_num - 1;
  RETURN_NOT_OK(manager_->GetContext(context_index, &cuda_context));
  return cuda_context->Free(pointer, size);
}
#endif

// Create a new object buffer in the hash table.
Expand Down Expand Up @@ -532,7 +539,14 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,

// Release the buffer backing `object_id` and drop its entry from the object
// table. The buffer spans data_size + metadata_size bytes.
void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
  auto& object = store_info_.objects[object_id];
  auto buff_size = object->data_size + object->metadata_size;
  // Free exactly once, picking the path from device_num: 0 goes back to
  // PlasmaAllocator, anything else goes through FreeCudaMemory. An
  // unconditional PlasmaAllocator::Free ahead of this check would free
  // device_num == 0 buffers twice and pass other buffers to PlasmaAllocator.
  if (object->device_num == 0) {
    PlasmaAllocator::Free(object->pointer, buff_size);
  } else {
#ifdef PLASMA_CUDA
    ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
#endif
  }
  store_info_.objects.erase(object_id);
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ class PlasmaStore {
#ifdef PLASMA_CUDA
Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);

/// Free a CUDA buffer.
///
/// @param device_num Device number of the buffer; the context is looked up
///        with index device_num - 1.
/// @param size Size of the buffer in bytes.
/// @param pointer Address of the buffer to free (an input, not an output).
/// @return Status of the context lookup or of the free.
Status FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer);
#endif

/// Event loop of the plasma store.
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/plasma/test/client_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,52 @@ TEST_F(TestPlasmaStore, GetGPUTest) {
AssertCudaRead(object_buffers[0].metadata, {42});
}

// Checks that two objects created with device number 1 survive a Delete while
// client2_ still holds them via Get, and are gone after a second Delete once
// client2_ has dropped its buffers.
TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) {
  ObjectID object_id1 = random_object_id();
  ObjectID object_id2 = random_object_id();

  // Test for deleting non-existent objects.
  Status result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // Test for the object being in local Plasma store.
  // First create object.
  int64_t data_size = 100;
  uint8_t metadata[] = {5};
  int64_t metadata_size = sizeof(metadata);
  std::shared_ptr<Buffer> data;
  ARROW_CHECK_OK(
      client_.Create(object_id1, data_size, metadata, metadata_size, &data, 1));
  ARROW_CHECK_OK(client_.Seal(object_id1));
  ARROW_CHECK_OK(
      client_.Create(object_id2, data_size, metadata, metadata_size, &data, 1));
  ARROW_CHECK_OK(client_.Seal(object_id2));
  // Release the ref count of Create function.
  ARROW_CHECK_OK(client_.Release(object_id1));
  ARROW_CHECK_OK(client_.Release(object_id2));
  // Increase the ref count by calling Get using client2_.
  std::vector<ObjectBuffer> object_buffers;
  ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
  // Objects are still used by client2_.
  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  // The objects are in use, so they should not be deleted right now.
  bool has_object = false;
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_TRUE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_TRUE(has_object);
  // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
  // client2_ won't send the release request immediately because the trigger
  // condition is not reached. The release is only added to release cache.
  object_buffers.clear();
  // Delete the objects. The returned status is checked so that a failed
  // Delete is reported directly rather than only via the Contains checks.
  result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
  ARROW_CHECK_OK(result);
  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
  ASSERT_FALSE(has_object);
  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
  ASSERT_FALSE(has_object);
}

TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
ObjectID object_id = random_object_id();
std::vector<ObjectBuffer> object_buffers;
Expand Down