Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,28 @@ struct PAIMON_EXPORT Options {
/// bytes. Default value is "false".
static const char BLOB_AS_DESCRIPTOR[];
/// "blob-field" - Specifies column names that should be stored as blob type. This is used
/// when you want to treat a BYTES column as a BLOB. Comma-separated field names.
/// Multiple blob fields are supported.
/// when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or
/// blob-view-field are also treated as BLOB fields. Comma-separated field names. Multiple blob
/// fields are supported. No default value.
static const char BLOB_FIELD[];
// TODO(xinyu.lxy): support "blob-descriptor-field" - treat fields as BLOB and store as
// BlobDescriptor
// TODO(xinyu.lxy): support "blob-view-field" - treat fields as BLOB and resolve from upstream
// tables
/// "blob-descriptor-field" - Comma-separated field names to treat as BLOB fields and store as
/// serialized BlobDescriptor bytes inline in data files. No default value.
static const char BLOB_DESCRIPTOR_FIELD[];
Comment thread
lxy-9602 marked this conversation as resolved.
/// "blob.stored-descriptor-fields" - Deprecated key, kept only as a fallback for
/// `BLOB_DESCRIPTOR_FIELD` ("blob-descriptor-field"). It is consulted only when
/// "blob-descriptor-field" yields no field names. Comma-separated field names.
static const char FALLBACK_BLOB_DESCRIPTOR_FIELD[];
/// "blob-view-field" - Comma-separated field names to treat as BLOB fields and store as
/// serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at
/// read time. No default value.
static const char BLOB_VIEW_FIELD[];
/// "blob-external-storage-field" - Comma-separated BLOB field names (must be a subset of
/// blob-descriptor-field) whose raw data will be written to external storage at write time.
/// The external storage path is configured via blob-external-storage-path. Orphan file cleanup
/// is not applied to that path. No default value.
static const char BLOB_EXTERNAL_STORAGE_FIELD[];
/// "blob-external-storage-path" - The external storage path where raw BLOB data from fields
/// configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is
/// not applied to this path. No default value.
static const char BLOB_EXTERNAL_STORAGE_PATH[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
/// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ set(PAIMON_CORE_SRCS
core/operation/key_value_file_store_write.cpp
core/operation/manifest_file_merger.cpp
core/operation/merge_file_split_read.cpp
core/operation/blob_file_context.cpp
core/operation/orphan_files_cleaner.cpp
core/operation/orphan_files_cleaner_impl.cpp
core/operation/raw_file_split_read.cpp
Expand Down Expand Up @@ -675,6 +676,7 @@ if(PAIMON_BUILD_TESTS)
core/operation/file_store_write_test.cpp
core/operation/manifest_file_merger_test.cpp
core/operation/merge_file_split_read_test.cpp
core/operation/blob_file_context_test.cpp
core/operation/orphan_files_cleaner_test.cpp
core/operation/raw_file_split_read_test.cpp
core/operation/read_context_test.cpp
Expand Down
5 changes: 5 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled";
const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
const char Options::BLOB_FIELD[] = "blob-field";
const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field";
// Deprecated key: read only when "blob-descriptor-field" yields no field names.
const char Options::FALLBACK_BLOB_DESCRIPTOR_FIELD[] = "blob.stored-descriptor-fields";
const char Options::BLOB_VIEW_FIELD[] = "blob-view-field";
const char Options::BLOB_EXTERNAL_STORAGE_FIELD[] = "blob-external-storage-field";
const char Options::BLOB_EXTERNAL_STORAGE_PATH[] = "blob-external-storage-path";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
Expand Down
58 changes: 54 additions & 4 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,15 @@ class ConfigParser {

// Parse list configurations
template <typename T>
Status ParseList(const std::string& key, const std::string& delimiter,
std::vector<T>* list) const {
Status ParseList(const std::string& key, const std::string& delimiter, std::vector<T>* list,
bool need_trim = false) const {
auto iter = config_map_.find(key);
if (iter != config_map_.end()) {
auto value_str_vec = StringUtils::Split(iter->second, delimiter, /*ignore_empty=*/true);
for (const auto& value_str : value_str_vec) {
for (auto& value_str : value_str_vec) {
if (need_trim) {
StringUtils::Trim(&value_str);
}
if constexpr (std::is_same_v<T, std::string>) {
list->emplace_back(value_str);
} else {
Expand Down Expand Up @@ -375,6 +378,9 @@ struct CoreOptions::Impl {
std::vector<std::string> sequence_field;
std::vector<std::string> remove_record_on_sequence_group;
std::vector<std::string> blob_fields;
std::vector<std::string> blob_descriptor_fields;
std::vector<std::string> blob_view_fields;
std::vector<std::string> blob_external_storage_fields;

std::string partition_default_name = "__DEFAULT_PARTITION__";
StartupMode startup_mode = StartupMode::Default();
Expand All @@ -387,6 +393,7 @@ struct CoreOptions::Impl {
std::optional<std::string> field_default_func;
std::optional<std::string> scan_fallback_branch;
std::optional<std::string> data_file_external_paths;
std::optional<std::string> blob_external_storage_path;

std::map<std::string, std::string> raw_options;

Expand Down Expand Up @@ -537,7 +544,27 @@ struct CoreOptions::Impl {
PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type));
// Parse blob-field - column names to store as blob type, comma separated
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields));
Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields, /*need_trim=*/true));
// Parse blob-descriptor-field - BLOB fields stored inline as serialized descriptors
PAIMON_RETURN_NOT_OK(
parser.ParseList<std::string>(Options::BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR,
&blob_descriptor_fields, /*need_trim=*/true));
if (blob_descriptor_fields.empty()) {
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR,
&blob_descriptor_fields, /*need_trim=*/true));
}
// Parse blob-view-field - BLOB fields stored inline as serialized view metadata
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(Options::BLOB_VIEW_FIELD,
Options::FIELDS_SEPARATOR,
&blob_view_fields, /*need_trim=*/true));
// Parse blob-external-storage-field - descriptor BLOB fields written to external storage
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::FIELDS_SEPARATOR,
&blob_external_storage_fields, /*need_trim=*/true));
// Parse blob-external-storage-path - external storage path for configured BLOB fields
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::BLOB_EXTERNAL_STORAGE_PATH, &blob_external_storage_path));
return Status::OK();
}

Expand Down Expand Up @@ -1389,6 +1416,29 @@ const std::vector<std::string>& CoreOptions::GetBlobFields() const {
return impl_->blob_fields;
}

// Returns the field names parsed from "blob-descriptor-field", or from the deprecated
// "blob.stored-descriptor-fields" key when the former yields no names.
const std::vector<std::string>& CoreOptions::GetBlobDescriptorFields() const {
return impl_->blob_descriptor_fields;
}

// Returns the field names parsed from "blob-view-field".
const std::vector<std::string>& CoreOptions::GetBlobViewFields() const {
return impl_->blob_view_fields;
}

// Returns the inline BLOB field names: every descriptor field followed by every view field,
// each group in its configured order. Duplicates are not removed.
std::vector<std::string> CoreOptions::GetBlobInlineFields() const {
    const auto& descriptor_names = impl_->blob_descriptor_fields;
    const auto& view_names = impl_->blob_view_fields;

    std::vector<std::string> merged;
    merged.reserve(descriptor_names.size() + view_names.size());
    merged.insert(merged.end(), descriptor_names.begin(), descriptor_names.end());
    merged.insert(merged.end(), view_names.begin(), view_names.end());
    return merged;
}

// Returns the field names parsed from "blob-external-storage-field".
const std::vector<std::string>& CoreOptions::GetBlobExternalStorageFields() const {
return impl_->blob_external_storage_fields;
}

// Returns the value of "blob-external-storage-path", or std::nullopt when the option is not set.
std::optional<std::string> CoreOptions::GetBlobExternalStoragePath() const {
return impl_->blob_external_storage_path;
}

int64_t CoreOptions::GetLookupCacheFileRetentionMs() const {
return impl_->lookup_cache_file_retention_ms;
}
Expand Down
5 changes: 5 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ class PAIMON_EXPORT CoreOptions {
std::optional<int32_t> GetGlobalIndexThreadNum() const;

const std::vector<std::string>& GetBlobFields() const;
/// Field names from "blob-descriptor-field" (or the deprecated "blob.stored-descriptor-fields"
/// key when the former yields no names).
const std::vector<std::string>& GetBlobDescriptorFields() const;
/// Field names from "blob-view-field".
const std::vector<std::string>& GetBlobViewFields() const;
/// Descriptor fields followed by view fields; returned by value because it is built on each call.
std::vector<std::string> GetBlobInlineFields() const;
/// Field names from "blob-external-storage-field".
const std::vector<std::string>& GetBlobExternalStorageFields() const;
/// Value of "blob-external-storage-path", or std::nullopt when the option is not set.
std::optional<std::string> GetBlobExternalStoragePath() const;

const std::map<std::string, std::string>& ToMap() const;

Expand Down
33 changes: 33 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_TRUE(core_options.RowTrackingPartitionGroupOnCommit());
ASSERT_FALSE(core_options.DataEvolutionEnabled());
ASSERT_TRUE(core_options.GetBlobFields().empty());
ASSERT_TRUE(core_options.GetBlobDescriptorFields().empty());
ASSERT_TRUE(core_options.GetBlobViewFields().empty());
ASSERT_TRUE(core_options.GetBlobInlineFields().empty());
ASSERT_TRUE(core_options.GetBlobExternalStorageFields().empty());
ASSERT_EQ(std::nullopt, core_options.GetBlobExternalStoragePath());
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath());
Expand Down Expand Up @@ -214,6 +219,10 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"},
{Options::DATA_EVOLUTION_ENABLED, "true"},
{Options::BLOB_FIELD, "blob1,blob2"},
{Options::BLOB_DESCRIPTOR_FIELD, "blob3,blob4"},
{Options::BLOB_VIEW_FIELD, "blob5"},
{Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"},
{Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"},
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
{Options::GLOBAL_INDEX_ENABLED, "false"},
{Options::GLOBAL_INDEX_THREAD_NUM, "4"},
Expand Down Expand Up @@ -343,6 +352,14 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_FALSE(core_options.RowTrackingPartitionGroupOnCommit());
ASSERT_TRUE(core_options.DataEvolutionEnabled());
ASSERT_EQ(core_options.GetBlobFields(), std::vector<std::string>({"blob1", "blob2"}));
ASSERT_EQ(core_options.GetBlobDescriptorFields(), std::vector<std::string>({"blob3", "blob4"}));
ASSERT_EQ(core_options.GetBlobViewFields(), std::vector<std::string>({"blob5"}));
ASSERT_EQ(core_options.GetBlobInlineFields(),
std::vector<std::string>({"blob3", "blob4", "blob5"}));
ASSERT_EQ(core_options.GetBlobExternalStorageFields(),
std::vector<std::string>({"blob3", "blob4"}));
ASSERT_EQ(core_options.GetBlobExternalStoragePath(),
std::optional<std::string>("FILE:///tmp/blob_external_storage/"));
ASSERT_FALSE(core_options.LegacyPartitionNameEnabled());
ASSERT_FALSE(core_options.GlobalIndexEnabled());
ASSERT_EQ(core_options.GetGlobalIndexThreadNum(), 4);
Expand Down Expand Up @@ -870,4 +887,20 @@ TEST(CoreOptionsTest, TestAssignmentIndependence) {
ASSERT_EQ(MergeEngine::DEDUPLICATE, source.GetMergeEngine());
}

// Checks the deprecated descriptor-field key: it is honoured when it is the only key present,
// and ignored when "blob-descriptor-field" is also configured.
TEST(CoreOptionsTest, TestFallback) {
    // Only the deprecated key is set, so its value is used.
    {
        const std::vector<std::string> expected_fields = {"b1", "b2"};
        ASSERT_OK_AND_ASSIGN(
            CoreOptions fallback_only,
            CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"}}));
        ASSERT_EQ(fallback_only.GetBlobDescriptorFields(), expected_fields);
    }
    // Both keys are set: the primary key wins, and whitespace around its entries is trimmed.
    {
        const std::vector<std::string> expected_fields = {"new_b1", "new_b2"};
        ASSERT_OK_AND_ASSIGN(
            CoreOptions primary_and_fallback,
            CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"},
                                  {Options::BLOB_DESCRIPTOR_FIELD, "new_b1 , new_b2"}}));
        ASSERT_EQ(primary_and_fallback.GetBlobDescriptorFields(), expected_fields);
    }
}
} // namespace paimon::test
124 changes: 124 additions & 0 deletions src/paimon/core/operation/blob_file_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/operation/blob_file_context.h"

#include <utility>

#include "arrow/type.h"
#include "paimon/common/data/blob_utils.h"
#include "paimon/core/core_options.h"

namespace paimon {

BlobFileContext::BlobFileContext(std::set<std::string> descriptor_fields,
std::set<std::string> view_fields,
std::set<std::string> inline_fields,
std::set<std::string> external_storage_fields,
std::set<std::string> blob_file_fields,
std::optional<std::string> external_storage_path)
: descriptor_fields_(std::move(descriptor_fields)),
view_fields_(std::move(view_fields)),
inline_fields_(std::move(inline_fields)),
external_storage_fields_(std::move(external_storage_fields)),
blob_file_fields_(std::move(blob_file_fields)),
external_storage_path_(std::move(external_storage_path)) {}

// Builds the BLOB layout context for `schema`.
//
// Returns nullptr when the schema contains no BLOB field. Otherwise the descriptor, view,
// inline and external-storage field sets are copied from `options` as configured (they are not
// intersected with the schema), and the blob-file field set is computed as the schema's BLOB
// fields that are not listed as inline.
std::unique_ptr<BlobFileContext> BlobFileContext::Create(
    const std::shared_ptr<arrow::Schema>& schema, const CoreOptions& options) {
    // Nothing to track unless at least one schema field is a BLOB; stop at the first hit.
    bool has_blob = false;
    for (int i = 0; i < schema->num_fields() && !has_blob; ++i) {
        has_blob = BlobUtils::IsBlobField(schema->field(i));
    }
    if (!has_blob) {
        return nullptr;
    }

    // Convert a list of configured names into a set (duplicates collapse).
    const auto to_set = [](const auto& names) {
        return std::set<std::string>(names.begin(), names.end());
    };

    std::set<std::string> descriptor_fields = to_set(options.GetBlobDescriptorFields());
    std::set<std::string> view_fields = to_set(options.GetBlobViewFields());
    // Inline fields come from the options accessor (descriptor fields plus view fields).
    std::set<std::string> inline_fields = to_set(options.GetBlobInlineFields());
    std::set<std::string> external_storage_fields = to_set(options.GetBlobExternalStorageFields());
    std::optional<std::string> external_storage_path = options.GetBlobExternalStoragePath();

    // Blob-file fields: schema BLOB fields that are NOT inline.
    std::set<std::string> blob_file_fields;
    for (int i = 0; i < schema->num_fields(); ++i) {
        const auto& field = schema->field(i);
        if (!BlobUtils::IsBlobField(field)) {
            continue;
        }
        if (inline_fields.find(field->name()) == inline_fields.end()) {
            blob_file_fields.insert(field->name());
        }
    }

    // NOTE(review): constructed with `new` rather than std::make_unique, presumably because the
    // constructor is not publicly accessible - confirm against the header.
    return std::unique_ptr<BlobFileContext>(
        new BlobFileContext(std::move(descriptor_fields), std::move(view_fields),
                            std::move(inline_fields), std::move(external_storage_fields),
                            std::move(blob_file_fields), std::move(external_storage_path)));
}

// Reports whether `field_name` is in the inline field set.
bool BlobFileContext::IsInlineField(const std::string& field_name) const {
    return inline_fields_.find(field_name) != inline_fields_.end();
}

// Reports whether `field_name` is in the blob-file field set (schema BLOB fields that are not
// inline).
bool BlobFileContext::IsBlobFileField(const std::string& field_name) const {
    return blob_file_fields_.find(field_name) != blob_file_fields_.end();
}

// Reports whether `field_name` is in the descriptor field set.
bool BlobFileContext::IsDescriptorField(const std::string& field_name) const {
    return descriptor_fields_.find(field_name) != descriptor_fields_.end();
}

// Reports whether `field_name` is in the view field set.
bool BlobFileContext::IsViewField(const std::string& field_name) const {
    return view_fields_.find(field_name) != view_fields_.end();
}

// Reports whether `field_name` is in the external-storage field set.
bool BlobFileContext::IsExternalStorageField(const std::string& field_name) const {
    return external_storage_fields_.find(field_name) != external_storage_fields_.end();
}

// True when the blob-file field set is non-empty, i.e. at least one schema BLOB field is not
// inline.
bool BlobFileContext::RequireBlobFileWriter() const {
return !blob_file_fields_.empty();
}

// True when at least one external-storage field is configured. The set is taken from the
// options as-is, so this does not check that the names exist in the schema.
bool BlobFileContext::RequireExternalStorageWriter() const {
return !external_storage_fields_.empty();
}

} // namespace paimon
Loading
Loading