Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DYOD] NUMA - Node stealing and Grouping changes #2608

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ set(
utils/meta_tables/meta_tables_table.hpp
utils/meta_tables/segment_meta_data.cpp
utils/meta_tables/segment_meta_data.hpp
utils/numa_utils.cpp
utils/numa_utils.hpp
utils/pausable_loop_thread.cpp
utils/pausable_loop_thread.hpp
utils/performance_warning.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/lib/scheduler/abstract_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class AbstractScheduler : public Noncopyable {

virtual const std::vector<std::shared_ptr<TaskQueue>>& queues() const = 0;

// Returns a vector containing the indices of all task queues, prioritized by the NUMA distance between each queue's
// node and the given node_id.
virtual const std::vector<NodeID>& prioritized_queue_ids(NodeID node_id) const = 0;

virtual void schedule(std::shared_ptr<AbstractTask> task, NodeID preferred_node_id = CURRENT_NODE_ID,
SchedulePriority priority = SchedulePriority::Default) = 0;

Expand Down
4 changes: 4 additions & 0 deletions src/lib/scheduler/immediate_execution_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const std::vector<std::shared_ptr<TaskQueue>>& ImmediateExecutionScheduler::queu
return _queues;
}

// Returns the scheduler's fixed queue order. The requested node is irrelevant here, so the parameter name is
// commented out to avoid an unused-parameter warning.
const std::vector<NodeID>& ImmediateExecutionScheduler::prioritized_queue_ids(NodeID /*node_id*/) const {
  return _queue_order;
}

void ImmediateExecutionScheduler::schedule(std::shared_ptr<AbstractTask> task, NodeID /*preferred_node_id*/,
SchedulePriority /*priority*/) {
DebugAssert(task->is_scheduled(), "Don't call ImmediateExecutionScheduler::schedule(), call schedule() on the task");
Expand Down
3 changes: 3 additions & 0 deletions src/lib/scheduler/immediate_execution_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ class ImmediateExecutionScheduler : public AbstractScheduler {

const std::vector<std::shared_ptr<TaskQueue>>& queues() const override;

// Returns the same queue order for every node_id.
const std::vector<NodeID>& prioritized_queue_ids(NodeID node_id) const override;

void schedule(std::shared_ptr<AbstractTask> task, NodeID preferred_node_id = CURRENT_NODE_ID,
SchedulePriority priority = SchedulePriority::Default) override;

private:
std::vector<std::shared_ptr<TaskQueue>> _queues = std::vector<std::shared_ptr<TaskQueue>>{};
// Queue order handed out by prioritized_queue_ids(); it only contains the queue index 0.
std::vector<NodeID> _queue_order = std::vector<NodeID>{0};
};

} // namespace hyrise
7 changes: 7 additions & 0 deletions src/lib/scheduler/node_queue_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void NodeQueueScheduler::begin() {
// Tracked per node as core restrictions can lead to unbalanced core counts.
_workers_per_node.emplace_back(topology_node.cpus.size());
}
_prioritized_numa_queue_ids = numa_utils::make_node_priority_matrix(numa_utils::get_distance_matrix());

_active = true;

Expand Down Expand Up @@ -161,6 +162,12 @@ void NodeQueueScheduler::schedule(std::shared_ptr<AbstractTask> task, NodeID pre
_queues[node_id_for_queue]->push(task, priority);
}

const std::vector<NodeID>& NodeQueueScheduler::prioritized_queue_ids(NodeID node_id) const {
  // The priority matrix holds one vector of queue ids per node, so node_id has to address an existing entry.
  [[maybe_unused]] const auto known_node_count = _prioritized_numa_queue_ids.size();
  DebugAssert(node_id < known_node_count, "node_id " + std::to_string(node_id) + " is out of bounds.");

  const auto& queue_ids_by_priority = _prioritized_numa_queue_ids[node_id];
  return queue_ids_by_priority;
}

NodeID NodeQueueScheduler::determine_queue_id(const NodeID preferred_node_id) const {
// Early out: no need to check for preferred node or other queues, if there is only a single node queue.
if (_node_count == 1) {
Expand Down
8 changes: 6 additions & 2 deletions src/lib/scheduler/node_queue_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include "abstract_scheduler.hpp"
#include "utils/numa_utils.hpp"

namespace hyrise {

Expand Down Expand Up @@ -35,7 +36,7 @@ namespace hyrise {
* TaskQueue.
*
* Note: currently, TaskQueues are not explicitly allocated on a NUMA node. This means most workers will frequently
* access distant TaskQueues, which is ~1.6 times slower than accessing a local node [1].
* access distant TaskQueues, which is ~1.6 times slower than accessing a local node [1].
*
* [1] http://frankdenneman.nl/2016/07/13/numa-deep-dive-4-local-memory-optimization/
*
Expand Down Expand Up @@ -64,6 +65,8 @@ class NodeQueueScheduler : public AbstractScheduler {

const std::vector<std::shared_ptr<TaskQueue>>& queues() const override;

// Returns the entry of the node priority matrix that belongs to node_id.
const std::vector<NodeID>& prioritized_queue_ids(NodeID node_id) const override;

const std::vector<std::shared_ptr<Worker>>& workers() const;

/**
Expand All @@ -88,7 +91,7 @@ class NodeQueueScheduler : public AbstractScheduler {
const std::atomic_int64_t& active_worker_count() const;

// Number of groups for _group_tasks
static constexpr auto NUM_GROUPS = 10;
static constexpr auto NUM_GROUPS = 30;

protected:
void _group_tasks(const std::vector<std::shared_ptr<AbstractTask>>& tasks) const override;
Expand All @@ -97,6 +100,7 @@ class NodeQueueScheduler : public AbstractScheduler {
std::atomic<TaskID::base_type> _task_counter{0};
std::shared_ptr<UidAllocator> _worker_id_allocator;
std::vector<std::shared_ptr<TaskQueue>> _queues;
// One vector of queue ids per node. Filled in begin() via numa_utils::make_node_priority_matrix().
numa_utils::NodePriorityMatrix _prioritized_numa_queue_ids;
std::vector<std::shared_ptr<Worker>> _workers;

std::atomic_bool _active{false};
Expand Down
4 changes: 3 additions & 1 deletion src/lib/scheduler/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ void Worker::_work(const AllowSleep allow_sleep) {

if (!task) {
// Simple work stealing without explicitly transferring data between nodes.
for (const auto& queue : Hyrise::get().scheduler()->queues()) {
const auto& queues = Hyrise::get().scheduler()->queues();
for (const auto queue_id : Hyrise::get().scheduler()->prioritized_queue_ids(_queue->node_id())) {
const auto& queue = queues[queue_id];
if (queue == _queue) {
continue;
}
Expand Down
64 changes: 64 additions & 0 deletions src/lib/utils/numa_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "numa_utils.hpp"

#if HYRISE_NUMA_SUPPORT

#include <numa.h>

#endif

#include <algorithm>
#include <numeric>
#include <string>

#include "hyrise.hpp"
#include "utils/assert.hpp"

namespace hyrise::numa_utils {

// Builds a square matrix with one row and one column per node of the current topology. Element [x][y] is the NUMA
// distance between node x and node y as reported by numa_distance(). Without NUMA support, or if the distances cannot
// be read, every element keeps the default distance of 10.
DistanceMatrix get_distance_matrix() {
  const auto num_nodes = Hyrise::get().topology.nodes().size();

  // 10 is the default distance to the same node. It doubles as the fallback value for every pair of nodes.
  auto distance_matrix = DistanceMatrix(num_nodes, std::vector<int>(num_nodes, 10));

  // Nothing to measure without nodes. Returning early also keeps `num_nodes - 1` below from wrapping around.
  if (num_nodes == 0) {
    return distance_matrix;
  }

#if HYRISE_NUMA_SUPPORT

  // If numa_distance does not work (e.g. code is executed on Windows), 0 will be returned.
  // To determine whether the NUMA distance can be correctly read on the system, we get the
  // distance between the two "max" nodes. If that distance is set, all other distances
  // should be set as well.
  // numa_distance() takes int node ids, so the conversion from size_t is made explicit.
  const auto max_node = static_cast<int>(num_nodes - 1);
  if (numa_distance(max_node, max_node) == 0) {
    PerformanceWarning(
        "Distance between numa nodes could not be calculated. Falling back to default distance for every "
        "interconnect.");
    return distance_matrix;
  }

  for (auto node_x = size_t{0}; node_x < num_nodes; ++node_x) {
    for (auto node_y = size_t{0}; node_y < num_nodes; ++node_y) {
      // TODO(anyone): Assert that numa_distance is symmetric.
      // (e.g. we can also set dis_matrix[y][x] from the same call)
      distance_matrix[node_x][node_y] = numa_distance(static_cast<int>(node_x), static_cast<int>(node_y));
      DebugAssert(distance_matrix[node_x][node_y] != 0, "numa distance could not find distance between node " +
                                                            std::to_string(node_x) + " and node " +
                                                            std::to_string(node_y));
    }
  }

#endif

  return distance_matrix;
}

// For every node (row of distance_matrix), returns all node ids ordered by ascending distance to that node.
// Nodes with the same distance keep their ascending id order: std::stable_sort is used because std::sort leaves the
// relative order of equal elements unspecified, which would make the priorities depend on the standard library.
// Expects a square distance_matrix: row i is indexed with every node id in [0, distance_matrix.size()).
NodePriorityMatrix make_node_priority_matrix(const DistanceMatrix& distance_matrix) {
  const auto matrix_size = distance_matrix.size();
  auto node_matrix = NodePriorityMatrix(matrix_size, std::vector<NodeID>(matrix_size, NodeID{0}));

  for (auto node_id = size_t{0}; node_id < matrix_size; ++node_id) {
    const auto& distances = distance_matrix[node_id];
    auto& prioritized_nodes = node_matrix[node_id];

    // Start with the ids 0..n-1 in ascending order, then reorder them by their distance to node_id.
    std::iota(prioritized_nodes.begin(), prioritized_nodes.end(), 0);
    std::stable_sort(prioritized_nodes.begin(), prioritized_nodes.end(),
                     [&](auto l, auto r) -> bool { return distances[l] < distances[r]; });
  }
  return node_matrix;
}

} // namespace hyrise::numa_utils
50 changes: 50 additions & 0 deletions src/lib/utils/numa_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include <vector>

#include "performance_warning.hpp"
#include "scheduler/abstract_task.hpp"
#include "types.hpp"

namespace hyrise::numa_utils {

using DistanceMatrix = std::vector<std::vector<int>>;
using NodePriorityMatrix = std::vector<std::vector<NodeID>>;

/*
  Returns an NxN matrix M, where N is the number of nodes of the current topology and each element M[x][y] is the
  distance between node x and node y. The distance of a node to itself should be equal to 10. If Hyrise is built
  without NUMA support or the distances cannot be read, every element is set to the default distance of 10.
  Exemplary output for 8 nodes:
  {
    {10, 16, 19, 16, 50, 50, 50, 50},
    {16, 10, 16, 19, 50, 50, 50, 50},
    {19, 16, 10, 16, 50, 50, 50, 50},
    {16, 19, 16, 10, 50, 50, 50, 50},
    {50, 50, 50, 50, 10, 16, 19, 16},
    {50, 50, 50, 50, 16, 10, 16, 19},
    {50, 50, 50, 50, 19, 16, 10, 16},
    {50, 50, 50, 50, 16, 19, 16, 10},
  }
*/
DistanceMatrix get_distance_matrix();

/*
  Takes an NxN distance matrix M, where element M[x][y] is the distance between node x and node y. Returns N vectors
  of size N. The vector at position j contains all NodeIDs ordered by their distance to node j: the first NodeID is
  the closest to j, ..., and the last is the furthest based on the distance matrix.
  Exemplary output for 8 nodes (matches the exemplary distance matrix of get_distance_matrix() above):
  {
    {0, 1, 3, 2, 4, 5, 6, 7},
    {1, 0, 2, 3, 4, 5, 6, 7},
    {2, 1, 3, 0, 4, 5, 6, 7},
    {3, 0, 2, 1, 4, 5, 6, 7},
    {4, 5, 7, 6, 0, 1, 2, 3},
    {5, 4, 6, 7, 0, 1, 2, 3},
    {6, 5, 7, 4, 0, 1, 2, 3},
    {7, 4, 6, 5, 0, 1, 2, 3},
  }
*/
NodePriorityMatrix make_node_priority_matrix(const DistanceMatrix& distance_matrix);

} // namespace hyrise::numa_utils
1 change: 1 addition & 0 deletions src/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ set(
lib/utils/meta_tables/meta_table_test.cpp
lib/utils/mock_setting.cpp
lib/utils/mock_setting.hpp
lib/utils/numa_utils_test.cpp
lib/utils/plugin_manager_test.cpp
lib/utils/plugin_test_utils.cpp
lib/utils/plugin_test_utils.hpp
Expand Down
2 changes: 1 addition & 1 deletion src/test/lib/scheduler/scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ TEST_F(SchedulerTest, Grouping) {
auto output = std::vector<size_t>{};
auto tasks = std::vector<std::shared_ptr<AbstractTask>>{};

constexpr auto TASK_COUNT = 50;
constexpr auto TASK_COUNT = 60;

for (auto task_id = 0; task_id < TASK_COUNT; ++task_id) {
tasks.emplace_back(std::make_shared<JobTask>([&output, task_id] { output.emplace_back(task_id); }));
Expand Down
25 changes: 25 additions & 0 deletions src/test/lib/utils/numa_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "base_test.hpp"

#include "types.hpp"
#include "utils/numa_utils.hpp"

namespace hyrise {

// Test fixture for the helpers declared in utils/numa_utils.hpp.
class NumaUtilsTest : public BaseTest {};

TEST_F(NumaUtilsTest, MakeNodePriorityMatrix) {
  // Row i holds the distances from node i to the nodes 0, 1, and 2.
  const auto distances = numa_utils::DistanceMatrix{{1, 2, 3}, {3, 2, 1}, {2, 3, 1}};

  // Row i lists the node ids ordered from the smallest to the largest distance in row i of `distances`.
  auto expected_priorities = numa_utils::NodePriorityMatrix{};
  expected_priorities.push_back({NodeID{0}, NodeID{1}, NodeID{2}});
  expected_priorities.push_back({NodeID{2}, NodeID{1}, NodeID{0}});
  expected_priorities.push_back({NodeID{2}, NodeID{0}, NodeID{1}});

  EXPECT_EQ(numa_utils::make_node_priority_matrix(distances), expected_priorities);
}

} // namespace hyrise