Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to trim all ops above a certain seq# with a term lower than X #30176

Merged
merged 54 commits into from
Jun 8, 2018
Merged
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d3d210a
Allow to trim all ops above a certain seq# with a term lower than X
vladimirdolzhenko Apr 25, 2018
e622794
Allow to trim all ops above a certain seq# with a term lower than X
May 4, 2018
586908c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
92addd1
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
2a9c58c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
1e67a8a
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
d454a30
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
92d2d68
merged from master
May 7, 2018
13ae045
Allow to trim all ops above a certain seq# with a term lower than X -…
May 8, 2018
762f298
Allow to trim all ops above a certain seq# with a term lower than X -…
May 10, 2018
3896280
Allow to trim all ops above a certain seq# with a term lower than X -…
May 10, 2018
2c13980
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
49167c8
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
eaef2e4
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
ef7811d
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
928ac1e
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
fd6e16e
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
65604be
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
c215e6c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
7c33612
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
6a41d7b
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
1afd5be
move translog checks under the lock, prevent readers being double closed
May 16, 2018
96ff7aa
we intend to break translog - make chance of failure proportional to …
May 16, 2018
42aec35
some test code cleanup - create and add operations to translog in the…
May 16, 2018
bdbcffa
added testSyncerSendsMaxSeqNo
May 16, 2018
51cf6d6
extend test to check that trimAboveSeqNo goes only in the 0th request
May 16, 2018
963f7b6
add assertion on assumption that translog.current has no any op seq# …
May 16, 2018
dbb9b13
fix broken test
May 16, 2018
edc4fe6
rebuild assertion check on assumption that translog.current has no an…
May 17, 2018
2f99f3b
extend test with duplicates <same seq#, lower primaryTerm>
May 17, 2018
dac5945
fixed assertNoSeqAbove - op.primaryTerm < belowTerm; small cosmetic f…
May 20, 2018
cd19157
dropped testSyncerSendsMaxSeqNo as all tested functionality is covere…
May 20, 2018
4060cd7
assertNoSeqAbove has to take into account seq > aboveSeqNo rather >=
May 20, 2018
8075a7d
dropped unnecessary seq# check in assertNoSeqAbove
May 21, 2018
1299112
use the most pessimistic fail rate approach in testRandomExceptionsOn…
May 21, 2018
f7d8f52
simplified testSnapshotTrimmedOperations
May 21, 2018
d4fb32b
fixing testSnapshotTrimmedOperations: higher term with the same seq# …
May 21, 2018
5c565c4
fixing testSnapshotTrimmedOperations: duplicates could have only forw…
May 22, 2018
5a88b65
Revert "rebuild assertion check on assumption that translog.current h…
May 22, 2018
4b6da39
addressing case of trimming trimmed translog
May 22, 2018
108f8af
fixed testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations:…
May 22, 2018
cbba0b1
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 22, 2018
cdbb0a1
aboveSeqNo has to be aboveOrEqSeqNo to be able to trim op those are s…
May 23, 2018
147e709
reverted back to aboveSeqNo, make NO_OPS_PERFORMED valid seq#
May 24, 2018
dc9c2dd
simplified testSnapshotTrimmedOperations and dropped testSnapshotTrim…
May 24, 2018
7a0750d
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 24, 2018
9cb43c7
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 25, 2018
99ae2df
streamification; InMemoryTranslog does not delegate anything to translog
May 30, 2018
ba339d9
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 2, 2018
0f9b206
keep operationsList and currentOperations apart
Jun 5, 2018
9d74767
drop assumption on not trimming current in InMemoryTranslog
Jun 6, 2018
546c890
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 6, 2018
db4105f
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 7, 2018
88f702b
InMemoryTranslog simplification
Jun 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -109,6 +108,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -1544,44 +1544,50 @@ public void testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations() thr
}

public void testSnapshotTrimmedOperations() throws Exception {
final TestingTranslog testingTranslog = new TestingTranslog(translog);
final InMemoryTranslog inMemoryTranslog = new InMemoryTranslog();
final List<Translog.Operation> allOperations = new ArrayList<>();

for(int attempt = 0, maxAttempts = randomIntBetween(3, 10); attempt < maxAttempts; attempt++) {

This comment was marked as resolved.

List<Long> ops = LongStream.range(0, allOperations.size() + randomIntBetween(10, 15))
.boxed().collect(Collectors.toList());
Randomness.shuffle(ops);

AtomicReference<String> source = new AtomicReference<>();
for (final long op : ops) {
String source = randomAlphaOfLengthBetween(1, 50);
source.set(randomAlphaOfLengthBetween(1, 50));

// have to use exactly the same source for same seq# if primaryTerm is not changed
if (primaryTerm.get() == translog.getCurrent().getPrimaryTerm()) {
for (Translog.Operation allOp : allOperations) {
if (allOp instanceof Translog.Index && allOp.seqNo() == op) {
// use the latest source of op with the same seq# - therefore no break
source = ((Translog.Index)allOp).source().utf8ToString();
}
}
// use the latest source of op with the same seq# - therefore no break
allOperations
.stream()
.filter(allOp -> allOp instanceof Translog.Index && allOp.seqNo() == op)
.map(allOp -> ((Translog.Index)allOp).source().utf8ToString())
.reduce((a, b) -> b)
.ifPresent(source::set);
}

// use ongoing primaryTerms - or the same as it was
Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get(),
source.getBytes("UTF-8"));
testingTranslog.add(operation);
source.get().getBytes("UTF-8"));
translog.add(operation);
inMemoryTranslog.add(operation);
allOperations.add(operation);
}

if (randomBoolean()) {
primaryTerm.incrementAndGet();
testingTranslog.rollGeneration();
translog.rollGeneration();
inMemoryTranslog.rollGeneration();
}

long maxTrimmedSeqNo = randomInt(allOperations.size());

testingTranslog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
translog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
inMemoryTranslog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
translog.sync();

List<Translog.Operation> effectiveOperations = testingTranslog.operations();
List<Translog.Operation> effectiveOperations = inMemoryTranslog.operations();

try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, containsOperationsInAnyOrder(effectiveOperations));
Expand All @@ -1592,63 +1598,49 @@ public void testSnapshotTrimmedOperations() throws Exception {
}

/**
* this class mimic behaviour of original {@link Translog} and delegated to the original as well
* this class mimic behaviour of original {@link Translog}
*/
static class TestingTranslog {
static class InMemoryTranslog {
private final List<List<Translog.Operation>> operationsList = new ArrayList<>();
private final Translog translog;

TestingTranslog(Translog translog) {
this.translog = translog;
InMemoryTranslog() {
this.operationsList.add(new LinkedList<>());
}

void add(Translog.Operation operation) throws IOException {
translog.add(operation);
void add(Translog.Operation operation) {
operationsList.get(operationsList.size() - 1).add(operation);
}

void rollGeneration() throws IOException {
translog.rollGeneration();
void rollGeneration() {
operationsList.add(new LinkedList<>());
}

void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
translog.trimOperations(belowTerm, aboveSeqNo);
translog.sync();

for (Iterator<List<Translog.Operation>> opListIt = operationsList.iterator();opListIt.hasNext();) {
if (!opListIt.hasNext()) {
// ignore the last one
break;
}
List<Translog.Operation> operations = opListIt.next();
for (Iterator<Translog.Operation> it = operations.iterator(); it.hasNext(); ) {
Translog.Operation op = it.next();
boolean drop = op.primaryTerm() < belowTerm && op.seqNo() > aboveSeqNo;
if (drop) {
it.remove();
void trimOperations(long belowTerm, long aboveSeqNo) {
operationsList
.stream()
// handle all expect last one - it has `current` ops
.limit(operationsList.size() - 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Current" ops should never be trimmed due to our assumptions, that said I think it's good not to model that here but rather make sure that the way we generate ops conforms to those assumptions?

.forEach(operations -> {
for (Iterator<Translog.Operation> it = operations.iterator(); it.hasNext(); ) {
Translog.Operation op = it.next();
boolean drop = op.primaryTerm() < belowTerm && op.seqNo() > aboveSeqNo;
if (drop) {
it.remove();
}
}
}
}
});
}

List<Translog.Operation> operations() {
final List<Translog.Operation> actualOperations = new ArrayList<>();

final Set<Long> seenSeqNo = new HashSet<>();
for(ListIterator<List<Translog.Operation>> listIt = operationsList.listIterator(operationsList.size()); listIt.hasPrevious();){
List<Translog.Operation> operations = listIt.previous();
for (Translog.Operation operation : operations) {
if (seenSeqNo.add(operation.seqNo())) {
actualOperations.add(operation);
}
}
}

Collections.sort(actualOperations, Comparator.comparing(Translog.Operation::seqNo));

return actualOperations;
// reverse traverse of operations
int size = operationsList.size();
return IntStream.range(0, size).mapToObj(i -> operationsList.get(size - 1 - i))
.flatMap(operations -> operations.stream())
// latest ops override firsts
.filter(operation -> seenSeqNo.add(operation.seqNo()))
.collect(Collectors.toList());
}
}

Expand Down