Skip to content

Commit

Permalink
Unlimited variables allowed in single dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
cpiker committed Mar 8, 2024
1 parent b03fcdc commit 458eb81
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 34 deletions.
72 changes: 64 additions & 8 deletions das2/dataset.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#else
#define strcasecmp _stricmp
#endif
#include <assert.h>

#include "util.h"
#include "dataset.h"
Expand Down Expand Up @@ -357,23 +358,60 @@ DasDim* DasDs_makeDim(
}

/* ************************************************************************* */
/* Codec handling */

/* Move the codec and item-count arrays out of the small built-in buffers
 * (aCodecs, aItems) into heap buffers with twice that capacity.
 *
 * Must only be triggered at the transition to large: the first
 * DASDS_LOC_ENC_SZ entries of the built-in buffers are copied wholesale, so
 * calling this again later would copy in stale contents and orphan the
 * heap buffers that lCodecs/lItems already point to.
 *
 * There is no return value to report failure through and the caller indexes
 * the new arrays unconditionally, so running out of memory is treated as
 * fatal (abort) instead of continuing with NULL pointers.  The dataset is
 * only modified after both allocations have succeeded.
 */
void _DasDs_codecsGoLarge(DasDs* pThis)
{
	size_t uNewSz = DASDS_LOC_ENC_SZ * 2;

	/* calloc, so the slots past the copied entries start out zeroed */
	DasCodec* pCodecs = (DasCodec*) calloc(uNewSz, sizeof(DasCodec));
	int* pItems = (int*) calloc(uNewSz, sizeof(int));

	if((pCodecs == NULL)||(pItems == NULL)){
		free(pCodecs);   /* free(NULL) is a no-op */
		free(pItems);
		abort();
	}

	/* Copy over the current codecs and their per-packet item counts */
	memcpy(pCodecs, pThis->aCodecs, DASDS_LOC_ENC_SZ*sizeof(DasCodec));
	memcpy(pItems, pThis->aItems, DASDS_LOC_ENC_SZ*sizeof(int));

	pThis->lCodecs = pCodecs;
	pThis->lItems = pItems;
	pThis->uSzCodecs = uNewSz;
}

/* Double the capacity of the heap-allocated codec and item-count arrays.
 *
 * Precondition: lCodecs and lItems already point at heap memory (the
 * dataset has gone large); handing the built-in aCodecs/aItems buffers to
 * realloc would be undefined behavior.
 *
 * Existing entries are preserved and the newly added upper half of each
 * array is zeroed.  As there is no return value to report failure through,
 * running out of memory is treated as fatal (abort).
 */
void _DasDs_codecsGoLarger(DasDs* pThis)
{
	size_t uOldSz = pThis->uSzCodecs;
	size_t uNewSz = uOldSz * 2;

	/* Grow each array through a temporary so that the member pointer is
	   only replaced by a valid block, and each array is grown from its
	   own buffer */
	DasCodec* pCodecs = (DasCodec*) realloc(pThis->lCodecs, uNewSz * sizeof(DasCodec));
	if(pCodecs == NULL){
		abort();
	}
	pThis->lCodecs = pCodecs;

	int* pItems = (int*) realloc(pThis->lItems, uNewSz * sizeof(int));
	if(pItems == NULL){
		abort();
	}
	pThis->lItems = pItems;

	/* Null out the new memory, realloc doesn't do this.  Pointer arithmetic
	   is already in units of elements, so the offset is an element count
	   while the length handed to memset is in bytes. */
	memset(pThis->lCodecs + uOldSz, 0, (uNewSz - uOldSz) * sizeof(DasCodec));
	memset(pThis->lItems + uOldSz, 0, (uNewSz - uOldSz) * sizeof(int));

	pThis->uSzCodecs = uNewSz;
}

DasErrCode DasDs_addFixedCodec(
DasDs* pThis, const char* sAryId, const char* sSemantic,
const char* sEncType, int nItemBytes, int nNumItems
){
if(pThis->uSzEncs == DASDS_LOC_ENC_SZ)
return das_error(DASERR_NOTIMP,
"Adding more then %d array codecs per dataset is not yet implemented",
DASDS_LOC_ENC_SZ
);

/* Go dynamic? */
if(pThis->uSzCodecs == DASDS_LOC_ENC_SZ)
_DasDs_codecsGoLarge(pThis);

/* Go even bigger? */
if(pThis->uCodecs == pThis->uSzCodecs)
_DasDs_codecsGoLarger(pThis);

/* Find the array with this ID */
DasAry* pAry = DasDs_getAryById(pThis, sAryId);
if(pAry == NULL)
return das_error(DASERR_DS, "An array with id '%s' was not found", sAryId);

DasCodec* pCodec = (DasCodec*) &(pThis->aPktEncs[pThis->uSzEncs]);
DasCodec* pCodec = &(pThis->lCodecs[pThis->uCodecs]);

DasErrCode nRet = DasCodec_init(
pCodec, pAry, sSemantic, sEncType, nItemBytes, 0, pAry->units
Expand All @@ -384,12 +422,14 @@ DasErrCode DasDs_addFixedCodec(
return nRet;
}

pThis->nPktItems[pThis->uSzEncs] = nNumItems;
pThis->uSzEncs += 1;
pThis->lItems[pThis->uCodecs] = nNumItems;
pThis->uCodecs += 1;

return DAS_OKAY;
}

/* ************************************************************************* */

char* DasDs_toStr(const DasDs* pThis, char* sBuf, int nLen)
{
char sDimBuf[1024] = {'\0'};
Expand Down Expand Up @@ -504,6 +544,16 @@ void del_DasDs(DasDs* pThis){
del_DasDim(pThis->lDims[u]);
free(pThis->lDims);
}

/* If I had to go large on codecs, free those */
if(pThis->uCodecs >= DASDS_LOC_ENC_SZ){
assert(pThis->lCodecs != pThis->aCodecs);
assert(pThis->lItems != pThis->aItems);
assert(pThis->lCodecs != NULL);
assert(pThis->lItems != NULL);
free(pThis->lCodecs);
free(pThis->lItems);
}

/* Now drop the reference count on our arrays */
if(pThis->lArrays != NULL){
Expand Down Expand Up @@ -560,6 +610,12 @@ DasDs* new_DasDs(
/* All datasets start out as dynamic (or else how would you build one?) */
pThis->_dynamic = true;

pThis->uSzCodecs = DASDS_LOC_ENC_SZ; /* build in, small vec array */

/* Point at my internal storage to start */
pThis->lCodecs = pThis->aCodecs;
pThis->lItems = pThis->aItems;

return pThis;
}

Expand Down
56 changes: 42 additions & 14 deletions das2/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ typedef struct dataset {
char sGroupId[DAS_MAX_ID_BUFSZ];

size_t uDims; /* Number of dimensions, das datasets are
* implicitly bundles in qdataset terms. */
* implicitly bundles in qdataset terms. */

DasDim** lDims; /* The data variable object arrays */
size_t uSzDims; /* Current size of dimension array */
Expand All @@ -172,27 +172,36 @@ typedef struct dataset {
bool _dynamic; /* If true, the dataset may still be changing and all
bulk properties such as the iteration shape should be
recalculated instead of using cached values.
If false, cached values are expected to already be
If false, cached values are expected to already be
available */

/* dataset arrays can be written in chunks to output buffers. The number of
* elements in each chunk, the encoding of each element, and any separators
* are defined below. */
/* DasCodec** lEncs; */

/* Use a fixed size for now */
size_t uSzEncs;
DasCodec aPktEncs[DASDS_LOC_ENC_SZ];
int nPktItems[DASDS_LOC_ENC_SZ];
size_t uCodecs; /* Number of valid codecs */

/** User data pointer
*
* The stream -> dataset hierarchy provides a goood organizational structure
* for application data, especially applications that filter streams. It is
* initialized to NULL when a variable is created but otherwise the library
* dosen't deal with it.
*/
void* pUser;
/* These become large vector memory when uSzEncs > DASDS_LOC_ENC_SZ */

/* When the number of valid codecs grows past DASDS_LOC_ENC_SZ, use an
external buffer for all of them */

DasCodec* lCodecs; /* Codec pointers, internal or external */
int* lItems; /* Number of items to decode per codec, internal or external */
size_t uSzCodecs; /* Codec & Item array memory size, internal or external */

DasCodec aCodecs[DASDS_LOC_ENC_SZ]; /* small vector memory */
int aItems[DASDS_LOC_ENC_SZ]; /* small vector memory */

/** User data pointer
*
* The stream -> dataset hierarchy provides a good organizational structure
* for application data, especially applications that filter streams. It is
* initialized to NULL when a variable is created but otherwise the library
* doesn't deal with it.
*/
void* pUser;

} DasDs;

Expand Down Expand Up @@ -575,6 +584,25 @@ DAS_API size_t DasDs_memIndexed(const DasDs* pThis);
*/
DAS_API size_t DasDs_memOwned(const DasDs* pThis);

/** Number of value codecs owned by this dataset
 *
 * Expands to a direct read of the dataset's uCodecs member.
 *
 * @param P A pointer to a DasDs
 *
 * @returns The count of codecs currently defined for the dataset
 *
 * @memberof DasDs
 */
#define DasDs_numCodecs( P ) ( (P)->uCodecs )

/** Get the Ith codec of a dataset
 *
 * Expands to the address of element I of the dataset's lCodecs array. No
 * bounds check is made, so I should be less than DasDs_numCodecs(P). The
 * codec array can be reallocated as codecs are added, so don't hold on to
 * the returned pointer across calls that add a codec.
 *
 * @param P A pointer to a DasDs
 *
 * @param I The index of the codec
 *
 * @returns A pointer to the DasCodec stored at index I
 *
 * @memberof DasDs
 */
#define DasDs_getCodec( P, I ) ( &( (P)->lCodecs[(I)] ) )

/** Get the number of values we expect the Ith codec to read from each
 * raw packet buffer
 *
 * Expands to element I of the dataset's lItems array. No bounds check is
 * made, so I should be less than DasDs_numCodecs(P).
 *
 * @param P A pointer to a DasDs
 *
 * @param I The index of the codec
 *
 * @returns The item count (an int) stored alongside codec I
 *
 * @memberof DasDs
 */
#define DasDs_pktItems( P, I ) ( (P)->lItems[(I)] )

/** Define a packet data encoded/decoder for fixed length items and arrays
*
Expand Down
3 changes: 1 addition & 2 deletions das2/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ DAS_API DasIO* new_DasIO_cfile(const char* sProg, FILE* file, const char* mode);
* @param nModel The internal data structure version to use. If set
* to 2 any das3 structures encountered will trigger a
* failure. If set to 3 then any das2 structures will be
* upgraded to das3. Use -1 to indicate mixed model streams
* (not recommened)
* upgraded to das3. Use -1 to indicate mixed model streams.
*
* @returns DAS_OKAY if successful or an error code if not.
*
Expand Down
8 changes: 4 additions & 4 deletions das2/serial.c
Original file line number Diff line number Diff line change
Expand Up @@ -1448,16 +1448,16 @@ DasDs* dasds_from_xmlheader2(DasBuf* pBuf, StreamDesc* pParent, int nPktId)

DasErrCode dasds_decode_data(DasDs* pDs, DasBuf* pBuf)
{
if(pDs->uSzEncs == 0){
if(DasDs_numCodecs(pDs) == 0){
return das_error(DASERR_SERIAL,
"No decoders are defined for dataset %02d in group %s", DasDs_id(pDs), DasDs_group(pDs)
);
}

int nUnReadBytes = 0;
int nSzEncs = (int)pDs->uSzEncs;
int nSzEncs = (int)DasDs_numCodecs(pDs);
for(int i = 0; i < nSzEncs; ++i){
DasCodec* pCodec = &(pDs->aPktEncs[i]);
DasCodec* pCodec = DasDs_getCodec(pDs, i);
size_t uBufLen = 0;
const ubyte* pRaw = DasBuf_direct(pBuf, &uBufLen);

Expand All @@ -1475,7 +1475,7 @@ DasErrCode dasds_decode_data(DasDs* pDs, DasBuf* pBuf)
be 0, AKA nothing will be unread in the packet.
*/
int nValsRead = 0;
int nValsExpect = pDs->nPktItems[i];
int nValsExpect = DasDs_pktItems(pDs, i);

if((nValsExpect < 1)&&(i < (nSzEncs - 1)))
return das_error(DASERR_NOTIMP,
Expand Down
3 changes: 2 additions & 1 deletion test/cdf_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
if len(f[k].shape) == 1:
print( f[k][:] )
else:
print( f[k][0,:] )
if f[k].shape[0] > 0:
print( f[k][0,:] )

print()

Expand Down
55 changes: 50 additions & 5 deletions utilities/das3_cdf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ long DasVar_cdfType(const DasVar* pVar)
return aCdfType[DasVar_elemType(pVar)];
}

/* Make a simple name for a variable */
const char* DasVar_cdfName(
const DasDim* pDim, const DasVar* pVar, char* sBuf, size_t uBufLen
){
Expand Down Expand Up @@ -1219,6 +1220,38 @@ const char* DasVar_cdfName(
return sBuf;
}

/* Make a flattened-namespace name for a variable.  If the short name from
 * DasVar_cdfName() already exists as a zVariable in the CDF, suffixes are
 * appended to it: first the dataset group and, if that name is taken as
 * well, the dataset ID followed by the group.
 *
 *   nCdfId   The CDF to check for existing zVariable names
 *   pDim     The dimension owning the variable; its parent descriptor is
 *            treated as the dataset
 *   pVar     The variable to name
 *   sBuf     Receives the name, always NUL terminated.  Names that don't
 *            fit are truncated.
 *   uBufLen  The usable size of sBuf in bytes, terminator included
 *
 * Returns sBuf.
 *
 * NOTE(review): the last candidate name is not checked against the CDF, and
 * truncation can make distinct names collide, so uniqueness is likely but
 * not guaranteed.
 */
const char* DasVar_cdfUniqName(
	CDFid nCdfId, const DasDim* pDim, const DasVar* pVar, char* sBuf, size_t uBufLen
){
	/* Nowhere to write a name */
	if((sBuf == NULL)||(uBufLen == 0))
		return sBuf;

	/* Start with the short name, that may be enough */
	DasVar_cdfName(pDim, pVar, sBuf, uBufLen);

	if( CDFconfirmzVarExistence(nCdfId, sBuf) != CDF_OK )
		return sBuf;

	/* Okay that's not unique.  Append the dataset group name and see if that
	   gets it. */
	DasDs* pDs = (DasDs*) DasDesc_parent((DasDesc*)pDim);
	char sLocal[DAS_MAX_ID_BUFSZ * 2] = {'\0'};
	snprintf(sLocal, sizeof(sLocal), "%s_%s", sBuf, DasDs_group(pDs));

	if( CDFconfirmzVarExistence(nCdfId, sLocal) != CDF_OK ){
		/* snprintf, unlike strncpy, always terminates the destination */
		snprintf(sBuf, uBufLen, "%s", sLocal);
		return sBuf;
	}

	/* Well, add the dataset ID too.  sBuf still holds the short name here,
	   it's only overwritten once the long name has been built. */
	snprintf(sLocal, sizeof(sLocal), "%s_%s_%s", sBuf, DasDs_id(pDs), DasDs_group(pDs));
	snprintf(sBuf, uBufLen, "%s", sLocal);
	return sBuf;
}


/* Sequences pour themselves into the shape of the containing dataset
so the dataset shape is needed here */
long DasVar_cdfNonRecDims(
Expand Down Expand Up @@ -1288,8 +1321,8 @@ DasErrCode makeCdfVar(
variable ID as well as the last written record index */
DasVar_addCdfInfo(pVar);

/* add the variable's name */
DasVar_cdfName(pDim, pVar, sNameBuf, DAS_MAX_ID_BUFSZ - 1);
/* Make a name for this variable, since everything is flattened */
DasVar_cdfUniqName(pCtx->nCdfId, pDim, pVar, sNameBuf, DAS_MAX_ID_BUFSZ - 1);

das_val_type vt = DasVar_valType(pVar);
long nCharLen = 1L;
Expand All @@ -1299,7 +1332,6 @@ DasErrCode makeCdfVar(
nCharLen = aIntr[0];
}


CDFstatus iStatus = CDFcreatezVar(
pCtx->nCdfId, /* CDF File ID */
sNameBuf, /* Varible's name */
Expand Down Expand Up @@ -1655,7 +1687,9 @@ DasErrCode onDataSet(StreamDesc* pSd, int iPktId, DasDs* pDs, void* pUser)
ptrdiff_t aDsShape[DASIDX_MAX] = DASIDX_INIT_UNUSED;
int nDsRank = DasDs_shape(pDs, aDsShape);

if(daslog_level() <= DASLOG_INFO){
daslog_info_v("Creating variables for dataset %s,%s", DasDs_group(pDs), DasDs_id(pDs));

if(daslog_level() < DASLOG_INFO){
char sBuf[16000] = {'\0'};
DasDs_toStr(pDs, sBuf, 15999);
daslog_info(sBuf);
Expand Down Expand Up @@ -1745,6 +1779,14 @@ DasErrCode _writeRecVaryAry(CDFid nCdfId, DasVar* pVar, DasAry* pAry)
{
CDFstatus iStatus; /* Used by the CDF_MAD macro */

/* It's possible that we didn't get any data, for example when
a header is sent, but no actual values. If so just return okay.
*/
if(DasAry_size(pAry) == 0){
daslog_debug_v("No more data to write for array %s", DasAry_id(pAry));
return DAS_OKAY;
}

static const long indicies[DASIDX_MAX] = {0,0,0,0, 0,0,0,0};
static const long intervals[DASIDX_MAX] = {1,1,1,1, 1,1,1,1};
long counts[DASIDX_MAX] = {0,0,0,0, 0,0,0,0};
Expand Down Expand Up @@ -1841,6 +1883,9 @@ DasErrCode writeAndClearData(DasDs* pDs, struct context* pCtx)
{
ptrdiff_t aDsShape[DASIDX_MAX] = DASIDX_INIT_UNUSED;
int nDsRank = DasDs_shape(pDs, aDsShape);
daslog_info_v("Writing %zu records for dataset %s,%s",
aDsShape[0], DasDs_group(pDs), DasDs_id(pDs)
);

/* Write all the data first. Don't clear arrays as you go because
binary-op variables might depend on them! */
Expand Down Expand Up @@ -2072,7 +2117,7 @@ int main(int argc, char** argv)
if(opts.aSource[0] == '\0'){ /* Reading from standard input */
pIn = new_DasIO_cfile(PROG, stdin, "r");

/* If writing from standard input, an we need a name, just use the current time */
/* If reading from standard input, and we need a name, just use the current time */
if(bAddFileName)
_addTimeStampName(ctx.sWriteTo, LOC_PATH_LEN-1);
}
Expand Down

0 comments on commit 458eb81

Please sign in to comment.