Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BBT#260] Topic/hyperthread v2.1 #312

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ mark_as_advanced(PARSEC_SCHED_DEPS_MASK)
option(PARSEC_SCHED_DEPS_MASK
"Use a complete bitmask to track the dependencies, instead of a counter -- increase the debugging features, but limits to a maximum of 30 input dependencies" ON)

mark_as_advanced(PARSEC_HAVE_HYPERTHREAD_SCHEDULER)
option(PARSEC_HAVE_HYPERTHREAD_SCHEDULER
"Double the number of threads, half will compute, the other half is in charge of scheduling, lookups, and all the boring stuff" ON)

### Distributed engine parameters
mark_as_advanced(PARSEC_DIST_THREAD PARSEC_DIST_PRIORITIES)
option(PARSEC_DIST_WITH_MPI
Expand Down
5 changes: 5 additions & 0 deletions parsec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ if( PARSEC_PROF_GRAPHER )
endif( PARSEC_PROF_GRAPHER )

add_subdirectory(data_dist)

if( PARSEC_HAVE_HYPERTHREAD_SCHEDULER )
add_definitions(-DPARSEC_HYPERTHREAD_SCHEDULER -DPARSEC_HYPERTHREAD_ROUND_ROBIN)
endif( PARSEC_HAVE_HYPERTHREAD_SCHEDULER )

#
# Setup targets
#
Expand Down
32 changes: 32 additions & 0 deletions parsec/include/parsec/execution_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "parsec/mempool.h"
#include "parsec/profiling.h"
#include "parsec/class/barrier.h"
#include "parsec/class/list.h"

#ifdef PINS_ENABLE
#include "parsec/mca/pins/pins.h"
Expand All @@ -29,6 +30,15 @@

BEGIN_C_DECLS

/**
* @brief A container for interaction between scheduler and worker thread
*/
typedef struct parsec_shared_information_s parsec_shared_information_t;
/**
* @brief A container for interaction between scheduler and worker thread
*/
typedef struct parsec_loctask_s parsec_loctask_t;

/**
* Computational Thread-specific structure
*/
Expand Down Expand Up @@ -65,6 +75,9 @@ struct parsec_execution_stream_s {
* we use these mempools */
parsec_thread_mempool_t *dependencies_mempool; /**< If using hashtables to store dependencies
* those are allocated using this mempool */
#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
parsec_shared_information_t *shared;
#endif
};

/**
Expand Down Expand Up @@ -159,6 +172,25 @@ struct parsec_context_s {

#define PARSEC_THREAD_IS_MASTER(eu) ( ((eu)->th_id == 0) && ((eu)->virtual_process->vp_id == 0) )

struct parsec_shared_information_s {
parsec_barrier_t *barrier;
/* the address pointer needs to be volatile, not the content pointed to */
parsec_loctask_t *volatile input;
parsec_loctask_t *volatile output;
parsec_loctask_t *freelist; /* simple lifo no lock */
int keepgoing;
int submitted;
};

struct parsec_loctask_s {
parsec_list_item_t super;
int rc;
int distance;
parsec_task_t *task;
parsec_execution_stream_t *es;
parsec_loctask_t *next;
};

END_C_DECLS

#endif /* PARSEC_EXECUTION_UNIT_H_HAS_BEEN_INCLUDED */
9 changes: 8 additions & 1 deletion parsec/interfaces/ptg/ptg-compiler/jdf2c.c
Original file line number Diff line number Diff line change
Expand Up @@ -5328,6 +5328,13 @@ static void jdf_generate_code_hook(const jdf_t *jdf,
}

coutput("%s\n", body->external_code);

if( profile_on ) {
coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n"
" this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n"
" this_task);\n");
}

if( !JDF_COMPILER_GLOBAL_ARGS.noline ) {
coutput("#line %d \"%s\"\n", cfile_lineno+1, jdf_cfilename);
}
Expand Down Expand Up @@ -5388,7 +5395,7 @@ jdf_generate_code_complete_hook(const jdf_t *jdf,
}
}

if( profile_on ) {
if( 0 && profile_on ) {
coutput(" PARSEC_TASK_PROF_TRACE(es->es_profile,\n"
" this_task->taskpool->profiling_array[2*this_task->task_class->task_class_id+1],\n"
" (parsec_task_t*)this_task);\n");
Expand Down
94 changes: 87 additions & 7 deletions parsec/parsec.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "parsec/utils/output.h"
#include "parsec/data_internal.h"
#include "parsec/class/list.h"
#include "parsec/class/fifo.h"
#include "parsec/scheduling.h"
#include "parsec/class/barrier.h"
#include "parsec/remote_dep.h"
Expand Down Expand Up @@ -167,12 +168,34 @@ typedef struct __parsec_temporary_thread_initialization_t {
int bindto_ht;
parsec_barrier_t* barrier; /*< the barrier used to synchronize for the
* local VP data construction. */
#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
parsec_execution_stream_t* es;
#endif
} __parsec_temporary_thread_initialization_t;

static int parsec_parse_binding_parameter(const char* option, parsec_context_t* context,
__parsec_temporary_thread_initialization_t* startup);
__parsec_temporary_thread_initialization_t* startup);
static int parsec_parse_comm_binding_parameter(const char* option, parsec_context_t* context);

#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
static void* __parsec_worker_init( __parsec_temporary_thread_initialization_t* startup )
{
parsec_execution_stream_t* es = startup->es;
int bindto = startup->bindto;
int bindto_ht = (-1 == startup->bindto_ht) ? 1 : startup->bindto_ht+1;

parsec_bindthread(bindto, bindto_ht);
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind worker thread %i.%i on core %i [HT %i]",
es->virtual_process->vp_id, es->th_id,
bindto, bindto_ht);

parsec_barrier_wait(es->shared->barrier);
void* ret = (void*)(long)__parsec_work_wait(es);

return ret;
}
#endif

static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* startup )
{
parsec_execution_stream_t* es;
Expand All @@ -181,10 +204,11 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s
/* don't use PARSEC_THREAD_IS_MASTER, it is too early and we cannot yet allocate the es struct */
if( (0 != startup->virtual_process->vp_id) || (0 != startup->th_id) || parsec_runtime_bind_main_thread ) {
/* Bind to the specified CORE */
if( -1 == startup->bindto_ht ) startup->bindto_ht = 0;
parsec_bindthread(startup->bindto, startup->bindto_ht);
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Bind thread %i.%i on core %i [HT %i]",
startup->virtual_process->vp_id, startup->th_id,
startup->bindto, startup->bindto_ht);
startup->virtual_process->vp_id, startup->th_id,
startup->bindto, startup->bindto_ht);
} else {
PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Don't bind the main thread %i.%i",
startup->virtual_process->vp_id, startup->th_id);
Expand Down Expand Up @@ -233,6 +257,50 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s
offsetof(parsec_hashable_dependency_t, mempool_owner),
vp->nb_cores);
}

#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
int ht_activated = 0;
pthread_t worker;
es->shared = NULL;

if( 1 < parsec_hwloc_allow_ht(2) ) {
ht_activated = 1;
/* es->shared = (parsec_shared_information_t*)malloc(sizeof(parsec_shared_information_t)); */
posix_memalign((void**)&es->shared, 64, sizeof(parsec_shared_information_t));
es->shared->barrier = (parsec_barrier_t*)malloc(sizeof(parsec_barrier_t));
parsec_barrier_init(es->shared->barrier, NULL, 2);
es->shared->input = NULL;
es->shared->output = NULL;
es->shared->keepgoing = 1;
es->shared->submitted = 0;
es->shared->freelist = NULL;

int i;
for (i = 0; i < 5; ++i) { /* we cannot have more than 5 tasks in the pipeline */
parsec_loctask_t *t = (parsec_loctask_t*)calloc(1, sizeof(parsec_loctask_t));
t->next = es->shared->freelist;
es->shared->freelist = t;
}

pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM);

startup->es = es;
/* Summon your intern */
pthread_create( &worker,
&thread_attr,
(void* (*)(void*))__parsec_worker_init,
(void*)startup);

parsec_barrier_wait(es->shared->barrier);
}
else {
parsec_fatal("PaRSEC has been compiled with hyperthreading capabilities.\nThis machine does not support hyperthread, recompile without the PARSEC_HAVE_HYPERTHREAD_SCHEDULER.\n");
}

#endif

/* Synchronize with the other threads */
parsec_barrier_wait(startup->barrier);

Expand Down Expand Up @@ -275,6 +343,17 @@ static void* __parsec_thread_init( __parsec_temporary_thread_initialization_t* s

void *ret = (void*)(long)__parsec_context_wait(es);
PARSEC_PAPI_SDE_THREAD_FINI();

#if defined(PARSEC_HYPERTHREAD_SCHEDULER)
if( ht_activated ) {
/* Clean worker thread body */
es->shared->keepgoing = 0;
void *work_ret = NULL;
pthread_join(worker, &work_ret);

free(es->shared);
}
#endif
return ret;
}

Expand All @@ -296,11 +375,12 @@ static void parsec_vp_init( parsec_vp_t *vp,
startup[t].th_id = t;
startup[t].virtual_process = vp;
startup[t].bindto = -1;
startup[t].bindto_ht = -1;
startup[t].bindto_ht = 0;
startup[t].barrier = barrier;
pi = vpmap_get_nb_cores_affinity(vp->vp_id, t);
if( 1 == pi )
if( 1 == pi ) {
vpmap_get_core_affinity(vp->vp_id, t, &startup[t].bindto, &startup[t].bindto_ht);
}
else if( 1 < pi )
parsec_warning("multiple core to bind on... for now, do nothing"); //TODO: what does that mean?
}
Expand Down Expand Up @@ -662,14 +742,14 @@ parsec_context_t* parsec_init( int nb_cores, int* pargc, char** pargv[] )
0, NULL,
&queue_remove_begin, &queue_remove_end);
# endif /* PARSEC_PROF_TRACE_SCHEDULING_EVENTS */
#if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET)
# if defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET)
parsec_profiling_add_dictionary_keyword( "ARENA_MEMORY", "fill:#B9B243",
sizeof(size_t), "size{int64_t}",
&arena_memory_alloc_key, &arena_memory_free_key);
parsec_profiling_add_dictionary_keyword( "ARENA_ACTIVE_SET", "fill:#B9B243",
sizeof(size_t), "size{int64_t}",
&arena_memory_used_key, &arena_memory_unused_key);
#endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */
# endif /* defined(PARSEC_PROF_TRACE_ACTIVE_ARENA_SET) */
parsec_profiling_add_dictionary_keyword( "TASK_MEMORY", "fill:#B9B243",
sizeof(size_t), "size{int64_t}",
&task_memory_alloc_key, &task_memory_free_key);
Expand Down
1 change: 1 addition & 0 deletions parsec/parsec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "parsec/data.h"
#include "parsec/class/list_item.h"
#include "parsec/class/parsec_hash_table.h"
#include "parsec/class/fifo.h"
#include "parsec/parsec_description_structures.h"
#include "parsec/profiling.h"
#include "parsec/mempool.h"
Expand Down
Loading