DBZ-5410 Added ability to process Cassandra 4 Commit log files incrementally #69

Merged
merged 1 commit into from Sep 1, 2022

nitinitt

I had a merge commit in my previous PR (#62) and cannot remove it without altering the git history of the "main" branch. Please accept this PR as part of:
https://issues.redhat.com/browse/DBZ-5410

@nitinitt
Author

@jpechane @smiklosovic re-tagging you in this PR.

@smiklosovic
Member

I'll do a review next week.

protected Cassandra4CommitLogProcessor.LogicalCommitLog commitLog;
protected int pollingInterval;

public AbstractCassandra4CommitLogParser(Cassandra4CommitLogProcessor.LogicalCommitLog commitLog, final List<ChangeEventQueue<Event>> queues,
Member

Could you please move LogicalCommitLog out of Cassandra4CommitLogProcessor so that it is a standalone class in the same package? I find it weird that the parser depends on some random stuff in the log processor. Same situation with ProcessingResult.

Member

ProcessingResult might be renamed to CommitLogProcessingResult.

public Cassandra4CommitLogProcessor.ProcessingResult process() {
if (!commitLog.exists()) {
LOGGER.warn("Commit log " + commitLog + " does not exist!");
return new Cassandra4CommitLogProcessor.ProcessingResult(commitLog, Cassandra4CommitLogProcessor.ProcessingResult.Result.DOES_NOT_EXIST);
Member

DOES_NOT_EXIST might be imported.


while (!commitLog.completed) {
if (completePrematurely) {
LOGGER.info("{} completed prematurely", commitLog);
Member

Might be "{} processing completed prematurely" or "Processing of {} completed prematurely".

try {
parseIndexFile(commitLog);
while (!commitLog.completed) {
LOGGER.info("Polling for completeness of idx file for: {}", commitLog.toString());
Member

toString is not necessary.
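For context on the "idx file" being polled here: Cassandra 4 writes a `_cdc.idx` file alongside each CDC commit log segment. Its first line holds the offset up to which CDC data has been flushed, and a second line reading `COMPLETED` is appended once the segment is finished. A minimal sketch of reading that layout follows; `IdxSnapshot` and `parse` are hypothetical names for illustration, not the connector's actual `parseIndexFile` implementation.

```java
import java.util.Arrays;
import java.util.List;

class IdxFileSketch {
    /** Hypothetical value type: flushed offset plus completion flag. */
    static final class IdxSnapshot {
        final int offset;
        final boolean completed;

        IdxSnapshot(int offset, boolean completed) {
            this.offset = offset;
            this.completed = completed;
        }
    }

    /** Parses the lines of an idx file: line 1 is the offset, optional line 2 is COMPLETED. */
    static IdxSnapshot parse(List<String> lines) {
        if (lines.isEmpty()) {
            throw new IllegalArgumentException("idx file has no content yet");
        }
        int offset = Integer.parseInt(lines.get(0).trim());
        boolean completed = lines.size() > 1 && "COMPLETED".equals(lines.get(1).trim());
        return new IdxSnapshot(offset, completed);
    }

    public static void main(String[] args) {
        IdxSnapshot partial = parse(Arrays.asList("4096"));
        IdxSnapshot done = parse(Arrays.asList("8192", "COMPLETED"));
        System.out.println(partial.offset + " completed=" + partial.completed);
        System.out.println(done.offset + " completed=" + done.completed);
    }
}
```

In a real reader the lines would come from something like `Files.readAllLines(path)`; the list parameter just keeps the sketch free of file I/O.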


CommitLogPosition commitLogPosition = null;
if (offset == null) {
LOGGER.info("Start to read the partial file : {}", commitLog.toString());
Member

commitLog.toString is not necessary

Member

Also, can you please remove redundant space before the colon? Like ... partial file: {}.

commitLogPosition = new CommitLogPosition(commitLog.commitLogSegmentId, 0);
}
else if (offset < commitLog.offsetOfEndOfLastWrittenCDCMutation) {
LOGGER.info("Resume to read the partial file : {}", commitLog.toString());
Member

commitLog.toString is not necessary

Member

plus same space issue before colon.

commitLogPosition = new CommitLogPosition(commitLog.commitLogSegmentId, offset);
}
else {
LOGGER.info("No movement in offset in IDX file: {}", commitLog.toString());
Member

commitLog.toString is not necessary

parseIndexFile(commitLog);
}

LOGGER.info("IDX file is completed for: {}", commitLog.toString());
Member

Sometimes it is IDX, other times idx. We should be consistent. Also, commitLog.toString here is not necessary.

return new Cassandra4CommitLogProcessor.ProcessingResult(commitLog);
}
catch (final Exception ex) {
LOGGER.error("Processing of {} errored out", commitLog.toString(), ex);
Member

also, toString is redundant
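The three branches in the snippet under review (no offset yet, offset behind the flushed position, offset caught up) can be summarised as a small decision function. This is a sketch only: the `Action` enum and `decide` method are hypothetical names, and the real parser does more work in each branch.

```java
class ResumeDecisionSketch {
    /** Hypothetical labels for the three branches of the reviewed loop. */
    enum Action { START_FROM_BEGINNING, RESUME_FROM_OFFSET, WAIT_FOR_MORE_DATA }

    /**
     * @param lastProcessedOffset offset reached by the previous read, or null if nothing was read yet
     * @param flushedOffset       offset of the end of the last written CDC mutation, as read from the idx file
     */
    static Action decide(Integer lastProcessedOffset, int flushedOffset) {
        if (lastProcessedOffset == null) {
            return Action.START_FROM_BEGINNING;   // first pass over this segment
        }
        if (lastProcessedOffset < flushedOffset) {
            return Action.RESUME_FROM_OFFSET;     // new data was flushed since the last read
        }
        return Action.WAIT_FOR_MORE_DATA;         // no movement, poll the idx file again
    }

    public static void main(String[] args) {
        System.out.println(decide(null, 4096));
        System.out.println(decide(4096, 8192));
        System.out.println(decide(8192, 8192));
    }
}
```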

public static final Field COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED = Field.create("commit.log.real.time.processing.enabled")
.withType(Type.BOOLEAN)
.withDefault(DEFAULT_COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED)
.withDescription("Enables the near real time processing of commit logs for Cassandra 4 by reading commit log files incrementally");
Member

should not there be some hyphens in near real time? Like near real-time.

public static final Field COMMIT_LOG_MARKED_COMPLETE_POLL_INTERVAL_IN_MS = Field.create("commit.log.marked.complete.poll.interval.ms")
.withType(Type.INT)
.withDefault(DEFAULT_COMMIT_LOG_MARKED_COMPLETE_POLL_INTERVAL_IN_MS)
.withDescription("Defines the polling interval to check for Commit Log file marked complete in Cassandra 4");
Member

I think it should be CommitLog, like these words should be together.
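As a rough illustration of how the two options in this diff might be set, the sketch below builds a `java.util.Properties` object with the keys taken from the `Field.create(...)` calls above. The interval value of 1000 ms is an arbitrary example; the actual defaults are not shown in this thread, and how the connector consumes these properties is outside the sketch.

```java
import java.util.Properties;

class RealTimeConfigSketch {
    /** Builds connector properties that switch on incremental commit log reading. */
    static Properties realTimeProperties(int pollIntervalMs) {
        Properties props = new Properties();
        // Key names come from the Field definitions in the diff under review.
        props.setProperty("commit.log.real.time.processing.enabled", "true");
        props.setProperty("commit.log.marked.complete.poll.interval.ms", Integer.toString(pollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 1000 ms is an illustrative value, not the connector's default.
        realTimeProperties(1000).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```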

LOGGER.info("Polling for completeness of idx file for: {}", commitLog.toString());
if (completePrematurely) {
LOGGER.info("{} completed prematurely", commitLog.toString());
return new Cassandra4CommitLogProcessor.ProcessingResult(commitLog, Cassandra4CommitLogProcessor.ProcessingResult.Result.COMPLETED_PREMATURELY);
Member

might be imported

LOGGER.debug("finished reading commit log segments {} on position {}", logicalCommitLog, position);
}
catch (Exception e) {
if (commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) {
Member

DEFAULT_COMMIT_LOG_TRANSFER_CLASS might be imported

parser = new Cassandra4CommitLogBatchParser(new LogicalCommitLog(index.toFile()), queues, metrics, this.context);
}

Future<ProcessingResult> future = executorService.submit(() -> parser.process());
Member

() -> parser.process() might be replaced with a method reference like .submit(parser::process).
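A small self-contained sketch of that suggestion, using a stand-in `Parser` class (hypothetical, not the connector's parser). Because `process()` returns a value and is not overloaded, the method reference resolves to the `Callable` overload of `submit`, so the returned `Future` carries the result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class MethodReferenceSubmitSketch {
    /** Stand-in for the real parser; only the shape of process() matters here. */
    static class Parser {
        String process() {
            return "COMPLETED";
        }
    }

    static String runOnce() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Parser parser = new Parser();
            // Equivalent to executor.submit(() -> parser.process())
            Future<String> future = executor.submit(parser::process);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```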


protected void processCommitLog(Cassandra4CommitLogProcessor.LogicalCommitLog logicalCommitLog, CommitLogPosition position) {
try {
LOGGER.debug("starting to read commit log segments {} on position {}", logicalCommitLog, position);
Member

Could you upper-case this? I am sorry I left it like that (I think it was me). Same with the next debug log below.

@smiklosovic
Member

smiklosovic commented Aug 23, 2022

The main thing I am missing here is a proper test case. Without one, we cannot know that this actually works unless we test it manually. I think a solid start would be to create a simple test case, something like this:

public class RealtimeTest extends AbstractCommitLogProcessorTest {

    @Override
    protected CassandraConnectorContext generateTaskContext() throws Exception {
        Properties properties = TestUtils.generateDefaultConfigMap();
        properties.put(CassandraConnectorConfig.COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED, true);
        return generateTaskContext(Configuration.from(properties));
    }

    @Override
    public void initialiseData() throws Exception {

    }

    @Override
    public void verifyEvents() throws Exception {

    }
}

AbstractCommitLogProcessorTest is currently written in a quite opinionated way and does not account for the fact that we might initialise and verify events differently. I think we might create another abstract class which suits the real-time testing scenario better, with a refactored setup method and so on. That is where you might get quite creative; I have not explored that more deeply.

My idea of a test case is something like this:

  • insert a few rows and wait until they are processed (like 10s or whatever is configured; we might also change that config value so that tests run faster / scanning occurs more often).
  • verify that the rows were read
  • insert a few more rows
  • verify that only the newly added rows were processed

I think that after some refactoring this should be quite straightforward to do.
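The scenario in the list above can be sketched without Cassandra by simulating the commit log as an append-only list and the reader as something that remembers its offset between polls. Everything here (`IncrementalReadSketch`, `insert`, `poll`) is a hypothetical stand-in meant to show the shape of the assertions, not the actual test that was later added.

```java
import java.util.ArrayList;
import java.util.List;

class IncrementalReadSketch {
    /** Stand-in for a growing commit log: an append-only list of row keys. */
    private final List<Integer> log = new ArrayList<>();
    /** Position of the next unread entry, remembered between polls. */
    private int offset = 0;

    void insert(int key) {
        log.add(key);
    }

    /** Returns only the entries appended since the previous poll. */
    List<Integer> poll() {
        List<Integer> fresh = new ArrayList<>(log.subList(offset, log.size()));
        offset = log.size();
        return fresh;
    }

    public static void main(String[] args) {
        IncrementalReadSketch reader = new IncrementalReadSketch();

        // Step 1 and 2: insert a few rows, verify they were read.
        reader.insert(0);
        reader.insert(1);
        reader.insert(2);
        System.out.println("first poll:  " + reader.poll());

        // Step 3 and 4: insert more rows, verify only those are returned.
        reader.insert(3);
        reader.insert(4);
        System.out.println("second poll: " + reader.poll());
    }
}
```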

@nitinitt
Author

nitinitt commented Aug 29, 2022

Hey @smiklosovic, thanks for the pointers on creating tests. I totally agree that we need tests to have confidence in this new feature. I was able to add a new test for real-time parsing, exactly as you described in the comment above. It was a little trickier than I thought, particularly due to the presence of two Cassandra YAML files (one for embedded Cassandra/Docker and the other for running the Cassandra Debezium tests). The existing tests provide coverage for the CommitLogReadHandlerImpl class, which is great, and the approach in the new test class, CommitLogRealTimeParserTest, can be leveraged to test the Cassandra4CommitLogProcessor class.
I have also addressed all the comments. Hopefully the PR is in a mergeable state now.

for (int i = 0; i < expectedEventsCount; i++) {
Record record = (Record) events.get(i);
Assert.assertEquals("Operation type must be insert", Record.Operation.INSERT, record.getOp());
Assert.assertEquals("Inserted key should be " + i, record.getRowData().getPrimary().get(0).value, i + keyInc);
Member

should not this be "Inserted key should be" + (i + keyInc) ?
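The parentheses in that suggestion matter because `+` associates left to right: once the left operand is a string, each following `int` is appended as text rather than added. A short sketch with hypothetical method names:

```java
class AssertMessageSketch {
    /** Without parentheses the two ints are appended one after the other as text. */
    static String unparenthesised(int i, int keyInc) {
        return "Inserted key should be " + i + keyInc;
    }

    /** With parentheses the ints are summed first, then appended. */
    static String parenthesised(int i, int keyInc) {
        return "Inserted key should be " + (i + keyInc);
    }

    public static void main(String[] args) {
        System.out.println(unparenthesised(1, 5));
        System.out.println(parenthesised(1, 5));
    }
}
```

Separately, JUnit 4's `Assert.assertEquals(String message, Object expected, Object actual)` takes the expected value before the actual one, so the reviewed line also appears to pass its arguments in the opposite order.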


for (int i = 0; i < expectedEventsCount; i++) {
Record record = (Record) events.get(i);
Assert.assertEquals("Operation type must be insert", Record.Operation.INSERT, record.getOp());
Member

would be nice to actually log what that operation was if it was not insert, it is nice we know it is not insert, so what is it?

Author

done

Member

@nitinitt

This is enough, but I was expecting it to be directly in that assert. Like

    Record.Operation op = record.getOp();
    Assert.assertEquals("Operation type must be insert but it was " + op, Record.Operation.INSERT, op);

I just do not see the value of logging it and then having an assert which does not report it when it fails. You would need to look into the logs first to find out what it was.

Author

This sounds better, have updated the assert statement. Thanks!

String cdcLoc = DatabaseDescriptor.getCDCLogLocation();
LOGGER.info("CDC Location: {}", cdcLoc);

File[] commitLogs = CommitLogUtil.getIndexes(new File(cdcLoc));
Member

what you get are not commitLogs but indexes. Hence in the next for loop, you are supposed to iterate over indexes and you are submitting an index as well. From index, the actual commit log file name is parsed.

Author

Yes, you are right. Have renamed the variable to commitLogIndexes for better readability

LOGGER.info("CDC Location: {}", cdcLoc);

File[] commitLogs = CommitLogUtil.getIndexes(new File(cdcLoc));
Thread.sleep(2000);
Member

What is this sleep good for? Would you mind elaborating a bit? Maybe it would be better to use Awaitility and wait for some condition to happen? Explicit sleeps like this tend to be flaky, from what I have observed in the past.

Author

Glad you asked. Firstly, this sleep is in the wrong place. It should be above L89, i.e. before reading the idx files. You are right that this sleep can cause flakiness in tests, and I have replaced it with Awaitility. Cassandra periodically (as defined by the commitlog_sync_period_in_ms parameter in cassandra.yaml) flushes idx files and commit log files to the CDC location. The sleep was there to make sure the idx file got generated/updated; it is now replaced by a condition.

@@ -222,7 +222,7 @@ cdc_enabled: true
 # separate spindle than the data directories. If not set, the default directory is
 # $CASSANDRA_HOME/data/cdc_raw.
 # cdc_raw_directory: /var/lib/cassandra/cdc_raw
-cdc_raw_directory: /var/lib/cassandra/cdc_raw_directory
+cdc_raw_directory: /var/lib/cassandra/cdc_raw
Member

just double checking, is this what we want? (in case it was a typo)

Author

@nitinitt nitinitt Aug 29, 2022

Yes, this was intentional. It makes sure the directory name is the same as the one provided in cassandra-unit-for-context.yaml, so that the test code can pick it up programmatically via the DatabaseDescriptor.getCDCLogLocation() method.

List<Event> events = queue.poll();
int count = 0;
int maxRetryCount = 3;
while (events.size() != expectedEventsCount && count < maxRetryCount) {
Contributor

Please use Awaitility library for this type of code like we do in other connectors.

Author

@nitinitt nitinitt Aug 30, 2022

Thanks, updated the test to use Awaitility.
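For readers unfamiliar with the pattern, the idea behind replacing a fixed sleep or a hand-counted retry loop is to poll a condition until it holds or a deadline passes. With Awaitility that is typically written as `await().atMost(...).until(...)`. The sketch below is a hand-rolled stand-in using only the JDK so it runs without extra dependencies; `pollUntil` is a hypothetical helper, not part of Awaitility or the connector.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class PollUntilSketch {
    /**
     * Re-checks the condition every interval until it is true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulates events trickling in: the condition holds on the third check.
        AtomicInteger checks = new AtomicInteger();
        boolean arrived = pollUntil(() -> checks.incrementAndGet() >= 3,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("condition met: " + arrived);
    }
}
```

Compared with `Thread.sleep(2000)`, the test finishes as soon as the condition holds and fails with a clear timeout when it never does.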

@nitinitt nitinitt force-pushed the DBZ-5410-real-time-processor branch from daba56d to 6cd35c2 on August 30, 2022 20:37
@nitinitt
Author

Hey @jpechane, I have incorporated all the code review feedback. Please let me know if it's in a mergeable/acceptable state.

@jpechane jpechane merged commit af7c492 into debezium:main Sep 1, 2022
@jpechane
Contributor

jpechane commented Sep 1, 2022

@nitinitt Applied, thanks a lot! Could you please provide a docs PR to the main repo reflecting the change in the connector behaviour? Thanks

@nitinitt
Author

nitinitt commented Sep 1, 2022

Thanks @jpechane for merging the PR. I will update the docs and create a new PR in Debezium repo. Would this feature be a part of 2.0.0.beta2 release? Would you want this PR to be applied to 1.9 series as well?

@jpechane
Contributor

jpechane commented Sep 1, 2022

@nitinitt Yes, it will be in 2.0.0.Beta2. There is no need for 1.9 as this is a new feature and we usually limit backporting to bugfixes only.

@smiklosovic smiklosovic mentioned this pull request Mar 1, 2023