Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ It is currently only implemented in StorageObjectStorage.
Block & /* block_with_partition_values */,
std::string & /* destination_file_path */,
bool /* overwrite_if_exists */,
const std::optional<FormatSettings> & /* format_settings */,
ContextPtr /* context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
Expand Down
14 changes: 5 additions & 9 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <Disks/SingleDiskVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/createVolume.h>
#include <Formats/FormatFactory.h>
#include <IO/Operators.h>
#include <IO/S3Common.h>
#include <IO/SharedThreadPools.h>
Expand Down Expand Up @@ -6241,13 +6242,12 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
part_name, getStorageID().getFullTableName());

{
const auto format_settings = getFormatSettings(query_context);
MergeTreeExportManifest manifest(
dest_storage->getStorageID(),
part,
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
query_context->getSettingsRef()[Setting::max_threads]);
format_settings);

std::lock_guard lock(export_manifests_mutex);

Expand Down Expand Up @@ -6293,17 +6293,13 @@ void MergeTreeData::exportPartToTableImpl(

try
{
auto context_copy = Context::createCopy(local_context);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("max_threads", manifest.max_threads);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
block_with_partition_values,
destination_file_path,
manifest.overwrite_file_if_exists,
context_copy);
manifest.format_settings,
local_context);
}
catch (const Exception & e)
{
Expand Down
13 changes: 3 additions & 10 deletions src/Storages/MergeTree/MergeTreeExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,17 @@ struct MergeTreeExportManifest
const StorageID & destination_storage_id_,
const DataPartPtr & data_part_,
bool overwrite_file_if_exists_,
bool parallel_formatting_,
bool parallel_formatting_parquet_,
std::size_t max_threads_)
const FormatSettings & format_settings_)
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
overwrite_file_if_exists(overwrite_file_if_exists_),
parallel_formatting(parallel_formatting_),
parquet_parallel_encoding(parallel_formatting_parquet_),
max_threads(max_threads_),
format_settings(format_settings_),
create_time(time(nullptr)) {}

StorageID destination_storage_id;
DataPartPtr data_part;
bool overwrite_file_if_exists;
bool parallel_formatting;
/// parquet has a different setting for parallel formatting
bool parquet_parallel_encoding;
std::size_t max_threads;
FormatSettings format_settings;

time_t create_time;
mutable bool in_progress = false;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ SinkToStoragePtr StorageObjectStorage::import(
Block & block_with_partition_values,
std::string & destination_file_path,
bool overwrite_if_exists,
const std::optional<FormatSettings> & format_settings_,
ContextPtr local_context)
{
std::string partition_key;
Expand All @@ -508,7 +509,7 @@ SinkToStoragePtr StorageObjectStorage::import(
destination_file_path,
object_storage,
configuration,
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
format_settings_ ? format_settings_ : format_settings,
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
local_context);
}
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class StorageObjectStorage : public IStorage
Block & /* block_with_partition_values */,
std::string & /* destination_file_path */,
bool /* overwrite_if_exists */,
const std::optional<FormatSettings> & /* format_settings_ */,
ContextPtr /* context */) override;

void truncate(
Expand Down
6 changes: 4 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <optional>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Common/Exception.h>
Expand Down Expand Up @@ -575,12 +576,13 @@ SinkToStoragePtr StorageObjectStorageCluster::import(
Block & block_with_partition_values,
std::string & destination_file_path,
bool overwrite_if_exists,
const std::optional<FormatSettings> & format_settings_,
ContextPtr context)
{
if (pure_storage)
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);

return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context);
}

void StorageObjectStorageCluster::readFallBackToPure(
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class StorageObjectStorageCluster : public IStorageCluster
Block & /* block_with_partition_values */,
std::string & /* destination_file_path */,
bool /* overwrite_if_exists */,
const std::optional<FormatSettings> & /* format_settings_ */,
ContextPtr /* context */) override;
bool prefersLargeBlocks() const override;

Expand Down
Loading