Skip to content

Commit

Permalink
feat: add key on a batch
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumelamirand committed Mar 5, 2024
1 parent 49f9d36 commit 49ec37f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public class Batch {
@Builder.Default
private String id = UUID.random().toString();

/**
* The key to identify the batch
*/
private String key;

/**
* The target id of the batch
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ private void startBatchScheduler() {
if (clusterManager.self().primary()) {
log.debug("Executing Batch scheduled tasks");
this.batchStore.findByStatus(BatchStatus.PENDING)
.doOnNext(batch -> log.debug("Retry Batch {} for target id {}", batch.id(), batch.targetId()))
.doOnNext(batch ->
log.info("Retrying batch '{}' with key '{}' and target id '{}'", batch.id(), batch.key(), batch.targetId())
)
.flatMapSingle(this::sendBatchCommands)
.ignoreElements()
.blockingAwait();
Expand Down Expand Up @@ -152,7 +154,9 @@ public Single<Reply<?>> sendCommand(final Command<?> command, final String targe
@Override
public Single<Batch> executeBatch(final Batch batch) {
if (isBatchFeatureEnabled()) {
return this.batchStore.add(batch).flatMap(this::sendBatchCommands);
return this.batchStore.add(batch)
.doOnSuccess(b -> log.info("Executing batch '%s' with key '%s'".formatted(b.id(), b.key())))
.flatMap(this::sendBatchCommands);
} else {
return Single.error(new BatchDisabledException());
}
Expand All @@ -173,6 +177,7 @@ public Single<Batch> watchBatch(final String batchId) {
private Single<Batch> sendBatchCommands(final Batch batch) {
return this.updateBatch(batch.start())
.filter(a -> a.status().equals(BatchStatus.IN_PROGRESS))
.doOnSuccess(b -> log.debug("Batch '%s' for target '%s' and key '%s' in progress".formatted(b.id(), b.targetId(), b.key())))
.flatMapSingle(updateBatch -> {
List<BatchCommand> commands = updateBatch
.batchCommands()
Expand Down Expand Up @@ -205,7 +210,20 @@ private Single<Batch> sendCommands(final Batch batch, final List<BatchCommand> b
.flatMap(this::updateBatch)
)
.takeWhile(updatedBatch -> updatedBatch.status() == BatchStatus.IN_PROGRESS)
.last(batch);
.last(batch)
.doOnSuccess(b -> {
switch (b.status()) {
case PENDING -> log.info(
"Batch '%s' for target id '%s' and key '%s' is scheduled for retry".formatted(b.id(), b.targetId(), b.key())
);
case SUCCEEDED -> log.info(
"Batch '%s' for target id '%s' and key '%s' has succeed".formatted(b.id(), b.targetId(), b.key())
);
case ERROR -> log.info(
"Batch '%s' for target id '%s' and key '%s' stopped in error".formatted(b.id(), b.targetId(), b.key())
);
}
});
}

private Single<Batch> updateBatch(final Batch batch) {
Expand Down

0 comments on commit 49ec37f

Please sign in to comment.