Eliminate VisitableAllocator.
The visitor pattern is used to allow pre-registration of memory for
DMA access, e.g. for fast GPU/CPU I/O and for RDMA networking.  The
VisitableAllocator interface was introduced to support this use some
time ago, prior to SubAllocators.  Memory registration works best if
it's done infrequently, on large pieces of memory, rather than on
every piece that's dynamically allocated/freed.  This usage pattern
fits the SubAllocator better than a general Allocator.  This change
moves memory allocation visitor access to SubAllocator and eliminates
the VisitableAllocator subclass of Allocator.

This change also more rigorously enforces the requirement that all
Visitors be declared before memory allocation begins.  This is
accomplished by requiring that Visitors be provided to the SubAllocator
constructor.
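
For reference, the registration flow now looks roughly like this.  This
is a minimal sketch: the Visitor signature and the visitor bodies mirror
the diffs below, while the visitor-taking BasicCPUAllocator constructor
is an assumption based on this description, not code quoted from the
commit.

  SubAllocator::Visitor alloc_visitor = [](void* ptr, int numa_node,
                                           size_t num_bytes) {
    // Pre-register each newly acquired region for DMA/RDMA access.
    RdmaMemoryMgr::Singleton().InsertMemoryRegion(
        ptr, num_bytes, strings::StrCat("CPU:", numa_node));
  };
  SubAllocator::Visitor free_visitor = [](void* ptr, int numa_node,
                                          size_t num_bytes) {
    RdmaMemoryMgr::Singleton().EvictMemoryRegion(ptr, num_bytes);
  };

  // Visitors are fixed at SubAllocator construction time, so they observe
  // every region the wrapping BFCAllocator ever obtains or releases.
  SubAllocator* sub_alloc = new BasicCPUAllocator(
      port::kNUMANoAffinity, {alloc_visitor}, {free_visitor});
  BFCAllocator bfc(sub_alloc, /*total_memory=*/1LL << 36,
                   /*allow_growth=*/true, /*name=*/"cpu_rdma_bfc");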

This refactoring will ease an upcoming CL introducing
NUMA-specific CPU devices.  It should also fix some performance
pitfalls (e.g. accidental use of PoolAllocator) introduced by an
earlier refactoring of ProcessState that was also in preparation for
NUMA.  It restores the default use of the cpu_allocator() value (i.e.
no SubAllocator) by model executions that don't use allocation
visitors (since visitor registration must precede the first allocation,
it can be detected at that time).
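
The "detected at that time" point can be pictured with a standalone
sketch (all names below are illustrative, not TensorFlow's actual
ProcessState code): registering a visitor after the first allocation is
simply a hard error, so the first allocation is the natural point to
decide whether the default cpu_allocator() suffices.

  #include <cassert>
  #include <cstddef>
  #include <functional>
  #include <utility>
  #include <vector>

  using Visitor =
      std::function<void(void* ptr, int numa_node, std::size_t num_bytes)>;

  class DemoProcessState {
   public:
    void AddAllocVisitor(Visitor v) {
      // Visitors are baked into the SubAllocator at construction, so a
      // registration arriving after the first allocation cannot be honored.
      assert(!first_allocation_done_ &&
             "visitors must be registered before the first allocation");
      visitors_.push_back(std::move(v));
    }

    void* Allocate(std::size_t num_bytes) {
      first_allocation_done_ = true;  // From here on, AddAllocVisitor asserts.
      void* ptr = ::operator new(num_bytes);
      for (const auto& v : visitors_) v(ptr, /*numa_node=*/0, num_bytes);
      return ptr;
    }

   private:
    bool first_allocation_done_ = false;
    std::vector<Visitor> visitors_;
  };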

PiperOrigin-RevId: 213371553
tensorflower-gardener committed Sep 18, 2018
1 parent 4338803 commit 185aa89
Showing 32 changed files with 628 additions and 577 deletions.
102 changes: 39 additions & 63 deletions tensorflow/contrib/gdr/gdr_memory_manager.cc
@@ -138,6 +138,8 @@ class GdrMemoryManager : public RemoteMemoryManager {
Device* device, DeviceContext* device_context, bool on_host,
StatusCallback done) override;

static void RegMemVisitors();

protected:
Status CreateEndpoint(const string& host, const string& port,
RdmaEndpointPtr& endpoint);
@@ -183,35 +185,51 @@ class GdrMemoryManager : public RemoteMemoryManager {
TF_DISALLOW_COPY_AND_ASSIGN(GdrMemoryManager);
};

// TODO(byronyi): remove this class and its registration when the default
// cpu_allocator() returns visitable allocator, or cpu_allocator() is no
// longer in use.
class BFCGdrAllocator : public BFCAllocator {
public:
BFCGdrAllocator()
: BFCAllocator(new BasicCPUAllocator(port::kNUMANoAffinity), 1LL << 36,
true, "cpu_gdr_bfc") {}
};
class BFCGdrAllocatorFactory : public AllocatorFactory {
public:
Allocator* CreateAllocator() override { return new BFCGdrAllocator; }

virtual SubAllocator* CreateSubAllocator(int numa_node) {
return new BasicCPUAllocator(numa_node);
}
};

REGISTER_MEM_ALLOCATOR("BFCGdrAllocator", 102, BFCGdrAllocatorFactory);

GdrMemoryManager::GdrMemoryManager(const string& host, const string& port)
: host_(host),
port_(port),
listening_(nullptr, EndpointDeleter),
stopped_(true),
next_key_(0) {}
next_key_(0) {
static std::once_flag flag;
std::call_once(flag, []() { RegMemVisitors(); });
}

GdrMemoryManager::~GdrMemoryManager() { close(epfd_); }

/*static*/ void GdrMemoryManager::RegMemVisitors() {
SubAllocator::Visitor alloc_visitor = [](void* ptr, int numa_node,
size_t num_bytes) {
GdrMemoryManager::Singleton().InsertMemoryRegion(
ptr, num_bytes, strings::StrCat("CPU:", numa_node));
};
SubAllocator::Visitor free_visitor = [](void* ptr, int numa_node,
size_t num_bytes) {
GdrMemoryManager::Singleton().EvictMemoryRegion(ptr, num_bytes);
};
ProcessState::singleton()->AddCPUAllocVisitor(alloc_visitor);
ProcessState::singleton()->AddCPUFreeVisitor(free_visitor);

#if GOOGLE_CUDA
if (IsGDRAvailable()) {
int32_t bus_id = TryToReadNumaNode(rdma_adapter_->context_->device) + 1;

// Note we don't free allocated GPU memory so there is no free visitor
SubAllocator::Visitor cuda_alloc_visitor = [](void* ptr, int gpu_id,
size_t num_bytes) {
RdmaMemoryMgr::Singleton().InsertMemoryRegion(
ptr, num_bytes, strings::StrCat("GPU:", gpu_id));
};
GPUProcessState::singleton()->AddGPUAllocVisitor(bus_id,
cuda_alloc_visitor);
GPUProcessState::singleton()->AddCUDAHostAllocVisitor(bus_id,
alloc_visitor);
GPUProcessState::singleton()->AddCUDAHostFreeVisitor(bus_id, free_visitor);
LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id;
}
#endif // GOOGLE_CUDA
}

Status GdrMemoryManager::Init() {
epfd_ = epoll_create1(0);
if (epfd_ == -1) {
@@ -271,48 +289,6 @@ Status GdrMemoryManager::Init() {
"cannot add server to epoll");
}

Allocator* allocators[] = {
#if GOOGLE_CUDA
GPUProcessState::singleton()->GetCUDAHostAllocator(0),
#endif // GOOGLE_CUDA
ProcessState::singleton()->GetCPUAllocator(0),
cpu_allocator(),
};

using namespace std::placeholders;
VisitableAllocator::Visitor alloc_visitor =
std::bind(&GdrMemoryManager::InsertMemoryRegion, this, _1, _2);
VisitableAllocator::Visitor free_visitor =
std::bind(&GdrMemoryManager::EvictMemoryRegion, this, _1, _2);

std::set<Allocator*> instrumented_;

// Host memory allocators
for (Allocator* allocator : allocators) {
auto* visitable_allocator = dynamic_cast<VisitableAllocator*>(allocator);
CHECK(visitable_allocator)
<< "is not visitable for instrumentation" << allocator->Name();
// Make sure we don't instrument the same allocator twice
if (instrumented_.find(allocator) == std::end(instrumented_)) {
visitable_allocator->AddAllocVisitor(alloc_visitor);
visitable_allocator->AddFreeVisitor(free_visitor);
instrumented_.insert(allocator);
LOG(INFO) << "Instrumenting CPU allocator " << allocator->Name();
}
}

#if GOOGLE_CUDA
VisitableAllocator::Visitor cuda_alloc_visitor =
std::bind(&GdrMemoryManager::InsertMemoryRegion, this, _1, _2);
if (IsGDRAvailable()) {
// Note we don't free allocated GPU memory so there is no free visitor
int32_t bus_id = TryToReadNumaNode(listening_->verbs->device) + 1;
GPUProcessState::singleton()->AddGPUAllocVisitor(bus_id,
cuda_alloc_visitor);
LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id;
}
#endif // GOOGLE_CUDA

return Status::OK();
}

81 changes: 24 additions & 57 deletions tensorflow/contrib/verbs/rdma_mgr.cc
@@ -20,7 +20,6 @@ limitations under the License.
#include <vector>
#include "tensorflow/contrib/verbs/grpc_verbs_client.h"
#include "tensorflow/contrib/verbs/verbs_service.pb.h"
#include "tensorflow/core/common_runtime/bfc_allocator.h"
#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
#include "tensorflow/core/common_runtime/gpu/gpu_util.h"
#include "tensorflow/core/common_runtime/pool_allocator.h"
@@ -29,6 +28,7 @@ limitations under the License.
#include "tensorflow/core/distributed_runtime/session_mgr.h"
#include "tensorflow/core/framework/allocator_registry.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"

namespace tensorflow {

@@ -256,74 +256,41 @@ void MRDeleter(ibv_mr* mr) {
}
}

// TODO(byronyi): remove this class and its registration when the default
// cpu_allocator() returns visitable allocator, or cpu_allocator() is no
// longer in use.
class BFCRdmaAllocator : public BFCAllocator {
public:
BFCRdmaAllocator()
: BFCAllocator(new BasicCPUAllocator(port::kNUMANoAffinity), 1LL << 36,
true, "cpu_rdma_bfc") {}
};
class BFCRdmaAllocatorFactory : public AllocatorFactory {
public:
Allocator* CreateAllocator() { return new BFCRdmaAllocator; }

SubAllocator* CreateSubAllocator(int numa_node) {
return new BasicCPUAllocator(numa_node);
}
};

REGISTER_MEM_ALLOCATOR("BFCRdmaAllocator", 101, BFCRdmaAllocatorFactory);

void RdmaMgr::InitAllocators() {
RdmaMemoryMgr::Singleton().pd_ = rdma_adapter_->pd_;
static std::once_flag flag;
std::call_once(
flag, [this]() { RdmaMemoryMgr::Singleton().pd_ = rdma_adapter_->pd_; });
}

Allocator* allocators[] = {
#if GOOGLE_CUDA
GPUProcessState::singleton()->GetCUDAHostAllocator(0),
#endif // GOOGLE_CUDA
ProcessState::singleton()->GetCPUAllocator(0),
cpu_allocator(),
/*static*/ void RdmaMgr::RegMemVisitors() {
SubAllocator::Visitor alloc_visitor = [](void* ptr, int numa_node,
size_t num_bytes) {
RdmaMemoryMgr::Singleton().InsertMemoryRegion(
ptr, num_bytes, strings::StrCat("CPU:", numa_node));
};
SubAllocator::Visitor free_visitor = [](void* ptr, int numa_node,
size_t num_bytes) {
RdmaMemoryMgr::Singleton().EvictMemoryRegion(ptr, num_bytes);
};

using namespace std::placeholders;

std::set<Allocator*> instrumented_;

// Host memory allocators
for (Allocator* allocator : allocators) {
VisitableAllocator::Visitor alloc_visitor =
std::bind(&RdmaMemoryMgr::InsertMemoryRegion,
&RdmaMemoryMgr::Singleton(), _1, _2, allocator->Name());
VisitableAllocator::Visitor free_visitor = std::bind(
&RdmaMemoryMgr::EvictMemoryRegion, &RdmaMemoryMgr::Singleton(), _1, _2);

auto* visitable_allocator = dynamic_cast<VisitableAllocator*>(allocator);
CHECK(visitable_allocator)
<< "is not visitable for instrumentation" << allocator->Name();
// Make sure we don't instrument the same allocator twice
if (instrumented_.find(allocator) == std::end(instrumented_)) {
visitable_allocator->AddAllocVisitor(alloc_visitor);
visitable_allocator->AddFreeVisitor(free_visitor);
instrumented_.insert(allocator);
LOG(INFO) << "Instrumenting CPU allocator " << allocator->Name();
}
}
ProcessState::singleton()->AddCPUAllocVisitor(alloc_visitor);
ProcessState::singleton()->AddCPUFreeVisitor(free_visitor);

#if GOOGLE_CUDA
if (IsGDRAvailable()) {
// Note we don't free allocated GPU memory so there is no free visitor
int32_t bus_id = TryToReadNumaNode(rdma_adapter_->context_->device) + 1;

char buf[8];
sprintf(buf, "gpu");
VisitableAllocator::Visitor cuda_alloc_visitor =
std::bind(&RdmaMemoryMgr::InsertMemoryRegion,
&RdmaMemoryMgr::Singleton(), _1, _2, std::string(buf));

SubAllocator::Visitor cuda_alloc_visitor = [](void* ptr, int gpu_id,
size_t num_bytes) {
RdmaMemoryMgr::Singleton().InsertMemoryRegion(
ptr, num_bytes, strings::StrCat("GPU:", gpu_id));
};
GPUProcessState::singleton()->AddGPUAllocVisitor(bus_id,
cuda_alloc_visitor);
GPUProcessState::singleton()->AddCUDAHostAllocVisitor(bus_id,
alloc_visitor);
GPUProcessState::singleton()->AddCUDAHostFreeVisitor(bus_id, free_visitor);
LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id;
}
#endif // GOOGLE_CUDA
1 change: 1 addition & 0 deletions tensorflow/contrib/verbs/rdma_mgr.h
@@ -39,6 +39,7 @@ class RdmaMgr {
void SetupChannels();
bool ConnectivityCheck();
void InitAllocators();
static void RegMemVisitors();
const string& local_worker() { return local_worker_; }

private:
5 changes: 5 additions & 0 deletions tensorflow/contrib/verbs/verbs_server_lib.cc
@@ -76,8 +76,13 @@ Status VerbsServer::ChannelCacheFactory(const ServerDef& server_def,
return Status::OK();
}

namespace {
std::once_flag reg_mem_visitors_call;
} // namespace

Status VerbsServer::Init(ServiceInitFunction service_func,
RendezvousMgrCreationFunction rendezvous_mgr_func) {
std::call_once(reg_mem_visitors_call, []() { RdmaMgr::RegMemVisitors(); });
Status s = GrpcServer::Init(service_func, rendezvous_mgr_func);
{
mutex_lock l(mu_);
1 change: 0 additions & 1 deletion tensorflow/core/BUILD
@@ -2783,7 +2783,6 @@ CORE_CPU_LIB_HEADERS = CORE_CPU_BASE_HDRS + [
"common_runtime/step_stats_collector.h",
"common_runtime/threadpool_device.h",
"common_runtime/tracing_device.h",
"common_runtime/visitable_allocator.h",
"common_runtime/process_state.h",
"common_runtime/pool_allocator.h",
"graph/gradients.h",
21 changes: 4 additions & 17 deletions tensorflow/core/common_runtime/bfc_allocator.cc
@@ -31,7 +31,7 @@ namespace tensorflow {

BFCAllocator::BFCAllocator(SubAllocator* sub_allocator, size_t total_memory,
bool allow_growth, const string& name)
: suballocator_(sub_allocator),
: sub_allocator_(sub_allocator),
name_(name),
free_chunks_list_(kInvalidChunkHandle),
next_allocation_id_(1) {
@@ -72,7 +72,7 @@ BFCAllocator::~BFCAllocator() {
VLOG(2) << "Number of regions allocated: "
<< region_manager_.regions().size();
for (const auto& region : region_manager_.regions()) {
suballocator_->Free(region.ptr(), region.memory_size());
sub_allocator_->Free(region.ptr(), region.memory_size());
}

for (BinNum b = 0; b < kNumBins; b++) {
@@ -108,7 +108,7 @@ bool BFCAllocator::Extend(size_t alignment, size_t rounded_bytes) {

// Try allocating.
size_t bytes = std::min(curr_region_allocation_bytes_, available_bytes);
void* mem_addr = suballocator_->Alloc(alignment, bytes);
void* mem_addr = sub_allocator_->Alloc(alignment, bytes);
if (mem_addr == nullptr && !started_backpedal_) {
// Only backpedal once.
started_backpedal_ = true;
@@ -119,7 +119,7 @@ bool BFCAllocator::Extend(size_t alignment, size_t rounded_bytes) {
while (mem_addr == nullptr) {
bytes = RoundedBytes(bytes * kBackpedalFactor);
if (bytes < rounded_bytes) break;
mem_addr = suballocator_->Alloc(alignment, bytes);
mem_addr = sub_allocator_->Alloc(alignment, bytes);
}
}

@@ -158,10 +158,6 @@ bool BFCAllocator::Extend(size_t alignment, size_t rounded_bytes) {
// Insert the chunk into the right bin.
InsertFreeChunkIntoBin(h);

// Invoke visitors on newly allocated region.
for (const auto& visitor : region_visitors_) {
visitor(mem_addr, bytes);
}
return true;
}

@@ -490,15 +486,6 @@ void BFCAllocator::FreeAndMaybeCoalesce(BFCAllocator::ChunkHandle h) {
InsertFreeChunkIntoBin(coalesced_chunk);
}

void BFCAllocator::AddAllocVisitor(Visitor visitor) {
VLOG(1) << "AddVisitor";
mutex_lock l(lock_);
region_visitors_.push_back(visitor);
for (const auto& region : region_manager_.regions()) {
visitor(region.ptr(), region.memory_size());
}
}

bool BFCAllocator::TracksAllocationSizes() { return true; }

size_t BFCAllocator::RequestedSize(const void* ptr) {
14 changes: 3 additions & 11 deletions tensorflow/core/common_runtime/bfc_allocator.h
@@ -23,7 +23,7 @@ limitations under the License.
#include <vector>

#include "tensorflow/core/common_runtime/allocator_retry.h"
#include "tensorflow/core/common_runtime/visitable_allocator.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/lib/gtl/stl_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/macros.h"
@@ -42,7 +42,7 @@ namespace tensorflow {
// coalescing. One assumption we make is that the process using this
// allocator owns pretty much all of the memory, and that nearly
// all requests to allocate memory go through this interface.
class BFCAllocator : public VisitableAllocator {
class BFCAllocator : public Allocator {
public:
// Takes ownership of sub_allocator.
BFCAllocator(SubAllocator* sub_allocator, size_t total_memory,
@@ -55,11 +55,6 @@ class BFCAllocator : public VisitableAllocator {
const AllocationAttributes& allocation_attr) override;
void DeallocateRaw(void* ptr) override;

void AddAllocVisitor(Visitor visitor) override;

// Does nothing, because memory is never freed.
void AddFreeVisitor(Visitor visitor) override {}

bool TracksAllocationSizes() override;

size_t RequestedSize(const void* ptr) override;
@@ -423,7 +418,7 @@ class BFCAllocator : public VisitableAllocator {
// of the available memory.
bool started_backpedal_ = false;

std::unique_ptr<SubAllocator> suballocator_;
std::unique_ptr<SubAllocator> sub_allocator_;
string name_;

// Structures mutable after construction
@@ -435,9 +430,6 @@ class BFCAllocator : public VisitableAllocator {
// Pointer to head of linked list of free Chunks
ChunkHandle free_chunks_list_ GUARDED_BY(lock_);

// Called once on each region, ASAP.
std::vector<Visitor> region_visitors_ GUARDED_BY(lock_);

// Counter containing the next unique identifier to assign to a
// newly-created chunk.
int64 next_allocation_id_ GUARDED_BY(lock_);
