Skip to content

Commit

Permalink
Merge pull request #54599 from ClickHouse/backport/23.8/54513
Browse files Browse the repository at this point in the history
Backport #54513 to 23.8: Fix possible parsing error in WithNames formats with disabled input_format_with_names_use_header
  • Loading branch information
Avogar committed Sep 14, 2023
2 parents a184301 + dad2c2f commit cd0407e
Show file tree
Hide file tree
Showing 31 changed files with 80 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
Expand Up @@ -6341,9 +6341,9 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat(), scope.context);
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(scope.context);
}

if (use_columns_from_insert_query)
Expand Down
17 changes: 13 additions & 4 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -683,10 +683,18 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na

void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
auto & target = dict[name].supports_subset_of_columns;
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = true;
target = [](const FormatSettings &){ return true; };
}

void FormatFactory::registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker)
{
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = std::move(subset_of_columns_support_checker);
}

void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
Expand All @@ -697,10 +705,11 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}

bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const DB::String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_) const
{
const auto & target = getCreators(name);
return target.supports_subset_of_columns;
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return target.subset_of_columns_support_checker && target.subset_of_columns_support_checker(format_settings);
}

void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(
Expand Down
12 changes: 8 additions & 4 deletions src/Formats/FormatFactory.h
Expand Up @@ -123,6 +123,10 @@ class FormatFactory final : private boost::noncopyable
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;

/// Some formats can support reading subset of columns depending on settings.
/// The checker should return true if format support append.
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;

struct Creators
{
InputCreator input_creator;
Expand All @@ -132,12 +136,11 @@ class FormatFactory final : private boost::noncopyable
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool supports_subcolumns{false};
bool supports_subset_of_columns{false};
bool prefers_large_blocks{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
};

using FormatsDictionary = std::unordered_map<String, Creators>;
Expand Down Expand Up @@ -225,9 +228,10 @@ class FormatFactory final : private boost::noncopyable

void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);

bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
void markFormatSupportsSubsetOfColumns(const String & name);
void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;

bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
Expand Down
5 changes: 3 additions & 2 deletions src/Formats/registerWithNamesAndTypes.cpp
Expand Up @@ -12,8 +12,9 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWit

void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
auto setting_checker = [](const FormatSettings & settings){ return settings.with_names_use_header; };
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNames", setting_checker);
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNamesAndTypes", setting_checker);
}

}
4 changes: 2 additions & 2 deletions src/Interpreters/Context.cpp
Expand Up @@ -1699,9 +1699,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat(), shared_from_this());
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(shared_from_this());
}

if (use_columns_from_insert_query)
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/HDFS/StorageHDFS.cpp
Expand Up @@ -838,9 +838,9 @@ class PartitionedHDFSSink : public PartitionedSink
};


bool StorageHDFS::supportsSubsetOfColumns() const
bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}

Pipe StorageHDFS::read(
Expand Down Expand Up @@ -878,7 +878,7 @@ Pipe StorageHDFS::read(
});
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/HDFS/StorageHDFS.h
Expand Up @@ -76,7 +76,7 @@ class StorageHDFS final : public IStorage, WithContext
/// Is is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;

bool supportsSubcolumns() const override { return true; }

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/Hive/StorageHive.h
Expand Up @@ -65,7 +65,7 @@ class StorageHive final : public IStorage, WithContext

NamesAndTypesList getVirtuals() const override;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns() const;

std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/IStorage.h
Expand Up @@ -620,8 +620,6 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// NOTE: write-once also does not support INSERTs/merges/... for MergeTree
virtual bool isStaticStorage() const;

virtual bool supportsSubsetOfColumns() const { return false; }

/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/S3Queue/StorageS3Queue.cpp
Expand Up @@ -162,9 +162,9 @@ bool StorageS3Queue::supportsSubcolumns() const
return true;
}

bool StorageS3Queue::supportsSubsetOfColumns() const
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
}

Pipe StorageS3Queue::read(
Expand All @@ -187,7 +187,7 @@ Pipe StorageS3Queue::read(

std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

Expand Down Expand Up @@ -363,7 +363,7 @@ void StorageS3Queue::streamToViews()
// Create a stream for each consumer and join them in a union stream

std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;

auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/S3Queue/StorageS3Queue.h
Expand Up @@ -125,7 +125,7 @@ class StorageS3Queue : public IStorage, WithContext
};
std::shared_ptr<TaskContext> task;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;

const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageAzureBlob.cpp
Expand Up @@ -687,7 +687,7 @@ Pipe StorageAzureBlob::read(
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

Expand Down Expand Up @@ -792,9 +792,9 @@ bool StorageAzureBlob::supportsPartitionBy() const
return true;
}

bool StorageAzureBlob::supportsSubsetOfColumns() const
bool StorageAzureBlob::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}

bool StorageAzureBlob::prefersLargeBlocks() const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageAzureBlob.h
Expand Up @@ -99,7 +99,7 @@ class StorageAzureBlob : public IStorage

bool supportsSubcolumns() const override { return true; }

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool supportsTrivialCountOptimization() const override { return true; }

Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageFile.cpp
Expand Up @@ -816,9 +816,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return columns;
}

bool StorageFile::supportsSubsetOfColumns() const
bool StorageFile::supportsSubsetOfColumns(const ContextPtr & context) const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}

bool StorageFile::prefersLargeBlocks() const
Expand Down Expand Up @@ -1441,7 +1441,7 @@ Pipe StorageFile::read(
if (progress_callback)
progress_callback(FileProgress(0, total_bytes_to_read));

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageFile.h
Expand Up @@ -74,7 +74,7 @@ class StorageFile final : public IStorage
/// Is is useful because such formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool supportsSubcolumns() const override { return true; }

Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageS3.cpp
Expand Up @@ -983,9 +983,9 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
}

bool StorageS3::supportsSubsetOfColumns() const
bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}

bool StorageS3::prefersLargeBlocks() const
Expand Down Expand Up @@ -1017,7 +1017,7 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageS3.h
Expand Up @@ -388,7 +388,7 @@ class StorageS3 : public IStorage

bool supportsSubcolumns() const override { return true; }

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool prefersLargeBlocks() const override;

Expand Down
8 changes: 4 additions & 4 deletions src/Storages/StorageURL.cpp
Expand Up @@ -817,9 +817,9 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return columns;
}

bool IStorageURLBase::supportsSubsetOfColumns() const
bool IStorageURLBase::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}

bool IStorageURLBase::prefersLargeBlocks() const
Expand All @@ -846,7 +846,7 @@ Pipe IStorageURLBase::read(
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

if (distributed_processing)
{
Expand Down Expand Up @@ -951,7 +951,7 @@ Pipe StorageURLWithFailover::read(
return uri_options;
});

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageURL.h
Expand Up @@ -114,7 +114,7 @@ class IStorageURLBase : public IStorage
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;

bool supportsSubsetOfColumns() const override;
virtual bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool prefersLargeBlocks() const override;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageXDBC.cpp
Expand Up @@ -146,7 +146,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
compression_method);
}

bool StorageXDBC::supportsSubsetOfColumns() const
bool StorageXDBC::supportsSubsetOfColumns(const ContextPtr &) const
{
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageXDBC.h
Expand Up @@ -68,7 +68,7 @@ class StorageXDBC : public IStorageURLBase

Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const override;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr &) const override;
};

}
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunction.h
Expand Up @@ -76,7 +76,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>
/// because we cannot determine which column from table correspond to this virtual column.
virtual std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const { return {}; }

virtual bool supportsReadingSubsetOfColumns() { return true; }
virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; }

/// Create storage according to the query.
StoragePtr
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunctionFileLike.cpp
Expand Up @@ -32,9 +32,9 @@ String ITableFunctionFileLike::getFormatFromFirstArgument()
return FormatFactory::instance().getFormatFromFileName(filename, true);
}

bool ITableFunctionFileLike::supportsReadingSubsetOfColumns()
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format, context);
}

void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, ContextPtr context)
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.h
Expand Up @@ -27,7 +27,7 @@ class ITableFunctionFileLike : public ITableFunction

void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }

bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;

static size_t getMaxNumberOfArguments() { return 4; }

Expand Down

0 comments on commit cd0407e

Please sign in to comment.