feat: CDC-based mixed index synchronization (#4873) #4906

Draft: porunov wants to merge 1 commit into JanusGraph:master from porunov:feature/4873-cdc-mixed-index
@porunov porunov commented Jun 28, 2026

Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index.

Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) ->
Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.

Key design points:

  • Reindex-from-current-state: the worker reads each changed element's current graph state and fully replaces its index document (reusing IndexSerializer, like transaction recovery). This is idempotent and order-independent, so out-of-order or duplicate events still converge to the current state -- no strict ordering required, and a stale event can never overwrite a fresh value.
  • No dual write: in cdc-only mode the only synchronous write is to storage; the index is updated downstream from the committed change stream, so a failed index write cannot leave it permanently out of sync with the graph.
  • Element-keyed Kafka partitioning gives per-element ordering and horizontal scaling via a consumer group; batches are de-duplicated and applied as one ElasticSearch _bulk per index.
  • At-least-once: offsets are committed only after a batch is durably applied; on failure the batch is reprocessed (rewind) rather than skipped, so the index eventually catches up.
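
The convergence argument behind reindex-from-current-state and batch de-duplication can be illustrated with a small in-memory sketch. This is not the PR's MixedIndexUpdateApplier: the class `ReindexSketch`, its two maps, and the write counter are hypothetical stand-ins for the graph, the mixed index, and the bulk request. The point it tries to show is that events carry only element ids, so replaying or reordering them cannot write an older value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not the PR's MixedIndexUpdateApplier. */
class ReindexSketch {
    // Stand-in for committed graph state: element id -> current value.
    final Map<Long, String> graph = new HashMap<>();
    // Stand-in for the mixed index: element id -> indexed document.
    final Map<Long, String> index = new HashMap<>();
    // Counts index writes so the effect of de-duplication is visible.
    int indexWrites = 0;

    /** Applies a batch of "element changed" events; only the ids are used. */
    void applyBatch(List<Long> changedIds) {
        // De-duplicate: each element is reindexed at most once per batch.
        Set<Long> unique = new LinkedHashSet<>(changedIds);
        for (Long id : unique) {
            // Read the CURRENT state instead of trusting the event payload.
            String current = graph.get(id);
            if (current == null) {
                index.remove(id);       // element is gone: drop its document
            } else {
                index.put(id, current); // fully replace the document
            }
            indexWrites++;
        }
    }

    public static void main(String[] args) {
        ReindexSketch s = new ReindexSketch();
        s.graph.put(1L, "v2");   // element 1 was updated twice; only v2 is current
        s.index.put(2L, "old");  // element 2 was deleted from the graph
        s.applyBatch(Arrays.asList(1L, 1L, 2L)); // duplicate event for element 1
        s.applyBatch(Arrays.asList(1L));         // late, stale replay
        System.out.println(s.index + " after " + s.indexWrites + " index writes");
    }
}
```

Because each write is derived from the current state, applying the same event again is a no-op in terms of index content, which is what makes at-least-once delivery safe here.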

Configuration (opt-in, disabled by default):

  • storage.cql.cdc: emit the Cassandra cdc=true table option on the edgestore table.
  • index.[X].cdc.enabled / index.[X].cdc.synchronous: per-index dual mode (write synchronously AND via CDC) or cdc-only mode (skip the synchronous write; ES updated solely via CDC).
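
As a rough illustration, a cdc-only setup for a mixed index named `search` might look like the fragment below. The option names come from the list above; the index name and the boolean values are assumptions, in particular the guess that `cdc.synchronous=false` is what selects cdc-only mode, and should be checked against the regenerated configuration reference.

```properties
# Assumed values; only the option names are taken from the PR description.
storage.cql.cdc=true
index.search.cdc.enabled=true
# Assumption: false = cdc-only (skip the synchronous write), true = dual mode.
index.search.cdc.synchronous=false
```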

Components:

  • janusgraph-core: per-index CDC config options, the commit-side skip hook in StandardJanusGraph, and MixedIndexUpdateApplier (the backend-agnostic reindex-from-current-state engine).
  • janusgraph-cql: the storage.cql.cdc table option (no Kafka dependency in production code).
  • janusgraph-cdc (new module; core + kafka-clients): the CdcEventDecoder SPI, DebeziumCassandraJsonDecoder, CdcWorkerConfiguration, CdcIndexUpdateWorker, and the standalone CdcIndexUpdateWorkerMain runner.
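
The commit-after-apply and rewind behaviour described for the worker can be sketched without Kafka. The class below is a hypothetical simulation, not CdcIndexUpdateWorker and not the Kafka client API: a list stands in for one partition, and `committedOffset` stands in for the consumer group's committed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical simulation of at-least-once processing; no Kafka API is used. */
class AtLeastOnceSketch {
    final List<String> log;   // stand-in for one Kafka partition
    final int batchSize;
    int committedOffset = 0;  // next offset to read after a restart or rewind

    AtLeastOnceSketch(List<String> log, int batchSize) {
        this.log = log;
        this.batchSize = batchSize;
    }

    /** Processes one batch; returns true only if the offset advanced. */
    boolean pollOnce(Consumer<List<String>> applier) {
        int end = Math.min(committedOffset + batchSize, log.size());
        if (committedOffset >= end) {
            return false; // nothing new to read
        }
        List<String> batch = new ArrayList<>(log.subList(committedOffset, end));
        try {
            applier.accept(batch); // e.g. reindex and send one bulk request
        } catch (RuntimeException e) {
            // Rewind: the offset is NOT advanced, so the same batch is read again.
            return false;
        }
        committedOffset = end; // commit only after the batch was applied
        return true;
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        int[] attempts = {0};
        AtLeastOnceSketch worker =
            new AtLeastOnceSketch(List.of("a", "b", "c"), 2);
        Consumer<List<String>> flaky = batch -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("index unavailable");
            }
            applied.addAll(batch);
        };
        // Keep polling until the whole log has been applied and committed.
        while (worker.committedOffset < worker.log.size()) {
            worker.pollOnce(flaky);
        }
        System.out.println(applied);
    }
}
```

The first batch fails once and is reprocessed rather than skipped, so nothing is lost. A batch can also be applied more than once if a crash happens between apply and commit, which is why the applier has to be idempotent.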

Testing: 38 tests, including unit/component coverage (decoder vs real serialized bytes, reindex engine, worker loop via Kafka MockConsumer, full-chain convergence over Lucene including vertex/edge add/update/remove and out-of-order delivery) and two real-container end-to-end (E2E) tests: worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline. The full Debezium E2E test is gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, which Debezium 3.x and Testcontainers 2.x require); the default Java 8/11 build excludes it and stays green.

Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference.

Fixes #4873
Replaces #4874


Thank you for contributing to JanusGraph!

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there an issue associated with this PR? Is it referenced in the commit message?
  • Does your PR body contain #xyz where xyz is the issue number you are trying to resolve?
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you written and/or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE.txt file, including the main LICENSE.txt file in the root of this repository?
  • If applicable, have you updated the NOTICE.txt file, including the main NOTICE.txt file found in the root of this repository?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>

Copilot AI left a comment


Pull request overview

Adds an opt-in Change-Data-Capture (CDC) pipeline to keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with committed graph data by asynchronously reindexing affected elements from their current state, eliminating the “dual write” divergence risk.

Changes:

  • Introduces per-index CDC configuration (index.[X].cdc.*) and commit-path logic to skip synchronous mixed-index writes in cdc-only mode.
  • Adds MixedIndexUpdateApplier (reindex-from-current-state) and CdcElementChange in core, plus Cassandra storage.cql.cdc table option support.
  • Adds a new janusgraph-cdc module implementing a Kafka consumer worker, Debezium Cassandra JSON decoder, and extensive unit/component/E2E tests + documentation.

Reviewed changes

Copilot reviewed 33 out of 33 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| pom.xml | Registers new janusgraph-cdc Maven module in the build. |
| mkdocs.yml | Adds CDC operator guide page to documentation nav. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java | Validates reindex-from-current-state behavior over Lucene in cdc-only mode. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/CdcSkipMutationTest.java | Verifies synchronous mixed-index write is skipped in cdc-only and retained in dual mode. |
| janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java | Unit-tests that storage.cql.cdc toggles cdc=true on edgestore DDL only. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java | Refactors CREATE TABLE building and conditionally applies Cassandra cdc=true for edgestore. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java | Adds storage.cql.cdc configuration option. |
| janusgraph-core/src/test/java/org/janusgraph/graphdb/configuration/CdcIndexConfigTest.java | Tests defaults and per-index scoping of index.[X].cdc.*. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java | Computes cdc-only backing indexes and skips synchronous mixed-index writes for them. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java | Makes TransactionLogHeader.Modification constructor public for reuse in decoding. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/MixedIndexUpdateApplier.java | Adds backend-agnostic reindex-from-current-state applier for CDC worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/CdcElementChange.java | Adds normalized “element changed” model consumed by the applier/worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java | Adds index.[X].cdc.enabled and index.[X].cdc.synchronous options. |
| janusgraph-cdc/src/test/resources/cassandra-cdc.yaml | Provides Cassandra config enabling CDC for full pipeline E2E test. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoderTest.java | Tests Debezium JSON decoding against real JanusGraph-serialized bytes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConvergenceTest.java | Drives worker+decoder+applier via MockConsumer to validate convergence semantics. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConfigurationTest.java | Tests worker configuration defaults and properties parsing. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcKafkaElasticsearchTest.java | Testcontainers E2E for Kafka → worker → ElasticSearch convergence. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMainTest.java | Tests runner wiring and config reflection of CDC-enabled backing indexes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerLoopTest.java | Unit-tests polling loop semantics (dedupe/retry/commit/rewind). |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcEventDecoderTest.java | Smoke test for decoder SPI and CdcElementChange interop. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcCassandraDebeziumElasticsearchTest.java | Full Cassandra CDC → Debezium → Kafka → ElasticSearch pipeline E2E (profile-gated). |
| janusgraph-cdc/src/test/java/io/debezium/connector/cassandra/JanusGraphCdcConnectorStarter.java | Test-only bridge to start Debezium Cassandra connector embedded. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoder.java | Implements Debezium Cassandra JSON → CdcElementChange decoding. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcWorkerConfiguration.java | Defines immutable worker/Kafka configuration and consumer properties. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMain.java | Adds standalone runner that opens graph, wires decoder+applier, starts workers. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java | Implements Kafka consume/decode/dedupe/apply/retry/commit/rewind loop. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexApplier.java | Functional interface to abstract index application for worker tests. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcEventDecoder.java | Decoder SPI for CDC record formats. |
| janusgraph-cdc/pom.xml | New module POM with Kafka clients + testcontainers/Debezium profile gating. |
| docs/configs/janusgraph-cfg.md | Regenerates config reference including new CDC options. |
| docs/changelog.md | Adds 1.2.0 upgrade note describing CDC mixed index synchronization. |
| docs/advanced-topics/cdc-mixed-index.md | Adds operator guide for Cassandra CDC + Debezium + Kafka + worker setup. |

final Map<String, List<Long>> result = new HashMap<>();
final JanusGraphManagement mgmt = graph.openManagement();
try {
    for (Class<? extends Element> elementType : Arrays.<Class<? extends Element>>asList(Vertex.class, Edge.class)) {

Comment on lines +114 to +118
if (relation.isEdge()) {
    return new CdcElementChange(ElementCategory.EDGE, relation.id());
}
// A property (or system) column changed -> reindex the owning vertex.
return new CdcElementChange(ElementCategory.VERTEX, vertexId);

    return;
}
running = true;
thread = new Thread(this, "janusgraph-cdc-worker");

    .awaitGraphIndexStatus(graph, "vsearch").status(SchemaStatus.ENABLED).timeout(10, ChronoUnit.SECONDS).call();

Set<String> cdcIndexes = CdcIndexUpdateWorkerMain.cdcEnabledBackingIndexes((StandardJanusGraph) graph);
assertEquals(Set.of("search"), cdcIndexes);