Skip to content

Commit c972c60

Browse files
committed
+1 modifications to flushBy(topicName) functionality given PR feedback.
1 parent 1f1680d commit c972c60

File tree

5 files changed

+72
-68
lines changed

5 files changed

+72
-68
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.ArrayList;
1616
import java.util.Collection;
1717
import java.util.List;
18+
import java.util.stream.Collectors;
1819
import lombok.AccessLevel;
1920
import lombok.AllArgsConstructor;
2021
import lombok.Builder;
@@ -342,21 +343,23 @@ public Collection<TransactionOutboxEntry> selectNextInTopics(
342343
@Override
343344
public Collection<TransactionOutboxEntry> selectNextInSelectedTopics(
344345
Transaction tx, List<String> topicNames, int batchSize, Instant now) throws Exception {
346+
347+
var topicsInParameterList = topicNames.stream().map(it -> "?").collect(Collectors.joining(","));
345348
var sql =
346349
dialect
347350
.getFetchNextInSelectedTopics()
348351
.replace("{{table}}", tableName)
349-
.replace(
350-
"{{topicNames}}",
351-
topicNames.stream()
352-
.map(it -> "'%s'".replace("%s", it))
353-
.reduce((a, b) -> a.concat(",").concat(b))
354-
.orElse("''"))
352+
.replace("{{topicNames}}", topicsInParameterList)
355353
.replace("{{batchSize}}", Integer.toString(batchSize))
356354
.replace("{{allFields}}", ALL_FIELDS);
357355
//noinspection resource
358356
try (PreparedStatement stmt = tx.connection().prepareStatement(sql)) {
359-
stmt.setTimestamp(1, Timestamp.from(now));
357+
var counter = 1;
358+
for (var topicName : topicNames) {
359+
stmt.setString(counter, topicName);
360+
counter++;
361+
}
362+
stmt.setTimestamp(counter, Timestamp.from(now));
360363
var results = new ArrayList<TransactionOutboxEntry>();
361364
gatherResults(stmt, results);
362365
return results;

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.time.Clock;
44
import java.time.Duration;
5+
import java.util.Arrays;
6+
import java.util.List;
57
import java.util.concurrent.Executor;
68
import java.util.function.Supplier;
79
import lombok.ToString;
@@ -103,13 +105,26 @@ default boolean flush() {
103105
boolean flush(Executor executor);
104106

105107
/**
106-
* Flushes a specific topic
108+
* Flushes a specific topic (or set of topics)
107109
*
108-
* @param executor
109-
* @param topicName
110-
* @return
110+
* @param executor to be used for parallelising work (note that the method overall is blocking and
111+
* this is solely ued for fork-join semantics).
112+
* @param topicNames the list of specific topics to flush
113+
* @return true if any work was flushed
114+
*/
115+
default boolean flushTopics(Executor executor, String... topicNames) {
116+
return flushTopics(executor, Arrays.asList(topicNames));
117+
}
118+
119+
/**
120+
* Flushes a specific topic (or set of topics)
121+
*
122+
* @param executor to be used for parallelising work (note that the method overall is blocking and
123+
* this is solely ued for fork-join semantics).
124+
* @param topicNames the list of specific topics to flush
125+
* @return true if any work was flushed
111126
*/
112-
boolean flushTopic(Executor executor, String topicName);
127+
boolean flushTopics(Executor executor, List<String> topicNames);
113128

114129
/**
115130
* Unblocks a blocked entry and resets the attempt count so that it will be retried again.

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -150,33 +150,17 @@ public boolean flush(Executor executor) {
150150
}
151151

152152
@Override
153-
public boolean flushTopic(Executor executor, String topicName) {
153+
public boolean flushTopics(Executor executor, List<String> topicNames) {
154154
if (!initialized.get()) {
155155
throw new IllegalStateException("Not initialized");
156156
}
157157
Instant now = clockProvider.get().instant();
158-
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
159-
futures.add(
160-
CompletableFuture.runAsync(() -> expireIdempotencyProtection(now), executor)
161-
.thenApply(it -> false));
162-
163-
futures.add(
164-
CompletableFuture.supplyAsync(
165-
() -> {
166-
log.debug("Flushing selected topic {}", topicName);
167-
return doFlush(
168-
tx ->
169-
uncheckedly(
170-
() ->
171-
persistor.selectNextInSelectedTopics(
172-
tx, List.of(topicName), flushBatchSize, now)));
173-
},
174-
executor));
175158

176-
return futures.stream()
177-
.reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> d1 || d2))
178-
.map(CompletableFuture::join)
179-
.orElse(false);
159+
log.debug("Flushing selected topics {}", topicNames);
160+
return doFlush(
161+
tx ->
162+
uncheckedly(
163+
() -> persistor.selectNextInSelectedTopics(tx, topicNames, flushBatchSize, now)));
180164
}
181165

182166
private void expireIdempotencyProtection(Instant now) {

transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -505,57 +505,59 @@ final void retryBehaviour() throws Exception {
505505
final void flushOnlyASpecifiedTopic() throws Exception {
506506
TransactionManager transactionManager = txManager();
507507
CountDownLatch successLatch = new CountDownLatch(1);
508-
AtomicInteger attempts = new AtomicInteger(-1);
509508
var processedEntryListener = new ProcessedEntryListener(successLatch);
510509
TransactionOutbox outbox =
511510
TransactionOutbox.builder()
512511
.transactionManager(transactionManager)
513512
.persistor(Persistor.forDialect(connectionDetails().dialect()))
514-
.instantiator(new FailingInstantiator(attempts))
513+
.instantiator(
514+
Instantiator.using(
515+
clazz ->
516+
(InterfaceProcessor)
517+
(foo, bar) ->
518+
LOGGER.info(
519+
"Entered the method to process successfully. Processing ({}, {})",
520+
foo,
521+
bar)))
515522
.submitter(Submitter.withExecutor(singleThreadPool))
516523
.attemptFrequency(Duration.ofMillis(500))
517524
.listener(processedEntryListener)
518-
.blockAfterAttempts(5)
519525
.build();
520526

521527
clearOutbox();
522528

523529
var selectedTopic = "SELECTED_TOPIC";
524-
withRunningFlusher(
525-
outbox,
530+
transactionManager.inTransaction(
526531
() -> {
527-
transactionManager.inTransaction(
528-
() -> {
529-
outbox.with().schedule(InterfaceProcessor.class).process(2, "Whee");
530-
outbox
531-
.with()
532-
.ordered(selectedTopic)
533-
.schedule(InterfaceProcessor.class)
534-
.process(3, "Whoo");
535-
outbox
536-
.with()
537-
.ordered("IGNORED_TOPIC")
538-
.schedule(InterfaceProcessor.class)
539-
.process(2, "Wheeeee");
540-
});
532+
outbox
533+
.with()
534+
.ordered(selectedTopic)
535+
.schedule(InterfaceProcessor.class)
536+
.process(1, "Whoo");
537+
outbox
538+
.with()
539+
.ordered("IGNORED_TOPIC")
540+
.schedule(InterfaceProcessor.class)
541+
.process(2, "Wheeeee");
542+
});
543+
assertFalse(
544+
successLatch.await(5, SECONDS),
545+
"At this point, nothing should have been picked up for processing");
541546

542-
assertTrue(successLatch.await(20, SECONDS), "Timeout waiting for success");
543-
var successes = processedEntryListener.getSuccessfulEntries();
544-
var failures = processedEntryListener.getFailingEntries();
547+
outbox.flushTopics(singleThreadPool, selectedTopic);
545548

546-
// then we only expect the selected topic we're flushing to have had eventually succeeded
547-
// as the other work would not have been picked up for a retry
548-
assertEquals(1, successes.stream().map(TransactionOutboxEntry::getId).distinct().count());
549+
assertTrue(successLatch.await(5, SECONDS), "Should have successfully processed something");
549550

550-
// a single failure is expected; from the unordered submission (as these run automatically
551-
assertEquals(1, failures.stream().filter(it -> it.getTopic() == null).count());
551+
var successes = processedEntryListener.getSuccessfulEntries();
552+
var failures = processedEntryListener.getFailingEntries();
552553

553-
// all other failures accounted for belong to the selected topic
554-
assertEquals(
555-
2, failures.stream().filter(it -> selectedTopic.equals(it.getTopic())).count());
556-
},
557-
singleThreadPool,
558-
selectedTopic);
554+
// then we only expect the selected topic we're flushing to have had eventually succeeded
555+
// as the other work would not have been picked up for a retry
556+
assertEquals(1, successes.stream().map(TransactionOutboxEntry::getTopic).distinct().count());
557+
assertEquals(selectedTopic, successes.get(0).getTopic());
558+
559+
// no failures expected
560+
assertEquals(0, failures.size());
559561
}
560562

561563
@Test

transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected void withRunningFlusher(
9494
log.info("Starting flush...");
9595
while (topicName == null
9696
? outbox.flush(executor)
97-
: outbox.flushTopic(executor, topicName)) {
97+
: outbox.flushTopics(executor, topicName)) {
9898
log.info("More work to do...");
9999
}
100100
log.info("Done!");

0 commit comments

Comments
 (0)