Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Merge bdd5ec1 into 5264ad4
Browse files Browse the repository at this point in the history
  • Loading branch information
rgruener committed Jul 26, 2018
2 parents 5264ad4 + bdd5ec1 commit ff8b7dc
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
32 changes: 32 additions & 0 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,38 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
}

// Round-trips file metadata: writes a single-column table, reads its
// FileMetaData back, re-serializes only that metadata with
// FileWriter::WriteMetaData, and checks that the re-read metadata matches.
TYPED_TEST(TestParquetIO, FileMetaDataWrite) {
  std::shared_ptr<Array> values;
  ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
                                values->length(), default_writer_properties()));

  std::unique_ptr<FileReader> reader;
  ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
  const std::shared_ptr<FileMetaData> fileMetaData = reader->parquet_reader()->metadata();
  ASSERT_EQ(1, fileMetaData->num_columns());
  // Compare against the array that was actually written rather than a
  // hard-coded row count.
  ASSERT_EQ(values->length(), fileMetaData->num_rows());

  this->sink_ = std::make_shared<InMemoryOutputStream>();

  // WriteMetaData takes a std::unique_ptr, but the metadata is owned by the
  // shared_ptr above, so the unique_ptr only borrows the raw pointer. The guard
  // releases it on every exit path; without it, a fatal ASSERT_* (which returns
  // from the test body) would let the unique_ptr delete an object the
  // shared_ptr deletes as well.
  std::unique_ptr<FileMetaData> uniqueFileMetaData(fileMetaData.get());
  struct ReleaseOnExit {
    std::unique_ptr<FileMetaData>& borrowed;
    ~ReleaseOnExit() { static_cast<void>(borrowed.release()); }
  } releaseGuard{uniqueFileMetaData};

  ASSERT_OK_NO_THROW(FileWriter::WriteMetaData(uniqueFileMetaData, this->sink_));

  ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
  const std::shared_ptr<FileMetaData> fileMetaDataWritten =
      reader->parquet_reader()->metadata();
  ASSERT_EQ(fileMetaData->size(), fileMetaDataWritten->size());
  ASSERT_EQ(fileMetaData->num_row_groups(), fileMetaDataWritten->num_row_groups());
  ASSERT_EQ(fileMetaData->num_rows(), fileMetaDataWritten->num_rows());
  ASSERT_EQ(fileMetaData->num_columns(), fileMetaDataWritten->num_columns());
  ASSERT_EQ(fileMetaData->RowGroup(0)->num_rows(),
            fileMetaDataWritten->RowGroup(0)->num_rows());
}

// TestParquetIO fixture instantiated for ::arrow::TimestampType; used as the
// fixture of the TestInt96ParquetIO tests.
using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;

TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
Expand Down
13 changes: 13 additions & 0 deletions src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,19 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}

// Writes fileMetaData to a Parquet OutputStream via
// ParquetFileWriter::WriteMetaData.
//
// ParquetFileWriter::WriteMetaData returns void, so it cannot report failure
// through a return value. PARQUET_CATCH_NOT_OK turns a ParquetException
// thrown by the call into a non-OK Status, so callers of this
// Status-returning API do not have to handle exceptions themselves.
Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
                                 const std::shared_ptr<OutputStream>& sink) {
  PARQUET_CATCH_NOT_OK(ParquetFileWriter::WriteMetaData(sink, fileMetaData));
  return Status::OK();
}

// Arrow-stream overload: adapts the Arrow output stream to a Parquet
// OutputStream and forwards to the Parquet-stream overload.
Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
                                 const std::shared_ptr<::arrow::io::OutputStream>& sink) {
  const std::shared_ptr<OutputStream> parquet_sink =
      std::make_shared<ArrowOutputStream>(sink);
  return WriteMetaData(fileMetaData, parquet_sink);
}


namespace {} // namespace

Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
Expand Down
8 changes: 8 additions & 0 deletions src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ class PARQUET_EXPORT FileWriter {
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);

/// \brief Write file metadata to a Parquet output stream.
///
/// Forwards to ParquetFileWriter::WriteMetaData.
///
/// \param[in] fileMetaData the metadata to serialize
/// \param[in] sink the Parquet OutputStream that receives the bytes
/// \return Status::OK() once the metadata has been written
static ::arrow::Status WriteMetaData(
const std::unique_ptr<FileMetaData>& fileMetaData,
const std::shared_ptr<OutputStream>& sink);

/// \brief Write file metadata to an Arrow output stream.
///
/// The Arrow stream is wrapped in an ArrowOutputStream and passed to the
/// overload that takes a Parquet OutputStream.
///
/// \param[in] fileMetaData the metadata to serialize
/// \param[in] sink the Arrow output stream that receives the bytes
/// \return the Status produced by the Parquet-stream overload
static ::arrow::Status WriteMetaData(
const std::unique_ptr<FileMetaData>& fileMetaData,
const std::shared_ptr<::arrow::io::OutputStream>& sink);

/// \brief Write a Table to Parquet.
::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);

Expand Down
37 changes: 27 additions & 10 deletions src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ class FileSerializer : public ParquetFileWriter::Contents {
return result;
}

// Serializes fileMetaData into sink and then appends the footer: the 4-byte
// length of the bytes just written, followed by the 4-byte PARQUET_MAGIC.
static void WriteMetaData(const std::shared_ptr<OutputStream>& sink,
                          const std::unique_ptr<FileMetaData>& fileMetaData) {
  // The serialized size is measured as the change in stream position.
  const uint32_t start_pos = static_cast<uint32_t>(sink->Tell());
  fileMetaData->WriteTo(sink.get());
  const uint32_t end_pos = static_cast<uint32_t>(sink->Tell());

  // Footer: metadata length, then the magic bytes.
  uint32_t footer_len = end_pos - start_pos;
  sink->Write(reinterpret_cast<uint8_t*>(&footer_len), 4);
  sink->Write(PARQUET_MAGIC, 4);
}

void Close() override {
if (is_open_) {
if (row_group_writer_) {
Expand Down Expand Up @@ -234,17 +248,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
}

// Finishes the metadata accumulated in metadata_ and writes it, together with
// the footer, to sink_.
void WriteMetaData() {
  // Get a FileMetaData
  auto metadata = metadata_->Finish();

  // The static overload serializes the metadata and appends the footer
  // (length + magic). Doing that inline here as well would emit the metadata
  // and footer twice, so delegation is the only write.
  WriteMetaData(sink_, metadata);
}
};

Expand Down Expand Up @@ -280,6 +285,18 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
return result;
}

// Arrow-stream overload: wraps the Arrow output stream in an ArrowOutputStream
// and forwards to the overload that takes a Parquet OutputStream.
void ParquetFileWriter::WriteMetaData(
    const std::shared_ptr<::arrow::io::OutputStream>& sink,
    const std::unique_ptr<FileMetaData>& fileMetaData) {
  const std::shared_ptr<OutputStream> parquet_sink =
      std::make_shared<ArrowOutputStream>(sink);
  WriteMetaData(parquet_sink, fileMetaData);
}

// Writes fileMetaData to sink by delegating to
// FileSerializer::WriteMetaData, which does the actual serialization.
void ParquetFileWriter::WriteMetaData(const std::shared_ptr<OutputStream>& sink,
                                      const std::unique_ptr<FileMetaData>& fileMetaData) {
  FileSerializer::WriteMetaData(sink, fileMetaData);
}

const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }

const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
Expand Down
8 changes: 8 additions & 0 deletions src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ class PARQUET_EXPORT ParquetFileWriter {
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);

/// \brief Write file metadata to an Arrow output stream.
///
/// The Arrow stream is wrapped in an ArrowOutputStream and handed to the
/// overload that takes a Parquet OutputStream.
///
/// \param[in] sink the Arrow output stream that receives the bytes
/// \param[in] fileMetaData the metadata to serialize
static void WriteMetaData(
const std::shared_ptr<::arrow::io::OutputStream> &sink,
const std::unique_ptr<FileMetaData> &fileMetaData);

/// \brief Write file metadata to a Parquet output stream.
///
/// Delegates to FileSerializer::WriteMetaData.
///
/// \param[in] sink the Parquet OutputStream that receives the bytes
/// \param[in] fileMetaData the metadata to serialize
static void WriteMetaData(
const std::shared_ptr<OutputStream> &sink,
const std::unique_ptr<FileMetaData> &fileMetaData);

void Open(std::unique_ptr<Contents> contents);
void Close();

Expand Down

0 comments on commit ff8b7dc

Please sign in to comment.