Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
auto files = files_generator(current_manifest_file_content);
while (internal_data_index < files.size())
{
const auto & manifest_file_entry = files[internal_data_index++];
auto & manifest_file_entry = files[internal_data_index++];
if ((manifest_file_entry.schema_id != previous_entry_schema) && (use_partition_pruning))
{
previous_entry_schema = manifest_file_entry.schema_id;
Expand All @@ -164,7 +164,13 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
switch (pruning_status)
{
case PruningReturnStatus::NOT_PRUNED:
{
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context);
manifest_file_entry.storage_to_use = storage_to_use;
manifest_file_entry.resolved_key = resolved_key;
return manifest_file_entry;
}
case PruningReturnStatus::MIN_MAX_INDEX_PRUNED: {
++min_max_index_pruned_files;
break;
Expand Down Expand Up @@ -242,7 +248,6 @@ IcebergIterator::IcebergIterator(
std::shared_ptr<SecondaryStorages> secondary_storages_)
: filter_dag(filter_dag_ ? std::make_unique<ActionsDAG>(filter_dag_->clone()) : nullptr)
, object_storage(std::move(object_storage_))
, local_context(local_context_)
, data_files_iterator(
object_storage,
local_context_,
Expand Down Expand Up @@ -336,11 +341,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
Iceberg::ManifestFileEntry manifest_file_entry;
if (blocking_queue.pop(manifest_file_entry))
{
// Resolve the data file path to get the correct storage and key
auto [storage_to_use, resolved_key] = resolveObjectStorageForPath(
persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context);

IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(manifest_file_entry, storage_to_use, resolved_key);
IcebergDataObjectInfoPtr object_info = std::make_shared<IcebergDataObjectInfo>(
manifest_file_entry, manifest_file_entry.storage_to_use, manifest_file_entry.resolved_key);

for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false))
object_info->addPositionDeleteObject(position_delete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class IcebergIterator : public IObjectIterator
private:
std::unique_ptr<ActionsDAG> filter_dag;
ObjectStoragePtr object_storage;
ContextPtr local_context;
Iceberg::SingleThreadIcebergKeysIterator data_files_iterator;
Iceberg::SingleThreadIcebergKeysIterator deletes_iterator;
ConcurrentBoundedQueue<Iceberg::ManifestFileEntry> blocking_queue;
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct ColumnInfo
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Core/Field.h>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>

#include <cstdint>

Expand Down Expand Up @@ -84,6 +85,10 @@ struct ManifestFileEntry
String file_format;
std::optional<String> reference_data_file_path; // For position delete files only.
std::optional<std::vector<Int32>> equality_ids;

// Resolved storage and key (set by SingleThreadIcebergKeysIterator)
ObjectStoragePtr storage_to_use;
String resolved_key;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
auto next_file = files.back();
files.pop_back();

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
auto it = unprocessed_files.find(file_path);
if (it == unprocessed_files.end())
continue;
Expand Down Expand Up @@ -221,7 +221,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
auto next_file = it->second.first;
unprocessed_files.erase(it);

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath();
auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
Expand Down Expand Up @@ -282,8 +282,8 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
replica_to_files_to_be_processed.erase(number_of_current_replica);
for (const auto & file : processed_file_list_ptr->second)
{
auto file_replica_idx = getReplicaForFile(file->getPath());
unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx));
auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
connection_to_files[file_replica_idx].push_back(file);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/Storages/ObjectStorage/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ std::pair<ObjectStoragePtr, std::string> getOrCreateStorageAndKey(

configure_fn(*cfg, config_prefix);

auto & factory = ObjectStorageFactory::instance();
ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);

{
std::lock_guard lock(secondary_storages.mutex);
Expand Down
Loading