-
Notifications
You must be signed in to change notification settings - Fork 615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pinned async resource #2858
Pinned async resource #2858
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,154 @@ | ||||||||||||||||||||||||
// Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. | ||||||||||||||||||||||||
// | ||||||||||||||||||||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||
// you may not use this file except in compliance with the License. | ||||||||||||||||||||||||
// You may obtain a copy of the License at | ||||||||||||||||||||||||
// | ||||||||||||||||||||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||
// | ||||||||||||||||||||||||
// Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||
// distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||
// See the License for the specific language governing permissions and | ||||||||||||||||||||||||
// limitations under the License. | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
#include <cuda_runtime.h> | ||||||||||||||||||||||||
#include <gtest/gtest.h> | ||||||||||||||||||||||||
#include <random> | ||||||||||||||||||||||||
#include <vector> | ||||||||||||||||||||||||
#include "dali/core/mm/async_pool.h" | ||||||||||||||||||||||||
#include "dali/core/dev_buffer.h" | ||||||||||||||||||||||||
#include "dali/core/mm/mm_test_utils.h" | ||||||||||||||||||||||||
#include "dali/core/cuda_stream.h" | ||||||||||||||||||||||||
#include "rmm/mr/host/pinned_memory_resource.hpp" | ||||||||||||||||||||||||
#include "dali/test/tensor_test_utils.h" | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
namespace dali { | ||||||||||||||||||||||||
namespace mm { | ||||||||||||||||||||||||
namespace test { | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
TEST(MMPinnedAlloc, StageCopy) { | ||||||||||||||||||||||||
test_pinned_resource upstream; | ||||||||||||||||||||||||
{ | ||||||||||||||||||||||||
CUDAStream stream = CUDAStream::Create(true); | ||||||||||||||||||||||||
stream_view sv(stream); | ||||||||||||||||||||||||
async_pool_base<memory_kind::pinned> pool(&upstream); | ||||||||||||||||||||||||
std::mt19937_64 rng; | ||||||||||||||||||||||||
const int N = 1<<20; | ||||||||||||||||||||||||
vector<uint8_t> pattern(N), copy_back(N); | ||||||||||||||||||||||||
DeviceBuffer<uint8_t> dev_buf; | ||||||||||||||||||||||||
dev_buf.resize(N); | ||||||||||||||||||||||||
UniformRandomFill(pattern, rng, 0, 255); | ||||||||||||||||||||||||
void *mem1 = pool.allocate(N); | ||||||||||||||||||||||||
memcpy(mem1, pattern.data(), N); | ||||||||||||||||||||||||
CUDA_CALL(cudaMemcpyAsync(dev_buf, mem1, N, cudaMemcpyHostToDevice, stream)); | ||||||||||||||||||||||||
pool.deallocate_async(mem1, N, sv); | ||||||||||||||||||||||||
void *mem2 = pool.allocate_async(N, sv); | ||||||||||||||||||||||||
EXPECT_EQ(mem1, mem2); | ||||||||||||||||||||||||
CUDA_CALL(cudaMemcpyAsync(copy_back.data(), dev_buf, N, cudaMemcpyDeviceToHost, stream)); | ||||||||||||||||||||||||
pool.deallocate_async(mem1, N, sv); | ||||||||||||||||||||||||
CUDA_CALL(cudaStreamSynchronize(stream)); | ||||||||||||||||||||||||
EXPECT_EQ(pattern, copy_back); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
upstream.check_leaks(); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
TEST(MMPinnedAlloc, SyncAndSteal) { | ||||||||||||||||||||||||
test_pinned_resource upstream; | ||||||||||||||||||||||||
{ | ||||||||||||||||||||||||
CUDAStream s1, s2; | ||||||||||||||||||||||||
s1 = CUDAStream::Create(true); | ||||||||||||||||||||||||
s2 = CUDAStream::Create(true); | ||||||||||||||||||||||||
stream_view sv1(s1), sv2(s2); | ||||||||||||||||||||||||
const int N = 1<<24; | ||||||||||||||||||||||||
async_pool_base<memory_kind::pinned> pool(&upstream, true); | ||||||||||||||||||||||||
void *mem1 = pool.allocate_async(N, sv1); | ||||||||||||||||||||||||
CUDA_CALL(cudaMemsetAsync(mem1, 0, N, s1)); | ||||||||||||||||||||||||
pool.deallocate_async(mem1, N, sv1); | ||||||||||||||||||||||||
// We've requested a large chunk (16MiB) of memory - that memory is not going | ||||||||||||||||||||||||
// to be readily available, but the pool is configured with "avoid upstream" option | ||||||||||||||||||||||||
// and therefore will wait for the pending deallocations to complete - this is still | ||||||||||||||||||||||||
// lighter than calling cudaMallocHost, which would implicitly synchronize all devices, | ||||||||||||||||||||||||
// not just some streams. | ||||||||||||||||||||||||
void *mem2 = pool.allocate_async(N, sv2); | ||||||||||||||||||||||||
auto e = cudaStreamQuery(s1); | ||||||||||||||||||||||||
EXPECT_NE(e, cudaErrorNotReady) << "Synchronization should have occurred"; | ||||||||||||||||||||||||
if (e != cudaErrorNotReady) { | ||||||||||||||||||||||||
CUDA_CALL(cudaGetLastError()); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
EXPECT_EQ(mem1, mem2) << "Memory should have been stolen from the stream1 after it's finished"; | ||||||||||||||||||||||||
pool.deallocate_async(mem2, N, sv2); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
upstream.check_leaks(); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
TEST(MMPinnedAlloc, SyncCrossDevice) { | ||||||||||||||||||||||||
test_pinned_resource upstream; | ||||||||||||||||||||||||
int ndev = 0; | ||||||||||||||||||||||||
CUDA_CALL(cudaGetDeviceCount(&ndev)); | ||||||||||||||||||||||||
if (ndev < 2) { | ||||||||||||||||||||||||
GTEST_SKIP() << "This test requires at least 2 CUDA devices."; | ||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||
CUDAStream s1, s2; | ||||||||||||||||||||||||
DeviceGuard dg(0); | ||||||||||||||||||||||||
s1 = CUDAStream::Create(true); | ||||||||||||||||||||||||
cudaSetDevice(1); | ||||||||||||||||||||||||
s2 = CUDAStream::Create(true); | ||||||||||||||||||||||||
cudaSetDevice(0); | ||||||||||||||||||||||||
Comment on lines
+93
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||
stream_view sv1(s1), sv2(s2); | ||||||||||||||||||||||||
const int N = 1<<24; | ||||||||||||||||||||||||
async_pool_base<memory_kind::pinned> pool(&upstream, true); | ||||||||||||||||||||||||
void *mem1 = pool.allocate_async(N, sv1); | ||||||||||||||||||||||||
CUDA_CALL(cudaMemsetAsync(mem1, 0, N, s1)); | ||||||||||||||||||||||||
pool.deallocate_async(mem1, N, sv1); | ||||||||||||||||||||||||
// We've requested a large chunk (16MiB) of memory - that memory is not going | ||||||||||||||||||||||||
// to be readily available, but the pool is configured with "avoid upstream" option | ||||||||||||||||||||||||
// and therefore will wait for the pending deallocations to complete - this is still | ||||||||||||||||||||||||
// lighter than calling cudaMallocHost, which would implicitly synchronize all devices, | ||||||||||||||||||||||||
// not just some streams. | ||||||||||||||||||||||||
void *mem2 = pool.allocate_async(N, sv2); | ||||||||||||||||||||||||
auto e = cudaStreamQuery(s1); | ||||||||||||||||||||||||
EXPECT_NE(e, cudaErrorNotReady) << "Synchronization should have occurred"; | ||||||||||||||||||||||||
if (e != cudaErrorNotReady) { | ||||||||||||||||||||||||
CUDA_CALL(cudaGetLastError()); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
EXPECT_EQ(mem1, mem2) << "Memory should have been stolen from the stream1 after it's finished"; | ||||||||||||||||||||||||
pool.deallocate_async(mem2, N, sv2); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
upstream.check_leaks(); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
TEST(MMPinnedAlloc, FreeOnAnotherDevice) { | ||||||||||||||||||||||||
test_pinned_resource upstream; | ||||||||||||||||||||||||
int ndev = 0; | ||||||||||||||||||||||||
CUDA_CALL(cudaGetDeviceCount(&ndev)); | ||||||||||||||||||||||||
if (ndev < 2) { | ||||||||||||||||||||||||
GTEST_SKIP() << "This test requires at least 2 CUDA devices."; | ||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||
CUDAStream s1, s2; | ||||||||||||||||||||||||
DeviceGuard dg(0); | ||||||||||||||||||||||||
s1 = CUDAStream::Create(true); | ||||||||||||||||||||||||
cudaSetDevice(1); | ||||||||||||||||||||||||
s2 = CUDAStream::Create(true); | ||||||||||||||||||||||||
cudaSetDevice(0); | ||||||||||||||||||||||||
Comment on lines
+129
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||
stream_view sv1(s1), sv2(s2); | ||||||||||||||||||||||||
const int N = 1<<24; | ||||||||||||||||||||||||
async_pool_base<memory_kind::pinned> pool(&upstream, true); | ||||||||||||||||||||||||
void *mem1 = pool.allocate_async(N, sv1); | ||||||||||||||||||||||||
CUDA_CALL(cudaMemsetAsync(mem1, 0, N, s1)); | ||||||||||||||||||||||||
cudaStreamSynchronize(s1); | ||||||||||||||||||||||||
// don't set device - it should be inferred from the stream | ||||||||||||||||||||||||
pool.deallocate_async(mem1, N, sv2); | ||||||||||||||||||||||||
// now set the device and allocate | ||||||||||||||||||||||||
cudaSetDevice(1); | ||||||||||||||||||||||||
void *mem2 = pool.allocate_async(N, sv2); | ||||||||||||||||||||||||
EXPECT_EQ(mem1, mem2) << "Memory should have been moved to stream2 on another device."; | ||||||||||||||||||||||||
pool.deallocate_async(mem2, N, sv2); | ||||||||||||||||||||||||
Comment on lines
+143
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it matters - the only reason I use device guard at all is to restore the default device at the end of the test, even if it fails. |
||||||||||||||||||||||||
} | ||||||||||||||||||||||||
upstream.check_leaks(); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
} // namespace test | ||||||||||||||||||||||||
} // namespace mm | ||||||||||||||||||||||||
} // namespace dali |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
#include <mutex> | ||
#include <unordered_map> | ||
#include <utility> | ||
#include <vector> | ||
#include "dali/core/mm/pool_resource.h" | ||
#include "dali/core/mm/detail/free_list.h" | ||
#include "dali/core/small_vector.h" | ||
|
@@ -33,7 +34,8 @@ | |
namespace dali { | ||
namespace mm { | ||
|
||
template <memory_kind kind, class FreeList, class LockType, class Upstream = memory_resource<kind>> | ||
template <memory_kind kind, typename FreeList = free_tree, | ||
typename LockType = std::mutex, typename Upstream = memory_resource<kind>> | ||
class async_pool_base : public stream_aware_memory_resource<kind> { | ||
public: | ||
/** | ||
|
@@ -58,7 +60,6 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
* @brief Waits until all pending frees are finished. | ||
*/ | ||
void synchronize() { | ||
DeviceGuard dg(device_id_); | ||
synchronize_impl(true); | ||
} | ||
|
||
|
@@ -71,18 +72,20 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
for (auto &kv : stream_free_) { | ||
if (!kv.second.free_list.head) | ||
continue; | ||
if (!sync_stream_) | ||
sync_stream_ = CUDAStream::Create(true); | ||
CUDA_DTOR_CALL(cudaStreamWaitEvent(sync_stream_, kv.second.free_list.head->event, 0)); | ||
|
||
ContextScope scope(kv.second.free_list.head->ctx); | ||
int dev = 0; | ||
CUDA_DTOR_CALL(cudaGetDevice(&dev)); | ||
CUDA_DTOR_CALL(cudaStreamWaitEvent(GetSyncStream(dev), kv.second.free_list.head->event, 0)); | ||
} | ||
} | ||
if (sync_stream_) | ||
CUDA_DTOR_CALL(cudaStreamSynchronize(sync_stream_)); | ||
for (int dev = 0; dev < static_cast<int>(sync_streams_.size()); dev++) { | ||
if (sync_streams_[dev]) | ||
CUDA_DTOR_CALL(cudaStreamSynchronize(sync_streams_[dev])); | ||
} | ||
} | ||
|
||
void *do_allocate(size_t bytes, size_t alignment) override { | ||
if (device_id_ == -1) | ||
CUDA_CALL(cudaGetDevice(&device_id_)); | ||
adjust_size_and_alignment(bytes, alignment); | ||
std::lock_guard<LockType> guard(lock_); | ||
return allocate_from_global_pool(bytes, alignment); | ||
|
@@ -108,8 +111,6 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
* may be sufficient, there may be no satisfactory block. | ||
*/ | ||
void *do_allocate_async(size_t bytes, size_t alignment, stream_view stream) override { | ||
if (device_id_ == -1) | ||
CUDA_CALL(cudaGetDevice(&device_id_)); | ||
adjust_size_and_alignment(bytes, alignment); | ||
std::lock_guard<LockType> guard(lock_); | ||
auto it = stream_free_.find(stream.value()); | ||
|
@@ -190,6 +191,7 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
size_t bytes = 0; | ||
size_t alignment = alignof(std::max_align_t); | ||
CUDAEvent event; | ||
CUcontext ctx = nullptr; | ||
bool is_ready = false; | ||
pending_free *prev = nullptr, *next = nullptr; | ||
|
||
|
@@ -400,6 +402,8 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
|
||
auto *add_pending_free(PendingFreeList &free, char *base, size_t bytes, size_t alignment, | ||
cudaStream_t stream) { | ||
if (!cuInitChecked()) | ||
throw std::runtime_error("Cannot load CUDA driver API library"); | ||
pending_free *f = FreeDescAlloc::allocate(1); | ||
f = new (f)pending_free(); | ||
f->addr = base; | ||
|
@@ -411,8 +415,10 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
f->next->prev = f; | ||
free.head = f; | ||
if (!free.tail) free.tail = f; | ||
f->event = CUDAEventPool::instance().Get(device_id_); | ||
cudaEventRecord(f->event, stream); | ||
CUDA_CALL(cuStreamGetCtx(stream, &f->ctx)); | ||
ContextScope scope(f->ctx); | ||
f->event = CUDAEventPool::instance().Get(); | ||
CUDA_CALL(cudaEventRecord(f->event, stream)); | ||
num_pending_frees_++; | ||
return f; | ||
} | ||
|
@@ -425,7 +431,8 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
} | ||
|
||
pending_free *remove_pending_free(PendingFreeList &free, pending_free *f) { | ||
CUDAEventPool::instance().Put(std::move(f->event), device_id_); | ||
ContextScope scope(f->ctx); | ||
CUDAEventPool::instance().Put(std::move(f->event)); | ||
auto *prev = f->prev; | ||
auto *next = f->next; | ||
if (free.head == f) | ||
|
@@ -465,7 +472,44 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
using FreeDescAlloc = detail::object_pool_allocator<pending_free>; | ||
|
||
LockType lock_; | ||
CUDAStream sync_stream_; | ||
vector<CUDAStream> sync_streams_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe move those member variables to the end, with the rest of them? Up to you |
||
CUDAStream &GetSyncStream(int device_id) { | ||
int ndev = sync_streams_.size(); | ||
if (sync_streams_.empty()) { | ||
CUDA_CALL(cudaGetDeviceCount(&ndev)); | ||
sync_streams_.resize(ndev); | ||
} | ||
assert(device_id >= 0 && device_id < ndev); | ||
if (!sync_streams_[device_id]) | ||
sync_streams_[device_id] = CUDAStream::Create(true, device_id); | ||
return sync_streams_[device_id]; | ||
} | ||
|
||
/** | ||
* @brief Sets a new context for the lifetime of the object | ||
* | ||
* Unlike DeviceGuard, which focuses on restoring the old context upon destruction, | ||
* this object is optimized to reduce the number of API calls and doesn't restore | ||
Comment on lines
+491
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DeviceGuard also operates on device ID and stores context to be compatible with other libs which may not use |
||
* the old context if the new context and current context are the same at construction. | ||
*/ | ||
struct ContextScope { | ||
explicit ContextScope(CUcontext new_ctx) { | ||
CUDA_CALL(cuCtxGetCurrent(&old_ctx)); | ||
if (old_ctx == new_ctx) { | ||
old_ctx = nullptr; | ||
} else { | ||
CUDA_CALL(cuCtxSetCurrent(new_ctx)); | ||
} | ||
} | ||
~ContextScope() { | ||
if (old_ctx) { | ||
CUDA_DTOR_CALL(cuCtxSetCurrent(old_ctx)); | ||
} | ||
} | ||
|
||
private: | ||
CUcontext old_ctx; | ||
}; | ||
|
||
/** | ||
* @brief Indicates whether the global pool supports splitting | ||
|
@@ -482,7 +526,6 @@ class async_pool_base : public stream_aware_memory_resource<kind> { | |
|
||
pool_resource_base<kind, any_context, FreeList, detail::dummy_lock> global_pool_; | ||
|
||
int device_id_ = -1; | ||
int num_pending_frees_ = 0; | ||
bool avoid_upstream_ = true; | ||
}; | ||
|
+1 −1 | include/rmm/mr/host/pinned_memory_resource.hpp | |
+1 −1 | tests/mr/host/mr_tests.cpp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this allocate would case synchronization?
Because of the size? If so I would add a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partially. It's because the resource is created with
avoid upstream
and the size is large - thus, it will first try to wait for the pending deallocations before resorting to upstream allocation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add the comment in the next PR if there are no more serious issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder how many times this can surprise the user. That his allocation won't happen immediately, but can sync on other stream (and random from the caller of the allocation point of view).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, if the alternative is to implicitly synchronize the device (or all of them, as would be the case of pinned memory), then I'd say the user wouldn't notice any negative impact. Also, it's similar to happens in plain malloc - either you allocate from process-local heap (fast) or issue a syscall to expand the heap (slower).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in this case you provide a soft promise to allocate from the pool without any unnecessary delay.