Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,7 +107,7 @@ public String toString() {
private final Semaphore requestSemaphore;
private final TimeDuration requestTimeout;
private final TimeDuration closeTimeout;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -135,7 +135,7 @@ void set(Collection<RaftPeer> newPeers) {

private volatile RaftPeerId leaderId;

private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

private final Supplier<OrderedAsync> orderedAsync;
private final Supplier<AsyncImpl> asyncApi;
Expand Down Expand Up @@ -226,7 +226,7 @@ TimeDuration getEffectiveSleepTime(Throwable t, TimeDuration sleepDefault) {
TimeDuration.ZERO : sleepDefault;
}

TimeoutScheduler getScheduler() {
TimeoutExecutor getScheduler() {
return scheduler;
}

Expand Down Expand Up @@ -403,7 +403,6 @@ public RaftClientRpc getClientRpc() {

@Override
public void close() throws IOException {
scheduler.close();
clientRpc.close();
if (dataStreamApi.isInitialized()) {
dataStreamApi.get().close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;

import java.util.function.Consumer;
import java.util.function.Supplier;

/** Execute timeout tasks. */
public interface TimeoutExecutor {
int MAXIMUM_POOL_SIZE = 8;
static TimeoutExecutor getInstance() {
return TimeoutTimer.getInstance();
}

/** @return the number of scheduled but not completed timeout tasks. */
int getTaskCount();

/**
* Schedule a timeout task.
*
* @param timeout the timeout value.
* @param task the task to run when timeout.
* @param errorHandler to handle the error, if there is any.
*/
<THROWABLE extends Throwable> void onTimeout(
TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler);

/** When timeout, run the task. Log the error, if there is any. */
default void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -32,7 +31,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class TimeoutScheduler implements Closeable {
public final class TimeoutScheduler implements TimeoutExecutor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this class, it will cause thread overflow and should not be called again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it for a while. Not sure if we may find some problem in the new TimeoutTimer in the future. It may be useful then.

public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class);

static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
Expand Down Expand Up @@ -110,7 +109,8 @@ void shutdown() {
private TimeoutScheduler() {
}

int getQueueSize() {
@Override
public int getTaskCount() {
return scheduler.getQueueSize();
}

Expand All @@ -126,13 +126,7 @@ boolean hasScheduler() {
return scheduler.hasExecutor();
}

/**
* Schedule a timeout task.
*
* @param timeout the timeout value.
* @param task the task to run when timeout.
* @param errorHandler to handle the error, if there is any.
*/
@Override
public <THROWABLE extends Throwable> void onTimeout(
TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
onTimeout(timeout, sid -> {
Expand Down Expand Up @@ -186,13 +180,7 @@ private synchronized void tryShutdownScheduler(int sid) {
}
}

/** When timeout, run the task. Log the error, if there is any. */
public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
}

@Override
public synchronized void close() {
public synchronized void tryShutdownScheduler() {
tryShutdownScheduler(scheduleID);
}
}
109 changes: 109 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class TimeoutTimer implements TimeoutExecutor {
public static final Logger LOG = LoggerFactory.getLogger(TimeoutTimer.class);

private static final Supplier<TimeoutTimer> INSTANCE = JavaUtils.memoize(() -> new TimeoutTimer(MAXIMUM_POOL_SIZE));

public static TimeoutTimer getInstance() {
return INSTANCE.get();
}

static class Task extends TimerTask {
private final int id;
private final Runnable runnable;

Task(int id, Runnable runnable) {
this.id = id;
this.runnable = LogUtils.newRunnable(LOG, runnable, this::toString);
}

@Override
public void run() {
LOG.debug("run {}", this);
runnable.run();
}

@Override
public String toString() {
return "task #" + id;
}
}

/** The number of scheduled tasks. */
private final AtomicInteger numTasks = new AtomicInteger();
/** A unique ID for each task. */
private final AtomicInteger taskId = new AtomicInteger();

private final List<MemoizedSupplier<Timer>> timers;

private TimeoutTimer(int numTimers) {
final List<MemoizedSupplier<Timer>> list = new ArrayList<>(numTimers);
for(int i = 0; i < numTimers; i++) {
final String name = "timer" + i;
list.add(JavaUtils.memoize(() -> new Timer(name, true)));
}
this.timers = Collections.unmodifiableList(list);
}

@Override
public int getTaskCount() {
return numTasks.get();
}

private Timer getTimer(int tid) {
return timers.get(Math.toIntExact(Integer.toUnsignedLong(tid) % timers.size())).get();
}

private void schedule(TimeDuration timeout, Runnable toSchedule) {
final int tid = taskId.incrementAndGet();
final int n = numTasks.incrementAndGet();
LOG.debug("schedule a task #{} with timeout {}, numTasks={}", tid, timeout, n);
getTimer(n).schedule(new Task(tid, toSchedule), timeout.toLong(TimeUnit.MILLISECONDS));
}

@Override
public <THROWABLE extends Throwable> void onTimeout(
TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
schedule(timeout, () -> {
try {
task.run();
} catch(Throwable t) {
errorHandler.accept(JavaUtils.cast(t));
} finally {
numTasks.decrementAndGet();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class GrpcClientProtocolClient implements Closeable {

private final TimeDuration requestTimeoutDuration;
private final TimeDuration watchRequestTimeoutDuration;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
Expand Down Expand Up @@ -173,7 +173,6 @@ public void close() {
if (clientChannel != adminChannel) {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
scheduler.close();
metricClientInterceptor.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private final boolean installSnapshotEnabled;

private final TimeDuration requestTimeoutDuration;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -243,7 +243,7 @@ public String toString() {

private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
private final TimeDuration replyQueueGracePeriod;
private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();

public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +81,7 @@ PendingRequest getAndUpdate(Supplier<PendingRequest> supplier) {
}

private final LeaderStateImpl leader;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final PendingRequestReference pending = new PendingRequestReference();

PendingStepDown(LeaderStateImpl leaderState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,7 +89,7 @@ PendingRequest getAndUpdate(Supplier<PendingRequest> supplier) {
}

private final RaftServerImpl server;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final PendingRequestReference pending = new PendingRequestReference();

SnapshotManagementRequestHandler(RaftServerImpl server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,7 +74,7 @@ public String toString() {
}

private final RaftServerImpl server;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
private final AtomicReference<PendingRequest> pending = new AtomicReference<>();

TransferLeadership(RaftServerImpl server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ synchronized void failAll(Exception e) {

private final TimeDuration watchTimeoutNanos;
private final TimeDuration watchTimeoutDenominationNanos;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

WatchRequests(Object name, RaftProperties properties) {
this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
Expand Down
Loading