Skip to content

Commit

Permalink
IMPALA-4410: Safer tear-down of RuntimeState
Browse files Browse the repository at this point in the history
* Add RuntimeState::Close() which is guaranteed to release resources
  safely, rather than having PFE::Close() do the same piecemeal.
* Fix for crash where PFE::Prepare() fails before descriptor table is
  created.
* Remove some dead code from TestEnv, and rename some methods for clarity.

Testing: Found by debug-actions, which has a reproducible test where
PFE::Prepare() fails. Manually tested on master.

Change-Id: Ie416e4d57240142bf685385299b749c3a6792c45
Reviewed-on: http://gerrit.cloudera.org:8080/4893
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <henry@cloudera.com>
  • Loading branch information
Henry Robinson committed Nov 23, 2016
1 parent bbf5255 commit 707f71b
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 72 deletions.
9 changes: 5 additions & 4 deletions be/src/exec/hash-table-test.cc
Expand Up @@ -58,6 +58,7 @@ class HashTableTest : public testing::Test {
MemPool mem_pool_;
vector<ExprContext*> build_expr_ctxs_;
vector<ExprContext*> probe_expr_ctxs_;
int next_query_id_ = 0;

virtual void SetUp() {
test_env_.reset(new TestEnv());
Expand Down Expand Up @@ -184,8 +185,8 @@ class HashTableTest : public testing::Test {
bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
int max_num_blocks = 100, int reserved_blocks = 10) {
EXPECT_OK(
test_env_->CreateQueryState(0, max_num_blocks, block_size, &runtime_state_));
EXPECT_OK(test_env_->CreatePerQueryState(
next_query_id_++, max_num_blocks, block_size, &runtime_state_));
MemTracker* client_tracker = pool_.Add(
new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
BufferedBlockMgr::Client* client;
Expand Down Expand Up @@ -602,8 +603,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {

// Test that hashing empty string updates hash value.
TEST_F(HashTableTest, HashEmpty) {
EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024,
&runtime_state_).ok());
EXPECT_TRUE(
test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, &runtime_state_).ok());
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
probe_expr_ctxs_, false /* !stores_nulls_ */,
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/buffered-block-mgr-test.cc
Expand Up @@ -142,7 +142,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size,
RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
RuntimeState* state;
EXPECT_OK(test_env_->CreateQueryState(
EXPECT_OK(test_env_->CreatePerQueryState(
query_id, max_buffers, block_size, &state, query_options));
if (query_state != NULL) *query_state = state;
return state->block_mgr();
Expand Down Expand Up @@ -185,7 +185,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
void TearDownMgrs() {
// Tear down the query states, which DCHECKs that the memory consumption of
// the query's trackers is zero.
test_env_->TearDownQueryStates();
test_env_->TearDownRuntimeStates();
}

void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
Expand Down Expand Up @@ -924,7 +924,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
// scenario by holding onto a reference to the block mgr. This should be safe so
// long as blocks are properly deleted before the runtime state is torn down.
DeleteBlocks(blocks);
test_env_->TearDownQueryStates();
test_env_->TearDownRuntimeStates();

// Optionally wait for writes to complete after cancellation.
if (wait_for_writes) WaitForWrites(block_mgr.get());
Expand Down
12 changes: 9 additions & 3 deletions be/src/runtime/buffered-tuple-stream-test.cc
Expand Up @@ -101,7 +101,7 @@ class SimpleTupleStreamTest : public testing::Test {
/// Setup a block manager with the provided settings and client with no reservation,
/// tracked by tracker_.
void InitBlockMgr(int64_t limit, int block_size) {
ASSERT_OK(test_env_->CreateQueryState(0, limit, block_size, &runtime_state_));
ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, &runtime_state_));
MemTracker* client_tracker = pool_.Add(
new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
Expand Down Expand Up @@ -723,14 +723,20 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
}

/// Test attaching memory to a row batch from a pinned stream.
TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStream) {
TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) {
TestTransferMemory(true, true);
}

TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) {
TestTransferMemory(true, false);
}

/// Test attaching memory to a row batch from an unpinned stream.
TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStream) {
TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) {
TestTransferMemory(false, true);
}

TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
TestTransferMemory(false, false);
}

Expand Down
5 changes: 1 addition & 4 deletions be/src/runtime/plan-fragment-executor.cc
Expand Up @@ -534,10 +534,7 @@ void PlanFragmentExecutor::Close() {
// Prepare should always have been called, and so runtime_state_ should be set
DCHECK(prepared_promise_.IsSet());
if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
runtime_state_->UnregisterReaderContexts();
exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
runtime_state_->filter_bank()->Close();
runtime_state_->ReleaseResources();

if (mem_usage_sampled_counter_ != NULL) {
PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/runtime-state.cc
Expand Up @@ -306,4 +306,12 @@ void RuntimeState::UnregisterReaderContexts() {
reader_contexts_.clear();
}

void RuntimeState::ReleaseResources() {
UnregisterReaderContexts();
if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this);
if (filter_bank_ != nullptr) filter_bank_->Close();
if (resource_pool_ != nullptr) {
ExecEnv::GetInstance()->thread_mgr()->UnregisterPool(resource_pool_);
}
}
}
13 changes: 9 additions & 4 deletions be/src/runtime/runtime-state.h
Expand Up @@ -62,8 +62,10 @@ typedef std::map<std::string, TInsertStats> PartitionInsertStats;
/// deleted.
typedef std::map<std::string, std::string> FileMoveMap;

/// A collection of items that are part of the global state of a
/// query and shared across all execution nodes of that query.
/// A collection of items that are part of the global state of a query and shared across
/// all execution nodes of that query. After initialisation, callers must call
/// ReleaseResources() to ensure that all resources are correctly freed before
/// destruction.
class RuntimeState {
public:
RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env);
Expand Down Expand Up @@ -311,6 +313,9 @@ class RuntimeState {
/// TODO: Fix IMPALA-4233
Status CodegenScalarFns();

/// Release resources and prepare this object for destruction.
void ReleaseResources();

private:
/// Allow TestEnv to set block_mgr manually for testing.
friend class TestEnv;
Expand All @@ -325,7 +330,7 @@ class RuntimeState {

static const int DEFAULT_BATCH_SIZE = 1024;

DescriptorTbl* desc_tbl_;
DescriptorTbl* desc_tbl_ = nullptr;
boost::scoped_ptr<ObjectPool> obj_pool_;

/// Lock protecting error_log_
Expand All @@ -351,7 +356,7 @@ class RuntimeState {

/// Thread resource management object for this fragment's execution. The runtime
/// state is responsible for returning this pool to the thread mgr.
ThreadResourceMgr::ResourcePool* resource_pool_;
ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;

/// Temporary Hdfs files created, and where they should be moved to ultimately.
/// Mapping a filename to a blank destination causes it to be deleted.
Expand Down
49 changes: 18 additions & 31 deletions be/src/runtime/test-env.cc
Expand Up @@ -19,6 +19,8 @@
#include "util/disk-info.h"
#include "util/impalad-metrics.h"

#include "gutil/strings/substitute.h"

#include <memory>

#include "common/names.h"
Expand Down Expand Up @@ -47,8 +49,7 @@ void TestEnv::InitMetrics() {
metrics_.reset(new MetricGroup("test-env-metrics"));
}

void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs,
bool one_dir_per_device) {
void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) {
// Need to recreate metrics to avoid error when registering metric twice.
InitMetrics();
tmp_file_mgr_.reset(new TmpFileMgr);
Expand All @@ -57,29 +58,26 @@ void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs,

TestEnv::~TestEnv() {
// Queries must be torn down first since they are dependent on global state.
TearDownQueryStates();
TearDownRuntimeStates();
exec_env_.reset();
io_mgr_tracker_.reset();
tmp_file_mgr_.reset();
metrics_.reset();
}

RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id,
TQueryOptions* query_options) {
Status TestEnv::CreatePerQueryState(int64_t query_id, int max_buffers, int block_size,
RuntimeState** runtime_state, TQueryOptions* query_options) {
// Enforce invariant that each query ID can be registered at most once.
if (runtime_states_.find(query_id) != runtime_states_.end()) {
return Status(Substitute("Duplicate query id found: $0", query_id));
}

TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
if (query_options != NULL) plan_params.query_ctx.request.query_options = *query_options;
plan_params.query_ctx.query_id.hi = 0;
plan_params.query_ctx.query_id.lo = query_id;
return new RuntimeState(plan_params, exec_env_.get());
}

Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
RuntimeState** runtime_state, TQueryOptions* query_options) {
*runtime_state = CreateRuntimeState(query_id, query_options);
if (*runtime_state == NULL) {
return Status("Unexpected error creating RuntimeState");
}

*runtime_state = new RuntimeState(plan_params, exec_env_.get());
(*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);

shared_ptr<BufferedBlockMgr> mgr;
Expand All @@ -88,24 +86,13 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
(*runtime_state)->set_block_mgr(mgr);

query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
return Status::OK();
}

Status TestEnv::CreateQueryStates(int64_t start_query_id, int num_mgrs,
int buffers_per_mgr, int block_size,
vector<RuntimeState*>* runtime_states) {
for (int i = 0; i < num_mgrs; ++i) {
RuntimeState* runtime_state;
RETURN_IF_ERROR(CreateQueryState(start_query_id + i, buffers_per_mgr, block_size,
&runtime_state));
runtime_states->push_back(runtime_state);
}
runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state);
return Status::OK();
}

void TestEnv::TearDownQueryStates() {
query_states_.clear();
void TestEnv::TearDownRuntimeStates() {
for (auto& runtime_state : runtime_states_) runtime_state.second->ReleaseResources();
runtime_states_.clear();
}


Expand All @@ -117,8 +104,8 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {

int64_t TestEnv::TotalQueryMemoryConsumption() {
int64_t total = 0;
for (shared_ptr<RuntimeState>& query_state : query_states_) {
total += query_state->query_mem_tracker()->consumption();
for (const auto& runtime_state : runtime_states_) {
total += runtime_state.second->query_mem_tracker()->consumption();
}
return total;
}
Expand Down
22 changes: 7 additions & 15 deletions be/src/runtime/test-env.h
Expand Up @@ -38,17 +38,13 @@ class TestEnv {
void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);

/// Create a RuntimeState for a query with a new block manager and the given query
/// options. The RuntimeState is owned by the TestEnv.
Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
/// options. The RuntimeState is owned by the TestEnv. Returns an error if
/// CreatePerQueryState() has been called with the same query ID already.
Status CreatePerQueryState(int64_t query_id, int max_buffers, int block_size,
RuntimeState** runtime_state, TQueryOptions* query_options = NULL);

/// Create multiple separate RuntimeStates with associated block managers, e.g. as if
/// multiple queries were executing. The RuntimeStates are owned by TestEnv.
Status CreateQueryStates(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
int block_size, std::vector<RuntimeState*>* runtime_states);

/// Destroy all RuntimeStates and block managers created by this TestEnv.
void TearDownQueryStates();
void TearDownRuntimeStates();

/// Calculate memory limit accounting for overflow and negative values.
/// If max_buffers is -1, no memory limit will apply.
Expand All @@ -63,23 +59,19 @@ class TestEnv {
TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }

private:

/// Recreate global metric groups.
void InitMetrics();

/// Create a new RuntimeState sharing global environment with given query options
RuntimeState* CreateRuntimeState(int64_t query_id,
TQueryOptions* query_options = NULL);

/// Global state for test environment.
static boost::scoped_ptr<MetricGroup> static_metrics_;
boost::scoped_ptr<ExecEnv> exec_env_;
boost::scoped_ptr<MemTracker> io_mgr_tracker_;
boost::scoped_ptr<MetricGroup> metrics_;
boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;

/// Per-query states with associated block managers.
vector<std::shared_ptr<RuntimeState>> query_states_;
/// Per-query states with associated block managers. Key is the integer query ID passed
/// to CreatePerQueryState().
std::unordered_map<int64_t, std::shared_ptr<RuntimeState>> runtime_states_;
};

}
Expand Down
19 changes: 11 additions & 8 deletions be/src/service/fe-support.cc
Expand Up @@ -26,25 +26,26 @@
#include "common/logging.h"
#include "common/status.h"
#include "exec/catalog-op-executor.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "exprs/expr.h"
#include "gen-cpp/Data_types.h"
#include "gen-cpp/Frontend_types.h"
#include "rpc/jni-thrift-util.h"
#include "rpc/thrift-server.h"
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/client-cache.h"
#include "runtime/runtime-state.h"
#include "service/impala-server.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/dynamic-util.h"
#include "util/jni-util.h"
#include "util/mem-info.h"
#include "util/scope-exit-trigger.h"
#include "util/symbols-util.h"
#include "rpc/jni-thrift-util.h"
#include "rpc/thrift-server.h"
#include "util/debug-util.h"
#include "gen-cpp/Data_types.h"
#include "gen-cpp/Frontend_types.h"

#include "common/names.h"

Expand Down Expand Up @@ -94,6 +95,8 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
// Java exception.
query_ctx.request.query_options.max_errors = 1;
RuntimeState state(query_ctx);
// Make sure to close the runtime state no matter how this scope is exited.
ScopeExitTrigger close_runtime_state([&state]() { state.ReleaseResources(); });

THROW_IF_ERROR_RET(jni_frame.push(env), env, JniUtil::internal_exc_class(),
result_bytes);
Expand Down
40 changes: 40 additions & 0 deletions be/src/util/scope-exit-trigger.h
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H
#define IMPALA_UTIL_SCOPE_EXIT_TRIGGER_H

#include <functional>

namespace impala {

/// Utility class that calls a client-supplied function when it is destroyed.
///
/// Use judiciously - scope exits can be hard to reason about, and this class should not
/// act as proxy for work-performing d'tors, which we try to avoid.
class ScopeExitTrigger {
public:
ScopeExitTrigger(const auto& trigger) : trigger_(trigger) {}

~ScopeExitTrigger() { trigger_(); }

private:
std::function<void()> trigger_;
};
}

#endif

0 comments on commit 707f71b

Please sign in to comment.