Skip to content

Commit

Permalink
Merge branch 'apache:main' into apacheGH-38246
Browse files Browse the repository at this point in the history
  • Loading branch information
xxlaykxx committed Oct 14, 2023
2 parents 62c9899 + c5bce96 commit cafa892
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
33 changes: 33 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include "arrow/array.h"
#include "arrow/array/concatenate.h"
#include "arrow/array/validate.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -432,4 +433,36 @@ RecordBatchReader::~RecordBatchReader() {
ARROW_WARN_NOT_OK(this->Close(), "Implicitly called RecordBatchReader::Close failed");
}

Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
const RecordBatchVector& batches, MemoryPool* pool) {
int64_t length = 0;
size_t n = batches.size();
if (n == 0) {
return Status::Invalid("Must pass at least one recordbatch");
}
int cols = batches[0]->num_columns();
auto schema = batches[0]->schema();
for (size_t i = 0; i < batches.size(); ++i) {
length += batches[i]->num_rows();
if (!schema->Equals(batches[i]->schema())) {
return Status::Invalid(
"Schema of RecordBatch index ", i, " is ", batches[i]->schema()->ToString(),
", which does not match index 0 recordbatch schema: ", schema->ToString());
}
}

std::vector<std::shared_ptr<Array>> concatenated_columns;
concatenated_columns.reserve(cols);
for (int col = 0; col < cols; ++col) {
ArrayVector column_arrays;
column_arrays.reserve(batches.size());
for (const auto& batch : batches) {
column_arrays.emplace_back(batch->column(col));
}
ARROW_ASSIGN_OR_RAISE(auto concatenated_column, Concatenate(column_arrays, pool))
concatenated_columns.emplace_back(std::move(concatenated_column));
}
return RecordBatch::Make(std::move(schema), length, std::move(concatenated_columns));
}

} // namespace arrow
14 changes: 14 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,18 @@ class ARROW_EXPORT RecordBatchReader {
Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema);
};

/// \brief Concatenate record batches
///
/// The columns of the new batch are formed by concatenate the same columns of each input
/// batch. Concatenate multiple batches into a new batch requires that the schema must be
/// consistent. It supports merging batches without columns (only length, scenarios such
/// as count(*)).
///
/// \param[in] batches a vector of record batches to be concatenated
/// \param[in] pool memory to store the result will be allocated from this memory pool
/// \return the concatenated record batch
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());

} // namespace arrow
37 changes: 37 additions & 0 deletions cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,41 @@ TEST_F(TestRecordBatch, ReplaceSchema) {
ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema));
}

TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
int length = 10;

auto f0 = field("f0", int32());
auto f1 = field("f1", uint8());

auto schema = ::arrow::schema({f0, f1});

random::RandomArrayGenerator gen(42);

auto b1 = gen.BatchOf(schema->fields(), length);

length = 5;

auto b2 = gen.BatchOf(schema->fields(), length);

ASSERT_OK_AND_ASSIGN(auto batch, ConcatenateRecordBatches({b1, b2}));
ASSERT_EQ(batch->num_rows(), b1->num_rows() + b2->num_rows());
ASSERT_BATCHES_EQUAL(*batch->Slice(0, b1->num_rows()), *b1);
ASSERT_BATCHES_EQUAL(*batch->Slice(b1->num_rows()), *b2);

f0 = field("fd0", int32());
f1 = field("fd1", uint8());

schema = ::arrow::schema({f0, f1});

auto b3 = gen.BatchOf(schema->fields(), length);

ASSERT_RAISES(Invalid, ConcatenateRecordBatches({b1, b3}));

auto null_batch = RecordBatch::Make(::arrow::schema({}), length,
std::vector<std::shared_ptr<ArrayData>>{});
ASSERT_OK_AND_ASSIGN(batch, ConcatenateRecordBatches({null_batch}));
ASSERT_EQ(batch->num_rows(), null_batch->num_rows());
ASSERT_BATCHES_EQUAL(*batch, *null_batch);
}

} // namespace arrow

0 comments on commit cafa892

Please sign in to comment.