Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cron timer to arrange the period heartbeat executor invoke time #16900

Merged
merged 10 commits into from Apr 14, 2023
Expand Up @@ -71,7 +71,7 @@ public Optional<IOException> getException() {
}

@Override
public synchronized void heartbeat() {
public synchronized void heartbeat(long timeLimitMs) {
if (!mContext.getClientContext().getClusterConf().clusterDefaultsLoaded()) {
// Wait until the initial cluster defaults are loaded.
return;
Expand Down
Expand Up @@ -66,7 +66,7 @@ public FileSystemContextReinitializer(FileSystemContext context) {
mExecutor = new ConfigHashSync(context);
mFuture = REINIT_EXECUTOR.scheduleAtFixedRate(() -> {
try {
mExecutor.heartbeat();
mExecutor.heartbeat(Long.MAX_VALUE);
} catch (Exception e) {
LOG.error("Uncaught exception in config heartbeat executor, shutting down", e);
}
Expand Down
@@ -0,0 +1,59 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.heartbeat;

import org.apache.logging.log4j.core.util.CronExpression;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;

/**
* Calculate the next interval by given cron expression.
*/
public class CronExpressionIntervalSupplier implements SleepIntervalSupplier {
private final long mInterval;
private final CronExpression mCron;

/**
* Constructs a new {@link CronExpressionIntervalSupplier}.
*
* @param cronExpression the cron expression
* @param fixedInterval the fixed interval
*/
public CronExpressionIntervalSupplier(CronExpression cronExpression, long fixedInterval) {
mInterval = fixedInterval;
mCron = cronExpression;
}

@Override
public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) {
long nextInterval = 0;
long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs;
if (executionTimeMs < mInterval) {
nextInterval = mInterval - executionTimeMs;
}
Date now = Date.from(Instant.ofEpochMilli(nowTimeStampMillis + nextInterval));
if (mCron.isSatisfiedBy(now)) {
return nextInterval;
}
return nextInterval + Duration.between(
now.toInstant(), mCron.getNextValidTimeAfter(now).toInstant()).toMillis();
}

@Override
public long getRunLimit(long mPreviousTickedMs) {
Date now = Date.from(Instant.ofEpochMilli(mPreviousTickedMs));
return Duration.between(now.toInstant(),
mCron.getNextInvalidTimeAfter(now).toInstant()).toMillis();
}
}
@@ -0,0 +1,63 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.heartbeat;

import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;

/**
* Fixed interval supplier.
*/
public class FixedIntervalSupplier implements SleepIntervalSupplier {

private final long mInterval;
protected final Logger mLogger;

/**
* Constructs a new {@link FixedIntervalSupplier}.
*
* @param fixedInterval the fixed interval
* @param logger the logger
*/
public FixedIntervalSupplier(long fixedInterval, Logger logger) {
mInterval = fixedInterval;
mLogger = logger;
}

/**
* Constructs a new {@link FixedIntervalSupplier}.
*
* @param fixedInterval the fixed interval
*/
public FixedIntervalSupplier(long fixedInterval) {
this(fixedInterval, NOPLogger.NOP_LOGGER);
}

@Override
public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) {
if (mPreviousTickedMs == -1) {
return -1;
}
long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs;
if (executionTimeMs > mInterval) {
mLogger.warn("{} last execution took {} ms. Longer than the interval {}",
Thread.currentThread().getName(), executionTimeMs, mInterval);
return 0;
}
return mInterval - executionTimeMs;
}

@Override
public long getRunLimit(long mPreviousTickedMs) {
return mInterval;
}
}
Expand Up @@ -15,15 +15,17 @@

/**
* An interface for a heartbeat execution. The {@link HeartbeatThread} calls the
* {@link #heartbeat()} method.
* {@link #heartbeat(long)} method.
*/
public interface HeartbeatExecutor extends Closeable {

/**
* Implements the heartbeat logic.
*
* @param timeLimitMs time limit in milliseconds this heartbeat should not exceed when running
* @throws InterruptedException if the thread is interrupted
*/
void heartbeat() throws InterruptedException;
void heartbeat(long timeLimitMs) throws InterruptedException;

/**
* Cleans up any resources used by the heartbeat executor.
Expand Down
74 changes: 39 additions & 35 deletions core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java
Expand Up @@ -12,7 +12,6 @@
package alluxio.heartbeat;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Reconfigurable;
import alluxio.conf.ReconfigurableRegistry;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.user.UserState;
Expand All @@ -21,25 +20,25 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Clock;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;

/**
* Thread class to execute a heartbeat periodically. This thread is daemonic, so it will not prevent
* the JVM from exiting.
*/
@NotThreadSafe
public final class HeartbeatThread implements Runnable, Reconfigurable {
public final class HeartbeatThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatThread.class);

private final String mThreadName;
private final HeartbeatExecutor mExecutor;
private final UserState mUserState;
private final Supplier<Long> mIntervalSupplier;
private HeartbeatTimer mTimer;
private AlluxioConfiguration mConfiguration;
private Status mStatus;
Expand Down Expand Up @@ -73,26 +72,28 @@ public static String generateThreadName(String executorName, String threadId) {
* @param intervalSupplier Sleep time between different heartbeat supplier
* @param conf Alluxio configuration
* @param userState the user state for this heartbeat thread
* @param clock the clock used to compute the current time
*/
public HeartbeatThread(String executorName, String threadId, HeartbeatExecutor executor,
Supplier<Long> intervalSupplier, AlluxioConfiguration conf, UserState userState) {
Supplier<SleepIntervalSupplier> intervalSupplier,
AlluxioConfiguration conf, UserState userState, Clock clock) {
mThreadName = generateThreadName(executorName, threadId);
mExecutor = Preconditions.checkNotNull(executor, "executor");
Class<? extends HeartbeatTimer> timerClass = HeartbeatContext.getTimerClass(executorName);
mTimer = CommonUtils.createNewClassInstance(timerClass, new Class[] {String.class, long.class},
new Object[] {mThreadName, intervalSupplier.get()});
mTimer = CommonUtils.createNewClassInstance(timerClass,
new Class[] {String.class, Clock.class, Supplier.class},
new Object[] {mThreadName, clock, intervalSupplier});
mConfiguration = conf;
mUserState = userState;
mIntervalSupplier = intervalSupplier;
mStatus = Status.INIT;
ReconfigurableRegistry.register(this);
ReconfigurableRegistry.register(mTimer);
}

/**
* Convenience method for
* {@link
* #HeartbeatThread(String, String, HeartbeatExecutor, Supplier, AlluxioConfiguration,
* UserState)} where threadId is null.
* UserState, Clock)} where threadId is null.
*
* @param executorName the executor name that is one of those defined in {@link HeartbeatContext}
* @param executor the heartbeat executor
Expand All @@ -101,12 +102,34 @@ public HeartbeatThread(String executorName, String threadId, HeartbeatExecutor e
* @param userState the user state for this heartbeat thread
*/
public HeartbeatThread(String executorName, HeartbeatExecutor executor,
Supplier<Long> intervalSupplier, AlluxioConfiguration conf, UserState userState) {
this(executorName, null, executor, intervalSupplier, conf, userState);
Supplier<SleepIntervalSupplier> intervalSupplier, AlluxioConfiguration conf,
UserState userState) {
this(executorName, null, executor, intervalSupplier, conf, userState, Clock.systemUTC());
}

/**
* Convenience method for
* {@link
* #HeartbeatThread(String, String, HeartbeatExecutor, Supplier, AlluxioConfiguration,
* UserState, Clock)} where threadId is null.
*
* @param executorName the executor name that is one of those defined in {@link HeartbeatContext}
* @param executor the heartbeat executor
* @param intervalSupplier the interval between heartbeats supplier
* @param conf the Alluxio configuration
* @param userState the user state for this heartbeat thread
* @param clock the clock used to compute the current time
*/
public HeartbeatThread(String executorName, HeartbeatExecutor executor,
Supplier<SleepIntervalSupplier> intervalSupplier,
AlluxioConfiguration conf, UserState userState, Clock clock) {
this(executorName, null, executor, intervalSupplier,
conf, userState, clock);
}

@Override
public void run() {
long counter = 0L;
try {
if (SecurityUtils.isSecurityEnabled(mConfiguration)
&& AuthenticatedClientUser.get(mConfiguration) == null) {
Expand All @@ -123,48 +146,29 @@ public void run() {
while (!Thread.interrupted()) {
// TODO(peis): Fix this. The current implementation consumes one thread even when ticking.
mStatus = Status.WAITING;
mTimer.tick();
long limitTime = mTimer.tick();
mStatus = Status.RUNNING;
mExecutor.heartbeat();
LOG.debug("{} #{} will run limited in {}s", mThreadName, counter++, limitTime / 1000);
mExecutor.heartbeat(limitTime);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who stops this heartbeat if the limitTime has been exceeded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopped by Heartbeat implementation

}
} catch (InterruptedException e) {
// Allow thread to exit.
} catch (Exception e) {
LOG.error("Uncaught exception in heartbeat executor, Heartbeat Thread shutting down", e);
} finally {
mStatus = Status.STOPPED;
ReconfigurableRegistry.unregister(mTimer);
mExecutor.close();
}
}

/**
* Updates the heartbeat interval.
*
* @param intervalMs the heartbeat interval in ms
*/
public void updateIntervalMs(long intervalMs) {
mTimer.setIntervalMs(intervalMs);
}

/**
* @return the status of current heartbeat thread
*/
public Status getStatus() {
return mStatus;
}

@Override
public void update() {
if (mStatus == Status.STOPPED) {
ReconfigurableRegistry.unregister(this);
return;
}
long interval = mIntervalSupplier.get();
if (interval != mTimer.getIntervalMs()) {
updateIntervalMs(interval);
}
}

/**
* Enum representing the status of HeartbeatThread.
*/
Expand Down
26 changes: 10 additions & 16 deletions core/common/src/main/java/alluxio/heartbeat/HeartbeatTimer.java
Expand Up @@ -11,33 +11,27 @@

package alluxio.heartbeat;

import alluxio.conf.Reconfigurable;

/**
* An interface for heartbeat timers. The {@link HeartbeatThread} calls the {@link #tick()} method.
*/
public interface HeartbeatTimer {
public interface HeartbeatTimer extends Reconfigurable {

/**
* Sets the heartbeat interval.
*
* @param intervalMs the heartbeat interval in ms
*/
default void setIntervalMs(long intervalMs) {
throw new UnsupportedOperationException("Setting interval is not supported");
}

/**
* Get the interval of HeartbeatTimer.
*
* @return the interval of this HeartbeatTimer
* When this object needs to be reconfigured
* due to external configuration change etc.,
* this function will be invoked.
*/
default long getIntervalMs() {
throw new UnsupportedOperationException("Getting interval is not supported");
default void update() {
}

/**
* Waits until next heartbeat should be executed.
*
* @return time limit in milliseconds for this heartbeat action to run for before
* the next heartbeat is due.
* @throws InterruptedException if the thread is interrupted while waiting
*/
void tick() throws InterruptedException;
long tick() throws InterruptedException;
}