diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index d0db915c5258..ace2ab2524ba 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/fuzzer/JoinFuzzer.h" #include +#include #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" @@ -221,6 +222,13 @@ class JoinFuzzer { const std::vector& buildInput, const core::PlanNodePtr& plan); + // Generates and executes plans using NestedLoopJoin without filters. The + // result is compared to DuckDB. + void testCrossProduct( + core::JoinType joinType, + const std::vector& probeInput, + const std::vector& buildInput); + int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); } @@ -361,20 +369,23 @@ std::vector JoinFuzzer::generateBuildInput( // probeInput. // To ensure there are some matches, sample with replacement 10% of probe join - // keys and use these as build keys. - // TODO Add a few random rows as well. + // keys and use these as 80% of build keys. The rest build keys are randomly + // generated. std::vector input; for (const auto& probe : probeInput) { - auto numRows = 1 + probe->size() / 10; - auto build = BaseVector::create(rowType, numRows, probe->pool()); + auto numRows = 1 + probe->size() / 8; + auto build = vectorFuzzer_.fuzzRow(rowType, numRows, false); // Pick probe side rows to copy. std::vector rowNumbers(numRows); + SelectivityVector rows(numRows, false); for (auto i = 0; i < numRows; ++i) { - rowNumbers[i] = randInt(0, probe->size() - 1); + if (vectorFuzzer_.coinToss(0.8) && probe->size() > 0) { + rowNumbers[i] = randInt(0, probe->size() - 1); + rows.setValid(i, true); + } } - SelectivityVector rows(numRows); for (auto i = 0; i < probeKeys.size(); ++i) { build->childAt(i)->resize(numRows); build->childAt(i)->copy(probe->childAt(i).get(), rows, rowNumbers.data()); @@ -968,6 +979,51 @@ bool isTableScanSupported(const TypePtr& type) { return true; } +void JoinFuzzer::testCrossProduct( + core::JoinType joinType, + const std::vector& probeInput, + const std::vector& buildInput) { + auto outputColumns = + concat(asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) + ->names(); + + auto planNodeIdGenerator = std::make_shared(); + auto plan = JoinFuzzer::PlanWithSplits{ + PlanBuilder(planNodeIdGenerator) + .values(probeInput) + .nestedLoopJoin( + PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), + "", + outputColumns, + joinType) + .planNode()}; + + const auto expected = execute(plan, /*injectSpill=*/false); + + // If OOM injection is not enabled verify the results against DuckDB. + if (!FLAGS_enable_oom_injection) { + if (auto duckDbResult = + computeDuckDbResult(probeInput, buildInput, plan.plan)) { + VELOX_CHECK( + assertEqualResults( + duckDbResult.value(), plan.plan->outputType(), {expected}), + "Velox and DuckDB results don't match"); + } + } + + std::vector altPlans; + addFlippedJoinPlan(plan.plan, altPlans); + + for (const auto& plan : altPlans) { + auto actual = execute(plan, /*injectSpill=*/false); + if (actual != nullptr && expected != nullptr) { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } + } +} + void JoinFuzzer::verify(core::JoinType joinType) { const bool nullAware = isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); @@ -998,6 +1054,18 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } + // Test cross product without filter with 10% chance. Avoid testing cross + // product if input size is too large. + if ((core::isInnerJoin(joinType) || core::isLeftJoin(joinType) || + core::isFullJoin(joinType)) && + FLAGS_batch_size * FLAGS_num_batches <= 10000) { + if (vectorFuzzer_.coinToss(0.1)) { + testCrossProduct(joinType, probeInput, buildInput); + testCrossProduct(joinType, flatProbeInput, flatBuildInput); + return; + } + } + auto outputColumns = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) @@ -1173,10 +1241,18 @@ void JoinFuzzer::addPlansWithTableScan( const int32_t numGroups = randInt(1, probeScanSplits.size()); const std::vector groupedProbeScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/true, probeKeys.size(), probeInput); + tableDir, + numGroups, + /*isProbe=*/true, + probeKeys.size(), + probeInput); const std::vector groupedBuildScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/false, buildKeys.size(), buildInput); + tableDir, + numGroups, + /*isProbe=*/false, + buildKeys.size(), + buildInput); for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan);