Skip to content

Commit

Permalink
IMPALA-4231: fix codegen time regression
Browse files Browse the repository at this point in the history
The commit "IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder"
slightly increased codegen time, which caused TPC-H Q2 to sometimes
regress significantly because of races in runtime filter arrival.

This patch attempts to fix the regression by improving codegen time in a
few places.

* Revert to using the old bool/Status return pattern. The regular Status
  return pattern results in significantly more complex IR because the
  compiler has to emit code to copy and free statuses. I spent some time
  trying to convince it to optimise the extra code out, but didn't have
  much success.
* Remove from cross-compilation some code that cannot be specialized.
* Add noexcept to some functions that are used from the IR to ensure
  exception-handling IR is not emitted. This is less important after the
  first change but still should help produce cleaner IR.

Performance:
I was able to reproduce a regression locally, which is fixed by this
patch. I'm in the process of trying to verify the fix on a cluster.

Change-Id: Idf0fdedabd488550b6db90167a30c582949d608d
Reviewed-on: http://gerrit.cloudera.org:8080/4623
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Internal Jenkins
  • Loading branch information
Tim Armstrong authored and Internal Jenkins committed Oct 14, 2016
1 parent 89b41c6 commit c7fe438
Show file tree
Hide file tree
Showing 19 changed files with 382 additions and 368 deletions.
30 changes: 17 additions & 13 deletions be/src/common/status.h
Expand Up @@ -99,6 +99,10 @@ class Status {
if (UNLIKELY(status.msg_ != NULL)) CopyMessageFrom(status);
}

/// Move constructor that moves the error message (if any) and resets 'other' to the
/// default OK Status. Declared noexcept: the body only copies and clears a pointer, so
/// it cannot throw, and callers do not need exception-handling code around the move.
ALWAYS_INLINE Status(Status&& other) noexcept : msg_(other.msg_) { other.msg_ = NULL; }

/// Status using only the error code as a parameter. This can be used for error messages
/// that don't take format parameters.
Status(TErrorCode::type code);
Expand Down Expand Up @@ -153,6 +157,15 @@ class Status {
return *this;
}

/// Move assignment that moves the error message (if any) and resets 'other' to the
/// default OK Status. If this Status already holds a message, FreeMessage() is called
/// on it before the message of 'other' is taken over.
/// Declared noexcept so that callers do not need exception-handling code around the
/// assignment; the only call made here is FreeMessage().
/// Note: self-move-assignment leaves this Status with a NULL message, i.e. OK.
ALWAYS_INLINE Status& operator=(Status&& other) noexcept {
  if (UNLIKELY(msg_ != NULL)) FreeMessage();
  msg_ = other.msg_;
  other.msg_ = NULL;
  return *this;
}

ALWAYS_INLINE ~Status() {
// The UNLIKELY and inlining here are important hints for the compiler to
// streamline the common case of Status::OK(). Use FreeMessage() which is
Expand Down Expand Up @@ -244,21 +257,12 @@ class Status {
};

/// some generally useful macros
#define RETURN_IF_ERROR(stmt) \
do { \
Status __status__ = (stmt); \
if (UNLIKELY(!__status__.ok())) return __status__; \
#define RETURN_IF_ERROR(stmt) \
do { \
Status __status__ = (stmt); \
if (UNLIKELY(!__status__.ok())) return std::move(__status__); \
} while (false)

#define RETURN_IF_ERROR_PREPEND(expr, prepend) \
do { \
Status __status__ = (stmt); \
if (UNLIKELY(!__status__.ok())) { \
return Status(strings::Substitute("$0: $1", prepend, __status__.GetDetail())); \
} \
} while (false)


#define ABORT_IF_ERROR(stmt) \
do { \
Status __status__ = (stmt); \
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/hash-table.cc
Expand Up @@ -149,7 +149,7 @@ uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
}

uint32_t HashTableCtx::HashRow(
const uint8_t* expr_values, const uint8_t* expr_values_null) const {
const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept {
DCHECK_LT(level_, seeds_.size());
if (expr_values_cache_.var_result_offset() == -1) {
/// This handles NULLs implicitly since a constant seed value was put
Expand All @@ -162,7 +162,7 @@ uint32_t HashTableCtx::HashRow(
}

bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs,
uint8_t* expr_values, uint8_t* expr_values_null) {
uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
bool has_null = false;
for (int i = 0; i < ctxs.size(); ++i) {
void* loc = expr_values_cache_.ExprValuePtr(expr_values, i);
Expand Down Expand Up @@ -213,7 +213,7 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values,

template <bool FORCE_NULL_EQUALITY>
bool HashTableCtx::Equals(const TupleRow* build_row, const uint8_t* expr_values,
const uint8_t* expr_values_null) const {
const uint8_t* expr_values_null) const noexcept {
for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
void* val = build_expr_ctxs_[i]->GetValue(build_row);
if (val == NULL) {
Expand Down Expand Up @@ -331,7 +331,7 @@ void HashTableCtx::ExprValuesCache::ResetIterators() {
cur_expr_values_hash_ = expr_values_hash_array_.get();
}

void HashTableCtx::ExprValuesCache::Reset() {
void HashTableCtx::ExprValuesCache::Reset() noexcept {
ResetIterators();
// Set the end pointer after resetting the other pointers so they point to
// the same location.
Expand Down
16 changes: 8 additions & 8 deletions be/src/exec/hash-table.h
Expand Up @@ -256,7 +256,7 @@ class HashTableCtx {
void Close(MemTracker* tracker);

/// Resets the cache states (iterators, end pointers etc) before writing.
void Reset();
void Reset() noexcept;

/// Resets the iterators to the start before reading. Will record the current position
/// of the iterators in end pointer before resetting so AtEnd() can determine if all
Expand Down Expand Up @@ -406,7 +406,7 @@ class HashTableCtx {
/// This will be replaced by codegen. We don't want this inlined, so that the function
/// name does not change and the function can be replaced with the codegen'd version.
uint32_t IR_NO_INLINE HashRow(
const uint8_t* expr_values, const uint8_t* expr_values_null) const;
const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept;

/// Wrapper function for calling correct HashUtil function in non-codegen'd case.
uint32_t Hash(const void* input, int len, uint32_t hash) const;
Expand All @@ -416,15 +416,15 @@ class HashTableCtx {
/// inlined when cross compiled because we need to be able to differentiate between
/// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked into the
/// codegen'd function.
bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values,
uint8_t* expr_values_null) {
bool IR_NO_INLINE EvalBuildRow(
const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null);
}

/// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness
/// into 'expr_values_null'. This will be replaced by codegen.
bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values,
uint8_t* expr_values_null) {
bool IR_NO_INLINE EvalProbeRow(
const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null);
}

Expand All @@ -437,15 +437,15 @@ class HashTableCtx {
/// 'expr_values_null'. Returns whether any expr evaluated to NULL. This will be
/// replaced by codegen.
bool EvalRow(const TupleRow* row, const std::vector<ExprContext*>& ctxs,
uint8_t* expr_values, uint8_t* expr_values_null);
uint8_t* expr_values, uint8_t* expr_values_null) noexcept;

/// Returns true if the values of build_exprs evaluated over 'build_row' equal the
/// values in 'expr_values' with nullness 'expr_values_null'. FORCE_NULL_EQUALITY is
/// true if all nulls should be treated as equal, regardless of the values of
/// 'finds_nulls_'. This will be replaced by codegen.
template <bool FORCE_NULL_EQUALITY>
bool IR_NO_INLINE Equals(const TupleRow* build_row, const uint8_t* expr_values,
const uint8_t* expr_values_null) const;
const uint8_t* expr_values_null) const noexcept;

/// Helper function that calls Equals() with the current row. Always inlined so that
/// it does not appear in cross-compiled IR.
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/partitioned-aggregation-node-ir.cc
Expand Up @@ -153,7 +153,7 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
insert_it.SetTuple(intermediate_tuple, hash);
return Status::OK();
} else if (!process_batch_status_.ok()) {
return process_batch_status_;
return std::move(process_batch_status_);
}

// We did not have enough memory to add intermediate_tuple to the stream.
Expand Down Expand Up @@ -198,13 +198,13 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
!TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx],
&process_batch_status_)) {
RETURN_IF_ERROR(process_batch_status_);
RETURN_IF_ERROR(std::move(process_batch_status_));
// Tuple is not going into hash table, add it to the output batch.
Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
out_batch->tuple_data_pool(), &process_batch_status_);
if (UNLIKELY(intermediate_tuple == NULL)) {
DCHECK(!process_batch_status_.ok());
return process_batch_status_;
return std::move(process_batch_status_);
}
UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row);
out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/partitioned-aggregation-node.cc
Expand Up @@ -965,7 +965,7 @@ Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
}

Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) {
const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) noexcept {
const int fixed_size = intermediate_tuple_desc_->byte_size();
const int varlen_size = GroupingExprsVarlenSize();
const int tuple_data_size = fixed_size + varlen_size;
Expand All @@ -985,8 +985,8 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
}

Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
const vector<FunctionContext*>& agg_fn_ctxs,
BufferedTupleStream* stream, Status* status) {
const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream,
Status* status) noexcept {
DCHECK(stream != NULL && status != NULL);
// Allocate space for the entire tuple in the stream.
const int fixed_size = intermediate_tuple_desc_->byte_size();
Expand Down Expand Up @@ -1090,8 +1090,8 @@ void PartitionedAggregationNode::InitAggSlots(
}
}

void PartitionedAggregationNode::UpdateTuple(FunctionContext** agg_fn_ctxs,
Tuple* tuple, TupleRow* row, bool is_merge) {
void PartitionedAggregationNode::UpdateTuple(
FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
DCHECK(tuple != NULL || aggregate_evaluators_.empty());
for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
if (is_merge) {
Expand Down
9 changes: 4 additions & 5 deletions be/src/exec/partitioned-aggregation-node.h
Expand Up @@ -444,14 +444,13 @@ class PartitionedAggregationNode : public ExecNode {
/// full, it will attempt to switch to IO-buffers.
Tuple* ConstructIntermediateTuple(
const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
BufferedTupleStream* stream, Status* status);
BufferedTupleStream* stream, Status* status) noexcept;

/// Constructs intermediate tuple, allocating memory from pool instead of the stream.
/// Returns NULL and sets status if there is not enough memory to allocate the tuple.
Tuple* ConstructIntermediateTuple(
const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
MemPool* pool, Status* status);

const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool,
Status* status) noexcept;

/// Returns the number of bytes of variable-length data for the grouping values stored
/// in 'ht_ctx_'.
Expand All @@ -477,7 +476,7 @@ class PartitionedAggregationNode : public ExecNode {
/// This function is replaced by codegen (which is why we don't use a vector argument
/// for agg_fn_ctxs). Any var-len data is allocated from the FunctionContexts.
void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
bool is_merge = false);
bool is_merge = false) noexcept;

/// Called on the intermediate tuple of each group after all input rows have been
/// consumed and aggregated. Computes the final aggregate values to be returned in
Expand Down
20 changes: 13 additions & 7 deletions be/src/exec/partitioned-hash-join-builder-ir.cc
Expand Up @@ -29,15 +29,16 @@

using namespace impala;

inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* row) {
Status status;
if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
RETURN_IF_ERROR(status);
return AppendRowStreamFull(stream, row);
inline bool PhjBuilder::AppendRow(
BufferedTupleStream* stream, TupleRow* row, Status* status) {
if (LIKELY(stream->AddRow(row, status))) return true;
if (UNLIKELY(!status->ok())) return false;
return AppendRowStreamFull(stream, row, status);
}

Status PhjBuilder::ProcessBuildBatch(
RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) {
Status status;
HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache();
expr_vals_cache->Reset();
FOREACH_ROW(build_batch, 0, build_batch_iter) {
Expand All @@ -47,7 +48,10 @@ Status PhjBuilder::ProcessBuildBatch(
// TODO: remove with codegen/template
// If we are NULL aware and this build row has NULL in the eq join slot,
// append it to the null_aware partition. We will need it later.
RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), build_row));
if (UNLIKELY(
!AppendRow(null_aware_partition_->build_rows(), build_row, &status))) {
return std::move(status);
}
}
continue;
}
Expand All @@ -66,7 +70,9 @@ Status PhjBuilder::ProcessBuildBatch(
const uint32_t hash = expr_vals_cache->CurExprValuesHash();
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
Partition* partition = hash_partitions_[partition_idx];
RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row));
if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) {
return std::move(status);
}
}
return Status::OK();
}
Expand Down
19 changes: 11 additions & 8 deletions be/src/exec/partitioned-hash-join-builder.cc
Expand Up @@ -260,23 +260,26 @@ Status PhjBuilder::CreateHashPartitions(int level) {
return Status::OK();
}

Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row) {
Status status;
bool PhjBuilder::AppendRowStreamFull(
BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
while (true) {
// Check if the stream is still using small buffers and try to switch to IO-buffers.
if (stream->using_small_buffers()) {
bool got_buffer;
RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
*status = stream->SwitchToIoBuffers(&got_buffer);
if (!status->ok()) return false;

if (got_buffer) {
if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
RETURN_IF_ERROR(status);
if (LIKELY(stream->AddRow(row, status))) return true;
if (!status->ok()) return false;
}
}
// We ran out of memory. Pick a partition to spill. If we ran out of unspilled
// partitions, SpillPartition() will return an error status.
RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
if (stream->AddRow(row, &status)) return Status::OK();
RETURN_IF_ERROR(status);
*status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
if (!status->ok()) return false;
if (stream->AddRow(row, status)) return true;
if (!status->ok()) return false;
// Spilling one partition does not guarantee we can append a row. Keep
// spilling until we can append this row.
}
Expand Down
14 changes: 9 additions & 5 deletions be/src/exec/partitioned-hash-join-builder.h
Expand Up @@ -261,14 +261,18 @@ class PhjBuilder : public DataSink {

/// Append 'row' to 'stream'. In the common case, appending the row to the stream
/// immediately succeeds. Otherwise this function falls back to the slower path of
/// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error
/// if it was unable to append the row, even after spilling partitions.
Status AppendRow(BufferedTupleStream* stream, TupleRow* row);
/// AppendRowStreamFull(), which may spill partitions to free memory. Returns false
/// and sets 'status' if it was unable to append the row, even after spilling
/// partitions. This odd return convention is used to avoid emitting unnecessary code
/// for ~Status in perf-critical code.
bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);

/// Slow path for AppendRow() above. It is called when the stream has failed to append
/// the row. We need to find more memory by either switching to IO-buffers, in case the
/// stream still uses small buffers, or spilling a partition.
Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row);
/// stream still uses small buffers, or spilling a partition. Returns false and sets
/// 'status' if it was unable to append the row, even after spilling partitions.
bool AppendRowStreamFull(
BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept;

/// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
/// to the Spill() call for the selected partition. The current policy is to spill the
Expand Down

0 comments on commit c7fe438

Please sign in to comment.