[FLINK] Support Apache Pulsar as DataSource #12297
@KevinyhZou @liujiayi771 @lgbo-ustc Please review.
    processAvailableElement(sourceContext);
    break;
  case BLOCKED:
    task.waitFor();
Could you explain the reason for adding this?
This line was added in the Pulsar source support commit.
Previously the BLOCKED case only logged "Get empty row" and continued the loop, which was fine for Nexmark since it generates data in batches and rarely returns BLOCKED. However, for real streaming sources like Pulsar/Kafka, when there's no upstream data available, advance() returns BLOCKED. Without waitFor(), the run loop would spin indefinitely calling advance() in a tight loop and pin the CPU.
waitFor() blocks the thread until the native side has data available or the task is closed. This is the standard UpIterator usage pattern in velox4j.
Regarding cancellation: Flink calls cancel() (sets isRunning = false) followed by close() (calls task.close()). The native task close will complete the blocking future inside waitFor(), which unblocks the thread. The run loop then checks isRunning and exits.
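The loop behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: `SketchTask`, `SketchSource`, and `BlockedLoopSketch` are hypothetical stand-ins written for this example, and the blocking future is modelled with a plain `CompletableFuture`. It is not the velox4j API or the actual Gluten source function.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the native task; names mirror the discussion, not velox4j.
class SketchTask {
    enum State { AVAILABLE, BLOCKED, FINISHED }

    private final Queue<String> rows = new ArrayDeque<>();
    private CompletableFuture<Void> wakeUp = new CompletableFuture<>();
    private boolean closed;

    // Upstream data arrives: enqueue it and wake any waiter.
    synchronized void offer(String row) { rows.add(row); wakeUp.complete(null); }

    synchronized State advance() {
        if (closed) return State.FINISHED;
        return rows.isEmpty() ? State.BLOCKED : State.AVAILABLE;
    }

    synchronized String get() { return rows.poll(); }

    // Blocks until data is available or the task is closed.
    void waitFor() {
        CompletableFuture<Void> future;
        synchronized (this) {
            if (closed || !rows.isEmpty()) return;
            if (wakeUp.isDone()) wakeUp = new CompletableFuture<>();
            future = wakeUp;
        }
        future.join(); // wait outside the lock so offer()/close() can proceed
    }

    // Closing completes the pending future, which unblocks waitFor().
    synchronized void close() { closed = true; wakeUp.complete(null); }
}

// Hypothetical source loop with the BLOCKED case handled by waitFor().
class SketchSource {
    final List<String> emitted = new CopyOnWriteArrayList<>();
    final AtomicInteger advanceCalls = new AtomicInteger();
    private final SketchTask task;
    private volatile boolean isRunning = true;

    SketchSource(SketchTask task) { this.task = task; }

    void run() {
        while (isRunning) {
            advanceCalls.incrementAndGet();
            switch (task.advance()) {
                case AVAILABLE: emitted.add(task.get()); break;
                case BLOCKED: task.waitFor(); break; // park instead of spinning
                case FINISHED: return;
            }
        }
    }

    void cancel() { isRunning = false; }
    void close() { task.close(); }
}

public class BlockedLoopSketch {
    // Feeds two rows, idles briefly, then cancels and closes like the sequence described above.
    static SketchSource runDemo() {
        SketchTask task = new SketchTask();
        SketchSource source = new SketchSource(task);
        Thread worker = new Thread(source::run);
        worker.start();
        task.offer("row-1");
        task.offer("row-2");
        try {
            while (source.emitted.size() < 2) Thread.sleep(5);
            Thread.sleep(100); // idle period with no upstream data
            source.cancel();
            source.close();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return source;
    }

    public static void main(String[] args) {
        SketchSource s = runDemo();
        System.out.println(s.emitted + " after " + s.advanceCalls.get() + " advance() calls");
    }
}
```

In this sketch the number of `advance()` calls stays small during the idle period because the thread is parked in `waitFor()`. Replacing the `BLOCKED` branch with a bare `break` would make the same loop call `advance()` continuously until cancellation.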
    return transform;
  }

  private int getWindowPropertyIndex(Class<? extends WindowProperty> propertyClass) {
This seems to be unused code.
      LOG.warn("Failed to load Velox source/sink factory", e);
    }
    for (String factoryClassName : FACTORY_CLASS_NAMES) {
      Optional<VeloxSourceSinkFactory> factory = loadFactory(factoryClassName, parameters);
It seems that by implementing this, we can remove the SPI org.apache.gluten.velox/.VeloxSourceSinkFactory
Yes. The FACTORY_CLASS_NAMES list and the SPI file register exactly the same 7 factories. getFactory() first attempts ServiceLoader (SPI), then falls back to manual class loading from FACTORY_CLASS_NAMES. This duplication is intentional, for two reasons. First, runtime modules cannot depend on planner module classes at compile time (there is a risk of circular dependencies). Second, the manual loadFactory explicitly tries three different class loaders (from the parameter, the thread context, and the interface's own loader), which is more robust than ServiceLoader's single-loader approach in Flink's class loader hierarchy.
If this is to be cleaned up in the future, the safer direction is to keep the FACTORY_CLASS_NAMES manual loading and remove the SPI file, because the manual path handles class loaders more robustly. However, this refactoring is outside the scope of this PR and should be verified in a real Flink deployment. For now, keep it as-is.
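The lookup order described above (SPI first, then a by-name fallback that tries several class loaders) can be sketched roughly as follows. `SketchFactory`, `PulsarSketchFactory`, `FactoryLookupSketch`, the `match` method, and the `com.example.MissingFactory` entry are all hypothetical names made up for this illustration. The real `VeloxSourceSinkFactory` interface and the real `loadFactory(factoryClassName, parameters)` signature differ, and here the user class loader is passed directly rather than derived from the parameters.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

// Hypothetical stand-in for the factory interface.
interface SketchFactory {
    boolean match(Map<String, String> parameters);
}

// Hypothetical factory that only matches the "pulsar" connector.
class PulsarSketchFactory implements SketchFactory {
    @Override
    public boolean match(Map<String, String> parameters) {
        return "pulsar".equals(parameters.get("connector"));
    }
}

public class FactoryLookupSketch {
    // Manual registry; the first entry is deliberately unresolvable to show the skip path.
    static final List<String> FACTORY_CLASS_NAMES =
        Arrays.asList("com.example.MissingFactory", PulsarSketchFactory.class.getName());

    static Optional<SketchFactory> getFactory(Map<String, String> parameters, ClassLoader userLoader) {
        // Step 1: SPI discovery. Finds nothing unless a META-INF/services entry exists.
        try {
            for (SketchFactory factory : ServiceLoader.load(SketchFactory.class, userLoader)) {
                if (factory.match(parameters)) return Optional.of(factory);
            }
        } catch (ServiceConfigurationError e) {
            // Fall through to manual loading, as the reviewed code does after logging a warning.
        }
        // Step 2: manual fallback over the hard-coded class names.
        for (String className : FACTORY_CLASS_NAMES) {
            Optional<SketchFactory> factory = loadFactory(className, userLoader);
            if (factory.isPresent() && factory.get().match(parameters)) return factory;
        }
        return Optional.empty();
    }

    // Tries the supplied loader, the thread context loader, then the interface's own loader.
    static Optional<SketchFactory> loadFactory(String className, ClassLoader userLoader) {
        ClassLoader[] candidates = {
            userLoader,
            Thread.currentThread().getContextClassLoader(),
            SketchFactory.class.getClassLoader()
        };
        for (ClassLoader loader : candidates) {
            if (loader == null) continue;
            try {
                Class<?> clazz = Class.forName(className, true, loader);
                return Optional.of((SketchFactory) clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException | LinkageError e) {
                // This loader cannot provide a usable class; try the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put("connector", "pulsar");
        System.out.println(getFactory(parameters, FactoryLookupSketch.class.getClassLoader()).isPresent());
    }
}
```

The point of the sketch is the design trade-off raised in the reply: the by-name path works without a compile-time dependency on the factory classes and can probe more than one class loader, at the cost of keeping the name list in sync by hand.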
- Simplify isPulsarSource to use class simple name check (like Kafka)
- Replace hand-rolled reflection with ReflectUtils.getObjectField
- Adapt NexmarkSourceFactory to new NexmarkGeneratorConfig API
- Adapt PrintSinkFactory to new PrintTableHandle constructor (isStdErr)
- Update CI velox4j pin to gluten-0530 latest commit
…tArgs
The NexmarkTest was failing with InaccessibleObjectException because surefire forks a new JVM that reads argLine (not JAVA_TOOL_OPTIONS). Adding --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED to the POM's extraJavaTestArgs ensures the forked JVM always has the required opens regardless of environment variables.
Summary
Add Gluten Flink planner/runtime support for Pulsar sources backed by Velox. Solves #12310.
Depends on bigo-sg/velox#42 and bigo-sg/velox4j#35.
This patch adds:
- PulsarSourceSinkFactory, which maps Flink Pulsar table/source options to Velox Pulsar connector parameters.
- connector-pulsar in the Gluten Flink connector config and service discovery.
Test Plan
mvn -pl ut -am -Dtest=PulsarSourceSinkFactoryTest -DfailIfNoTests=false test
AI Tooling Disclosure
Co-worked with Codex.