Skip to content

Fix ProtoBufRecordExtractor cache invalidation for Protobuf schema evolution#18839

Closed
arunkumarucet wants to merge 2 commits into
apache:masterfrom
arunkumarucet:fix/protobuf-schema-evolution-descriptor-cache
Closed

Fix ProtoBufRecordExtractor cache invalidation for Protobuf schema evolution#18839
arunkumarucet wants to merge 2 commits into
apache:masterfrom
arunkumarucet:fix/protobuf-schema-evolution-descriptor-cache

Conversation

@arunkumarucet

Copy link
Copy Markdown
Contributor

Summary

Fixes a regression introduced in #17593 where ProtoBufRecordExtractor throws IllegalArgumentException: FieldDescriptor does not match message type after a Confluent Schema Registry Protobuf schema evolves (e.g. a new column is added to the schema).

Root cause: The field-descriptor cache added in #17593 used descriptor.getFullName() equality to detect schema changes. When a new schema version is registered, KafkaProtobufDeserializer returns DynamicMessage instances built from a new Descriptors.Descriptor object — same full name, different instance. The stale cache was never invalidated, so FieldDescriptor objects from the old Descriptor were passed to DynamicMessage.hasField() / getField() on the new message. Protobuf enforces strict object identity (fd.getContainingType() != descriptor), causing the crash.

Fix: Replace the full-name string comparison with object identity (!=). This is cheaper (no string allocation) and correctly detects any descriptor change, including same-name schema evolutions.

// Before (insufficient — same full name across schema versions)
if (_descriptorFullName == null || !_descriptorFullName.equals(descriptor.getFullName()))

// After (correct — new registry version → new Descriptor instance)
if (_cachedDescriptor != descriptor)

Test plan

  • Added testSchemaEvolutionSameFullNameDifferentDescriptorInstance to ProtoBufRecordExtractorCachingTest — builds two Descriptor instances programmatically with the same full name (Order) but different fields (V1: id, amount; V2: id, amount, status), exactly replicating what Confluent Schema Registry does on schema evolution. Without the fix this test throws IllegalArgumentException; with the fix all 13 caching tests pass.
  • ./mvnw -pl pinot-plugins/pinot-input-format/pinot-protobuf -Dtest=ProtoBufRecordExtractorCachingTest test → 13/13 pass
  • ./mvnw spotless:apply checkstyle:check license:check -pl pinot-plugins/pinot-input-format/pinot-protobuf → clean

…ing via JMX

Emit a per-table `tableTenantInfo` gauge from `SegmentStatusChecker` with the
server tenant name embedded as an extra key segment in the metric name:

  pinot.controller.tableTenantInfo.<tableNameWithType>.<serverTenant> = 1

This lets Prometheus scrape the metric via the JMX exporter and use a
`group_left(tenant)` join to attach the tenant label to any existing
table-scoped metric without modifying the core metrics pipeline.

Implementation details:
- The gauge is registered only on first encounter or when the tenant changes,
  avoiding redundant writes on every 5-minute SegmentStatusChecker cycle.
- Stale gauges are cleaned up on tenant change, null config, and table removal,
  tracked via an internal `_tableTenantMap`.
- A dedicated JMX exporter rule in `controller.yml` extracts `table`,
  `tableType`, `tenant`, and `database` labels. The rule is placed before the
  generic tableNameWithType rules to ensure the tenant segment is captured.
The field-descriptor cache introduced in PR apache#17593 keyed invalidation on
descriptor.getFullName(). When a Confluent Schema Registry schema evolves
(e.g. a new column is added), KafkaProtobufDeserializer returns a fresh
Descriptors.Descriptor object with the same full name but a different
instance. The stale cache was never invalidated, so FieldDescriptors from
the old Descriptor were applied to the new DynamicMessage. Protobuf
enforces strict object identity between a FieldDescriptor and its
containing Descriptor, causing:

  IllegalArgumentException: FieldDescriptor does not match message type

Fix: replace full-name string comparison with object identity (!=).
This is cheaper and correctly detects any descriptor change, including
same-name schema evolutions. A regression test is added that directly
replicates the failure scenario using two programmatically built
Descriptor instances with the same full name but different schema IDs.
@arunkumarucet

Copy link
Copy Markdown
Contributor Author

Superseded by a clean branch — only the 2 protobuf files are included.

@codecov-commenter

codecov-commenter commented Jun 23, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 76.19048% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.77%. Comparing base (14bc147) to head (5beea5e).
⚠️ Report is 10 commits behind head on master.

Files with missing lines Patch % Lines
...e/pinot/controller/helix/SegmentStatusChecker.java 72.22% 2 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18839      +/-   ##
============================================
+ Coverage     64.76%   64.77%   +0.01%     
- Complexity     1319     1322       +3     
============================================
  Files          3392     3393       +1     
  Lines        210949   211041      +92     
  Branches      33119    33140      +21     
============================================
+ Hits         136611   136712     +101     
+ Misses        63323    63297      -26     
- Partials      11015    11032      +17     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.77% <76.19%> (+0.01%) ⬆️
temurin 64.77% <76.19%> (+0.01%) ⬆️
unittests 64.77% <76.19%> (+0.01%) ⬆️
unittests1 56.97% <100.00%> (+0.01%) ⬆️
unittests2 37.19% <76.19%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants