Introduce KafkaRecordEntity to support Kafka headers in InputFormats #10730

Merged: 14 commits merged from kafka-record-entity into apache:master on Jan 9, 2021

Conversation

@xvrl (Member) commented on Jan 5, 2021:

Today, Kafka message support in streaming indexing tasks is limited to
message values; there is no way to expose Kafka headers, timestamps, or
keys, which may be of interest to more specialized Druid input formats.
For instance, Kafka headers may be used to indicate payload format/encoding
or additional metadata, and timestamps are often omitted from values in
Kafka Streams applications, since they are already included in the record.

This change proposes introducing KafkaRecordEntity as an InputEntity,
which gives input formats full access to the underlying Kafka record,
including headers, key, and timestamp. It also opens access to low-level
information such as topic, partition, and offset if needed.

KafkaRecordEntity is a subclass of ByteEntity, for backwards compatibility
with existing input formats and to avoid introducing unnecessary complexity
for Kinesis indexing tasks.
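
For reference, the new entity class boils down to roughly the following. This is a sketch reconstructed from the snippets quoted later in this conversation, so the merged code may differ in detail:

import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaRecordEntity extends ByteEntity
{
  private final ConsumerRecord<byte[], byte[]> record;

  public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
  {
    // ByteEntity wraps the message value; a null value fails here, which matches
    // the existing behavior of skipping records with null values.
    super(record.value());
    this.record = record;
  }

  // Kafka-aware InputFormats can use this to reach headers, key, timestamp,
  // topic, partition, and offset.
  public ConsumerRecord<byte[], byte[]> getRecord()
  {
    return record;
  }
}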

polledRecords.add(new OrderedPartitionableRecord<>(
    record.topic(),
    record.partition(),
    record.offset(),
-   record.value() == null ? null : ImmutableList.of(record.value())
+   ImmutableList.of(new KafkaRecordEntity(record))
xvrl (Member, Author):
note, today we skip records with null values, so we should decide if/how we want to allow passing those through. I'm thinking we could add a config option if needed that would expose them as empty byte arrays for backwards compatibility in ByteEntity, but allow the full record in KafkaRecordEntity. However, I have not found many use-cases for null values, so we can decide to punt on this and add this option later if needed.

Contributor:
I think it is fine as long as it doesn't cause an unhandled NPE somewhere downstream and just counts as an unparseable record for existing pipelines.

xvrl (Member, Author):
I reverted to the original behavior for now.

@himanshug (Contributor) left a comment:
LGTM overall, a few minor comments.

this.record = record;
}

public ConsumerRecord<byte[], byte[]> getRecord()
Contributor:
Not used anywhere; is this only to allow writing extensions with a custom InputFormat that can take advantage of the extra metadata available?

@xvrl (Member, Author) replied on Jan 7, 2021:
that's correct. We could have a sample InputFormat for documentation purposes, though I'm not sure if there is much value. A more generic input format for Kafka that wraps multiple input-formats would require a lot more thought, and I couldn't think of a one-size-fits-all approach that seemed very useful or didn't have additional complexities to deal with.
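
As an illustration of what such an extension could look like (hypothetical helper and class names; only KafkaRecordEntity and its getRecord() accessor come from this PR), a Kafka-aware reader might pull a header value like this:

import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.common.header.Header;

class HeaderAwareReaderExample
{
  // Hypothetical helper: read a header value when the entity is Kafka-backed,
  // falling back to null for a plain ByteEntity or a missing header.
  static String headerValue(InputEntity entity, String headerKey)
  {
    if (entity instanceof KafkaRecordEntity) {
      Header header = ((KafkaRecordEntity) entity).getRecord().headers().lastHeader(headerKey);
      if (header != null && header.value() != null) {
        return new String(header.value(), StandardCharsets.UTF_8);
      }
    }
    return null;
  }
}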

@@ -55,7 +54,7 @@ public OrderedPartitionableRecord(
     this.stream = stream;
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
-    this.data = data == null ? ImmutableList.of() : data;
+    this.data = data;
Contributor:
Why remove the null check and conversion to an empty list?

xvrl (Member, Author):
I reverted this change after seeing that we rely on null in a bunch of places.

polledRecords.add(new OrderedPartitionableRecord<>(
    record.topic(),
    record.partition(),
    record.offset(),
-   record.value() == null ? null : ImmutableList.of(record.value())
+   record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))
xvrl (Member, Author):
@himanshug I reverted this change to keep the existing behavior

Contributor:
We should put in a comment or javadoc noting that records with null values will be skipped, even if the other stuff (keys, timestamp, headers) are not null.

Here's a suggestion:

  1. Add a note to the javadoc of KafkaRecordEntity mentioning that these entities won't be generated for messages with null values
  2. Add a null-value check to the constructor of KafkaRecordEntity enforcing that comment

xvrl (Member, Author):
done, KafkaRecordEntity will already barf on null values when instantiating ByteEntity

records.remainingCapacity(),
currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
);
if (log.isTraceEnabled()) {
xvrl (Member, Author):
I added this check to avoid unnecessarily deserializing all the bytes as strings.

Contributor:
Nice 😄

I bet it will speed up Kinesis ingestion a bit.

xvrl (Member, Author):
Let's hope so. Using ByteEntities directly should also avoid having to copy byte arrays around.

currRecord.getPartitionId(),
currRecord.getSequenceNumber(),
records.remainingCapacity(),
currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
xvrl (Member, Author):
I'm not even sure this is safe to do. Some byte arrays could technically be invalid UTF-8 sequences, which would lead to exceptions when trying to log values.

Contributor:
It's OK, because fromUtf8 doesn't throw exceptions, it just replaces invalid sequences with the replacement character. Although the log messages will look very weird.

xvrl (Member, Author):
good point

@gianm (Contributor) left a comment:
Elegant solution!

import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaRecordEntity extends ByteEntity
Contributor:
Could you please add javadocs explaining how this class is meant to be used? Something like:

A ByteEntity generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka indexing tasks. It can be used as a regular ByteEntity, in which case the Kafka message value is returned. But the {@link #getRecord} method also allows Kafka-aware InputFormat implementations to read the full Kafka message, including headers, key, and timestamp.

This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.

new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),
Contributor:
Unless I missed it — could you add a test that uses a custom InputFormat and verifies that something other than the value can be read properly?

xvrl (Member, Author):
makes sense, will add that

xvrl (Member, Author):
done

@@ -74,7 +74,7 @@ public SequenceOffsetType getSequenceNumber()
   }

   @NotNull
-  public List<byte[]> getData()
+  public List<? extends ByteEntity> getData()
@gianm (Contributor) commented on Jan 7, 2021:
This change means we're returning a thing that wraps a ByteBuffer instead of a byte[], which opens up potential issues due to callers handling the position and limit wrong. What are the expectations? Is it ok for callers to change the position of the underlying buffer or should they refrain? If they do change it, does that cause problems?

(For example: do we have situations where something calls getData() to parse a value, and that updates the position, and then later on getData() is called again for some other purpose like logging? If so — that'd break as a result of this change.)

Whatever the case, the javadoc for this method should describe the conclusion.

@xvrl (Member, Author) replied on Jan 7, 2021:
I did encounter some of this in test cases, where we would call getData on a record multiple times, causing issues. In general, however, an InputEntity doesn't support being read multiple times; e.g., for ByteEntity, once it has been read via InputEntity.open(), the underlying buffer will have been exhausted and will return an empty stream on subsequent calls to open().

Except for one logging case, all the calls to OrderedPartitionableRecord.getData() I could find are only executed once per record, for the purpose of exposing InputEntities to parsers, which should ensure the buffers only get read once or are treated appropriately as needed. I updated the logging case to make sure it wouldn't modify the buffer position.
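
For illustration, here is a minimal sketch of how a logging caller can decode entity contents without consuming the buffer, assuming ByteEntity exposes its backing ByteBuffer via a getBuffer() accessor; duplicating the buffer keeps the original position and limit untouched:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.druid.data.input.impl.ByteEntity;

class RecordLoggingExample
{
  // Decode entities for trace logging without advancing the original buffers.
  static List<String> toUtf8Strings(List<? extends ByteEntity> data)
  {
    return data.stream()
               .map(entity -> {
                 // duplicate() shares the bytes but has an independent position/limit,
                 // so reading here does not exhaust the entity for the parser.
                 ByteBuffer copy = entity.getBuffer().duplicate();
                 byte[] bytes = new byte[copy.remaining()];
                 copy.get(bytes);
                 // Invalid UTF-8 is replaced with the replacement character rather than throwing.
                 return new String(bytes, StandardCharsets.UTF_8);
               })
               .collect(Collectors.toList());
  }
}

In practice this would also sit behind the isTraceEnabled() guard discussed above, so the decoding only happens when trace logging is on.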

@gianm (Contributor) left a comment:
LGTM

@xvrl (Member, Author) commented on Jan 8, 2021:

looks like tests are failing due to insufficient diff coverage for some lines. Many of them appear to be where we introduced additional type parameters. @gianm what's the guidance here?

@jihoonson (Contributor) left a comment:
LGTM. The test coverage failure doesn't seem worth fixing, since the uncovered lines are about the new type parameter and avoiding trace-level logging. For the LGTM check failure, I think it will pass once you merge master into this branch (fixed in #10738).

@gianm (Contributor) commented on Jan 8, 2021:

> looks like tests are failing due to insufficient diff coverage for some lines. Many of them appear to be where we introduced additional type parameters. @gianm what's the guidance here?

What I've done is one of the following:

  • Add tests if it's something that really should be tested
  • Add a suppression if it's something that really shouldn't be tested
  • Ignore the coverage check if it's something that could be tested, but it isn't that important, and we want to skip it for now. (It's a diff-oriented check, so ignoring it won't affect future patches.)

@xvrl xvrl removed the Proposal label Jan 8, 2021
@xvrl (Member, Author) commented on Jan 9, 2021:

OK, I will merge it as is then. Adding specific tests for the trace logging seems a little overkill; we can add them later if we feel strongly about it.

@xvrl xvrl merged commit 118b501 into apache:master Jan 9, 2021
@xvrl xvrl deleted the kafka-record-entity branch January 9, 2021 00:04
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jan 22, 2021
@jihoonson jihoonson added this to the 0.21.0 milestone Jul 15, 2021
xvrl pushed a commit that referenced this pull request Oct 7, 2021
### Description

Today we ingest a number of high-cardinality metrics into Druid across many dimensions. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a partition or client basis. Events are another class of data that provides useful information about a particular incident/scenario inside a Kafka cluster. Events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in Kafka headers that can serve as useful dimensions for aggregation and in turn bring better insights.

PR #10730 introduced support for Kafka headers in InputFormats.

We still need an input format to parse out the headers and translate them into relevant columns in Druid. Until that's implemented, none of the information available in the Kafka message headers is exposed. So first there is a need to write an input format that can parse headers in any given format (provided we support the format), just as we parse payloads today. Apart from headers, there is also some useful information present in the key portion of the Kafka record, so we also need a way to expose the data present in the key as Druid columns, and a generic way to express at configuration time which attributes from the headers, key, and payload should be ingested into Druid. The design should be kept generic enough that users can specify different parsers for the headers, key, and payload.

This PR solves the above by providing a wrapper around any existing input format and merging the data into a single unified Druid row.

Let's look at a sample input format from the above discussion:

"inputFormat":
{
    "type": "kafka",     // New input format type
    "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns, this will avoid collusions while merging columns
    "recordTimestampLabelPrefix": "kafka.",  // Kafka record's timestamp is made available in case payload does not carry timestamp
    "headerFormat":  // Header parser specifying that values are of type string
    {
        "type": "string"
    },
    "valueFormat": // Value parser from json parsing
    {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [...]
        }
    },
    "keyFormat":  // Key parser also from json parsing
    {
        "type": "json"
    }
}

Since we have independent sections for the header, key, and payload, each section can be parsed with its own parser, e.g., headers coming in as strings and the payload as JSON. For example, a header env=prod would surface as the column kafka.header.env, the record key as kafka.key, and the payload fields under their own names.

KafkaInputFormat is the umbrella class implementing the InputFormat interface. It is responsible for creating the individual parsers for the header, key, and payload, blending the data while resolving column conflicts, and generating a single unified InputRow for Druid ingestion.

"headerFormat" lets users plug in a parser type for the header values and adds a default prefix of "kafka.header." (which can be overridden) to header attributes, to avoid collisions when merging them with the payload.

The Kafka payload parser is responsible for parsing the value portion of the Kafka record. This is where most of the data comes from, and existing parsers can be plugged in. One thing to note is that if batching is performed, the code attaches the header and key values to every record in the batch.

The Kafka key parser handles the key portion of the Kafka record and ingests the key under the dimension name "kafka.key".

## KafkaInputFormat class
This is the class that orchestrates sending the ConsumerRecord to each parser, retrieving rows, and merging the columns into one final row for Druid consumption. KafkaInputFormat should make sure to release the resources allocated as part of the reader's CloseableIterator<InputRow>, in both normal and exception cases.

When dimension/metric names conflict, the code prefers the dimension names from the payload and ignores the conflicting dimension from the headers or key. This is done so that existing input formats can be easily migrated to this new format without worrying about losing information.
lokesh-lingarajan added a commit to confluentinc/druid that referenced this pull request Oct 12, 2021
xvrl pushed a commit to confluentinc/druid that referenced this pull request Oct 13, 2021
…load parsing (apache#11630) (#52)

harinirajendran pushed a commit to confluentinc/druid that referenced this pull request Feb 23, 2022
…load parsing (apache#11630) (#52)

kkonstantine pushed a commit to confluentinc/druid that referenced this pull request Sep 1, 2022
…load parsing (apache#11630) (#52)

kkonstantine pushed a commit to confluentinc/druid that referenced this pull request Sep 1, 2022
…load parsing (apache#11630) (#52)
