Use filter by file/path before reading in url/file/hdfs table functions #53529

Merged
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -366,6 +366,8 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3PutObject, "Number of DiskS3 API PutObject calls.") \
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\
M(EngineFileLikeReadFiles, "Number of files read in table engines working with files (like File/S3/URL/HDFS).") \
\
M(ReadBufferFromS3Microseconds, "Time spent on reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
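For reference, the counter added above follows the standard ProfileEvents pattern: declare the event once in ProfileEvents.cpp via the M macro, re-declare it as an extern symbol in each consumer, and bump it with ProfileEvents::increment; the value then surfaces in the system.events table. A minimal consumer-side sketch (onFileOpened is a hypothetical call site; StorageHDFS.cpp below does the same in initialize()):

#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
    /// Matches the M(EngineFileLikeReadFiles, ...) entry above.
    extern const Event EngineFileLikeReadFiles;
}

/// Hypothetical call site: bump the counter once per opened file.
static void onFileOpened()
{
    ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
}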
88 changes: 50 additions & 38 deletions src/Storages/HDFS/StorageHDFS.cpp
@@ -29,7 +29,7 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/checkAndGetLiteralArgument.h>

#include <Formats/ReadSchemaUtils.h>
@@ -50,6 +50,11 @@

namespace fs = std::filesystem;

namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
}

namespace DB
{
namespace ErrorCodes
@@ -291,12 +296,7 @@ StorageHDFS::StorageHDFS(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);

auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}

ColumnsDescription StorageHDFS::getTableStructureFromData(
@@ -363,11 +363,25 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
class HDFSSource::DisclosedGlobIterator::Impl
{
public:
Impl(ContextPtr context_, const String & uri)
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
uris = getPathsList(path_from_uri, uri_without_path, context_);
auto file_progress_callback = context_->getFileProgressCallback();
uris = getPathsList(path_from_uri, uri_without_path, context);
ASTPtr filter_ast;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, uris[0].path, context);

if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & path_with_info : uris)
paths.push_back(path_with_info.path);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
}
auto file_progress_callback = context->getFileProgressCallback();

for (auto & elem : uris)
{
elem.path = uri_without_path + elem.path;
@@ -397,9 +411,23 @@ class HDFSSource::DisclosedGlobIterator::Impl
class HDFSSource::URISIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & uris_, ContextPtr context_)
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
{
ASTPtr filter_ast;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());

if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
}

if (!uris.empty())
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris[0]);
@@ -444,16 +472,16 @@ class HDFSSource::URISIterator::Impl : WithContext
std::function<void(FileProgress)> file_progress_callback;
};

HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(context_, uri)) {}
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}

StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}

HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
{
}

@@ -535,6 +563,8 @@ bool HDFSSource::initialize()

pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return true;
}

@@ -557,29 +587,11 @@ Chunk HDFSSource::generate()
Chunk chunk;
if (reader->pull(chunk))
{
Columns columns = chunk.getColumns();
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());

for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);

auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
}

return Chunk(std::move(columns), num_rows);
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path);
return chunk;
}

reader.reset();
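
The removed per-column block above is now centralized in VirtualColumnUtils. Reconstructed from the deleted lines, addRequestedPathAndFileVirtualsToChunk presumably behaves roughly as follows (a sketch, not the actual implementation):

#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Chunk.h>

using namespace DB;

void addRequestedPathAndFileVirtualsToChunk(
    Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path)
{
    const size_t num_rows = chunk.getNumRows();
    for (const auto & virtual_column : requested_virtual_columns)
    {
        if (virtual_column.name == "_path")
        {
            /// Fill _path with the full path, one constant value per row.
            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
                              .createColumnConst(num_rows, path);
            chunk.addColumn(column->convertToFullColumnIfConst());
        }
        else if (virtual_column.name == "_file")
        {
            /// _file is the basename: everything after the last '/'.
            auto file_name = path.substr(path.find_last_of('/') + 1);
            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
                              .createColumnConst(num_rows, std::move(file_name));
            chunk.addColumn(column->convertToFullColumnIfConst());
        }
    }
}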
@@ -727,7 +739,7 @@ bool StorageHDFS::supportsSubsetOfColumns() const
Pipe StorageHDFS::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@@ -744,15 +756,15 @@ Pipe StorageHDFS::read(
else if (is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
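Both iterators above now share the same shape: list the candidate files, build a filter AST once from the query's conditions on the _path/_file virtual columns, and prune the list before a single file is opened. The user-visible effect is that a query like SELECT * FROM hdfs(...) WHERE _file = 'a.tsv' no longer reads every matched file just to discard its rows. Condensed into one hypothetical helper (the real logic lives in the iterator constructors above):

using namespace DB;

/// Illustrative consolidation of the pattern used by both iterators.
static void filterUris(
    std::vector<String> & uris,
    const ASTPtr & query,
    const NamesAndTypesList & virtual_columns,
    const ContextPtr & context)
{
    if (uris.empty())
        return;

    /// Build the _path/_file filter from the WHERE clause, if one applies.
    ASTPtr filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(
        query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, context);
    if (!filter_ast)
        return;

    std::vector<String> paths;
    paths.reserve(uris.size());
    for (const auto & uri : uris)
        paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);

    /// Keep only the uris whose path/file survive the filter.
    VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
}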
4 changes: 2 additions & 2 deletions src/Storages/HDFS/StorageHDFS.h
@@ -123,7 +123,7 @@ class HDFSSource : public ISource, WithContext
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(ContextPtr context_, const String & uri_);
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@@ -134,7 +134,7 @@ class HDFSSource : public ISource, WithContext
class URISIterator
{
public:
URISIterator(const std::vector<String> & uris_, ContextPtr context);
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
7 changes: 5 additions & 2 deletions src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -21,6 +21,7 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h>

#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <memory>
@@ -64,6 +65,8 @@ StorageHDFSCluster::StorageHDFSCluster(

storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);

virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}

void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@@ -76,9 +79,9 @@ void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String
}


RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, query, virtual_columns, context);
auto callback = std::make_shared<std::function<String()>>([iter = std::move(iterator)]() mutable -> String { return iter->next().path; });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
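Because the query now reaches getTaskIteratorExtension, the initiator prunes the glob by _path/_file before fanning tasks out, so cluster workers are never handed filtered-out files. A hypothetical worker-side loop consuming the task callback built above (the empty-string-means-exhausted convention is an assumption based on the callback's signature):

#include <functional>
#include <string>

/// Hypothetical consumer; the callback type matches the extension above.
static void processTasks(const std::function<std::string()> & next_task)
{
    while (true)
    {
        std::string path = next_task();
        if (path.empty())
            break;
        /// Read and process the file at `path` here.
    }
}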
1 change: 1 addition & 0 deletions src/Storages/HDFS/StorageHDFSCluster.h
@@ -45,6 +45,7 @@ class StorageHDFSCluster : public IStorageCluster
String uri;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
};


5 changes: 2 additions & 3 deletions src/Storages/S3Queue/S3QueueSource.cpp
@@ -25,7 +25,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>

# include <Formats/FormatFactory.h>

@@ -70,13 +69,13 @@ StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
2 changes: 1 addition & 1 deletion src/Storages/S3Queue/S3QueueSource.h
@@ -44,7 +44,7 @@ class StorageS3QueueSource : public ISource, WithContext
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});
13 changes: 2 additions & 11 deletions src/Storages/S3Queue/StorageS3Queue.cpp
@@ -32,7 +32,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>

@@ -171,15 +170,7 @@ StorageS3Queue::StorageS3Queue(
}

files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);

auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());

auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
@@ -527,7 +518,7 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
*configuration.client,
configuration.url,
query,
virtual_block,
virtual_columns,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);
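As in StorageHDFS and StorageHDFSCluster above, the hand-rolled default-virtuals block collapses into VirtualColumnUtils::getPathAndFileVirtualsForStorage. Judging by the code it replaces, the helper is presumably along these lines (the collision handling with real columns is an assumption mirroring the old getVirtualsForStorage):

#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>

using namespace DB;

NamesAndTypesList getPathAndFileVirtualsForStorage(const NamesAndTypesList & storage_columns)
{
    auto lc_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    NamesAndTypesList virtuals{{"_path", lc_string}, {"_file", lc_string}};

    /// Assumed: a real storage column named _path or _file shadows the virtual one.
    NameSet storage_names;
    for (const auto & column : storage_columns)
        storage_names.insert(column.name);
    virtuals.remove_if([&](const NameAndTypePair & v) { return storage_names.contains(v.name); });

    return virtuals;
}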
1 change: 0 additions & 1 deletion src/Storages/S3Queue/StorageS3Queue.h
@@ -93,7 +93,6 @@ class StorageS3Queue : public IStorage, WithContext
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
UInt64 reschedule_processing_interval_ms;

std::optional<FormatSettings> format_settings;