Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27 #18

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from

Conversation

tharanga
Copy link

@tharanga tharanga commented Oct 20, 2020

This is a WIP version of a Cassandra change stream emitter based on the CDC feature of Cassandra 4.0-beta2.

New dependencies:

  • Cassandra 4.0-beta2 Jar

New config:

  • cdc: configPath:Path to the Cassandra server config file

Pre-read:
https://cassandra.apache.org/doc/latest/operating/cdc.html

How to use:

  1. Enable CDC in Cassandra through cassandra.yaml : cdc_enabled: true
  2. Set commitlog_sync_period_in_ms: 10000 to a value on how quickly you want to observe changes (100ms lower limit)
  3. Enable CDC on a table ALTER TABLE <your table> WITH cdc=true;
  4. Change sidecar config cdc: configPath: to point to the cassandra.yaml
  5. Start the sidecar, insert data into the CDC enabled table and you'll see changes are emitted to the log

Current limitations:

  • Restart the sidecar upon schema changes
  • Other unknown bugs due to the absence of unit tests

Tasks of the initial version:

  • Read and emit changes from CDC enabled tables from the local Cassandra node

  • Emit events in real-time, as governed by Cassandra’s commit log flush interval

  • Emit events in the same order as they appear in commit log (optionally tradeoff this for throughout)

  • Bookmark the change stream; hence commit log read is resumable

  • Add a config option to enable/disable the CDC reader

  • Make bookmarking robust

  • Provide the flexibility of emitting changes in different formats (JSON, PartitionUpdate, Avro) to different output types (Kafka, Console, etc)

  • Add Console output

  • Add Kafka output

  • Add PartitionUpdate format

  • Add JSON format

  • (optional)Support start a fresh change stream from a full snapshot (of CDC enabled tables)

  • (optional) Provide an API to take full snapshots (of CDC enabled tables)

  • Support monitoring

  • Support managing the CDC log

  • Unit and integration tests

  • Automatic schema change detector

  • Publish performance characteristics

  • Decouple this code from a specific Cassandra version

@@ -24,3 +24,6 @@ sidecar:

healthcheck:
- poll_freq_millis: 30000

cdc:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can change from "cdc" to "cassandra config file" for we may got some other useage of cassandra yaml path not only cdc .

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I think this config path should be put to "cassandra:" which of the top choice of the sidecar.yaml

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd wait for a general need and then refactor it out of the cdc section. We can do it now if there's such a need.

src/main/java/org/apache/cassandra/sidecar/CQLSession.java Outdated Show resolved Hide resolved
src/main/java/org/apache/cassandra/sidecar/CQLSession.java Outdated Show resolved Hide resolved
src/main/java/org/apache/cassandra/sidecar/CQLSession.java Outdated Show resolved Hide resolved
@@ -151,6 +163,7 @@ public Configuration configuration() throws ConfigurationException, IOException
.setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null))
.setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null))
.setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false))
.setCassandraConfigPath(yamlConf.get(String.class, "cdc.configPath"))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I said before, the cassandra configre path can also be for cassandra choice at the sidecar.yaml。

}

public void start()
{
banner(System.out);
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
cdcReaderService.start();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log for cdc reader service?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean a log saying the CDCReaderService started? There is such a log statement in that class: logger.info("Successfully started the CDC reader");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

}

public void start()
{
banner(System.out);
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
cdcReaderService.start();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides , do you think we should add a flag to enable or disable the cdc reader service?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. Let me add that, so users who don't need this can just keep it disabled.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

}

public void start()
{
banner(System.out);
validate();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
cdcReaderService.start();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I also think we can add a common method where all other service can be added inner the method .
such as startInitService() , for me ,I think cdcReaderService is a sidecar init start service and we can use a flag to
show the service's start or not .

logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
retryIntervalMs);
Thread.sleep(retryIntervalMs);
retryIntervalMs *= 2;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we should add some retry limit ? in this case ,the program will always try to connect to cassandra.
I think we can add a default retry time such as 3, and init retryIntervalMs can be 10ms, we can retry 3 time ,if cluster is still null we can throw an exception?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modify the code :
long retryIntervalMs = 10;
int defaultRetryTime = 5;
Cluster cluster = null;

    while (cluster == null)
    {
        int retryTime = 0;
        if (this.session.getLocalCql() != null)
        {
            cluster = session.getLocalCql().getCluster();
        }
        if (cluster != null)
        {
            break;
        }
        else
        {
            logger.warn("Waiting for Cassandra server to start. Retrying after {} milliseconds",
                retryIntervalMs);
            if (retryTime++ >= defaultRetryTime)
                throw new InterruptedException(String.format("Can not connect to cassandra after retry  %s times", defaultRetryTime));
            Thread.sleep(retryIntervalMs);
            retryIntervalMs *= 2;
        }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether a retry limit is helpful. If the sidecar has to do something useful, it has to wait until the Cassandra starts. If this throws an exception after certain retries, does that mean the sidecar has to stop? or other services are started but not the CDC reader?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I mean is that set the process just sleep round and round is not a good choice, or we can just throw exception after some retry . So the users can go to check what is wrong with the process. Users may not know what is wrong with the process if the deamon is just sleep , they may think that the process is running healthy but actually the cassandra daemon is something wrong and the cdc reader is just sleep.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should let the user know what is going on

// to ensure CDC reader doesn't accidentally step on Cassandra data.
this.cassandraConfig.init();
// TODO : Load metadata from the CQLSession.
Schema.instance.loadFromDisk(false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we load from disk ? for all cassandra schema information ,we can just reside on cassandra driver ,we can make first connection to cassandra, and get the information from cassandra driver , also the tablemeta when we need to know if the table's cdc is enable

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I saw your "TODO"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can help with this todo, Now I am working on it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Max. Added a skeleton CDCSchemaChangeListener. Go ahead with the change.

*/
public interface Output extends Closeable
{
void emitChange(Change change) throws Exception;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides , I want to add some method such emitChange but the return is partitionUpdate

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How the method signature would look like in that case?

}
for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
{
logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After read the code, I saw that this code is useless ,If the cdc is enable can get from mutation at the mutationhandler section . So the code here is useless I think ,And we may not doing the schema load disk method to get the schema meta data

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is just a dummy code for interim development work. I will remove or move these to debug logs.

private final CassandraConfig cassandraConfig;

@Inject
public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDCReaderService is injected but I do not see where the object is injected ? including CDCIndexWathcher ,CDCRawDirectoryMonitor

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the CassandraSidecarDaemon?

Future<CommitLogPosition> mutationFuture = null;
private ExecutorService executor;
private CDCReaderMonitor monitor;
private CDCBookmark bookmark;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also add "private Configuration conf;" here and at the constructor function,we can set the conf to input configuration conf ,then we can use the configuration.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if we can just delete input params Configuration conf

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, dead code, removing it.

private final Configuration conf;

@Inject
CDCBookmark(Configuration conf)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration conf is not used in the class, so I want to know if we will use in the future ?same with MutationHandler class

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

{
// TODO: Make the output type configurable
bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class);
bind(Output.class).to(ConsoleOutput.class);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this method and like other variables such as HttpServer/Vertx/VertxRequestHandler/Router/Configuration use @provide to get the real value and the the out put can be configurable ;
that is my code;
@provides
@singleton
public Output outPut(Configuration conf)
{
String outPutClass = conf.getOutPut();
if (!outPutClass.contains("."))
outPutClass = "org.apache.cassandra.sidecar.common.output." + outPutClass; // I move the out put to common dir
Output output = FBUtilities.construct(outPutClass, "output");
return output;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Output is an interface. Let me see whether there's a benefit to refactoring it like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing an advantage to the request, @Maxwell-Guo. The approach @tharanga took is pretty standard for a Guice binding.

this.cassandraConfig.init();
// TODO : Load metadata from the CQLSession.
Schema.instance.loadFromDisk(false);
this.cassandraConfig.muteConfigs();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we mute the loaded configuration file ? I think now we just use the cdc location ,we can just at first load the configuration get the data and assigned the cassandraConfig to null if you think the configuration may cost some memory

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to ensure no code path accidentally modifies the Cassandra data. Yes, we don't intentionally do it today, but this code guarantees that never happens.

continue;
}
for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here ,I think we can add a loop , that every time we check if there is a table have got its cdc enable in schema info ,
if all table 's cdc is not enabled ,we can just go to sleep for a while , such as commitlog flush period ,in this way for
users that do not use cdc ,we can just save some resource.
also every loop the schema should refresh (I think the cassandra driver have done ,we just use it );
when a table is cdc enabled ,we go to next step;m
my code here is :

// Ensure Cassandra config is valid and remove mutable data paths from the config
// to ensure CDC reader doesn't accidentally step on Cassandra data.
this.cassandraConfig.init();

        while (true)
        {
            int cdcEnableTables = 0;
            Metadata metadata = this.session.getLocalCql().getCluster().getMetadata();
            List<KeyspaceMetadata> keyspaceMetadatas = metadata.getKeyspaces();
            for (KeyspaceMetadata keyspaceMetadata : keyspaceMetadatas)
            {
                if (keyspaceMetadata == null)
                {
                    continue;
                }
                for (TableMetadata tableMetadata : keyspaceMetadata.getTables())
                {
                    if (tableMetadata.getOptions().isCDC())
                        ++cdcEnableTables;
                }
            }
            if (cdcEnableTables != 0)
                break;
            logger.warn("There is no table enable the cdc , just sleep for %", DatabaseDescriptor.getCommitLogSyncPeriod());
            Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
        }

        this.cassandraConfig.muteConfigs();
        // Start monitoring the cdc_raw directory
        this.cdcRawDirectoryMonitor.startMonitoring();

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was just added for debugging. Users can alter tables to enable CDC at any time. Sidecar shouldn't wait. Also, whatever user does is not visible to us without reloading metadata. This is where the CDCSchemaChangeListener is helpful.
+1 for the suggestion. When CDCSchemaChangeListener is working, we can add a guard rail to CDCIndexWatcher so it won't read commit log entries when there are no tables with CDC enabled.

}
// TODO : Don't be someone who just complains, do some useful work, clean files older than
// the last persisted bookmark.
this.monitor.reportCdcRawDirectorySizeInBytes(getCdcRawDirectorySize());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't got cdc data clean ? I saw your TODO, and Why we do not clean the cdc log that is useless after we emit the change of the log ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do delete commit logs after reading them : https://github.com/apache/cassandra-sidecar/pull/18/files#diff-32afd2c4bf3fe7d4c3268bec08a902cf36fb92b079d9c99fe47bafe0b073d5ceR289. However, if the output is blocked/slow for some reason (e.g. a Kafka node is down), then commit logs can pile up and eventually halt Cassandra writes. This is the default behavior, but we don't want that.

// to ensure CDC reader doesn't accidentally step on Cassandra data.
this.cassandraConfig.init();
// TODO : Load metadata from the CQLSession.
Schema.instance.loadFromDisk(false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know why you need to load the schema from disk....for the commitlog reader should deserialize from log need to have data judgement .

Copy link
Contributor

@rustyrazorblade rustyrazorblade left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My biggest concern with this patch is organization - not the code itself. I spent a lot of time (months) trying to future proof the codebase where it comes to supporting multiple versions. I left a couple notes, but not an exhaustive review since structural changes may significantly impact the codebase.

I'm happy to go into any of the details of the prior work I did, since it's only 1 commit old it might not be obvious where I was going with it. Let me know if there's anything I can explain in detail that'll help you get more familiar with the codebase.

@@ -179,6 +179,7 @@ dependencies {

compile project(":common")
compile project(":cassandra40")
compile 'org.apache.cassandra:cassandra-all:4.0-beta2'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you take a look at the last commit I added, I spent a lot of time trying to decouple the sidecar from using a specific version of Cassandra. Each version we decide to support can (and will) have an adapter, allowing us to maintain a single sidecar project that can work with different versions of Cassandra each of which has different implementations. There's no assurance that C* 5.0 will have the same CDC implementation as the 4.0 version. Could you please move the version specific logic into the cassandra40 subproject?

In addition, we may want to have the user point to their cassandra lib directory as well in order to not ship every version of C* with the sidecar. That will give us the flexibility for folks to use their own builds (private or public) as well as ship a smaller artifact. Since everyone has to run Cassandra I think this is a fair ask. Using a compileOnly dependency would allow us to test against each version of C* without shipping the jars.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rustyrazorblade +1 for both suggestions. I was thinking of punting this to a future commit, but I see the work you've done at a4805a910904019698ae373ac33f88855cf67f3d. Let me refactor this code to address both points.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
// TODO: Make the output type configurable
bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class);
bind(Output.class).to(ConsoleOutput.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing an advantage to the request, @Maxwell-Guo. The approach @tharanga took is pretty standard for a Guice binding.

@tharanga tharanga changed the title CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27 (WIP) CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27 Nov 24, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants