Skip to content

Commit

Permalink
PARQUET-1716: [C++] Add BYTE_STREAM_SPLIT encoder and decoder
Browse files Browse the repository at this point in the history
The patch implements an encoder and decoder for Parquet's
BYTE_STREAM_SPLIT encoding. The patch also adds tests for
the new encoding.

Closes #6005 from martinradev/byte_stream_split_submit and squashes the following commits:

5a78f8b <Martin Radev> ARROW-5913:  Add BYTE_STREAM_SPLIT encoder and decoder

Authored-by: Martin Radev <martin.b.radev@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
martinradev authored and kszucs committed Feb 7, 2020
1 parent 2f3ff80 commit 810b9c1
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,12 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::BYTE_STREAM_SPLIT: {
auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

Expand Down
188 changes: 188 additions & 0 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,104 @@ void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
}
}

// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations

template <typename DType>
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
using TypedEncoder<DType>::Put;

explicit ByteStreamSplitEncoder(
const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;

void Put(const T* buffer, int num_values) override;
void Put(const arrow::Array& values) override;
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) override;

protected:
arrow::TypedBufferBuilder<T> values_;

private:
void PutArrowArray(const arrow::Array& values);
};

template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool)
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {}

template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
return values_.length() * sizeof(T);
}

template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
constexpr size_t num_streams = sizeof(T);
std::shared_ptr<ResizableBuffer> output_buffer =
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const size_t num_values = values_.length();
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
for (size_t i = 0; i < num_values; ++i) {
for (size_t j = 0U; j < num_streams; ++j) {
const uint8_t byte_in_value = raw_values[i * num_streams + j];
output_buffer_raw[j * num_values + i] = byte_in_value;
}
}
values_.Reset();
return std::move(output_buffer);
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values));
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::Put(const ::arrow::Array& values) {
PutArrowArray(values);
}

template <>
void ByteStreamSplitEncoder<FloatType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::FloatArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}

template <>
void ByteStreamSplitEncoder<DoubleType>::PutArrowArray(const ::arrow::Array& values) {
DirectPutImpl<arrow::DoubleArray>(values,
reinterpret_cast<arrow::BufferBuilder*>(&values_));
}

template <typename DType>
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
const uint8_t* valid_bits,
int64_t valid_bits_offset) {
std::shared_ptr<ResizableBuffer> buffer;
PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
num_values * sizeof(T), &buffer));
int32_t num_valid_values = 0;
arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
T* data = reinterpret_cast<T*>(buffer->mutable_data());
for (int32_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
data[num_valid_values++] = src[i];
}
valid_bits_reader.Next();
}
Put(data, num_valid_values);
}

// ----------------------------------------------------------------------
// Encoder and decoder factory functions

Expand Down Expand Up @@ -863,6 +961,18 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
DCHECK(false) << "Encoder not implemented";
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<FloatType>(descr, pool));
case Type::DOUBLE:
return std::unique_ptr<Encoder>(
new ByteStreamSplitEncoder<DoubleType>(descr, pool));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down Expand Up @@ -2236,6 +2346,74 @@ class DeltaByteArrayDecoder : public DecoderImpl,
ByteArray last_value_;
};

// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT

template <typename DType>
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
using T = typename DType::c_type;
explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);

int Decode(T* buffer, int max_values) override;

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) override;

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) override;

void SetData(int num_values, const uint8_t* data, int len) override;

private:
int num_values_in_buffer{0U};
};

template <typename DType>
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}

template <typename DType>
void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
int len) {
DecoderImpl::SetData(num_values, data, len);
num_values_in_buffer = num_values;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
constexpr size_t num_streams = sizeof(T);
const int values_to_decode = std::min(num_values_, max_values);
const int num_decoded_previously = num_values_in_buffer - num_values_;
for (int i = 0; i < values_to_decode; ++i) {
uint8_t gathered_byte_data[num_streams];
for (size_t b = 0; b < num_streams; ++b) {
const size_t byte_index = b * num_values_in_buffer + num_decoded_previously + i;
gathered_byte_data[b] = data_[byte_index];
}
buffer[i] = arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]);
}
num_values_ -= values_to_decode;
len_ -= sizeof(T) * values_to_decode;
return values_to_decode;
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* builder) {
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}

template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
typename EncodingTraits<DType>::DictAccumulator* builder) {
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}

// ----------------------------------------------------------------------

std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
Expand All @@ -2261,6 +2439,16 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
default:
break;
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
case Type::FLOAT:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
case Type::DOUBLE:
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down
Loading

0 comments on commit 810b9c1

Please sign in to comment.