Skip to content

Commit

Permalink
Threads: replace enum with constexpr int and enum class (kokkos#6514)
Browse files Browse the repository at this point in the history
* Add support for enum in spinwait

* Replace enum by enum class and static constexpr in ThreadsInternal

* Let the compiler deduce the template parameter

* Move the ThreadsInternal::State enum to its own file

* Move Kokkos_Spinwait to the Threads backend

* Remove template parameters of functions in Kokkos_Threads_Spinwait

* Remove unused functions

* Fix indentation

* Remove useless include
  • Loading branch information
Rombur committed Oct 24, 2023
1 parent 0b59a1b commit cf5a859
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 225 deletions.
4 changes: 2 additions & 2 deletions Makefile.targets
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ Kokkos_TaskQueue.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/impl/Kokkos_Ta
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/impl/Kokkos_TaskQueue.cpp
Kokkos_HostThreadTeam.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/impl/Kokkos_HostThreadTeam.cpp
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/impl/Kokkos_HostThreadTeam.cpp
Kokkos_Spinwait.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/impl/Kokkos_Spinwait.cpp
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/impl/Kokkos_Spinwait.cpp
Kokkos_HostBarrier.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/impl/Kokkos_HostBarrier.cpp
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/impl/Kokkos_HostBarrier.cpp
Kokkos_Profiling.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/impl/Kokkos_Profiling.cpp
Expand Down Expand Up @@ -84,6 +82,8 @@ endif
ifeq ($(KOKKOS_INTERNAL_USE_THREADS), 1)
Kokkos_Threads_Instance.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/Threads/Kokkos_Threads_Instance.cpp
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/Threads/Kokkos_Threads_Instance.cpp
# Threads backend spinwait object: the recipe must compile the same source file that is listed as the prerequisite.
Kokkos_Threads_Spinwait.o: $(KOKKOS_CPP_DEPENDS) $(KOKKOS_PATH)/core/src/Threads/Kokkos_Threads_Spinwait.cpp
$(CXX) $(KOKKOS_CPPFLAGS) $(KOKKOS_CXXFLAGS) $(CXXFLAGS) -c $(KOKKOS_PATH)/core/src/Threads/Kokkos_Threads_Spinwait.cpp
endif

ifeq ($(KOKKOS_INTERNAL_USE_OPENMP), 1)
Expand Down
1 change: 0 additions & 1 deletion core/src/OpenMPTarget/Kokkos_OpenMPTarget_Parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <sstream>
#include <Kokkos_Parallel.hpp>
#include <impl/Kokkos_Traits.hpp>
#include <impl/Kokkos_Spinwait.hpp>

#include <Kokkos_Atomic.hpp>
#include "Kokkos_OpenMPTarget_Abort.hpp"
Expand Down
1 change: 0 additions & 1 deletion core/src/OpenMPTarget/Kokkos_OpenMPTarget_Reducer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#define KOKKOS_OPENMPTARGETREDUCER_HPP

#include <impl/Kokkos_Traits.hpp>
#include <impl/Kokkos_Spinwait.hpp>

#include <Kokkos_Atomic.hpp>
#include "Kokkos_OpenMPTarget_Abort.hpp"
Expand Down
67 changes: 34 additions & 33 deletions core/src/Threads/Kokkos_Threads_Instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ void ThreadsInternal::global_unlock() {

//----------------------------------------------------------------------------

void ThreadsInternal::wait_yield(volatile int &flag, const int value) {
void ThreadsInternal::wait_yield(volatile ThreadState &flag,
const ThreadState value) {
while (value == flag) {
std::this_thread::yield();
}
Expand All @@ -146,13 +147,13 @@ void ThreadsInternal::driver() {

ThreadsInternal this_thread;

while (this_thread.m_pool_state == ThreadsInternal::Active) {
while (this_thread.m_pool_state == ThreadState::Active) {
(*s_current_function)(this_thread, s_current_function_arg);

// Deactivate thread and wait for reactivation
this_thread.m_pool_state = ThreadsInternal::Inactive;
this_thread.m_pool_state = ThreadState::Inactive;

wait_yield(this_thread.m_pool_state, ThreadsInternal::Inactive);
wait_yield(this_thread.m_pool_state, ThreadState::Inactive);
}
}

Expand All @@ -166,7 +167,7 @@ ThreadsInternal::ThreadsInternal()
m_pool_rank(0),
m_pool_size(0),
m_pool_fan_size(0),
m_pool_state(ThreadsInternal::Terminating) {
m_pool_state(ThreadState::Terminating) {
if (&s_threads_process != this) {
// A spawned thread

Expand All @@ -192,21 +193,21 @@ ThreadsInternal::ThreadsInternal()
m_pool_rank_rev = s_thread_pool_size[0] - (pool_rank() + 1);
m_pool_size = s_thread_pool_size[0];
m_pool_fan_size = fan_size(m_pool_rank, m_pool_size);
m_pool_state = ThreadsInternal::Active;
m_pool_state = ThreadState::Active;

s_threads_pid[m_pool_rank] = std::this_thread::get_id();

// Inform spawning process that the threads_exec entry has been set.
s_threads_process.m_pool_state = ThreadsInternal::Active;
s_threads_process.m_pool_state = ThreadState::Active;
} else {
// Inform spawning process that the threads_exec entry could not be set.
s_threads_process.m_pool_state = ThreadsInternal::Terminating;
s_threads_process.m_pool_state = ThreadState::Terminating;
}
} else {
// Enables 'parallel_for' to execute on uninitialized Threads device
m_pool_rank = 0;
m_pool_size = 1;
m_pool_state = ThreadsInternal::Inactive;
m_pool_state = ThreadState::Inactive;

s_threads_pid[m_pool_rank] = std::this_thread::get_id();
}
Expand Down Expand Up @@ -234,14 +235,14 @@ ThreadsInternal::~ThreadsInternal() {
m_pool_size = 0;
m_pool_fan_size = 0;

m_pool_state = ThreadsInternal::Terminating;
m_pool_state = ThreadState::Terminating;

if (&s_threads_process != this && entry < MAX_THREAD_COUNT) {
ThreadsInternal *const nil = nullptr;

atomic_compare_exchange(s_threads_exec + entry, this, nil);

s_threads_process.m_pool_state = ThreadsInternal::Terminating;
s_threads_process.m_pool_state = ThreadState::Terminating;
}
}

Expand Down Expand Up @@ -278,12 +279,12 @@ void ThreadsInternal::execute_sleep(ThreadsInternal &exec, const void *) {
const int rank_rev = exec.m_pool_size - (exec.m_pool_rank + 1);

for (int i = 0; i < n; ++i) {
Impl::spinwait_while_equal<int>(
Impl::spinwait_while_equal(
exec.m_pool_base[rank_rev + (1 << i)]->m_pool_state,
ThreadsInternal::Active);
ThreadState::Active);
}

exec.m_pool_state = ThreadsInternal::Inactive;
exec.m_pool_state = ThreadState::Inactive;
}

} // namespace Impl
Expand Down Expand Up @@ -336,8 +337,8 @@ void ThreadsInternal::internal_fence(const std::string &name,
const auto &fence_lam = [&]() {
if (s_thread_pool_size[0]) {
// Wait for the root thread to complete:
Impl::spinwait_while_equal<int>(s_threads_exec[0]->m_pool_state,
ThreadsInternal::Active);
Impl::spinwait_while_equal(s_threads_exec[0]->m_pool_state,
ThreadState::Active);
}

s_current_function = nullptr;
Expand Down Expand Up @@ -378,13 +379,13 @@ void ThreadsInternal::start(void (*func)(ThreadsInternal &, const void *),

// Activate threads:
for (int i = s_thread_pool_size[0]; 0 < i--;) {
s_threads_exec[i]->m_pool_state = ThreadsInternal::Active;
s_threads_exec[i]->m_pool_state = ThreadState::Active;
}

if (s_threads_process.m_pool_size) {
// Master process is the root thread, run it:
(*func)(s_threads_process, arg);
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;
}
}

Expand All @@ -403,7 +404,7 @@ bool ThreadsInternal::sleep() {

// Activate threads:
for (unsigned i = s_thread_pool_size[0]; 0 < i;) {
s_threads_exec[--i]->m_pool_state = ThreadsInternal::Active;
s_threads_exec[--i]->m_pool_state = ThreadState::Active;
}

return true;
Expand All @@ -418,7 +419,7 @@ bool ThreadsInternal::wake() {

if (s_threads_process.m_pool_base) {
execute_sleep(s_threads_process, nullptr);
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;
}

fence();
Expand Down Expand Up @@ -455,16 +456,16 @@ void ThreadsInternal::execute_resize_scratch_in_serial() {
for (unsigned i = s_thread_pool_size[0]; begin < i;) {
ThreadsInternal &th = *s_threads_exec[--i];

th.m_pool_state = ThreadsInternal::Active;
th.m_pool_state = ThreadState::Active;

wait_yield(th.m_pool_state, ThreadsInternal::Active);
wait_yield(th.m_pool_state, ThreadState::Active);
}

if (s_threads_process.m_pool_base) {
deallocate_scratch_memory(s_threads_process);
s_threads_process.m_pool_state = ThreadsInternal::Active;
s_threads_process.m_pool_state = ThreadState::Active;
first_touch_allocate_thread_private_scratch(s_threads_process, nullptr);
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;
}

s_current_function_arg = nullptr;
Expand Down Expand Up @@ -663,7 +664,7 @@ void ThreadsInternal::initialize(int thread_count_arg) {
&execute_function_noop; // Initialization work function

for (unsigned ith = thread_spawn_begin; ith < thread_count; ++ith) {
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;

// If hwloc available then spawned thread will
// choose its own entry in 's_threads_coord'
Expand All @@ -680,8 +681,8 @@ void ThreadsInternal::initialize(int thread_count_arg) {
// If spawning and initialization is successful then
// an entry in 's_threads_exec' will be assigned.
ThreadsInternal::spawn();
wait_yield(s_threads_process.m_pool_state, ThreadsInternal::Inactive);
if (s_threads_process.m_pool_state == ThreadsInternal::Terminating) break;
wait_yield(s_threads_process.m_pool_state, ThreadState::Inactive);
if (s_threads_process.m_pool_state == ThreadState::Terminating) break;
}

// Wait for all spawned threads to deactivate before zeroing the function.
Expand All @@ -691,15 +692,15 @@ void ThreadsInternal::initialize(int thread_count_arg) {
ThreadsInternal *const th =
((ThreadsInternal * volatile *)s_threads_exec)[ith];
if (th) {
wait_yield(th->m_pool_state, ThreadsInternal::Active);
wait_yield(th->m_pool_state, ThreadState::Active);
} else {
++thread_spawn_failed;
}
}

s_current_function = nullptr;
s_current_function_arg = nullptr;
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;

memory_fence();

Expand Down Expand Up @@ -789,11 +790,11 @@ void ThreadsInternal::finalize() {

for (unsigned i = s_thread_pool_size[0]; begin < i--;) {
if (s_threads_exec[i]) {
s_threads_exec[i]->m_pool_state = ThreadsInternal::Terminating;
s_threads_exec[i]->m_pool_state = ThreadState::Terminating;

wait_yield(s_threads_process.m_pool_state, ThreadsInternal::Inactive);
wait_yield(s_threads_process.m_pool_state, ThreadState::Inactive);

s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;
}

s_threads_pid[i] = std::thread::id();
Expand All @@ -819,7 +820,7 @@ void ThreadsInternal::finalize() {
s_threads_process.m_pool_rank = 0;
s_threads_process.m_pool_size = 1;
s_threads_process.m_pool_fan_size = 0;
s_threads_process.m_pool_state = ThreadsInternal::Inactive;
s_threads_process.m_pool_state = ThreadState::Inactive;

Kokkos::Profiling::finalize();
}
Expand Down

0 comments on commit cf5a859

Please sign in to comment.