
Add _error and _raw_message virtual columns for Kafka engine #20249

Closed
wants to merge 1 commit into from

Conversation

fastio (Contributor) commented Feb 9, 2021

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Add two virtual columns (_error and _raw_message) to Kafka Engine.

Detailed description / Documentation draft:

Use case:

Create the Kafka engine table with kafka_auto_append_error_column = 1. The new setting's default value is 0.

CREATE TABLE default.tt
(
    `i` Int64,
    `s` String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = '172.19.0.32:9092',
    kafka_topic_list = 't2',
    kafka_group_name = 'g1',
    kafka_format = 'JSONEachRow',
    kafka_max_block_size = 16,
    kafka_skip_broken_messages = 17,
    kafka_auto_append_error_column = 1

The data table is used to store rows that were parsed successfully from Kafka.

CREATE MATERIALIZED VIEW default.data
(
    `i` Int64,
    `s` String
)
ENGINE = MergeTree
ORDER BY i
SETTINGS index_granularity = 8192 AS
SELECT
    i,
    s
FROM default.tt
WHERE length(_error) = 0

The error table is used to store information about rows that failed to parse.

CREATE MATERIALIZED VIEW default.error
(
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
)
ENGINE = MergeTree
ORDER BY topic
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM default.tt
WHERE length(_error) > 0
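
With the two materialized views above, broken messages accumulate in default.error. A monitoring query over that table might look like the following; this is a hypothetical usage sketch assuming the tables defined above exist, not part of the PR itself:

```sql
-- Count broken messages per topic/partition and show a sample error,
-- using the default.error table populated by the materialized view above.
SELECT
    topic,
    partition,
    count() AS broken_messages,
    any(error) AS sample_error,
    any(raw) AS sample_raw_message
FROM default.error
GROUP BY topic, partition
ORDER BY broken_messages DESC
```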

@robot-clickhouse added the doc-alert and pr-feature (Pull request with new product feature) labels Feb 9, 2021
@vdimir vdimir self-assigned this Feb 11, 2021
@@ -98,36 +101,53 @@ Chunk IRowInputFormat::generate()
if (!isParseError(e.code()))
throw;

if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
throw;
if (!append_error_column)
Member:

Maybe we can just set allow_errors_ratio to 1 and add the column if append_error_column is set, but keep executing this branch as before.

Contributor:

Why not just catch the exception on the caller side?

Contributor (Author):

> Why not just catch the exception on the caller side?

We should handle just the current row's message, collect it, and then process the next one.
It's better not to break the loop.

Contributor (Author):

> Maybe we can just set allow_errors_ratio to 1 and add the column if append_error_column is set, but keep executing this branch as before.

Yes, I'll fix it.

Contributor:

I don't like the current approach.

It will not handle errors from readPrefix / readSuffix, and it will work only for IRowInputFormat-based formats.

We process a single message at a time.
You can put a try/catch in the loop (I've added a comment about the place where we should catch it).

filimonov (Contributor) commented Feb 15, 2021:

Also, that approach:

  1. pollutes the clean interface of IRowInputFormat
  2. introduces extra state in IRowInputFormat
  3. may affect parser performance for the non-Kafka case.

For me, the approach with try/catch in Kafka sounds much cleaner.

Contributor (Author):

> Also, that approach:
>
>   1. pollutes the clean interface of IRowInputFormat
>   2. introduces extra state in IRowInputFormat
>   3. may affect parser performance for the non-Kafka case.
>
> For me, the approach with try/catch in Kafka sounds much cleaner.

Sure, I agree with you. I'll change the current approach.

column->insertDefault();
}
}
syncAfterError();
Member:

Can we skip allowSyncAfterError here? Are allowSyncAfterError = false and append_error_column incompatible?

@@ -189,6 +221,7 @@ Block KafkaBlockInputStream::readImpl()
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
virtual_columns[8]->insert(payload);
Member:

Do we need _raw_message always?

@@ -29,7 +29,8 @@ class ASTStorage;
M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0)
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
M(Bool, kafka_auto_append_error_column, false, "Virtual column named _error for Kafka engine to collect the parser error.", 0)
Member:

Maybe just kafka_append_error_column

Contributor (Author):

It should be named kafka_append_error_column.

auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
if (auto_append_error_column)
{
result_columns.pop_back();
Member:

Needs clarification

@@ -119,7 +143,14 @@ Block KafkaBlockInputStream::readImpl()
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
{
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
if (i != error_column_index)
Member:

I suppose it may be better to handle it in a separate loop; error_column_index is always the last column.

Contributor (Author):

Thanks for your suggestion. I'll fix it.

@filimonov filimonov self-requested a review February 11, 2021 21:06
@@ -155,6 +186,7 @@ Block KafkaBlockInputStream::readImpl()
auto partition = buffer->currentPartition();
auto timestamp_raw = buffer->currentTimestamp();
auto header_list = buffer->currentHeaderList();
auto payload = buffer->currentPayload();
Contributor:

Line 170: put a try {} catch {} around read_message.

filimonov (Contributor) commented Feb 15, 2021

Also: several options for handling errors were actually discussed:

  1. put errors in the stream (the approach you're implementing) - the simplest, but it has disadvantages.
  2. put errors in a fixed system table (system.kafka_errors: event_date, event_time, database, table, topic, partition, offset, error, raw_message)
  3. put errors in a custom system table
  4. send errors to a special dead-letter-queue topic
  5. silently ignore errors (with some threshold, similar to what we have now, but it should work for all formats)
  6. make all options (1-5) configurable.

From my perspective, the best would be to start by defining p. 6 and after that implement the rest.

For Rabbit there is rabbitmq_deadletter_exchange

filimonov (Contributor) left a comment:

see the comments + need tests

fastio (Author) commented Feb 16, 2021

> see the comments + need tests

Thanks for your review and suggestions. I'm working on it.

fastio (Author) commented Mar 18, 2021

I've created a new PR, and I'm closing this one.

Labels
pr-feature Pull request with new product feature