
Conversation

@Jiabao-Sun
Contributor

The Mongo sink waits for a new record before writing previous records. I have an upsert-kafka topic that already contains some events. When I start a new upsert-kafka to MongoDB sink job, I expect all the data from the topic to be loaded into MongoDB right away. Instead, only the first record is written to MongoDB; the rest of the records don't arrive until a new event is written to the Kafka topic, and that new event is in turn delayed until the next event arrives.

To prevent this problem, the MongoWriter should periodically check whether the time since the last write exceeds the configured batch interval and, if so, flush the pending records.
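As an illustration, a minimal sketch of the idea (class and method names here are hypothetical, not the connector's actual code): a scheduled task re-checks at every batch interval and flushes when the last write is older than the interval.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicFlushSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile long lastWriteMs = System.currentTimeMillis();

    // Re-check at every batch interval; flush when no record has arrived
    // within the interval, so a pending batch is not stuck waiting forever.
    void start(long batchIntervalMs, Runnable doBulkWrite) {
        scheduler.scheduleWithFixedDelay(
                () -> {
                    if (System.currentTimeMillis() - lastWriteMs >= batchIntervalMs) {
                        doBulkWrite.run();
                    }
                },
                batchIntervalMs,
                batchIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    // Called from the write path to record the latest write time.
    void recordWrite() {
        lastWriteMs = System.currentTimeMillis();
    }
}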

@Jiabao-Sun
Contributor Author

Hi @leonardBang,
Could you help review this?

Contributor

@leonardBang leonardBang left a comment


Thanks @Jiabao-Sun for the contribution, I left minor comments

Comment on lines 136 to 137
writeOptions.getBatchIntervalMs(),
writeOptions.getBatchIntervalMs(),
Contributor


The batchIntervalMs could be initialized earlier, when constructing the MongoWriter.
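A sketch of what that might look like (the constructor shape below is an assumption for illustration, not the actual connector code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class WriterConstructionSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Read once during construction instead of on each use of writeOptions.
    private final long batchIntervalMs;

    WriterConstructionSketch(long batchIntervalMsFromOptions) {
        this.batchIntervalMs = batchIntervalMsFromOptions;
        scheduler.scheduleWithFixedDelay(
                this::checkFlushInterval,
                batchIntervalMs,
                batchIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    private void checkFlushInterval() {
        // flush logic elided
    }
}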

try {
    doBulkWrite();
} catch (Exception e) {
    LOG.warn("Writing records to MongoDB failed when closing MongoWriter", e);
Contributor


Should this be logged at error level?

}

private boolean flushOnlyOnCheckpoint() {
    return writeOptions.getBatchIntervalMs() == -1 && writeOptions.getBatchSize() == -1;
Contributor


We can optimize this by computing it once in the constructor.
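For illustration, a sketch of that optimization (names are hypothetical): since both options are immutable configuration, the result can be computed once.

class FlushModeSketch {
    // Both values are static configuration, so this can never change
    // after construction; compute it once instead of on every call.
    private final boolean flushOnlyOnCheckpoint;

    FlushModeSketch(long batchIntervalMs, int batchSize) {
        this.flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
    }

    boolean flushOnlyOnCheckpoint() {
        return flushOnlyOnCheckpoint;
    }
}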

Contributor

@leonardBang leonardBang left a comment


Thanks @Jiabao-Sun for the update, LGTM, let's wait for the CI to turn green.

@Jiabao-Sun
Contributor Author

> Thanks @Jiabao-Sun for the update, LGTM, let's wait for the CI to turn green.

Thanks @leonardBang, the CI seems to have passed.

@leonardBang leonardBang merged commit 49b7550 into apache:main Jun 28, 2023

@Override
-    public void write(IN element, Context context) throws IOException, InterruptedException {
+    public synchronized void write(IN element, Context context)


@Jiabao-Sun just curious, wouldn't adding synchronized here hurt the performance of the write operation?

Contributor Author


Thanks @6harat for this comment.
synchronized is added here because we may add another thread to check whether the last write time is over the limit. That thread may call the doBulkWrite method and clear the bulkRequests list, which is also written by the write method; without synchronization this introduces a race condition.
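A minimal sketch of the race (toWriteModel and batchSize below are hypothetical names for illustration): the task thread appends to bulkRequests in write, while the scheduled flush thread drains it in doBulkWrite, so both methods synchronize on the writer instance.

import java.util.ArrayList;
import java.util.List;

class SynchronizedWriterSketch<IN> {
    private final List<Object> bulkRequests = new ArrayList<>();
    private final int batchSize = 1000; // hypothetical configured batch size

    // Called by the task thread for every incoming element.
    public synchronized void write(IN element) {
        bulkRequests.add(toWriteModel(element)); // hypothetical conversion helper
        if (bulkRequests.size() >= batchSize) {
            doBulkWrite();
        }
    }

    // Also called by the scheduled flush thread; synchronizing both methods
    // guarantees the list is never cleared while write() is mid-append.
    public synchronized void doBulkWrite() {
        if (!bulkRequests.isEmpty()) {
            // ... issue the bulk write to MongoDB here ...
            bulkRequests.clear();
        }
    }

    private Object toWriteModel(IN element) {
        return element; // placeholder for the real document conversion
    }
}

In practice the lock is only contended when the flush timer fires, so the cost on the hot write path is typically an uncontended monitor acquisition, which is cheap on modern JVMs.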

dannycranmer pushed a commit that referenced this pull request Aug 4, 2023
…whether the latest flush time is arriving

This closes #12.
