Skip to content

Commit

Permalink
Test NestedLoopJoin and MergeJoin in join fuzzer (#9901)
Browse files Browse the repository at this point in the history
Summary:

A correctness bug was found in NestedLoopJoin recently (#9892), 
so this diff adds NestedLoopJoin query plans with and without
TableScan to JoinFuzzer. It also adds MergeJoin with TableScan.

Differential Revision: D57703982
  • Loading branch information
kagamiori authored and facebook-github-bot committed May 24, 2024
1 parent dcffce5 commit f0af018
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 14 deletions.
161 changes: 147 additions & 14 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,21 @@ core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) {
joinNode.outputType());
}

// Builds a new NestedLoopJoinNode from 'joinNode' with its two sources passed
// in swapped order and the join type replaced by the result of
// tryFlipJoinType(). The node id, join condition and output type are carried
// over unchanged. Returns nullptr when tryFlipJoinType() yields no value for
// the node's join type.
core::PlanNodePtr tryFlipJoinSides(const core::NestedLoopJoinNode& joinNode) {
  const auto flipped = tryFlipJoinType(joinNode.joinType());
  if (flipped.has_value()) {
    const auto& inputs = joinNode.sources();
    // Note the order: the second source is passed ahead of the first one.
    return std::make_shared<core::NestedLoopJoinNode>(
        joinNode.id(),
        flipped.value(),
        joinNode.joinCondition(),
        inputs[1],
        inputs[0],
        joinNode.outputType());
  }
  return nullptr;
}

bool containsTypeKind(const TypePtr& type, const TypeKind& search) {
if (type->kind() == search) {
return true;
Expand Down Expand Up @@ -759,6 +774,46 @@ std::vector<core::PlanNodePtr> makeSources(
return sourceNodes;
}

// Returns an equality join filter between probeKeys and buildKeys: one
// "<probeKey> = <buildKey>" term per key pair, with consecutive terms
// separated by " AND ". Returns an empty string when there are no keys. The
// two key lists must have the same size (enforced via VELOX_CHECK_EQ).
std::string makeJoinFilter(
    const std::vector<std::string>& probeKeys,
    const std::vector<std::string>& buildKeys) {
  const auto numKeys = probeKeys.size();
  VELOX_CHECK_EQ(numKeys, buildKeys.size());

  std::string filter;
  // The index is size_t to match the type of numKeys; an 'auto i = 0' index
  // would be an int and make the loop condition a signed/unsigned comparison.
  for (size_t i = 0; i < numKeys; ++i) {
    if (i > 0) {
      filter += " AND ";
    }
    filter += fmt::format("{} = {}", probeKeys[i], buildKeys[i]);
  }
  return filter;
}

// Casts 'plan' to a TNode join node and asks tryFlipJoinSides() for a version
// of it with the join sides flipped. When a flipped plan is produced, a
// PlanWithSplits built from it and from the given scan ids, splits, execution
// strategy and number of groups is appended to 'plans'. When the flip returns
// null, 'plans' is left untouched. Fails VELOX_CHECK_NOT_NULL if 'plan' is
// not a TNode.
template <typename TNode>
void addFlippedJoinPlan(
    const core::PlanNodePtr& plan,
    std::vector<JoinFuzzer::PlanWithSplits>& plans,
    const core::PlanNodeId& probeScanId = "",
    const core::PlanNodeId& buildScanId = "",
    const std::unordered_map<core::PlanNodeId, std::vector<velox::exec::Split>>&
        splits = {},
    core::ExecutionStrategy executionStrategy =
        core::ExecutionStrategy::kUngrouped,
    int32_t numGroups = 0) {
  auto typedJoin = std::dynamic_pointer_cast<const TNode>(plan);
  VELOX_CHECK_NOT_NULL(typedJoin);

  auto flipped = tryFlipJoinSides(*typedJoin);
  if (!flipped) {
    // No flipped plan was produced, so there is nothing to add.
    return;
  }

  plans.push_back(JoinFuzzer::PlanWithSplits{
      flipped, probeScanId, buildScanId, splits, executionStrategy, numGroups});
}

void makeAlternativePlans(
const core::PlanNodePtr& plan,
const std::vector<RowVectorPtr>& probeInput,
Expand All @@ -768,9 +823,7 @@ void makeAlternativePlans(
VELOX_CHECK_NOT_NULL(joinNode);

// Flip join sides.
if (auto flippedPlan = tryFlipJoinSides(*joinNode)) {
plans.push_back(JoinFuzzer::PlanWithSplits{flippedPlan});
}
addFlippedJoinPlan<core::HashJoinNode>(plan, plans);

// Parallelize probe and build sides.
const auto probeKeys = fieldNames(joinNode->leftKeys());
Expand Down Expand Up @@ -813,6 +866,25 @@ void makeAlternativePlans(
joinNode->joinType())
.planNode()}});
}

// Use NestedLoopJoin.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
auto filter = makeJoinFilter(probeKeys, buildKeys);
planNodeIdGenerator->reset();
JoinFuzzer::PlanWithSplits planWithSplits{
PlanBuilder(planNodeIdGenerator)
.values(probeInput)
.nestedLoopJoin(
PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(),
filter,
asRowType(joinNode->outputType())->names(),
joinNode->joinType())
.planNode()};
plans.push_back(planWithSplits);

addFlippedJoinPlan<core::NestedLoopJoinNode>(planWithSplits.plan, plans);
}
}

std::vector<std::string> makeNames(const std::string& prefix, size_t n) {
Expand Down Expand Up @@ -1069,13 +1141,16 @@ void JoinFuzzer::addPlansWithTableScan(
buildScanSplits.push_back(makeSplit(filePath));
}

auto probeType = asRowType(probeInput[0]->type());
auto buildType = asRowType(buildInput[0]->type());

std::vector<PlanWithSplits> plansWithTableScan;
auto defaultPlan = makeDefaultPlanWithTableScan(
tableDir,
joinType,
nullAware,
asRowType(probeInput[0]->type()),
asRowType(buildInput[0]->type()),
probeType,
buildType,
probeKeys,
buildKeys,
probeScanSplits,
Expand All @@ -1088,15 +1163,12 @@ void JoinFuzzer::addPlansWithTableScan(
VELOX_CHECK_NOT_NULL(joinNode);

// Flip join sides.
if (auto flippedPlan = tryFlipJoinSides(*joinNode)) {
plansWithTableScan.push_back(PlanWithSplits{
flippedPlan,
defaultPlan.probeScanId,
defaultPlan.buildScanId,
defaultPlan.splits,
core::ExecutionStrategy::kUngrouped,
0});
}
addFlippedJoinPlan<core::HashJoinNode>(
defaultPlan.plan,
plansWithTableScan,
defaultPlan.probeScanId,
defaultPlan.buildScanId,
defaultPlan.splits);

const int32_t numGroups = randInt(1, probeScanSplits.size());
const std::vector<exec::Split> groupedProbeScanSplits =
Expand All @@ -1114,6 +1186,67 @@ void JoinFuzzer::addPlansWithTableScan(
groupedProbeScanSplits,
groupedBuildScanSplits));
}

// Add ungrouped MergeJoin with TableScan.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) {
altPlans.push_back({JoinFuzzer::PlanWithSplits{
PlanBuilder(planNodeIdGenerator)
.tableScan(probeType)
.capturePlanNodeId(probeScanId)
.orderBy(probeKeys, false)
.mergeJoin(
probeKeys,
buildKeys,
PlanBuilder(planNodeIdGenerator)
.tableScan(buildType)
.capturePlanNodeId(buildScanId)
.orderBy(buildKeys, false)
.planNode(),
/*filter=*/"",
asRowType(joinNode->outputType())->names(),
joinNode->joinType())
.planNode(),
probeScanId,
buildScanId,
{{probeScanId, fromConnectorSplits(probeScanSplits)},
{buildScanId, fromConnectorSplits(buildScanSplits)}}}});
}

// Add ungrouped NestedLoopJoin with TableScan.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
auto filter = makeJoinFilter(probeKeys, buildKeys);
planNodeIdGenerator->reset();
JoinFuzzer::PlanWithSplits planWithSplits{
PlanBuilder(planNodeIdGenerator)
.tableScan(probeType)
.capturePlanNodeId(probeScanId)
.nestedLoopJoin(
PlanBuilder(planNodeIdGenerator)
.tableScan(buildType)
.capturePlanNodeId(buildScanId)
.planNode(),
filter,
asRowType(joinNode->outputType())->names(),
joinNode->joinType())
.planNode(),
probeScanId,
buildScanId,
{{probeScanId, fromConnectorSplits(probeScanSplits)},
{buildScanId, fromConnectorSplits(buildScanSplits)}}};
altPlans.push_back(planWithSplits);

addFlippedJoinPlan<core::NestedLoopJoinNode>(
planWithSplits.plan,
altPlans,
probeScanId,
buildScanId,
{{probeScanId, fromConnectorSplits(probeScanSplits)},
{buildScanId, fromConnectorSplits(buildScanSplits)}});
}
}

std::vector<exec::Split> JoinFuzzer::generateSplitsWithGroup(
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/fuzzer/JoinFuzzerRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/exec/fuzzer/JoinFuzzer.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/serializers/PrestoSerializer.h"

/// Join FuzzerRunner leverages JoinFuzzer and VectorFuzzer to
Expand Down Expand Up @@ -62,6 +64,8 @@ class JoinFuzzerRunner {
facebook::velox::serializer::presto::PrestoVectorSerde::
registerVectorSerde();
facebook::velox::filesystems::registerLocalFileSystem();
facebook::velox::functions::prestosql::registerAllScalarFunctions();
facebook::velox::parse::registerTypeResolver();

facebook::velox::exec::test::joinFuzzer(seed);
return RUN_ALL_TESTS();
Expand Down

0 comments on commit f0af018

Please sign in to comment.