Merge pull request #53529 from Avogar/filter-files-all-table-functions
Use filter by file/path before reading in url/file/hdfs table functions
robot-ch-test-poll1 committed Aug 23, 2023
2 parents 6d6d5f5 + 30e8e4f commit c22ffa6
Showing 37 changed files with 451 additions and 427 deletions.
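
The whole change follows a single pattern: each file-listing iterator now prunes its candidate list against the query's predicates on the `_path`/`_file` virtual columns before any reader is constructed, so non-matching files are never opened, downloaded, or decompressed. A minimal sketch of that flow — the two `VirtualColumnUtils` calls and their argument order are taken from the hunks below, while the wrapper function itself is hypothetical:

```cpp
#include <Storages/VirtualColumnUtils.h>  // createPathAndFileFilterAst, filterByPathOrFile

using namespace DB;

/// Hypothetical wrapper showing the pre-filtering flow this commit threads
/// through the HDFS/S3/URL/File iterators.
void pruneByPathAndFile(
    std::vector<String> & paths,               /// candidate files, e.g. expanded from a glob
    const ASTPtr & query,                      /// the SELECT being served
    const NamesAndTypesList & virtual_columns, /// contains _path and _file
    const ContextPtr & context)
{
    if (paths.empty())
        return;

    /// Extract the parts of the WHERE clause that reference only the
    /// _path/_file virtual columns; the first path acts as a sample value.
    ASTPtr filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(
        query, virtual_columns, paths[0], context);

    /// Evaluate the filter over the listing and drop files that cannot
    /// match, before any read buffer is created for them.
    if (filter_ast)
        VirtualColumnUtils::filterByPathOrFile(
            paths, paths, query, virtual_columns, context, filter_ast);
}
```

With this in place, a query like `SELECT * FROM hdfs('hdfs://host/dir/*.csv', ...) WHERE _file = 'a.csv'` reads a single file instead of listing everything and discarding rows afterwards.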
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -366,6 +366,8 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3PutObject, "Number of DiskS3 API PutObject calls.") \
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\
M(EngineFileLikeReadFiles, "Number of files read in table engines working with files (like File/S3/URL/HDFS).") \
\
M(ReadBufferFromS3Microseconds, "Time spent on reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
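
The counter above is the observability companion to this change: each file-like source increments it once per file it actually opens (see the `HDFSSource::initialize()` hunk further down), so the metric directly reflects how many files survived the new pre-filtering. A sketch of the pattern, with the surrounding function being an assumed stand-in for the real `initialize()` methods:

```cpp
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
    /// Defined in src/Common/ProfileEvents.cpp by this commit.
    extern const Event EngineFileLikeReadFiles;
}

bool initializeOneFile()
{
    /// ... build the read buffer, input format and pipeline for one file ...

    /// Counted only after a reader exists, so files pruned by the
    /// _path/_file filter never inflate the metric.
    ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
    return true;
}
```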
86 changes: 49 additions & 37 deletions src/Storages/HDFS/StorageHDFS.cpp
@@ -29,7 +29,7 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/checkAndGetLiteralArgument.h>

#include <Formats/ReadSchemaUtils.h>
@@ -50,6 +50,11 @@

namespace fs = std::filesystem;

namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
}

namespace DB
{
namespace ErrorCodes
@@ -291,12 +296,7 @@ StorageHDFS::StorageHDFS(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);

auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}

ColumnsDescription StorageHDFS::getTableStructureFromData(
@@ -363,11 +363,25 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
class HDFSSource::DisclosedGlobIterator::Impl
{
public:
Impl(ContextPtr context_, const String & uri)
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
uris = getPathsList(path_from_uri, uri_without_path, context_);
auto file_progress_callback = context_->getFileProgressCallback();
uris = getPathsList(path_from_uri, uri_without_path, context);
ASTPtr filter_ast;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, uris[0].path, context);

if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & path_with_info : uris)
paths.push_back(path_with_info.path);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
}
auto file_progress_callback = context->getFileProgressCallback();

for (auto & elem : uris)
{
elem.path = uri_without_path + elem.path;
@@ -397,9 +411,23 @@ class HDFSSource::DisclosedGlobIterator::Impl
class HDFSSource::URISIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & uris_, ContextPtr context_)
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
{
ASTPtr filter_ast;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());

if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
}

if (!uris.empty())
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris[0]);
@@ -444,16 +472,16 @@ class HDFSSource::URISIterator::Impl : WithContext
std::function<void(FileProgress)> file_progress_callback;
};

HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(context_, uri)) {}
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}

StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}

HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
{
}

@@ -538,6 +566,8 @@ bool HDFSSource::initialize()

pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return true;
}

@@ -560,29 +590,11 @@ Chunk HDFSSource::generate()
Chunk chunk;
if (reader->pull(chunk))
{
Columns columns = chunk.getColumns();
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());

for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);

auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
}

return Chunk(std::move(columns), num_rows);
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path);
return chunk;
}

reader.reset();
@@ -747,15 +759,15 @@ Pipe StorageHDFS::read(
else if (is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
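
The per-storage virtual-column loop deleted from `generate()` above is now centralized in `VirtualColumnUtils`. Judging purely from the removed code, `addRequestedPathAndFileVirtualsToChunk` plausibly amounts to the following; the real implementation may differ in details:

```cpp
#include <Processors/Chunk.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>

using namespace DB;

/// Sketch reconstructed from the loop deleted in HDFSSource::generate().
void addRequestedPathAndFileVirtualsToChunk(
    Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path)
{
    Columns columns = chunk.getColumns();
    UInt64 num_rows = chunk.getNumRows();

    for (const auto & virtual_column : requested_virtual_columns)
    {
        if (virtual_column.name == "_path")
        {
            /// Constant LowCardinality(String) column holding the full path.
            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
                              .createColumnConst(num_rows, path);
            columns.push_back(column->convertToFullColumnIfConst());
        }
        else if (virtual_column.name == "_file")
        {
            /// Just the file name: everything after the last '/'.
            auto file_name = path.substr(path.find_last_of('/') + 1);
            auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}
                              .createColumnConst(num_rows, std::move(file_name));
            columns.push_back(column->convertToFullColumnIfConst());
        }
    }

    chunk.setColumns(std::move(columns), num_rows);
}
```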
4 changes: 2 additions & 2 deletions src/Storages/HDFS/StorageHDFS.h
@@ -124,7 +124,7 @@ class HDFSSource : public ISource, WithContext
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(ContextPtr context_, const String & uri_);
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@@ -135,7 +135,7 @@ class HDFSSource : public ISource, WithContext
class URISIterator
{
public:
URISIterator(const std::vector<String> & uris_, ContextPtr context);
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
7 changes: 5 additions & 2 deletions src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -21,6 +21,7 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h>

#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <memory>
@@ -64,6 +65,8 @@ StorageHDFSCluster::StorageHDFSCluster(

storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);

virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}

void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@@ -76,9 +79,9 @@ void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String
}


RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, query, virtual_columns, context);
auto callback = std::make_shared<std::function<String()>>([iter = std::move(iterator)]() mutable -> String { return iter->next().path; });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
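
Both HDFS storages — and the S3 queue storage below — now obtain their virtual columns from one helper instead of spelling out a `default_virtuals` list. Inferred from the blocks it replaces, `VirtualColumnUtils::getPathAndFileVirtualsForStorage` presumably yields `_path` and `_file` as `LowCardinality(String)`, letting real table columns with those names take precedence; a sketch under that assumption (the actual helper may resolve name clashes differently):

```cpp
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>

using namespace DB;

/// Sketch inferred from the removed default_virtuals blocks.
NamesAndTypesList getPathAndFileVirtualsForStorage(const NamesAndTypesList & storage_columns)
{
    const auto low_cardinality_string
        = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());

    NamesAndTypesList virtuals;
    for (const auto * name : {"_path", "_file"})
    {
        /// A real column with the same name shadows the virtual one.
        if (!storage_columns.contains(name))
            virtuals.emplace_back(name, low_cardinality_string);
    }
    return virtuals;
}
```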
1 change: 1 addition & 0 deletions src/Storages/HDFS/StorageHDFSCluster.h
@@ -45,6 +45,7 @@ class StorageHDFSCluster : public IStorageCluster
String uri;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
};


5 changes: 2 additions & 3 deletions src/Storages/S3Queue/S3QueueSource.cpp
@@ -25,7 +25,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>

# include <Formats/FormatFactory.h>

@@ -70,13 +69,13 @@ StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
2 changes: 1 addition & 1 deletion src/Storages/S3Queue/S3QueueSource.h
@@ -44,7 +44,7 @@ class StorageS3QueueSource : public ISource, WithContext
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});
13 changes: 2 additions & 11 deletions src/Storages/S3Queue/StorageS3Queue.cpp
@@ -32,7 +32,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>

@@ -171,15 +170,7 @@ StorageS3Queue::StorageS3Queue(
}

files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);

auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());

auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
@@ -527,7 +518,7 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
*configuration.client,
configuration.url,
query,
virtual_block,
virtual_columns,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);
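
`StorageS3Queue` kept a parallel `Block virtual_block` only because the S3 glob iterator used to take a `Block virtual_header`; with the iterator now accepting a `NamesAndTypesList` directly, both the member and its conversion loop disappear. For reference, the deleted conversion was essentially:

```cpp
/// The conversion this commit removes: materializing an empty header Block
/// from the NamesAndTypesList of virtual columns.
Block virtual_block;
for (const auto & column : virtual_columns)
    virtual_block.insert({column.type->createColumn(), column.type, column.name});
```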
1 change: 0 additions & 1 deletion src/Storages/S3Queue/StorageS3Queue.h
@@ -93,7 +93,6 @@ class StorageS3Queue : public IStorage, WithContext
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
UInt64 reschedule_processing_interval_ms;

std::optional<FormatSettings> format_settings;