Memory allocator bug fix #5035 (#5133)
* Always pass in the bucket with default_bucket_key as the shared_module when binding new buckets

* Imbalanced version of the shared pool during PlanMemory

* Auto-search and update the shared memory pool

* Clean up unused code

* Sort the new pool before memory matching; remove the shared pool in PlanMemory; add a test case for bucketing

* Remove itoa().

* Fix lint warnings

* Change total_exec_bytes to private.
eric-haibin-lin authored and piiswrong committed Feb 28, 2017
1 parent eeeab1c commit 884b50a
Showing 8 changed files with 94 additions and 13 deletions.
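
For orientation: the core of the fix is that BucketingModule.switch_bucket() now binds every new bucket against the bucket registered under default_bucket_key (typically the largest bucket) instead of whatever _curr_module happens to be, so smaller buckets always reuse the default bucket's memory pool. A minimal sketch of the usage pattern this affects, assuming the MXNet 0.9-era BucketingModule API; sym_gen stands for a bucket-key-to-symbol factory like the one in the test added below, and the bucket keys and batch size are illustrative:

import mxnet as mx

# sym_gen: a factory mapping a bucket key (sequence length) to
# (symbol, data_names, label_names); see the test added below for a full one.
model = mx.mod.BucketingModule(sym_gen=sym_gen, default_bucket_key=20,
                               context=mx.cpu(0))
model.bind([('data', (32, 20))], [('softmax_label', (32, 20))])
model.init_params()

# Before this commit, switch_bucket() bound the new bucket against
# _curr_module -- possibly a smaller, previously used bucket -- so its
# executors could not draw on the default (largest) bucket's memory pool
# and total allocation grew with every new bucket. After the fix, every
# new bucket shares memory with the default bucket.
model.switch_bucket(10, [('data', (32, 10))], [('softmax_label', (32, 10))])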
3 changes: 2 additions & 1 deletion example/rnn/lstm_bucketing.py
@@ -34,7 +34,8 @@
 def tokenize_text(fname, vocab=None, invalid_label=-1, start_label=0):
     lines = open(fname).readlines()
     lines = [filter(None, i.split(' ')) for i in lines]
-    sentences, vocab = mx.rnn.encode_sentences(lines, vocab=vocab, invalid_label=invalid_label, start_label=start_label)
+    sentences, vocab = mx.rnn.encode_sentences(lines, vocab=vocab, invalid_label=invalid_label,
+                                               start_label=start_label)
     return sentences, vocab

1 change: 1 addition & 0 deletions python/mxnet/module/base_module.py
@@ -153,6 +153,7 @@ def __init__(self, logger=logging):
         self.params_initialized = False
         self.optimizer_initialized = False
         self._symbol = None
+        self._total_exec_bytes = 0

     ################################################################################
     # High Level API
2 changes: 1 addition & 1 deletion python/mxnet/module/bucketing_module.py
@@ -316,7 +316,7 @@ def switch_bucket(self, bucket_key, data_shapes, label_shapes=None):
                                 state_names=self._state_names)
             module.bind(data_shapes, label_shapes, self._curr_module.for_training,
                         self._curr_module.inputs_need_grad,
-                        force_rebind=False, shared_module=self._curr_module)
+                        force_rebind=False, shared_module=self._buckets[self._default_bucket_key])
             self._buckets[bucket_key] = module

         self._curr_module = self._buckets[bucket_key]
5 changes: 4 additions & 1 deletion python/mxnet/module/executor_group.py
@@ -125,7 +125,8 @@ def __init__(self, symbol, contexts, workload, data_shapes, label_shapes, param_
         self.inputs_need_grad = inputs_need_grad

         self.logger = logger
-
+        # In the future we should have a better way to profile memory per device (haibin)
+        self._total_exec_bytes = 0
         self.fixed_param_names = fixed_param_names
         if self.fixed_param_names is None:
             self.fixed_param_names = []
@@ -615,6 +616,8 @@ def _get_or_reshape(name, shared_data_arrays, arg_shape, arg_type, context, logg
         executor = self.symbol.bind(ctx=context, args=arg_arrays,
                                     args_grad=grad_arrays, aux_states=aux_arrays,
                                     grad_req=self.grad_req, shared_exec=shared_exec)
+        # Get the total bytes allocated for this executor
+        self._total_exec_bytes += int(executor.debug_str().split('\n')[-3].split()[1])
         return executor

     def _sliced_shape(self, shapes, i, major_axis):
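
Note that the byte count recorded above is scraped from executor.debug_str(), so the accounting is coupled to the layout of that debug output. A minimal sketch of the parse under that assumption; the sample tail below is hypothetical and only mirrors the third-from-last-line, second-token indexing the committed code relies on:

def parse_exec_bytes(debug_str):
    # assumes the third-to-last line reads 'Total <n> bytes allocated'
    return int(debug_str.split('\n')[-3].split()[1])

sample = "...\nTotal 12492800 bytes allocated\nTotal 59 NDArrays allocated\n"
print(parse_exec_bytes(sample))  # -> 12492800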
1 change: 1 addition & 0 deletions python/mxnet/module/module.py
@@ -384,6 +384,7 @@ def bind(self, data_shapes, label_shapes=None, for_training=True,
                                                      fixed_param_names=self._fixed_param_names,
                                                      grad_req=grad_req,
                                                      state_names=self._state_names)
+        self._total_exec_bytes = self._exec_group._total_exec_bytes
         if shared_module is not None:
             self.params_initialized = True
             self._arg_params = shared_module._arg_params
37 changes: 28 additions & 9 deletions src/executor/graph_executor.cc
@@ -328,9 +328,9 @@ void GraphExecutor::Init(nnvm::Symbol symbol,
   g = AttachOpResources(g);
   graph_ = std::move(g);
   if (shared_exec != nullptr) {
-    this->InitDataEntryMemory(dynamic_cast<GraphExecutor*>(shared_exec)->data_pool_);
+    this->InitDataEntryMemory(&(dynamic_cast<GraphExecutor*>(shared_exec)->data_pool_));
   } else {
-    this->InitDataEntryMemory({});
+    this->InitDataEntryMemory(nullptr);
   }
   {
     // initialize output arrays
@@ -420,7 +420,7 @@ Graph GraphExecutor::InitGraph(nnvm::Symbol symbol,
 }

 // initialize the memory of each entries
-void GraphExecutor::InitDataEntryMemory(const std::vector<NDArray>& shared_pool) {
+void GraphExecutor::InitDataEntryMemory(std::vector<NDArray>* shared_pool) {
   using nnvm::DTypeVector;
   using nnvm::ShapeVector;
   using nnvm::StorageVector;
@@ -475,19 +475,33 @@ void GraphExecutor::InitDataEntryMemory(const std::vector<NDArray>& shared_pool)
   }
   // construct the re-use pool, if needed
   std::multimap<size_t, NDArray> free_pool;
-  for (const NDArray& nd : shared_pool) {
-    size_t bytes = nd.shape().Size() * mshadow::mshadow_sizeof(nd.dtype());
-    free_pool.insert(std::make_pair(bytes, nd));
+  if (shared_pool != nullptr) {
+    for (const NDArray& nd : *shared_pool) {
+      size_t bytes = nd.shape().Size() * mshadow::mshadow_sizeof(nd.dtype());
+      free_pool.insert(std::make_pair(bytes, nd));
+    }
   }
   // remake the data pool
   data_pool_.clear();
-  for (size_t i = 0; i < pool_info.size(); ++i) {
+  data_pool_.resize(pool_info.size());
+
+  // sort the pool info in descending order before allocating memory
+  std::vector<size_t> sorted_pool_index;
+  for (size_t i = 0; i < pool_info.size(); i++) {
+    sorted_pool_index.push_back(i);
+  }
+  auto pool_comparator = [&pool_info](int lhs, int rhs) {
+    return pool_info[lhs].second > pool_info[rhs].second;
+  };
+  std::sort(sorted_pool_index.begin(), sorted_pool_index.end(), pool_comparator);
+
+  for (size_t i : sorted_pool_index) {
     const Context& ctx = pool_info[i].first;
     size_t bytes = pool_info[i].second;
     bool allocated = false;
     for (auto it = free_pool.lower_bound(bytes); it != free_pool.end(); ++it) {
       if (it->second.ctx() == ctx && it->first >= bytes) {
-        data_pool_.push_back(it->second);
+        data_pool_[i] = it->second;
         free_pool.erase(it);
         allocated = true;
         break;
@@ -498,7 +512,12 @@ void GraphExecutor::InitDataEntryMemory(const std::vector<NDArray>& shared_pool)
       CHECK_LE(nword, std::numeric_limits<index_t>::max());
       // allocate float arrays
       TShape shape{index_t(nword)};
-      data_pool_.emplace_back(NDArray(shape, ctx));
+      NDArray nd(shape, ctx);
+      data_pool_[i] = nd;
+      // put the newly allocated arrays into the shared pool
+      if (shared_pool != nullptr) {
+        shared_pool->push_back(nd);
+      }
     }
   }
   CHECK_EQ(data_pool_.size(), pool_info.size());
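
Putting the graph_executor.cc changes together: pool entries are now served largest-first, each one takes the smallest free block on the same context that fits (free_pool.lower_bound), and blocks that had to be freshly allocated are pushed back into the caller's shared pool for later executors. Below is a single-context Python model of that strategy as I read the code; the function and variable names are illustrative, not MXNet API:

import bisect

def init_data_entry_memory(requests, shared_pool):
    # requests: byte sizes this executor needs; shared_pool: block sizes
    # shared from a previously bound executor (mutated in place).
    free = sorted(shared_pool)          # stand-in for multimap<size_t, NDArray>
    assigned = [None] * len(requests)
    # serve requests in descending size order so the largest entries
    # get first pick of the shared blocks
    for i in sorted(range(len(requests)), key=lambda k: -requests[k]):
        j = bisect.bisect_left(free, requests[i])   # smallest block that fits
        if j < len(free):
            assigned[i] = free.pop(j)               # reuse shared memory
        else:
            assigned[i] = requests[i]               # allocate a new block...
            shared_pool.append(requests[i])         # ...and share it onward
    return assigned

pool = [4096, 1024, 64]
print(init_data_entry_memory([2048, 512, 8192], pool))  # [4096, 1024, 8192]
print(pool)  # [4096, 1024, 64, 8192] -- the fresh 8192-byte block was shared

Sorting largest-first matters because lower_bound always hands out the smallest fitting block: processed in arbitrary order, a small request could claim the only large block and force a fresh allocation for a big request that the block would have satisfied.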
3 changes: 2 additions & 1 deletion src/executor/graph_executor.h
@@ -28,6 +28,7 @@ using nnvm::Graph;
 class GraphExecutor : public Executor {
  public:
   using Executor::MonitorCallback;
+
   virtual ~GraphExecutor();
   void Forward(bool is_train) override;
   void PartialForward(bool is_train, int step, int *step_left) override;
@@ -76,7 +77,7 @@ class GraphExecutor : public Executor {
   // initialize the resources in the graph
   // initialize the memory of data entries
   // shared_pool: extra memory shared from other parts
-  void InitDataEntryMemory(const std::vector<NDArray>& shared_pool);
+  void InitDataEntryMemory(std::vector<NDArray>* shared_pool);
   // run ops from topo order start to end
   void RunOps(bool is_train, size_t topo_start, size_t topo_end);
   // internal graph
55 changes: 55 additions & 0 deletions tests/python/unittest/test_module.py
@@ -108,8 +108,63 @@ def test_module_states():
     for x1, x2 in zip(out1, out2):
         assert not mx.test_utils.almost_equal(x1.asnumpy(), x2.asnumpy(), rtol=1e-3)

+def test_module_switch_bucket():
+    vocab_dim = 5000
+    num_hidden = 100
+    num_embedding = 100
+    num_layer = 2
+    default_key = 10
+    test_key = 5
+    batch_size = 32
+    contexts = [mx.cpu(0)]
+    initializer = mx.init.Xavier(factor_type="in", magnitude=2.34)
+
+    # generate symbols for an LSTM network
+    def sym_gen(seq_len):
+        data = mx.sym.Variable('data')
+        label = mx.sym.Variable('softmax_label')
+        embed = mx.sym.Embedding(data=data, input_dim=vocab_dim,
+                                 output_dim=num_embedding, name='embed')
+        stack = mx.rnn.SequentialRNNCell()
+        for i in range(num_layer):
+            stack.add(mx.rnn.LSTMCell(num_hidden=num_hidden, prefix='lstm_l%d_' % i))
+        outputs, states = stack.unroll(seq_len, inputs=embed, merge_outputs=True)
+
+        pred = mx.sym.Reshape(outputs, shape=(-1, num_hidden))
+        pred = mx.sym.FullyConnected(data=pred, num_hidden=vocab_dim, name='pred')
+
+        label = mx.sym.Reshape(label, shape=(-1,))
+        pred = mx.sym.SoftmaxOutput(data=pred, label=label, name='softmax')
+
+        return pred, ('data',), ('softmax_label',)
+
+    def create_bucketing_module(key):
+        model = mx.mod.BucketingModule(
+            sym_gen=sym_gen,
+            default_bucket_key=key,
+            context=contexts)
+        model.bind([('data', (batch_size, key))],
+                   [('softmax_label', (batch_size, key))], True, False)
+        model.init_params(initializer=initializer)
+        return model
+    # initialize the bucketing module with the default bucket key
+    bucketing_model = create_bucketing_module(default_key)
+    # switch to test_key
+    bucketing_model.switch_bucket(test_key, [('data', (batch_size, test_key))],
+                                  [('softmax_label', (batch_size, test_key))])
+    total_bytes_before = bucketing_model._buckets[default_key]._total_exec_bytes
+
+    # remove test_key and switch again
+    del bucketing_model._buckets[test_key]
+    bucketing_model.switch_bucket(test_key, [('data', (batch_size, test_key))],
+                                  [('softmax_label', (batch_size, test_key))])
+    total_bytes_after = bucketing_model._buckets[default_key]._total_exec_bytes
+    # the default bucket is expected to reuse the bytes allocated
+    assert total_bytes_after == total_bytes_before
+
 if __name__ == '__main__':
     test_module_states()
     test_module_reshape()
     test_save_load()
     test_module_layout()
+    test_module_switch_bucket()