Skip to content

Commit

Permalink
IGNITE-9737 Added configuration parameters for system workers livenes…
Browse files Browse the repository at this point in the history
…s checking

Signed-off-by: Andrey Gura <agura@apache.org>
  • Loading branch information
andrey-kuznetsov authored and agura committed Oct 17, 2018
1 parent 33b9611 commit 8828c1d
Show file tree
Hide file tree
Showing 21 changed files with 697 additions and 85 deletions.
Expand Up @@ -1007,6 +1007,17 @@ public final class IgniteSystemProperties {
*/ */
public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE"; public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE";


/**
* Maximum inactivity period for system worker in milliseconds. When this value is exceeded, worker is considered
* blocked with consequent critical failure handler invocation.
* <p>
* When this property is not specified, the value of {@code IgniteConfiguration.getSystemWorkerBlockedTimeout()}
* is used as the default or, if that one is not set either, the failure detection timeout.
*/
public static final String IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT = "IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT";

/**
* Timeout for checkpoint read lock acquisition in milliseconds. Non-positive value denotes infinite timeout.
* <p>
* When this property is not specified, the value of {@code DataStorageConfiguration.getCheckpointReadLockTimeout()}
* is used as the default or, if that one is not set either, the system worker blocked timeout
* (falling back to the failure detection timeout when no workers registry is available).
*/
public static final String IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT = "IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT";

/** /**
* Timeout for waiting schema update if schema was not found for last accepted version. * Timeout for waiting schema update if schema was not found for last accepted version.
*/ */
Expand Down
Expand Up @@ -279,6 +279,9 @@ public class DataStorageConfiguration implements Serializable {
*/ */
private int walCompactionLevel = DFLT_WAL_COMPACTION_LEVEL; private int walCompactionLevel = DFLT_WAL_COMPACTION_LEVEL;


/** Timeout for checkpoint read lock acquisition in milliseconds, {@code null} if not set explicitly. */
private Long checkpointReadLockTimeout;

/** /**
* Initial size of a data region reserved for system cache. * Initial size of a data region reserved for system cache.
* *
Expand Down Expand Up @@ -983,6 +986,30 @@ public void setWalCompactionLevel(int walCompactionLevel) {
this.walCompactionLevel = walCompactionLevel; this.walCompactionLevel = walCompactionLevel;
} }


/**
 * Gets the timeout for checkpoint read lock acquisition.
 *
 * @return Timeout for checkpoint read lock acquisition in milliseconds, or {@code null} if it has not been set
 *      explicitly.
 * @see #setCheckpointReadLockTimeout(long)
 */
public Long getCheckpointReadLockTimeout() {
    return this.checkpointReadLockTimeout;
}

/**
 * Sets the timeout for checkpoint read lock acquisition.
 * <p>
 * If a thread fails to acquire the checkpoint read lock within this period, the critical failure handler
 * is invoked.
 *
 * @param checkpointReadLockTimeout Timeout for checkpoint read lock acquisition in milliseconds.
 * @return {@code this} for chaining.
 */
public DataStorageConfiguration setCheckpointReadLockTimeout(long checkpointReadLockTimeout) {
    // Explicit boxing: the backing field is a nullable Long so that "not configured" can be told apart.
    this.checkpointReadLockTimeout = Long.valueOf(checkpointReadLockTimeout);

    return this;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(DataStorageConfiguration.class, this); return S.toString(DataStorageConfiguration.class, this);
Expand Down
Expand Up @@ -411,6 +411,9 @@ public class IgniteConfiguration {
/** Failure detection timeout. */ /** Failure detection timeout. */
private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT; private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT;


/** Timeout for blocked system workers detection in milliseconds, {@code null} if not set explicitly. */
private Long sysWorkerBlockedTimeout;

/** Failure detection timeout for client nodes. */ /** Failure detection timeout for client nodes. */
private Long clientFailureDetectionTimeout = DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT; private Long clientFailureDetectionTimeout = DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT;


Expand Down Expand Up @@ -624,6 +627,7 @@ public IgniteConfiguration(IgniteConfiguration cfg) {
svcCfgs = cfg.getServiceConfiguration(); svcCfgs = cfg.getServiceConfiguration();
svcPoolSize = cfg.getServiceThreadPoolSize(); svcPoolSize = cfg.getServiceThreadPoolSize();
sysPoolSize = cfg.getSystemThreadPoolSize(); sysPoolSize = cfg.getSystemThreadPoolSize();
sysWorkerBlockedTimeout = cfg.getSystemWorkerBlockedTimeout();
timeSrvPortBase = cfg.getTimeServerPortBase(); timeSrvPortBase = cfg.getTimeServerPortBase();
timeSrvPortRange = cfg.getTimeServerPortRange(); timeSrvPortRange = cfg.getTimeServerPortRange();
txCfg = cfg.getTransactionConfiguration(); txCfg = cfg.getTransactionConfiguration();
Expand Down Expand Up @@ -1981,6 +1985,31 @@ public IgniteConfiguration setFailureDetectionTimeout(long failureDetectionTimeo
return this; return this;
} }


/**
 * Gets the maximum inactivity period for a system worker. Once this period is exceeded, the worker is
 * considered blocked and the critical failure handler is invoked.
 *
 * @return Maximum inactivity period for system worker in milliseconds, or {@code null} if it has not been set
 *      explicitly.
 * @see #setSystemWorkerBlockedTimeout(long)
 */
public Long getSystemWorkerBlockedTimeout() {
    return this.sysWorkerBlockedTimeout;
}

/**
 * Sets the maximum inactivity period for a system worker. Once this period is exceeded, the worker is
 * considered blocked and the critical failure handler is invoked.
 *
 * @param sysWorkerBlockedTimeout Maximum inactivity period for system worker in milliseconds.
 * @return {@code this} for chaining.
 * @see #setFailureHandler(FailureHandler)
 */
public IgniteConfiguration setSystemWorkerBlockedTimeout(long sysWorkerBlockedTimeout) {
    // Explicit boxing: the backing field is a nullable Long so that "not configured" can be told apart.
    this.sysWorkerBlockedTimeout = Long.valueOf(sysWorkerBlockedTimeout);

    return this;
}

/** /**
* Should return fully configured load balancing SPI implementation. If not provided, * Should return fully configured load balancing SPI implementation. If not provided,
* {@link RoundRobinLoadBalancingSpi} will be used. * {@link RoundRobinLoadBalancingSpi} will be used.
Expand Down
Expand Up @@ -188,6 +188,7 @@
import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl;
import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl; import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
Expand All @@ -201,6 +202,7 @@
import org.apache.ignite.marshaller.MarshallerExclusions; import org.apache.ignite.marshaller.MarshallerExclusions;
import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.mxbean.FailureHandlingMxBean;
import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.ClusterMetricsMXBean;
import org.apache.ignite.mxbean.DataStorageMXBean; import org.apache.ignite.mxbean.DataStorageMXBean;
import org.apache.ignite.mxbean.IgniteMXBean; import org.apache.ignite.mxbean.IgniteMXBean;
Expand Down Expand Up @@ -4365,6 +4367,12 @@ private void registerAllMBeans(
registerMBean("Kernal", workerCtrlMXBean.getClass().getSimpleName(), registerMBean("Kernal", workerCtrlMXBean.getClass().getSimpleName(),
workerCtrlMXBean, WorkersControlMXBean.class); workerCtrlMXBean, WorkersControlMXBean.class);
} }

FailureHandlingMxBean blockOpCtrlMXBean = new FailureHandlingMxBeanImpl(workersRegistry,
ctx.cache().context().database());

registerMBean("Kernal", blockOpCtrlMXBean.getClass().getSimpleName(), blockOpCtrlMXBean,
FailureHandlingMxBean.class);
} }


/** /**
Expand Down
Expand Up @@ -138,6 +138,7 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
Expand Down Expand Up @@ -1830,7 +1831,10 @@ private void start0(GridStartContext startCtx) throws IgniteCheckedException {
new IgniteException(S.toString(GridWorker.class, deadWorker)))); new IgniteException(S.toString(GridWorker.class, deadWorker))));
} }
}, },
cfg.getFailureDetectionTimeout(), IgniteSystemProperties.getLong(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT,
cfg.getSystemWorkerBlockedTimeout() != null
? cfg.getSystemWorkerBlockedTimeout()
: cfg.getFailureDetectionTimeout()),
log); log);


stripedExecSvc = new StripedExecutor( stripedExecSvc = new StripedExecutor(
Expand Down
Expand Up @@ -79,7 +79,6 @@
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion; import org.apache.ignite.internal.mem.DirectMemoryRegion;
Expand Down Expand Up @@ -111,9 +110,9 @@
import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
Expand Down Expand Up @@ -160,12 +159,13 @@
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean; import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap; import org.jsr166.ConcurrentLinkedHashMap;


import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
Expand Down Expand Up @@ -352,6 +352,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan


/** File I/O factory for writing checkpoint markers. */ /** File I/O factory for writing checkpoint markers. */
private final FileIOFactory ioFactory; private final FileIOFactory ioFactory;

/**
* Timeout for checkpoint read lock acquisition in milliseconds. Non-positive value denotes infinite timeout.
* Declared {@code volatile} because it can be reassigned after construction via
* {@link #checkpointReadLockTimeout(long)}.
*/
private volatile long checkpointReadLockTimeout;

/** /**
* @param ctx Kernal context. * @param ctx Kernal context.
*/ */
Expand All @@ -377,6 +381,17 @@ public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
); );


ioFactory = persistenceCfg.getFileIOFactory(); ioFactory = persistenceCfg.getFileIOFactory();

Long cfgCheckpointReadLockTimeout = ctx.config().getDataStorageConfiguration() != null
? ctx.config().getDataStorageConfiguration().getCheckpointReadLockTimeout()
: null;

checkpointReadLockTimeout = IgniteSystemProperties.getLong(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT,
cfgCheckpointReadLockTimeout != null
? cfgCheckpointReadLockTimeout
: (ctx.workersRegistry() != null
? ctx.workersRegistry().getSystemWorkerBlockedTimeout()
: ctx.config().getFailureDetectionTimeout()));
} }


/** */ /** */
Expand Down Expand Up @@ -1486,53 +1501,64 @@ private void prepareIndexRebuildFuture(int cacheId) {
if (checkpointLock.writeLock().isHeldByCurrentThread()) if (checkpointLock.writeLock().isHeldByCurrentThread())
return; return;


long timeout = cctx.gridConfig().getFailureDetectionTimeout(); long timeout = checkpointReadLockTimeout;


long start = U.currentTimeMillis(); long start = U.currentTimeMillis();
long passed;


boolean interruped = false; boolean interruped = false;


try { try {
for (; ; ) { for (; ; ) {
if ((passed = U.currentTimeMillis() - start) >= timeout)
failCheckpointReadLock();

try { try {
if (!checkpointLock.readLock().tryLock(timeout - passed, TimeUnit.MILLISECONDS)) if (timeout > 0 && (U.currentTimeMillis() - start) >= timeout)
failCheckpointReadLock(); failCheckpointReadLock();
}
catch (InterruptedException e) {
interruped = true;


continue; try {
} if (timeout > 0) {
if (!checkpointLock.readLock().tryLock(timeout - (U.currentTimeMillis() - start),
TimeUnit.MILLISECONDS))
failCheckpointReadLock();
}
else
checkpointLock.readLock().lock();
}
catch (InterruptedException e) {
interruped = true;


if (stopping) { continue;
checkpointLock.readLock().unlock(); }


throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping.")); if (stopping) {
} checkpointLock.readLock().unlock();


if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories()) throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping."));
break; }
else {
checkpointLock.readLock().unlock();


if (U.currentTimeMillis() - start >= timeout) if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories())
failCheckpointReadLock(); break;
else {
checkpointLock.readLock().unlock();


try { if (timeout > 0 && U.currentTimeMillis() - start >= timeout)
checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut failCheckpointReadLock();
.getUninterruptibly();
} try {
catch (IgniteFutureTimeoutCheckedException e) { checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut
failCheckpointReadLock(); .getUninterruptibly();
} }
catch (IgniteCheckedException e) { catch (IgniteFutureTimeoutCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e); failCheckpointReadLock();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
}
} }
} }
catch (CheckpointReadLockTimeoutException e) {
log.error(e.getMessage(), e);

timeout = 0;
}
} }
} }
finally { finally {
Expand All @@ -1544,13 +1570,21 @@ private void prepareIndexRebuildFuture(int cacheId) {
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1); CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
} }


/** */ /**
private void failCheckpointReadLock() throws IgniteException { * Invokes critical failure processing. Always throws.
IgniteException e = new IgniteException("Checkpoint read lock acquisition has been timed out."); *
* @throws CheckpointReadLockTimeoutException If node was not invalidated as result of handling.
* @throws IgniteException If node was invalidated as result of handling.
*/
private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException, IgniteException {
String msg = "Checkpoint read lock acquisition has been timed out.";


cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); IgniteException e = new IgniteException(msg);


throw e; if (cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)))
throw e;

throw new CheckpointReadLockTimeoutException(msg);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -2882,6 +2916,24 @@ public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteChe
cp.cancelOrWaitPartitionDestroy(grpId, partId); cp.cancelOrWaitPartitionDestroy(grpId, partId);
} }


/**
 * Gets the currently effective timeout for checkpoint read lock acquisition.
 *
 * @return Timeout for checkpoint read lock acquisition in milliseconds.
 */
@Override public long checkpointReadLockTimeout() {
    return this.checkpointReadLockTimeout;
}

/**
 * Replaces the timeout for checkpoint read lock acquisition.
 *
 * @param val New timeout in milliseconds, non-positive value denotes infinite timeout.
 */
@Override public void checkpointReadLockTimeout(long val) {
    this.checkpointReadLockTimeout = val;
}

/** /**
* Partition destroy queue. * Partition destroy queue.
*/ */
Expand Down Expand Up @@ -4809,4 +4861,15 @@ public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) {
super(lastArchivedSegment, log); super(lastArchivedSegment, log);
} }
} }

/**
* Indicates checkpoint read lock acquisition failure which did not lead to node invalidation.
* <p>
* Private to the enclosing class: it is only raised and handled internally by the checkpoint read lock
* acquisition logic and is not meant to reach callers.
*/
private static class CheckpointReadLockTimeoutException extends IgniteCheckedException {
/** Serial version UID. */
private static final long serialVersionUID = 0L;

/**
* @param msg Error message.
*/
private CheckpointReadLockTimeoutException(String msg) {
super(msg);
}
}
} }

0 comments on commit 8828c1d

Please sign in to comment.