Permalink
Browse files

IMPALA-2015: Add support for nested loop join

Implement nested-loop join in Impala with support for multiple join
modes, including inner, outer, semi and anti joins. Null-aware left
anti-join is not currently supported.

Summary of changes:
Introduced the NestedLoopJoinNode class in the FE that represents the nested
loop join. Common functionality between NestedLoopJoinNode and HashJoinNode
(e.g. cardinality estimation) was moved to the JoinNode class.
In the BE, introduced the NestedLoopJoinNode class that implements the nested-loop
join execution strategy.

Change-Id: I238ec7dc0080f661847e5e1b84e30d61c3b0bb5c
Reviewed-on: http://gerrit.cloudera.org:8080/652
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Internal Jenkins
  • Loading branch information...
skye authored and Internal Jenkins committed Jun 24, 2015
1 parent 63bea74 commit 07589ccf7dd4cfc21ed58a2983688464eab75237
Showing with 1,738 additions and 1,084 deletions.
  1. +2 −2 be/src/exec/CMakeLists.txt
  2. +5 −0 be/src/exec/blocking-join-node.cc
  3. +0 −179 be/src/exec/cross-join-node.cc
  4. +0 −87 be/src/exec/cross-join-node.h
  5. +3 −3 be/src/exec/exec-node.cc
  6. +590 −0 be/src/exec/nested-loop-join-node.cc
  7. +138 −0 be/src/exec/nested-loop-join-node.h
  8. +2 −9 be/src/exec/partitioned-hash-join-node.cc
  9. +21 −13 common/thrift/PlanNodes.thrift
  10. +0 −264 fe/.settings/org.eclipse.jdt.core.prefs
  11. +0 −111 fe/.settings/org.eclipse.jdt.ui.prefs
  12. +0 −5 fe/.settings/org.eclipse.m2e.core.prefs
  13. +16 −15 fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
  14. +4 −4 fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
  15. +6 −252 fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
  16. +282 −16 fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
  17. +39 −33 fe/src/main/java/com/cloudera/impala/planner/{CrossJoinNode.java → NestedLoopJoinNode.java}
  18. +1 −0 fe/src/main/java/com/cloudera/impala/planner/Planner.java
  19. +41 −27 fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
  20. +2 −2 fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java
  21. +1 −1 fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
  22. +7 −0 fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java
  23. +1 −1 testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
  24. +48 −18 testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
  25. +6 −6 testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
  26. +146 −5 testdata/workloads/functional-planner/queries/PlannerTest/joins.test
  27. +151 −0 testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test
  28. +2 −2 testdata/workloads/functional-planner/queries/PlannerTest/order.test
  29. +2 −2 testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
  30. +1 −1 testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
  31. +14 −14 testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
  32. +8 −8 testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
  33. +4 −4 testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
  34. +67 −0 testdata/workloads/functional-query/queries/QueryTest/joins.test
  35. +121 −0 testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
  36. +7 −0 tests/query_test/test_join_queries.py
@@ -28,7 +28,6 @@ add_library(Exec
base-sequence-scanner.cc
blocking-join-node.cc
catalog-op-executor.cc
cross-join-node.cc
data-sink.cc
data-source-scan-node.cc
delimited-text-parser.cc
@@ -61,6 +60,7 @@ add_library(Exec
hbase-scan-node.cc
hbase-table-scanner.cc
incr-stats-util.cc
nested-loop-join-node.cc
partitioned-aggregation-node.cc
partitioned-aggregation-node-ir.cc
partitioned-hash-join-node.cc
@@ -88,4 +88,4 @@ ADD_BE_TEST(read-write-util-test)
ADD_BE_TEST(parquet-plain-test)
ADD_BE_TEST(parquet-version-test)
ADD_BE_TEST(row-batch-list-test)
ADD_BE_TEST(incr-stats-util-test)
ADD_BE_TEST(incr-stats-util-test)
@@ -38,12 +38,17 @@ BlockingJoinNode::BlockingJoinNode(const string& node_name, const TJoinOp::type
join_op_(join_op),
eos_(false),
probe_side_eos_(false),
probe_batch_pos_(-1),
current_probe_row_(NULL),
semi_join_staging_row_(NULL),
can_add_probe_filters_(false) {
}
Status BlockingJoinNode::Init(const TPlanNode& tnode) {
RETURN_IF_ERROR(ExecNode::Init(tnode));
DCHECK((join_op_ != TJoinOp::LEFT_SEMI_JOIN && join_op_ != TJoinOp::LEFT_ANTI_JOIN &&
join_op_ != TJoinOp::RIGHT_SEMI_JOIN && join_op_ != TJoinOp::RIGHT_ANTI_JOIN &&
join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || conjunct_ctxs_.size() == 0);
return Status::OK();
}
@@ -1,179 +0,0 @@
// Copyright 2013 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "exec/cross-join-node.h"
#include <sstream>
#include "codegen/llvm-codegen.h"
#include "exprs/expr.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/debug-util.h"
#include "util/runtime-profile.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
using namespace impala;
using namespace llvm;
CrossJoinNode::CrossJoinNode(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: BlockingJoinNode("CrossJoinNode", TJoinOp::CROSS_JOIN, pool, tnode, descs) {
}
Status CrossJoinNode::Prepare(RuntimeState* state) {
DCHECK(join_op_ == TJoinOp::CROSS_JOIN);
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
return Status::OK();
}
Status CrossJoinNode::Reset(RuntimeState* state) {
build_batch_pool_.Clear();
build_batches_.Reset();
return BlockingJoinNode::Reset(state);
}
void CrossJoinNode::Close(RuntimeState* state) {
if (is_closed()) return;
build_batch_pool_.Clear();
BlockingJoinNode::Close(state);
}
Status CrossJoinNode::ConstructBuildSide(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->Open(state));
while (true) {
RowBatch* batch = build_batch_pool_.Add(
new RowBatch(child(1)->row_desc(), state->batch_size(), mem_tracker()));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
bool eos;
RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
DCHECK_EQ(batch->num_io_buffers(), 0) << "Build batch should be compact.";
SCOPED_TIMER(build_timer_);
build_batches_.AddRowBatch(batch);
VLOG_ROW << BuildListDebugString();
COUNTER_SET(build_row_counter_,
static_cast<int64_t>(build_batches_.total_num_rows()));
if (eos) break;
}
return Status::OK();
}
Status CrossJoinNode::InitGetNext(TupleRow* first_left_row) {
current_build_row_ = build_batches_.Iterator();
return Status::OK();
}
Status CrossJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* eos) {
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
SCOPED_TIMER(runtime_profile_->total_time_counter());
if (ReachedLimit() || eos_) {
*eos = true;
probe_batch_->TransferResourceOwnership(output_batch);
build_batches_.TransferResourceOwnership(output_batch);
return Status::OK();
}
*eos = false;
ScopedTimer<MonotonicStopWatch> timer(probe_timer_);
while (!eos_) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
// Compute max rows that should be added to output_batch
int64_t max_added_rows = output_batch->capacity() - output_batch->num_rows();
if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
// Continue processing this row batch
num_rows_returned_ +=
ProcessLeftChildBatch(output_batch, probe_batch_.get(), max_added_rows);
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
if (ReachedLimit() || output_batch->AtCapacity()) {
*eos = ReachedLimit();
break;
}
// Check to see if we're done processing the current left child batch
if (current_build_row_.AtEnd() && probe_batch_pos_ == probe_batch_->num_rows()) {
probe_batch_->TransferResourceOwnership(output_batch);
probe_batch_pos_ = 0;
if (output_batch->AtCapacity()) break;
if (probe_side_eos_) {
*eos = eos_ = true;
break;
} else {
timer.Stop();
RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
timer.Start();
COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
}
}
}
if (*eos) {
probe_batch_->TransferResourceOwnership(output_batch);
build_batches_.TransferResourceOwnership(output_batch);
}
return Status::OK();
}
string CrossJoinNode::BuildListDebugString() {
stringstream out;
out << "BuildList(";
out << build_batches_.DebugString(child(1)->row_desc());
out << ")";
return out.str();
}
// TODO: this can be replaced with a codegen'd function
int CrossJoinNode::ProcessLeftChildBatch(RowBatch* output_batch, RowBatch* batch,
int max_added_rows) {
int row_idx = output_batch->AddRows(max_added_rows);
DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
uint8_t* output_row_mem = reinterpret_cast<uint8_t*>(output_batch->GetRow(row_idx));
TupleRow* output_row = reinterpret_cast<TupleRow*>(output_row_mem);
int rows_returned = 0;
ExprContext* const* ctxs = &conjunct_ctxs_[0];
while (true) {
while (!current_build_row_.AtEnd()) {
CreateOutputRow(output_row, current_probe_row_, current_build_row_.GetRow());
current_build_row_.Next();
if (!EvalConjuncts(ctxs, conjunct_ctxs_.size(), output_row)) continue;
++rows_returned;
// Filled up out batch or hit limit
if (UNLIKELY(rows_returned == max_added_rows)) goto end;
// Advance to next out row
output_row_mem += output_batch->row_byte_size();
output_row = reinterpret_cast<TupleRow*>(output_row_mem);
}
DCHECK(current_build_row_.AtEnd());
// Advance to the next row in the left child batch
if (UNLIKELY(probe_batch_pos_ == batch->num_rows())) goto end;
current_probe_row_ = batch->GetRow(probe_batch_pos_++);
current_build_row_ = build_batches_.Iterator();
}
end:
output_batch->CommitRows(rows_returned);
return rows_returned;
}
@@ -1,87 +0,0 @@
// Copyright 2013 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef IMPALA_EXEC_CROSS_JOIN_NODE_H
#define IMPALA_EXEC_CROSS_JOIN_NODE_H
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_set.hpp>
#include <boost/thread.hpp>
#include <string>
#include "exec/exec-node.h"
#include "exec/blocking-join-node.h"
#include "exec/row-batch-list.h"
#include "runtime/descriptors.h" // for TupleDescriptor
#include "runtime/mem-pool.h"
#include "util/promise.h"
#include "gen-cpp/PlanNodes_types.h"
namespace impala {
class RowBatch;
class TupleRow;
/// Node for cross joins.
/// Iterates over the left child rows and then the right child rows and, for
/// each combination, writes the output row if the conjuncts are satisfied. The
/// build batches are kept in a list that is fully constructed from the right child in
/// ConstructBuildSide() (called by BlockingJoinNode::Open()) while rows are fetched from
/// the left child as necessary in GetNext().
class CrossJoinNode : public BlockingJoinNode {
public:
CrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state);
virtual void Close(RuntimeState* state);
protected:
virtual Status InitGetNext(TupleRow* first_left_row);
virtual Status ConstructBuildSide(RuntimeState* state);
private:
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
/// Object pool for build RowBatches. Stores and owns all batches in build_batches_.
ObjectPool build_batch_pool_;
/// List of build batches. The batches are owned by build_batch_pool_.
RowBatchList build_batches_;
/// END: Members that must be Reset()
/////////////////////////////////////////
RowBatchList::TupleRowIterator current_build_row_;
/// Processes a batch from the left child.
/// output_batch: the batch for resulting tuple rows
/// batch: the batch from the left child to process. This function can be called to
/// continue processing a batch in the middle
/// max_added_rows: maximum rows that can be added to output_batch
/// return the number of rows added to output_batch
int ProcessLeftChildBatch(RowBatch* output_batch, RowBatch* batch, int max_added_rows);
/// Returns a debug string for build_rows_. This is used for debugging during the
/// build list construction and before doing the join.
std::string BuildListDebugString();
};
}
#endif
View
@@ -26,13 +26,13 @@
#include "exprs/expr.h"
#include "exec/aggregation-node.h"
#include "exec/analytic-eval-node.h"
#include "exec/cross-join-node.h"
#include "exec/data-source-scan-node.h"
#include "exec/empty-set-node.h"
#include "exec/exchange-node.h"
#include "exec/hash-join-node.h"
#include "exec/hbase-scan-node.h"
#include "exec/hdfs-scan-node.h"
#include "exec/nested-loop-join-node.h"
#include "exec/partitioned-aggregation-node.h"
#include "exec/partitioned-hash-join-node.h"
#include "exec/select-node.h"
@@ -299,8 +299,8 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
*node = pool->Add(new HashJoinNode(pool, tnode, descs));
}
break;
case TPlanNodeType::CROSS_JOIN_NODE:
*node = pool->Add(new CrossJoinNode(pool, tnode, descs));
case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
*node = pool->Add(new NestedLoopJoinNode(pool, tnode, descs));
break;
case TPlanNodeType::EMPTY_SET_NODE:
*node = pool->Add(new EmptySetNode(pool, tnode, descs));
Oops, something went wrong.

0 comments on commit 07589cc

Please sign in to comment.