Skip to content

Commit

Permalink
leak in forder (#3955)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdowle committed Oct 13, 2019
1 parent 5fc6dde commit 4e3740b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 39 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

3. An error, again in `fwrite`'s compression, but only observed so far on Solaris 32bit has been fixed, [#3931](https://github.com/Rdatatable/data.table/issues/3931): `Error -2: one or more threads failed to allocate buffers or there was a compression error.` In case it happens again, this area has been made more robust and the error more detailed.

4. A leak could occur in the event of an unsupported column type error, or if working memory cannot all be allocated; [#3940](https://github.com/Rdatatable/data.table/issues/3940). Found thanks to `clang`'s Leak Sanitizer (prompted by CRAN's diligent use of latest tools), and two tests in the test suite which tested the unsupported type error.

## NOTES


Expand Down
87 changes: 48 additions & 39 deletions src/forder.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ static uint8_t **key = NULL;
static int *anso = NULL;
static bool notFirst=false;

#define Error(...) do {cleanup(); error(__VA_ARGS__);} while(0) // http://gcc.gnu.org/onlinedocs/cpp/Swallowing-the-Semicolon.html#Swallowing-the-Semicolon
static char msg[1001];
#define STOP(...) do {snprintf(msg, 1000, __VA_ARGS__); cleanup(); error(msg);} while(0) // http://gcc.gnu.org/onlinedocs/cpp/Swallowing-the-Semicolon.html#Swallowing-the-Semicolon
// use STOP in this file (not error()) to ensure cleanup() is called first
// snprintf to msg first in case nrow (just as an example) is provided in the message because cleanup() sets nrow to 0
#undef warning
#define warning(...) Do not use warning in this file // since it can be turned to error via warn=2
/* Using OS realloc() in this file to benefit from (often) in-place realloc() to save copy
* We have to trap on exit anyway to call savetl_end().
* NB: R_alloc() would be more convenient (fails within) and robust (auto free) but there is no R_realloc(). Implementing R_realloc() would be an alloc and copy, iiuc.
* Calloc/Realloc needs to be Free'd, even before error() [R-exts$6.1.2]. An oom within Calloc causes a previous Calloc to leak so Calloc would still needs to be trapped anyway.
* Therefore, using <<if (!malloc()) Error("helpful context msg")>> approach to cleanup() on error.
* Therefore, using <<if (!malloc()) STOP("helpful context msg")>> approach to cleanup() on error.
*/

static void free_ustr() {
Expand All @@ -88,7 +91,7 @@ static void cleanup() {
free(cradix_counts); cradix_counts=NULL;
free(cradix_xtmp); cradix_xtmp=NULL;
free_ustr();
if (key!=NULL) for (int i=0; i<nradix; i++) free(key[i]);
if (key!=NULL) { int i=0; while (key[i]!=NULL) free(key[i++]); } // ==nradix, other than rare cases e.g. tests 1844.5-6 (#3940), and if a calloc fails
free(key); key=NULL; nradix=0;
savetl_end(); // Restore R's own usage of tl. Must run after the for loop in free_ustr() since only CHARSXP which had tl>0 (R's usage) are stored there.
}
Expand All @@ -100,7 +103,7 @@ static void push(const int *x, const int n) {
if (gs_thread_alloc[me] < newn) {
gs_thread_alloc[me] = (newn < nrow/3) ? (1+(newn*2)/4096)*4096 : nrow; // [2|3] to not overflow and 3 not 2 to avoid allocating close to nrow (nrow groups occurs when all size 1 groups)
gs_thread[me] = realloc(gs_thread[me], gs_thread_alloc[me]*sizeof(int));
if (gs_thread[me]==NULL) Error("Failed to realloc thread private group size buffer to %d*4bytes", (int)gs_thread_alloc[me]);
if (gs_thread[me]==NULL) STOP("Failed to realloc thread private group size buffer to %d*4bytes", (int)gs_thread_alloc[me]);
}
memcpy(gs_thread[me]+gs_thread_n[me], x, n*sizeof(int));
gs_thread_n[me] += n;
Expand All @@ -114,7 +117,7 @@ static void flush() {
if (gs_alloc < newn) {
gs_alloc = (newn < nrow/3) ? (1+(newn*2)/4096)*4096 : nrow;
gs = realloc(gs, gs_alloc*sizeof(int));
if (gs==NULL) Error("Failed to realloc group size result to %d*4bytes", (int)gs_alloc);
if (gs==NULL) STOP("Failed to realloc group size result to %d*4bytes", (int)gs_alloc);
}
memcpy(gs+gs_n, gs_thread[me], n*sizeof(int));
gs_n += n;
Expand Down Expand Up @@ -257,7 +260,7 @@ static void cradix_r(SEXP *xsub, int n, int radix)
memset(thiscounts, 0, 256*sizeof(int));
return;
}
if (thiscounts[0] != 0) Error("Logical error. counts[0]=%d in cradix but should have been decremented to 0. radix=%d", thiscounts[0], radix);
if (thiscounts[0] != 0) STOP("Logical error. counts[0]=%d in cradix but should have been decremented to 0. radix=%d", thiscounts[0], radix);
itmp = 0;
for (int i=1; i<256; i++) {
if (thiscounts[i] == 0) continue;
Expand All @@ -272,9 +275,9 @@ static void cradix_r(SEXP *xsub, int n, int radix)
static void cradix(SEXP *x, int n)
{
cradix_counts = (int *)calloc(ustr_maxlen*256, sizeof(int)); // counts for the letters of left-aligned strings
if (!cradix_counts) Error("Failed to alloc cradix_counts");
if (!cradix_counts) STOP("Failed to alloc cradix_counts");
cradix_xtmp = (SEXP *)malloc(ustr_n*sizeof(SEXP));
if (!cradix_xtmp) Error("Failed to alloc cradix_tmp");
if (!cradix_xtmp) STOP("Failed to alloc cradix_tmp");
cradix_r(x, n, 0);
free(cradix_counts); cradix_counts=NULL;
free(cradix_xtmp); cradix_xtmp=NULL;
Expand All @@ -285,8 +288,8 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
{
int na_count=0;
bool anyneedutf8=false;
if (ustr_n!=0) Error("Internal error: ustr isn't empty when starting range_str: ustr_n=%d, ustr_alloc=%d", ustr_n, ustr_alloc); // # nocov
if (ustr_maxlen!=0) Error("Internal error: ustr_maxlen isn't 0 when starting range_str"); // # nocov
if (ustr_n!=0) STOP("Internal error: ustr isn't empty when starting range_str: ustr_n=%d, ustr_alloc=%d", ustr_n, ustr_alloc); // # nocov
if (ustr_maxlen!=0) STOP("Internal error: ustr_maxlen isn't 0 when starting range_str"); // # nocov
// savetl_init() has already been called at the start of forder
#pragma omp parallel for num_threads(getDTthreads())
for(int i=0; i<n; i++) {
Expand All @@ -306,7 +309,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
ustr_alloc = (ustr_alloc==0) ? 16384 : ustr_alloc*2; // small initial guess, negligible time to alloc 128KB (32 pages)
if (ustr_alloc>n) ustr_alloc = n; // clamp at n. Reaches n when fully unique (no dups)
ustr = realloc(ustr, ustr_alloc * sizeof(SEXP));
if (ustr==NULL) Error("Unable to realloc %d * %d bytes in range_str", ustr_alloc, sizeof(SEXP)); // # nocov
if (ustr==NULL) STOP("Unable to realloc %d * %d bytes in range_str", ustr_alloc, (int)sizeof(SEXP)); // # nocov
}
ustr[ustr_n++] = s;
SET_TRUELENGTH(s, -ustr_n); // unique in any order is fine. first-appearance order is achieved later in count_group
Expand All @@ -324,7 +327,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
SEXP ustr2 = PROTECT(allocVector(STRSXP, ustr_n));
for (int i=0; i<ustr_n; i++) SET_STRING_ELT(ustr2, i, ENC2UTF8(ustr[i]));
SEXP *ustr3 = (SEXP *)malloc(ustr_n * sizeof(SEXP));
if (!ustr3) Error("Failed to alloc ustr3 when converting strings to UTF8"); // # nocov
if (!ustr3) STOP("Failed to alloc ustr3 when converting strings to UTF8"); // # nocov
memcpy(ustr3, STRING_PTR(ustr2), ustr_n*sizeof(SEXP));
// need to reset ustr_maxlen because we need ustr_maxlen for utf8 strings
ustr_maxlen = 0;
Expand All @@ -342,7 +345,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
}
// now use the 1-1 mapping from ustr to ustr2 to get the ordering back into original ustr, being careful to reset tl to 0
int *tl = (int *)malloc(ustr_n * sizeof(int));
if (!tl) Error("Failed to alloc tl when converting strings to UTF8"); // # nocov
if (!tl) STOP("Failed to alloc tl when converting strings to UTF8"); // # nocov
SEXP *tt = STRING_PTR(ustr2);
for (int i=0; i<ustr_n; i++) tl[i] = TRUELENGTH(tt[i]); // fetches the o in ustr3 into tl which is ordered by ustr
for (int i=0; i<ustr_n; i++) SET_TRUELENGTH(ustr3[i], 0); // reset to 0 tl of the UTF8 (and possibly non-UTF in ustr too)
Expand Down Expand Up @@ -406,7 +409,7 @@ uint64_t dtwiddle(void *p, int i)
}
if (ISNAN(u.d)) return ISNA(u.d) ? 0 /*NA*/ : 1 /*NaN*/; // also normalises a difference between NA on 32bit R (bit 13 set) and 64bit R (bit 13 not set)
if (isinf(u.d)) return signbit(u.d) ? 2 /*-Inf*/ : (0xffffffffffffffff>>(dround*8)) /*+Inf*/;
Error("Unknown non-finite value; not NA, NaN, -Inf or +Inf"); // # nocov
STOP("Unknown non-finite value; not NA, NaN, -Inf or +Inf"); // # nocov
}

void radix_r(const int from, const int to, const int radix);
Expand All @@ -428,11 +431,11 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S

if (!isNewList(DT)) {
if (!isVectorAtomic(DT))
error("Internal error: input is not either a list of columns, or an atomic vector."); // # nocov; caught by colnamesInt at R level, test 1962.0472
STOP("Internal error: input is not either a list of columns, or an atomic vector."); // # nocov; caught by colnamesInt at R level, test 1962.0472
if (!isNull(by))
error("Internal error: input is an atomic vector (not a list of columns) but by= is not NULL"); // # nocov; caught at R level, test 1962.043
STOP("Internal error: input is an atomic vector (not a list of columns) but by= is not NULL"); // # nocov; caught at R level, test 1962.043
if (!isInteger(ascArg) || LENGTH(ascArg)!=1)
error("Input is an atomic vector (not a list of columns) but order= is not a length 1 integer");
STOP("Input is an atomic vector (not a list of columns) but order= is not a length 1 integer");
if (verbose)
Rprintf("forder.c received a vector type '%s' length %d\n", type2char(TYPEOF(DT)), length(DT));
SEXP tt = PROTECT(allocVector(VECSXP, 1)); n_protect++;
Expand All @@ -445,31 +448,31 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
Rprintf("forder.c received %d rows and %d columns\n", length(VECTOR_ELT(DT,0)), length(DT));
}
if (!length(DT))
error("Internal error: DT is an empty list() of 0 columns"); // # nocov should have been caught be colnamesInt, test 2099.1
STOP("Internal error: DT is an empty list() of 0 columns"); // # nocov should have been caught be colnamesInt, test 2099.1
if (!isInteger(by) || !LENGTH(by))
error("Internal error: DT has %d columns but 'by' is either not integer or is length 0", length(DT)); // # nocov colnamesInt catches, 2099.2
STOP("Internal error: DT has %d columns but 'by' is either not integer or is length 0", length(DT)); // # nocov colnamesInt catches, 2099.2
if (!isInteger(ascArg) || LENGTH(ascArg)!=LENGTH(by))
error("Either order= is not integer or its length (%d) is different to by='s length (%d)", LENGTH(ascArg), LENGTH(by));
STOP("Either order= is not integer or its length (%d) is different to by='s length (%d)", LENGTH(ascArg), LENGTH(by));
nrow = length(VECTOR_ELT(DT,0));
int n_cplx = 0;
for (int i=0; i<LENGTH(by); i++) {
int by_i = INTEGER(by)[i];
if (by_i < 1 || by_i > length(DT))
error("internal error: 'by' value %d out of range [1,%d]", by_i, length(DT)); // # nocov # R forderv already catch that using C colnamesInt
STOP("internal error: 'by' value %d out of range [1,%d]", by_i, length(DT)); // # nocov # R forderv already catch that using C colnamesInt
if ( nrow != length(VECTOR_ELT(DT, by_i-1)) )
error("Column %d is length %d which differs from length of column 1 (%d)\n", INTEGER(by)[i], length(VECTOR_ELT(DT, INTEGER(by)[i]-1)), nrow);
STOP("Column %d is length %d which differs from length of column 1 (%d)\n", INTEGER(by)[i], length(VECTOR_ELT(DT, INTEGER(by)[i]-1)), nrow);
if (TYPEOF(VECTOR_ELT(DT, by_i-1)) == CPLXSXP) n_cplx++;
}
if (!isLogical(retGrpArg) || LENGTH(retGrpArg)!=1 || INTEGER(retGrpArg)[0]==NA_LOGICAL)
error("retGrp= must be TRUE or FALSE");
STOP("retGrp= must be TRUE or FALSE");
retgrp = LOGICAL(retGrpArg)[0]==TRUE;
if (!isLogical(sortGroupsArg) || LENGTH(sortGroupsArg)!=1 || INTEGER(sortGroupsArg)[0]==NA_LOGICAL )
error("sort= must be TRUE or FALSE");
STOP("sort= must be TRUE or FALSE");
sortType = LOGICAL(sortGroupsArg)[0]==TRUE; // if sortType is 1, it is later flipped between +1/-1 according to ascArg. Otherwise ascArg is ignored when sortType==0
if (!retgrp && !sortType)
error("At least one of retGrp= or sort= must be TRUE");
STOP("At least one of retGrp= or sort= must be TRUE");
if (!isLogical(naArg) || LENGTH(naArg) != 1)
error("na.last must be logical TRUE, FALSE or NA of length 1");
STOP("na.last must be logical TRUE, FALSE or NA of length 1");
nalast = (LOGICAL(naArg)[0] == NA_LOGICAL) ? -1 : LOGICAL(naArg)[0]; // 1=na last, 0=na first (default), -1=remove na

if (nrow==0) {
Expand All @@ -494,8 +497,10 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
savetl_init(); // from now on use Error not error

int ncol=length(by);
key = calloc((ncol+n_cplx)*8+1, sizeof(uint8_t *)); // needs to be before loop because part II relies on part I, column-by-column. +1 because we check NULL after last one
// TODO: if key==NULL Error
int keyAlloc = (ncol+n_cplx)*8 + 1; // +1 for NULL to mark end; calloc to initialize with NULLs
key = calloc(keyAlloc, sizeof(uint8_t *)); // needs to be before loop because part II relies on part I, column-by-column.
if (!key)
STOP("Unable to allocate %llu bytes of working memory", (unsigned long long)(keyAlloc*sizeof(uint8_t *))); // # nocov
nradix=0; // the current byte we're writing this column to; might be squashing into it (spare>0)
int spare=0; // the amount of bits remaining on the right of the current nradix byte
bool isReal=false;
Expand All @@ -511,7 +516,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
if (sortType) {
sortType=INTEGER(ascArg)[col]; // if sortType!=0 (not first-appearance) then +1/-1 comes from ascArg.
if (sortType!=1 && sortType!=-1)
Error("Item %d of order (ascending/descending) is %d. Must be +1 or -1.", col+1, sortType);
STOP("Item %d of order (ascending/descending) is %d. Must be +1 or -1.", col+1, sortType);
}
//Rprintf("sortType = %d\n", sortType);
switch(TYPEOF(x)) {
Expand Down Expand Up @@ -553,7 +558,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
range_str(STRING_PTR(x), nrow, &min, &max, &na_count);
break;
default:
Error("Column %d passed to [f]order is type '%s', not yet supported.", col+1, type2char(TYPEOF(x)));
STOP("Column %d passed to [f]order is type '%s', not yet supported.", col+1, type2char(TYPEOF(x)));
}
TEND(3);
if (na_count==nrow || (min>0 && min==max && na_count==0 && infnan_count==0)) {
Expand Down Expand Up @@ -596,8 +601,12 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
}

for (int b=0; b<nbyte; b++) {
if (key[nradix+b]==NULL)
key[nradix+b] = calloc(nrow, sizeof(uint8_t)); // 0 initialize so that NA's can just skip (NA is always the 0 offset)
if (key[nradix+b]==NULL) {
uint8_t *tt = calloc(nrow, sizeof(uint8_t)); // 0 initialize so that NA's can just skip (NA is always the 0 offset)
if (!tt)
STOP("Unable to allocate %llu bytes of working memory", (unsigned long long)(nrow * sizeof(uint8_t))); // # nocov
key[nradix+b] = tt;
}
}

const bool asc = (sortType>=0);
Expand Down Expand Up @@ -702,7 +711,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
free_ustr(); // ustr could be left allocated and reused, but free now in case large and we're tight on ram
break;
default:
Error("Internal error: column not supported not caught earlier"); // # nocov
STOP("Internal error: column not supported not caught earlier"); // # nocov
}
nradix += nbyte-1+(spare==0);
TEND(4)
Expand All @@ -716,12 +725,12 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
int nth = getDTthreads();
TMP = (int *)malloc(nth*UINT16_MAX*sizeof(int)); // used by counting sort (my_n<=65536) in radix_r()
UGRP = (uint8_t *)malloc(nth*256); // TODO: align TMP and UGRP to cache lines (and do the same for stack allocations too)
if (!TMP || !UGRP /*|| TMP%64 || UGRP%64*/) Error("Failed to allocate TMP or UGRP or they weren't cache line aligned: nth=%d", nth);
if (!TMP || !UGRP /*|| TMP%64 || UGRP%64*/) STOP("Failed to allocate TMP or UGRP or they weren't cache line aligned: nth=%d", nth);
if (retgrp) {
gs_thread = calloc(nth, sizeof(int *)); // thread private group size buffers
gs_thread_alloc = calloc(nth, sizeof(int));
gs_thread_n = calloc(nth, sizeof(int));
if (!gs_thread || !gs_thread_alloc || !gs_thread_n) Error("Could not allocate (very tiny) group size thread buffers");
if (!gs_thread || !gs_thread_alloc || !gs_thread_n) STOP("Could not allocate (very tiny) group size thread buffers");
}
if (nradix) {
radix_r(0, nrow-1, 0); // top level recursive call: (from, to, radix)
Expand Down Expand Up @@ -1041,7 +1050,7 @@ void radix_r(const int from, const int to, const int radix) {
uint16_t *counts = calloc(nBatch*256,sizeof(uint16_t));
uint8_t *ugrps = malloc(nBatch*256*sizeof(uint8_t));
int *ngrps = calloc(nBatch ,sizeof(int));
if (!counts || !ugrps || !ngrps) Error("Failed to allocate parallel counts. my_n=%d, nBatch=%d", my_n, nBatch);
if (!counts || !ugrps || !ngrps) STOP("Failed to allocate parallel counts. my_n=%d, nBatch=%d", my_n, nBatch);

bool skip=true;
const int n_rem = nradix-radix-1; // how many radix are remaining after this one
Expand Down Expand Up @@ -1150,7 +1159,7 @@ void radix_r(const int from, const int to, const int radix) {
TEND(18 + notFirst*3)
if (!skip) {
int *TMP = malloc(my_n * sizeof(int));
if (!TMP) Error("Unable to allocate TMP for my_n=%d items in parallel batch counting", my_n);
if (!TMP) STOP("Unable to allocate TMP for my_n=%d items in parallel batch counting", my_n);
#pragma omp parallel for num_threads(getDTthreads())
for (int batch=0; batch<nBatch; batch++) {
const int *restrict my_starts = starts + batch*256;
Expand Down Expand Up @@ -1257,7 +1266,7 @@ SEXP fsorted(SEXP x)
// These are all sequential access to x, so very quick and cache efficient. Could be parallel by checking continuity at batch boundaries.
const int n = length(x);
if (n <= 1) return(ScalarLogical(TRUE));
if (!isVectorAtomic(x)) Error("is.sorted (R level) and fsorted (C level) only to be used on vectors. If needed on a list/data.table, you'll need the order anyway if not sorted, so use if (length(o<-forder(...))) for efficiency in one step, or equivalent at C level");
if (!isVectorAtomic(x)) STOP("is.sorted (R level) and fsorted (C level) only to be used on vectors. If needed on a list/data.table, you'll need the order anyway if not sorted, so use if (length(o<-forder(...))) for efficiency in one step, or equivalent at C level");
int i=1;
switch(TYPEOF(x)) {
case INTSXP : case LGLSXP : {
Expand Down Expand Up @@ -1289,7 +1298,7 @@ SEXP fsorted(SEXP x)
}
} break;
default :
Error("type '%s' is not yet supported", type2char(TYPEOF(x)));
STOP("type '%s' is not yet supported", type2char(TYPEOF(x)));
}
return ScalarLogical(i==n);
}
Expand Down

0 comments on commit 4e3740b

Please sign in to comment.