Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions cpp/src/arrow/ipc/feather-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,57 @@ TEST_F(TestTableWriter, PrimitiveNullRoundTrip) {
}
}

class TestTableWriterSlice : public TestTableWriter,
public ::testing::WithParamInterface<std::tuple<int, int>> {
public:
void CheckSlice(std::shared_ptr<RecordBatch> batch) {
auto p = GetParam();
auto start = std::get<0>(p);
auto size = std::get<1>(p);

batch = batch->Slice(start, size);

ASSERT_OK(writer_->Append("f0", *batch->column(0)));
ASSERT_OK(writer_->Append("f1", *batch->column(1)));
Finish();

std::shared_ptr<Column> col;
ASSERT_OK(reader_->GetColumn(0, &col));
ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(0)));
ASSERT_EQ("f0", col->name());

ASSERT_OK(reader_->GetColumn(1, &col));
ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(1)));
ASSERT_EQ("f1", col->name());
}
};

TEST_P(TestTableWriterSlice, SliceRoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeIntBatchSized(600, &batch));
CheckSlice(batch);
}

TEST_P(TestTableWriterSlice, SliceStringsRoundTrip) {
auto p = GetParam();
auto start = std::get<0>(p);
auto with_nulls = start % 2 == 0;
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeStringTypesRecordBatch(&batch, with_nulls));
CheckSlice(batch);
}

TEST_P(TestTableWriterSlice, SliceBooleanRoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeBooleanBatchSized(600, &batch));
CheckSlice(batch);
}

INSTANTIATE_TEST_CASE_P(TestTableWriterSliceOffsets, TestTableWriterSlice,
::testing::Combine(::testing::Values(0, 1, 300, 301, 302, 303,
304, 305, 306, 307),
::testing::Values(0, 1, 7, 8, 30, 32, 100)));

} // namespace feather
} // namespace ipc
} // namespace arrow
63 changes: 50 additions & 13 deletions cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ static Status WritePadded(io::OutputStream* stream, const uint8_t* data, int64_t
return Status::OK();
}

static Status WritePaddedWithOffset(io::OutputStream* stream, const uint8_t* data,
int64_t bit_offset, const int64_t length,
int64_t* bytes_written) {
data = data + bit_offset / 8;
uint8_t bit_shift = static_cast<uint8_t>(bit_offset % 8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the cast to int8 necessary here? (for performance perhaps?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, but I think I got some warning, maybe with clang.

if (bit_offset == 0) {
RETURN_NOT_OK(stream->Write(data, length));
} else {
constexpr int64_t buffersize = 256;
uint8_t buffer[buffersize];
const uint8_t lshift = static_cast<uint8_t>(8 - bit_shift);
const uint8_t* buffer_end = buffer + buffersize;
uint8_t* buffer_it = buffer;

for (const uint8_t* end = data + length; data != end;) {
uint8_t r = static_cast<uint8_t>(*data++ >> bit_shift);
uint8_t l = static_cast<uint8_t>(*data << lshift);
uint8_t value = l | r;
*buffer_it++ = value;
if (buffer_it == buffer_end) {
RETURN_NOT_OK(stream->Write(buffer, buffersize));
buffer_it = buffer;
}
}
if (buffer_it != buffer) {
RETURN_NOT_OK(stream->Write(buffer, buffer_it - buffer));
}
}

int64_t remainder = PaddedLength(length) - length;
if (remainder != 0) {
RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder));
}
*bytes_written = length + remainder;
return Status::OK();
}

/// For compability, we need to write any data sometimes just to keep producing
/// files that can be read with an older reader.
static Status WritePaddedBlank(io::OutputStream* stream, int64_t length,
Expand Down Expand Up @@ -554,19 +591,22 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {

// Write the null bitmask
if (values.null_count() > 0) {
// We assume there is one bit for each value in values.nulls, aligned on a
// byte boundary, and we write this much data into the stream
// We assume there is one bit for each value in values.nulls,
// starting at the zero offset.
int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(values.length()));
if (values.null_bitmap()) {
RETURN_NOT_OK(WritePadded(stream_.get(), values.null_bitmap()->data(),
null_bitmap_size, &bytes_written));
auto null_bitmap = values.null_bitmap();
RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), null_bitmap->data(),
values.offset(), null_bitmap_size,
&bytes_written));
} else {
RETURN_NOT_OK(WritePaddedBlank(stream_.get(), null_bitmap_size, &bytes_written));
}
meta->total_bytes += bytes_written;
}

int64_t values_bytes = 0;
int64_t bit_offset = 0;

const uint8_t* values_buffer = nullptr;

Expand Down Expand Up @@ -595,20 +635,17 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
const auto& prim_values = static_cast<const PrimitiveArray&>(values);
const auto& fw_type = static_cast<const FixedWidthType&>(*values.type());

if (values.type_id() == Type::BOOL) {
// Booleans are bit-packed
values_bytes = BitUtil::BytesForBits(values.length());
} else {
values_bytes = values.length() * fw_type.bit_width() / 8;
}
values_bytes = BitUtil::BytesForBits(values.length() * fw_type.bit_width());

if (prim_values.values()) {
values_buffer = prim_values.values()->data();
values_buffer = prim_values.values()->data() +
(prim_values.offset() * fw_type.bit_width() / 8);
bit_offset = (prim_values.offset() * fw_type.bit_width()) % 8;
}
}
if (values_buffer) {
RETURN_NOT_OK(
WritePadded(stream_.get(), values_buffer, values_bytes, &bytes_written));
RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), values_buffer, bit_offset,
values_bytes, &bytes_written));
} else {
RETURN_NOT_OK(WritePaddedBlank(stream_.get(), values_bytes, &bytes_written));
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/ipc/ipc-json-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
#define BATCH_CASES() \
::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
&MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, \
&MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion, &MakeDates, \
&MakeTimestamps, &MakeTimes, &MakeFWBinary, &MakeDecimal, \
&MakeDictionary);
&MakeStringTypesRecordBatchWithNulls, &MakeStruct, &MakeUnion, \
&MakeDates, &MakeTimestamps, &MakeTimes, &MakeFWBinary, \
&MakeDecimal, &MakeDictionary);

class TestJsonRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*> {
public:
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ TEST_F(TestSchemaMetadata, NestedFields) {
#define BATCH_CASES() \
::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
&MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, \
&MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion, \
&MakeStringTypesRecordBatchWithNulls, &MakeStruct, &MakeUnion, \
&MakeDictionary, &MakeDates, &MakeTimestamps, &MakeTimes, \
&MakeFWBinary, &MakeNull, &MakeDecimal, &MakeBooleanBatch);

Expand Down
29 changes: 26 additions & 3 deletions cpp/src/arrow/ipc/test-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,24 @@ Status MakeRandomBinaryArray(int64_t length, bool include_nulls, MemoryPool* poo
return builder.Finish(out);
}

Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
template <class Builder, class RawType>
Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
MemoryPool* pool, std::shared_ptr<Array>* out) {
Builder builder(pool);
for (int64_t i = 0; i < length; ++i) {
if (include_nulls && (i % 7 == 0)) {
RETURN_NOT_OK(builder.AppendNull());
} else {
const std::string value = std::to_string(i);
RETURN_NOT_OK(builder.Append(reinterpret_cast<const RawType*>(value.data()),
static_cast<int32_t>(value.size())));
}
}
return builder.Finish(out);
}

Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out,
bool with_nulls = true) {
const int64_t length = 500;
auto string_type = utf8();
auto binary_type = binary();
Expand All @@ -244,18 +261,24 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {

// Quirk with RETURN_NOT_OK macro and templated functions
{
auto s = MakeRandomBinaryArray<StringBuilder, char>(length, true, pool, &a0);
auto s = MakeBinaryArrayWithUniqueValues<StringBuilder, char>(length, with_nulls,
pool, &a0);
RETURN_NOT_OK(s);
}

{
auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, true, pool, &a1);
auto s = MakeBinaryArrayWithUniqueValues<BinaryBuilder, uint8_t>(length, with_nulls,
pool, &a1);
RETURN_NOT_OK(s);
}
*out = RecordBatch::Make(schema, length, {a0, a1});
return Status::OK();
}

Status MakeStringTypesRecordBatchWithNulls(std::shared_ptr<RecordBatch>* out) {
return MakeStringTypesRecordBatch(out, true);
}

Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
const int64_t length = 500;
auto f0 = field("f0", null());
Expand Down