
DBZ-607 Add Cassandra connector to debezium-incubator #98

Merged
merged 3 commits into debezium:master on Jul 17, 2019

Conversation

jgao54
Contributor

@jgao54 jgao54 commented Jun 27, 2019

The Cassandra connector is an implementation of change data capture for Cassandra. It is intended to be deployed on each Cassandra node, acting as an agent that parses the commit logs in the cdc_raw directory and generates events to Kafka.

Note: currently it does not handle deduplication, ordering of events, or hydration -- guidelines will be provided on how each case should be handled.

Dependencies:

  • Cassandra >= 3.0, < 4.0
  • Kafka
  • Confluent Schema Registry
  • Google Cloud Storage [optional]

Detailed documentation will be updated/linked in the following days/weeks.

Comments/suggestions/questions welcome.

Update: see debezium/debezium.github.io#325 for documentation (still a WIP as well).

@jgao54 jgao54 changed the title WIP: [DBZ-607] Add Cassandra connector to debezium-incubator WIP: DBZ-607 Add Cassandra connector to debezium-incubator Jun 27, 2019
@gunnarmorling
Member

gunnarmorling commented Jun 28, 2019

Wow, this is freakin' awesome, @jgao54! I'm diving into this right now.

Some very high-level questions upfront:

  • What are the expectations in terms of provided/running services when building this? I tried to run mvn clean install on the CLI, but this failed all the tests. Do I need to have Kafka and Cassandra already running?
  • ad Confluent Schema Registry: is this hard-wired? Or can we replace it with pluggable serializers as with Kafka Connect?
  • ad Google Cloud Storage: I see that's behind a configurable interface already. So could that CommitLogTransfer implementation be maintained separately? It could easily be extracted, so you could maintain it under your own organization or internally. It seems quite specific, so I'd prefer to avoid the dependency here.
  • Are there some integration tests which test everything together?

Thanks a lot! I'll get back to you with more feedback in a bit.

Member

@gunnarmorling gunnarmorling left a comment

Thank you so much @jgao54, this is great work!

I did a first review pass and added some comments inline. So far it's mostly related to aligning this with existing concepts/code from the debezium-core module. It seems there are some pieces which are very similar to existing code (e.g. the BlockingQueue); were you planning to replace these with their core counterparts eventually?

I'll keep diving into this, probably coming back with some more questions related to the Cassandra specifics, once I've actually understood them better :)

Really excited about this and am looking forward to this connector very much!

* This queue stores the change events sent from the readers and gets processed by {@link QueueProcessor}
* where the events will get emitted to kafka.
*/
public class BlockingEventQueue<T> {
Member

Could you re-use ChangeEventQueue from core?

Contributor Author

good point, will create a subsequent PR to replace this.
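
For reference, a minimal sketch of what reusing debezium-core's ChangeEventQueue could look like; the builder settings and the Event type are placeholders, and the exact builder options may differ between Debezium versions:

    import io.debezium.connector.base.ChangeEventQueue;
    import io.debezium.util.LoggingContext;

    import java.time.Duration;
    import java.util.List;

    public class ChangeEventQueueSketch {

        // Stand-in for the connector's change event type.
        static class Event { }

        public static void main(String[] args) throws InterruptedException {
            ChangeEventQueue<Event> queue = new ChangeEventQueue.Builder<Event>()
                    .pollInterval(Duration.ofMillis(500))   // placeholder tuning values
                    .maxBatchSize(2048)
                    .maxQueueSize(8192)
                    .loggingContextSupplier(() -> LoggingContext.forConnector("Cassandra", "node1", "queue"))
                    .build();

            queue.enqueue(new Event());        // commit log / snapshot readers produce events
            List<Event> batch = queue.poll();  // QueueProcessor drains them and emits to Kafka
            System.out.println(batch.size());
        }
    }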

<packaging>jar</packaging>

<dependencies>
<dependency>
Member

Is this the entire Cassandra distribution? Could we narrow this down to specific modules?

Contributor Author

let me check...

Contributor Author

unfortunately not able to narrow this down :(

<artifactId>annotations</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
Member

Could the CommitLogTransfer implementation be maintained separately? I see it's already an interface with a configurable implementation, so it seems it could easily be extracted and you'd maintain it under your own organization or internally? It seems quite specific, so I'd prefer to avoid the dependency here. WDYT?

Contributor Author

The CommitLogTransfer is responsible for archiving processed commit logs (both successful and otherwise), so uploading to GCS for archiving seems to be a pretty common use case imo.

What are your thoughts on making the GCS dependency an optional dependency and creating a top-level contrib/ directory to store community contributions? This way it won't be as intrusive, but it still makes the class available for use.

Member

To me it really seems quite specific and it's not something I'd like us to maintain going forward. So IMO the best would be to maintain this in a separate repo, not under the Debezium organization. If you cannot host it internally or under a repo owned by WePay for whatever reasons, the one alternative I could see us doing is to add it somewhere in our examples repo. Note we don't publish builds for these examples, though. Could this be an option?

Contributor Author

Okay, that's fair, I can move it out for now. Adding it to the examples repo would be useful too.


String configPath = args[0];
try (FileInputStream fis = new FileInputStream(configPath)) {
Map<String, Object> props = new Yaml().load(fis);
Member

Could this be made a plain properties file? It'll avoid the YAML dependency and also simplify converging with other connector configs later on (which need to be configurable as K/V pairs via Connect's REST API).

Contributor Author

good point, will create a subsequent PR to replace YAML with property file.
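
For illustration, a minimal sketch of loading the agent configuration from a plain properties file instead of YAML (the class name is just for this example; plain key=value pairs have the same shape as the config Connect accepts via its REST API):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    public class ConfigLoader {

        public static Properties load(String configPath) throws IOException {
            Properties props = new Properties();
            try (FileInputStream fis = new FileInputStream(configPath)) {
                props.load(fis);   // key=value pairs, no YAML dependency needed
            }
            return props;
        }
    }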

/**
* Responsible for selecting the Kafka topic that the record will get send to.
*/
public class CassandraTopicSelector {
Member

Can the core TopicSelector be used (might require some adjustments, not sure)?

Contributor Author

Will create a separate PR, but this should be doable.

/**
* Metadata about the source of the change event
*/
public class SourceInfo implements AvroRecord {
Member

This should be derived from BaseSourceInfo or AbstractSourceInfo.

Contributor Author

Sounds good! Similar to the other comment, I will address this refactoring in a subsequent PR.


public static class KeyValueSchema {
private final TableMetadata tableMetadata;
private final Schema keySchema;
Member

Ah, so this is actually specific to Avro then. One of the next steps should be to use the Kafka Connect Schema system, so we then can apply different serializers which convert it to Avro, JSON etc.

Contributor Author

Agree!
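
As a small illustration of that direction, a sketch using Kafka Connect's schema API; the keyspace, table and column names are made up for this example:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class ConnectSchemaSketch {

        public static void main(String[] args) {
            // Describe the key with Connect's schema system instead of Avro directly;
            // a configured converter (Avro, JSON, ...) then takes care of serialization.
            Schema keySchema = SchemaBuilder.struct()
                    .name("test_keyspace.test_table.Key")
                    .field("id", Schema.INT64_SCHEMA)   // hypothetical partition key column
                    .build();

            Struct key = new Struct(keySchema).put("id", 42L);
            System.out.println(key);
        }
    }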

import java.util.concurrent.atomic.AtomicBoolean;

/**
* The schema processor is responsible for periodically
Member

IIUC, this means there might be a mismatch of schema and event, right? E.g. we're processing an event from the beginning of the log file, and just now the schema has been updated. I'm not sure how this could be handled with Cassandra.

Contributor Author

Exactly. This is one caveat of the Cassandra commit logs. It's actually even more nuanced than that: the mutations in the commit log are not always in order, so we could process an event at time T followed by another event at time T-1. This means the concept of a DatabaseHistory is very difficult to implement here, since we can't rely on file position to determine the order of events; instead we must rely on the (client) timestamp. But since commit logs don't include schema changes, we won't have the timestamp for schema changes...
Haven't been able to come up with a great solution for this.

* -1). This means if the SnapshotProcessor is terminated midway, upon restart
* it will skip all the tables that are already recorded in offset.properties
*/
public class SnapshotProcessor extends AbstractProcessor {
Member

How does the transition from snapshot to log reading work? Is it possible to process the logs from the exact position of the snapshot? Or would it more be an approximation?

Contributor Author

Making it process the logs from the exact position of the snapshot would be extremely difficult to implement because:
(1) events are logged out of order on a single node;
(2) even if we could somehow sort the events on each node, the same event replicated across the cluster will arrive in Kafka in a nondeterministic order.

This is why we weren't concerned with making snapshotting happen prior to commit log processing. Instead, we have both of them starting in separate threads at the same time. And since Cassandra is LWW, the 'sorting' is done downstream.

Member

Gotcha; what is "LWW"?

Contributor Author

sorry, last-write-wins
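
To spell out the idea with a simplified illustration (not connector code): for a given cell, the value with the higher client write timestamp wins, regardless of the order in which the events were processed:

    public class LastWriteWins {

        static final class CellValue {
            final String value;
            final long writeTimestampMicros;

            CellValue(String value, long writeTimestampMicros) {
                this.value = value;
                this.writeTimestampMicros = writeTimestampMicros;
            }
        }

        // Keep whichever value carries the higher write timestamp.
        static CellValue merge(CellValue current, CellValue incoming) {
            return incoming.writeTimestampMicros >= current.writeTimestampMicros ? incoming : current;
        }

        public static void main(String[] args) {
            CellValue older = new CellValue("a", 1_000L);
            CellValue newer = new CellValue("b", 2_000L);
            // Even if "b" is processed first and "a" afterwards, "b" still wins.
            System.out.println(merge(newer, older).value);  // prints "b"
        }
    }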

@gunnarmorling
Member

not handle deduplication, ordering of events, or hydration

@jgao54, on the first two: would it be an option to set up one node in the Cassandra cluster so that it receives the changes from all tables and then set up the agent just on this single node? Just throwing out the idea -- I've not much experience with Cassandra, so it might not be doable.

And WDYM by "hydration" in this context? Thanks!

@jgao54
Contributor Author

jgao54 commented Jun 28, 2019

Wow, thanks for taking a first look already @gunnarmorling, will address your questions in coming days.

@gunnarmorling
Member

gunnarmorling commented Jun 29, 2019

Cool. Also sent a PR against your fork with some tiny fixes related to dependencies.

I also believe now I get what you mean by "hydration": recovering the full row state if there are only partial update events (only containing affected columns). I think ideally that'd just be done on the sink side. E.g. SQL naturally would let you update the given columns only, although I just learned that apparently the existing JDBC sink connector always does full row updates. Elasticsearch would be another case AFAIK.

IIRC I saw the idea of using KStreams as a solution to recover full state if not supported by sinks themselves; I could definitely see that being one tool in the box. Interestingly, the same question just recently came up for MySQL (via non-full binlog row mode). So this would be another case benefitting from this. In any case it's a separate discussion, but I wanted to get out my thoughts around it :)

@jgao54
Contributor Author

jgao54 commented Jul 10, 2019

@gunnarmorling addressed your questions inline.

Given that a lot of the work is about replacing custom classes with their DBZ counterparts, I will create subsequent PRs to address each one of them.
For the immediate PR, I want to get your thoughts on (1), and I'll look into (2):
(1) optional dependency (see inline comment)
(2) narrowing down the Cassandra dependency if possible

Also RE: comments above:

What are the expectations in terms of provided/running services when building this? I tried to run mvn clean install on the CLI, but this failed all the tests. Do I need to have Kafka and Cassandra already running?

Odd, all tests passed for me. What is the error you are seeing?

Confluent Schema Registry: is this hard-wired? Or can we replace it with pluggable serializers as with Kafka Connect?

Right now it is hard-wired, yes. I think it would be nice to use the Schema class in Kafka Connect to support various serializations, as long as it's not too coupled with the rest of Kafka Connect. I haven't looked into it in detail.

Google Cloud Storage: I see that's behind a configurable interface already. So could that CommitLogTransfer implementation be maintained separately? It could easily be extracted, so you could maintain it under your own organization or internally. It seems quite specific, so I'd prefer to avoid the dependency here.

See inline comment.

Are there some integration tests which test everything together?

Not right now, tests have been done manually so far; definitely going to add some ITs down the road.

Cool. Also sent a PR against your fork with some tiny fixes related to dependencies.

Thanks! I'll make the change.

I also believe now I get what you mean by "hydration": ...

Yep, that's what I meant.

IIRC I saw the idea of using KStreams as a solution to recover full state if not supported by sinks themselves.

This is interesting. I think one tricky part is where to put the state store. Ideally it shouldn't be local, because it's the same instance that Cassandra runs on. If it's remote, reads could be slow, which could potentially lead to a growing backlog.
Stream processing after events are recorded to Kafka feels a bit safer, but that will be a burden on the end user of the connector.

One thing @criccomini and I were discussing today was that in Cassandra 4.0 there is a new consistency level called NODE_LOCAL, which allows querying just the local node (this is not currently possible).
What's nice about querying the local node only: (1) a local read will be much faster than a remote read, making it less frowned upon to perform a read after every write; (2) no dependencies, since we don't have to replicate the storage into a separate state store.
But there are of course downsides: (1) the extra read on each write will still increase the db's workload, which may concern some users; (2) the after values in the row are an approximation, since another column may be changed by another client at the same time.

@gunnarmorling
Member

Stream processing after events are recorded to Kafka feels a bit safer, but that will be a burden on the end user of the connector.

Yes, so that's what you'd get with Kafka Streams. The connectors would write the data to Kafka topics, Kafka Streams would read from there, hydrate the complete state and write back to another Kafka topic. State stores would be local to the Kafka Streams node(s) by default via RocksDB.
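
For illustration, a minimal Kafka Streams sketch of such a hydration topology; the topic names, String serdes and the merge function are placeholders, not anything defined in this PR:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    public class HydrationTopology {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            builder.stream("cassandra.test_keyspace.test_table",
                            Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey()
                   .aggregate(
                           () -> "",                                            // empty initial row state
                           (rowKey, partialEvent, fullState) -> merge(fullState, partialEvent),
                           Materialized.with(Serdes.String(), Serdes.String())) // local RocksDB state store
                   .toStream()
                   .to("cassandra.test_keyspace.test_table.hydrated",
                           Produced.with(Serdes.String(), Serdes.String()));

            return builder;
        }

        // Placeholder merge: a real implementation would overlay the changed columns
        // of the partial event onto the stored full row state.
        private static String merge(String fullState, String partialEvent) {
            return partialEvent.isEmpty() ? fullState : partialEvent;
        }
    }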

@jgao54
Contributor Author

jgao54 commented Jul 10, 2019

Yes, so that's what you'd get with Kafka Streams.

ah okay, I misunderstood earlier

@jgao54 jgao54 force-pushed the debezium-connector-cassandra branch from 8fd25bb to f8439cd on July 11, 2019
@gunnarmorling
Member

Hey @jgao54, just noticed you pushed another commit, thanks! How should we proceed, do you have more changes lining up right now, or do you think it's ok for the initial merge?

@jgao54
Contributor Author

jgao54 commented Jul 15, 2019

@gunnarmorling it's good for initial merge! I'm going to focus on the additional DBZ migration this week (mentioned in the PR comment above).

@jgao54 jgao54 changed the title WIP: DBZ-607 Add Cassandra connector to debezium-incubator DBZ-607 Add Cassandra connector to debezium-incubator Jul 15, 2019
@gunnarmorling gunnarmorling merged commit bf59f46 into debezium:master Jul 17, 2019
@gunnarmorling
Member

Rebased and applied. Thanks a lot, @jgao54!
