Skip to content

Commit

Permalink
Merge pull request #2737 from DennisHeimbigner/cachesizes2.dmh
Browse files Browse the repository at this point in the history
Fix major bug in the NCZarr cache management
  • Loading branch information
WardF committed Aug 17, 2023
2 parents 032b910 + c5b5a8a commit ef94285
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 17 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This file contains a high-level description of this package's evolution. Release

## 4.9.3 - TBD

* Fix significant bug in the NCZarr cache management. See [Github #2737](https://github.com/Unidata/netcdf-c/pull/2737).
* Fix default parameters for caching of NCZarr. See [Github #2734](https://github.com/Unidata/netcdf-c/pull/2734).
* Introducing configure-time options to disable various filters, even if the required libraries are available on the system, in support of [GitHub #2712](https://github.com/Unidata/netcdf-c/pull/2712).
* Fix memory leak WRT unreclaimed HDF5 plist. See [Github #2752](https://github.com/Unidata/netcdf-c/pull/2752).
Expand Down
64 changes: 50 additions & 14 deletions libnczarr/zxcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

#define LEAFLEN 32

#define USEPARAMSIZE 0xffffffffffffffff

/* Forward */
static int get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry);
static int put_chunk(NCZChunkCache* cache, NCZCacheEntry*);
static int makeroom(NCZChunkCache* cache);
static int verifycache(NCZChunkCache* cache);
static int flushcache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache, size64_t needed);

/**************************************************/
/* Dispatch table per-var cache functions */
Expand Down Expand Up @@ -298,7 +300,7 @@ NCZ_read_cache_chunk(NCZChunkCache* cache, const size64_t* indices, void** datap
if((stat=get_chunk(cache,entry))) goto done;
assert(entry->data != NULL);
/* Ensure cache constraints not violated; but do it before entry is added */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;
nclistpush(cache->mru,entry);
if((stat = ncxcacheinsert(cache->xcache,entry->hashkey,entry))) goto done;
}
Expand Down Expand Up @@ -348,24 +350,26 @@ fprintf(stderr,"|cache.write|=%ld\n",nclistlength(cache->mru));
entry = NULL;

/* Ensure cache constraints not violated */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;

done:
if(entry) free_cache_entry(cache,entry);
return THROW(stat);
}
#endif

/* Constrain cache, but allow at least one entry */
/* Constrain cache */
static int
makeroom(NCZChunkCache* cache)
verifycache(NCZChunkCache* cache)
{
int stat = NC_NOERR;

#if 0
/* Sanity check; make sure at least one entry is always allowed */
if(nclistlength(cache->mru) == 1)
goto done;
stat = constraincache(cache);
#endif
if((stat = constraincache(cache,USEPARAMSIZE))) goto done;
done:
return stat;
}
Expand All @@ -376,10 +380,14 @@ static int
flushcache(NCZChunkCache* cache)
{
int stat = NC_NOERR;
#if 0
size_t oldsize = cache->params.size;
cache->params.size = 0;
stat = constraincache(cache);
stat = constraincache(cache,USEPARAMSIZE);
cache->params.size = oldsize;
#else
stat = constraincache(cache,USEPARAMSIZE);
#endif
return stat;
}

Expand All @@ -388,21 +396,32 @@ flushcache(NCZChunkCache* cache)
violating any of its constraints.
On entry, constraints might be violated.
Make sure that the entryinuse (NULL => no constraint) is not reclaimed.
@param cache
@param needed make sure there is room for this much space; USEPARAMSIZE => ensure no more than cache params is used.
*/

static int
constraincache(NCZChunkCache* cache)
constraincache(NCZChunkCache* cache, size64_t needed)
{
int stat = NC_NOERR;
size64_t final_size;

/* If the cache is empty then do nothing */
if(cache->used == 0) goto done;

if(needed == USEPARAMSIZE)
final_size = cache->params.size;
else if(cache->used > needed)
final_size = cache->used - needed;
else
final_size = 0;

/* Flush from LRU end if we are at capacity */
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > cache->params.size) {
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > final_size) {
int i;
void* ptr;
NCZCacheEntry* e = ncxcachelast(cache->xcache); /* last entry is the least recently used */
if(e == NULL) break;
if((stat = ncxcacheremove(cache->xcache,e->hashkey,&ptr))) goto done;
assert(e == ptr);
for(i=0;i<nclistlength(cache->mru);i++) {
Expand All @@ -427,6 +446,12 @@ fprintf(stderr,"|cache.makeroom|=%ld\n",nclistlength(cache->mru));
return stat;
}

/**
Push modified cache entries to disk.
Also make sure the cache size is correct.
@param cache
@return NC_EXXX error
*/
int
NCZ_flush_chunk_cache(NCZChunkCache* cache)
{
Expand All @@ -441,15 +466,21 @@ NCZ_flush_chunk_cache(NCZChunkCache* cache)
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
if(entry->modified) {
/* Make cache used be consistent across filter application */
cache->used -= entry->size;
/* Write out this chunk in toto*/
if((stat=put_chunk(cache,entry)))
goto done;
cache->used += entry->size;
}
entry->modified = 0;
}
/* Re-compute space used */
cache->used = 0;
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
cache->used += entry->size;
}
/* Make sure cache size and nelems are correct */
if((stat=verifycache(cache))) goto done;


done:
return ZUNTRACE(stat);
Expand Down Expand Up @@ -726,6 +757,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
default: goto done;
}

/* make room in the cache */
if((stat = constraincache(cache,size))) goto done;

if(!empty) {
/* Make sure we have a place to read it */
if((entry->data = (void*)calloc(1,entry->size)) == NULL)
Expand Down Expand Up @@ -795,6 +829,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
entry->isfixedstring = 0;
}

/* track new chunk */
cache->used += entry->size;

done:
nullfree(strchunk);
nullfree(path);
Expand Down Expand Up @@ -892,5 +929,4 @@ NCZ_printxcache(NCZChunkCache* cache)
strlcat(xs,ncbytescontents(buf),sizeof(xs));
ncbytesfree(buf);
fprintf(stderr,"%s\n",xs);
// return xs;
}
8 changes: 6 additions & 2 deletions ncdap_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ IF(ENABLE_TESTS)
SET_TESTS_PROPERTIES(ncdap_tst_remote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(HAVE_BASH)
add_sh_test(ncdap tst_zero_len_var)
add_sh_test(ncdap tst_encode)
# not yet add_sh_test(ncdap tst_hyrax)
add_sh_test(ncdap tst_fillmismatch)
IF(ENABLE_DAP_LONG_TESTS)
add_sh_test(ncdap tst_longremote3)
SET_TESTS_PROPERTIES(ncdap_tst_longremote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(ENABLE_DAP_LONG_TESTS)
IF(FALSE)
# Apparently iridl.ldeo.columbia.edu is down for now
add_sh_test(ncdap tst_encode)
# not yet fixed
add_sh_test(ncdap tst_hyrax)
ENDIF()
ENDIF(BUILD_UTILITIES)
IF(ENABLE_EXTERNAL_SERVER_TESTS)
add_bin_test(ncdap test_manyurls)
Expand Down
7 changes: 6 additions & 1 deletion ncdap_test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ findtestserver_SOURCES = findtestserver.c
pingurl_SOURCES = pingurl.c

if BUILD_UTILITIES
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh tst_encode.sh
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh
endif

if AX_IGNORE
# Apparently iridl.ldeo.columbia.edu is down for now
TESTS += tst_encode.sh
endif

TESTS += test_partvar
Expand Down
6 changes: 6 additions & 0 deletions nczarr_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ IF(ENABLE_TESTS)
TARGET_INCLUDE_DIRECTORIES(tst_chunkcases PUBLIC ../libnczarr)
add_sh_test(nczarr_test run_chunkcases)

if(LARGE_FILE_TESTS)
BUILD_BIN_TEST(test_readcaching})
BUILD_BIN_TEST(test_writecaching})
add_sh_test(nczarr_test run_cachetest)
ENDIF()

add_sh_test(nczarr_test run_purezarr)
add_sh_test(nczarr_test run_interop)
add_sh_test(nczarr_test run_misc)
Expand Down
5 changes: 5 additions & 0 deletions nczarr_test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ TESTS += run_nulls.sh
TESTS += run_notzarr.sh
TESTS += run_external.sh

if LARGE_FILE_TESTS
check_PROGRAMS += test_writecaching test_readcaching
TESTS += run_cachetest.sh
endif

endif #BUILD_UTILITIES

if BUILD_UTILITIES
Expand Down
40 changes: 40 additions & 0 deletions nczarr_test/run_cachetest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/sh

if test "x$srcdir" = x ; then srcdir=`pwd`; fi
. ../test_common.sh

. "$top_srcdir/nczarr_test/test_nczarr.sh"

set -e

s3isolate "testdir_cachtest"
THISDIR=`pwd`
cd $ISOPATH

# This shell script tests support for the NC_STRING type

testcase() {
zext=$1

echo "*** Test: cache operation"

# Get pure zarr args
fileargs tmp_scalar_zarr "mode=zarr,$zext"
zarrurl="$fileurl"
zarrfile="$file"

# setup
deletemap $zext $zarrfile

echo "*** write cache"
${execdir}/test_writecaching

echo "*** read cache"
${execdir}/test_readcaching
}

testcase file
if test "x$FEATURE_NCZARR_ZIP" = xyes ; then testcase zip; fi
if test "x$FEATURE_S3TESTS" = xyes ; then testcase s3; fi

if test "x$FEATURE_S3TESTS" = xyes ; then s3sdkdelete "/${S3ISOPATH}" ; fi # Cleanup
84 changes: 84 additions & 0 deletions nczarr_test/test_readcaching.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/resource.h>

#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#endif

#include "netcdf.h"

#define filename "file://tmp_cachetest.zarr#mode=zarr,file"
#define chunkSize (size_t)(1<<17) /* 128k */
#define numCells (size_t)(50 * chunkSize)
#define numSteps (size_t)360

static float var[numCells];

size_t getPeakRSS(void)
{
struct rusage rusage;
getrusage( RUSAGE_SELF, &rusage );
#if defined(__APPLE__) && defined(__MACH__)
return (size_t)rusage.ru_maxrss;
#else
return (size_t)(rusage.ru_maxrss * 1024L);
#endif
}

static void
nce(int istat)
{
if (istat != NC_NOERR)
{
fprintf(stderr, "%s\n", nc_strerror(istat));
exit(-1);
}
}

int
main(void)
{
printf("read: chunkSize=%zu, numCells=%zu, numSteps=%zu, filename=%s\n", chunkSize, numCells, numSteps, filename);

int ncId;
nce(nc_open(filename, NC_NOWRITE, &ncId));

int varId;
nce(nc_inq_varid(ncId, "var", &varId));

size_t size, nelems;
float preemption;
nce(nc_get_var_chunk_cache(ncId, varId, &size, &nelems, &preemption));
printf("default chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
size = 4 * numCells; // one float field at one time step
nelems = 1000;
preemption = 0.5;
nce(nc_set_var_chunk_cache(ncId, varId, size, nelems, preemption));
printf("set chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);

{
for (size_t i = 0; i < numCells; ++i) var[i] = 0.0f;
for (size_t i = 0; i < numSteps; ++i)
{
size_t start[2], count[2];
start[0] = i; start[1] = 0;
count[0] = 1; count[1] = numCells;
nce(nc_get_vara_float(ncId, varId, start, count, var));
}
}

nce(nc_close(ncId));

{
size_t mbused = getPeakRSS() / (1024 * 1024);
printf("Max mem: %zu MB\n", mbused);
if(mbused > 100) {
fprintf(stderr,"*** Failed: used: %luMB expected: < 100MB\n",mbused);
return (1);
}
}

return 0;
}
Loading

0 comments on commit ef94285

Please sign in to comment.