Skip to content

Fix uncaught ParseException when reading Avro from Kafka#14183

Merged
abhishekagarwal87 merged 10 commits intoapache:masterfrom
abhishekrb19:avro_unhandled_parse_exception
May 4, 2023
Merged

Fix uncaught ParseException when reading Avro from Kafka#14183
abhishekagarwal87 merged 10 commits intoapache:masterfrom
abhishekrb19:avro_unhandled_parse_exception

Conversation

@abhishekrb19
Copy link
Copy Markdown
Contributor

@abhishekrb19 abhishekrb19 commented Apr 28, 2023

Fixes #13894, #14041.

In StreamChunkParser#parseWithInputFormat, we call byteEntityReader.read() without handling a potential ParseException, which is thrown during this function call by the delegate AvroStreamReader#intermediateRowIterator.
A ParseException can be thrown if an Avro stream has corrupt data or data that doesn't conform to the schema specified or for other decoding reasons. This exception if uncaught, can cause ingestion to fail.

Primary code changes:

  • Handle ParseException in the call stack StreamChunkParser.
  • Improved the error message when the avro schema id cannot be found.
  • Added a new constructor to StreamChunkParser to facilitate unit testing. This allows dependencies to be passed in by the caller, making testing easier.
  • Fix miscellaneous typos and javadocs in the modified code.

Unit test changes:

  • Add unit tests in StreamChunkParserTest and KafkaIndexTaskTest to test this exception handling behavior alongside setting maximum allowed parse exceptions to different thresholds.
  • Replace NoopRowIngestionMeters and a mocked RowIngestionMeters with a more realistic implementation, SimpleRowIngestionMeters, used to validate any parse exceptions.

Release note

Fixes #13894, #14041.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Copy link
Copy Markdown
Contributor

@abhishekagarwal87 abhishekagarwal87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for fixing this.

@abhishekrb19
Copy link
Copy Markdown
Contributor Author

There is the usual flaky kafka slow IT failure unrelated to this change. Hopefully re-running should fix it:

2023-04-28T11:19:34,972 INFO [main] org.apache.druid.security.basic.authorization.db.cache.CoordinatorPollingBasicAuthorizerCacheManager - CoordinatorPollingBasicAuthorizerCacheManager is stopping.
2023-04-28T11:19:34,972 INFO [main] org.apache.druid.security.basic.authorization.db.cache.CoordinatorPollingBasicAuthorizerCacheManager - CoordinatorPollingBasicAuthorizerCacheManager is stopped.
2023-04-28T11:19:34,972 INFO [main] org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager - CoordinatorPollingBasicAuthenticatorCacheManager is stopping.
2023-04-28T11:19:34,972 INFO [main] org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager - CoordinatorPollingBasicAuthenticatorCacheManager is stopped.
2023-04-28T11:19:35,014 INFO [Curator-Framework-0] org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2023-04-28T11:19:35,134 INFO [main] org.apache.zookeeper.ZooKeeper - Session: 0x100000e65e40007 closed
2023-04-28T11:19:35,135 INFO [main-EventThread] org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x100000e65e40007
2023-04-28T11:19:35,154 INFO [main] org.apache.druid.java.util.emitter.core.LoggingEmitter - {"feed":"metrics","metric":"jetty/numOpenConnections","service":"","host":"","value":0,"timestamp":"2023-04-28T11:19:35.152Z"}
2023-04-28T11:19:35,154 INFO [main] org.apache.druid.java.util.emitter.core.LoggingEmitter - Close: started [false]
2023-04-28T11:19:35,154 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle [module] stage [INIT]
Error:  Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2,038.216 s <<< FAILURE! - in TestSuite
Error:  testKafkaIndexDataWithLosingCoordinator(org.apache.druid.tests.indexer.ITKafkaIndexingServiceTransactionalSerializedTest)  Time elapsed: 234.292 s
Error:  testKafkaIndexDataWithLosingHistorical(org.apache.druid.tests.indexer.ITKafkaIndexingServiceTransactionalSerializedTest)  Time elapsed: 192.643 s
Error:  testKafkaIndexDataWithLosingOverlord(org.apache.druid.tests.indexer.ITKafkaIndexingServiceTransactionalSerializedTest)  Time elapsed: 189.737 s
Error:  testQueryNilColumnBeforeAndAfterPublishingSegments(org.apache.druid.tests.indexer.ITNilColumnTest)  Time elapsed: 1,344.173 s  <<< FAILURE!
org.apache.druid.java.util.common.ISE: Max number of retries[240] exceeded for Task[Waiting for all tasks to stop]. Failing.
	at org.apache.druid.tests.indexer.ITNilColumnTest.testQueryNilColumnBeforeAndAfterPublishingSegments(ITNilColumnTest.java:126)

Copy link
Copy Markdown
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The production changes and new tests LGTM. I suggest also adding a test for Kafka indexing itself, to future-proof this bug fix against potential future refactors or entire replacements of StreamChunkParser. That test doesn't need to be Avro-specific.

@abhishekrb19 abhishekrb19 requested a review from gianm May 3, 2023 18:39
@abhishekrb19
Copy link
Copy Markdown
Contributor Author

@gianm, thanks for the review. Added a UT to Kafka indexing as well.

@abhishekagarwal87 abhishekagarwal87 merged commit 68f908e into apache:master May 4, 2023
abhishekrb19 added a commit to abhishekrb19/incubator-druid that referenced this pull request May 5, 2023
)

In StreamChunkParser#parseWithInputFormat, we call byteEntityReader.read() without handling a potential ParseException, which is thrown during this function call by the delegate AvroStreamReader#intermediateRowIterator.
A ParseException can be thrown if an Avro stream has corrupt data or data that doesn't conform to the schema specified or for other decoding reasons. This exception if uncaught, can cause ingestion to fail.
@abhishekagarwal87 abhishekagarwal87 added this to the 26.0 milestone May 5, 2023
abhishekagarwal87 pushed a commit that referenced this pull request May 5, 2023
…14212)

In StreamChunkParser#parseWithInputFormat, we call byteEntityReader.read() without handling a potential ParseException, which is thrown during this function call by the delegate AvroStreamReader#intermediateRowIterator.
A ParseException can be thrown if an Avro stream has corrupt data or data that doesn't conform to the schema specified or for other decoding reasons. This exception if uncaught, can cause ingestion to fail.
@abhishekrb19 abhishekrb19 deleted the avro_unhandled_parse_exception branch August 4, 2023 14:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Uncaught ParseException when reading Avro from Kafka

3 participants