Skip to content

Commit

Permalink
PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder
Browse files Browse the repository at this point in the history
The ByteStreamSplit encoder can benefit from using SSE2 intrinsics
to speed up data processing since most of the data is likely in the
cache.

Benchmark results on Sandy Bridge 3930k:
BM_ByteStreamSplitEncode_Float_Scalar/1024          264 ns          264 ns      2679735 bytes_per_second=14.4706G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096         1003 ns         1003 ns       688811 bytes_per_second=15.2114G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768       11933 ns        11926 ns        59187 bytes_per_second=10.2353G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536       28137 ns        28137 ns        24634 bytes_per_second=8.67699G/s
BM_ByteStreamSplitEncode_Double_Scalar/1024        2601 ns         2599 ns       266977 bytes_per_second=2.93583G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096       32408 ns        32408 ns        21594 bytes_per_second=964.268M/s
BM_ByteStreamSplitEncode_Double_Scalar/32768     228019 ns       227850 ns         3079 bytes_per_second=1097.21M/s
BM_ByteStreamSplitEncode_Double_Scalar/65536     475051 ns       475051 ns         1477 bytes_per_second=1052.52M/s
BM_ByteStreamSplitEncode_Float_SSE2/1024            219 ns          219 ns      3156405 bytes_per_second=17.4093G/s
BM_ByteStreamSplitEncode_Float_SSE2/4096            860 ns          860 ns       796082 bytes_per_second=17.7457G/s
BM_ByteStreamSplitEncode_Float_SSE2/32768         11556 ns        11556 ns        61763 bytes_per_second=10.5632G/s
BM_ByteStreamSplitEncode_Float_SSE2/65536         27759 ns        27758 ns        25234 bytes_per_second=8.7953G/s
BM_ByteStreamSplitEncode_Double_SSE2/1024           433 ns          433 ns      1596315 bytes_per_second=17.6216G/s
BM_ByteStreamSplitEncode_Double_SSE2/4096          3358 ns         3358 ns       205874 bytes_per_second=9.08763G/s
BM_ByteStreamSplitEncode_Double_SSE2/32768        33812 ns        33812 ns        20505 bytes_per_second=7.22053G/s
BM_ByteStreamSplitEncode_Double_SSE2/65536        68419 ns        68417 ns        10078 bytes_per_second=7.13682G/s

Closes #6723 from martinradev/byte_stream_split_submit

Authored-by: Martin Radev <martin.b.radev@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
martinradev authored and pitrou committed Mar 30, 2020
1 parent ef79a2f commit da94098
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 25 deletions.
105 changes: 101 additions & 4 deletions cpp/src/arrow/util/byte_stream_split.h
Expand Up @@ -31,8 +31,8 @@ namespace internal {
#if defined(ARROW_HAVE_SSE2)

template <typename T>
void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
T* out) {
void ByteStreamSplitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
T* out) {
constexpr size_t kNumStreams = sizeof(T);
static_assert(kNumStreams == 4U || kNumStreams == 8U, "Invalid number of streams.");
constexpr size_t kNumStreamsLog2 = (kNumStreams == 8U ? 3U : 2U);
Expand Down Expand Up @@ -84,11 +84,108 @@ void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t s
}
}

// Byte-stream-split encoder built on SSE2 unpack intrinsics.
//
// Reads `num_values` values of type T (sizeof(T) must be 4 or 8) from
// `raw_values` as raw bytes and writes sizeof(T) consecutive streams of
// `num_values` bytes each into `output_buffer_raw`. Stream j receives byte j of
// every value: output[j * num_values + i] == input[i * sizeof(T) + j].
//
// Values are transposed sizeof(__m128i) (16) at a time; the values that do not
// fill a whole block are copied by a scalar loop. All loads and stores are
// unaligned, so neither buffer needs any particular alignment.
template <typename T>
void ByteStreamSplitEncodeSSE2(const uint8_t* raw_values, const size_t num_values,
                               uint8_t* output_buffer_raw) {
  constexpr size_t kStreamCount = sizeof(T);
  static_assert(kStreamCount == 4U || kStreamCount == 8U, "Invalid number of streams.");

  // A block is kStreamCount input registers, which is sizeof(__m128i) values.
  const size_t bytes_per_block = sizeof(__m128i) * kStreamCount;
  const size_t total_bytes = num_values * sizeof(T);
  const size_t block_count = total_bytes / bytes_per_block;
  const size_t vectorized_values = (block_count * bytes_per_block) / sizeof(T);

  // Scalar pass over the trailing values that do not fill a whole block.
  for (size_t value = vectorized_values; value < num_values; ++value) {
    const uint8_t* value_bytes = raw_values + value * kStreamCount;
    for (size_t stream = 0U; stream < kStreamCount; ++stream) {
      output_buffer_raw[stream * num_values + value] = value_bytes[stream];
    }
  }

  const __m128i* input_vectors = reinterpret_cast<const __m128i*>(raw_values);
  __m128i* stream_begin[kStreamCount];
  for (size_t stream = 0; stream < kStreamCount; ++stream) {
    stream_begin[stream] =
        reinterpret_cast<__m128i*>(output_buffer_raw + num_values * stream);
  }

  __m128i stage[3][kStreamCount];
  __m128i transposed[kStreamCount];

  // The shuffle sequence differs for float and double. Only one branch is ever
  // taken per instantiation, so the compiler should be able to drop the other.
  //
  // Worked example for float (bytes of one value are A B C D):
  //   Step 0, load:
  //     0: ABCD ABCD ABCD ABCD  1: ABCD ABCD ABCD ABCD ...
  //   Step 1, _mm_unpacklo_epi8 / _mm_unpackhi_epi8:
  //     0: AABB CCDD AABB CCDD  1: AABB CCDD AABB CCDD ...
  //   Step 2, _mm_unpacklo_epi8 / _mm_unpackhi_epi8:
  //     0: AAAA BBBB CCCC DDDD  1: AAAA BBBB CCCC DDDD ...
  //   Step 3, _mm_unpacklo_epi8 / _mm_unpackhi_epi8:
  //     0: AAAA AAAA BBBB BBBB  1: CCCC CCCC DDDD DDDD ...
  //   Step 4, _mm_unpacklo_epi64 / _mm_unpackhi_epi64:
  //     0: AAAA AAAA AAAA AAAA  1: BBBB BBBB BBBB BBBB ...
  for (size_t block = 0; block < block_count; ++block) {
    // Stage 0 is a plain copy of the block's input registers.
    const __m128i* block_input = input_vectors + block * kStreamCount;
    for (size_t reg = 0; reg < kStreamCount; ++reg) {
      stage[0][reg] = _mm_loadu_si128(block_input + reg);
    }

    // Two rounds of byte interleaving on adjacent register pairs. The original
    // author measured the unpack intrinsics as faster than a shuffle-based
    // implementation.
    for (size_t level = 1; level < 3U; ++level) {
      for (size_t even = 0; even < kStreamCount; even += 2U) {
        const __m128i first = stage[level - 1][even];
        const __m128i second = stage[level - 1][even + 1];
        stage[level][even] = _mm_unpacklo_epi8(first, second);
        stage[level][even + 1] = _mm_unpackhi_epi8(first, second);
      }
    }

    if (kStreamCount == 8U) {
      // double: two rounds of 32-bit interleaving, pairing register i with
      // register i + 4, leave one full stream per register.
      __m128i merged[8];
      for (size_t i = 0; i < 4; ++i) {
        const __m128i lower = stage[2][i];
        const __m128i upper = stage[2][i + 4];
        merged[i * 2] = _mm_unpacklo_epi32(lower, upper);
        merged[i * 2 + 1] = _mm_unpackhi_epi32(lower, upper);
      }
      for (size_t i = 0; i < 4; ++i) {
        transposed[i * 2] = _mm_unpacklo_epi32(merged[i], merged[i + 4]);
        transposed[i * 2 + 1] = _mm_unpackhi_epi32(merged[i], merged[i + 4]);
      }
    } else {
      // float: one more byte interleave, then a 64-bit interleave pairing
      // register i with register i + 2.
      __m128i merged[4];
      for (size_t even = 0; even < 4; even += 2) {
        const __m128i first = stage[2][even];
        const __m128i second = stage[2][even + 1];
        merged[even] = _mm_unpacklo_epi8(first, second);
        merged[even + 1] = _mm_unpackhi_epi8(first, second);
      }
      for (size_t i = 0; i < 2; ++i) {
        transposed[i * 2] = _mm_unpacklo_epi64(merged[i], merged[i + 2]);
        transposed[i * 2 + 1] = _mm_unpackhi_epi64(merged[i], merged[i + 2]);
      }
    }

    // Each register now holds 16 consecutive bytes of a single stream.
    for (size_t stream = 0; stream < kStreamCount; ++stream) {
      _mm_storeu_si128(stream_begin[stream] + block, transposed[stream]);
    }
  }
}

#endif

// Portable byte-stream-split encoder.
//
// Treats `raw_values` as `num_values` values of sizeof(T) bytes each and writes
// sizeof(T) consecutive streams of `num_values` bytes into `output_buffer_raw`.
// Stream j receives byte j of every value:
//   output[j * num_values + i] == input[i * sizeof(T) + j].
// `output_buffer_raw` must have room for num_values * sizeof(T) bytes.
template <typename T>
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
                                 uint8_t* output_buffer_raw) {
  constexpr size_t num_streams = sizeof(T);
  for (size_t i = 0U; i < num_values; ++i) {
    const uint8_t* value_bytes = raw_values + i * num_streams;
    for (size_t j = 0U; j < num_streams; ++j) {
      output_buffer_raw[j * num_values + i] = value_bytes[j];
    }
  }
}

template <typename T>
void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
T* out) {
constexpr size_t kNumStreams = sizeof(T);

for (int64_t i = 0; i < num_values; ++i) {
Expand Down
26 changes: 13 additions & 13 deletions cpp/src/parquet/encoding.cc
Expand Up @@ -858,18 +858,18 @@ int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {

// Encodes the values accumulated in `values_` into a freshly allocated buffer
// of EstimatedDataEncodedSize() bytes, clears `values_`, and returns the buffer.
//
// The byte transposition is delegated to the shared kernels in
// arrow::util::internal: the SSE2 one when ARROW_HAVE_SSE2 is defined, the
// scalar one otherwise. No additional in-place scalar loop is run here, so the
// data is encoded exactly once per flush.
template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
  std::shared_ptr<ResizableBuffer> output_buffer =
      AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
  uint8_t* output_buffer_raw = output_buffer->mutable_data();
  const size_t num_values = values_.length();
  const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
#if defined(ARROW_HAVE_SSE2)
  arrow::util::internal::ByteStreamSplitEncodeSSE2<T>(raw_values, num_values,
                                                      output_buffer_raw);
#else
  arrow::util::internal::ByteStreamSplitEncodeScalar<T>(raw_values, num_values,
                                                        output_buffer_raw);
#endif
  values_.Reset();
  // NOTE(review): the explicit move is presumably kept so that the
  // shared_ptr<ResizableBuffer> converts to the shared_ptr<Buffer> return type
  // without a copy on older compilers -- confirm before removing it.
  return std::move(output_buffer);
}
Expand Down Expand Up @@ -2341,11 +2341,11 @@ int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
const uint8_t* data = data_ + num_decoded_previously;

#if defined(ARROW_HAVE_SSE2)
arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_to_decode,
num_values_in_buffer_, buffer);
arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_to_decode,
num_values_in_buffer_, buffer);
#else
arrow::util::internal::ByteStreamSlitDecodeScalar<T>(data, values_to_decode,
num_values_in_buffer_, buffer);
arrow::util::internal::ByteStreamSplitDecodeScalar<T>(data, values_to_decode,
num_values_in_buffer_, buffer);
#endif
num_values_ -= values_to_decode;
len_ -= sizeof(T) * values_to_decode;
Expand All @@ -2372,8 +2372,8 @@ int ByteStreamSplitDecoder<DType>::DecodeArrow(
// Use fast decoding into intermediate buffer. This will also decode
// some null values, but it's fast enough that we don't care.
T* decode_out = EnsureDecodeBuffer(values_decoded);
arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_decoded,
num_values_in_buffer_, decode_out);
arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_decoded,
num_values_in_buffer_, decode_out);

// XXX If null_count is 0, we could even append in bulk or decode directly into
// builder
Expand Down
47 changes: 42 additions & 5 deletions cpp/src/parquet/encoding_benchmark.cc
Expand Up @@ -213,32 +213,69 @@ static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& decod
state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
}

// Shared driver for the byte-stream-split encode benchmarks.
//
// Builds state.range(0) values of type T, all initialised to 64.0, and calls
// `encode_func(input bytes, value count, output bytes)` once per benchmark
// iteration. The bytes-processed counter is set to the input size times the
// number of iterations.
template <typename T, typename EncodeFunc>
static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& encode_func) {
  const auto requested_values = state.range(0);
  std::vector<T> values(requested_values, 64.0);
  std::vector<uint8_t> output(requested_values * sizeof(T), 0);

  const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
  const auto value_count = values.size();
  uint8_t* encoded = output.data();

  for (auto _ : state) {
    encode_func(values_raw, value_count, encoded);
    // Keeps the writes to `output` from being optimised away.
    benchmark::ClobberMemory();
  }

  state.SetBytesProcessed(state.iterations() * value_count * sizeof(T));
}

// Benchmark entry point: runs the shared decode driver for float values with
// the scalar kernel ByteStreamSplitDecodeScalar<float>.
static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) {
  BM_ByteStreamSplitDecode<float>(
      state, arrow::util::internal::ByteStreamSplitDecodeScalar<float>);
}

// Benchmark entry point: runs the shared decode driver for double values with
// the scalar kernel ByteStreamSplitDecodeScalar<double>.
static void BM_ByteStreamSplitDecode_Double_Scalar(benchmark::State& state) {
  BM_ByteStreamSplitDecode<double>(
      state, arrow::util::internal::ByteStreamSplitDecodeScalar<double>);
}

// Benchmark entry point: runs BM_ByteStreamSplitEncode for float values with
// the scalar encoder ByteStreamSplitEncodeScalar<float>.
static void BM_ByteStreamSplitEncode_Float_Scalar(benchmark::State& state) {
BM_ByteStreamSplitEncode<float>(
state, arrow::util::internal::ByteStreamSplitEncodeScalar<float>);
}

// Benchmark entry point: runs BM_ByteStreamSplitEncode for double values with
// the scalar encoder ByteStreamSplitEncodeScalar<double>.
static void BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) {
BM_ByteStreamSplitEncode<double>(
state, arrow::util::internal::ByteStreamSplitEncodeScalar<double>);
}

// Register the scalar decode and encode benchmarks. Each is given
// Range(MIN_RANGE, MAX_RANGE); both bounds are defined outside this view.
BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);

#if defined(ARROW_HAVE_SSE2)
// Benchmark entry point: runs the shared decode driver for float values with
// the SSE2 kernel ByteStreamSplitDecodeSSE2<float>.
static void BM_ByteStreamSplitDecode_Float_SSE2(benchmark::State& state) {
  BM_ByteStreamSplitDecode<float>(
      state, arrow::util::internal::ByteStreamSplitDecodeSSE2<float>);
}

// Benchmark entry point: runs the shared decode driver for double values with
// the SSE2 kernel ByteStreamSplitDecodeSSE2<double>.
static void BM_ByteStreamSplitDecode_Double_SSE2(benchmark::State& state) {
  BM_ByteStreamSplitDecode<double>(
      state, arrow::util::internal::ByteStreamSplitDecodeSSE2<double>);
}

// Benchmark entry point: runs BM_ByteStreamSplitEncode for float values with
// the SSE2 encoder ByteStreamSplitEncodeSSE2<float>.
static void BM_ByteStreamSplitEncode_Float_SSE2(benchmark::State& state) {
BM_ByteStreamSplitEncode<float>(
state, arrow::util::internal::ByteStreamSplitEncodeSSE2<float>);
}

// Benchmark entry point: runs BM_ByteStreamSplitEncode for double values with
// the SSE2 encoder ByteStreamSplitEncodeSSE2<double>.
static void BM_ByteStreamSplitEncode_Double_SSE2(benchmark::State& state) {
BM_ByteStreamSplitEncode<double>(
state, arrow::util::internal::ByteStreamSplitEncodeSSE2<double>);
}

// Register the SSE2 decode and encode benchmarks with the same
// Range(MIN_RANGE, MAX_RANGE) arguments as the scalar ones; both bounds are
// defined outside this view.
BENCHMARK(BM_ByteStreamSplitDecode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitDecode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitEncode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitEncode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
#endif

template <typename Type>
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/parquet/encoding_test.cc
Expand Up @@ -1094,9 +1094,6 @@ TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) {
// can handle both inputs with size divisible by 4/8 and sizes which would
// require a scalar loop for the suffix.

// Exercise only the scalar loop.
ASSERT_NO_FATAL_FAILURE(this->Execute(3, 1));

// Exercise only the SIMD loop.
ASSERT_NO_FATAL_FAILURE(this->Execute(256, 1));

Expand Down

0 comments on commit da94098

Please sign in to comment.