Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify testSendSnapshotSendsOps #37445

Merged
merged 1 commit into from Jan 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -76,7 +76,6 @@
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -108,7 +107,6 @@
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RecoverySourceHandlerTests extends ESTestCase {
Expand Down Expand Up @@ -205,9 +203,6 @@ public void testSendSnapshotSendsOps() throws IOException {
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
final RecoverySourceHandler handler =
new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
for (int i = 0; i < initialNumberOfDocs; i++) {
Expand All @@ -219,38 +214,23 @@ public void testSendSnapshotSendsOps() throws IOException {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
}
operations.add(null);
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {

}

private int counter = 0;

@Override
public int totalOperations() {
return operations.size() - 1;
}

@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
}, randomNonNegativeLong(), randomNonNegativeLong());
final List<Translog.Operation> shippedOps = new ArrayList<>();
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu) {
shippedOps.addAll(operations);
return SequenceNumbers.NO_OPS_PERFORMED;
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong());
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
assertThat(result.totalOperations, equalTo(expectedOps));
final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class);
verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(),
ArgumentCaptor.forClass(Long.class).capture(), ArgumentCaptor.forClass(Long.class).capture());
List<Translog.Operation> shippedOps = new ArrayList<>();
for (List list: shippedOpsCaptor.getAllValues()) {
shippedOps.addAll(list);
}
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
assertThat(shippedOps.size(), equalTo(expectedOps));
for (int i = 0; i < shippedOps.size(); i++) {
Expand All @@ -261,30 +241,8 @@ public Translog.Operation next() throws IOException {
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
expectThrows(IllegalStateException.class, () ->
handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {

}

private int counter = 0;

@Override
public int totalOperations() {
return operations.size() - 1 - opsToSkip.size();
}

@Override
public Translog.Operation next() throws IOException {
Translog.Operation op;
do {
op = operations.get(counter++);
} while (op != null && opsToSkip.contains(op));
return op;
}
}, randomNonNegativeLong(), randomNonNegativeLong()));
expectThrows(IllegalStateException.class, () -> handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong()));
}
}

Expand Down Expand Up @@ -716,4 +674,39 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
int totalTranslogOps, ActionListener<Void> listener) {
}
}

private Translog.Snapshot newTranslogSnapshot(List<Translog.Operation> operations, List<Translog.Operation> operationsToSkip) {
return new Translog.Snapshot() {
int index = 0;
int skippedCount = 0;

@Override
public int totalOperations() {
return operations.size();
}

@Override
public int skippedOperations() {
return skippedCount;
}

@Override
public Translog.Operation next() {
while (index < operations.size()) {
Translog.Operation op = operations.get(index++);
if (operationsToSkip.contains(op)) {
skippedCount++;
} else {
return op;
}
}
return null;
}

@Override
public void close() {

}
};
}
}