diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 49b149a49..400e59e84 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -369,13 +369,28 @@ struct PAIMON_EXPORT Options { /// bytes. Default value is "false". static const char BLOB_AS_DESCRIPTOR[]; /// "blob-field" - Specifies column names that should be stored as blob type. This is used - /// when you want to treat a BYTES column as a BLOB. Comma-separated field names. - /// Multiple blob fields are supported. + /// when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or + /// blob-view-field are also treated as BLOB fields. Comma-separated field names. Multiple blob + /// fields are supported. No default value. static const char BLOB_FIELD[]; - // TODO(xinyu.lxy): support "blob-descriptor-field" - treat fields as BLOB and store as - // BlobDescriptor - // TODO(xinyu.lxy): support "blob-view-field" - treat fields as BLOB and resolve from upstream - // tables + /// "blob-descriptor-field" - Comma-separated field names to treat as BLOB fields and store as + /// serialized BlobDescriptor bytes inline in data files. No default value. + static const char BLOB_DESCRIPTOR_FIELD[]; + /// "blob.stored-descriptor-fields" deprecated as a fallback for `BLOB_DESCRIPTOR_FIELD`. + static const char FALLBACK_BLOB_DESCRIPTOR_FIELD[]; + /// "blob-view-field" - Comma-separated field names to treat as BLOB fields and store as + /// serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at + /// read time. No default value. + static const char BLOB_VIEW_FIELD[]; + /// "blob-external-storage-field" - Comma-separated BLOB field names (must be a subset of + /// blob-descriptor-field ) whose raw data will be written to external storage at write time. + /// The external storage path is configured via blob-external-storage-path. Orphan file cleanup + /// is not applied to that path. No default value. + static const char BLOB_EXTERNAL_STORAGE_FIELD[]; + /// "blob-external-storage-path" - The external storage path where raw BLOB data from fields + /// configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is + /// not applied to this path. No default value. + static const char BLOB_EXTERNAL_STORAGE_PATH[]; /// "global-index.enabled" - Whether to enable global index for scan. Default value is "true". static const char GLOBAL_INDEX_ENABLED[]; /// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 9f495e55c..51fcc7efb 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -279,6 +279,7 @@ set(PAIMON_CORE_SRCS core/operation/key_value_file_store_write.cpp core/operation/manifest_file_merger.cpp core/operation/merge_file_split_read.cpp + core/operation/blob_file_context.cpp core/operation/orphan_files_cleaner.cpp core/operation/orphan_files_cleaner_impl.cpp core/operation/raw_file_split_read.cpp @@ -675,6 +676,7 @@ if(PAIMON_BUILD_TESTS) core/operation/file_store_write_test.cpp core/operation/manifest_file_merger_test.cpp core/operation/merge_file_split_read_test.cpp + core/operation/blob_file_context_test.cpp core/operation/orphan_files_cleaner_test.cpp core/operation/raw_file_split_read_test.cpp core/operation/read_context_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 23330fe7c..b328213d6 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -93,6 +93,11 @@ const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled"; const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; const char Options::BLOB_FIELD[] = "blob-field"; +const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field"; +const char Options::FALLBACK_BLOB_DESCRIPTOR_FIELD[] = "blob.stored-descriptor-fields"; +const char Options::BLOB_VIEW_FIELD[] = "blob-view-field"; +const char Options::BLOB_EXTERNAL_STORAGE_FIELD[] = "blob-external-storage-field"; +const char Options::BLOB_EXTERNAL_STORAGE_PATH[] = "blob-external-storage-path"; const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled"; const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num"; const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path"; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index dac20c139..d8947b55b 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -76,12 +76,15 @@ class ConfigParser { // Parse list configurations template - Status ParseList(const std::string& key, const std::string& delimiter, - std::vector* list) const { + Status ParseList(const std::string& key, const std::string& delimiter, std::vector* list, + bool need_trim = false) const { auto iter = config_map_.find(key); if (iter != config_map_.end()) { auto value_str_vec = StringUtils::Split(iter->second, delimiter, /*ignore_empty=*/true); - for (const auto& value_str : value_str_vec) { + for (auto& value_str : value_str_vec) { + if (need_trim) { + StringUtils::Trim(&value_str); + } if constexpr (std::is_same_v) { list->emplace_back(value_str); } else { @@ -375,6 +378,9 @@ struct CoreOptions::Impl { std::vector sequence_field; std::vector remove_record_on_sequence_group; std::vector blob_fields; + std::vector blob_descriptor_fields; + std::vector blob_view_fields; + std::vector blob_external_storage_fields; std::string partition_default_name = "__DEFAULT_PARTITION__"; StartupMode startup_mode = StartupMode::Default(); @@ -387,6 +393,7 @@ struct CoreOptions::Impl { std::optional field_default_func; std::optional scan_fallback_branch; std::optional data_file_external_paths; + std::optional blob_external_storage_path; std::map raw_options; @@ -537,7 +544,27 @@ struct CoreOptions::Impl { PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type)); // Parse blob-field - column names to store as blob type, comma separated PAIMON_RETURN_NOT_OK(parser.ParseList( - Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields)); + Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields, /*need_trim=*/true)); + // Parse blob-descriptor-field - BLOB fields stored inline as serialized descriptors + PAIMON_RETURN_NOT_OK( + parser.ParseList(Options::BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR, + &blob_descriptor_fields, /*need_trim=*/true)); + if (blob_descriptor_fields.empty()) { + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR, + &blob_descriptor_fields, /*need_trim=*/true)); + } + // Parse blob-view-field - BLOB fields stored inline as serialized view metadata + PAIMON_RETURN_NOT_OK(parser.ParseList(Options::BLOB_VIEW_FIELD, + Options::FIELDS_SEPARATOR, + &blob_view_fields, /*need_trim=*/true)); + // Parse blob-external-storage-field - descriptor BLOB fields written to external storage + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::FIELDS_SEPARATOR, + &blob_external_storage_fields, /*need_trim=*/true)); + // Parse blob-external-storage-path - external storage path for configured BLOB fields + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::BLOB_EXTERNAL_STORAGE_PATH, &blob_external_storage_path)); return Status::OK(); } @@ -1389,6 +1416,29 @@ const std::vector& CoreOptions::GetBlobFields() const { return impl_->blob_fields; } +const std::vector& CoreOptions::GetBlobDescriptorFields() const { + return impl_->blob_descriptor_fields; +} + +const std::vector& CoreOptions::GetBlobViewFields() const { + return impl_->blob_view_fields; +} + +std::vector CoreOptions::GetBlobInlineFields() const { + std::vector blob_inline_fields = impl_->blob_descriptor_fields; + blob_inline_fields.insert(blob_inline_fields.end(), impl_->blob_view_fields.begin(), + impl_->blob_view_fields.end()); + return blob_inline_fields; +} + +const std::vector& CoreOptions::GetBlobExternalStorageFields() const { + return impl_->blob_external_storage_fields; +} + +std::optional CoreOptions::GetBlobExternalStoragePath() const { + return impl_->blob_external_storage_path; +} + int64_t CoreOptions::GetLookupCacheFileRetentionMs() const { return impl_->lookup_cache_file_retention_ms; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 3f96c9c9f..699506413 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -181,6 +181,11 @@ class PAIMON_EXPORT CoreOptions { std::optional GetGlobalIndexThreadNum() const; const std::vector& GetBlobFields() const; + const std::vector& GetBlobDescriptorFields() const; + const std::vector& GetBlobViewFields() const; + std::vector GetBlobInlineFields() const; + const std::vector& GetBlobExternalStorageFields() const; + std::optional GetBlobExternalStoragePath() const; const std::map& ToMap() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 9b968abce..4d8a26884 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -114,6 +114,11 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_TRUE(core_options.RowTrackingPartitionGroupOnCommit()); ASSERT_FALSE(core_options.DataEvolutionEnabled()); ASSERT_TRUE(core_options.GetBlobFields().empty()); + ASSERT_TRUE(core_options.GetBlobDescriptorFields().empty()); + ASSERT_TRUE(core_options.GetBlobViewFields().empty()); + ASSERT_TRUE(core_options.GetBlobInlineFields().empty()); + ASSERT_TRUE(core_options.GetBlobExternalStorageFields().empty()); + ASSERT_EQ(std::nullopt, core_options.GetBlobExternalStoragePath()); ASSERT_TRUE(core_options.LegacyPartitionNameEnabled()); ASSERT_TRUE(core_options.GlobalIndexEnabled()); ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath()); @@ -214,6 +219,10 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, {Options::BLOB_FIELD, "blob1,blob2"}, + {Options::BLOB_DESCRIPTOR_FIELD, "blob3,blob4"}, + {Options::BLOB_VIEW_FIELD, "blob5"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"}, {Options::PARTITION_GENERATE_LEGACY_NAME, "false"}, {Options::GLOBAL_INDEX_ENABLED, "false"}, {Options::GLOBAL_INDEX_THREAD_NUM, "4"}, @@ -343,6 +352,14 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_FALSE(core_options.RowTrackingPartitionGroupOnCommit()); ASSERT_TRUE(core_options.DataEvolutionEnabled()); ASSERT_EQ(core_options.GetBlobFields(), std::vector({"blob1", "blob2"})); + ASSERT_EQ(core_options.GetBlobDescriptorFields(), std::vector({"blob3", "blob4"})); + ASSERT_EQ(core_options.GetBlobViewFields(), std::vector({"blob5"})); + ASSERT_EQ(core_options.GetBlobInlineFields(), + std::vector({"blob3", "blob4", "blob5"})); + ASSERT_EQ(core_options.GetBlobExternalStorageFields(), + std::vector({"blob3", "blob4"})); + ASSERT_EQ(core_options.GetBlobExternalStoragePath(), + std::optional("FILE:///tmp/blob_external_storage/")); ASSERT_FALSE(core_options.LegacyPartitionNameEnabled()); ASSERT_FALSE(core_options.GlobalIndexEnabled()); ASSERT_EQ(core_options.GetGlobalIndexThreadNum(), 4); @@ -870,4 +887,20 @@ TEST(CoreOptionsTest, TestAssignmentIndependence) { ASSERT_EQ(MergeEngine::DEDUPLICATE, source.GetMergeEngine()); } +TEST(CoreOptionsTest, TestFallback) { + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"}})); + ASSERT_EQ(options.GetBlobDescriptorFields(), std::vector({"b1", "b2"})); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"}, + {Options::BLOB_DESCRIPTOR_FIELD, "new_b1 , new_b2"}})); + ASSERT_EQ(options.GetBlobDescriptorFields(), + std::vector({"new_b1", "new_b2"})); + } +} } // namespace paimon::test diff --git a/src/paimon/core/operation/blob_file_context.cpp b/src/paimon/core/operation/blob_file_context.cpp new file mode 100644 index 000000000..375932382 --- /dev/null +++ b/src/paimon/core/operation/blob_file_context.cpp @@ -0,0 +1,124 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/blob_file_context.h" + +#include + +#include "arrow/type.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/core/core_options.h" + +namespace paimon { + +BlobFileContext::BlobFileContext(std::set descriptor_fields, + std::set view_fields, + std::set inline_fields, + std::set external_storage_fields, + std::set blob_file_fields, + std::optional external_storage_path) + : descriptor_fields_(std::move(descriptor_fields)), + view_fields_(std::move(view_fields)), + inline_fields_(std::move(inline_fields)), + external_storage_fields_(std::move(external_storage_fields)), + blob_file_fields_(std::move(blob_file_fields)), + external_storage_path_(std::move(external_storage_path)) {} + +std::unique_ptr BlobFileContext::Create( + const std::shared_ptr& schema, const CoreOptions& options) { + // Check if there are any BLOB fields in the schema + bool has_blob = false; + for (int i = 0; i < schema->num_fields(); ++i) { + if (BlobUtils::IsBlobField(schema->field(i))) { + has_blob = true; + break; + } + } + if (!has_blob) { + return nullptr; + } + + // Populate descriptor fields + std::set descriptor_fields; + for (const auto& name : options.GetBlobDescriptorFields()) { + descriptor_fields.insert(name); + } + + // Populate view fields + std::set view_fields; + for (const auto& name : options.GetBlobViewFields()) { + view_fields.insert(name); + } + + // Populate inline fields from options (descriptor ∪ view) + std::set inline_fields; + for (const auto& name : options.GetBlobInlineFields()) { + inline_fields.insert(name); + } + + // Populate external storage fields + std::set external_storage_fields; + for (const auto& name : options.GetBlobExternalStorageFields()) { + external_storage_fields.insert(name); + } + + // Populate external storage path + std::optional external_storage_path = options.GetBlobExternalStoragePath(); + + // Determine blob_file_fields: BLOB fields that are NOT inline + std::set blob_file_fields; + for (int i = 0; i < schema->num_fields(); ++i) { + const auto& field = schema->field(i); + if (BlobUtils::IsBlobField(field) && inline_fields.count(field->name()) == 0) { + blob_file_fields.insert(field->name()); + } + } + + return std::unique_ptr( + new BlobFileContext(std::move(descriptor_fields), std::move(view_fields), + std::move(inline_fields), std::move(external_storage_fields), + std::move(blob_file_fields), std::move(external_storage_path))); +} + +bool BlobFileContext::IsInlineField(const std::string& field_name) const { + return inline_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsBlobFileField(const std::string& field_name) const { + return blob_file_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsDescriptorField(const std::string& field_name) const { + return descriptor_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsViewField(const std::string& field_name) const { + return view_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsExternalStorageField(const std::string& field_name) const { + return external_storage_fields_.count(field_name) > 0; +} + +bool BlobFileContext::RequireBlobFileWriter() const { + return !blob_file_fields_.empty(); +} + +bool BlobFileContext::RequireExternalStorageWriter() const { + return !external_storage_fields_.empty(); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/blob_file_context.h b/src/paimon/core/operation/blob_file_context.h new file mode 100644 index 000000000..2caf675a0 --- /dev/null +++ b/src/paimon/core/operation/blob_file_context.h @@ -0,0 +1,110 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +class CoreOptions; + +/// Context that classifies BLOB fields into different storage categories. +/// +/// Categories: +/// - descriptor_fields: stored as BlobDescriptor bytes inline in the main data file. +/// - view_fields: stored as BlobViewStruct bytes inline in the main data file. +/// - inline_fields: descriptor_fields ∪ view_fields. These stay in the main data file. +/// - external_storage_fields: subset of descriptor_fields whose raw data is written to an +/// external storage path (the descriptor still goes into the main data file). +/// - blob_file_fields: BLOB fields that are NOT inline. These go into separate .blob files. +class BlobFileContext { + public: + /// Creates a BlobFileContext from schema and options. + /// Returns nullptr if the schema has no BLOB fields at all. + /// Otherwise always returns a valid context (even if all blobs are inline). + static std::unique_ptr Create(const std::shared_ptr& schema, + const CoreOptions& options); + + /// Returns true if the given field should be stored inline in the main data file + /// (either as descriptor bytes or view bytes). + bool IsInlineField(const std::string& field_name) const; + + /// Returns true if the given field should be written to a separate .blob file. + bool IsBlobFileField(const std::string& field_name) const; + + /// Returns true if the given field is a descriptor field. + bool IsDescriptorField(const std::string& field_name) const; + + /// Returns true if the given field is a view field. + bool IsViewField(const std::string& field_name) const; + + /// Returns true if the given field should be written to external storage. + bool IsExternalStorageField(const std::string& field_name) const; + + /// Returns true if there are any BLOB fields that need a .blob file writer. + bool RequireBlobFileWriter() const; + + /// Returns true if there are any external storage fields that need an external writer. + bool RequireExternalStorageWriter() const; + + const std::set& GetDescriptorFields() const { + return descriptor_fields_; + } + + const std::set& GetViewFields() const { + return view_fields_; + } + + const std::set& GetInlineFields() const { + return inline_fields_; + } + + const std::set& GetExternalStorageFields() const { + return external_storage_fields_; + } + + const std::set& GetBlobFileFields() const { + return blob_file_fields_; + } + + const std::optional& GetExternalStoragePath() const { + return external_storage_path_; + } + + private: + BlobFileContext(std::set descriptor_fields, std::set view_fields, + std::set inline_fields, + std::set external_storage_fields, + std::set blob_file_fields, + std::optional external_storage_path); + + std::set descriptor_fields_; + std::set view_fields_; + std::set inline_fields_; + std::set external_storage_fields_; + std::set blob_file_fields_; + std::optional external_storage_path_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/blob_file_context_test.cpp b/src/paimon/core/operation/blob_file_context_test.cpp new file mode 100644 index 000000000..70156f3bc --- /dev/null +++ b/src/paimon/core/operation/blob_file_context_test.cpp @@ -0,0 +1,199 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/blob_file_context.h" + +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/data/blob.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { + +class BlobFileContextTest : public ::testing::Test { + protected: + static std::shared_ptr MakeSchema(const std::vector& normal_fields, + const std::vector& blob_fields) { + std::vector> fields; + for (const auto& name : normal_fields) { + fields.push_back(arrow::field(name, arrow::int32())); + } + for (const auto& name : blob_fields) { + fields.push_back(BlobUtils::ToArrowField(name)); + } + return arrow::schema(fields); + } +}; + +TEST_F(BlobFileContextTest, NoBlobFields) { + auto schema = MakeSchema({"id", "name"}, {}); + std::map opts_map; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_FALSE(context); +} + +TEST_F(BlobFileContextTest, AllInlineNoExternalStorage) { + auto schema = MakeSchema({"id"}, {"image", "video"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image"}, + {Options::BLOB_VIEW_FIELD, "video"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + // All blobs are inline but context is still valid for callers to query inline fields + ASSERT_TRUE(context); + ASSERT_EQ(context->GetInlineFields(), std::set({"image", "video"})); + ASSERT_TRUE(context->GetBlobFileFields().empty()); + ASSERT_FALSE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, MixedInlineAndBlobFile) { + auto schema = MakeSchema({"id"}, {"image", "video", "audio"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image"}, + // video and audio are not configured as inline -> go to .blob files + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + // descriptor fields + ASSERT_EQ(context->GetDescriptorFields(), std::set({"image"})); + + // view fields (none configured) + ASSERT_TRUE(context->GetViewFields().empty()); + + // inline = descriptor ∪ view + ASSERT_EQ(context->GetInlineFields(), std::set({"image"})); + + // blob file fields = non-inline blob fields + ASSERT_EQ(context->GetBlobFileFields(), std::set({"video", "audio"})); + + // Query methods + ASSERT_TRUE(context->IsInlineField("image")); + ASSERT_TRUE(context->IsDescriptorField("image")); + ASSERT_FALSE(context->IsViewField("image")); + ASSERT_FALSE(context->IsBlobFileField("image")); + + ASSERT_FALSE(context->IsInlineField("video")); + ASSERT_TRUE(context->IsBlobFileField("video")); + + ASSERT_FALSE(context->IsInlineField("audio")); + ASSERT_TRUE(context->IsBlobFileField("audio")); + + // Requires blob file writer for video and audio + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, ExternalStorageFields) { + auto schema = MakeSchema({"id"}, {"image", "video"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image,video"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "image"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "oss://bucket/blob/"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_EQ(context->GetDescriptorFields(), std::set({"image", "video"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"image", "video"})); + ASSERT_EQ(context->GetExternalStorageFields(), std::set({"image"})); + ASSERT_TRUE(context->GetExternalStoragePath()); + ASSERT_EQ(context->GetExternalStoragePath(), "oss://bucket/blob/"); + ASSERT_TRUE(context->GetBlobFileFields().empty()); + + ASSERT_TRUE(context->IsExternalStorageField("image")); + ASSERT_FALSE(context->IsExternalStorageField("video")); + + ASSERT_FALSE(context->RequireBlobFileWriter()); + ASSERT_TRUE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, ViewFields) { + auto schema = MakeSchema({"id"}, {"ref_image", "raw_blob"}); + std::map opts_map = { + {Options::BLOB_VIEW_FIELD, "ref_image"}, + // raw_blob not configured -> goes to .blob file + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_TRUE(context->GetDescriptorFields().empty()); + ASSERT_EQ(context->GetViewFields(), std::set({"ref_image"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"ref_image"})); + ASSERT_EQ(context->GetBlobFileFields(), std::set({"raw_blob"})); + + ASSERT_TRUE(context->IsInlineField("ref_image")); + ASSERT_TRUE(context->IsViewField("ref_image")); + ASSERT_FALSE(context->IsDescriptorField("ref_image")); + + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, DescriptorAndViewTogether) { + auto schema = MakeSchema({"id"}, {"desc_blob", "view_blob", "normal_blob"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "desc_blob"}, + {Options::BLOB_VIEW_FIELD, "view_blob"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "desc_blob"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "/tmp/ext/"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_EQ(context->GetDescriptorFields(), std::set({"desc_blob"})); + ASSERT_EQ(context->GetViewFields(), std::set({"view_blob"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"desc_blob", "view_blob"})); + ASSERT_EQ(context->GetExternalStorageFields(), std::set({"desc_blob"})); + ASSERT_TRUE(context->GetExternalStoragePath()); + ASSERT_EQ(context->GetExternalStoragePath(), "/tmp/ext/"); + ASSERT_EQ(context->GetBlobFileFields(), std::set({"normal_blob"})); + + ASSERT_TRUE(context->IsDescriptorField("desc_blob")); + ASSERT_TRUE(context->IsExternalStorageField("desc_blob")); + ASSERT_TRUE(context->IsInlineField("desc_blob")); + ASSERT_FALSE(context->IsBlobFileField("desc_blob")); + + ASSERT_TRUE(context->IsViewField("view_blob")); + ASSERT_TRUE(context->IsInlineField("view_blob")); + ASSERT_FALSE(context->IsDescriptorField("view_blob")); + + ASSERT_FALSE(context->IsInlineField("normal_blob")); + ASSERT_TRUE(context->IsBlobFileField("normal_blob")); + + // Non-existent field + ASSERT_FALSE(context->IsInlineField("not_exist")); + ASSERT_FALSE(context->IsBlobFileField("not_exist")); + + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_TRUE(context->RequireExternalStorageWriter()); +} + +} // namespace paimon diff --git a/src/paimon/core/schema/schema_validation.cpp b/src/paimon/core/schema/schema_validation.cpp index b8c4bb692..8a92b7e90 100644 --- a/src/paimon/core/schema/schema_validation.cpp +++ b/src/paimon/core/schema/schema_validation.cpp @@ -411,50 +411,97 @@ Status SchemaValidation::ValidateRowTracking(const TableSchema& table_schema, !options.DeletionVectorsEnabled(), "Data evolution config must disabled with deletion-vectors.enabled")); } + + std::vector blob_names; + for (const auto& field : table_schema.Fields()) { + if (BlobUtils::IsBlobField(field.ArrowField())) { + blob_names.push_back(field.Name()); + } + } + if (!blob_names.empty()) { + // Validate blob fields cannot be partition keys + for (const auto& blob_field_name : blob_names) { + if (std::find(table_schema.PartitionKeys().begin(), table_schema.PartitionKeys().end(), + blob_field_name) != table_schema.PartitionKeys().end()) { + return Status::Invalid( + fmt::format("Blob field {} cannot be a partition key.", blob_field_name)); + } + } + + // Validate data evolution must be enabled when blob-field is configured + PAIMON_RETURN_NOT_OK(Preconditions::CheckState( + options.DataEvolutionEnabled(), + "Data evolution config must be enabled for table with BLOB type column.")); + PAIMON_RETURN_NOT_OK(Preconditions::CheckState( + table_schema.Fields().size() > blob_names.size(), + "Table with BLOB type column must have other normal columns.")); + } return Status::OK(); } Status SchemaValidation::ValidateBlobFields(const TableSchema& schema, const CoreOptions& options) { const auto& configured_blob_names = options.GetBlobFields(); - if (configured_blob_names.empty()) { + const auto& blob_descriptor_names = options.GetBlobDescriptorFields(); + const auto& blob_view_names = options.GetBlobViewFields(); + const auto& blob_external_storage_names = options.GetBlobExternalStorageFields(); + std::vector configured_blob_like_names = configured_blob_names; + configured_blob_like_names.insert(configured_blob_like_names.end(), + blob_descriptor_names.begin(), blob_descriptor_names.end()); + configured_blob_like_names.insert(configured_blob_like_names.end(), blob_view_names.begin(), + blob_view_names.end()); + if (configured_blob_like_names.empty() && blob_external_storage_names.empty()) { return Status::OK(); } - PAIMON_ASSIGN_OR_RAISE(std::vector blob_fields, - schema.GetFields(configured_blob_names)); - for (const auto& blob_field : blob_fields) { - if (!BlobUtils::IsBlobField(blob_field.ArrowField())) { - return Status::Invalid( - fmt::format("Field {} in {} must be a BLOB field in table schema.", - blob_field.Name(), fmt::join(configured_blob_names, ", "))); + auto validate_blob_fields = [&](const std::vector& field_names, + const std::string& option_key) -> Status { + if (field_names.empty()) { + return Status::OK(); } - } + PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(field_names, option_key)); + PAIMON_ASSIGN_OR_RAISE(std::vector blob_fields, schema.GetFields(field_names)); + for (const auto& blob_field : blob_fields) { + if (!BlobUtils::IsBlobField(blob_field.ArrowField())) { + return Status::Invalid( + fmt::format("Field '{}' in '{}' must be a BLOB field in table schema.", + blob_field.Name(), option_key)); + } + } + return Status::OK(); + }; - // Validate no duplicate blob field names - PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(configured_blob_names, "blob field")); + PAIMON_RETURN_NOT_OK(validate_blob_fields(configured_blob_names, Options::BLOB_FIELD)); + PAIMON_RETURN_NOT_OK( + validate_blob_fields(blob_descriptor_names, Options::BLOB_DESCRIPTOR_FIELD)); + PAIMON_RETURN_NOT_OK(validate_blob_fields(blob_view_names, Options::BLOB_VIEW_FIELD)); + PAIMON_RETURN_NOT_OK( + validate_blob_fields(blob_external_storage_names, Options::BLOB_EXTERNAL_STORAGE_FIELD)); + + std::set blob_descriptor_name_set(blob_descriptor_names.begin(), + blob_descriptor_names.end()); + for (const auto& blob_view_name : blob_view_names) { + if (blob_descriptor_name_set.count(blob_view_name) > 0) { + return Status::Invalid(fmt::format("Field '{}' in '{}' can not also be in '{}'.", + blob_view_name, Options::BLOB_VIEW_FIELD, + Options::BLOB_DESCRIPTOR_FIELD)); + } + } - // Validate blob fields cannot be primary keys or partition keys - for (const auto& blob_field_name : configured_blob_names) { - if (std::find(schema.PrimaryKeys().begin(), schema.PrimaryKeys().end(), blob_field_name) != - schema.PrimaryKeys().end()) { + for (const auto& blob_external_storage_name : blob_external_storage_names) { + if (blob_descriptor_name_set.count(blob_external_storage_name) == 0) { return Status::Invalid( - fmt::format("Blob field {} cannot be a primary key.", blob_field_name)); + fmt::format("Field '{}' in '{}' must also be in '{}'.", blob_external_storage_name, + Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::BLOB_DESCRIPTOR_FIELD)); } - if (std::find(schema.PartitionKeys().begin(), schema.PartitionKeys().end(), - blob_field_name) != schema.PartitionKeys().end()) { - return Status::Invalid( - fmt::format("Blob field {} cannot be a partition key.", blob_field_name)); + } + if (!blob_external_storage_names.empty()) { + auto external_storage_path = options.GetBlobExternalStoragePath(); + if (!external_storage_path || external_storage_path->empty()) { + return Status::Invalid(fmt::format("'{}' must be set when '{}' is configured.", + Options::BLOB_EXTERNAL_STORAGE_PATH, + Options::BLOB_EXTERNAL_STORAGE_FIELD)); } } - - // Validate data evolution must be enabled when blob-field is configured - PAIMON_RETURN_NOT_OK(Preconditions::CheckState( - options.DataEvolutionEnabled(), - "Data evolution config must be enabled for table with BLOB type column.")); - PAIMON_RETURN_NOT_OK( - Preconditions::CheckState(schema.Fields().size() > configured_blob_names.size(), - "Table with BLOB type column must have other normal columns.")); - // TODO(xinyu.lxy): validate blob-descriptor-field and blob-view-field return Status::OK(); } diff --git a/src/paimon/core/schema/schema_validation_test.cpp b/src/paimon/core/schema/schema_validation_test.cpp index 9b28ff5d5..5e2c797d7 100644 --- a/src/paimon/core/schema/schema_validation_test.cpp +++ b/src/paimon/core/schema/schema_validation_test.cpp @@ -23,7 +23,6 @@ #include "paimon/common/data/blob_utils.h" #include "paimon/core/schema/table_schema.h" #include "paimon/defs.h" -#include "paimon/result.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -97,6 +96,93 @@ TEST(SchemaValidationTest, TestWithBlobField) { TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema)); } + { + arrow::FieldVector fields = {f0, f1, f2, f3, f4}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = { + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_DESCRIPTOR_FIELD, "f3"}, + {Options::BLOB_VIEW_FIELD, "f4"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema)); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_DESCRIPTOR_FIELD, "f0"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG( + SchemaValidation::ValidateTableSchema(*table_schema), + "Field 'f0' in 'blob-descriptor-field' must be a BLOB field in table schema."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_DESCRIPTOR_FIELD, "f3"}, + {Options::BLOB_VIEW_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG( + SchemaValidation::ValidateTableSchema(*table_schema), + "Field 'f3' in 'blob-view-field' can not also be in 'blob-descriptor-field'."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3, f4}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = { + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_DESCRIPTOR_FIELD, "f3"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f4"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG( + SchemaValidation::ValidateTableSchema(*table_schema), + "Field 'f4' in 'blob-external-storage-field' must also be in 'blob-descriptor-field'."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_DESCRIPTOR_FIELD, "f3"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "'blob-external-storage-path' must be set when " + "'blob-external-storage-field' is configured."); + } { arrow::FieldVector fields = {f0, f1, f2, f3}; auto schema = arrow::schema(fields); @@ -156,7 +242,7 @@ TEST(SchemaValidationTest, TestWithBlobField) { std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), - "Field f0 in f3, f0 must be a BLOB field in table schema."); + "Field 'f0' in 'blob-field' must be a BLOB field in table schema."); } { arrow::FieldVector fields = {f0, f1, f2, f3}; @@ -171,25 +257,9 @@ TEST(SchemaValidationTest, TestWithBlobField) { ASSERT_OK_AND_ASSIGN( std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); - ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateBlobFields(*table_schema, core_options), + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateRowTracking(*table_schema, core_options), "Blob field f3 cannot be a partition key."); } - { - arrow::FieldVector fields = {f0, f1, f2, f3}; - auto schema = arrow::schema(fields); - std::vector primary_keys = {"f3"}; - std::vector partition_keys = {}; - std::map options = {{Options::BUCKET, "-1"}, - {Options::ROW_TRACKING_ENABLED, "true"}, - {Options::DATA_EVOLUTION_ENABLED, "true"}, - {Options::BLOB_FIELD, "f3"}}; - ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); - ASSERT_OK_AND_ASSIGN( - std::shared_ptr table_schema, - TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); - ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateBlobFields(*table_schema, core_options), - "Blob field f3 cannot be a primary key."); - } } TEST(SchemaValidationTest, TestDuplicateField) {