Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible parsing error in WithNames formats with disabled input_format_with_names_use_header #54513

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
Expand Up @@ -6341,9 +6341,9 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat(), scope.context);
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(scope.context);
}

if (use_columns_from_insert_query)
Expand Down
17 changes: 13 additions & 4 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -684,10 +684,18 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na

void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
auto & target = dict[name].supports_subset_of_columns;
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = true;
target = [](const FormatSettings &){ return true; };
}

void FormatFactory::registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker)
{
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = std::move(subset_of_columns_support_checker);
}

void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
Expand All @@ -698,10 +706,11 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}

bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const DB::String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_) const
{
const auto & target = getCreators(name);
return target.supports_subset_of_columns;
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return target.subset_of_columns_support_checker && target.subset_of_columns_support_checker(format_settings);
}

void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(
Expand Down
12 changes: 8 additions & 4 deletions src/Formats/FormatFactory.h
Expand Up @@ -123,6 +123,10 @@ class FormatFactory final : private boost::noncopyable
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;

/// Some formats can support reading subset of columns depending on settings.
/// The checker should return true if format support append.
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;

struct Creators
{
InputCreator input_creator;
Expand All @@ -132,12 +136,11 @@ class FormatFactory final : private boost::noncopyable
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool supports_subcolumns{false};
bool supports_subset_of_columns{false};
bool prefers_large_blocks{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
};

using FormatsDictionary = std::unordered_map<String, Creators>;
Expand Down Expand Up @@ -225,9 +228,10 @@ class FormatFactory final : private boost::noncopyable

void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);

bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
void markFormatSupportsSubsetOfColumns(const String & name);
void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;

bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
Expand Down
5 changes: 3 additions & 2 deletions src/Formats/registerWithNamesAndTypes.cpp
Expand Up @@ -12,8 +12,9 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWit

void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
auto setting_checker = [](const FormatSettings & settings){ return settings.with_names_use_header; };
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNames", setting_checker);
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNamesAndTypes", setting_checker);
}

}
4 changes: 2 additions & 2 deletions src/Interpreters/Context.cpp
Expand Up @@ -1684,9 +1684,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat(), shared_from_this());
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(shared_from_this());
}

if (use_columns_from_insert_query)
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/HDFS/StorageHDFS.cpp
Expand Up @@ -838,9 +838,9 @@ class PartitionedHDFSSink : public PartitionedSink
};


bool StorageHDFS::supportsSubsetOfColumns() const
bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}

Pipe StorageHDFS::read(
Expand Down Expand Up @@ -878,7 +878,7 @@ Pipe StorageHDFS::read(
});
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/HDFS/StorageHDFS.h
Expand Up @@ -76,7 +76,7 @@ class StorageHDFS final : public IStorage, WithContext
/// Is is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;

bool supportsSubcolumns() const override { return true; }

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/Hive/StorageHive.h
Expand Up @@ -65,7 +65,7 @@ class StorageHive final : public IStorage, WithContext

NamesAndTypesList getVirtuals() const override;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns() const;

std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/IStorage.h
Expand Up @@ -620,8 +620,6 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// NOTE: write-once also does not support INSERTs/merges/... for MergeTree
virtual bool isStaticStorage() const;

virtual bool supportsSubsetOfColumns() const { return false; }

/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/S3Queue/StorageS3Queue.cpp
Expand Up @@ -162,9 +162,9 @@ bool StorageS3Queue::supportsSubcolumns() const
return true;
}

bool StorageS3Queue::supportsSubsetOfColumns() const
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
}

Pipe StorageS3Queue::read(
Expand All @@ -187,7 +187,7 @@ Pipe StorageS3Queue::read(

std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

Expand Down Expand Up @@ -363,7 +363,7 @@ void StorageS3Queue::streamToViews()
// Create a stream for each consumer and join them in a union stream

std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;

auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/S3Queue/StorageS3Queue.h
Expand Up @@ -125,7 +125,7 @@ class StorageS3Queue : public IStorage, WithContext
};
std::shared_ptr<TaskContext> task;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;

const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageAzureBlob.cpp
Expand Up @@ -687,7 +687,7 @@ Pipe StorageAzureBlob::read(
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

Expand Down Expand Up @@ -792,9 +792,9 @@ bool StorageAzureBlob::supportsPartitionBy() const
return true;
}

bool StorageAzureBlob::supportsSubsetOfColumns() const
bool StorageAzureBlob::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}

bool StorageAzureBlob::prefersLargeBlocks() const
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageAzureBlob.h
Expand Up @@ -99,7 +99,7 @@ class StorageAzureBlob : public IStorage

bool supportsSubcolumns() const override { return true; }

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool supportsTrivialCountOptimization() const override { return true; }

Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageFile.cpp
Expand Up @@ -803,9 +803,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return columns;
}

bool StorageFile::supportsSubsetOfColumns() const
bool StorageFile::supportsSubsetOfColumns(const ContextPtr & context) const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}

bool StorageFile::prefersLargeBlocks() const
Expand Down Expand Up @@ -1433,7 +1433,7 @@ Pipe StorageFile::read(
if (progress_callback && !archive_info)
progress_callback(FileProgress(0, total_bytes_to_read));

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageFile.h
Expand Up @@ -74,7 +74,7 @@ class StorageFile final : public IStorage
/// Is is useful because such formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool supportsSubcolumns() const override { return true; }

Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageS3.cpp
Expand Up @@ -983,9 +983,9 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
}

bool StorageS3::supportsSubsetOfColumns() const
bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}

bool StorageS3::prefersLargeBlocks() const
Expand Down Expand Up @@ -1017,7 +1017,7 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageS3.h
Expand Up @@ -388,7 +388,7 @@ class StorageS3 : public IStorage

bool supportsSubcolumns() const override { return true; }

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool prefersLargeBlocks() const override;

Expand Down
8 changes: 4 additions & 4 deletions src/Storages/StorageURL.cpp
Expand Up @@ -817,9 +817,9 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return columns;
}

bool IStorageURLBase::supportsSubsetOfColumns() const
bool IStorageURLBase::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}

bool IStorageURLBase::prefersLargeBlocks() const
Expand All @@ -846,7 +846,7 @@ Pipe IStorageURLBase::read(
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

if (distributed_processing)
{
Expand Down Expand Up @@ -951,7 +951,7 @@ Pipe StorageURLWithFailover::read(
return uri_options;
});

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());

const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageURL.h
Expand Up @@ -114,7 +114,7 @@ class IStorageURLBase : public IStorage
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;

bool supportsSubsetOfColumns() const override;
virtual bool supportsSubsetOfColumns(const ContextPtr & context) const;

bool prefersLargeBlocks() const override;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageXDBC.cpp
Expand Up @@ -146,7 +146,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
compression_method);
}

bool StorageXDBC::supportsSubsetOfColumns() const
bool StorageXDBC::supportsSubsetOfColumns(const ContextPtr &) const
{
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageXDBC.h
Expand Up @@ -68,7 +68,7 @@ class StorageXDBC : public IStorageURLBase

Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const override;

bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr &) const override;
};

}
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunction.h
Expand Up @@ -76,7 +76,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>
/// because we cannot determine which column from table correspond to this virtual column.
virtual std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const { return {}; }

virtual bool supportsReadingSubsetOfColumns() { return true; }
virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; }

/// Create storage according to the query.
StoragePtr
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunctionFileLike.cpp
Expand Up @@ -32,9 +32,9 @@ String ITableFunctionFileLike::getFormatFromFirstArgument()
return FormatFactory::instance().getFormatFromFileName(filename, true);
}

bool ITableFunctionFileLike::supportsReadingSubsetOfColumns()
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format, context);
}

void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, ContextPtr context)
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.h
Expand Up @@ -27,7 +27,7 @@ class ITableFunctionFileLike : public ITableFunction

void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }

bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;

static size_t getMaxNumberOfArguments() { return 4; }

Expand Down