[Enhancement] Replace native JNI CqCompactionFilter with built-in Java CompactionFilterFactory #10334

@lizhimins

Before Creating the Enhancement Request

  • I have confirmed that this should be classified as an enhancement rather than a bug/feature.

Summary

Replace the current native C++ JNI-based CqCompactionFilter (used for ConsumeQueue compaction in RocksDB mode) with RocksDB Java's built-in AbstractCompactionFilterFactory + RemoveConsumeQueueCompactionFilter API.

The current implementation requires:

  • A custom C++ shim library (cq_compaction_filter.cpp) compiled per platform (Linux .so, macOS .dylib, Windows .dll)
  • JNI loading with complex platform detection and temp directory extraction
  • Reflection to call ColumnFamilyOptions.setCompactionFilterHandle (a private method)
  • A NativeCqCompactionFilter wrapper with disOwnNativeHandle() to avoid use-after-free
  • Pre-loading the RocksDB native library to resolve DT_NEEDED / LC_LOAD_DYLIB dependencies

This should all be replaced with a pure Java ConsumeQueueCompactionFilterFactory that uses the public RocksDB Java API.

Motivation

  1. Maintainability: The native C++ shim must be recompiled for every target platform (Linux x86_64, macOS x86_64/aarch64, Windows x64). Each new platform requires a separate build toolchain and binary artifact. This is a significant maintenance burden for the community.

  2. Correctness / Safety: The current approach uses reflection to call a private setCompactionFilterHandle method, which is fragile across RocksDB Java version upgrades. The disOwnNativeHandle() workaround to prevent use-after-free adds complexity and risk.

  3. Build simplicity: The native library requires C++ compilation with RocksDB headers, which is not part of the standard Maven build. Contributors cannot easily build/test the compaction filter without a C++ toolchain.

  4. RocksDB Java already provides the right API: AbstractCompactionFilterFactory creates a fresh filter per compaction job, which naturally solves the minPhyOffset update problem, so no JNI setMinPhyOffset0() call is needed. The factory's createCompactionFilter() method reads the latest minPhyOffset from a LongSupplier each time a compaction starts.

Describe the Solution You'd Like

1. Add ConsumeQueueCompactionFilterFactory

A pure Java class extending AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter>, taking a LongSupplier for minPhyOffset. Each createCompactionFilter() call reads the latest offset and creates a new RemoveConsumeQueueCompactionFilter:

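import java.util.function.LongSupplier;

import org.rocksdb.AbstractCompactionFilter;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.RemoveConsumeQueueCompactionFilter;

public class ConsumeQueueCompactionFilterFactory
        extends AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {

    private final LongSupplier minPhyOffsetSupplier;

    public ConsumeQueueCompactionFilterFactory(LongSupplier minPhyOffsetSupplier) {
        this.minPhyOffsetSupplier = minPhyOffsetSupplier;
    }

    // Called by RocksDB once per compaction job, so each filter sees the latest offset.
    @Override
    public RemoveConsumeQueueCompactionFilter createCompactionFilter(
            AbstractCompactionFilter.Context context) {
        return new RemoveConsumeQueueCompactionFilter(minPhyOffsetSupplier.getAsLong());
    }

    // AbstractCompactionFilterFactory also declares name() as abstract.
    @Override
    public String name() {
        return "ConsumeQueueCompactionFilterFactory";
    }
}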
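
To make the "fresh filter per compaction job" behaviour concrete without pulling in RocksDB, here is a minimal sketch using plain Java stand-ins. SnapshotFilter, SnapshotFilterFactory, and SnapshotPerCompactionDemo are hypothetical names invented for this illustration and are not part of RocksDB or RocketMQ; the AtomicLong stands in for whatever component tracks minPhyOffset in the store.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a compaction filter: holds an immutable snapshot of the threshold.
final class SnapshotFilter {
    final long minPhyOffset;

    SnapshotFilter(long minPhyOffset) {
        this.minPhyOffset = minPhyOffset;
    }
}

// Hypothetical stand-in for the factory: reads the supplier once per simulated compaction job.
final class SnapshotFilterFactory {
    private final LongSupplier minPhyOffsetSupplier;

    SnapshotFilterFactory(LongSupplier minPhyOffsetSupplier) {
        this.minPhyOffsetSupplier = minPhyOffsetSupplier;
    }

    SnapshotFilter createCompactionFilter() {
        return new SnapshotFilter(minPhyOffsetSupplier.getAsLong());
    }
}

final class SnapshotPerCompactionDemo {
    public static void main(String[] args) {
        // Assumed source of truth for the minimum physical offset.
        AtomicLong minPhyOffset = new AtomicLong(1_000L);
        SnapshotFilterFactory factory = new SnapshotFilterFactory(minPhyOffset::get);

        SnapshotFilter first = factory.createCompactionFilter();
        minPhyOffset.set(5_000L); // the offset advances between two compactions
        SnapshotFilter second = factory.createCompactionFilter();

        System.out.println(first.minPhyOffset);
        System.out.println(second.minPhyOffset);
    }
}
```

The first filter keeps the value it was created with while the second picks up the advanced offset, so nothing has to mutate a filter that a running compaction may still be using.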

2. Wire the factory into RocksDBOptionsFactory

Call setCompactionFilterFactory(factory) on the ColumnFamilyOptions for the default (CQ) column family:

public static ColumnFamilyOptions createCQCFOptions(MessageStore messageStore,
        ConsumeQueueCompactionFilterFactory factory) {
    return new ColumnFamilyOptions()
        // ... existing options ...
        .setCompactionFilterFactory(factory);
}

3. Remove the native C++ code and JNI infrastructure

  • Delete CqCompactionFilterJni.java, NativeCqCompactionFilter.java
  • Delete cq_compaction_filter.cpp and pre-built binaries (.so, .dylib)
  • Delete CqCompactionFilterJniTest.java
  • Remove the rocksdbjni-dev build dependency from store/pom.xml

4. Simplify ConsumeQueueRocksDBStorage

  • Remove all CqCompactionFilterJni.isLoaded() checks and conditional logic
  • Remove the test-only methods triggerCompactionSync(), flushAll(), and countEntries()
  • Compaction filtering is now always active (no degraded mode when native lib fails to load)

Describe Alternatives You've Considered

  1. Keep the native C++ approach but improve it: Add a CI matrix to auto-compile for all platforms, and replace the reflection by submitting a RocksDB Java PR that exposes the method publicly. This would work but adds CI complexity and couples RocketMQ to RocksDB internals.

  2. Use a Java AbstractCompactionFilter (not factory): A single filter instance shared across compaction jobs. This was the approach in an intermediate refactoring (the NativeCqCompactionFilter wrapper). However, the factory pattern is superior because it avoids shared mutable state (setMinPhyOffset updates) and lifecycle issues (filter outliving options).

  3. Do compaction filtering in application code (not in RocksDB): Iterate and delete expired entries manually. This is much slower than letting RocksDB filter during compaction (which is a free piggyback on existing I/O) and doesn't clean up tombstones.
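
For readers comparing these alternatives, the check a compaction filter applies to each entry can be pictured roughly as below. This is only a sketch under stated assumptions: the issue does not spell out the predicate or the value layout, so the code assumes each ConsumeQueue value begins with an 8-byte big-endian physical offset and that an entry is expired when that offset is below minPhyOffset. CqEntryExpiry and its methods are hypothetical names, not RocksDB or RocketMQ APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating a per-entry expiry check; not the RocksDB implementation.
final class CqEntryExpiry {
    // Assumed layout, for illustration only: the value starts with an 8-byte physical offset.
    static final int PHY_OFFSET_BYTES = Long.BYTES;

    // Returns true when the entry points below minPhyOffset and could be dropped.
    static boolean shouldRemove(byte[] value, long minPhyOffset) {
        if (value == null || value.length < PHY_OFFSET_BYTES) {
            return false; // keep anything that cannot be decoded
        }
        long phyOffset = ByteBuffer.wrap(value).getLong(); // ByteBuffer is big-endian by default
        return phyOffset < minPhyOffset;
    }

    // Builds a minimal value holding only the physical offset, for experiments.
    static byte[] valueWithPhyOffset(long phyOffset) {
        return ByteBuffer.allocate(PHY_OFFSET_BYTES).putLong(phyOffset).array();
    }
}
```

Applying such a check inside compaction reuses reads and writes RocksDB is already doing, whereas alternative 3 would need its own iteration and explicit deletes for the same entries.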

Additional Context

  • RemoveConsumeQueueCompactionFilter is already available in the RocksDB Java library bundled with RocketMQ, so no new dependencies are required.
  • The CompactionFilterFactory pattern is the recommended RocksDB approach for filters that need per-compaction state, as documented in the RocksDB wiki.
  • This change reduces the store module's native code from ~300 lines of C++ to zero, and removes ~400 lines of Java JNI loading/reflection code.
  • Net effect: -1200 lines (including native binaries), +50 lines of clean Java code.
