Skip to content

Commit

Permalink
Allow to trim all ops above a certain seq# with a term lower than X (#30176)
Browse files Browse the repository at this point in the history

Allow to trim all ops above a certain seq# with a term lower than X

Relates to #10708
  • Loading branch information
vladimirdolzhenko committed Jun 8, 2018
1 parent 01140a3 commit a86c0f8
Show file tree
Hide file tree
Showing 20 changed files with 579 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

Expand All @@ -33,17 +34,24 @@
*/
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private long trimAboveSeqNo;
private Translog.Operation[] operations;

/**
 * Package-private no-argument constructor. It leaves {@code trimAboveSeqNo} and
 * {@code operations} at their default values.
 * NOTE(review): presumably used only to create an empty instance that is then
 * populated through {@code readFrom(StreamInput)} - confirm against callers.
 */
ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
final Translog.Operation[] operations) {
super(shardId);
this.trimAboveSeqNo = trimAboveSeqNo;
this.operations = operations;
}

/**
 * Returns the sequence number above which the receiver is asked to trim operations.
 * A value of {@code SequenceNumbers.UNASSIGNED_SEQ_NO} means that no trimming was requested.
 *
 * @return the trim threshold carried by this request
 */
public long getTrimAboveSeqNo() {
    return this.trimAboveSeqNo;
}

/**
 * Returns the translog operations carried by this request.
 * The internal array is handed out directly, without a defensive copy.
 *
 * @return the operations array of this request
 */
public Translog.Operation[] getOperations() {
    return this.operations;
}
Expand All @@ -60,12 +68,20 @@ public void readFrom(final StreamInput in) throws IOException {
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
trimAboveSeqNo = in.readZLong();
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

/**
 * Serializes this request to {@code out}.
 * Wire order: the parent class fields, then {@code trimAboveSeqNo} (only when the
 * stream version is 7.0.0-alpha1 or later), then the operations array.
 * This order and the version gate must stay in sync with {@code readFrom(StreamInput)}.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
// older nodes do not know about trimAboveSeqNo, so the field is only written for 7.0.0-alpha1+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(trimAboveSeqNo);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -74,12 +90,13 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
return trimAboveSeqNo == that.trimAboveSeqNo
&& Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
}

@Override
Expand All @@ -88,6 +105,7 @@ public String toString() {
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", ops=" + operations.length +
"}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
}
}
}
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());
}
return location;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ boolean isThrottled() {
*/
public abstract boolean isThrottled();

/**
 * Trims operations from the translog that have a primary term below {@code belowTerm}
 * and a sequence number above {@code aboveSeqNo}.
 * NOTE(review): whether the two bounds are inclusive or exclusive is defined by
 * {@link Translog#trimOperations(long, long)} - confirm there.
 *
 * @param belowTerm  only operations with a primary term below this value are trimmed
 * @param aboveSeqNo only operations with a sequence number above this value are trimmed
 * @throws EngineException if the trimming fails
 * @see Translog#trimOperations(long, long)
 */
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/** A Lock implementation that always allows the lock to be acquired */
protected static final class NoOpLock implements Lock {

Expand Down Expand Up @@ -904,7 +910,7 @@ public final boolean refreshNeeded() {
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimTranslog() throws EngineException;
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ public void rollTranslogGeneration() throws EngineException {
}

@Override
public void trimTranslog() throws EngineException {
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
Expand All @@ -1569,6 +1569,24 @@ public void trimTranslog() throws EngineException {
}
}

/**
 * Trims operations from the translog by delegating to
 * {@code translog.trimOperations(belowTerm, aboveSeqNo)} while holding the engine read lock.
 *
 * @param belowTerm  forwarded unchanged to the translog as the term bound
 * @param aboveSeqNo forwarded unchanged to the translog as the sequence number bound
 * @throws AlreadyClosedException rethrown as-is after {@code failOnTragicEvent} has been consulted
 * @throws EngineException        wrapping any other failure; the engine is failed before it is thrown
 */
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
// the read lock is released automatically when the try block exits
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (AlreadyClosedException e) {
// let the engine inspect the exception for a tragic event, then propagate it untouched
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
} catch (Exception inner) {
// keep the original failure as the primary cause; record the secondary one alongside it
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog operations", e);
}
}

private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SequenceNumbers {
*/
public static final long UNASSIGNED_SEQ_NO = -2L;
/**
* Represents no operations have been performed on the shard.
* Represents that no operations have been performed on the shard. Initial value of a sequence number.
*/
public static final long NO_OPS_PERFORMED = -1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ public Engine.CommitId flush(FlushRequest request) {
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
engine.trimTranslog();
engine.trimUnreferencedTranslogFiles();
}

/**
Expand Down Expand Up @@ -1194,6 +1194,10 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

/**
 * Asks the current engine to trim translog operations that have a term below this
 * shard's {@code primaryTerm} and a sequence number above {@code aboveSeqNo}.
 *
 * @param aboveSeqNo sequence number bound handed to the engine
 */
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
    final Engine engine = getEngine();
    engine.trimOperationsFromTranslog(primaryTerm, aboveSeqNo);
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
final Engine.Result result;
switch (operation.opType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
resyncListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(final ResyncTask resyncTask) {
Expand Down Expand Up @@ -135,7 +137,7 @@ public synchronized Translog.Operation next() throws IOException {
}
};
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
startingSeqNo, resyncListener);
startingSeqNo, maxSeqNo, resyncListener);
} catch (Exception e) {
if (resyncListener != null) {
resyncListener.onFailure(e);
Expand All @@ -146,7 +148,7 @@ public synchronized Translog.Operation next() throws IOException {
}

private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
Expand All @@ -166,7 +168,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
Expand All @@ -186,14 +188,16 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final ShardId shardId;
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
private final AtomicInteger totalSentOps = new AtomicInteger();
private final AtomicInteger totalSkippedOps = new AtomicInteger();
private AtomicBoolean closed = new AtomicBoolean();

SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
Expand All @@ -203,6 +207,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
this.snapshot = snapshot;
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
Expand Down Expand Up @@ -248,11 +253,15 @@ protected void doRun() throws Exception {
}
}

if (!operations.isEmpty()) {
final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// a sync request must be sent even when there are no operations to sync, so that at least trimmedAboveSeqNo is transmitted
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
} else if (closed.compareAndSet(false, true)) {
logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
Expand Down
Loading

0 comments on commit a86c0f8

Please sign in to comment.