From 0235498abf899e295d5f5d0c92ef191598340805 Mon Sep 17 00:00:00 2001 From: Minggang Yu Date: Thu, 22 Jun 2017 15:54:50 -0700 Subject: [PATCH] Convert empty ResultSet into empty arrow::PoolBuffer. --- QueryEngine/ResultSetConversion.cpp | 47 ++++++++++++++++++----------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/QueryEngine/ResultSetConversion.cpp b/QueryEngine/ResultSetConversion.cpp index 9c24f4a235..8c667a25f2 100644 --- a/QueryEngine/ResultSetConversion.cpp +++ b/QueryEngine/ResultSetConversion.cpp @@ -300,6 +300,7 @@ std::vector> generate_columns( CHECK_GT(col_count, 0); std::vector> columns(col_count, nullptr); auto generate = [&](const size_t i, std::shared_ptr& column) { + CHECK(column_values[i]); column = generate_column(*fields[i], null_bitmaps[i], *column_values[i]); }; if (col_count > 1) { @@ -441,6 +442,9 @@ void print_serialized_schema(const uint8_t* data, const size_t length) { std::shared_ptr serialize_arrow_records(const RecordBatch& rb, ipc::FileBlock& block) { int64_t rb_sz = 0; + if (!rb.num_rows()) { + return std::make_shared(); + } ASSERT_OK(ipc::GetRecordBatchSize(rb, &rb_sz)); auto pool = default_memory_pool(); auto buffer = std::make_shared(pool); @@ -464,6 +468,13 @@ void print_serialized_records(const uint8_t* data, const size_t length, const ipc::FileBlock& block, const std::shared_ptr& schema) { + if (data == nullptr || !length) { + std::cout << "No row found" << std::endl; + return; + } + + CHECK_GT(block.metadata_length, int32_t(0)); + CHECK_GT(block.body_length, int64_t(0)); const auto payload = std::make_shared(data, length); auto buffer_reader = std::make_shared(payload); @@ -650,7 +661,9 @@ std::pair>, size_t> ResultSet::getArro row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count); } - return {generate_columns(fields, null_bitmaps, column_values), row_count}; + return {row_count > 0 ? generate_columns(fields, null_bitmaps, column_values) + : std::vector>(), + row_count}; } arrow::RecordBatch ResultSet::convertToArrow(const std::vector& col_names, @@ -702,7 +715,6 @@ ArrowResult ResultSet::getArrowCopyOnCpu(Data_Namespace::DataMgr* data_mgr, arrow::ipc::FileBlock block{0, 0, 0}; auto serialized_records = arrow::serialize_arrow_records(arrow_copy, block); const auto record_key = arrow::get_and_copy_to_shm(serialized_records); - CHECK(record_key != IPC_PRIVATE); std::vector record_handle_buffer(sizeof(key_t), 0); memcpy(&record_handle_buffer[0], reinterpret_cast(&record_key), sizeof(key_t)); @@ -733,22 +745,23 @@ ArrowResult ResultSet::getArrowCopyOnGpu(Data_Namespace::DataMgr* data_mgr, #ifdef HAVE_CUDA arrow::ipc::FileBlock block{0, 0, 0}; auto serialized_records = arrow::serialize_arrow_records(arrow_copy, block); - CHECK(data_mgr && data_mgr->cudaMgr_); - auto dev_ptr = - reinterpret_cast(data_mgr->cudaMgr_->allocateDeviceMem(serialized_records->size(), device_id)); - CUipcMemHandle record_handle; - cuIpcGetMemHandle(&record_handle, dev_ptr); - data_mgr->cudaMgr_->copyHostToDevice(reinterpret_cast(dev_ptr), - reinterpret_cast(serialized_records->data()), - serialized_records->size(), - device_id); - std::vector record_handle_buffer(sizeof(record_handle), 0); - memcpy(&record_handle_buffer[0], reinterpret_cast(&record_handle), sizeof(CUipcMemHandle)); - - return {schema_handle_buffer, serialized_schema->size(), record_handle_buffer, serialized_records->size()}; -#else - return {schema_handle_buffer, serialized_schema->size(), {}, 0}; + if (serialized_records->size()) { + CHECK(data_mgr && data_mgr->cudaMgr_); + auto dev_ptr = + reinterpret_cast(data_mgr->cudaMgr_->allocateDeviceMem(serialized_records->size(), device_id)); + CUipcMemHandle record_handle; + cuIpcGetMemHandle(&record_handle, dev_ptr); + data_mgr->cudaMgr_->copyHostToDevice(reinterpret_cast(dev_ptr), + reinterpret_cast(serialized_records->data()), + serialized_records->size(), + device_id); + std::vector record_handle_buffer(sizeof(record_handle), 0); + memcpy(&record_handle_buffer[0], reinterpret_cast(&record_handle), sizeof(CUipcMemHandle)); + + return {schema_handle_buffer, serialized_schema->size(), record_handle_buffer, serialized_records->size()}; + } #endif + return {schema_handle_buffer, serialized_schema->size(), {}, 0}; } ArrowResult ResultSet::getArrowCopy(Data_Namespace::DataMgr* data_mgr,