Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,9 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
partition_fields.push_back(std::move(*partition_field));
}
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
// TODO(Li Feiyang): use a new PartitionSpec::Make to find the source field of each
// partition field from the schema and then verify it
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
Expand Down Expand Up @@ -902,8 +904,10 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
std::move(field->transform()));
}

auto spec = std::make_unique<PartitionSpec>(
current_schema, PartitionSpec::kInitialSpecId, std::move(fields));
// TODO(Li Feiyang): use a new PartitionSpec::Make to find the source field of each
// partition field from the schema and then verify it
auto spec =
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
/// objects. Each `PartitionField` will be parsed using the `PartitionFieldFromJson`
/// function.
///
/// \param schema The current schema.
/// \param json The JSON object representing a `PartitionSpec`.
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
/// the JSON is malformed or missing expected fields, an error will be returned.
Expand Down
7 changes: 6 additions & 1 deletion src/iceberg/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/manifest_adapter.h"

#include <utility>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/nanoarrow_status_internal.h"
Expand Down Expand Up @@ -140,8 +142,11 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
}

ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
ManifestContent content)
: partition_spec_(std::move(partition_spec)), content_(content) {
: partition_spec_(std::move(partition_spec)),
current_schema_(std::move(current_schema)),
content_(content) {
if (!partition_spec_) {
partition_spec_ = PartitionSpec::Unpartitioned();
}
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ICEBERG_EXPORT ManifestAdapter {
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content);
std::shared_ptr<Schema> current_schema, ManifestContent content);

~ManifestEntryAdapter() override;

Expand Down Expand Up @@ -92,6 +92,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {

protected:
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> current_schema_;
std::shared_ptr<Schema> manifest_schema_;
const ManifestContent content_;
};
Expand Down
54 changes: 47 additions & 7 deletions src/iceberg/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,23 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
auto adapter =
std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(partition_spec));
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}

auto adapter = std::make_unique<ManifestEntryAdapterV1>(
snapshot_id, std::move(partition_spec), std::move(current_schema));
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand All @@ -86,9 +100,21 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content) {
std::shared_ptr<Schema> current_schema, ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), content);
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand All @@ -102,9 +128,23 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content) {
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), content);
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand Down
13 changes: 10 additions & 3 deletions src/iceberg/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,42 @@ class ICEBERG_EXPORT ManifestWriter {
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param current_schema Current table schema.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema);

/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param current_schema Schema containing the source fields referenced by partition
/// spec.
/// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
ManifestContent content);
std::shared_ptr<Schema> current_schema, ManifestContent content);

/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param first_row_id First row ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
/// \param current_schema Schema containing the source fields referenced by partition
/// spec.
/// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content);
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

private:
static constexpr int64_t kBatchSize = 1024;
Expand Down
31 changes: 9 additions & 22 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <algorithm>
#include <format>
#include <memory>
#include <ranges>

#include "iceberg/schema.h"
Expand All @@ -31,10 +32,9 @@

namespace iceberg {

PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
std::vector<PartitionField> fields,
PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id)
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
: spec_id_(spec_id), fields_(std::move(fields)) {
if (last_assigned_field_id) {
last_assigned_field_id_ = last_assigned_field_id.value();
} else if (fields_.empty()) {
Expand All @@ -48,34 +48,25 @@ PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,

const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
static const std::shared_ptr<PartitionSpec> unpartitioned =
std::make_shared<PartitionSpec>(
/*schema=*/std::make_shared<Schema>(std::vector<SchemaField>{}), kInitialSpecId,
std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1);
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
kLegacyPartitionDataIdStart - 1);
return unpartitioned;
}

const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }

int32_t PartitionSpec::spec_id() const { return spec_id_; }

std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }

Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(const Schema& schema) {
if (fields_.empty()) {
return nullptr;
}
{
std::scoped_lock<std::mutex> lock(mutex_);
if (partition_type_ != nullptr) {
return partition_type_;
}
return std::make_unique<StructType>(std::vector<SchemaField>{});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation caches partition_type_ based on the table schema_, but the schema passed as a parameter may belong to a different schema version, which could lead to correctness issues.
I suggest adding schema_id to the cache key to ensure consistency.

For example:
std::unordered_map<int32_t, std::shared_ptr<StructType>> partition_type_cache_;


std::vector<SchemaField> partition_fields;
for (const auto& partition_field : fields_) {
// Get the source field from the original schema by source_id
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
schema_->FindFieldById(partition_field.source_id()));
schema.FindFieldById(partition_field.source_id()));
if (!source_field.has_value()) {
// TODO(xiao.dong) when source field is missing,
// should return an error or just use UNKNOWN type
Expand All @@ -97,11 +88,7 @@ Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
/*optional=*/true);
}

std::scoped_lock<std::mutex> lock(mutex_);
if (partition_type_ == nullptr) {
partition_type_ = std::make_shared<StructType>(std::move(partition_fields));
}
return partition_type_;
return std::make_unique<StructType>(std::move(partition_fields));
}

std::string PartitionSpec::ToString() const {
Expand Down
18 changes: 5 additions & 13 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/// Partition specs for Iceberg tables.

#include <cstdint>
#include <mutex>
#include <memory>
#include <optional>
#include <span>
#include <string>
Expand All @@ -32,6 +32,7 @@
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_field.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/formattable.h"

namespace iceberg {
Expand All @@ -56,24 +57,20 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \param fields The partition fields.
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
/// be calculated from the fields.
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
std::vector<PartitionField> fields,
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
std::optional<int32_t> last_assigned_field_id = std::nullopt);

/// \brief Get the unpartitioned partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

/// \brief Get the table schema
const std::shared_ptr<Schema>& schema() const;

/// \brief Get the spec ID.
int32_t spec_id() const;

/// \brief Get a list view of the partition fields.
std::span<const PartitionField> fields() const;

/// \brief Get the partition type.
Result<std::shared_ptr<StructType>> PartitionType();
/// \brief Get the partition type binding to the input schema.
Result<std::unique_ptr<StructType>> PartitionType(const Schema&);

std::string ToString() const override;

Expand All @@ -87,14 +84,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Compare two partition specs for equality.
bool Equals(const PartitionSpec& other) const;

std::shared_ptr<Schema> schema_;
const int32_t spec_id_;
std::vector<PartitionField> fields_;
int32_t last_assigned_field_id_;

// FIXME: use similar lazy initialization pattern as in StructType
std::mutex mutex_;
std::shared_ptr<StructType> partition_type_;
};

} // namespace iceberg
7 changes: 6 additions & 1 deletion src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -269,7 +270,11 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co

std::vector<std::shared_ptr<FileScanTask>> tasks;
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
auto partition_schema = partition_spec->schema();

// Get the table schema and partition type
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema());
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_schema,
partition_spec->PartitionType(*current_schema));

for (const auto& manifest_file : manifest_files) {
ICEBERG_ASSIGN_OR_RAISE(
Expand Down
6 changes: 2 additions & 4 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,9 @@ TEST(JsonPartitionTest, PartitionSpec) {
std::vector<SchemaField>{SchemaField(3, "region", iceberg::string(), false),
SchemaField(5, "ts", iceberg::int64(), false)},
/*schema_id=*/100);

auto identity_transform = Transform::Identity();
PartitionSpec spec(schema, 1,
{PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)});
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
PartitionField(5, 102, "ts", identity_transform)});
auto json = ToJson(spec);
nlohmann::json expected_json = R"({"spec-id": 1,
"fields": [
Expand Down
Loading
Loading