Skip to content

Commit

Permalink
rowMedians.c: pthread processing
Browse files Browse the repository at this point in the history
Signed-off-by: Dongcan Jiang <dongcan.jiang@gmail.com>
  • Loading branch information
Dongcan-Jiang committed Aug 13, 2015
1 parent bf0c93a commit 1fc1478
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 16 deletions.
4 changes: 2 additions & 2 deletions R/rowMedians.S4.R
Expand Up @@ -63,7 +63,7 @@ setMethod("rowMedians", signature(x="matrix"), function(x, rows=NULL, cols=NULL,
na.rm <- as.logical(na.rm);
hasNAs <- TRUE; # Add as an argument? /2007-08-24

.Call("rowMedians", x, dim., rows, cols, na.rm, hasNAs, TRUE, PACKAGE="matrixStats");
.Call("rowMedians", x, dim., rows, cols, na.rm, hasNAs, TRUE, mc.cores, PACKAGE="matrixStats");
})


Expand All @@ -76,7 +76,7 @@ setMethod("colMedians", signature(x="matrix"), function(x, rows=NULL, cols=NULL,
na.rm <- as.logical(na.rm);
hasNAs <- TRUE; # Add as an argument? /2007-08-24

.Call("rowMedians", x, dim., rows, cols, na.rm, hasNAs, FALSE, PACKAGE="matrixStats");
.Call("rowMedians", x, dim., rows, cols, na.rm, hasNAs, FALSE, mc.cores, PACKAGE="matrixStats");
})


Expand Down
15 changes: 10 additions & 5 deletions src/rowMedians.c
Expand Up @@ -12,16 +12,16 @@

#define METHOD rowMedians
#define RETURN_TYPE void
#define ARGUMENTS_LIST X_C_TYPE *x, R_xlen_t nrow, R_xlen_t ncol, void *rows, R_xlen_t nrows, void *cols, R_xlen_t ncols, int narm, int hasna, int byrow, double *ans
#define ARGUMENTS_LIST X_C_TYPE *x, R_xlen_t nrow, R_xlen_t ncol, void *rows, R_xlen_t nrows, void *cols, R_xlen_t ncols, int narm, int hasna, int byrow, double *ans, int cores

#define X_TYPE 'i'
#include "templates-gen-matrix.h"
#define X_TYPE 'r'
#include "templates-gen-matrix.h"


SEXP rowMedians(SEXP x, SEXP dim, SEXP rows, SEXP cols, SEXP naRm, SEXP hasNA, SEXP byRow) {
int narm, hasna, byrow;
SEXP rowMedians(SEXP x, SEXP dim, SEXP rows, SEXP cols, SEXP naRm, SEXP hasNA, SEXP byRow, SEXP cores) {
int narm, hasna, byrow, cores2;
SEXP ans;
R_xlen_t nrow, ncol;

Expand All @@ -46,6 +46,9 @@ SEXP rowMedians(SEXP x, SEXP dim, SEXP rows, SEXP cols, SEXP naRm, SEXP hasNA, S
/* Argument 'byRow': */
byrow = asLogical(byRow);

/* Argument 'cores': */
cores2 = asInteger(cores);

if (!byrow) {
SWAP(R_xlen_t, nrow, ncol);
SWAP(void*, crows, ccols);
Expand All @@ -59,9 +62,9 @@ SEXP rowMedians(SEXP x, SEXP dim, SEXP rows, SEXP cols, SEXP naRm, SEXP hasNA, S

/* Double matrices are more common to use. */
if (isReal(x)) {
rowMedians_Real[rowsType][colsType](REAL(x), nrow, ncol, crows, nrows, ccols, ncols, narm, hasna, byrow, REAL(ans));
rowMedians_Real[rowsType][colsType](REAL(x), nrow, ncol, crows, nrows, ccols, ncols, narm, hasna, byrow, REAL(ans), cores2);
} else if (isInteger(x)) {
rowMedians_Integer[rowsType][colsType](INTEGER(x), nrow, ncol, crows, nrows, ccols, ncols, narm, hasna, byrow, REAL(ans));
rowMedians_Integer[rowsType][colsType](INTEGER(x), nrow, ncol, crows, nrows, ccols, ncols, narm, hasna, byrow, REAL(ans), cores2);
}

UNPROTECT(1);
Expand All @@ -72,6 +75,8 @@ SEXP rowMedians(SEXP x, SEXP dim, SEXP rows, SEXP cols, SEXP naRm, SEXP hasNA, S

/***************************************************************************
HISTORY:
2015-08-13 [DJ]
o Pthread processing.
2015-06-07 [DJ]
o Supported subsetted computation.
2013-01-13 [HB]
Expand Down
114 changes: 105 additions & 9 deletions src/rowMedians_TYPE-template.h
Expand Up @@ -3,7 +3,7 @@
void rowMedians_<Integer|Real>[rowsType][colsType](ARGUMENTS_LIST)
ARGUMENTS_LIST:
X_C_TYPE *x, R_xlen_t nrow, R_xlen_t ncol, void *rows, R_xlen_t nrows, void *cols, R_xlen_t ncols, int narm, int hasna, int byrow, double *ans
X_C_TYPE *x, R_xlen_t nrow, R_xlen_t ncol, void *rows, R_xlen_t nrows, void *cols, R_xlen_t ncols, int narm, int hasna, int byrow, double *ans, int cores
Arguments:
The following macros ("arguments") should be defined for the
Expand All @@ -20,6 +20,8 @@
***********************************************************************/
#include <R_ext/Memory.h>
#include <Rmath.h>
#include <pthread.h>
#include <stdint.h>
#include "types.h"

/* Expand arguments:
Expand All @@ -28,7 +30,97 @@
#include "templates-types.h"


static void *WRAPPER_METHOD_NAME_ROWS_COLS(void *args) {
uint8_t *buffer = (uint8_t*) args;
int ii = 0;
POP_ARGUMENT(buffer, ii, uint8_t*, buffer0);
POP_ARGUMENT(buffer, ii, R_xlen_t, begin);
POP_ARGUMENT(buffer, ii, R_xlen_t, end);

ii = 0;
POP_ARGUMENT(buffer0, ii, X_C_TYPE *, x);
POP_ARGUMENT(buffer0, ii, R_xlen_t, nrow);
POP_ARGUMENT(buffer0, ii, R_xlen_t, ncol);
POP_ARGUMENT(buffer0, ii, void*, rows);
POP_ARGUMENT(buffer0, ii, R_xlen_t, nrows);
POP_ARGUMENT(buffer0, ii, void*, cols);
POP_ARGUMENT(buffer0, ii, R_xlen_t, ncols);
POP_ARGUMENT(buffer0, ii, int, narm);
POP_ARGUMENT(buffer0, ii, int, hasna);
POP_ARGUMENT(buffer0, ii, int, byrow);
POP_ARGUMENT(buffer0, ii, double*, ans);

extern RETURN_TYPE (*METHOD_NAME[3][3])(ARGUMENTS_LIST);
int rowsType = ROWS_TYPE_CODE;

ans += begin;
nrows = end - begin;
#ifdef ROWS_TYPE
rows = (ROWS_C_TYPE*) rows + begin;
#else
rows = indicesFromRange(begin, end, &rowsType);
#endif

METHOD_NAME[rowsType][COLS_TYPE_CODE](x, nrow, ncol, rows, nrows, cols, ncols, narm, hasna, byrow, ans, 1);

#ifndef ROWS_TYPE
Free(rows);
#endif
return NULL;
}

RETURN_TYPE METHOD_NAME_ROWS_COLS(ARGUMENTS_LIST) {
// Apply pthread
if (cores > 1 && nrows > 1) {
const static int memSize0 = sizeof(x) + sizeof(nrow) + sizeof(ncol)
+ sizeof(rows) + sizeof(nrows) + sizeof(cols) + sizeof(ncols)
+ sizeof(narm) + sizeof(hasna) + sizeof(byrow) + sizeof(ans);
uint8_t *buffer0;
R_xlen_t begin, end;
const static int memSize1 = sizeof(buffer0) + sizeof(begin) + sizeof(end);

cores = MIN(cores, nrows);
uint8_t buffer[memSize0 + memSize1 * cores];

int ii = 0;
PUSH_ARGUMENT(buffer, ii, x);
PUSH_ARGUMENT(buffer, ii, nrow);
PUSH_ARGUMENT(buffer, ii, ncol);
PUSH_ARGUMENT(buffer, ii, rows);
PUSH_ARGUMENT(buffer, ii, nrows);
PUSH_ARGUMENT(buffer, ii, cols);
PUSH_ARGUMENT(buffer, ii, ncols);
PUSH_ARGUMENT(buffer, ii, narm);
PUSH_ARGUMENT(buffer, ii, hasna);
PUSH_ARGUMENT(buffer, ii, byrow);
PUSH_ARGUMENT(buffer, ii, ans);

pthread_t threads[cores];

R_xlen_t gap = (nrows + cores - 1) / cores;
int jj = 0;
begin = 0;
while (begin < nrows) {
uint8_t *args = buffer + ii;
buffer0 = buffer;
end = MIN(begin + gap, nrows);

PUSH_ARGUMENT(buffer, ii, buffer0);
PUSH_ARGUMENT(buffer, ii, begin);
PUSH_ARGUMENT(buffer, ii, end);

pthread_create(threads+(jj++), NULL, &WRAPPER_METHOD_NAME_ROWS_COLS, args);

begin = end;
}

for (jj = 0; jj < cores; ++jj) {
pthread_join(threads[jj], NULL);
}
return;
}


int isOdd;
R_xlen_t ii, jj, kk, qq, idx;
R_xlen_t *colOffset;
Expand All @@ -41,9 +133,8 @@ RETURN_TYPE METHOD_NAME_ROWS_COLS(ARGUMENTS_LIST) {
COLS_C_TYPE *ccols = (COLS_C_TYPE*) cols;
#endif

/* R allocate memory for the 'values'. This will be
taken care of by the R garbage collector later on. */
values = (X_C_TYPE *) R_alloc(ncols, sizeof(X_C_TYPE));
/* C allocate memory for the 'values'. Should be freed manually. */
values = Calloc(ncols, X_C_TYPE);

/* If there are no missing values, don't try to remove them. */
if (hasna == FALSE)
Expand All @@ -61,7 +152,7 @@ RETURN_TYPE METHOD_NAME_ROWS_COLS(ARGUMENTS_LIST) {
value = 0;

/* Pre-calculate the column offsets */
colOffset = (R_xlen_t *) R_alloc(ncols, sizeof(R_xlen_t));
colOffset = Calloc(ncols, R_xlen_t);

// HJ begin
if (byrow) {
Expand Down Expand Up @@ -120,8 +211,8 @@ RETURN_TYPE METHOD_NAME_ROWS_COLS(ARGUMENTS_LIST) {
ans[ii] = ((double)values[qq] + (double)value)/2;
}
}

R_CHECK_USER_INTERRUPT(ii);
// TODO: interrupt under pthread
// R_CHECK_USER_INTERRUPT(ii);
} /* for (ii ...) */
} else {
for (ii=0; ii < nrows; ii++) {
Expand All @@ -143,15 +234,20 @@ RETURN_TYPE METHOD_NAME_ROWS_COLS(ARGUMENTS_LIST) {
X_PSORT(values, qq+1, qq);
ans[ii] = ((double)values[qq] + (double)value)/2;
}

R_CHECK_USER_INTERRUPT(ii);
// TODO: interrupt under pthread
// R_CHECK_USER_INTERRUPT(ii);
} /* for (ii ...) */
} /* if (hasna ...) */

Free(values);
Free(colOffset);
}


/***************************************************************************
HISTORY:
2015-08-13 [DJ]
o Pthread processing.
2015-06-07 [DJ]
o Supported subsetted computation.
2014-11-06 [HB]
Expand Down

0 comments on commit 1fc1478

Please sign in to comment.