Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaEngine: Allow usage of Alias column type #49824

Merged
merged 1 commit into from May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/en/engines/table-engines/integrations/kafka.md
Expand Up @@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
Expand Down
9 changes: 0 additions & 9 deletions src/Storages/ColumnsDescription.cpp
Expand Up @@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
return ret;
}

NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.expression)
ret.emplace_back(col.name, col.type);
return ret;
}

NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/ColumnsDescription.h
Expand Up @@ -132,7 +132,6 @@ class ColumnsDescription : public IHints<1, ColumnsDescription>
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
NamesAndTypesList getAliases() const;
NamesAndTypesList getEphemeral() const;
NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
/// Returns .size0/.null/...
Expand Down
14 changes: 12 additions & 2 deletions src/Storages/Kafka/StorageKafka.cpp
Expand Up @@ -41,6 +41,7 @@
#include <Common/setThreadName.h>
#include <Formats/FormatFactory.h>

#include "Storages/ColumnDefault.h"
#include "config_version.h"

#include <Common/CurrentMetrics.h>
Expand Down Expand Up @@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// Kafka engine allows only ordinary columns without default expression or alias columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}

Expand Down
38 changes: 30 additions & 8 deletions tests/integration/test_storage_kafka/test.py
Expand Up @@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
# Tests


def test_kafka_prohibited_column_types(kafka_cluster):
def test_kafka_column_types(kafka_cluster):
def assert_returned_exception(e):
assert e.value.returncode == 36
assert (
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
in str(e.value)
)

Expand All @@ -314,17 +314,39 @@ def assert_returned_exception(e):
assert_returned_exception(exception)

# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
instance.query(
"""
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
SETTINGS kafka_commit_on_select = 1;
"""
)
assert_returned_exception(exception)
)
messages = []
for i in range(5):
messages.append(json.dumps({"a": i}))
kafka_produce(kafka_cluster, "new", messages)
result = ""
expected = TSV(
"""
0\t0
1\t1
2\t2
3\t3
4\t4
"""
)
retries = 50
while retries > 0:
result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
if TSV(result) == expected:
break
retries -= 1

assert TSV(result) == expected

instance.query("DROP TABLE test.kafka SYNC")

# check MATERIALIZED
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
Expand Down