522 - Kafka steram client upgrade initial change#77
Conversation
|
@sanath-madhav Please update the required details in the PR, such as link to ticket it resolves, section for "Describe behaviour before the change", checklist, etc. |
kaushalaroraharman
left a comment
There was a problem hiding this comment.
Comments added in-line
| @Override | ||
| public void write(final WriteBatch batch) throws RocksDBException { | ||
| db.write(writeOptions, batch); | ||
| public void write(final WriteBatchInterface batch) throws RocksDBException { |
There was a problem hiding this comment.
Please provide reason for this change from WriteBatch to WriteBatchInterface as the method parameter.
Also please document this on the design page here : https://github.com/HARMAN-Auto/sp-platform-productenablers-streambase/blob/main/docs/designs/kafka-client-upgrade-3.9.1.md, along with all the other required details for this upgrade.
There was a problem hiding this comment.
Interface signature change
from
void write(final WriteBatch batch) throws RocksDBException
to
void write(final WriteBatchInterface batch) throws RocksDBException
| if (batch instanceof WriteBatch writeBatch) { | ||
| db.write(writeOptions, writeBatch); | ||
| } else { | ||
| throw new IllegalArgumentException("Batch must be an instance of WriteBatch"); |
There was a problem hiding this comment.
If any other implementation is not supported by this method, add an error level logger as well before throwing the exception and also add the type of instance received by the method.
There was a problem hiding this comment.
Added error log before throwing the exception
| } | ||
| } else { | ||
| batch.put(key, value); | ||
| throw new IllegalArgumentException("Batch must be an instance of WriteBatch"); |
There was a problem hiding this comment.
Same as above comment, add a logger and the type of instance received in this case.
There was a problem hiding this comment.
Added error log before throwing the exception
| effectiveConfigProps.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); | ||
| effectiveConfigProps.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), Constants.INT_1000000); | ||
| effectiveConfigProps.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true); | ||
| effectiveConfigProps.put("broker.id", 0); |
There was a problem hiding this comment.
Per the documentation, this has been removed in favour of KRaft adoption.
There should be properties available in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala, which can be used. Avoid hardcoding property names here, and use the one provided by Kafka client library. Only if not able to use from KafkaConfig class, you can create these in propertyNames file, whilst some would already be existing.
| final Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper); | ||
| LOG.debug("Starting a Kafka instance on port {} ...", | ||
| effectiveBrokerConfig.getProperty(KafkaConfig.ListenersProp())); | ||
| effectiveBrokerConfig.getProperty("listeners")); |
There was a problem hiding this comment.
Do not hardcode property names directly in the code. Either fetch from Kafka client library, if the source has changed post upgrade, or use from PropertyNames class in stream-base.
| public String brokerList() { | ||
| final EndPoint endPoint = ((ArraySeq<EndPoint>) kafka.advertisedListeners()).head(); | ||
| // Kafka 3.9.1 returns ArrayBuffer instead of ArraySeq | ||
| final scala.collection.Seq<EndPoint> listeners = kafka.advertisedListeners(); |
There was a problem hiding this comment.
Is there any particular reason for using the fully qualified name here instead of importing the required class?
There was a problem hiding this comment.
Api has been changed
final EndPoint endPoint = ((ArrayBuffer) kafka.advertisedListeners()).head();
| final EndPoint endPoint = ((ArraySeq<EndPoint>) kafka.advertisedListeners()).head(); | ||
| // Kafka 3.9.1 returns ArrayBuffer instead of ArraySeq | ||
| final scala.collection.Seq<EndPoint> listeners = kafka.advertisedListeners(); | ||
| final EndPoint endPoint = listeners.apply(0); |
There was a problem hiding this comment.
Are .apply(0) and .head() equivalent?
There was a problem hiding this comment.
Api has been refactored
final EndPoint endPoint = ((ArrayBuffer) kafka.advertisedListeners()).head();
Updated the respective details |
|
| mqttClientId, platform); | ||
| } | ||
| }); | ||
| }).join(); |
There was a problem hiding this comment.
This change doesn't seem to be related to kafka streams upgrade. If fixing an existing bug, please do so in a separate PR.
d2d0454
into
eclipse-ecsp:feature/74-feat-upgrade-kafka-streams-version-to-410
Please refer to our contributing docs for any questions on submitting a pull request.
Issues are required for both bug fixes and features.
Resolves #522
Describe behaviour before the change
Describe behaviour after the change
Pull request checklist
Does this introduce a breaking change?