
Commit

fixed broken privatization from (accidental) thread-shared ptr
ShadenSmith committed Apr 20, 2017
1 parent 0b48ff8 commit f8b0fc3
Showing 1 changed file with 72 additions and 41 deletions.
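The essence of the fix: before this commit, every thread in the parallel region rebound the shared mats[MAX_NMODES]->vals pointer to its own privatization buffer, so the whole team raced on a single shared structure. The diff below instead gives each thread a shallow copy of the matrix-pointer array and rebinds only that copy. A minimal standalone sketch of the broken and fixed patterns, using hypothetical names (out_t, broken, fixed) rather than SPLATT's types:

/* Hypothetical, simplified illustration of the race and its fix. */
#include <omp.h>
#include <stdlib.h>

typedef struct { double * vals; } out_t;   /* stand-in for matrix_t */

/* BUG: 'out' is shared by the whole team, so every thread overwrites
 * the same ->vals pointer while other threads may still be using it. */
void broken(out_t * const out, double * const * const thread_bufs)
{
  #pragma omp parallel
  {
    out->vals = thread_bufs[omp_get_thread_num()];   /* data race */
    /* ... thread-local MTTKRP writes through out->vals ... */
  }
}

/* FIX: each thread shallow-copies the struct and rebinds its own copy;
 * the shared output is only touched later, during the reduction. */
void fixed(out_t * const out, double * const * const thread_bufs)
{
  #pragma omp parallel
  {
    out_t * const priv = malloc(sizeof(*priv));
    *priv = *out;                                    /* shallow copy */
    priv->vals = thread_bufs[omp_get_thread_num()];  /* thread-private */
    /* ... thread-local MTTKRP writes through priv->vals ... */
    free(priv);
  }
}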
113 changes: 72 additions & 41 deletions src/mttkrp.c
@@ -45,6 +45,48 @@ typedef void (* csf_mttkrp_func)(
*****************************************************************************/


/**
* @brief Perform a reduction on thread-local MTTKRP outputs.
*
* @param ws MTTKRP workspace containing thread-local outputs.
* @param global_output The global MTTKRP output we are reducing into.
* @param nrows The number of rows in the MTTKRP.
* @param ncols The number of columns in the MTTKRP.
*/
static void p_reduce_privatized(
    splatt_mttkrp_ws * const ws,
    val_t * const restrict global_output,
    idx_t const nrows,
    idx_t const ncols)
{
  /* Ensure everyone has completed their local MTTKRP. */
  #pragma omp barrier

  sp_timer_t reduction_timer;
  timer_fstart(&reduction_timer);

  int const tid = splatt_omp_get_thread_num();

  idx_t const num_threads = splatt_omp_get_num_threads();
  idx_t const elem_per_thread = (nrows * ncols) / num_threads;
  idx_t const start = tid * elem_per_thread;
  idx_t const stop = ((idx_t)tid == num_threads-1) ?
      (nrows * ncols) : (tid + 1) * elem_per_thread;

  /* reduction */
  for(idx_t t=0; t < num_threads; ++t){
    val_t const * const restrict thread_buf = ws->privatize_buffer[t];
    for(idx_t x=start; x < stop; ++x) {
      global_output[x] += thread_buf[x];
    }
  }

  timer_stop(&reduction_timer);
  #pragma omp master
  ws->reduction_time = reduction_timer.seconds;
}
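For reference, this reduction partitions the nrows * ncols output entries into contiguous per-thread ranges, and each thread sums its own range across all of the thread-local buffers, so no two threads ever write the same output element. A self-contained sketch of the same partitioning scheme on plain arrays (hypothetical reduce_privatized helper, not SPLATT's API):

/* Hypothetical standalone version of the element-range reduction.
 * thread_bufs must hold one buffer of nelems entries per OpenMP thread. */
#include <omp.h>
#include <stddef.h>

static void reduce_privatized(double * const global_out,
                              double * const * const thread_bufs,
                              size_t const nelems)
{
  #pragma omp parallel
  {
    size_t const nthreads = (size_t) omp_get_num_threads();
    size_t const tid      = (size_t) omp_get_thread_num();

    /* Contiguous slice owned by this thread; the last thread takes the rest. */
    size_t const per   = nelems / nthreads;
    size_t const start = tid * per;
    size_t const stop  = (tid == nthreads - 1) ? nelems : start + per;

    /* Accumulate this slice from every thread-local buffer. */
    for(size_t t = 0; t < nthreads; ++t) {
      for(size_t x = start; x < stop; ++x) {
        global_out[x] += thread_bufs[t][x];
      }
    }
  }
}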



/**
* @brief Map MTTKRP functions onto a (possibly tiled) CSF tensor. This function
@@ -73,8 +115,11 @@ static void p_schedule_tiles(
idx_t const nmodes = csf->nmodes;
idx_t const depth = nmodes - 1;

/* Store this in case we privatize */
val_t * restrict global_output = mats[MAX_NMODES]->vals;
idx_t const nrows = mats[mode]->I;
idx_t const ncols = mats[mode]->J;

/* Store old pointer */
val_t * const restrict global_output = mats[MAX_NMODES]->vals;

#pragma omp parallel
{
@@ -83,22 +128,31 @@
idx_t const * const tile_partition = ws->tile_partition[csf_id];
idx_t const * const tree_partition = ws->tree_partition[csf_id];

idx_t const nrows = mats[mode]->I;
idx_t const ncols = mats[mode]->J;
/*
* We may need to edit mats[MAX_NMODES]->vals, so create a private copy of
* the pointers to edit. (NOT actual factors).
*/
matrix_t * mats_priv[MAX_NMODES+1];
for(idx_t m=0; m < MAX_NMODES; ++m) {
mats_priv[m] = mats[m];
}
/* each thread gets separate structure, but do a shallow copy of ptr */
mats_priv[MAX_NMODES] = splatt_malloc(sizeof(**mats_priv));
*(mats_priv[MAX_NMODES]) = *(mats[MAX_NMODES]);

/* Give each thread its own private buffer and overwrite atomic
* function. */
if(ws->is_privatized[mode]) {
mats[MAX_NMODES]->vals = ws->privatize_buffer[tid];
/* change (thread-private!) output structure */
memset(ws->privatize_buffer[tid], 0,
nrows * ncols * sizeof(**(ws->privatize_buffer)));
mats_priv[MAX_NMODES]->vals = ws->privatize_buffer[tid];

/*
* Don't use atomics if we privatized.
*/
/* Don't use atomics if we privatized. */
atomic_func = nosync_func;
}


/*
* Distribute tiles to threads in some fashion.
*/
@@ -118,7 +172,7 @@
tile_id =
get_next_tileid(TILE_BEGIN, csf->tile_dims, nmodes, mode, t);
while(tile_id != TILE_END) {
nosync_func(csf, tile_id, mats, mode, thds, tree_partition);
nosync_func(csf, tile_id, mats_priv, mode, thds, tree_partition);
tile_id =
get_next_tileid(tile_id, csf->tile_dims, nmodes, mode, t);
}
@@ -128,7 +182,7 @@
} else {
for(idx_t tile_id = tile_partition[tid];
tile_id < tile_partition[tid+1]; ++tile_id) {
atomic_func(csf, tile_id, mats, mode, thds, tree_partition);
atomic_func(csf, tile_id, mats_priv, mode, thds, tree_partition);
}
}

@@ -137,44 +191,21 @@
*/
} else {
assert(tree_partition != NULL);
atomic_func(csf, 0, mats, mode, thds, tree_partition);
atomic_func(csf, 0, mats_priv, mode, thds, tree_partition);
}
timer_stop(&thds[tid].ttime);


/*
* If we used privatization, perform a reduction.
*/
/* If we used privatization, perform a reduction. */
if(ws->is_privatized[mode]) {
/* Ensure everyone has completed their local MTTKRP. */
#pragma omp barrier

sp_timer_t reduction_timer;
timer_fstart(&reduction_timer);

idx_t const num_threads = splatt_omp_get_num_threads();
idx_t const elem_per_thread = (nrows * ncols) / num_threads;
idx_t const start = tid * elem_per_thread;
idx_t const stop = ((idx_t)tid == num_threads-1) ?
(nrows * ncols) : (tid + 1) * elem_per_thread;

/* reduction */
for(idx_t t=0; t < num_threads; ++t){
val_t const * const restrict thread_buf = ws->privatize_buffer[t];
for(idx_t x=start; x < stop; ++x) {
global_output[x] += thread_buf[x];
}
}
p_reduce_privatized(ws, global_output, nrows, ncols);
}

timer_stop(&reduction_timer);
#pragma omp master
{
/* restore pointer */
mats[MAX_NMODES]->vals = global_output;
ws->reduction_time = reduction_timer.seconds;
}
} /* if privatized */
splatt_free(mats_priv[MAX_NMODES]);
} /* end omp parallel */

/* restore pointer */
mats[MAX_NMODES]->vals = global_output;
}

