[HUDI-6683] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource #9403

Merged
danny0405 merged 6 commits into apache:master from prathit06:add-kafka-key-json-source
Aug 15, 2023
Conversation

@prathit06
Contributor

@prathit06 prathit06 commented Aug 9, 2023

Change Logs

This change adds the capability to include the Kafka message key as part of the Hudi metadata columns for JsonKafkaSource & AvroKafkaSource.
For context: #9391
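The mechanism can be sketched independently of the Hudi classes: the Kafka key is simply appended alongside the existing offset/partition/timestamp metadata columns during deserialization. A minimal sketch (the column name follows the pattern discussed later in this PR; the helper itself is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaKeyMetadataSketch {
    // Column name as referenced later in this PR discussion.
    public static final String KAFKA_SOURCE_KEY_COLUMN = "_hoodie_kafka_source_key";

    // Hypothetical helper: copies a deserialized record and appends the
    // kafka key next to the other kafka source metadata columns.
    public static Map<String, Object> appendKafkaKey(Map<String, Object> record, String kafkaKey) {
        Map<String, Object> out = new LinkedHashMap<>(record);
        // Kafka keys are optional, so the column must tolerate null
        // (the Avro schema in this PR uses a nullable string for the same reason).
        out.put(KAFKA_SOURCE_KEY_COLUMN, kafkaKey);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        System.out.println(appendKafkaKey(record, "user-42"));
        // prints {id=1, _hoodie_kafka_source_key=user-42}
    }
}
```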

Impact

Describe any public API or user-facing feature change or any performance impact: NA

Risk level (write none, low, medium or high below): None

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update: None

Describe any necessary documentation update if there is any new feature, config, or user-facing change:

  • The config description must be updated if new configs are added or the default value of the configs is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@prathit06 prathit06 changed the title from "Added kafka key as part of hudi metadata columns for JsonKafkaSource" to "Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource" on Aug 9, 2023
recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, kafkaKey);
Contributor

Suggested change
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, kafkaKey);
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));

Contributor Author

Accepted the suggestion & have made the changes.

newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE));
Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
Contributor

The key is always a string type? Could it be bytes in Kafka?

Contributor Author

@prathit06 prathit06 Aug 10, 2023

The key will always be a string type, please refer: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java#L61
"key.deserializer", StringDeserializer.class.getName()

Contributor

Where is the code where the key is used as a record key field? I didn't see it.

Contributor Author

@prathit06 prathit06 Aug 14, 2023

Referring to the discussion here, the idea was to add the kafka key as part of the hudi metadata columns & not as a recordKey.

In order to set the kafka key as the record key, the end user can do so by setting hoodie.datasource.write.recordkey.field to _hoodie_kafka_source_key; please refer here for more context.
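A minimal sketch of that configuration (the config key and column name are taken from the comment above; the options map itself is only illustrative of what a user would pass to the writer):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordKeyOptsSketch {
    // Illustrative: the writer option an end user would set to promote the
    // kafka key metadata column to the Hudi record key.
    public static Map<String, String> kafkaKeyAsRecordKey() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.datasource.write.recordkey.field", "_hoodie_kafka_source_key");
        return opts;
    }
}
```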

@prathit06 prathit06 requested a review from danny0405 August 10, 2023 15:41
@prathit06
Contributor Author

@hudi-bot run azure

@prathit06 prathit06 changed the title from "Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource" to "[MINOR] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource" on Aug 10, 2023
@hudi-bot
Collaborator

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure : re-run the last Azure build

@danny0405 danny0405 changed the title from "[MINOR] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource" to "[HUDI-6683] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource" on Aug 11, 2023
@prathit06
Contributor Author

Hi @danny0405, if all looks good, can we merge this PR?
Please let me know if there are any other action items.

@danny0405
Contributor

Hi @danny0405, if all looks good, can we merge this PR? Please let me know if there are any other action items.

Have re-triggered the tests.

@prathit06
Contributor Author

Hi @danny0405, if all looks good, can we merge this PR? Please let me know if there are any other action items.

Have re-triggered the tests.

Tests have passed in CI @danny0405

ObjectMapper om = new ObjectMapper();
partitionIterator.forEachRemaining(consumerRecord -> {
String record = consumerRecord.value().toString();
String recordKey = (String) consumerRecord.key();
Member

Suggested change
String recordKey = (String) consumerRecord.key();
String recordKey = consumerRecord.key().toString();

@@ -80,11 +81,13 @@ protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object
ObjectMapper om = new ObjectMapper();
partitionIterator.forEachRemaining(consumerRecord -> {
String record = consumerRecord.value().toString();
Member

I think renaming this variable to recordValue might make the code more readable:

Suggested change
String record = consumerRecord.value().toString();
String recordValue = consumerRecord.value().toString();

recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));
Member

Suggested change
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));
recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString());
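One behavioral note worth checking on this suggestion, since it is the actual difference between the two forms: String.valueOf(obj) maps a null kafka key to the literal string "null", while calling toString() directly throws a NullPointerException when the key is absent. A small sketch of the difference (plain Java semantics, not Hudi code):

```java
public class KeyNullSafetySketch {
    public static void main(String[] args) {
        Object key = null; // kafka message keys are optional

        // String.valueOf tolerates null and yields the string "null".
        System.out.println(String.valueOf(key)); // prints null

        // key.toString() throws a NullPointerException on a missing key.
        try {
            System.out.println(key.toString());
        } catch (NullPointerException e) {
            System.out.println("NPE on null key");
        }
    }
}
```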

@danny0405
Contributor

Thanks for the nice feedback @hussein-awala, maybe you can file a separate PR to address it.
