Skip to content

Commit

Permalink
PARQUET-1523: [C++] Vectorize Comparator interface, remove virtual calls on inner loop. Refactor Statistics to not require PARQUET_EXTERN_TEMPLATE
Browse files Browse the repository at this point in the history

This patch supersedes #3752

I took the liberty of consolidating the comparator code with the statistics code since the two things are effectively inseparable. I also renamed the statistics classes for clarity, since "Statistics" is clearer than "RowGroupStatistics" -- the "scope" of the statistics need not be limited to a row group.

I apologize for the size of the diff; it is largely the result of moving code around, in particular shuffling implementations from header files into parquet/statistics.cc.

Author: Wes McKinney <wesm+git@apache.org>

Closes #4233 from wesm/PARQUET-1523-vectorize-comparator and squashes the following commits:

a1f2f38 <Wes McKinney> Code review feedback, fix doxygen warning
5493b59 <Wes McKinney> Add basic doxygen comments, fix clang warnings
e6efdc0 <Wes McKinney> Restore UNKNOWN default sort order for INT96
3a58b1f <Wes McKinney> Restore PARQUET_TEMPLATE_CLASS_EXPORT to TypedScanner
ab5d01f <Wes McKinney> Make benchmark data for int64 write path larger and more diverse
335607d <Wes McKinney> Fix parquet-column_writer-test
6273789 <Wes McKinney> Statistics tests passing again
d55bf7d <Wes McKinney> Compiling again, not quite there
9201261 <Wes McKinney> Refactoring progress
  • Loading branch information
wesm committed May 2, 2019
1 parent 0289af2 commit 250e97c
Show file tree
Hide file tree
Showing 20 changed files with 1,017 additions and 958 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class FlightIpcMessageReader : public ipc::MessageReader {
Status OverrideWithServerError(Status&& st) {
// Get the gRPC status if not OK, to propagate any server error message
RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish()));
return st;
return std::move(st);
}

// The RPC context lifetime must be coupled to the ClientReader
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/flight/flight-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ Status RunPerformanceTest(const std::string& hostname, const int port) {
RETURN_NOT_OK(plan->GetSchema(&schema));

PerformanceStats stats;
auto ConsumeStream = [&stats, &schema, &hostname,
&port](const FlightEndpoint& endpoint) {
auto ConsumeStream = [&stats, &hostname, &port](const FlightEndpoint& endpoint) {
// TODO(wesm): Use location from endpoint, same host/port for now
std::unique_ptr<FlightClient> client;
RETURN_NOT_OK(FlightClient::Connect(hostname, port, &client));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/server_auth.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ARROW_EXPORT ServerAuthHandler {
/// \brief An authentication mechanism that does nothing.
class ARROW_EXPORT NoOpAuthHandler : public ServerAuthHandler {
public:
~NoOpAuthHandler();
~NoOpAuthHandler() override;
Status Authenticate(ServerAuthSender* outgoing, ServerAuthReader* incoming) override;
Status IsValid(const std::string& token, std::string* peer_identity) override;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class ARROW_EXPORT TestServerAuthHandler : public ServerAuthHandler {
public:
explicit TestServerAuthHandler(const std::string& username,
const std::string& password);
~TestServerAuthHandler();
~TestServerAuthHandler() override;
Status Authenticate(ServerAuthSender* outgoing, ServerAuthReader* incoming) override;
Status IsValid(const std::string& token, std::string* peer_identity) override;

Expand All @@ -151,7 +151,7 @@ class ARROW_EXPORT TestClientAuthHandler : public ClientAuthHandler {
public:
explicit TestClientAuthHandler(const std::string& username,
const std::string& password);
~TestClientAuthHandler();
~TestClientAuthHandler() override;
Status Authenticate(ClientAuthSender* outgoing, ClientAuthReader* incoming) override;
Status GetToken(std::string* token) override;

Expand Down
1 change: 0 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ set(PARQUET_SRCS
schema.cc
statistics.cc
types.cc
util/comparison.cc
util/memory.cc)

# Ensure that thrift compilation is done before using its generated headers
Expand Down
40 changes: 22 additions & 18 deletions cpp/src/parquet/column-io-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "benchmark/benchmark.h"

#include "arrow/array.h"
#include "arrow/testing/random.h"

#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_reader.h"
Expand Down Expand Up @@ -62,7 +65,11 @@ template <Repetition::type repetition,
Compression::type codec = Compression::UNCOMPRESSED>
static void BM_WriteInt64Column(::benchmark::State& state) {
format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range(0), 128);

::arrow::random::RandomArrayGenerator rgen(1337);
auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
const auto& i8_values = static_cast<const ::arrow::Int64Array&>(*values);

std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
Expand All @@ -78,39 +85,36 @@ static void BM_WriteInt64Column(::benchmark::State& state) {
InMemoryOutputStream stream;
std::shared_ptr<Int64Writer> writer = BuildWriter(
state.range(0), &stream, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
values.data());
writer->WriteBatch(i8_values.length(), definition_levels.data(),
repetition_levels.data(), i8_values.raw_values());
writer->Close();
}
SetBytesProcessed(state, repetition);
}

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Range(1024, 65536);

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536);

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY)
->Range(1024, 65536);
->Arg(1 << 20);

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
->Range(1024, 65536);
->Arg(1 << 20);

BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
->Range(1024, 65536);
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD)
->Range(1024, 65536);
->Arg(1 << 20);

std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
int64_t num_values, ColumnDescriptor* schema) {
Expand Down
26 changes: 12 additions & 14 deletions cpp/src/parquet/column_writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include "parquet/column_writer.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/thrift.h"
#include "parquet/types.h"
#include "parquet/util/comparison.h"
#include "parquet/util/memory.h"

namespace parquet {
Expand Down Expand Up @@ -215,16 +215,14 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
void ReadAndCompare(Compression::type compression, int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
std::shared_ptr<CompareDefault<TestType>> compare;
compare = std::static_pointer_cast<CompareDefault<TestType>>(
Comparator::Make(this->descr_));
auto comparator = TypedComparator<TestType>::Make(this->descr_);
for (size_t i = 0; i < this->values_.size(); i++) {
if ((*compare)(this->values_[i], this->values_out_[i]) ||
(*compare)(this->values_out_[i], this->values_[i])) {
if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
comparator->Compare(this->values_out_[i], this->values_[i])) {
std::cout << "Failed at " << i << std::endl;
}
ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
}
ASSERT_EQ(this->values_, this->values_out_);
}
Expand Down Expand Up @@ -297,15 +295,15 @@ void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compressio
int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
std::shared_ptr<CompareDefault<Int96Type>> compare;
compare = std::make_shared<CompareDefaultInt96>();

auto comparator = TypedComparator<Int96Type>::Make(Type::INT96, SortOrder::SIGNED);
for (size_t i = 0; i < this->values_.size(); i++) {
if ((*compare)(this->values_[i], this->values_out_[i]) ||
(*compare)(this->values_out_[i], this->values_[i])) {
if (comparator->Compare(this->values_[i], this->values_out_[i]) ||
comparator->Compare(this->values_out_[i], this->values_[i])) {
std::cout << "Failed at " << i << std::endl;
}
ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
ASSERT_FALSE(comparator->Compare(this->values_[i], this->values_out_[i]));
ASSERT_FALSE(comparator->Compare(this->values_out_[i], this->values_[i]));
}
ASSERT_EQ(this->values_, this->values_out_);
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<

if (properties->statistics_enabled(descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
page_statistics_ = TypedStats::Make(descr_, allocator_);
chunk_statistics_ = TypedStats::Make(descr_, allocator_);
}
}

Expand Down Expand Up @@ -719,9 +719,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
std::unique_ptr<Encoder> current_encoder_;

typedef TypedRowGroupStatistics<DType> TypedStats;
std::unique_ptr<TypedStats> page_statistics_;
std::unique_ptr<TypedStats> chunk_statistics_;
using TypedStats = TypedStatistics<DType>;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
};

// Only one Dictionary Page is written.
Expand Down
1 change: 0 additions & 1 deletion cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,26 @@ std::string ParquetVersionToString(ParquetVersion::type ver) {
}

template <typename DType>
static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
static std::shared_ptr<Statistics> MakeTypedColumnStats(
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
// If ColumnOrder is defined, return max_value and min_value
if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
return std::make_shared<TypedRowGroupStatistics<DType>>(
return TypedStatistics<DType>::Make(
descr, metadata.statistics.min_value, metadata.statistics.max_value,
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count, metadata.statistics.distinct_count,
metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value);
}
// Default behavior
return std::make_shared<TypedRowGroupStatistics<DType>>(
return TypedStatistics<DType>::Make(
descr, metadata.statistics.min, metadata.statistics.max,
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count, metadata.statistics.distinct_count,
metadata.statistics.__isset.max || metadata.statistics.__isset.min);
}

std::shared_ptr<RowGroupStatistics> MakeColumnStats(
const format::ColumnMetaData& meta_data, const ColumnDescriptor* descr) {
std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_data,
const ColumnDescriptor* descr) {
switch (static_cast<Type::type>(meta_data.type)) {
case Type::BOOLEAN:
return MakeTypedColumnStats<BooleanType>(meta_data, descr);
Expand Down Expand Up @@ -159,7 +159,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
descr_->sort_order());
}

inline std::shared_ptr<RowGroupStatistics> statistics() const {
inline std::shared_ptr<Statistics> statistics() const {
return is_stats_set() ? possible_stats_ : nullptr;
}

Expand Down Expand Up @@ -196,7 +196,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
}

private:
mutable std::shared_ptr<RowGroupStatistics> possible_stats_;
mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
const format::ColumnChunk* column_;
const ColumnDescriptor* descr_;
Expand Down Expand Up @@ -232,7 +232,7 @@ std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const
return impl_->path_in_schema();
}

std::shared_ptr<RowGroupStatistics> ColumnChunkMetaData::statistics() const {
std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const {
return impl_->statistics();
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace parquet {
class ColumnDescriptor;
class EncodedStatistics;
class OutputStream;
class RowGroupStatistics;
class Statistics;
class SchemaDescriptor;

namespace schema {
Expand Down Expand Up @@ -117,7 +117,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
int64_t num_values() const;
std::shared_ptr<schema::ColumnPath> path_in_schema() const;
bool is_stats_set() const;
std::shared_ptr<RowGroupStatistics> statistics() const;
std::shared_ptr<Statistics> statistics() const;
Compression::type compression() const;
const std::vector<Encoding::type>& encodings() const;
bool has_dictionary_page() const;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
// Print column metadata
for (auto i : selected_columns) {
auto column_chunk = group_metadata->ColumnChunk(i);
std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
std::shared_ptr<Statistics> stats = column_chunk->statistics();

const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << "Column " << i << std::endl << ", Values: " << column_chunk->num_values();
Expand Down Expand Up @@ -220,7 +220,7 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
int c1 = 0;
for (auto i : selected_columns) {
auto column_chunk = group_metadata->ColumnChunk(i);
std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
std::shared_ptr<Statistics> stats = column_chunk->statistics();

const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << " {\"Id\": \"" << i << "\", \"Values\": \""
Expand Down
Loading

0 comments on commit 250e97c

Please sign in to comment.