Skip to content

Commit

Permalink
refactoring of virtual columns
Browse files Browse the repository at this point in the history
  • Loading branch information
CurtizJ committed Feb 28, 2024
1 parent 9f8ac76 commit 8889a70
Show file tree
Hide file tree
Showing 31 changed files with 162 additions and 190 deletions.
2 changes: 1 addition & 1 deletion src/Processors/Transforms/buildPushingToViewsChain.cpp
Expand Up @@ -574,7 +574,7 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat
views_data.source_storage_id,
views_data.source_metadata_snapshot->getColumns(),
std::move(block),
views_data.source_storage->getVirtuals()));
*views_data.source_storage->getVirtualsDescription()));

QueryPipelineBuilder pipeline;

Expand Down
35 changes: 20 additions & 15 deletions src/Storages/FileLog/StorageFileLog.cpp
Expand Up @@ -27,6 +27,7 @@
#include <Common/filesystemHelpers.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h>
#include "Storages/VirtualColumnsDescription.h"

#include <sys/stat.h>

Expand Down Expand Up @@ -148,6 +149,9 @@ StorageFileLog::StorageFileLog(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);

auto virtuals = createVirtuals(filelog_settings->handle_error_mode);
setVirtuals(virtuals);

if (!fileOrSymlinkPathStartsWith(path, getContext()->getUserFilesPath()))
{
if (LoadingStrictnessLevel::ATTACH <= mode)
Expand Down Expand Up @@ -203,6 +207,22 @@ StorageFileLog::StorageFileLog(
}
}

/// Builds the description of virtual columns for a FileLog table.
/// `_filename` (LowCardinality(String)) and `_offset` (UInt64) are always added.
/// `_raw_record` and `_error` (both Nullable(String)) are added only when
/// `handle_error_mode` is StreamingHandleErrorMode::STREAM.
VirtualColumnsDescription StorageFileLog::createVirtuals(StreamingHandleErrorMode handle_error_mode)
{
    VirtualColumnsDescription virtuals;

    virtuals.addEphemeral("_filename", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "");
    virtuals.addEphemeral("_offset", std::make_shared<DataTypeUInt64>(), "");

    /// Nothing more to register unless errors are streamed.
    if (handle_error_mode != StreamingHandleErrorMode::STREAM)
        return virtuals;

    virtuals.addEphemeral("_raw_record", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "");
    virtuals.addEphemeral("_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "");

    return virtuals;
}

void StorageFileLog::loadMetaFiles(bool attach)
{
/// Attach table
Expand Down Expand Up @@ -1009,19 +1029,4 @@ bool StorageFileLog::updateFileInfos()
return events.empty() || file_infos.file_names.empty();
}

/// Returns the virtual columns of a FileLog table as a name/type list:
/// `_filename` and `_offset` always, plus `_raw_record` and `_error`
/// when the `handle_error_mode` setting is STREAM.
NamesAndTypesList StorageFileLog::getVirtuals() const
{
    NamesAndTypesList result{
        NameAndTypePair("_filename", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
        NameAndTypePair("_offset", std::make_shared<DataTypeUInt64>())};

    if (filelog_settings->handle_error_mode == StreamingHandleErrorMode::STREAM)
    {
        /// Error-related columns are appended after the always-present ones.
        result.push_back(NameAndTypePair("_raw_record", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())));
        result.push_back(NameAndTypePair("_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())));
    }

    return result;
}

}
5 changes: 3 additions & 2 deletions src/Storages/FileLog/StorageFileLog.h
Expand Up @@ -9,6 +9,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Common/SettingsChanges.h>
#include "Storages/VirtualColumnsDescription.h"

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -102,8 +103,6 @@ class StorageFileLog final : public IStorage, WithContext
String getFullMetaPath(const String & file_name) const { return std::filesystem::path(metadata_base_path) / file_name; }
String getFullDataPath(const String & file_name) const { return std::filesystem::path(root_data_path) / file_name; }

NamesAndTypesList getVirtuals() const override;

static UInt64 getInode(const String & file_name);

void openFilesAndSetPos();
Expand Down Expand Up @@ -212,6 +211,8 @@ class StorageFileLog final : public IStorage, WithContext
UInt64 inode = 0;
};
ReadMetadataResult readMetadata(const String & filename) const;

static VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
};

}
11 changes: 4 additions & 7 deletions src/Storages/Hive/StorageHive.cpp
Expand Up @@ -45,6 +45,7 @@
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/VirtualColumnUtils.h>

namespace CurrentMetrics
{
Expand Down Expand Up @@ -444,6 +445,9 @@ StorageHive::StorageHive(
storage_metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_ast, storage_metadata.columns, getContext());

setInMemoryMetadata(storage_metadata);

auto virtuals = VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns());
setVirtuals(virtuals);
}

void StorageHive::lazyInitialize()
Expand Down Expand Up @@ -1020,13 +1024,6 @@ SinkToStoragePtr StorageHive::write(const ASTPtr & /*query*/, const StorageMetad
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not implemented for StorageHive");
}

/// Returns the two virtual columns of a Hive table, `_path` and `_file`,
/// each typed as LowCardinality(String).
NamesAndTypesList StorageHive::getVirtuals() const
{
    NamesAndTypesList virtuals{
        NameAndTypePair("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
        NameAndTypePair("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))};

    return virtuals;
}

std::optional<UInt64> StorageHive::totalRows(const Settings & settings) const
{
/// query_info is not used when prune_level == PruneLevel::None
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/Hive/StorageHive.h
Expand Up @@ -54,8 +54,6 @@ class StorageHive final : public IStorage, WithContext

SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool async_insert) override;

NamesAndTypesList getVirtuals() const override;

bool supportsSubsetOfColumns() const;

std::optional<UInt64> totalRows(const Settings & settings) const override;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/Kafka/KafkaSource.cpp
Expand Up @@ -45,7 +45,7 @@ KafkaSource::KafkaSource(
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
, virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames()))
, virtual_header(storage.getVirtualsHeader())
, handle_error_mode(storage.getStreamingHandleErrorMode())
{
}
Expand Down
66 changes: 27 additions & 39 deletions src/Storages/Kafka/StorageKafka.cpp
Expand Up @@ -52,6 +52,7 @@
#include <Common/config_version.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include "Storages/VirtualColumnsDescription.h"
#include <base/sleep.h>

#if USE_KRB5
Expand Down Expand Up @@ -344,6 +345,10 @@ StorageKafka::StorageKafka(
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);

auto virtuals = createVirtuals(kafka_settings->kafka_handle_error_mode);
setVirtuals(virtuals);

auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
{
Expand All @@ -365,6 +370,28 @@ StorageKafka::StorageKafka(
});
}

/// Builds the description of virtual columns for a Kafka table.
/// The message metadata columns (`_topic`, `_key`, `_offset`, `_partition`,
/// `_timestamp`, `_timestamp_ms`, `_headers.name`, `_headers.value`) are always added.
/// `_raw_message` and `_error` (both String) are added only when
/// `handle_error_mode` is StreamingHandleErrorMode::STREAM.
VirtualColumnsDescription StorageKafka::createVirtuals(StreamingHandleErrorMode handle_error_mode)
{
    VirtualColumnsDescription virtuals;

    virtuals.addEphemeral("_topic", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "");
    virtuals.addEphemeral("_key", std::make_shared<DataTypeString>(), "");
    virtuals.addEphemeral("_offset", std::make_shared<DataTypeUInt64>(), "");
    virtuals.addEphemeral("_partition", std::make_shared<DataTypeUInt64>(), "");
    virtuals.addEphemeral("_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "");
    virtuals.addEphemeral("_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3)), "");
    virtuals.addEphemeral("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
    virtuals.addEphemeral("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");

    /// Nothing more to register unless errors are streamed.
    if (handle_error_mode != StreamingHandleErrorMode::STREAM)
        return virtuals;

    virtuals.addEphemeral("_raw_message", std::make_shared<DataTypeString>(), "");
    virtuals.addEphemeral("_error", std::make_shared<DataTypeString>(), "");

    return virtuals;
}

SettingsChanges StorageKafka::createSettingsAdjustments()
{
SettingsChanges result;
Expand Down Expand Up @@ -1187,43 +1214,4 @@ void registerStorageKafka(StorageFactory & factory)
factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}

/// Returns the virtual columns of a Kafka table as a name/type list.
/// `_raw_message` and `_error` are appended only when the
/// `kafka_handle_error_mode` setting is STREAM.
NamesAndTypesList StorageKafka::getVirtuals() const
{
    NamesAndTypesList virtuals{
        NameAndTypePair("_topic", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())),
        NameAndTypePair("_key", std::make_shared<DataTypeString>()),
        NameAndTypePair("_offset", std::make_shared<DataTypeUInt64>()),
        NameAndTypePair("_partition", std::make_shared<DataTypeUInt64>()),
        NameAndTypePair("_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())),
        NameAndTypePair("_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3))),
        NameAndTypePair("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())),
        NameAndTypePair("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()))};

    if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
    {
        virtuals.push_back(NameAndTypePair("_raw_message", std::make_shared<DataTypeString>()));
        virtuals.push_back(NameAndTypePair("_error", std::make_shared<DataTypeString>()));
    }

    return virtuals;
}

/// Returns only the names of the Kafka virtual columns, in the same order
/// in which getVirtuals() lists them. `_raw_message` and `_error` are included
/// only when the `kafka_handle_error_mode` setting is STREAM.
Names StorageKafka::getVirtualColumnNames() const
{
    Names names{
        "_topic",
        "_key",
        "_offset",
        "_partition",
        "_timestamp",
        "_timestamp_ms",
        "_headers.name",
        "_headers.value",
    };

    if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
    {
        names.emplace_back("_raw_message");
        names.emplace_back("_error");
    }

    return names;
}

}
5 changes: 3 additions & 2 deletions src/Storages/Kafka/StorageKafka.h
Expand Up @@ -7,6 +7,7 @@
#include <Storages/Kafka/KafkaConsumer.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Common/SettingsChanges.h>
#include "Storages/VirtualColumnsDescription.h"

#include <Poco/Semaphore.h>

Expand Down Expand Up @@ -74,8 +75,6 @@ class StorageKafka final : public IStorage, WithContext

const auto & getFormatName() const { return format_name; }

NamesAndTypesList getVirtuals() const override;
Names getVirtualColumnNames() const;
StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }

struct SafeConsumers
Expand Down Expand Up @@ -159,6 +158,8 @@ class StorageKafka final : public IStorage, WithContext
bool checkDependencies(const StorageID & table_id);

void cleanConsumers();

static VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
};

}
12 changes: 5 additions & 7 deletions src/Storages/LiveView/StorageLiveView.cpp
Expand Up @@ -27,6 +27,7 @@ limitations under the License. */
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
#include "Storages/VirtualColumnsDescription.h"
#include <base/hex.h>

#include <Storages/LiveView/StorageLiveView.h>
Expand Down Expand Up @@ -218,6 +219,10 @@ StorageLiveView::StorageLiveView(

setInMemoryMetadata(storage_metadata);

VirtualColumnsDescription virtuals;
virtuals.addEphemeral("_version", std::make_shared<DataTypeUInt64>(), "");
setVirtuals(virtuals);

if (!query.select)
throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName());

Expand Down Expand Up @@ -256,13 +261,6 @@ StorageLiveView::~StorageLiveView()
shutdown(false);
}

/// Returns the single virtual column of a LiveView table: `_version` (UInt64).
NamesAndTypesList StorageLiveView::getVirtuals() const
{
    NamesAndTypesList virtuals{
        {"_version", std::make_shared<DataTypeUInt64>()}
    };

    return virtuals;
}

void StorageLiveView::checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const
{
auto table_id = getStorageID();
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/LiveView/StorageLiveView.h
Expand Up @@ -73,8 +73,6 @@ using MilliSeconds = std::chrono::milliseconds;

bool supportsFinal() const override { return true; }

NamesAndTypesList getVirtuals() const override;

void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override;

void drop() override;
Expand Down
6 changes: 1 addition & 5 deletions src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
Expand Up @@ -34,6 +34,7 @@ class StorageFromMergeTreeDataPart final : public IStorage
, partition_id(part_->info.partition_id)
{
setInMemoryMetadata(storage.getInMemoryMetadata());
setVirtuals(*storage.getVirtualsDescription());
}

/// Used in queries with projection.
Expand Down Expand Up @@ -90,11 +91,6 @@ class StorageFromMergeTreeDataPart final : public IStorage

bool supportsSubcolumns() const override { return true; }

/// Virtual columns are not defined here; the call is forwarded to the wrapped `storage`.
NamesAndTypesList getVirtuals() const override { return storage.getVirtuals(); }

String getPartitionId() const
{
return partition_id;
Expand Down
32 changes: 16 additions & 16 deletions src/Storages/NATS/StorageNATS.cpp
Expand Up @@ -89,6 +89,9 @@ StorageNATS::StorageNATS(
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);

auto virtuals = createVirtuals(nats_settings->nats_handle_error_mode);
setVirtuals(virtuals);

nats_context = addSettings(getContext());
nats_context->makeQueryContext();

Expand Down Expand Up @@ -131,6 +134,19 @@ StorageNATS::StorageNATS(
connection_task->deactivate();
}

/// Builds the description of virtual columns for a NATS table.
/// `_subject` (String) is always added. `_raw_message` and `_error`
/// (both Nullable(String)) are added only when `handle_error_mode`
/// is StreamingHandleErrorMode::STREAM.
VirtualColumnsDescription StorageNATS::createVirtuals(StreamingHandleErrorMode handle_error_mode)
{
    VirtualColumnsDescription virtuals;
    virtuals.addEphemeral("_subject", std::make_shared<DataTypeString>(), "");

    /// Nothing more to register unless errors are streamed.
    if (handle_error_mode != StreamingHandleErrorMode::STREAM)
        return virtuals;

    virtuals.addEphemeral("_raw_message", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "");
    virtuals.addEphemeral("_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "");

    return virtuals;
}

Names StorageNATS::parseList(const String & list, char delim)
{
Expand Down Expand Up @@ -746,20 +762,4 @@ void registerStorageNATS(StorageFactory & factory)
factory.registerStorage("NATS", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


/// Returns the virtual columns of a NATS table as a name/type list:
/// `_subject` always, plus `_raw_message` and `_error` when the
/// `nats_handle_error_mode` setting is STREAM.
NamesAndTypesList StorageNATS::getVirtuals() const
{
    NamesAndTypesList result{
        {"_subject", std::make_shared<DataTypeString>()}
    };

    if (nats_settings->nats_handle_error_mode == StreamingHandleErrorMode::STREAM)
    {
        /// Error-related columns are appended after `_subject`.
        result.push_back(NameAndTypePair("_raw_message", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())));
        result.push_back(NameAndTypePair("_error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())));
    }

    return result;
}

}
2 changes: 1 addition & 1 deletion src/Storages/NATS/StorageNATS.h
Expand Up @@ -61,7 +61,6 @@ class StorageNATS final : public IStorage, WithContext
NATSConsumerPtr popConsumer(std::chrono::milliseconds timeout);

const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;

void incrementReader();
void decrementReader();
Expand Down Expand Up @@ -137,6 +136,7 @@ class StorageNATS final : public IStorage, WithContext

static Names parseList(const String & list, char delim);
static String getTableBasedName(String name, const StorageID & table_id);
static VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);

ContextMutablePtr addSettings(ContextPtr context) const;
size_t getMaxBlockSize() const;
Expand Down

0 comments on commit 8889a70

Please sign in to comment.