Skip to content

Commit

Permalink
Convert empty ResultSet into empty arrow::PoolBuffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1mc authored and asuhan committed Jun 29, 2017
1 parent baf2945 commit 0235498
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions QueryEngine/ResultSetConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ std::vector<std::shared_ptr<Array>> generate_columns(
CHECK_GT(col_count, 0);
std::vector<std::shared_ptr<arrow::Array>> columns(col_count, nullptr);
auto generate = [&](const size_t i, std::shared_ptr<arrow::Array>& column) {
CHECK(column_values[i]);
column = generate_column(*fields[i], null_bitmaps[i], *column_values[i]);
};
if (col_count > 1) {
Expand Down Expand Up @@ -441,6 +442,9 @@ void print_serialized_schema(const uint8_t* data, const size_t length) {

std::shared_ptr<PoolBuffer> serialize_arrow_records(const RecordBatch& rb, ipc::FileBlock& block) {
int64_t rb_sz = 0;
if (!rb.num_rows()) {
return std::make_shared<PoolBuffer>();
}
ASSERT_OK(ipc::GetRecordBatchSize(rb, &rb_sz));
auto pool = default_memory_pool();
auto buffer = std::make_shared<PoolBuffer>(pool);
Expand All @@ -464,6 +468,13 @@ void print_serialized_records(const uint8_t* data,
const size_t length,
const ipc::FileBlock& block,
const std::shared_ptr<Schema>& schema) {
if (data == nullptr || !length) {
std::cout << "No row found" << std::endl;
return;
}

CHECK_GT(block.metadata_length, int32_t(0));
CHECK_GT(block.body_length, int64_t(0));
const auto payload = std::make_shared<arrow::Buffer>(data, length);
auto buffer_reader = std::make_shared<io::BufferReader>(payload);

Expand Down Expand Up @@ -650,7 +661,9 @@ std::pair<std::vector<std::shared_ptr<arrow::Array>>, size_t> ResultSet::getArro
row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count);
}

return {generate_columns(fields, null_bitmaps, column_values), row_count};
return {row_count > 0 ? generate_columns(fields, null_bitmaps, column_values)
: std::vector<std::shared_ptr<arrow::Array>>(),
row_count};
}

arrow::RecordBatch ResultSet::convertToArrow(const std::vector<std::string>& col_names,
Expand Down Expand Up @@ -702,7 +715,6 @@ ArrowResult ResultSet::getArrowCopyOnCpu(Data_Namespace::DataMgr* data_mgr,
arrow::ipc::FileBlock block{0, 0, 0};
auto serialized_records = arrow::serialize_arrow_records(arrow_copy, block);
const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
CHECK(record_key != IPC_PRIVATE);
std::vector<char> record_handle_buffer(sizeof(key_t), 0);
memcpy(&record_handle_buffer[0], reinterpret_cast<const unsigned char*>(&record_key), sizeof(key_t));

Expand Down Expand Up @@ -733,22 +745,23 @@ ArrowResult ResultSet::getArrowCopyOnGpu(Data_Namespace::DataMgr* data_mgr,
#ifdef HAVE_CUDA
arrow::ipc::FileBlock block{0, 0, 0};
auto serialized_records = arrow::serialize_arrow_records(arrow_copy, block);
CHECK(data_mgr && data_mgr->cudaMgr_);
auto dev_ptr =
reinterpret_cast<CUdeviceptr>(data_mgr->cudaMgr_->allocateDeviceMem(serialized_records->size(), device_id));
CUipcMemHandle record_handle;
cuIpcGetMemHandle(&record_handle, dev_ptr);
data_mgr->cudaMgr_->copyHostToDevice(reinterpret_cast<int8_t*>(dev_ptr),
reinterpret_cast<const int8_t*>(serialized_records->data()),
serialized_records->size(),
device_id);
std::vector<char> record_handle_buffer(sizeof(record_handle), 0);
memcpy(&record_handle_buffer[0], reinterpret_cast<unsigned char*>(&record_handle), sizeof(CUipcMemHandle));

return {schema_handle_buffer, serialized_schema->size(), record_handle_buffer, serialized_records->size()};
#else
return {schema_handle_buffer, serialized_schema->size(), {}, 0};
if (serialized_records->size()) {
CHECK(data_mgr && data_mgr->cudaMgr_);
auto dev_ptr =
reinterpret_cast<CUdeviceptr>(data_mgr->cudaMgr_->allocateDeviceMem(serialized_records->size(), device_id));
CUipcMemHandle record_handle;
cuIpcGetMemHandle(&record_handle, dev_ptr);
data_mgr->cudaMgr_->copyHostToDevice(reinterpret_cast<int8_t*>(dev_ptr),
reinterpret_cast<const int8_t*>(serialized_records->data()),
serialized_records->size(),
device_id);
std::vector<char> record_handle_buffer(sizeof(record_handle), 0);
memcpy(&record_handle_buffer[0], reinterpret_cast<unsigned char*>(&record_handle), sizeof(CUipcMemHandle));

return {schema_handle_buffer, serialized_schema->size(), record_handle_buffer, serialized_records->size()};
}
#endif
return {schema_handle_buffer, serialized_schema->size(), {}, 0};
}

ArrowResult ResultSet::getArrowCopy(Data_Namespace::DataMgr* data_mgr,
Expand Down

0 comments on commit 0235498

Please sign in to comment.