Skip to content

Commit

Permalink
#605: Add config value for number of entries recieved with one bd que…
Browse files Browse the repository at this point in the history
…ry from connection-snap collection

Signed-off-by: Vadim Guenther <vadim.guenther@bosch.io>
  • Loading branch information
VadimGue committed Feb 18, 2021
1 parent cfff2de commit 839a03e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public final class DefaultReconnectConfig implements ReconnectConfig {
private final Duration interval;
private final RateConfig rateConfig;
private final int readJournalBatchSize;
private final int readSnapBatchSize;

private DefaultReconnectConfig(final ScopedConfig config, final RateConfig theRateConfig) {
initialDelay = config.getDuration(ReconnectConfigValue.INITIAL_DELAY.getConfigPath());
interval = config.getDuration(ReconnectConfigValue.INTERVAL.getConfigPath());
readJournalBatchSize = config.getInt(ReconnectConfigValue.READ_JOURNAL_BATCH_SIZE.getConfigPath());
readSnapBatchSize = config.getInt(ReconnectConfigValue.READ_SNAP_BATCH_SIZE.getConfigPath());
rateConfig = theRateConfig;
}

Expand Down Expand Up @@ -71,6 +73,11 @@ public int getReadJournalBatchSize() {
return readJournalBatchSize;
}

@Override
public int getReadSnapBatchSize() {
return readSnapBatchSize;
}

@Override
public RateConfig getRateConfig() {
return rateConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public interface ReconnectConfig {
*/
int getReadJournalBatchSize();

/**
* Returns the number of entries to read from the snap collection with one query.
*
* @return the number of entries to read with one query.
*/
int getReadSnapBatchSize();

/**
* Returns the config for recovery throttling.
*
Expand All @@ -71,7 +78,12 @@ enum ReconnectConfigValue implements KnownConfigValue {
/**
* The number of events to read in one query.
*/
READ_JOURNAL_BATCH_SIZE("read-journal-batch-size", 500);
READ_JOURNAL_BATCH_SIZE("read-journal-batch-size", 500),

/**
* The number of events to read in one query.
*/
READ_SNAP_BATCH_SIZE("read-snap-batch-size", 50);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private ReconnectActor(final ActorRef connectionShardRegion, final MongoReadJour
readJournal.getJournalPids(reconnectConfig.getReadJournalBatchSize(), reconnectConfig.getInterval(),
materializer);
currentPersistenceFromSnapIdsSourceSupplier = () ->
readJournal.getNewestSnapshotsAbove("", 4, materializer);
readJournal.getNewestSnapshotsAbove("", reconnectConfig.getReadSnapBatchSize(), materializer);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ ditto {
# how many events to read in one query
read-journal-batch-size = 500
read-journal-batch-size = ${?RECONNECT_READ_JOURNAL_BATCH_SIZE}
# how many entries to read in one query
read-snap-batch-size = 50
read-snap-batch-size = ${?RECONNECT_READ_SNAP_BATCH_SIZE}

# used to throttle recovery of connections, so that not all connections are recovered at the same time
rate {
Expand Down

0 comments on commit 839a03e

Please sign in to comment.