Skip to content

Commit

Permalink
Allow using Alias column type for KafkaEngine
Browse files Browse the repository at this point in the history
```
create table kafka
(
 a UInt32,
 a_str String Alias toString(a)
) engine = Kafka;

create table data
(
  a UInt32;
  a_str String
) engine = MergeTree
order by tuple();

create materialized view data_mv to data
(
  a UInt32,
  a_str String
) as
select a, a_str from kafka;
```
Alias type works as expected in comparison with MATERIALIZED/EPHEMERAL
or column with default expression.

Ref: #47138

Co-authored-by: Azat Khuzhin <a3at.mail@gmail.com>
  • Loading branch information
AVMusorin and azat committed May 15, 2023
1 parent 25912a2 commit 418a61a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docs/en/engines/table-engines/integrations/kafka.md
Expand Up @@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
Expand Down
9 changes: 0 additions & 9 deletions src/Storages/ColumnsDescription.cpp
Expand Up @@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
return ret;
}

NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.expression)
ret.emplace_back(col.name, col.type);
return ret;
}

NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/ColumnsDescription.h
Expand Up @@ -132,7 +132,6 @@ class ColumnsDescription : public IHints<1, ColumnsDescription>
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
NamesAndTypesList getAliases() const;
NamesAndTypesList getEphemeral() const;
NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
/// Returns .size0/.null/...
Expand Down
14 changes: 12 additions & 2 deletions src/Storages/Kafka/StorageKafka.cpp
Expand Up @@ -41,6 +41,7 @@
#include <Common/setThreadName.h>
#include <Formats/FormatFactory.h>

#include "Storages/ColumnDefault.h"
#include "config_version.h"

#include <Common/CurrentMetrics.h>
Expand Down Expand Up @@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// Kafka engine allows only ordinary columns without default expression or alias columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}

Expand Down
38 changes: 30 additions & 8 deletions tests/integration/test_storage_kafka/test.py
Expand Up @@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
# Tests


def test_kafka_prohibited_column_types(kafka_cluster):
def test_kafka_column_types(kafka_cluster):
def assert_returned_exception(e):
assert e.value.returncode == 36
assert (
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
in str(e.value)
)

Expand All @@ -314,17 +314,39 @@ def assert_returned_exception(e):
assert_returned_exception(exception)

# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
instance.query(
"""
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
SETTINGS kafka_commit_on_select = 1;
"""
)
assert_returned_exception(exception)
)
messages = []
for i in range(5):
messages.append(json.dumps({"a": i}))
kafka_produce(kafka_cluster, "new", messages)
result = ""
expected = TSV(
"""
0\t0
1\t1
2\t2
3\t3
4\t4
"""
)
retries = 50
while retries > 0:
result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
if TSV(result) == expected:
break
retries -= 1

assert TSV(result) == expected

instance.query("DROP TABLE test.kafka SYNC")

# check MATERIALIZED
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
Expand Down

0 comments on commit 418a61a

Please sign in to comment.