Skip to content

Commit

Permalink
Disable hash join spilling in mixed execution mode (#9001)
Browse files Browse the repository at this point in the history
Summary:
The current hash join spilling implementation can't handle mixed execution mode,
see (#8998) for details. For now,
we disable the join spilling under mixed execution mode. We add method in task to
tell if a task is under mixed execution mode: task is group executed and has one
ungrouped split. The hash build then disables the spilling if the task is under mixed
execution mode.

Pull Request resolved: #9001

Reviewed By: mbasmanova

Differential Revision: D54649972

Pulled By: xiaoxmeng

fbshipit-source-id: 4c4ad851f441df11b412e226794c8ed3a061bed4
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Mar 8, 2024
1 parent b6e3aad commit fdbf239
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 7 deletions.
9 changes: 7 additions & 2 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ uint64_t SharedArbitrator::growCapacity(
uint64_t SharedArbitrator::shrinkCapacity(
MemoryPool* pool,
uint64_t targetBytes) {
uint64_t freedBytes;
uint64_t freeCapacity;
uint64_t freedBytes{0};
uint64_t freeCapacity{0};
{
std::lock_guard<std::mutex> l(mutex_);
++numReleases_;
Expand Down Expand Up @@ -225,6 +225,11 @@ uint64_t SharedArbitrator::shrinkCapacity(
return freedBytes;
}

void SharedArbitrator::testingFreeCapacity(uint64_t capacity) {
std::lock_guard<std::mutex> l(mutex_);
incrementFreeCapacityLocked(capacity);
}

std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
const std::vector<std::shared_ptr<MemoryPool>>& pools) {
std::vector<SharedArbitrator::Candidate> candidates;
Expand Down
3 changes: 3 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class SharedArbitrator : public memory::MemoryArbitrator {
std::string toString() const;
};

/// Returns 'freeCapacity' back to the arbitrator for testing.
void testingFreeCapacity(uint64_t freeCapacity);

private:
// The kind string of shared arbitrator.
inline static const std::string kind_{"SHARED"};
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,10 @@ std::string HashBuild::stateName(State state) {
}
}

bool HashBuild::canReclaim() const {
return canSpill() && !operatorCtx_->task()->hasMixedExecutionGroup();
}

void HashBuild::reclaim(
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& stats) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class HashBuild final : public Operator {

bool isFinished() override;

bool canReclaim() const override;

void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

Expand Down
8 changes: 8 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,14 @@ bool Task::isUngroupedExecution() const {
return not isGroupedExecution();
}

bool Task::hasMixedExecutionGroup() const {
if (!isGroupedExecution()) {
return false;
}
std::lock_guard<std::timed_mutex> l(mutex_);
return numDriversUngrouped_ > 0;
}

bool Task::isRunning() const {
std::lock_guard<std::timed_mutex> l(mutex_);
return isRunningLocked();
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ class Task : public std::enable_shared_from_this<Task> {

bool isUngroupedExecution() const;

/// Returns true if this task has ungrouped execution split under grouped
/// execution mode.
///
/// NOTE: calls this function after task has been started as the number of
/// ungrouped drivers is set during task startup.
bool hasMixedExecutionGroup() const;

/// Starts executing the plan fragment specified in the constructor. If leaf
/// nodes require splits (e.g. TableScan, Exchange, etc.), these splits can be
/// added before or after calling start().
Expand Down
147 changes: 144 additions & 3 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
*/
#include <regex>

#include <velox/type/Timestamp.h>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/TableScan.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/type/Type.h"

namespace facebook::velox::exec::test {
Expand All @@ -34,6 +34,7 @@ class GroupedExecutionTest : public virtual HiveConnectorTestBase {
}

static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
HiveConnectorTestBase::SetUpTestCase();
}

Expand Down Expand Up @@ -283,6 +284,146 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) {
EXPECT_EQ(18, taskStats.pipelineStats[1].operatorStats[1].inputVectors);
}

DEBUG_ONLY_TEST_F(
GroupedExecutionTest,
groupedExecutionWithHashJoinSpillCheck) {
// Create source file to read as split input.
auto vectors = makeVectors(24, 20);
auto filePath = TempFilePath::create();
writeToFile(filePath->path, vectors);

struct {
bool enableSpill;
bool mixedExecutionMode;
int expectedNumDrivers;
bool expectedSpill;

std::string debugString() const {
return fmt::format(
"enableSpill {}, mixedExecutionMode {}, expectedNumDrivers {}, expectedSpill {}",
enableSpill,
mixedExecutionMode,
expectedNumDrivers,
expectedSpill);
}
} testSettings[] = {
{false, false, 12, false},
{false, true, 9, false},
{true, false, 12, true},
{true, true, 9, false}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanNodeId;
core::PlanNodeId buildScanNodeId;

PlanBuilder planBuilder(planNodeIdGenerator, pool_.get());
planBuilder.tableScan(rowType_)
.capturePlanNodeId(probeScanNodeId)
.project({"c0 as x"});
// Hash join.
core::PlanNodeId joinNodeId;
auto planFragment = planBuilder
.hashJoin(
{"x"},
{"y"},
PlanBuilder(planNodeIdGenerator, pool_.get())
.tableScan(rowType_, {"c0 > 0"})
.capturePlanNodeId(buildScanNodeId)
.project({"c0 as y"})
.planNode(),
"",
{"x", "y"})
.capturePlanNodeId(joinNodeId)
.partitionedOutput({}, 1, {"x", "y"})
.planFragment();

planFragment.executionStrategy = core::ExecutionStrategy::kGrouped;
planFragment.groupedExecutionLeafNodeIds.emplace(probeScanNodeId);
if (!testData.mixedExecutionMode) {
planFragment.groupedExecutionLeafNodeIds.emplace(buildScanNodeId);
}
planFragment.numSplitGroups = 2;

auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
if (testData.enableSpill) {
std::unordered_map<std::string, std::string> configs;
configs.emplace(core::QueryConfig::kSpillEnabled, "true");
configs.emplace(core::QueryConfig::kJoinSpillEnabled, "true");
queryCtx->testingOverrideConfigUnsafe(std::move(configs));
}

SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal::noMoreInput",
std::function<void(Operator*)>([&](Operator* op) {
if (op->operatorType() != "HashBuild") {
return;
}
ASSERT_EQ(op->canReclaim(), testData.expectedSpill);
if (testData.enableSpill) {
memory::testingRunArbitration(op->pool());
}
}));

auto task = exec::Task::create(
"0", std::move(planFragment), 0, std::move(queryCtx));
const auto spillDirectory = exec::test::TempDirectoryPath::create();
if (testData.enableSpill) {
task->setSpillDirectory(spillDirectory->path);
}

// 3 drivers max and 1 concurrent split group to execute one group at a
// time.
task->start(3, 1);
ASSERT_EQ(task->hasMixedExecutionGroup(), testData.mixedExecutionMode);

// Add split(s) to the build scan.
if (testData.mixedExecutionMode) {
task->addSplit(buildScanNodeId, makeHiveSplit(filePath->path));
} else {
task->addSplit(
buildScanNodeId, makeHiveSplitWithGroup(filePath->path, 0));
task->addSplit(
buildScanNodeId, makeHiveSplitWithGroup(filePath->path, 1));
}
// Add one split for probe split group (0).
task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 0));
// Add one split for probe split group (1).
task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 1));

// Finalize the build split(s).
if (testData.mixedExecutionMode) {
task->noMoreSplits(buildScanNodeId);
} else {
task->noMoreSplitsForGroup(buildScanNodeId, 0);
task->noMoreSplitsForGroup(buildScanNodeId, 1);
}
// Finalize probe split groups.
task->noMoreSplitsForGroup(probeScanNodeId, 0);
task->noMoreSplitsForGroup(probeScanNodeId, 1);

waitForFinishedDrivers(task, testData.expectedNumDrivers);

// 'Delete results' from output buffer triggers 'set all output consumed',
// which should finish the task.
auto outputBufferManager = exec::OutputBufferManager::getInstance().lock();
outputBufferManager->deleteResults(task->taskId(), 0);

// Task must be finished at this stage.
ASSERT_EQ(task->state(), exec::TaskState::kFinished);

auto taskStats = exec::toPlanStats(task->taskStats());
auto& planStats = taskStats.at(joinNodeId);
if (testData.expectedSpill) {
ASSERT_GT(planStats.spilledBytes, 0);
} else {
ASSERT_EQ(planStats.spilledBytes, 0);
}
}
}

// Here we test various aspects of grouped/bucketed execution involving
// output buffer and 3 pipelines.
TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) {
Expand Down
13 changes: 11 additions & 2 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/Values.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand Down Expand Up @@ -1543,10 +1542,20 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
task->start(4, 1);

const int numReclaims{10};
const uint64_t queryCapacity = task->pool()->parent()->capacity();
for (int i = 0; i < numReclaims; ++i) {
MemoryReclaimer::Stats stats;
task->pool()->reclaim(1000, 1UL << 30, stats);
}
const int64_t reclaimedQueryCapacity =
queryCapacity - task->pool()->parent()->capacity();
ASSERT_GE(reclaimedQueryCapacity, 0);
auto* arbitrator = dynamic_cast<memory::SharedArbitrator*>(
memory::memoryManager()->arbitrator());
if (arbitrator != nullptr) {
arbitrator->testingFreeCapacity(reclaimedQueryCapacity);
}

const auto taskStats = task->taskStats();
ASSERT_EQ(taskStats.memoryReclaimCount, numReclaims);
ASSERT_GT(taskStats.memoryReclaimMs, 0);
Expand Down

0 comments on commit fdbf239

Please sign in to comment.