diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 1c2f3225..02b8d528 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -851,6 +851,46 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
 }
 
+TYPED_TEST(TestParquetIO, FileMetaDataWrite) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+                                values->length(), default_writer_properties()));
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
+  const std::shared_ptr<FileMetaData> file_metadata =
+      reader->parquet_reader()->metadata();
+  ASSERT_EQ(1, file_metadata->num_columns());
+  ASSERT_EQ(values->length(), file_metadata->num_rows());
+
+  // FileWriter::WriteMetaData takes a unique_ptr reference, but the metadata is
+  // owned by the shared_ptr above. Alias it without taking ownership; the guard
+  // drops the alias on every exit path (a fatal ASSERT_* returns early), so the
+  // object is never deleted twice.
+  std::unique_ptr<FileMetaData> metadata_alias(file_metadata.get());
+  struct ReleaseOnExit {
+    std::unique_ptr<FileMetaData>& alias;
+    ~ReleaseOnExit() { (void)alias.release(); }
+  } release_guard{metadata_alias};
+
+  // Write only the metadata into a fresh sink, then read it back.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(FileWriter::WriteMetaData(metadata_alias, this->sink_));
+
+  ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
+  const std::shared_ptr<FileMetaData> written_metadata =
+      reader->parquet_reader()->metadata();
+  ASSERT_EQ(file_metadata->size(), written_metadata->size());
+  ASSERT_EQ(file_metadata->num_row_groups(), written_metadata->num_row_groups());
+  ASSERT_EQ(file_metadata->num_rows(), written_metadata->num_rows());
+  ASSERT_EQ(file_metadata->num_columns(), written_metadata->num_columns());
+  ASSERT_EQ(file_metadata->RowGroup(0)->num_rows(),
+            written_metadata->RowGroup(0)->num_rows());
+}
+
 using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
 
 TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index f3ddda90..d1697c34 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -1092,6 +1092,21 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
   return Open(schema, pool, wrapper, properties, arrow_properties, writer);
 }
 
+Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
+                                 const std::shared_ptr<OutputStream>& sink) {
+  // ParquetFileWriter::WriteMetaData returns void, so a failure can only surface
+  // as an exception; turn it into a Status rather than letting it escape.
+  PARQUET_CATCH_NOT_OK(ParquetFileWriter::WriteMetaData(sink, fileMetaData));
+  return Status::OK();
+}
+
+Status FileWriter::WriteMetaData(const std::unique_ptr<FileMetaData>& fileMetaData,
+                                 const std::shared_ptr<::arrow::io::OutputStream>& sink) {
+  // Adapt the Arrow stream to the Parquet OutputStream interface and delegate.
+  auto wrapper = std::make_shared<ArrowOutputStream>(sink);
+  return WriteMetaData(fileMetaData, wrapper);
+}
+
 namespace {}  // namespace
 
 Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 06008d2f..d62d3b0e 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -132,6 +132,22 @@ class PARQUET_EXPORT FileWriter {
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
       std::unique_ptr<FileWriter>* writer);
 
+  /// \brief Write only file-level metadata to a Parquet output stream.
+  ///
+  /// \param[in] fileMetaData metadata to write; the caller keeps ownership
+  /// \param[in] sink stream the metadata is written to
+  /// \return Status
+  static ::arrow::Status WriteMetaData(
+      const std::unique_ptr<FileMetaData>& fileMetaData,
+      const std::shared_ptr<OutputStream>& sink);
+
+  /// \brief Write only file-level metadata to an Arrow output stream.
+  ///
+  /// Same as the overload above, for sinks given as ::arrow::io::OutputStream.
+  static ::arrow::Status WriteMetaData(
+      const std::unique_ptr<FileMetaData>& fileMetaData,
+      const std::shared_ptr<::arrow::io::OutputStream>& sink);
+
   /// \brief Write a Table to Parquet.
   ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
 
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
index 1e4a09e2..cc34fd0b 100644
--- a/src/parquet/file_writer.cc
+++ b/src/parquet/file_writer.cc
@@ -160,6 +160,20 @@ class FileSerializer : public ParquetFileWriter::Contents {
     return result;
   }
 
+  /// Serialize `fileMetaData` to `sink`, then append the footer: the 4 raw bytes
+  /// of the uint32_t metadata length followed by PARQUET_MAGIC.
+  static void WriteMetaData(const std::shared_ptr<OutputStream>& sink,
+                            const std::unique_ptr<FileMetaData>& fileMetaData) {
+    // Write MetaData; its length is the distance the sink advanced.
+    const int64_t metadata_start = sink->Tell();
+    fileMetaData->WriteTo(sink.get());
+    uint32_t metadata_len = static_cast<uint32_t>(sink->Tell() - metadata_start);
+
+    // Write Footer
+    sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+    sink->Write(PARQUET_MAGIC, 4);
+  }
+
   void Close() override {
     if (is_open_) {
       if (row_group_writer_) {
@@ -234,17 +248,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
   }
 
   void WriteMetaData() {
-    // Write MetaData
-    uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
-
-    // Get a FileMetaData
     auto metadata = metadata_->Finish();
-    metadata->WriteTo(sink_.get());
-    metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
-
-    // Write Footer
-    sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-    sink_->Write(PARQUET_MAGIC, 4);
+    WriteMetaData(sink_, metadata);
   }
 };
 
@@ -280,6 +285,19 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
   return result;
 }
 
+void ParquetFileWriter::WriteMetaData(
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::unique_ptr<FileMetaData>& fileMetaData) {
+  // Adapt the Arrow stream to the Parquet OutputStream interface and delegate.
+  WriteMetaData(std::make_shared<ArrowOutputStream>(sink), fileMetaData);
+}
+
+void ParquetFileWriter::WriteMetaData(
+    const std::shared_ptr<OutputStream>& sink,
+    const std::unique_ptr<FileMetaData>& fileMetaData) {
+  FileSerializer::WriteMetaData(sink, fileMetaData);
+}
+
 const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
 
 const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h
index 9c28531f..e0d1dae9 100644
--- a/src/parquet/file_writer.h
+++ b/src/parquet/file_writer.h
@@ -133,6 +133,19 @@ class PARQUET_EXPORT ParquetFileWriter {
       const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);
 
+  /// \brief Write file-level metadata and the Parquet footer to an Arrow stream.
+  ///
+  /// \param[in] sink stream to write to
+  /// \param[in] fileMetaData metadata to serialize; ownership stays with the caller
+  static void WriteMetaData(
+      const std::shared_ptr<::arrow::io::OutputStream>& sink,
+      const std::unique_ptr<FileMetaData>& fileMetaData);
+
+  /// \brief Write file-level metadata and the Parquet footer to a Parquet stream.
+  static void WriteMetaData(
+      const std::shared_ptr<OutputStream>& sink,
+      const std::unique_ptr<FileMetaData>& fileMetaData);
+
   void Open(std::unique_ptr<Contents> contents);
   void Close();
 