Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow rebalancing primary shards on shared filesystems #10585

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -1037,8 +1037,15 @@ public void release() {

protected abstract SearcherManager getSearcherManager();

/**
* Method to close the engine while the write lock is held.
*/
protected abstract void closeNoLock(String reason) throws ElasticsearchException;

/**
* Flush the engine (committing segments to disk and truncating the
* translog) and close it.
*/
public void flushAndClose() throws IOException {
if (isClosed.get() == false) {
logger.trace("flushAndClose now acquire writeLock");
Expand All @@ -1062,7 +1069,8 @@ public void flushAndClose() throws IOException {

@Override
public void close() throws IOException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
// don't acquire the write lock if we are already closed
if (isClosed.get() == false) {
logger.debug("close now acquiring writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
logger.debug("close acquired writeLock");
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Expand Up @@ -627,7 +627,7 @@ private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing)
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing) {
logger.trace("waiting fore in-flight flush to finish");
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
Expand All @@ -639,7 +639,7 @@ private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing)
try {
if (commitTranslog) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush is not allowed");
}

if (flushNeeded || force) {
Expand Down Expand Up @@ -825,6 +825,7 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {

SnapshotIndexCommit phase1Snapshot;
try {
logger.trace("[pre-phase1] performing deletion policy snapshot");
phase1Snapshot = deletionPolicy.snapshot();
} catch (Throwable e) {
maybeFailEngine("recovery", e);
Expand All @@ -833,25 +834,28 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
}

try {
logger.trace("[phase1] performing phase 1 recovery (file recovery)");
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
maybeFailEngine("recovery phase 1", e);
maybeFailEngine("recovery phase 1 (file transfer)", e);
Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
}

Translog.Snapshot phase2Snapshot;
try {
logger.trace("[pre-phase2] performing translog snapshot");
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
maybeFailEngine("snapshot recovery", e);
maybeFailEngine("translog snapshot", e);
Releasables.closeWhileHandlingException(phase1Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
}
try {
logger.trace("[phase2] performing phase 2 recovery (translog replay)");
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
maybeFailEngine("recovery phase 2", e);
maybeFailEngine("recovery phase 2 (snapshot transfer)", e);
Releasables.closeWhileHandlingException(phase1Snapshot, phase2Snapshot, onGoingRecoveries);
throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
}
Expand All @@ -861,16 +865,19 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
boolean success = false;
try {
ensureOpen();
logger.trace("[pre-phase3] performing translog snapshot");
phase3Snapshot = translog.snapshot(phase2Snapshot);
logger.trace("[phase3] performing phase 3 recovery (translog replay under write lock)");
recoveryHandler.phase3(phase3Snapshot);
success = true;
} catch (Throwable e) {
maybeFailEngine("recovery phase 3", e);
maybeFailEngine("recovery phase 3 (snapshot transfer)", e);
throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
} finally {
Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot,
onGoingRecoveries, writeLock); // hmm why can't we use try-with here?
}
logger.trace("[post-recovery] recovery complete");
}

@Override
Expand Down
Expand Up @@ -18,10 +18,19 @@
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.cluster.metadata.IndexMetaData;

public class InternalEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) {
return new InternalEngine(config, skipTranslogRecovery);
// On a shared filesystem, we need to handle recovery slightly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this to its own class? And then use the same pattern as all the other MockEngine classes?

// differently. We take no translog snapshots because a flush is forced
// when the engine is closed during phase1
if (IndexMetaData.isOnSharedFilesystem(config.getIndexSettings())) {
return new SharedFSEngine(config, skipTranslogRecovery);
} else {
return new InternalEngine(config, skipTranslogRecovery);
}
}

@Override
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/org/elasticsearch/index/engine/SharedFSEngine.java
@@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.common.util.concurrent.ReleasableLock;

/**
 * SharedFSEngine behaves similarly to InternalEngine, however, during
 * recovery, it does not take a snapshot of the translog or index and it
 * performs stage1 (file transfer) under the write lock.
 */
public class SharedFSEngine extends InternalEngine {
    public SharedFSEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
        super(engineConfig, skipInitialTranslogRecovery);
    }

    @Override
    public void recover(RecoveryHandler recoveryHandler) throws EngineException {
        store.incRef();
        try {
            logger.trace("[pre-recovery] acquiring write lock");
            // Unlike InternalEngine, phase1 (file transfer) runs while the
            // write lock is held; no translog/index snapshots are taken, so
            // every phase receives null.
            try (ReleasableLock ignored = writeLock.acquire()) {
                ensureOpen();
                try {
                    logger.trace("[phase1] performing phase 1 recovery (file recovery)");
                    recoveryHandler.phase1(null);
                } catch (Throwable t) {
                    throw failPhase(1, "recovery phase 1 (file transfer)", t);
                }
            }
            try {
                logger.trace("[phase2] performing phase 2 recovery (translog replay)");
                recoveryHandler.phase2(null);
            } catch (Throwable t) {
                throw failPhase(2, "recovery phase 2 (snapshot transfer)", t);
            }
            try {
                logger.trace("[phase3] performing phase 3 recovery (finalization)");
                recoveryHandler.phase3(null);
            } catch (Throwable t) {
                throw failPhase(3, "recovery phase 3 (finalization)", t);
            }
        } finally {
            store.decRef();
        }
        logger.trace("[post-recovery] recovery complete");
    }

    /**
     * Reports the failure via maybeFailEngine and returns the wrapped
     * RecoveryEngineException for the caller to throw.
     */
    private RecoveryEngineException failPhase(int phase, String context, Throwable t) {
        maybeFailEngine(context, t);
        return new RecoveryEngineException(shardId, phase, "Execution failed", wrapIfClosed(t));
    }
}
12 changes: 8 additions & 4 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -116,7 +116,6 @@
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -749,7 +748,9 @@ public void close(String reason, boolean flushEngine) throws IOException {
if (flushEngine && this.flushOnClose) {
engine.flushAndClose();
}
} finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
} finally {
// playing safe here and close the engine even if the above
// succeeds - close can be called multiple times.
IOUtils.close(engine);
}
}
Expand Down Expand Up @@ -867,12 +868,16 @@ public RecoveryState recoveryState() {
public void finalizeRecovery() {
recoveryState().setStage(RecoveryState.Stage.FINALIZE);
// clear unreferenced files
translog.clearUnreferenced();
clearUnreferencedTranslogs();
engine().refresh("recovery_finalization");
startScheduledTasksIfNeeded();
engineConfig.setEnableGcDeletes(true);
}

/**
 * Deletes translog files that are no longer referenced by this shard,
 * as part of recovery finalization. Protected so subclasses can opt out
 * — e.g. the shadow-replica shard overrides this as a no-op so that
 * translogs on a shared filesystem are never deleted.
 */
protected void clearUnreferencedTranslogs() {
    translog.clearUnreferenced();
}

/**
* Returns <tt>true</tt> if this shard can ignore a recovery attempt made to it (since the already doing/done it)
*/
Expand Down Expand Up @@ -1229,7 +1234,6 @@ protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) {
return engineFactory.newReadWriteEngine(config, skipTranslogRecovery);
}


/**
* Returns <code>true</code> iff this shard allows primary promotion, otherwise <code>false</code>
*/
Expand Down
Expand Up @@ -110,6 +110,11 @@ protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig con
return engineFactory.newReadOnlyEngine(config);
}

@Override
protected void clearUnreferencedTranslogs() {
    // no-op - Shadow replicas should never delete translogs.
    // NOTE(review): presumably because on a shared filesystem the primary
    // still owns and may reference these files — confirm against the
    // primary's translog lifecycle.
}

public boolean allowsPrimaryPromotion() {
return false;
}
Expand Down
Expand Up @@ -39,15 +39,13 @@
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.*;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -432,6 +430,8 @@ public long findLargestPresentTranslogId() throws IOException {
final Matcher matcher = PARSE_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
maxId = Math.max(maxId, Long.parseLong(matcher.group(1)));
logger.trace("found translog: [{}], maxId: [{}], location: [{}]",
fileName, maxId, location);
}
} catch (NumberFormatException ex) {
logger.warn("Couldn't parse translog id from file " + translogFile + " skipping");
Expand Down
Expand Up @@ -611,8 +611,8 @@ protected int sendSnapshot(Translog.Snapshot snapshot) {
.withTimeout(recoverySettings.internalActionLongTimeout());

if (operation == null) {
logger.trace("[{}][{}] no translog operations to send to {}",
indexName, shardId, request.targetNode());
logger.trace("[{}][{}] no translog operations (id: [{}]) to send to {}",
indexName, shardId, snapshot.translogId(), request.targetNode());
}
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) {
Expand All @@ -637,6 +637,13 @@ protected int sendSnapshot(Translog.Snapshot snapshot) {
// recoverySettings.rateLimiter().pause(size);
// }


if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sending batch of [{}][{}] (total: [{}], id: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
shard.translog().estimatedNumberOfOperations(),
snapshot.translogId(), request.targetNode());
}
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
Expand All @@ -646,12 +653,6 @@ public void run() throws InterruptedException {
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
shard.translog().estimatedNumberOfOperations(),
request.targetNode());
}

ops = 0;
size = 0;
Expand All @@ -663,6 +664,12 @@ indexName, shardId, ops, new ByteSizeValue(size),
throw new ElasticsearchException("failed to get next operation from translog", ex);
} }
// send the leftover
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sending final batch of [{}][{}] (total: [{}], id: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
shard.translog().estimatedNumberOfOperations(),
snapshot.translogId(), request.targetNode());
}
if (!operations.isEmpty()) {
cancellableThreads.execute(new Interruptable() {
@Override
Expand All @@ -675,12 +682,6 @@ public void run() throws InterruptedException {
});

}
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
shard.translog().estimatedNumberOfOperations(),
request.targetNode());
}
return totalOperations;
}

Expand Down