Cache local JAR file in ProtoBufCodeGenMessageDecoder to eliminate redundant remote fetches #18233
rseetham wants to merge 1 commit into apache:master
Conversation
Pull request overview
This PR optimizes protobuf decoder initialization in pinot-protobuf by caching the locally-copied schema JAR to avoid repeated remote fetches during consuming segment rollovers, while adding tests for cache hit and fetch-failure fallback behavior.
Changes:
- Add a JVM-level JAR cache and a `resolveJar()` path in `ProtoBufCodeGenMessageDecoder.init()`.
- Close the per-init `URLClassLoader` after codegen/Janino compilation.
- Add unit tests covering cache-hit behavior and stale fallback on fetch failure.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufCodeGenMessageDecoder.java | Introduces local JAR caching + fetch-failure fallback and classloader lifecycle updates. |
| pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufCodeGenMessageDecoderTest.java | Adds tests for cache hit and stale fallback; clears cache between tests. |
| pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufUtilsTest.java | Updates test classloading approach for descriptor lookup. |
```diff
  URL jarFile = getClass().getClassLoader().getResource("complex_types.jar");
- ClassLoader clsLoader = ProtoBufCodeGenMessageDecoder.loadClass(jarFile.getPath());
+ ClassLoader clsLoader = new URLClassLoader(new URL[]{jarFile});
  Descriptors.Descriptor desc = ProtoBufCodeGenMessageDecoder.getDescriptorForProtoClass(clsLoader,
      "org.apache.pinot.plugin.inputformat.protobuf.ComplexTypes$TestMessage");
```
URLClassLoader implements Closeable, but this test never closes it. Use try-with-resources to avoid leaking an open JAR handle across the test suite (especially since this PR is explicitly addressing classloader/JAR handle churn).
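The leak-free pattern the comment asks for can be sketched with a self-contained example; `lookup()` here is a hypothetical stand-in for the descriptor lookup in the real test, and the JAR path is illustrative:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class TestLoaderClose {
  // Hypothetical stand-in for the descriptor lookup; any exception it throws
  // still leaves the loader closed thanks to try-with-resources.
  static String lookup(ClassLoader cl) {
    return cl.getClass().getSimpleName();
  }

  public static void main(String[] args) throws Exception {
    URL jarUrl = new URL("file:///tmp/complex_types.jar"); // illustrative path
    try (URLClassLoader clsLoader = new URLClassLoader(new URL[]{jarUrl})) {
      System.out.println(lookup(clsLoader));
    } // close() runs here, releasing the open JAR handle
  }
}
```

try-with-resources works because `URLClassLoader` implements `Closeable`; the plain `ClassLoader` supertype does not, so the variable must be declared as `URLClassLoader` for this to compile.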
```diff
  File localFile = resolveJar(topicName, jarPath);
  URLClassLoader loader = new URLClassLoader(new URL[]{localFile.toURI().toURL()});
  Descriptors.Descriptor descriptor = getDescriptorForProtoClass(loader, protoClassName);
  String codeGenCode = new MessageCodeGen().codegen(descriptor, fieldsToRead);
- Class<?> recordExtractor = compileClass(protoMessageClsLoader,
+ Class<?> recordExtractor = compileClass(loader,
      MessageCodeGen.EXTRACTOR_PACKAGE_NAME + "." + MessageCodeGen.EXTRACTOR_CLASS_NAME, codeGenCode);
  _decodeMethod = recordExtractor.getMethod(MessageCodeGen.EXTRACTOR_METHOD_NAME, byte[].class, GenericRow.class);
+ loader.close();
```
URLClassLoader loader is only closed on the happy path. If getDescriptorForProtoClass(), codegen(), compileClass(), or getMethod() throws, the classloader (and underlying JAR handle) will leak. Use try-with-resources (or a finally) so the loader is always closed.
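One way to structure this is a try-with-resources around all the throwing steps. The sketch below is self-contained; `step()` is a hypothetical stand-in for `getDescriptorForProtoClass()`/`codegen()`/`compileClass()`/`getMethod()`, any of which may throw without leaking the loader:

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class InitSketch {
  // Hypothetical stand-in for one of the throwing init steps.
  static String step(String name) {
    return name;
  }

  public static void main(String[] args) throws Exception {
    File localFile = File.createTempFile("schema", ".jar");
    localFile.deleteOnExit();
    String result;
    try (URLClassLoader loader = new URLClassLoader(new URL[]{localFile.toURI().toURL()})) {
      // Descriptor lookup, codegen, compilation, and method extraction would
      // happen here; if any step throws, the loader is still closed.
      result = step("descriptor") + "," + step("compile");
    }
    System.out.println(result);
  }
}
```

Because only the extracted `Method` escapes the block, closing the loader afterward is safe on both the happy and error paths.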
```java
  private static File resolveJar(String topicName, String jarPath)
      throws Exception {
    CachedJar cached = JAR_CACHE.get(topicName);
    if (cached != null && cached._jarPath.equals(jarPath) && cached._localFile.exists()) {
      return cached._localFile;
```
The cache is keyed only by topicName. Pinot can have multiple tables consuming the same topic (note other components key by (table, topic)), and in that case a fetch failure could cause this decoder to fall back to a cached JAR from a different table/config, risking silent schema mismatch and bad ingestion. Consider including at least protoClassName (and/or jarPath) in the cache key, or restricting stale fallback so it cannot reuse an entry from a different configuration scope.
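A composite key of the kind the comment suggests could look like this minimal sketch; the key format and names are illustrative, not the PR's actual implementation:

```java
import java.util.concurrent.ConcurrentHashMap;

public class KeySketch {
  // Illustrative composite key: topic plus proto class name, so a stale
  // fallback can never cross configuration scopes.
  static String cacheKey(String topicName, String protoClassName) {
    return topicName + "|" + protoClassName;
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, String> jarCache = new ConcurrentHashMap<>();
    jarCache.put(cacheKey("events", "com.example.A"), "/tmp/a.jar");
    // A different table reading the same topic with a different proto class
    // misses rather than silently reusing the other table's entry.
    System.out.println(jarCache.containsKey(cacheKey("events", "com.example.B")));
  }
}
```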
For a given Kafka topic, the data can only be in one format, so the JAR used to read it is the same. If the schemas of different tables differ, only the relevant fields are extracted per table. Here we are only caching the JAR so that we don't keep fetching it every time.
```java
    }
    try {
      File localFile = ProtoBufUtils.getFileCopiedToLocal(jarPath);
      localFile.getParentFile().deleteOnExit();
```
localFile.getParentFile().deleteOnExit() won’t delete the temp directory at JVM exit because the directory is non-empty (the JAR file remains). If you want cleanup-on-exit, register localFile.deleteOnExit() first and then register the directory, or otherwise perform recursive cleanup.
Suggested change:
```diff
- localFile.getParentFile().deleteOnExit();
+ localFile.deleteOnExit();
+ File parentFile = localFile.getParentFile();
+ if (parentFile != null) {
+   parentFile.deleteOnExit();
+ }
```
```java
    if (cached != null && cached._localFile.exists()) {
      LOGGER.error("Failed to fetch JAR for topic '{}' from '{}', reusing stale local copy from '{}'. "
          + "Rows decoded with the stale schema may be incorrect if the schema has changed.",
          topicName, jarPath, cached._jarPath, e);
```
The error log says it is "reusing stale local copy from '…'" but the value being logged is cached._jarPath (remote path), not the local file path. This makes troubleshooting difficult; log cached._localFile (and optionally also log the stale remote path separately).
Suggested change:
```diff
- topicName, jarPath, cached._jarPath, e);
+ topicName, jarPath, cached._localFile, e);
```
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@             Coverage Diff             @@
##             master   #18233    +/-   ##
===========================================
  Coverage     63.36%   63.36%
  Complexity     1627     1627
===========================================
  Files          3243     3243
  Lines        197038   197054       +16
  Branches      30466    30468        +2
===========================================
+ Hits         124845   124856       +11
- Misses        62195    62204        +9
+ Partials       9998     9994        -4
```
xiangfu0 left a comment:
Found one high-signal ingestion correctness risk; see inline comment.
```java
  private static File resolveJar(String topicName, String jarPath)
      throws Exception {
    CachedJar cached = JAR_CACHE.get(topicName);
    if (cached != null && cached._jarPath.equals(jarPath) && cached._localFile.exists()) {
```
This changes the rollout semantics from 're-fetch the protobuf JAR on every init' to 'reuse it indefinitely while the URI string stays the same'. Pinot deployments often replace the decoder JAR in place at the same S3/HDFS path during schema rollouts; after this cache hits once, long-lived servers will keep decoding with the old generated classes until restart, which can silently ingest rows with the wrong schema. We need either a freshness/version check here or an explicit versioned-URI contract before making the cached file authoritative.
I dabbled with having a background job refresh the JAR fetch every hour. The issue is that the plugin module does not have access to the server executor service, so I would have to create and manage one here. I don't think that's a good solution.
Another option is a cache with a TTL of an hour or some configured value (a server property), which would also force a periodic fetch. The issue there is that if the segment completion time lines up with the cache expiration, all segments completing at that moment would wait on the JAR fetch anyway, so users would have to set this carefully. But it does solve the problem, so I'll add it and address the other smaller comments that were brought up.
Still, the fundamental issue with both approaches is that the only way to force a fetch when the JAR was changed in place is a restart. At the moment, a table force-commit will force a fetch; during an incident, a refresh that takes up to an hour will be an issue.
Is there another solution you'd suggest?
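The TTL idea discussed above can be sketched in isolation; the TTL constant, entry layout, and method names here are illustrative, not the eventual server property or implementation:

```java
import java.util.concurrent.ConcurrentHashMap;

public class TtlSketch {
  static final long TTL_MS = 3_600_000L; // illustrative 1-hour TTL (would be a server property)

  static final class Entry {
    final String localPath;   // local copy of the schema JAR
    final long fetchedAtMs;   // when it was last fetched from remote storage
    Entry(String localPath, long fetchedAtMs) {
      this.localPath = localPath;
      this.fetchedAtMs = fetchedAtMs;
    }
  }

  static final ConcurrentHashMap<String, Entry> CACHE = new ConcurrentHashMap<>();

  // Returns true when the cached copy is still fresh enough to skip a fetch;
  // an expired entry forces the next init() to re-fetch from remote storage.
  static boolean isFresh(String topic, long nowMs) {
    Entry e = CACHE.get(topic);
    return e != null && nowMs - e.fetchedAtMs < TTL_MS;
  }

  public static void main(String[] args) {
    CACHE.put("events", new Entry("/tmp/schema.jar", 0L));
    System.out.println(isFresh("events", TTL_MS - 1)); // inside the TTL
    System.out.println(isFresh("events", TTL_MS));     // expired
  }
}
```

This bounds staleness at the TTL but, as noted, still cannot detect an in-place JAR replacement before the entry expires.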
Why
ProtoBufCodeGenMessageDecoder.init() is called once per consuming segment creation — once per topic partition every time a segment rolls over, and once per partition on server restart. Each call unconditionally fetched the protobuf schema JAR from remote storage (S3, HDFS, etc.) via ProtoBufUtils.getFileCopiedToLocal(), which copies the JAR into a new timestamped temp directory every time. The JAR only changes when the table's decoder config is updated, so in normal operation every fetch after the first is unnecessary network I/O.
Additionally, if the JAR is fetched from an object store and that connection is broken, ingestion stops today. With this fix, ingestion continues from the cached copy.
What
Introduce a JVM-level ConcurrentHashMap<String, CachedJar> keyed by topicName. CachedJar stores the remote JAR path and the local File it was copied to. On every init(), resolveJar() returns the cached local file when the remote path is unchanged and the file still exists; otherwise it fetches a fresh copy and replaces the cache entry, and if the fetch fails it falls back to the existing local copy when one is present.
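The resolveJar() flow described above can be condensed into a self-contained sketch. This is not the PR's code: `fetchFails` is an injected flag standing in for ProtoBufUtils.getFileCopiedToLocal() throwing, and field names are simplified:

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

public class ResolveJarSketch {
  static final class CachedJar {
    final String jarPath;  // remote path the copy came from
    final File localFile;  // local copy of the JAR
    CachedJar(String jarPath, File localFile) {
      this.jarPath = jarPath;
      this.localFile = localFile;
    }
  }

  static final ConcurrentHashMap<String, CachedJar> JAR_CACHE = new ConcurrentHashMap<>();

  // fetchFails stands in for the remote copy (e.g. S3/HDFS) throwing.
  static File resolveJar(String topic, String jarPath, boolean fetchFails) throws IOException {
    CachedJar cached = JAR_CACHE.get(topic);
    if (cached != null && cached.jarPath.equals(jarPath) && cached.localFile.exists()) {
      return cached.localFile; // cache hit: skip the remote fetch
    }
    try {
      if (fetchFails) {
        throw new IOException("remote unreachable");
      }
      File local = File.createTempFile("schema", ".jar"); // stand-in for the real copy
      local.deleteOnExit();
      JAR_CACHE.put(topic, new CachedJar(jarPath, local));
      return local;
    } catch (IOException e) {
      if (cached != null && cached.localFile.exists()) {
        return cached.localFile; // stale fallback keeps ingestion alive
      }
      throw e; // no usable copy at all: surface the failure
    }
  }

  public static void main(String[] args) throws IOException {
    File first = resolveJar("events", "s3://bucket/v1.jar", false);   // miss: fetch
    File second = resolveJar("events", "s3://bucket/v1.jar", false);  // hit
    File fallback = resolveJar("events", "s3://bucket/v2.jar", true); // fetch fails: stale copy
    System.out.println(first == second);
    System.out.println(fallback == first);
  }
}
```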
The URLClassLoader created to load the proto class is closed after the compiled Method is extracted, releasing the file handle immediately rather than accumulating them across segment rollovers.
How it behaves in each lifecycle event
Normal segment rollover: init() hits the cache, skips the remote fetch, runs codegen + Janino in memory (~ms), and returns. Each segment manager thread runs its own init() in parallel — no serialization across topics.
New table creation: First init() for that topicName — cache miss, JAR is fetched and cached. Subsequent segments for the same table hit the fast path.
Decoder config update (new JAR deployed): Next init() sees cached._jarPath != newJarPath, fetches the new JAR, replaces the cache entry.
Server restart: All cache entries are gone (JVM-level cache). Each partition's first init() after restart fetches the JAR once; subsequent rollovers hit the cache.
Tests
- Cache hit: a second init() for the same topic skips the remote fetch and decodes correctly.
- Stale fallback: when the remote fetch fails, init() reuses the cached local copy and decodes correctly.
🤖 Generated with Claude Code