Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions

class AvroWriter::Impl {
public:
/// Releases the Arrow schema held by this writer, if any.
~Impl() {
  // Nothing to do when no release callback is set on the schema struct.
  if (arrow_schema_.release == nullptr) {
    return;
  }
  ArrowSchemaRelease(&arrow_schema_);
}

Status Open(const WriterOptions& options) {
write_schema_ = options.schema;

Expand Down
59 changes: 8 additions & 51 deletions src/iceberg/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
return &array_;
}

/// \brief Construct an adapter for the given partition spec and manifest content.
///
/// \param partition_spec Partition spec to use; a null pointer is replaced with
/// PartitionSpec::Unpartitioned().
/// \param content Content value stored in `content_`.
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
                                           ManifestContent content)
    : partition_spec_(std::move(partition_spec)), content_(content) {
  // Fall back to the unpartitioned spec so partition_spec_ is never null afterwards.
  if (partition_spec_ == nullptr) {
    partition_spec_ = PartitionSpec::Unpartitioned();
  }
}

ManifestEntryAdapter::~ManifestEntryAdapter() {
if (array_.release != nullptr) {
ArrowArrayRelease(&array_);
Expand All @@ -148,14 +156,6 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
}
}

/// \brief Compute the manifest entry struct type for this adapter's partition spec.
///
/// When a partition spec is present, its partition type is passed to
/// ManifestEntry::TypeFromPartitionType; otherwise a null partition type is passed.
/// Errors from PartitionSpec::PartitionType() are propagated to the caller.
Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType() {
  if (partition_spec_ != nullptr) [[likely]] {
    ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
    return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
  }
  return ManifestEntry::TypeFromPartitionType(nullptr);
}

Status ManifestEntryAdapter::AppendPartitionValues(
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
const std::vector<Literal>& partition_values) {
Expand Down Expand Up @@ -436,37 +436,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
return {};
}

/// \brief Build the manifest entry schema restricted to the requested field IDs.
///
/// Top-level manifest entry fields are kept only when their ID is in `fields_ids`.
/// The field with ID 2 (the data_file struct) is always kept as a required field,
/// but its children are filtered by `fields_ids`. The resulting schema is stored in
/// `manifest_schema_` and exported into `schema_` via ToArrowSchema.
///
/// \param fields_ids Field IDs to include in the manifest schema.
/// \return An empty Status on success; errors from GetManifestEntryType() or
/// ToArrowSchema() are propagated.
Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
  ICEBERG_ASSIGN_OR_RAISE(auto entry_type, GetManifestEntryType())
  std::vector<SchemaField> selected;
  // TODO(xiao.dong) Make this a common function to recursively handle
  // all nested fields in the schema
  for (const auto& entry_field : entry_type->fields()) {
    const bool is_data_file = entry_field.field_id() == 2;
    if (!is_data_file) {
      // Plain top-level field: keep it only when explicitly requested.
      if (fields_ids.contains(entry_field.field_id())) {
        selected.emplace_back(entry_field);
      }
      continue;
    }

    // data_file field: rebuild the nested struct from the requested children only.
    auto data_file_struct =
        internal::checked_pointer_cast<StructType>(entry_field.type());
    std::vector<SchemaField> children;
    for (const auto& child : data_file_struct->fields()) {
      if (fields_ids.contains(child.field_id())) {
        children.emplace_back(child);
      }
    }
    selected.emplace_back(SchemaField::MakeRequired(
        entry_field.field_id(), std::string(entry_field.name()),
        std::make_shared<StructType>(children)));
  }
  manifest_schema_ = std::make_shared<Schema>(selected);
  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
  return {};
}

ManifestFileAdapter::~ManifestFileAdapter() {
if (array_.release != nullptr) {
ArrowArrayRelease(&array_);
Expand Down Expand Up @@ -671,16 +640,4 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
return {};
}

/// \brief Build the manifest list schema restricted to the requested field IDs.
///
/// Walks the fields of ManifestFile::Type() in order and keeps those whose ID is in
/// `fields_ids`. The resulting schema is stored in `manifest_list_schema_` and
/// exported into `schema_` via ToArrowSchema.
///
/// \param fields_ids Field IDs to include in the manifest list schema.
/// \return An empty Status on success; errors from ToArrowSchema() are propagated.
Status ManifestFileAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
  std::vector<SchemaField> selected;
  for (const auto& candidate : ManifestFile::Type().fields()) {
    if (!fields_ids.contains(candidate.field_id())) {
      continue;
    }
    selected.emplace_back(candidate);
  }
  manifest_list_schema_ = std::make_shared<Schema>(selected);
  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_));
  return {};
}

} // namespace iceberg
22 changes: 8 additions & 14 deletions src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,18 @@ class ICEBERG_EXPORT ManifestAdapter {
/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
: partition_spec_(std::move(partition_spec)) {}
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content);

~ManifestEntryAdapter() override;

virtual Status Append(const ManifestEntry& entry) = 0;

const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }

protected:
virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
ManifestContent content() const { return content_; }

/// \brief Initialize version-specific schema.
///
/// \param fields_ids Field IDs to include in the manifest schema. The schema will be
/// initialized to include only the fields with these IDs.
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
protected:
Status AppendInternal(const ManifestEntry& entry);
Status AppendDataFile(ArrowArray* array,
const std::shared_ptr<StructType>& data_file_type,
Expand All @@ -97,6 +93,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
protected:
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> manifest_schema_;
const ManifestContent content_;
};

/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.
Expand All @@ -110,12 +107,9 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {

const std::shared_ptr<Schema>& schema() const { return manifest_list_schema_; }

virtual std::optional<int64_t> next_row_id() const { return std::nullopt; }

protected:
/// \brief Initialize version-specific schema.
///
/// \param fields_ids Field IDs to include in the manifest list schema. The schema will
/// be initialized to include only the fields with these IDs.
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
Status AppendInternal(const ManifestFile& file);
static Status AppendPartitionSummary(
ArrowArray* array, const std::shared_ptr<ListType>& summary_type,
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const {

std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
if (!partition_type) {
partition_type = PartitionSpec::Unpartitioned()->schema();
partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
}
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
}
}

/// \brief Content kind of a manifest.
///
/// The numeric values are spelled out explicitly.
/// NOTE(review): presumably these match the on-disk encoding of the manifest
/// content field -- confirm before renumbering.
enum class ManifestContent {
/// Data content.
kData = 0,
/// Deletes content.
kDeletes = 1,
};

/// \brief Convert a ManifestContent value to a string view.
/// NOTE(review): only the declaration is visible here; the exact strings produced
/// are defined elsewhere -- confirm against the definition.
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
/// \brief Parse a ManifestContent value from a string.
/// NOTE(review): presumably returns an error Result for unrecognized input; the
/// accepted strings are defined elsewhere -- confirm against the definition.
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
std::string_view str) noexcept;

/// \brief DataFile carries data file path, partition tuple, metrics, ...
struct ICEBERG_EXPORT DataFile {
/// \brief Content of a data file
Expand Down Expand Up @@ -185,6 +194,8 @@ struct ICEBERG_EXPORT DataFile {
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
inline static const int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const std::string kPartitionDoc =
"Partition data tuple, schema based on the partition spec";
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
103, "record_count", iceberg::int64(), "Number of records in the file");
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
Expand Down
25 changes: 19 additions & 6 deletions src/iceberg/manifest_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,25 @@ const StructType& PartitionFieldSummary::Type() {
return kInstance;
}

const StructType& ManifestFile::Type() {
static const StructType kInstance(
{kManifestPath, kManifestLength, kPartitionSpecId, kContent, kSequenceNumber,
kMinSequenceNumber, kAddedSnapshotId, kAddedFilesCount, kExistingFilesCount,
kDeletedFilesCount, kAddedRowsCount, kExistingRowsCount, kDeletedRowsCount,
kPartitions, kKeyMetadata, kFirstRowId});
const std::shared_ptr<Schema>& ManifestFile::Type() {
static const auto kInstance = std::make_shared<Schema>(std::vector<SchemaField>{
kManifestPath,
kManifestLength,
kPartitionSpecId,
kContent,
kSequenceNumber,
kMinSequenceNumber,
kAddedSnapshotId,
kAddedFilesCount,
kExistingFilesCount,
kDeletedFilesCount,
kAddedRowsCount,
kExistingRowsCount,
kDeletedRowsCount,
kPartitions,
kKeyMetadata,
kFirstRowId,
});
return kInstance;
}

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ struct ICEBERG_EXPORT ManifestFile {

bool operator==(const ManifestFile& other) const = default;

static const StructType& Type();
static const std::shared_ptr<Schema>& Type();
};

/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are
Expand Down
17 changes: 9 additions & 8 deletions src/iceberg/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(

Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
ManifestFile::Type().fields().end());
auto schema = std::make_shared<Schema>(fields);
ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(
FileFormatType::kAvro,
{.path = std::string(manifest_list_location),
.io = std::move(file_io),
.projection = schema}));
std::shared_ptr<Schema> schema = ManifestFile::Type();
ICEBERG_ASSIGN_OR_RAISE(
auto reader,
ReaderFactoryRegistry::Open(FileFormatType::kAvro,
{
.path = std::string(manifest_list_location),
.io = std::move(file_io),
.projection = schema,
}));
return std::make_unique<ManifestListReaderImpl>(std::move(reader), std::move(schema));
}

Expand Down
21 changes: 14 additions & 7 deletions src/iceberg/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Status ManifestWriter::Close() {
return writer_->Close();
}

/// \brief Content of the manifest, as reported by the underlying adapter.
ManifestContent ManifestWriter::content() const {
  const auto& adapter = *adapter_;
  return adapter.content();
}

Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
Expand Down Expand Up @@ -83,9 +85,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
auto adapter =
std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(partition_spec));
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content) {
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand All @@ -99,9 +102,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec) {
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
std::move(partition_spec));
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content) {
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand Down Expand Up @@ -136,6 +139,10 @@ Status ManifestListWriter::Close() {
return writer_->Close();
}

std::optional<int64_t> ManifestListWriter::next_row_id() const {
return adapter_->next_row_id();
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
Expand Down Expand Up @@ -169,7 +176,7 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::optional<int64_t> first_row_id,
int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id, parent_snapshot_id,
sequence_number, first_row_id);
Expand Down
15 changes: 12 additions & 3 deletions src/iceberg/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class ICEBERG_EXPORT ManifestWriter {
/// \brief Close writer and flush to storage.
Status Close();

/// \brief Get the content of the manifest.
ManifestContent content() const;

/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
Expand All @@ -69,22 +72,25 @@ class ICEBERG_EXPORT ManifestWriter {
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content);

/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param first_row_id First row ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec);
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content);

private:
static constexpr int64_t kBatchSize = 1024;
Expand Down Expand Up @@ -114,6 +120,9 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \brief Close writer and flush to storage.
Status Close();

/// \brief Get the next row id to assign.
std::optional<int64_t> next_row_id() const;

/// \brief Creates a writer for the v1 manifest list.
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot.
Expand Down Expand Up @@ -146,7 +155,7 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::optional<int64_t> first_row_id,
int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);

private:
Expand Down
12 changes: 12 additions & 0 deletions src/iceberg/schema_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
return lhs.Equals(rhs);
}

/// \brief Return a copy of this field marked as required.
///
/// The receiver is left untouched; only the returned copy has `optional_` set to
/// false. Because the name reads like a mutator, the result is `[[nodiscard]]`
/// so that accidentally dropping it is diagnosed.
[[nodiscard]] SchemaField AsRequired() const {
  SchemaField required_copy = *this;
  required_copy.optional_ = false;
  return required_copy;
}

/// \brief Return a copy of this field marked as optional.
///
/// The receiver is left untouched; only the returned copy has `optional_` set to
/// true. Because the name reads like a mutator, the result is `[[nodiscard]]`
/// so that accidentally dropping it is diagnosed.
[[nodiscard]] SchemaField AsOptional() const {
  SchemaField optional_copy = *this;
  optional_copy.optional_ = true;
  return optional_copy;
}

private:
/// \brief Compare two fields for equality.
[[nodiscard]] bool Equals(const SchemaField& other) const;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro_schema_test.cc
avro_stream_test.cc
manifest_list_reader_writer_test.cc
manifest_list_versions_test.cc
manifest_reader_writer_test.cc
test_common.cc)

Expand Down
Loading
Loading