Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing a new snapshot segments threadpool to upload segments of shards in parallel #39657

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;

/**
 * Wrapper over {@link AbstractRunnable} which provides a priority.
 * <p>
 * Priority follows natural ordering: 0 has higher priority than 1, 1 has higher
 * priority than 2, and so on.
 * <p>
 * NOTE: {@code compareTo} is not consistent with {@code equals} (no
 * {@code equals}/{@code hashCode} override); instances are only intended for
 * ordering inside a priority queue.
 */
public abstract class AbstractPrioritizedRunnable extends AbstractRunnable implements Comparable<AbstractPrioritizedRunnable> {

    // stored as a primitive to avoid boxing on every comparison;
    // exposed as Long by getPriority() for API compatibility
    private final long priority;

    protected AbstractPrioritizedRunnable(long priority) {
        this.priority = priority;
    }

    /**
     * Returns the priority of this runnable; lower values are executed first.
     */
    public Long getPriority() {
        return priority;
    }

    @Override
    public int compareTo(AbstractPrioritizedRunnable other) {
        // Long.compare avoids the unboxing round-trip of Long.compareTo
        return Long.compare(this.priority, other.priority);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class EsExecutors {
Expand Down Expand Up @@ -79,6 +81,16 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
return executor;
}


/**
 * Builds a scaling executor whose pending tasks are ordered by priority
 * (see {@code ExecutorPrioritizedScalingQueue}); rejected tasks are force-queued
 * via {@code ForceQueuePolicy} rather than dropped.
 *
 * @param name          executor name
 * @param min           core pool size
 * @param max           maximum pool size
 * @param keepAliveTime how long threads above the core size may stay idle
 * @param unit          unit for {@code keepAliveTime}
 * @param threadFactory factory used to create worker threads
 * @param contextHolder thread context preserved across task execution
 * @return the configured executor
 */
public static EsThreadPoolExecutor newPrioritizedScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorPrioritizedScalingQueue<Runnable> queue = new ExecutorPrioritizedScalingQueue<>();
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory,
new ForceQueuePolicy(), contextHolder);
// the queue inspects the executor's pool size in offer(), so it needs a
// back-reference; it can only be set after the executor is constructed
queue.executor = executor;
return executor;
}

public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
Expand Down Expand Up @@ -307,6 +319,55 @@ public boolean offer(E e) {

}

/**
 * Priority-ordered work queue that cooperates with the thread pool to implement
 * scaling behavior: {@code offer} reports failure while the pool can still grow,
 * which (combined with {@code ForceQueuePolicy}) forces the executor to add
 * workers up to the maximum pool size, and only queues once the pool is saturated.
 * <p>
 * NOTE(review): {@link PriorityBlockingQueue} orders elements by natural ordering,
 * so every queued element must implement {@link Comparable} (e.g.
 * {@code AbstractPrioritizedRunnable}); offering a non-comparable Runnable would
 * throw ClassCastException — confirm all producers wrap tasks accordingly.
 * NOTE(review): no serialVersionUID is declared although the superclass is
 * Serializable — presumably this queue is never serialized; confirm.
 */
static class ExecutorPrioritizedScalingQueue<E> extends PriorityBlockingQueue<E> {

// fair lock serializing offer()/put() so concurrent producers perform their
// pool-size check and enqueue one at a time; note the executor's own state can
// still change underneath, so the checks below are inherently best-effort
private final ReentrantLock lock;
// back-reference assigned by EsExecutors.newPrioritizedScaling immediately after
// the executor is constructed; must be set before the first task is offered
ThreadPoolExecutor executor;

ExecutorPrioritizedScalingQueue() {
// fair mode: producers acquire the lock in arrival order
lock = new ReentrantLock(true);
}

@Override
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// check whether any thread is idle in the pool
int notActive = executor.getPoolSize() - executor.getActiveCount();
if (notActive > 0) {
// There exist idle threads in the pool, adding to the queue so that they get picked up
return super.offer(e);
}
// check if there might be spare capacity in the thread pool executor
int left = executor.getMaximumPoolSize() - executor.getPoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
}
} finally {
lock.unlock();
}
}

@Override
public void put(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// This is called from ForceQueuePolicy so directly add it to parent,
// bypassing the scale-up check above (the task was already rejected once);
// super.offer on an unbounded PriorityBlockingQueue never fails
super.offer(e);
} finally {
lock.unlock();
}
}

}

/**
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
Expand All @@ -317,7 +378,8 @@ static class ForceQueuePolicy implements XRejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// force queue policy should only be used with a scaling queue
assert executor.getQueue() instanceof ExecutorScalingQueue;
assert (executor.getQueue() instanceof ExecutorScalingQueue) ||
(executor.getQueue() instanceof ExecutorPrioritizedScalingQueue);
executor.getQueue().put(r);
} catch (final InterruptedException e) {
// a scaling queue never blocks so a put to it can never be interrupted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,18 @@ public void addResponseHeader(final String key, final String value, final Functi
* <code>command</code> has already been passed through this method then it is returned unaltered rather than wrapped twice.
*/
public Runnable preserveContext(Runnable command) {
if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
return command;
}
if (command instanceof ContextPreservingAbstractRunnable) {
return command;
}
if (command instanceof ContextPreservingRunnable) {
return command;
}
if (command instanceof AbstractPrioritizedRunnable) {
return new ContextPreservingAbstractPrioritizedRunnable((AbstractPrioritizedRunnable) command);
}
if (command instanceof AbstractRunnable) {
return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
}
Expand All @@ -361,6 +367,9 @@ public Runnable preserveContext(Runnable command) {
* Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
*/
public Runnable unwrap(Runnable command) {
if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
return ((ContextPreservingAbstractPrioritizedRunnable) command).unwrap();
}
if (command instanceof WrappedRunnable) {
return ((WrappedRunnable) command).unwrap();
}
Expand Down Expand Up @@ -771,6 +780,77 @@ public AbstractRunnable unwrap() {
}
}

/**
 * Wraps an {@link AbstractPrioritizedRunnable} to preserve the thread context:
 * the context active at construction time is captured and restored around
 * {@code doRun()}, then the executing thread's original context is restored in
 * {@code onAfter()}. The wrapped runnable's priority is propagated so queue
 * ordering is unaffected by wrapping.
 */
private class ContextPreservingAbstractPrioritizedRunnable extends AbstractPrioritizedRunnable {
// the wrapped runnable all lifecycle callbacks delegate to
private final AbstractPrioritizedRunnable in;
// context captured when this wrapper was created (the task submitter's context)
private final ThreadContext.StoredContext creatorsContext;

// the executing thread's own context, stashed in doRun() and restored in onAfter()
private ThreadContext.StoredContext threadsOriginalContext = null;

private ContextPreservingAbstractPrioritizedRunnable(AbstractPrioritizedRunnable in) {
// propagate priority so the wrapper sorts identically to the wrapped task
super(in.getPriority());
creatorsContext = newStoredContext(false);
this.in = in;
}

@Override
public boolean isForceExecution() {
return in.isForceExecution();
}

@Override
public void onAfter() {
try {
in.onAfter();
} finally {
// always give the worker thread its original context back, even if
// the delegate's onAfter throws
if (threadsOriginalContext != null) {
threadsOriginalContext.restore();
}
}
}

@Override
public void onFailure(Exception e) {
in.onFailure(e);
}

@Override
public void onRejection(Exception e) {
in.onRejection(e);
}

@Override
protected void doRun() throws Exception {
// distinguishes failures of the delegate (rethrown) from failures while
// restoring context during shutdown (swallowed below)
boolean whileRunning = false;
threadsOriginalContext = stashContext();
try {
creatorsContext.restore();
whileRunning = true;
in.doRun();
whileRunning = false;
} catch (IllegalStateException ex) {
if (whileRunning || threadLocal.closed.get() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
}
}

@Override
public String toString() {
return in.toString();
}

// returns the wrapped runnable; used by ThreadContext#unwrap
public AbstractPrioritizedRunnable unwrap() {
return in;
}

}

private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();

private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class FilterRepository implements Repository {

Expand Down Expand Up @@ -124,6 +127,13 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

// Pure delegation: forwards the prioritized/parallel-upload variant of
// snapshotShard to the wrapped repository unchanged.
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
Optional<Executor> executor) {
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, priorityGenerator, executor);
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -204,6 +207,30 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);

/**
 * Creates a snapshot of the shard based on the index commit point, optionally
 * uploading the shard's segment files in parallel on the given executor.
 * <p>
 * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
 * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
 * <p>
 * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
 * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
 * <p>
 * The default implementation ignores {@code priorityGenerator} and
 * {@code executor} and falls back to the sequential
 * {@code snapshotShard} overload, so existing implementations keep working.
 * <p>
 * NOTE(review): {@code Optional} method parameters are generally discouraged
 * (Optional is intended as a return type); consider overloads or nullable
 * parameters instead — confirm with project conventions.
 * @param shard shard to be snapshotted
 * @param store store to be snapshotted
 * @param snapshotId snapshot id
 * @param indexId id for the index being snapshotted
 * @param snapshotIndexCommit commit point
 * @param snapshotStatus snapshot status
 * @param priorityGenerator priority generator for this shard; empty to snapshot without prioritization
 * @param executor executor to upload files in parallel; empty to upload sequentially
 */
default void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
Optional<Executor> executor) {
// Default implementation will ignore priority and executor and execute the older way
snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

/**
* Restores snapshot of the shard.
* <p>
Expand Down
Loading