Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #2268 - Fixes Sporadic failures of Parquet segarray_write test with gasnet #2534

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions src/ArrowFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!int_reader->HasNext()){
if (values_read != 0 && !int_reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand All @@ -361,7 +361,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!int_reader->HasNext()){
if (values_read != 0 && !int_reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand All @@ -384,7 +384,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!reader->HasNext()){
if (values_read != 0 && !reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand All @@ -407,7 +407,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!bool_reader->HasNext()){
if (values_read != 0 && !bool_reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand All @@ -432,7 +432,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!float_reader->HasNext()){
if (values_read != 0 && !float_reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand All @@ -456,7 +456,7 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
first = false;
}
}
if (!dbl_reader->HasNext()){
if (values_read != 0 && !dbl_reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
Expand Down Expand Up @@ -557,7 +557,6 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
auto chpl_ptr = (int64_t*)chpl_arr;
parquet::Int64Reader* reader =
static_cast<parquet::Int64Reader*>(column_reader.get());
startIdx -= reader->Skip(startIdx);

while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize)
Expand All @@ -569,7 +568,6 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
auto chpl_ptr = (int64_t*)chpl_arr;
parquet::Int32Reader* reader =
static_cast<parquet::Int32Reader*>(column_reader.get());
startIdx -= reader->Skip(startIdx);

int32_t* tmpArr = (int32_t*)malloc(batchSize * sizeof(int32_t));
while (reader->HasNext() && i < numElems) {
Expand Down Expand Up @@ -604,7 +602,6 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
auto chpl_ptr = (bool*)chpl_arr;
parquet::BoolReader* reader =
static_cast<parquet::BoolReader*>(column_reader.get());
startIdx -= reader->Skip(startIdx);

while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize)
Expand All @@ -616,12 +613,10 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
auto chpl_ptr = (double*)chpl_arr;
parquet::FloatReader* reader =
static_cast<parquet::FloatReader*>(column_reader.get());
startIdx -= reader->Skip(startIdx);

while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize) // adjust batchSize if needed
batchSize = numElems - i;

// unlike the base case, the list stores NaN values and is able to read them out.
// This prevents the need for any additional processing
// setting nullptr for def and rep levels allows us to ignore the empty segments since we only care about values here.
Expand All @@ -633,17 +628,16 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
chpl_ptr[i+j] = (double)tmpArr[j];
}
i += values_read;
delete[] tmpArr;
}
} else if(lty == ARROWDOUBLE) {
auto chpl_ptr = (double*)chpl_arr;
parquet::DoubleReader* reader =
static_cast<parquet::DoubleReader*>(column_reader.get());
startIdx -= reader->Skip(startIdx);

while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize) // adjust batchSize if needed
batchSize = numElems - i;

// unlike the base case, the list stores NaN values and is able to read them out.
// This prevents the need for any additional processing
// setting nullptr for def and rep levels allows us to ignore the empty segments since we only care about values here.
Expand Down Expand Up @@ -783,6 +777,8 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, const char* colna
}
i++;
}
delete[] def_lvl;
delete[] rep_lvl;
}
} else if(ty == ARROWDOUBLE) {
auto chpl_ptr = (double*)chpl_arr;
Expand Down Expand Up @@ -812,6 +808,9 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, const char* colna
}
i++;
}
delete[] tmpArr;
delete[] def_lvl;
delete[] rep_lvl;
}
}
}
Expand Down Expand Up @@ -963,13 +962,15 @@ int cpp_writeMultiColToParquet(const char* filename, void* column_names,
}
int64_t valIdx = offset_ptr[offIdx];
writer->WriteBatch(segSize, def_lvl, rep_lvl, &data_ptr[valIdx]);
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
segSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[segSize] { 1 };
int16_t* rep_lvl = new int16_t[segSize] { 0 };
writer->WriteBatch(segSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
writer->WriteBatch(segSize, &def_lvl, &rep_lvl, nullptr);
}
offIdx++;
count++;
Expand Down Expand Up @@ -1013,13 +1014,15 @@ int cpp_writeMultiColToParquet(const char* filename, void* column_names,
}
int64_t valIdx = offset_ptr[offIdx];
writer->WriteBatch(segSize, def_lvl, rep_lvl, &data_ptr[valIdx]);
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
segSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[segSize] { 1 };
int16_t* rep_lvl = new int16_t[segSize] { 0 };
writer->WriteBatch(segSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
writer->WriteBatch(segSize, &def_lvl, &rep_lvl, nullptr);
}
offIdx++;
count++;
Expand Down Expand Up @@ -1063,13 +1066,15 @@ int cpp_writeMultiColToParquet(const char* filename, void* column_names,
}
int64_t valIdx = offset_ptr[offIdx];
writer->WriteBatch(segSize, def_lvl, rep_lvl, &data_ptr[valIdx]);
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
segSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[segSize] { 1 };
int16_t* rep_lvl = new int16_t[segSize] { 0 };
writer->WriteBatch(segSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl =0;
writer->WriteBatch(segSize, &def_lvl, &rep_lvl, nullptr);
}
offIdx++;
count++;
Expand Down Expand Up @@ -1230,6 +1235,11 @@ int cpp_writeColumnToParquet(const char* filename, void* chpl_arr,
int64_t i = 0;
int64_t numLeft = numelems;

if (chpl_arr == NULL) {
// early out to prevent bad memory access
return 0;
}

if(dtype == ARROWINT64 || dtype == ARROWUINT64) {
auto chpl_ptr = (int64_t*)chpl_arr;
while(numLeft > 0) {
Expand Down Expand Up @@ -1512,9 +1522,7 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
int64_t valIdx = 0; // index into chpl_arr
int64_t segIdx = 0; // index into offsets

if(dtype == ARROWINT64 || dtype == ARROWUINT64) {
auto chpl_ptr = (int64_t*)chpl_arr;

if(dtype == ARROWINT64 || dtype == ARROWUINT64) {
while(numLeft > 0) { // write all local values to the file
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::Int64Writer* writer =
Expand All @@ -1523,23 +1531,25 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
while (numLeft > 0 && count < rowGroupSize) { // ensures rowGroupSize maintained
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
int16_t* def_lvl = new int16_t[batchSize] { 3 };
auto chpl_ptr = (int64_t*)chpl_arr;
int16_t* def_lvl = new int16_t[batchSize] { 3 }; // all values defined at the item level (3)
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
for (int64_t x = 0; x < batchSize; x++){
// if the value is first in the segment rep_lvl = 0, otherwise 1
// all values defined at the item level (3)
rep_lvl[x] = (x == 0) ? 0 : 1;
def_lvl[x] = 3;
}
writer->WriteBatch(batchSize, def_lvl, rep_lvl, &chpl_ptr[valIdx]);
valIdx += batchSize;
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
batchSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[batchSize] { 1 };
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
writer->WriteBatch(batchSize, &def_lvl, &rep_lvl, nullptr);
}
count++;
segIdx++;
Expand All @@ -1548,8 +1558,6 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
}
}
else if (dtype == ARROWBOOLEAN) {
auto chpl_ptr = (bool*)chpl_arr;

while(numLeft > 0) {
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::BoolWriter* writer =
Expand All @@ -1558,6 +1566,7 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
while (numLeft > 0 && count < rowGroupSize) {
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
auto chpl_ptr = (bool*)chpl_arr;
// if the value is first in the segment rep_lvl = 0, otherwise 1
// all values defined at the item level (3)
int16_t* def_lvl = new int16_t[batchSize] { 3 };
Expand All @@ -1568,23 +1577,23 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
}
writer->WriteBatch(batchSize, def_lvl, rep_lvl, &chpl_ptr[valIdx]);
valIdx += batchSize;
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
batchSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[batchSize] { 1 };
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
writer->WriteBatch(batchSize, &def_lvl, &rep_lvl, nullptr);
}
count++;
segIdx++;
numLeft--;
}
}
}
else if (dtype == ARROWDOUBLE) {
auto chpl_ptr = (double*)chpl_arr;

else if (dtype == ARROWDOUBLE) {
while(numLeft > 0) {
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::DoubleWriter* writer =
Expand All @@ -1593,6 +1602,7 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
while (numLeft > 0 && count < rowGroupSize) {
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
auto chpl_ptr = (double*)chpl_arr;
// if the value is first in the segment rep_lvl = 0, otherwise 1
// all values defined at the item level (3)
int16_t* def_lvl = new int16_t[batchSize] { 3 };
Expand All @@ -1603,13 +1613,15 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* ch
}
writer->WriteBatch(batchSize, def_lvl, rep_lvl, &chpl_ptr[valIdx]);
valIdx += batchSize;
delete[] def_lvl;
delete[] rep_lvl;
}
else {
// empty segment denoted by null value that is not repeated (first of segment) defined at the list level (1)
batchSize = 1; // even though segment is length=0, write null to hold the empty segment
int16_t* def_lvl = new int16_t[batchSize] { 1 };
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
writer->WriteBatch(batchSize, &def_lvl, &rep_lvl, nullptr);
}
count++;
segIdx++;
Expand Down Expand Up @@ -1752,6 +1764,10 @@ int cpp_appendColumnToParquet(const char* filename, void* chpl_arr,
int64_t dtype, int64_t compression,
char** errMsg) {
try {
if (chpl_arr == NULL){
// early out to prevent bad memory access
return 0;
}
std::shared_ptr<arrow::io::ReadableFile> infile;
ARROWRESULT_OK(arrow::io::ReadableFile::Open(filename, arrow::default_memory_pool()),
infile);
Expand Down
Loading