PARQUET-1360: Use conforming API style, variable names in WriteFileMetaData functions

Per comments in PARQUET-1348. I also upgraded clang-format to 6.0 here so pardon the diff noise from that.

To summarize the style points:

* Don't pass `const std::shared_ptr<T>&` if `const T&` will do
* Don't pass `const std::shared_ptr<T>&` if `T*` will do
* I prefer to use static member functions only for alternate constructors
* Out arguments after in arguments
* `lower_with_underscores` for variable names
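
To illustrate these points concretely — a hypothetical sketch, not code from this commit (`Metadata`, `Sink`, and `WriteMetaData` here are stand-ins):

```cpp
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-ins for types like FileMetaData and OutputStream.
struct Metadata {
  int num_rows;
};
struct Sink {
  std::string data;
};

// const T& for the read-only in argument (not const std::shared_ptr<T>&),
// a raw T* for the non-owning out argument, and out arguments after in
// arguments.
void WriteMetaData(const Metadata& file_metadata, Sink* sink) {
  sink->data += "num_rows=" + std::to_string(file_metadata.num_rows);
}

int main() {
  // A caller that owns the object through a shared_ptr dereferences it at
  // the call site; the callee takes no part in ownership.
  auto file_metadata = std::make_shared<Metadata>();  // lower_with_underscores
  file_metadata->num_rows = 100;
  Sink sink;
  WriteMetaData(*file_metadata, &sink);
  std::cout << sink.data << std::endl;  // prints num_rows=100
  return 0;
}
```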

Author: Wes McKinney <wesm+git@apache.org>

Closes #482 from wesm/PARQUET-1360 and squashes the following commits:

375a442 [Wes McKinney] API conformity and changes per review comments in ARROW-1348
wesm committed Jul 29, 2018
1 parent b4023c2 commit 853abb9
Showing 19 changed files with 86 additions and 117 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -84,7 +84,7 @@ enable_testing()
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake_modules")
set(BUILD_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/build-support")

set(CLANG_FORMAT_VERSION "5.0")
set(CLANG_FORMAT_VERSION "6.0")
find_package(ClangTools)
if ("$ENV{CMAKE_EXPORT_COMPILE_COMMANDS}" STREQUAL "1" OR CLANG_TIDY_FOUND)
# Generate a Clang compile_commands.json "compilation database" file for use
33 changes: 14 additions & 19 deletions src/parquet/arrow/arrow-reader-writer-test.cc
@@ -52,16 +52,16 @@ using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
using arrow::DataType;
-using arrow::default_memory_pool;
using arrow::ListArray;
-using arrow::ResizableBuffer;
using arrow::PrimitiveArray;
+using arrow::ResizableBuffer;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
using arrow::compute::Datum;
using arrow::compute::DictionaryEncode;
using arrow::compute::FunctionContext;
+using arrow::default_memory_pool;
using arrow::io::BufferReader;

using arrow::test::randint;
@@ -861,26 +861,21 @@ TYPED_TEST(TestParquetIO, FileMetaDataWrite) {

std::unique_ptr<FileReader> reader;
ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
-const std::shared_ptr<FileMetaData> fileMetaData = reader->parquet_reader()->metadata();
-ASSERT_EQ(1, fileMetaData->num_columns());
-ASSERT_EQ(100, fileMetaData->num_rows());
+auto metadata = reader->parquet_reader()->metadata();
+ASSERT_EQ(1, metadata->num_columns());
+ASSERT_EQ(100, metadata->num_rows());

this->sink_ = std::make_shared<InMemoryOutputStream>();

-std::unique_ptr<FileMetaData> uniqueFileMetaData(fileMetaData.get());
-
-ASSERT_OK_NO_THROW(FileWriter::WriteMetaData(uniqueFileMetaData, this->sink_));
+ASSERT_OK_NO_THROW(::parquet::arrow::WriteFileMetaData(*metadata, this->sink_.get()));

ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
-const std::shared_ptr<FileMetaData> fileMetaDataWritten =
-reader->parquet_reader()->metadata();
-ASSERT_EQ(fileMetaData->size(), fileMetaDataWritten->size());
-ASSERT_EQ(fileMetaData->num_row_groups(), fileMetaDataWritten->num_row_groups());
-ASSERT_EQ(fileMetaData->num_rows(), fileMetaDataWritten->num_rows());
-ASSERT_EQ(fileMetaData->num_columns(), fileMetaDataWritten->num_columns());
-ASSERT_EQ(fileMetaData->RowGroup(0)->num_rows(),
-fileMetaDataWritten->RowGroup(0)->num_rows());
-uniqueFileMetaData.release();
+auto metadata_written = reader->parquet_reader()->metadata();
+ASSERT_EQ(metadata->size(), metadata_written->size());
+ASSERT_EQ(metadata->num_row_groups(), metadata_written->num_row_groups());
+ASSERT_EQ(metadata->num_rows(), metadata_written->num_rows());
+ASSERT_EQ(metadata->num_columns(), metadata_written->num_columns());
+ASSERT_EQ(metadata->RowGroup(0)->num_rows(), metadata_written->RowGroup(0)->num_rows());
}

using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
@@ -1485,13 +1480,13 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
// Regression for ARROW-2802
TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
using ::arrow::Column;
-using ::arrow::default_memory_pool;
using ::arrow::Field;
using ::arrow::Schema;
using ::arrow::Table;
+using ::arrow::TimeUnit;
using ::arrow::TimestampBuilder;
using ::arrow::TimestampType;
-using ::arrow::TimeUnit;
+using ::arrow::default_memory_pool;

auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);

11 changes: 6 additions & 5 deletions src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
-EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
-<< " != " << rhs->ToString();
+EXPECT_TRUE(lhs->Equals(rhs))
+<< i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}

@@ -607,9 +607,10 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
auto outer_group_fields = {
std::make_shared<Field>("leaf2", INT32, true),
std::make_shared<Field>("innerGroup", ::arrow::list(std::make_shared<Field>(
"innerGroup", inner_group_type, false)),
false)};
std::make_shared<Field>(
"innerGroup",
::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
false)};
auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);

arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
4 changes: 1 addition & 3 deletions src/parquet/arrow/reader.cc
@@ -301,9 +301,7 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
public:
explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
int16_t struct_def_level, MemoryPool* pool, const Node* node)
-: children_(children),
-struct_def_level_(struct_def_level),
-pool_(pool) {
+: children_(children), struct_def_level_(struct_def_level), pool_(pool) {
InitField(node, children);
}

13 changes: 6 additions & 7 deletions src/parquet/arrow/test-util.h
@@ -402,17 +402,16 @@ Status MakeEmptyListsArray(int64_t size, std::shared_ptr<Array>* out_array) {
&offsets_buffer));
memset(offsets_buffer->mutable_data(), 0, offsets_nbytes);

-auto value_field = ::arrow::field("item", ::arrow::float64(),
-false /* nullable_values */);
+auto value_field =
+::arrow::field("item", ::arrow::float64(), false /* nullable_values */);
auto list_type = ::arrow::list(value_field);

std::vector<std::shared_ptr<Buffer>> child_buffers = {nullptr /* null bitmap */,
-nullptr /* values */ };
+nullptr /* values */};
-auto child_data = ::arrow::ArrayData::Make(value_field->type(), 0,
-std::move(child_buffers));
+auto child_data =
+::arrow::ArrayData::Make(value_field->type(), 0, std::move(child_buffers));

-std::vector<std::shared_ptr<Buffer>> buffers = {nullptr /* bitmap */,
-offsets_buffer };
+std::vector<std::shared_ptr<Buffer>> buffers = {nullptr /* bitmap */, offsets_buffer};
auto array_data = ::arrow::ArrayData::Make(list_type, size, std::move(buffers));
array_data->child_data.push_back(child_data);

24 changes: 12 additions & 12 deletions src/parquet/arrow/writer.cc
@@ -41,8 +41,8 @@ using arrow::Int16Builder;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::NumericArray;
-using arrow::ResizableBuffer;
using arrow::PrimitiveArray;
+using arrow::ResizableBuffer;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
@@ -216,9 +216,11 @@ class LevelBuilder {
if (level_null_count && level_valid_bitmap == nullptr) {
// Special case: this is a null array (all elements are null)
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
-} else if (nullable_level && ((level_null_count == 0) ||
-BitUtil::GetBit(level_valid_bitmap,
-inner_offset + i + array_offsets_[recursion_level]))) {
+} else if (nullable_level &&
+((level_null_count == 0) ||
+BitUtil::GetBit(
+level_valid_bitmap,
+inner_offset + i + array_offsets_[recursion_level]))) {
// Non-null element in a null level
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
} else {
@@ -1092,19 +1094,17 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}

-Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
-const std::shared_ptr<OutputStream>& sink) {
-ParquetFileWriter::WriteMetaData(sink, fileMetaData);
+Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
+PARQUET_CATCH_NOT_OK(::parquet::WriteFileMetaData(file_metadata, sink));
return Status::OK();
}

-Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
-const std::shared_ptr<::arrow::io::OutputStream>& sink) {
-auto wrapper = std::make_shared<ArrowOutputStream>(sink);
-return WriteMetaData(fileMetaData, wrapper);
+Status WriteFileMetaData(const FileMetaData& file_metadata,
+const std::shared_ptr<::arrow::io::OutputStream>& sink) {
+ArrowOutputStream wrapper(sink);
+return ::parquet::arrow::WriteFileMetaData(file_metadata, &wrapper);
}

-
namespace {} // namespace

Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
17 changes: 9 additions & 8 deletions src/parquet/arrow/writer.h
@@ -132,14 +132,6 @@ class PARQUET_EXPORT FileWriter {
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);

-static ::arrow::Status WriteMetaData(
-const std::unique_ptr<FileMetaData>& fileMetaData,
-const std::shared_ptr<OutputStream>& sink);
-
-static ::arrow::Status WriteMetaData(
-const std::unique_ptr<FileMetaData>& fileMetaData,
-const std::shared_ptr<::arrow::io::OutputStream>& sink);
-
/// \brief Write a Table to Parquet.
::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);

@@ -161,6 +153,15 @@
std::unique_ptr<Impl> impl_;
};

+/// \brief Write Parquet file metadata only to indicated OutputStream
+PARQUET_EXPORT
+::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink);
+
+/// \brief Write Parquet file metadata only to indicated Arrow OutputStream
+PARQUET_EXPORT
+::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata,
+const std::shared_ptr<::arrow::io::OutputStream>& sink);
+
/**
* Write a Table to Parquet.
*
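For orientation, a minimal sketch of calling the first overload declared above. The helper name and surrounding setup are hypothetical; `InMemoryOutputStream` is the sink the updated test uses (from parquet/util/memory.h):

```cpp
#include "parquet/arrow/writer.h"  // parquet::arrow::WriteFileMetaData
#include "parquet/util/memory.h"   // parquet::InMemoryOutputStream

// Hypothetical helper: write only the file metadata (with its footer) to an
// in-memory sink. `metadata` would typically come from
// reader->parquet_reader()->metadata(), as in the updated test above.
::arrow::Status WriteMetadataOnly(const parquet::FileMetaData& metadata) {
  parquet::InMemoryOutputStream sink;
  // const reference for the metadata, raw pointer for the sink, matching
  // the style points in the commit message.
  return parquet::arrow::WriteFileMetaData(metadata, &sink);
}
```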
2 changes: 1 addition & 1 deletion src/parquet/encoding-benchmark.cc
@@ -20,8 +20,8 @@
#include "parquet/encoding-internal.h"
#include "parquet/util/memory.h"

+using arrow::MemoryPool;
using arrow::default_memory_pool;
-using arrow::MemoryPool;

namespace parquet {

2 changes: 1 addition & 1 deletion src/parquet/encoding-test.cc
@@ -30,8 +30,8 @@
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"

+using arrow::MemoryPool;
using arrow::default_memory_pool;
-using arrow::MemoryPool;

using std::string;
using std::vector;
8 changes: 4 additions & 4 deletions src/parquet/encoding.h
@@ -51,12 +51,12 @@ class Encoder {
virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) {
std::shared_ptr<ResizableBuffer> buffer;
-auto status = ::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T),
-&buffer);
+auto status =
+::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T), &buffer);
if (!status.ok()) {
std::ostringstream ss;
ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in "
<< __FILE__ << ", on line " << __LINE__;
ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in " << __FILE__
<< ", on line " << __LINE__;
throw ParquetException(ss.str());
}
int32_t num_valid_values = 0;
40 changes: 11 additions & 29 deletions src/parquet/file_writer.cc
@@ -160,20 +160,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
return result;
}

-static void WriteMetaData(
-const std::shared_ptr<OutputStream>& sink,
-const std::unique_ptr<FileMetaData>& fileMetaData) {
-// Write MetaData
-uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
-
-fileMetaData->WriteTo(sink.get());
-metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;
-
-// Write Footer
-sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-sink->Write(PARQUET_MAGIC, 4);
-}
-
void Close() override {
if (is_open_) {
if (row_group_writer_) {
@@ -183,7 +169,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
row_group_writer_.reset();

// Write magic bytes and metadata
-WriteMetaData();
+auto metadata = metadata_->Finish();
+WriteFileMetaData(*metadata, sink_.get());

sink_->Close();
is_open_ = false;
@@ -246,11 +233,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Parquet files always start with PAR1
sink_->Write(PARQUET_MAGIC, 4);
}
-
-void WriteMetaData() {
-auto metadata = metadata_->Finish();
-WriteMetaData(sink_, metadata);
-}
};

// ----------------------------------------------------------------------
@@ -285,16 +267,16 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
return result;
}

-void ParquetFileWriter::WriteMetaData(
-const std::shared_ptr<::arrow::io::OutputStream> &sink,
-const std::unique_ptr<FileMetaData> &fileMetaData) {
-WriteMetaData(std::make_shared<ArrowOutputStream>(sink), fileMetaData);
-}
+void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
+// Write MetaData
+uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
+
+file_metadata.WriteTo(sink);
+metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;

-void ParquetFileWriter::WriteMetaData(
-const std::shared_ptr<OutputStream> &sink,
-const std::unique_ptr<FileMetaData> &fileMetaData) {
-FileSerializer::WriteMetaData(sink, fileMetaData);
+// Write Footer
+sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+sink->Write(PARQUET_MAGIC, 4);
}

const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
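The footer this function writes is: the serialized metadata, a 4-byte metadata length, then the 4-byte PAR1 magic (the raw `Write` of `metadata_len` produces the little-endian layout Parquet specifies on the little-endian hosts the library targets). A hypothetical reader-side sketch of that layout, not part of this commit:

```cpp
#include <cstdint>
#include <cstring>

// Given a complete Parquet file in memory, return the offset where the
// serialized FileMetaData begins. Assumes a little-endian host, matching
// the raw 4-byte length write in WriteFileMetaData above.
int64_t MetadataStartOffset(const uint8_t* file_data, int64_t file_len) {
  uint32_t metadata_len = 0;
  // The last 8 bytes are the 4-byte metadata length followed by "PAR1".
  std::memcpy(&metadata_len, file_data + file_len - 8, sizeof(metadata_len));
  return file_len - 8 - static_cast<int64_t>(metadata_len);
}
```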
11 changes: 3 additions & 8 deletions src/parquet/file_writer.h
@@ -79,6 +79,9 @@ class PARQUET_EXPORT RowGroupWriter {
std::unique_ptr<Contents> contents_;
};

+PARQUET_EXPORT
+void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink);
+
class PARQUET_EXPORT ParquetFileWriter {
public:
// Forward declare a virtual class 'Contents' to aid dependency injection and more
@@ -133,14 +136,6 @@ class PARQUET_EXPORT ParquetFileWriter {
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);

-static void WriteMetaData(
-const std::shared_ptr<::arrow::io::OutputStream> &sink,
-const std::unique_ptr<FileMetaData> &fileMetaData);
-
-static void WriteMetaData(
-const std::shared_ptr<OutputStream> &sink,
-const std::unique_ptr<FileMetaData> &fileMetaData);
-
void Open(std::unique_ptr<Contents> contents);
void Close();

10 changes: 5 additions & 5 deletions src/parquet/metadata.cc
@@ -222,9 +222,7 @@ int64_t ColumnChunkMetaData::data_page_offset() const {
return impl_->data_page_offset();
}

-bool ColumnChunkMetaData::has_index_page() const {
-return impl_->has_index_page();
-}
+bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); }

int64_t ColumnChunkMetaData::index_page_offset() const {
return impl_->index_page_offset();
@@ -345,7 +343,9 @@ class FileMetaData::FileMetaDataImpl {

const ApplicationVersion& writer_version() const { return writer_version_; }

-void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); }
+void WriteTo(OutputStream* dst) const {
+SerializeThriftMsg(metadata_.get(), 1024, dst);
+}

std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
if (!(i < num_row_groups())) {
@@ -462,7 +462,7 @@ std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const
return impl_->key_value_metadata();
}

-void FileMetaData::WriteTo(OutputStream* dst) { return impl_->WriteTo(dst); }
+void FileMetaData::WriteTo(OutputStream* dst) const { return impl_->WriteTo(dst); }

ApplicationVersion::ApplicationVersion(const std::string& application, int major,
int minor, int patch)
2 changes: 1 addition & 1 deletion src/parquet/metadata.h
@@ -171,7 +171,7 @@ class PARQUET_EXPORT FileMetaData {

const ApplicationVersion& writer_version() const;

-void WriteTo(OutputStream* dst);
+void WriteTo(OutputStream* dst) const;

// Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const;
