Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to trim all ops above a certain seq# with a term lower than X #30176

Merged
merged 54 commits into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
d3d210a
Allow to trim all ops above a certain seq# with a term lower than X
vladimirdolzhenko Apr 25, 2018
e622794
Allow to trim all ops above a certain seq# with a term lower than X
May 4, 2018
586908c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
92addd1
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
2a9c58c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
1e67a8a
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
d454a30
Allow to trim all ops above a certain seq# with a term lower than X -…
May 4, 2018
92d2d68
merged from master
May 7, 2018
13ae045
Allow to trim all ops above a certain seq# with a term lower than X -…
May 8, 2018
762f298
Allow to trim all ops above a certain seq# with a term lower than X -…
May 10, 2018
3896280
Allow to trim all ops above a certain seq# with a term lower than X -…
May 10, 2018
2c13980
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
49167c8
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
eaef2e4
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
ef7811d
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
928ac1e
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
fd6e16e
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
65604be
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
c215e6c
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
7c33612
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
6a41d7b
Allow to trim all ops above a certain seq# with a term lower than X -…
May 11, 2018
1afd5be
move translog checks under the lock, prevent readers being double closed
May 16, 2018
96ff7aa
we intend to break translog - make chance of failure proportional to …
May 16, 2018
42aec35
some test code cleanup - create and add operations to translog in the…
May 16, 2018
bdbcffa
added testSyncerSendsMaxSeqNo
May 16, 2018
51cf6d6
extend test to check that trimAboveSeqNo goes only in the 0th request
May 16, 2018
963f7b6
add assertion on assumption that translog.current has no any op seq# …
May 16, 2018
dbb9b13
fix broken test
May 16, 2018
edc4fe6
rebuild assertion check on assumption that translog.current has no an…
May 17, 2018
2f99f3b
extend test with duplicates <same seq#, lower primaryTerm>
May 17, 2018
dac5945
fixed assertNoSeqAbove - op.primaryTerm < belowTerm; small cosmetic f…
May 20, 2018
cd19157
dropped testSyncerSendsMaxSeqNo as all tested functionality is covere…
May 20, 2018
4060cd7
assertNoSeqAbove has to take into account seq > aboveSeqNo rather >=
May 20, 2018
8075a7d
dropped unnecessary seq# check in assertNoSeqAbove
May 21, 2018
1299112
use the most pessimistic fail rate approach in testRandomExceptionsOn…
May 21, 2018
f7d8f52
simplified testSnapshotTrimmedOperations
May 21, 2018
d4fb32b
fixing testSnapshotTrimmedOperations: higher term with the same seq# …
May 21, 2018
5c565c4
fixing testSnapshotTrimmedOperations: duplicates could have only forw…
May 22, 2018
5a88b65
Revert "rebuild assertion check on assumption that translog.current h…
May 22, 2018
4b6da39
addressing case of trimming trimmed translog
May 22, 2018
108f8af
fixed testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations:…
May 22, 2018
cbba0b1
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 22, 2018
cdbb0a1
aboveSeqNo has to be aboveOrEqSeqNo to be able to trim op those are s…
May 23, 2018
147e709
reverted back to aboveSeqNo, make NO_OPS_PERFORMED valid seq#
May 24, 2018
dc9c2dd
simplified testSnapshotTrimmedOperations and dropped testSnapshotTrim…
May 24, 2018
7a0750d
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 24, 2018
9cb43c7
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
May 25, 2018
99ae2df
streamification; InMemoryTranslog does not delegate anything to translog
May 30, 2018
ba339d9
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 2, 2018
0f9b206
keep operationsList and currentOperations apart
Jun 5, 2018
9d74767
drop assumption on not trimming current in InMemoryTranslog
Jun 6, 2018
546c890
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 6, 2018
db4105f
Merge remote-tracking branch 'remotes/origin/master' into trim_translog
Jun 7, 2018
88f702b
InMemoryTranslog simplification
Jun 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

Expand All @@ -33,17 +34,24 @@
*/
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private long trimAboveSeqNo;
private Translog.Operation[] operations;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
final Translog.Operation[] operations) {
super(shardId);
this.trimAboveSeqNo = trimAboveSeqNo;
this.operations = operations;
}

public long getTrimAboveSeqNo() {
return trimAboveSeqNo;
}

public Translog.Operation[] getOperations() {
return operations;
}
Expand All @@ -60,12 +68,20 @@ public void readFrom(final StreamInput in) throws IOException {
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
trimAboveSeqNo = in.readZLong();
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(trimAboveSeqNo);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -74,12 +90,13 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
return trimAboveSeqNo == that.trimAboveSeqNo
&& Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
}

@Override
Expand All @@ -88,6 +105,7 @@ public String toString() {
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", ops=" + operations.length +
"}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
}
}
}
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());
}
return location;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ boolean isThrottled() {
*/
public abstract boolean isThrottled();

/**
* Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>

This comment was marked as resolved.

* @see Translog#trimOperations(long, long)
*/
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/** A Lock implementation that always allows the lock to be acquired */
protected static final class NoOpLock implements Lock {

Expand Down Expand Up @@ -904,7 +910,7 @@ public final boolean refreshNeeded() {
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimTranslog() throws EngineException;
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ public void rollTranslogGeneration() throws EngineException {
}

@Override
public void trimTranslog() throws EngineException {
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
Expand All @@ -1569,6 +1569,24 @@ public void trimTranslog() throws EngineException {
}
}

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog operations", e);
}
}

private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SequenceNumbers {
*/
public static final long UNASSIGNED_SEQ_NO = -2L;
/**
* Represents no operations have been performed on the shard.
* Represents no operations have been performed on the shard. Initial value of a sequence number.
*/
public static final long NO_OPS_PERFORMED = -1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ public Engine.CommitId flush(FlushRequest request) {
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
engine.trimTranslog();
engine.trimUnreferencedTranslogFiles();
}

/**
Expand Down Expand Up @@ -1195,6 +1195,10 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(primaryTerm, aboveSeqNo);
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
final Engine.Result result;
switch (operation.opType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
resyncListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(final ResyncTask resyncTask) {
Expand Down Expand Up @@ -135,7 +137,7 @@ public synchronized Translog.Operation next() throws IOException {
}
};
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
startingSeqNo, resyncListener);
startingSeqNo, maxSeqNo, resyncListener);
} catch (Exception e) {
if (resyncListener != null) {
resyncListener.onFailure(e);
Expand All @@ -146,7 +148,7 @@ public synchronized Translog.Operation next() throws IOException {
}

private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
Expand All @@ -166,7 +168,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
Expand All @@ -186,14 +188,16 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final ShardId shardId;
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
private final AtomicInteger totalSentOps = new AtomicInteger();
private final AtomicInteger totalSkippedOps = new AtomicInteger();
private AtomicBoolean closed = new AtomicBoolean();

SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
Expand All @@ -203,6 +207,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
this.snapshot = snapshot;
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
Expand Down Expand Up @@ -248,11 +253,15 @@ protected void doRun() throws Exception {
}
}

if (!operations.isEmpty()) {
final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
} else if (closed.compareAndSet(false, true)) {
logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
Expand Down
Loading