Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class AvroReader::Impl {
return InvalidArgument("Projected schema is required by Avro reader");
}

batch_size_ = options.batch_size;
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
read_schema_ = options.projection;

// Open the input stream and adapt to the avro interface.
Expand Down
17 changes: 12 additions & 5 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,30 @@ class AvroWriter::Impl {

::avro::NodePtr root;
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
if (const auto& schema_name =
options.properties->Get(WriterProperties::kAvroSchemaName);
!schema_name.empty()) {
root->setName(::avro::Name(schema_name));
}

avro_schema_ = std::make_shared<::avro::ValidSchema>(root);

// Open the output stream and adapt to the avro interface.
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));
ICEBERG_ASSIGN_OR_RAISE(
auto output_stream,
CreateOutputStream(options,
options.properties->Get(WriterProperties::kAvroBufferSize)));
arrow_output_stream_ = output_stream->arrow_output_stream();
std::map<std::string, std::vector<uint8_t>> metadata;
for (const auto& [key, value] : options.properties) {
for (const auto& [key, value] : options.metadata) {
std::vector<uint8_t> vec;
vec.reserve(value.size());
vec.assign(value.begin(), value.end());
metadata.emplace(key, std::move(vec));
}
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
std::move(output_stream), *avro_schema_,
options.properties->Get(WriterProperties::kAvroSyncInterval),
::avro::NULL_CODEC /*codec*/, metadata);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
return reader;
}

std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
return std::make_unique<ReaderProperties>();
}

std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
const std::unordered_map<std::string, std::string>& properties) {
auto reader_properties = std::make_unique<ReaderProperties>();
reader_properties->configs_ = properties;
return reader_properties;
}

} // namespace iceberg
26 changes: 19 additions & 7 deletions src/iceberg/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "iceberg/file_format.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/config.h"

namespace iceberg {

Expand All @@ -42,7 +43,7 @@ class ICEBERG_EXPORT Reader {
Reader& operator=(const Reader&) = delete;

/// \brief Open the reader.
virtual Status Open(const struct ReaderOptions& options) = 0;
virtual Status Open(const ReaderOptions& options) = 0;

/// \brief Close the reader.
virtual Status Close() = 0;
Expand All @@ -67,19 +68,30 @@ struct ICEBERG_EXPORT Split {
size_t length;
};

class ReaderProperties : public ConfigBase<ReaderProperties> {
public:
template <typename T>
using Entry = const ConfigBase<ReaderProperties>::Entry<T>;

/// \brief The batch size to read.
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};

/// \brief Create a default ReaderProperties instance.
static std::unique_ptr<ReaderProperties> default_properties();

/// \brief Create a ReaderProperties instance from a map of key-value pairs.
static std::unique_ptr<ReaderProperties> FromMap(
const std::unordered_map<std::string, std::string>& properties);
};

/// \brief Options for creating a reader.
struct ICEBERG_EXPORT ReaderOptions {
static constexpr int64_t kDefaultBatchSize = 4096;

/// \brief The path to the file to read.
std::string path;
/// \brief The total length of the file.
std::optional<size_t> length;
/// \brief The split to read.
std::optional<Split> split;
/// \brief The batch size to read. Only applies to implementations that support
/// batching.
int64_t batch_size = kDefaultBatchSize;
/// \brief FileIO instance to open the file. Reader implementations should down cast it
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
/// `ArrowFileSystemFileIO` as the default implementation.
Expand All @@ -93,7 +105,7 @@ struct ICEBERG_EXPORT ReaderOptions {
/// that may have different field names than the current schema.
std::shared_ptr<class NameMapping> name_mapping;
/// \brief Format-specific or implementation-specific properties.
std::unordered_map<std::string, std::string> properties;
std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
};

/// \brief Factory function to create a reader of a specific file format.
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
return writer;
}

std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
return std::make_unique<WriterProperties>();
}

std::unique_ptr<WriterProperties> WriterProperties::FromMap(
const std::unordered_map<std::string, std::string>& properties) {
auto writer_properties = std::make_unique<WriterProperties>();
writer_properties->configs_ = properties;
return writer_properties;
}

} // namespace iceberg
31 changes: 29 additions & 2 deletions src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,34 @@
#include "iceberg/metrics.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/config.h"

namespace iceberg {

class WriterProperties : public ConfigBase<WriterProperties> {
public:
template <typename T>
using Entry = const ConfigBase<WriterProperties>::Entry<T>;

/// \brief The name of the Avro root node schema to write.
inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};

/// \brief The buffer size used by Avro output stream.
inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};

/// \brief The sync interval used by Avro writer.
inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};

/// TODO(gangwu): add more properties, like compression codec, compression level, etc.

/// \brief Create a default WriterProperties instance.
static std::unique_ptr<WriterProperties> default_properties();

/// \brief Create a WriterProperties instance from a map of key-value pairs.
static std::unique_ptr<WriterProperties> FromMap(
const std::unordered_map<std::string, std::string>& properties);
};

/// \brief Options for creating a writer.
struct ICEBERG_EXPORT WriterOptions {
/// \brief The path to the file to write.
Expand All @@ -44,8 +69,10 @@ struct ICEBERG_EXPORT WriterOptions {
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
/// `ArrowFileSystemFileIO` as the default implementation.
std::shared_ptr<class FileIO> io;
/// \brief Metadata to write to the file.
std::unordered_map<std::string, std::string> metadata;
/// \brief Format-specific or implementation-specific properties.
std::unordered_map<std::string, std::string> properties;
std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
};

/// \brief Base writer class to write data from different file formats.
Expand All @@ -57,7 +84,7 @@ class ICEBERG_EXPORT Writer {
Writer& operator=(const Writer&) = delete;

/// \brief Open the writer.
virtual Status Open(const struct WriterOptions& options) = 0;
virtual Status Open(const WriterOptions& options) = 0;

/// \brief Close the writer.
virtual Status Close() = 0;
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ namespace iceberg {
class ICEBERG_EXPORT ManifestReader {
public:
virtual ~ManifestReader() = default;

/// \brief Read all manifest entries in the manifest file.
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;

/// \brief Get the metadata of the manifest file.
virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;

/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the manifest.
/// \param file_io File IO implementation to use.
Expand All @@ -61,8 +66,13 @@ class ICEBERG_EXPORT ManifestReader {
class ICEBERG_EXPORT ManifestListReader {
public:
virtual ~ManifestListReader() = default;

/// \brief Read all manifest files in the manifest list file.
virtual Result<std::vector<ManifestFile>> Files() const = 0;

/// \brief Get the metadata of the manifest list file.
virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;

/// \brief Creates a reader for the manifest list.
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/manifest_reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
return manifest_entries;
}

Result<std::unordered_map<std::string, std::string>> ManifestReaderImpl::Metadata()
const {
return reader_->Metadata();
}

Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
std::vector<ManifestFile> manifest_files;
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
Expand All @@ -569,6 +574,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
return manifest_files;
}

Result<std::unordered_map<std::string, std::string>> ManifestListReaderImpl::Metadata()
const {
return reader_->Metadata();
}

Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
if (index >= 0 && index < static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
return static_cast<ManifestFileField>(index);
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/manifest_reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class ManifestReaderImpl : public ManifestReader {

Result<std::vector<ManifestEntry>> Entries() const override;

Result<std::unordered_map<std::string, std::string>> Metadata() const override;

private:
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
Expand All @@ -55,6 +57,8 @@ class ManifestListReaderImpl : public ManifestListReader {

Result<std::vector<ManifestFile>> Files() const override;

Result<std::unordered_map<std::string, std::string>> Metadata() const override;

private:
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
Expand Down
63 changes: 38 additions & 25 deletions src/iceberg/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
std::unordered_map<std::string, std::string> properties) {
ICEBERG_ASSIGN_OR_RAISE(
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
{.path = std::string(location),
.schema = std::move(schema),
.io = std::move(file_io),
.properties = std::move(properties)}));
std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
auto writer_properties = WriterProperties::default_properties();
if (!schema_name.empty()) {
writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
}
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
FileFormatType::kAvro,
{
.path = std::string(location),
.schema = std::move(schema),
.io = std::move(file_io),
.metadata = std::move(metadata),
.properties = std::move(writer_properties),
}));
return writer;
}

Expand All @@ -91,9 +98,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand All @@ -119,9 +127,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand Down Expand Up @@ -149,9 +158,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}

Expand Down Expand Up @@ -191,9 +201,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}

Expand All @@ -207,9 +218,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));

return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}
Expand All @@ -224,9 +236,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(auto writer,
OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io), adapter->metadata()));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}

Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/parquet/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class ParquetReader::Impl {
// Prepare reader properties
::parquet::ReaderProperties reader_properties(pool_);
::parquet::ArrowReaderProperties arrow_reader_properties;
arrow_reader_properties.set_batch_size(options.batch_size);
arrow_reader_properties.set_batch_size(
options.properties->Get(ReaderProperties::kBatchSize));
arrow_reader_properties.set_arrow_extensions_enabled(true);

// Open the Parquet file reader
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class ParquetWriter::Impl {

ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
auto file_writer = ::parquet::ParquetFileWriter::Open(
output_stream_, std::move(schema_node), std::move(writer_properties));
output_stream_, std::move(schema_node), std::move(writer_properties),
std::make_shared<::arrow::KeyValueMetadata>(options.metadata));
ICEBERG_ARROW_RETURN_NOT_OK(
::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_,
std::move(arrow_writer_properties), &writer_));
Expand Down
Loading
Loading