Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/iceberg/catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.

iceberg_install_all_headers(iceberg/catalog)

add_subdirectory(memory)

if(ICEBERG_BUILD_REST)
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ namespace iceberg {

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<StructType> partition_schema) {
std::shared_ptr<StructType> partition_type) {
auto manifest_entry_schema =
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
ManifestEntry::TypeFromPartitionType(std::move(partition_type));
std::shared_ptr<Schema> schema =
FromStructType(std::move(*manifest_entry_schema), std::nullopt);

Expand All @@ -53,9 +53,9 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<StructType> partition_schema) {
std::shared_ptr<StructType> partition_type) {
auto manifest_entry_schema =
ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
ManifestEntry::TypeFromPartitionType(std::move(partition_type));
auto fields_span = manifest_entry_schema->fields();
std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
auto schema = std::make_shared<Schema>(fields);
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ class ICEBERG_EXPORT ManifestReader {
/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the manifest.
/// \param file_io File IO implementation to use.
/// \param partition_schema Schema for the partition.
/// \param partition_type Schema for the partition.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<StructType> partition_schema);
std::shared_ptr<StructType> partition_type);

/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_schema Schema for the partition.
/// \param partition_type Schema for the partition.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<StructType> partition_schema);
std::shared_ptr<StructType> partition_type);
};

/// \brief Read manifest files from a manifest list file.
Expand Down
23 changes: 6 additions & 17 deletions src/iceberg/sort_order.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ bool SortOrder::Equals(const SortOrder& other) const {

Status SortOrder::Validate(const Schema& schema) const {
for (const auto& field : fields_) {
auto schema_field = schema.FindFieldById(field.source_id());
if (!schema_field.has_value() || schema_field.value() == std::nullopt) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (!schema_field.has_value() || schema_field == std::nullopt) {
return InvalidArgument("Cannot find source column for sort field: {}", field);
}

const auto& source_type = schema_field.value().value().get().type();
const auto& source_type = schema_field.value().get().type();

if (!field.transform()->CanTransform(*source_type)) {
return InvalidArgument("Invalid source type {} for transform {}",
Expand All @@ -113,20 +113,9 @@ Result<std::unique_ptr<SortOrder>> SortOrder::Make(const Schema& schema, int32_t
return InvalidArgument("Sort order must have at least one sort field");
}

for (const auto& field : fields) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldById(field.source_id()));
if (schema_field == std::nullopt) {
return InvalidArgument("Cannot find source column for sort field: {}", field);
}

const auto& source_type = schema_field.value().get().type();
if (field.transform()->CanTransform(*source_type) == false) {
return InvalidArgument("Invalid source type {} for transform {}",
source_type->ToString(), field.transform()->ToString());
}
}

return std::make_unique<SortOrder>(sort_id, std::move(fields));
auto sort_order = std::make_unique<SortOrder>(sort_id, std::move(fields));
ICEBERG_RETURN_UNEXPECTED(sort_order->Validate(schema));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, this is more precise.

return sort_order;
}

Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {

TEST(JsonPartitionTest, PartitionSpec) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(3, "region", iceberg::string(), false),
SchemaField(5, "ts", iceberg::int64(), false)},
std::vector<SchemaField>{SchemaField(3, "region", string(), false),
SchemaField(5, "ts", int64(), false)},
/*schema_id=*/100);
auto identity_transform = Transform::Identity();
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
Expand Down
32 changes: 16 additions & 16 deletions src/iceberg/test/manifest_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

namespace iceberg {

class ManifestReaderTestBase : public TempFileTestBase {
class ManifestReaderWriterTestBase : public TempFileTestBase {
protected:
static void SetUpTestSuite() { avro::RegisterAll(); }

Expand Down Expand Up @@ -88,7 +88,7 @@ class ManifestReaderTestBase : public TempFileTestBase {
std::shared_ptr<FileIO> file_io_;
};

class ManifestReaderV1Test : public ManifestReaderTestBase {
class ManifestV1Test : public ManifestReaderWriterTestBase {
protected:
std::vector<ManifestEntry> PreparePartitionedTestData() {
std::vector<ManifestEntry> manifest_entries;
Expand Down Expand Up @@ -167,19 +167,19 @@ class ManifestReaderV1Test : public ManifestReaderTestBase {
}
};

TEST_F(ManifestReaderV1Test, PartitionedTest) {
TEST_F(ManifestV1Test, ReadPartitionedTest) {
// TODO(xiao.dong) we need to add more cases for different partition types
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
SchemaField partition_field(1000, "order_ts_hour", int32(), true);
auto partition_schema =
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
auto expected_entries = PreparePartitionedTestData();
TestManifestReading("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro", expected_entries,
partition_schema);
}

TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
iceberg::SchemaField table_field(1, "order_ts_hour_source", iceberg::int32(), true);
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
TEST_F(ManifestV1Test, WritePartitionedTest) {
SchemaField table_field(1, "order_ts_hour_source", int32(), true);
SchemaField partition_field(1000, "order_ts_hour", int32(), true);
auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>({table_field}));
auto partition_schema =
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
Expand All @@ -194,7 +194,7 @@ TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
TestManifestReadingByPath(write_manifest_path, expected_entries, partition_schema);
}

class ManifestReaderV2Test : public ManifestReaderTestBase {
class ManifestV2Test : public ManifestReaderWriterTestBase {
protected:
std::vector<ManifestEntry> CreateV2TestData(
std::optional<int64_t> sequence_number = std::nullopt,
Expand Down Expand Up @@ -276,12 +276,12 @@ class ManifestReaderV2Test : public ManifestReaderTestBase {
}
};

TEST_F(ManifestReaderV2Test, NonPartitionedTest) {
TEST_F(ManifestV2Test, ReadNonPartitionedTest) {
auto expected_entries = PrepareNonPartitionedTestData();
TestManifestReading("2ddf1bc9-830b-4015-aced-c060df36f150-m0.avro", expected_entries);
}

TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) {
TEST_F(ManifestV2Test, ReadMetadataInheritanceTest) {
std::string path = GetResourcePath("2ddf1bc9-830b-4015-aced-c060df36f150-m0.avro");
ManifestFile manifest_file{
.manifest_path = path,
Expand All @@ -295,9 +295,9 @@ TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) {
TestManifestReadingWithManifestFile(manifest_file, expected_entries);
}

TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) {
iceberg::SchemaField table_field(1, "order_ts_hour_source", int32(), true);
iceberg::SchemaField partition_field(1000, "order_ts_hour", int32(), true);
TEST_F(ManifestV2Test, WriteNonPartitionedTest) {
SchemaField table_field(1, "order_ts_hour_source", int32(), true);
SchemaField partition_field(1000, "order_ts_hour", int32(), true);
auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>({table_field}));
auto expected_entries = PrepareNonPartitionedTestData();
auto write_manifest_path = CreateNewTempFilePath();
Expand All @@ -306,9 +306,9 @@ TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) {
TestManifestReadingByPath(write_manifest_path, expected_entries);
}

TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) {
iceberg::SchemaField table_field(1, "order_ts_hour_source", int32(), true);
iceberg::SchemaField partition_field(1000, "order_ts_hour", int32(), true);
TEST_F(ManifestV2Test, WriteInheritancePartitionedTest) {
SchemaField table_field(1, "order_ts_hour_source", int32(), true);
SchemaField partition_field(1000, "order_ts_hour", int32(), true);
auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>({table_field}));
auto expected_entries = PrepareMetadataInheritanceTestData();
auto write_manifest_path = CreateNewTempFilePath();
Expand Down
26 changes: 13 additions & 13 deletions src/iceberg/test/partition_spec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ TEST(PartitionSpecTest, PartitionSchemaTest) {
PartitionField pt_field2(7, 1001, "hour", identity_transform);
PartitionSpec spec(100, {pt_field1, pt_field2});

auto partition_schema = spec.PartitionType(schema);
ASSERT_TRUE(partition_schema.has_value());
ASSERT_EQ(2, partition_schema.value()->fields().size());
EXPECT_EQ(pt_field1.name(), partition_schema.value()->fields()[0].name());
EXPECT_EQ(pt_field1.field_id(), partition_schema.value()->fields()[0].field_id());
EXPECT_EQ(pt_field2.name(), partition_schema.value()->fields()[1].name());
EXPECT_EQ(pt_field2.field_id(), partition_schema.value()->fields()[1].field_id());
auto partition_type = spec.PartitionType(schema);
ASSERT_TRUE(partition_type.has_value());
ASSERT_EQ(2, partition_type.value()->fields().size());
EXPECT_EQ(pt_field1.name(), partition_type.value()->fields()[0].name());
EXPECT_EQ(pt_field1.field_id(), partition_type.value()->fields()[0].field_id());
EXPECT_EQ(pt_field2.name(), partition_type.value()->fields()[1].name());
EXPECT_EQ(pt_field2.field_id(), partition_type.value()->fields()[1].field_id());
}

TEST(PartitionSpecTest, PartitionTypeTest) {
Expand Down Expand Up @@ -138,18 +138,18 @@ TEST(PartitionSpecTest, PartitionTypeTest) {
auto parsed_spec_result = PartitionSpecFromJson(schema, json);
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;

auto partition_schema = parsed_spec_result.value()->PartitionType(*schema);
auto partition_type = parsed_spec_result.value()->PartitionType(*schema);

SchemaField pt_field1(1000, "ts_day", date(), true);
SchemaField pt_field2(1001, "id_bucket", int32(), true);
SchemaField pt_field3(1002, "id_truncate", string(), true);

ASSERT_TRUE(partition_schema.has_value());
ASSERT_EQ(3, partition_schema.value()->fields().size());
ASSERT_TRUE(partition_type.has_value());
ASSERT_EQ(3, partition_type.value()->fields().size());

EXPECT_EQ(pt_field1, partition_schema.value()->fields()[0]);
EXPECT_EQ(pt_field2, partition_schema.value()->fields()[1]);
EXPECT_EQ(pt_field3, partition_schema.value()->fields()[2]);
EXPECT_EQ(pt_field1, partition_type.value()->fields()[0]);
EXPECT_EQ(pt_field2, partition_type.value()->fields()[1]);
EXPECT_EQ(pt_field3, partition_type.value()->fields()[2]);
}

} // namespace iceberg
44 changes: 24 additions & 20 deletions src/iceberg/test/schema_field_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,47 @@
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

TEST(SchemaFieldTest, Basics) {
{
iceberg::SchemaField field(1, "foo", iceberg::int32(), false);
SchemaField field(1, "foo", int32(), false);
EXPECT_EQ(1, field.field_id());
EXPECT_EQ("foo", field.name());
EXPECT_EQ(iceberg::TypeId::kInt, field.type()->type_id());
EXPECT_EQ(TypeId::kInt, field.type()->type_id());
EXPECT_FALSE(field.optional());
EXPECT_EQ("foo (1): int (required)", field.ToString());
EXPECT_EQ("foo (1): int (required)", std::format("{}", field));
}
{
iceberg::SchemaField field = iceberg::SchemaField::MakeOptional(
2, "foo bar", std::make_shared<iceberg::FixedType>(10));
SchemaField field =
SchemaField::MakeOptional(2, "foo bar", std::make_shared<FixedType>(10));
EXPECT_EQ(2, field.field_id());
EXPECT_EQ("foo bar", field.name());
EXPECT_EQ(iceberg::FixedType(10), *field.type());
EXPECT_EQ(FixedType(10), *field.type());
EXPECT_TRUE(field.optional());
EXPECT_EQ("foo bar (2): fixed(10) (optional)", field.ToString());
EXPECT_EQ("foo bar (2): fixed(10) (optional)", std::format("{}", field));
}
{
iceberg::SchemaField field = iceberg::SchemaField::MakeRequired(
2, "foo bar", std::make_shared<iceberg::FixedType>(10));
SchemaField field =
SchemaField::MakeRequired(2, "foo bar", std::make_shared<FixedType>(10));
EXPECT_EQ(2, field.field_id());
EXPECT_EQ("foo bar", field.name());
EXPECT_EQ(iceberg::FixedType(10), *field.type());
EXPECT_EQ(FixedType(10), *field.type());
EXPECT_FALSE(field.optional());
EXPECT_EQ("foo bar (2): fixed(10) (required)", field.ToString());
EXPECT_EQ("foo bar (2): fixed(10) (required)", std::format("{}", field));
}
}

TEST(SchemaFieldTest, Equality) {
iceberg::SchemaField field1(1, "foo", iceberg::int32(), false);
iceberg::SchemaField field2(2, "foo", iceberg::int32(), false);
iceberg::SchemaField field3(1, "bar", iceberg::int32(), false);
iceberg::SchemaField field4(1, "foo", iceberg::int64(), false);
iceberg::SchemaField field5(1, "foo", iceberg::int32(), true);
iceberg::SchemaField field6(1, "foo", iceberg::int32(), false);
SchemaField field1(1, "foo", int32(), false);
SchemaField field2(2, "foo", int32(), false);
SchemaField field3(1, "bar", int32(), false);
SchemaField field4(1, "foo", int64(), false);
SchemaField field5(1, "foo", int32(), true);
SchemaField field6(1, "foo", int32(), false);

ASSERT_EQ(field1, field1);
ASSERT_NE(field1, field2);
Expand All @@ -82,25 +84,27 @@ TEST(SchemaFieldTest, Equality) {

TEST(SchemaFieldTest, WithDoc) {
{
iceberg::SchemaField field(/*field_id=*/1, /*name=*/"foo", iceberg::int32(),
/*optional=*/false, /*doc=*/"Field documentation");
SchemaField field(/*field_id=*/1, /*name=*/"foo", int32(),
/*optional=*/false, /*doc=*/"Field documentation");
EXPECT_EQ(1, field.field_id());
EXPECT_EQ("foo", field.name());
EXPECT_EQ(iceberg::TypeId::kInt, field.type()->type_id());
EXPECT_EQ(TypeId::kInt, field.type()->type_id());
EXPECT_FALSE(field.optional());
EXPECT_EQ("Field documentation", field.doc());
EXPECT_EQ("foo (1): int (required) - Field documentation", field.ToString());
}
{
iceberg::SchemaField field = iceberg::SchemaField::MakeOptional(
SchemaField field = SchemaField::MakeOptional(
/*field_id=*/2, /*name=*/"bar",
/*type=*/std::make_shared<iceberg::FixedType>(10),
/*type=*/std::make_shared<FixedType>(10),
/*doc=*/"Field with 10 bytes");
EXPECT_EQ(2, field.field_id());
EXPECT_EQ("bar", field.name());
EXPECT_EQ(iceberg::FixedType(10), *field.type());
EXPECT_EQ(FixedType(10), *field.type());
EXPECT_TRUE(field.optional());
EXPECT_EQ("Field with 10 bytes", field.doc());
EXPECT_EQ("bar (2): fixed(10) (optional) - Field with 10 bytes", field.ToString());
}
}

} // namespace iceberg
Loading