Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix major bug in the NCZarr cache management #2737

Merged
merged 2 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This file contains a high-level description of this package's evolution. Release

## 4.9.3 - TBD

* Fix significant bug in the NCZarr cache management. See [Github #2737](https://github.com/Unidata/netcdf-c/pull/2737).
* Fix default parameters for caching of NCZarr. See [Github #2734](https://github.com/Unidata/netcdf-c/pull/2734).
* Introducing configure-time options to disable various filters, even if the required libraries are available on the system, in support of [GitHub #2712](https://github.com/Unidata/netcdf-c/pull/2712).
* Fix memory leak WRT unreclaimed HDF5 plist. See [Github #2752](https://github.com/Unidata/netcdf-c/pull/2752).
Expand Down
64 changes: 50 additions & 14 deletions libnczarr/zxcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

#define LEAFLEN 32

#define USEPARAMSIZE 0xffffffffffffffff

/* Forward */
static int get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry);
static int put_chunk(NCZChunkCache* cache, NCZCacheEntry*);
static int makeroom(NCZChunkCache* cache);
static int verifycache(NCZChunkCache* cache);
static int flushcache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache, size64_t needed);

/**************************************************/
/* Dispatch table per-var cache functions */
Expand Down Expand Up @@ -298,7 +300,7 @@ NCZ_read_cache_chunk(NCZChunkCache* cache, const size64_t* indices, void** datap
if((stat=get_chunk(cache,entry))) goto done;
assert(entry->data != NULL);
/* Ensure cache constraints not violated; but do it before entry is added */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;
nclistpush(cache->mru,entry);
if((stat = ncxcacheinsert(cache->xcache,entry->hashkey,entry))) goto done;
}
Expand Down Expand Up @@ -348,24 +350,26 @@ fprintf(stderr,"|cache.write|=%ld\n",nclistlength(cache->mru));
entry = NULL;

/* Ensure cache constraints not violated */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;

done:
if(entry) free_cache_entry(cache,entry);
return THROW(stat);
}
#endif

/* Constrain cache, but allow at least one entry */
/* Constrain cache */
static int
makeroom(NCZChunkCache* cache)
verifycache(NCZChunkCache* cache)
{
int stat = NC_NOERR;

#if 0
/* Sanity check; make sure at least one entry is always allowed */
if(nclistlength(cache->mru) == 1)
goto done;
stat = constraincache(cache);
#endif
if((stat = constraincache(cache,USEPARAMSIZE))) goto done;
done:
return stat;
}
Expand All @@ -376,10 +380,14 @@ static int
flushcache(NCZChunkCache* cache)
{
int stat = NC_NOERR;
#if 0
size_t oldsize = cache->params.size;
cache->params.size = 0;
stat = constraincache(cache);
stat = constraincache(cache,USEPARAMSIZE);
cache->params.size = oldsize;
#else
stat = constraincache(cache,USEPARAMSIZE);
#endif
return stat;
}

Expand All @@ -388,21 +396,32 @@ flushcache(NCZChunkCache* cache)
violating any of its constraints.
On entry, constraints might be violated.
Make sure that the entryinuse (NULL => no constraint) is not reclaimed.
@param cache
@param needed make sure there is room for this much space; USEPARAMSIZE => ensure no more than cache params is used.
*/

static int
constraincache(NCZChunkCache* cache)
constraincache(NCZChunkCache* cache, size64_t needed)
{
int stat = NC_NOERR;
size64_t final_size;

/* If the cache is empty then do nothing */
if(cache->used == 0) goto done;

if(needed == USEPARAMSIZE)
final_size = cache->params.size;
else if(cache->used > needed)
final_size = cache->used - needed;
else
final_size = 0;

/* Flush from LRU end if we are at capacity */
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > cache->params.size) {
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > final_size) {
int i;
void* ptr;
NCZCacheEntry* e = ncxcachelast(cache->xcache); /* last entry is the least recently used */
if(e == NULL) break;
if((stat = ncxcacheremove(cache->xcache,e->hashkey,&ptr))) goto done;
assert(e == ptr);
for(i=0;i<nclistlength(cache->mru);i++) {
Expand All @@ -427,6 +446,12 @@ fprintf(stderr,"|cache.makeroom|=%ld\n",nclistlength(cache->mru));
return stat;
}

/**
Push modified cache entries to disk.
Also make sure the cache size is correct.
@param cache
@return NC_EXXX error
*/
int
NCZ_flush_chunk_cache(NCZChunkCache* cache)
{
Expand All @@ -441,15 +466,21 @@ NCZ_flush_chunk_cache(NCZChunkCache* cache)
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
if(entry->modified) {
/* Make cache used be consistent across filter application */
cache->used -= entry->size;
/* Write out this chunk in toto*/
if((stat=put_chunk(cache,entry)))
goto done;
cache->used += entry->size;
}
entry->modified = 0;
}
/* Re-compute space used */
cache->used = 0;
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
cache->used += entry->size;
}
/* Make sure cache size and nelems are correct */
if((stat=verifycache(cache))) goto done;


done:
return ZUNTRACE(stat);
Expand Down Expand Up @@ -726,6 +757,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
default: goto done;
}

/* make room in the cache */
if((stat = constraincache(cache,size))) goto done;

if(!empty) {
/* Make sure we have a place to read it */
if((entry->data = (void*)calloc(1,entry->size)) == NULL)
Expand Down Expand Up @@ -795,6 +829,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
entry->isfixedstring = 0;
}

/* track new chunk */
cache->used += entry->size;

done:
nullfree(strchunk);
nullfree(path);
Expand Down Expand Up @@ -892,5 +929,4 @@ NCZ_printxcache(NCZChunkCache* cache)
strlcat(xs,ncbytescontents(buf),sizeof(xs));
ncbytesfree(buf);
fprintf(stderr,"%s\n",xs);
// return xs;
}
8 changes: 6 additions & 2 deletions ncdap_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ IF(ENABLE_TESTS)
SET_TESTS_PROPERTIES(ncdap_tst_remote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(HAVE_BASH)
add_sh_test(ncdap tst_zero_len_var)
add_sh_test(ncdap tst_encode)
# not yet add_sh_test(ncdap tst_hyrax)
add_sh_test(ncdap tst_fillmismatch)
IF(ENABLE_DAP_LONG_TESTS)
add_sh_test(ncdap tst_longremote3)
SET_TESTS_PROPERTIES(ncdap_tst_longremote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(ENABLE_DAP_LONG_TESTS)
IF(FALSE)
# Apparently iridl.ldeo.columbia.edu is down for now
add_sh_test(ncdap tst_encode)
# not yet fixed
add_sh_test(ncdap tst_hyrax)
ENDIF()
ENDIF(BUILD_UTILITIES)
IF(ENABLE_EXTERNAL_SERVER_TESTS)
add_bin_test(ncdap test_manyurls)
Expand Down
7 changes: 6 additions & 1 deletion ncdap_test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ findtestserver_SOURCES = findtestserver.c
pingurl_SOURCES = pingurl.c

if BUILD_UTILITIES
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh tst_encode.sh
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh
endif

if AX_IGNORE
# Apparently iridl.ldeo.columbia.edu is down for now
TESTS += tst_encode.sh
endif

TESTS += test_partvar
Expand Down
6 changes: 6 additions & 0 deletions nczarr_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ IF(ENABLE_TESTS)
TARGET_INCLUDE_DIRECTORIES(tst_chunkcases PUBLIC ../libnczarr)
add_sh_test(nczarr_test run_chunkcases)

if(LARGE_FILE_TESTS)
BUILD_BIN_TEST(test_readcaching})
BUILD_BIN_TEST(test_writecaching})
add_sh_test(nczarr_test run_cachetest)
ENDIF()

add_sh_test(nczarr_test run_purezarr)
add_sh_test(nczarr_test run_interop)
add_sh_test(nczarr_test run_misc)
Expand Down
5 changes: 5 additions & 0 deletions nczarr_test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ TESTS += run_nulls.sh
TESTS += run_notzarr.sh
TESTS += run_external.sh

if LARGE_FILE_TESTS
check_PROGRAMS += test_writecaching test_readcaching
TESTS += run_cachetest.sh
endif

endif #BUILD_UTILITIES

if BUILD_UTILITIES
Expand Down
40 changes: 40 additions & 0 deletions nczarr_test/run_cachetest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/sh

if test "x$srcdir" = x ; then srcdir=`pwd`; fi
. ../test_common.sh

. "$top_srcdir/nczarr_test/test_nczarr.sh"

set -e

s3isolate "testdir_cachtest"
THISDIR=`pwd`
cd $ISOPATH

# This shell script tests support for the NC_STRING type

testcase() {
zext=$1

echo "*** Test: cache operation"

# Get pure zarr args
fileargs tmp_scalar_zarr "mode=zarr,$zext"
zarrurl="$fileurl"
zarrfile="$file"

# setup
deletemap $zext $zarrfile

echo "*** write cache"
${execdir}/test_writecaching

echo "*** read cache"
${execdir}/test_readcaching
}

testcase file
if test "x$FEATURE_NCZARR_ZIP" = xyes ; then testcase zip; fi
if test "x$FEATURE_S3TESTS" = xyes ; then testcase s3; fi

if test "x$FEATURE_S3TESTS" = xyes ; then s3sdkdelete "/${S3ISOPATH}" ; fi # Cleanup
84 changes: 84 additions & 0 deletions nczarr_test/test_readcaching.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/resource.h>

#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#endif

#include "netcdf.h"

#define filename "file://tmp_cachetest.zarr#mode=zarr,file"
#define chunkSize (size_t)(1<<17) /* 128k */
#define numCells (size_t)(50 * chunkSize)
#define numSteps (size_t)360

static float var[numCells];

size_t getPeakRSS(void)
{
struct rusage rusage;
getrusage( RUSAGE_SELF, &rusage );
#if defined(__APPLE__) && defined(__MACH__)
return (size_t)rusage.ru_maxrss;
#else
return (size_t)(rusage.ru_maxrss * 1024L);
#endif
}

static void
nce(int istat)
{
if (istat != NC_NOERR)
{
fprintf(stderr, "%s\n", nc_strerror(istat));
exit(-1);
}
}

int
main(void)
{
printf("read: chunkSize=%zu, numCells=%zu, numSteps=%zu, filename=%s\n", chunkSize, numCells, numSteps, filename);

int ncId;
nce(nc_open(filename, NC_NOWRITE, &ncId));

int varId;
nce(nc_inq_varid(ncId, "var", &varId));

size_t size, nelems;
float preemption;
nce(nc_get_var_chunk_cache(ncId, varId, &size, &nelems, &preemption));
printf("default chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
size = 4 * numCells; // one float field at one time step
nelems = 1000;
preemption = 0.5;
nce(nc_set_var_chunk_cache(ncId, varId, size, nelems, preemption));
printf("set chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);

{
for (size_t i = 0; i < numCells; ++i) var[i] = 0.0f;
for (size_t i = 0; i < numSteps; ++i)
{
size_t start[2], count[2];
start[0] = i; start[1] = 0;
count[0] = 1; count[1] = numCells;
nce(nc_get_vara_float(ncId, varId, start, count, var));
}
}

nce(nc_close(ncId));

{
size_t mbused = getPeakRSS() / (1024 * 1024);
printf("Max mem: %zu MB\n", mbused);
if(mbused > 100) {
fprintf(stderr,"*** Failed: used: %luMB expected: < 100MB\n",mbused);
return (1);
}
}

return 0;
}
Loading
Loading