Skip to content

Commit b91731b

Browse files
committed
All pending topology notifications must be propagated to a newly published epoch, not only the pending notification for that epoch
Also Fix:
 - JournalAndTableKeyIterator not merging in consistent order, which can lead to replaying topologies in non-ascending order
 - Invariant failure when reporting topologies if fetchTopologies on startup yields a gap
 - removeRedundantMissing could erroneously remove missing dependencies occurring after the new appliedBeforeIndex
 - Invariant failure for some propagate calls supplying 'wrong' addRoute
 - Disambiguate nextTxnId and nextTxnIdWithDefaultFlags

Also Improve:
 - LocalLog should not use NoSuchElementException for control flow (instead use ConcurrentSkipListMap)

patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20886
 - JournalAndTableKeyIterator not merging in consistent order
1 parent 5c7d8d8 commit b91731b

File tree

9 files changed

+58
-72
lines changed

9 files changed

+58
-72
lines changed

src/java/org/apache/cassandra/service/accord/AccordJournal.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,9 +370,10 @@ public void saveCommand(int commandStoreId, CommandUpdate update, @Nullable Runn
370370
}
371371

372372
@Override
373-
public CloseableIterator<TopologyUpdate> replayTopologies()
373+
public List<TopologyUpdate> replayTopologies()
374374
{
375-
return new CloseableIterator<>()
375+
List<TopologyUpdate> images = new ArrayList<>();
376+
try (CloseableIterator<TopologyUpdate> iter = new CloseableIterator<>()
376377
{
377378
final CloseableIterator<Journal.KeyRefs<JournalKey>> iter = journalTable.keyIterator(topologyUpdateKey(0L),
378379
topologyUpdateKey(Timestamp.MAX_EPOCH));
@@ -388,6 +389,7 @@ public boolean hasNext()
388389
public TopologyUpdate next()
389390
{
390391
Journal.KeyRefs<JournalKey> ref = iter.next();
392+
System.out.println(ref.key());
391393
Accumulator read = readAll(ref.key());
392394
if (read.accumulated.kind() == Kind.NoOp)
393395
prev = read.accumulated.asImage(Invariants.nonNull(prev.getUpdate()));
@@ -403,7 +405,22 @@ public void close()
403405
{
404406
iter.close();
405407
}
406-
};
408+
})
409+
{
410+
TopologyUpdate prev = null;
411+
while (iter.hasNext())
412+
{
413+
TopologyUpdate next = iter.next();
414+
Invariants.require(prev == null || next.global.epoch() > prev.global.epoch());
415+
// Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
416+
if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
417+
images.clear();
418+
419+
images.add(next);
420+
prev = next;
421+
}
422+
}
423+
return images;
407424
}
408425

409426
@Override

src/java/org/apache/cassandra/service/accord/AccordJournalTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ protected Journal.KeyRefs<K> computeNext()
567567
return journalIterator.next();
568568
}
569569

570-
return cmp > 0 ? new Journal.KeyRefs<>(tableIterator.next()) : journalIterator.next();
570+
return cmp < 0 ? new Journal.KeyRefs<>(tableIterator.next()) : journalIterator.next();
571571
}
572572

573573
public void close()

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@
128128
import org.apache.cassandra.tcm.listeners.ChangeListener;
129129
import org.apache.cassandra.tcm.membership.NodeId;
130130
import org.apache.cassandra.transport.Dispatcher;
131-
import org.apache.cassandra.utils.CloseableIterator;
132131
import org.apache.cassandra.utils.ExecutorUtils;
133132
import org.apache.cassandra.utils.FBUtilities;
134133
import org.apache.cassandra.utils.concurrent.AsyncPromise;
@@ -144,7 +143,6 @@
144143
import static accord.messages.SimpleReply.Ok;
145144
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
146145
import static accord.primitives.Txn.Kind.Write;
147-
import static accord.primitives.TxnId.Cardinality.cardinality;
148146
import static accord.topology.TopologyManager.TopologyRange;
149147
import static java.util.concurrent.TimeUnit.NANOSECONDS;
150148
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -203,6 +201,7 @@ static class PreInitStateCollector implements ChangeListener
203201
@Override
204202
public synchronized void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot)
205203
{
204+
logger.debug("Saving epoch {} to deliver after startup", next.epoch);
206205
items.add(next);
207206
}
208207

@@ -428,29 +427,11 @@ public synchronized void startup()
428427
ClusterMetadata metadata = ClusterMetadata.current();
429428
configService.updateMapping(metadata);
430429

431-
List<TopologyUpdate> images = new ArrayList<>();
432-
433-
TopologyUpdate prev = null;
434-
// Collect locally known topologies
435-
try (CloseableIterator<TopologyUpdate> iter = journal.replayTopologies())
436-
{
437-
while (iter.hasNext())
438-
{
439-
TopologyUpdate next = iter.next();
440-
// Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
441-
if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
442-
images.clear();
443-
444-
images.add(next);
445-
prev = next;
446-
}
447-
}
430+
List<TopologyUpdate> images = journal.replayTopologies();
448431

449432
// Instantiate latest topology from the log, if known
450-
if (prev != null)
451-
{
452-
node.commandStores().initializeTopologyUnsafe(prev);
453-
}
433+
if (!images.isEmpty())
434+
node.commandStores().initializeTopologyUnsafe(images.get(images.size() - 1));
454435

455436
// Replay local epochs
456437
for (TopologyUpdate image : images)
@@ -494,8 +475,8 @@ public void finishInitialization()
494475
{
495476
TopologyRange remote = fetchTopologies(highestKnown + 1);
496477

497-
if (remote != null)
498-
remote.forEach(configService::reportTopology, highestKnown + 1, Integer.MAX_VALUE);
478+
if (remote != null) // TODO (required): if remote.min > highestKnown + 1, should we decide if we need to truncate our local topologies? Probably not until startup has finished.
479+
remote.forEach(configService::reportTopology, remote.min, Integer.MAX_VALUE);
499480
}
500481
catch (InterruptedException e)
501482
{
@@ -693,7 +674,7 @@ public TopologyManager topology()
693674
@Override
694675
public @Nonnull TxnResult coordinate(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime, long minHlc) throws RequestExecutionException
695676
{
696-
return coordinateAsync(minEpoch, txn, consistencyLevel, requestTime, minHlc).awaitAndGet();
677+
return coordinateAsync(minEpoch, minHlc, txn, consistencyLevel, requestTime).awaitAndGet();
697678
}
698679

699680
@Override
@@ -709,9 +690,9 @@ public boolean isEnabled()
709690
}
710691

711692
@Override
712-
public @Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime, long minHlc)
693+
public @Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, long minHlc, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime)
713694
{
714-
TxnId txnId = node.nextTxnId(minHlc >= 0 ? minHlc : 0, txn.kind(), txn.keys().domain(), cardinality(txn.keys()));
695+
TxnId txnId = node.nextTxnId(minEpoch, minHlc, txn);
715696
long timeout = txnId.isWrite() ? DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS) : DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS);
716697
ClientRequestBookkeeping bookkeeping = txn.isWrite() ? accordWriteBookkeeping : accordReadBookkeeping;
717698
bookkeeping.metrics.keySize.update(txn.keys().size());

src/java/org/apache/cassandra/service/accord/IAccordService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ public interface IAccordService
8787
@Nonnull
8888
default IAccordResult<TxnResult> coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime)
8989
{
90-
return coordinateAsync(minEpoch, txn, consistencyLevel, requestTime, IAccordService.NO_HLC);
90+
return coordinateAsync(minEpoch, IAccordService.NO_HLC, txn, consistencyLevel, requestTime);
9191
}
92-
@Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime, long minHlc);
92+
@Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, long minHlc, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime);
9393
@Nonnull default TxnResult coordinate(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime)
9494
{
9595
return coordinate(minEpoch, txn, consistencyLevel, requestTime, NO_HLC);
@@ -244,7 +244,7 @@ public List<AccordExecutor> executors()
244244
}
245245

246246
@Override
247-
public @Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime, long minHlc)
247+
public @Nonnull IAccordResult<TxnResult> coordinateAsync(long minEpoch, long minHlc, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime)
248248
{
249249
throw new UnsupportedOperationException("No accord transaction should be executed when accord.enabled = false in cassandra.yaml");
250250
}
@@ -454,9 +454,9 @@ public boolean isEnabled()
454454

455455
@Nonnull
456456
@Override
457-
public IAccordResult<TxnResult> coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime, long minHlc)
457+
public IAccordResult<TxnResult> coordinateAsync(long minEpoch, long minHlc, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, RequestTime requestTime)
458458
{
459-
return delegate.coordinateAsync(minEpoch, txn, consistencyLevel, requestTime, minHlc);
459+
return delegate.coordinateAsync(minEpoch, minHlc, txn, consistencyLevel, requestTime);
460460
}
461461

462462
@Override

src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ private boolean doInteropPersist(Node node, SequentialAsyncExecutor executor, To
115115
{
116116
Update update = txn.update();
117117
ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ? ((AccordUpdate) update).cassandraCommitCL() : null;
118+
// TODO (expected): do we care about consistency level on recovery? we aren't reporting to the client so we shouldn't.
118119
if (consistencyLevel == null || consistencyLevel == ConsistencyLevel.ANY || writes.isEmpty())
119120
return false;
120121

src/java/org/apache/cassandra/tcm/log/LocalLog.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.util.Comparator;
2525
import java.util.HashSet;
2626
import java.util.List;
27-
import java.util.NoSuchElementException;
27+
import java.util.Map;
2828
import java.util.Optional;
2929
import java.util.Set;
30-
import java.util.concurrent.ConcurrentSkipListSet;
30+
import java.util.concurrent.ConcurrentSkipListMap;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
3333
import java.util.concurrent.atomic.AtomicBoolean;
@@ -240,7 +240,7 @@ public final LocalLog createLog()
240240
* However, snapshots should be applied out of order, and snapshots with higher epoch should be applied before snapshots
241241
* with a lower epoch in cases when there are multiple snapshots present.
242242
*/
243-
protected final ConcurrentSkipListSet<Entry> pending = new ConcurrentSkipListSet<>((Entry e1, Entry e2) -> {
243+
protected final ConcurrentSkipListMap<Entry, Boolean> pending = new ConcurrentSkipListMap<>((Entry e1, Entry e2) -> {
244244
if (e1.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT && e2.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT)
245245
return e2.epoch.compareTo(e1.epoch);
246246

@@ -335,7 +335,7 @@ public int pendingBufferSize()
335335
public boolean hasGaps()
336336
{
337337
Epoch start = committed.get().epoch;
338-
for (Entry entry : pending)
338+
for (Entry entry : pending.keySet())
339339
{
340340
if (!entry.epoch.isDirectlyAfter(start))
341341
return true;
@@ -347,14 +347,8 @@ public boolean hasGaps()
347347

348348
public Optional<Epoch> highestPending()
349349
{
350-
try
351-
{
352-
return Optional.of(pending.last().epoch);
353-
}
354-
catch (NoSuchElementException eag)
355-
{
356-
return Optional.empty();
357-
}
350+
Map.Entry<Entry, Boolean> e = pending.lastEntry();
351+
return e == null ? Optional.empty() : Optional.of(e.getKey().epoch);
358352
}
359353

360354
public LogState getLocalEntries(Epoch since)
@@ -381,7 +375,7 @@ public void append(Collection<Entry> entries)
381375
{
382376
if (logger.isDebugEnabled())
383377
logger.debug("Appending entries to the pending buffer: {}", entries.stream().map(e -> e.epoch).collect(Collectors.toList()));
384-
pending.addAll(entries);
378+
entries.forEach(e -> pending.put(e, true));
385379
}
386380
processPending();
387381
}
@@ -433,7 +427,7 @@ private void maybeAppend(Entry entry)
433427
}
434428

435429
logger.debug("Appending entry to the pending buffer: {}", entry.epoch);
436-
pending.add(entry);
430+
pending.put(entry, true);
437431
}
438432

439433
public abstract ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException;
@@ -459,14 +453,8 @@ void runOnce()
459453

460454
private Entry peek()
461455
{
462-
try
463-
{
464-
return pending.first();
465-
}
466-
catch (NoSuchElementException ignore)
467-
{
468-
return null;
469-
}
456+
Map.Entry<Entry, Boolean> e = pending.firstEntry();
457+
return e == null ? null : e.getKey();
470458
}
471459

472460
/**
@@ -571,7 +559,7 @@ else if (!pendingEntry.epoch.isAfter(metadata().epoch))
571559
}
572560
else
573561
{
574-
Entry tmp = pending.first();
562+
Entry tmp = pending.firstKey();
575563
if (tmp.epoch.is(pendingEntry.epoch))
576564
{
577565
logger.debug("Smallest entry is non-consecutive {} to {}", pendingEntry.epoch, prev.epoch);
@@ -745,7 +733,7 @@ else if (!current.awaitThrowUncheckedOnInterrupt(duration.to(TimeUnit.MILLISECON
745733
{
746734
throw new TimeoutException(String.format("Timed out waiting for follower to run at least once. " +
747735
"Pending is %s and current is now at epoch %s.",
748-
pending.stream().map((re) -> re.epoch).collect(Collectors.toList()),
736+
pending.keySet().stream().map((re) -> re.epoch).collect(Collectors.toList()),
749737
metadata().epoch));
750738
}
751739
}

test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void tracing()
152152
String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
153153
AccordService accord = accord();
154154
DatabaseDescriptor.getAccord().fetch_txn = "1s";
155-
TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
155+
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
156156
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
157157

158158
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
@@ -215,7 +215,7 @@ public void completedTxn() throws ExecutionException, InterruptedException
215215
{
216216
String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
217217
AccordService accord = accord();
218-
TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
218+
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
219219
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
220220
String keyStr = txn.keys().get(0).toUnseekable().toString();
221221
AsyncChains.getBlocking(accord.node().coordinate(id, txn));
@@ -237,7 +237,7 @@ public void inflight() throws ExecutionException, InterruptedException
237237
{
238238
String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
239239
AccordService accord = accord();
240-
TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
240+
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
241241
String insertTxn = String.format("BEGIN TRANSACTION\n" +
242242
" LET r = (SELECT * FROM %s.%s WHERE k = ? AND c = ?);\n" +
243243
" IF r IS NULL THEN\n " +
@@ -273,7 +273,7 @@ public void blocked() throws ExecutionException, InterruptedException
273273
{
274274
String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
275275
AccordService accord = accord();
276-
TxnId first = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
276+
TxnId first = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
277277
String insertTxn = String.format("BEGIN TRANSACTION\n" +
278278
" LET r = (SELECT * FROM %s.%s WHERE k = ? AND c = ?);\n" +
279279
" IF r IS NULL THEN\n " +
@@ -292,7 +292,7 @@ public void blocked() throws ExecutionException, InterruptedException
292292

293293
filter.reset();
294294

295-
TxnId second = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
295+
TxnId second = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
296296
accord.node().coordinate(second, createTxn(insertTxn, 0, 0, 0, 0, 0));
297297

298298
filter.commit.awaitThrowUncheckedOnInterrupt();
@@ -356,7 +356,7 @@ private void testPatchJournal(String cleanupAction, String expectedStatus)
356356
KEYSPACE,
357357
tableName);
358358
AccordService accord = accord();
359-
TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
359+
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
360360
accord.node().coordinate(id, createTxn(insertTxn, 0, 0, 0));
361361

362362
filter.preAccept.awaitThrowUncheckedOnInterrupt();

test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.nio.file.Files;
2323
import java.util.ArrayList;
2424
import java.util.Collections;
25-
import java.util.Iterator;
2625
import java.util.List;
2726
import java.util.Optional;
2827
import java.util.UUID;
@@ -205,10 +204,10 @@ public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boo
205204
listener = new AbstractConfigurationServiceTest.TestListener(loaded, true);
206205
loaded.registerListener(listener);
207206
journal_.closeCurrentSegmentForTestingIfNonEmpty();
208-
Iterator<TopologyUpdate> iter = journal.replayTopologies();
207+
List<TopologyUpdate> list = journal.replayTopologies();
209208
// Simulate journal replay
210-
while (iter.hasNext())
211-
loaded.reportTopology(iter.next().global);
209+
for (TopologyUpdate update : list)
210+
loaded.reportTopology(update.global);
212211
loaded.start();
213212

214213
listener.assertTopologiesFor(1L, 2L, 3L);

0 commit comments

Comments
 (0)