Skip to content

Commit 9c9d62f

Browse files
committed
introduce method of flushing by topic (can be extended to flush by topic(s) instead).
1 parent 1cf92cc commit 9c9d62f

File tree

10 files changed

+238
-1
lines changed

10 files changed

+238
-1
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ static Builder builder(String name) {
2828
@Getter private final String lock;
2929
@Getter private final String checkSql;
3030
@Getter private final String fetchNextInAllTopics;
31+
@Getter private final String fetchNextInSelectedTopics;
3132
@Getter private final String fetchCurrentVersion;
3233
@Getter private final String fetchNextSequence;
3334
private final Collection<Migration> migrations;
@@ -78,6 +79,12 @@ static final class Builder {
7879
+ " AND seq = ("
7980
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false"
8081
+ ") LIMIT {{batchSize}}";
82+
private String fetchNextInSelectedTopics =
83+
"SELECT {{allFields}} FROM {{table}} a"
84+
+ " WHERE processed = false AND topic IN ({{topicNames}}) AND nextAttemptTime < ?"
85+
+ " AND seq = ("
86+
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false"
87+
+ ") LIMIT {{batchSize}}";
8188
private String fetchCurrentVersion = "SELECT version FROM TXNO_VERSION FOR UPDATE";
8289
private String fetchNextSequence = "SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE";
8390

@@ -183,6 +190,7 @@ Dialect build() {
183190
lock,
184191
checkSql,
185192
fetchNextInAllTopics,
193+
fetchNextInSelectedTopics,
186194
fetchCurrentVersion,
187195
fetchNextSequence,
188196
migrations.values()) {

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,30 @@ public Collection<TransactionOutboxEntry> selectNextInTopics(
339339
}
340340
}
341341

342+
@Override
343+
public Collection<TransactionOutboxEntry> selectNextInSelectedTopics(
344+
Transaction tx, List<String> topicNames, int batchSize, Instant now) throws Exception {
345+
var sql =
346+
dialect
347+
.getFetchNextInSelectedTopics()
348+
.replace("{{table}}", tableName)
349+
.replace(
350+
"{{topicNames}}",
351+
topicNames.stream()
352+
.map(it -> "'%s'".replace("%s", it))
353+
.reduce((a, b) -> a.concat(",").concat(b))
354+
.orElse("''"))
355+
.replace("{{batchSize}}", Integer.toString(batchSize))
356+
.replace("{{allFields}}", ALL_FIELDS);
357+
//noinspection resource
358+
try (PreparedStatement stmt = tx.connection().prepareStatement(sql)) {
359+
stmt.setTimestamp(1, Timestamp.from(now));
360+
var results = new ArrayList<TransactionOutboxEntry>();
361+
gatherResults(stmt, results);
362+
return results;
363+
}
364+
}
365+
342366
@Override
343367
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
344368
throws Exception {

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public interface Dialect {
2222

2323
String getFetchNextInAllTopics();
2424

25+
String getFetchNextInSelectedTopics();
26+
2527
String getFetchCurrentVersion();
2628

2729
String getFetchNextSequence();
@@ -44,6 +46,10 @@ public interface Dialect {
4446
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
4547
+ " FROM {{table}} WHERE processed = false AND topic <> '*')"
4648
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
49+
.fetchNextInSelectedTopics(
50+
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
51+
+ " FROM {{table}} WHERE processed = false AND topic IN ({{topicNames}}))"
52+
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
4753
.deleteExpired(
4854
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false"
4955
+ " LIMIT {{batchSize}}")
@@ -64,6 +70,10 @@ public interface Dialect {
6470
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
6571
+ " FROM {{table}} WHERE processed = false AND topic <> '*')"
6672
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
73+
.fetchNextInSelectedTopics(
74+
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
75+
+ " FROM {{table}} WHERE processed = false AND topic IN ({{topicNames}}))"
76+
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
6777
.deleteExpired(
6878
"DELETE FROM {{table}} WHERE id IN "
6979
+ "(SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT {{batchSize}})")
@@ -93,6 +103,10 @@ public interface Dialect {
93103
"WITH cte1 AS (SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
94104
+ " FROM {{table}} WHERE processed = 0 AND topic <> '*')"
95105
+ " SELECT * FROM cte1 WHERE rn = 1 AND nextAttemptTime < ? AND ROWNUM <= {{batchSize}}")
106+
.fetchNextInSelectedTopics(
107+
"WITH cte1 AS (SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
108+
+ " FROM {{table}} WHERE processed = 0 AND topic IN ({{topicNames}}))"
109+
+ " SELECT * FROM cte1 WHERE rn = 1 AND nextAttemptTime < ? AND ROWNUM <= {{batchSize}}")
96110
.deleteExpired(
97111
"DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 "
98112
+ "AND ROWNUM <= {{batchSize}}")
@@ -161,6 +175,12 @@ public interface Dialect {
161175
+ " AND seq = ("
162176
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = 0"
163177
+ ")")
178+
.fetchNextInSelectedTopics(
179+
"SELECT TOP {{batchSize}} {{allFields}} FROM {{table}} a"
180+
+ " WHERE processed = 0 AND topic IN ({{topicNames}}) AND nextAttemptTime < ?"
181+
+ " AND seq = ("
182+
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = 0"
183+
+ ")")
164184
.fetchNextSequence(
165185
"SELECT seq FROM TXNO_SEQUENCE WITH (UPDLOCK, ROWLOCK, READPAST) WHERE topic = ?")
166186
.booleanValueFrom(v -> v ? "1" : "0")

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,19 @@ List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant
118118
Collection<TransactionOutboxEntry> selectNextInTopics(Transaction tx, int batchSize, Instant now)
119119
throws Exception;
120120

121+
/**
122+
* Selects the next items in all selected topics as a batch for processing. Does not lock.
123+
*
124+
* @param tx The current {@link Transaction}.
125+
* @param topicNames The topics to select records from.
126+
* @param batchSize The maximum number of records to select.
127+
* @param now The time to use when selecting records.
128+
* @return The records.
129+
* @throws Exception Any exception.
130+
*/
131+
Collection<TransactionOutboxEntry> selectNextInSelectedTopics(
132+
Transaction tx, List<String> topicNames, int batchSize, Instant now) throws Exception;
133+
121134
/**
122135
* Deletes records which have processed and passed their expiry time, in specified batch sizes.
123136
*

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public Collection<TransactionOutboxEntry> selectNextInTopics(
5252
return List.of();
5353
}
5454

55+
@Override
56+
public Collection<TransactionOutboxEntry> selectNextInSelectedTopics(
57+
Transaction tx, List<String> topicNames, int batchSize, Instant now) throws Exception {
58+
return List.of();
59+
}
60+
5561
@Override
5662
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) {
5763
return 0;

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ default boolean flush() {
102102
*/
103103
boolean flush(Executor executor);
104104

105+
/**
106+
* Flushes a specific topic
107+
*
108+
* @param executor
109+
* @param topicName
110+
* @return
111+
*/
112+
boolean flushTopic(Executor executor, String topicName);
113+
105114
/**
106115
* Unblocks a blocked entry and resets the attempt count so that it will be retried again.
107116
* Requires an active transaction and a transaction manager that supports thread local context.

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,36 @@ public boolean flush(Executor executor) {
149149
.orElse(false);
150150
}
151151

152+
@Override
153+
public boolean flushTopic(Executor executor, String topicName) {
154+
if (!initialized.get()) {
155+
throw new IllegalStateException("Not initialized");
156+
}
157+
Instant now = clockProvider.get().instant();
158+
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
159+
futures.add(
160+
CompletableFuture.runAsync(() -> expireIdempotencyProtection(now), executor)
161+
.thenApply(it -> false));
162+
163+
futures.add(
164+
CompletableFuture.supplyAsync(
165+
() -> {
166+
log.debug("Flushing selected topic {}", topicName);
167+
return doFlush(
168+
tx ->
169+
uncheckedly(
170+
() ->
171+
persistor.selectNextInSelectedTopics(
172+
tx, List.of(topicName), flushBatchSize, now)));
173+
},
174+
executor));
175+
176+
return futures.stream()
177+
.reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> d1 || d2))
178+
.map(CompletableFuture::join)
179+
.orElse(false);
180+
}
181+
152182
private void expireIdempotencyProtection(Instant now) {
153183
long totalRecordsDeleted = 0;
154184
int recordsDeleted;

transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,63 @@ final void retryBehaviour() throws Exception {
501501
singleThreadPool);
502502
}
503503

504+
@Test
505+
final void flushOnlyASpecifiedTopic() throws Exception {
506+
TransactionManager transactionManager = txManager();
507+
CountDownLatch successLatch = new CountDownLatch(1);
508+
AtomicInteger attempts = new AtomicInteger(-1);
509+
var processedEntryListener = new ProcessedEntryListener(successLatch);
510+
TransactionOutbox outbox =
511+
TransactionOutbox.builder()
512+
.transactionManager(transactionManager)
513+
.persistor(Persistor.forDialect(connectionDetails().dialect()))
514+
.instantiator(new FailingInstantiator(attempts))
515+
.submitter(Submitter.withExecutor(singleThreadPool))
516+
.attemptFrequency(Duration.ofMillis(500))
517+
.listener(processedEntryListener)
518+
.blockAfterAttempts(5)
519+
.build();
520+
521+
clearOutbox();
522+
523+
var selectedTopic = "SELECTED_TOPIC";
524+
withRunningFlusher(
525+
outbox,
526+
() -> {
527+
transactionManager.inTransaction(
528+
() -> {
529+
outbox.with().schedule(InterfaceProcessor.class).process(2, "Whee");
530+
outbox
531+
.with()
532+
.ordered(selectedTopic)
533+
.schedule(InterfaceProcessor.class)
534+
.process(3, "Whoo");
535+
outbox
536+
.with()
537+
.ordered("IGNORED_TOPIC")
538+
.schedule(InterfaceProcessor.class)
539+
.process(2, "Wheeeee");
540+
});
541+
542+
assertTrue(successLatch.await(20, SECONDS), "Timeout waiting for success");
543+
var successes = processedEntryListener.getSuccessfulEntries();
544+
var failures = processedEntryListener.getFailingEntries();
545+
546+
// then we only expect the selected topic we're flushing to have had eventually succeeded
547+
// as the other work would not have been picked up for a retry
548+
assertEquals(1, successes.stream().map(TransactionOutboxEntry::getId).distinct().count());
549+
550+
// a single failure is expected; from the unordered submission (as these run automatically
551+
assertEquals(1, failures.stream().filter(it -> it.getTopic() == null).count());
552+
553+
// all other failures accounted for belong to the selected topic
554+
assertEquals(
555+
2, failures.stream().filter(it -> selectedTopic.equals(it.getTopic())).count());
556+
},
557+
singleThreadPool,
558+
selectedTopic);
559+
}
560+
504561
@Test
505562
final void onSchedulingFailure_BubbleExceptionsUp() throws Exception {
506563
Assumptions.assumeTrue(

transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,22 @@ protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable run
7979

8080
protected void withRunningFlusher(
8181
TransactionOutbox outbox, ThrowingRunnable runnable, Executor executor) throws Exception {
82+
withRunningFlusher(outbox, runnable, executor, null);
83+
}
84+
85+
protected void withRunningFlusher(
86+
TransactionOutbox outbox, ThrowingRunnable runnable, Executor executor, String topicName)
87+
throws Exception {
8288
Thread backgroundThread =
8389
new Thread(
8490
() -> {
8591
while (!Thread.interrupted()) {
8692
try {
8793
// Keep flushing work until there's nothing left to flush
8894
log.info("Starting flush...");
89-
while (outbox.flush(executor)) {
95+
while (topicName == null
96+
? outbox.flush(executor)
97+
: outbox.flushTopic(executor, topicName)) {
9098
log.info("More work to do...");
9199
}
92100
log.info("Done!");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.gruelbox.transactionoutbox.testing;
2+
3+
import com.gruelbox.transactionoutbox.TransactionOutboxEntry;
4+
import com.gruelbox.transactionoutbox.TransactionOutboxListener;
5+
import java.util.List;
6+
import java.util.concurrent.CopyOnWriteArrayList;
7+
import java.util.concurrent.CountDownLatch;
8+
import lombok.extern.slf4j.Slf4j;
9+
10+
/**
11+
* Collects an ordered list of tx outbox entries that have been 'processed' i.e. succeeded or failed
12+
* in processing.
13+
*/
14+
@Slf4j
15+
public final class ProcessedEntryListener implements TransactionOutboxListener {
16+
private final CountDownLatch successLatch;
17+
18+
private final CopyOnWriteArrayList<TransactionOutboxEntry> successfulEntries =
19+
new CopyOnWriteArrayList<>();
20+
private final CopyOnWriteArrayList<TransactionOutboxEntry> failingEntries =
21+
new CopyOnWriteArrayList<>();
22+
23+
public ProcessedEntryListener(CountDownLatch successLatch) {
24+
this.successLatch = successLatch;
25+
}
26+
27+
@Override
28+
public void success(TransactionOutboxEntry entry) {
29+
var copy = from(entry);
30+
successfulEntries.add(copy);
31+
log.info(
32+
"Received success #{}. Counting down at {}",
33+
successfulEntries.size(),
34+
successLatch.getCount());
35+
successLatch.countDown();
36+
}
37+
38+
@Override
39+
public void failure(TransactionOutboxEntry entry, Throwable cause) {
40+
failingEntries.add(from(entry));
41+
}
42+
43+
/**
44+
* Retrieve an unmodifiable copy of {@link #successfulEntries}. Beware, expectation is that this
45+
* does not/ should not get accessed until the correct number of {@link
46+
* #success(TransactionOutboxEntry)} and {@link #blocked(TransactionOutboxEntry, Throwable)}}
47+
* counts have occurred.
48+
*
49+
* @return unmodifiable list of ordered outbox entry events.
50+
*/
51+
public List<TransactionOutboxEntry> getSuccessfulEntries() {
52+
return List.copyOf(successfulEntries);
53+
}
54+
55+
public List<TransactionOutboxEntry> getFailingEntries() {
56+
return List.copyOf(failingEntries);
57+
}
58+
59+
private TransactionOutboxEntry from(TransactionOutboxEntry entry) {
60+
return entry.toBuilder().build();
61+
}
62+
}

0 commit comments

Comments
 (0)