From 8828c1d1910d209b9f520efcc8d256c6bc6b4251 Mon Sep 17 00:00:00 2001 From: Andrey Kuznetsov Date: Wed, 17 Oct 2018 16:53:53 +0300 Subject: [PATCH] IGNITE-9737 Added configuration parameters for system workers liveness checking Signed-off-by: Andrey Gura --- .../apache/ignite/IgniteSystemProperties.java | 11 + .../DataStorageConfiguration.java | 27 ++ .../configuration/IgniteConfiguration.java | 29 ++ .../apache/ignite/internal/IgniteKernal.java | 8 + .../apache/ignite/internal/IgnitionEx.java | 6 +- .../GridCacheDatabaseSharedManager.java | 139 +++++++--- .../IgniteCacheDatabaseSharedManager.java | 14 + .../processors/failure/FailureProcessor.java | 12 +- .../utils/PlatformConfigurationUtils.java | 42 ++- .../ignite/internal/util/IgniteUtils.java | 11 + .../worker/FailureHandlingMxBeanImpl.java | 73 +++++ .../worker/WorkersControlMXBeanImpl.java | 10 - .../internal/worker/WorkersRegistry.java | 44 ++- .../ignite/mxbean/FailureHandlingMxBean.java | 47 ++++ .../ignite/mxbean/WorkersControlMXBean.java | 7 - .../FailureHandlingConfigurationTest.java | 262 ++++++++++++++++++ .../IgniteBasicWithPersistenceTestSuite.java | 2 + .../IgniteConfigurationTest.cs | 3 + .../Configuration/DataStorageConfiguration.cs | 7 + .../Apache.Ignite.Core/IgniteConfiguration.cs | 14 + .../IgniteConfigurationSection.xsd | 14 + 21 files changed, 697 insertions(+), 85 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/worker/FailureHandlingMxBeanImpl.java create mode 100644 modules/core/src/main/java/org/apache/ignite/mxbean/FailureHandlingMxBean.java create mode 100644 modules/core/src/test/java/org/apache/ignite/failure/FailureHandlingConfigurationTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 02ebb25731dd9..521222cee9084 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1007,6 +1007,17 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE"; + /** + * Maximum inactivity period for system worker in milliseconds. When this value is exceeded, worker is considered + * blocked with consequent critical failure handler invocation. + */ + public static final String IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT = "IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT"; + + /** + * Timeout for checkpoint read lock acquisition in milliseconds. + */ + public static final String IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT = "IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT"; + /** * Timeout for waiting schema update if schema was not found for last accepted version. */ diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 556e3cd44b3a4..7bca0f96479e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -279,6 +279,9 @@ public class DataStorageConfiguration implements Serializable { */ private int walCompactionLevel = DFLT_WAL_COMPACTION_LEVEL; + /** Timeout for checkpoint read lock acquisition. */ + private Long checkpointReadLockTimeout; + /** * Initial size of a data region reserved for system cache. * @@ -983,6 +986,30 @@ public void setWalCompactionLevel(int walCompactionLevel) { this.walCompactionLevel = walCompactionLevel; } + /** + * Returns timeout for checkpoint read lock acquisition. + * + * @see #setCheckpointReadLockTimeout(long) + * @return Returns timeout for checkpoint read lock acquisition in milliseconds. + */ + public Long getCheckpointReadLockTimeout() { + return checkpointReadLockTimeout; + } + + /** + * Sets timeout for checkpoint read lock acquisition. + *

+ * When any thread cannot acquire checkpoint read lock in this time, then critical failure handler is being called. + * + * @param checkpointReadLockTimeout Timeout for checkpoint read lock acquisition in milliseconds. + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setCheckpointReadLockTimeout(long checkpointReadLockTimeout) { + this.checkpointReadLockTimeout = checkpointReadLockTimeout; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStorageConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 1dbec7db233ab..e7ccaf556ccf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -411,6 +411,9 @@ public class IgniteConfiguration { /** Failure detection timeout. */ private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT; + /** Timeout for blocked system workers detection. */ + private Long sysWorkerBlockedTimeout; + /** Failure detection timeout for client nodes. */ private Long clientFailureDetectionTimeout = DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT; @@ -624,6 +627,7 @@ public IgniteConfiguration(IgniteConfiguration cfg) { svcCfgs = cfg.getServiceConfiguration(); svcPoolSize = cfg.getServiceThreadPoolSize(); sysPoolSize = cfg.getSystemThreadPoolSize(); + sysWorkerBlockedTimeout = cfg.getSystemWorkerBlockedTimeout(); timeSrvPortBase = cfg.getTimeServerPortBase(); timeSrvPortRange = cfg.getTimeServerPortRange(); txCfg = cfg.getTransactionConfiguration(); @@ -1981,6 +1985,31 @@ public IgniteConfiguration setFailureDetectionTimeout(long failureDetectionTimeo return this; } + /** + * Returns maximum inactivity period for system worker. When this value is exceeded, worker is considered blocked + * with consequent critical failure handler invocation. + * + * @see #setSystemWorkerBlockedTimeout(long) + * @return Maximum inactivity period for system worker in milliseconds. + */ + public Long getSystemWorkerBlockedTimeout() { + return sysWorkerBlockedTimeout; + } + + /** + * Sets maximum inactivity period for system worker. When this value is exceeded, worker is considered blocked + * with consequent critical failure handler invocation. + * + * @see #setFailureHandler(FailureHandler) + * @param sysWorkerBlockedTimeout Maximum inactivity period for system worker in milliseconds. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setSystemWorkerBlockedTimeout(long sysWorkerBlockedTimeout) { + this.sysWorkerBlockedTimeout = sysWorkerBlockedTimeout; + + return this; + } + /** * Should return fully configured load balancing SPI implementation. If not provided, * {@link RoundRobinLoadBalancingSpi} will be used. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f0658630084c0..7f4310fd88ad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -188,6 +188,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl; import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.lang.IgniteBiTuple; @@ -201,6 +202,7 @@ import org.apache.ignite.marshaller.MarshallerExclusions; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.mxbean.FailureHandlingMxBean; import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.DataStorageMXBean; import org.apache.ignite.mxbean.IgniteMXBean; @@ -4365,6 +4367,12 @@ private void registerAllMBeans( registerMBean("Kernal", workerCtrlMXBean.getClass().getSimpleName(), workerCtrlMXBean, WorkersControlMXBean.class); } + + FailureHandlingMxBean blockOpCtrlMXBean = new FailureHandlingMxBeanImpl(workersRegistry, + ctx.cache().context().database()); + + registerMBean("Kernal", blockOpCtrlMXBean.getClass().getSimpleName(), blockOpCtrlMXBean, + FailureHandlingMxBean.class); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 95001deeee66c..1e56cde9e052a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -138,6 +138,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK; import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; @@ -1830,7 +1831,10 @@ private void start0(GridStartContext startCtx) throws IgniteCheckedException { new IgniteException(S.toString(GridWorker.class, deadWorker)))); } }, - cfg.getFailureDetectionTimeout(), + IgniteSystemProperties.getLong(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT, + cfg.getSystemWorkerBlockedTimeout() != null + ? cfg.getSystemWorkerBlockedTimeout() + : cfg.getFailureDetectionTimeout()), log); stripedExecSvc = new StripedExecutor( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index e74954f3531f3..ea3e3148b5568 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -79,7 +79,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryRegion; @@ -111,9 +110,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType; @@ -160,12 +159,13 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.mxbean.DataStorageMetricsMXBean; import org.apache.ignite.thread.IgniteThread; -import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; import static java.nio.file.StandardOpenOption.READ; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; @@ -352,6 +352,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** File I/O factory for writing checkpoint markers. */ private final FileIOFactory ioFactory; + + /** Timeout for checkpoint read lock acquisition in milliseconds. */ + private volatile long checkpointReadLockTimeout; + /** * @param ctx Kernal context. */ @@ -377,6 +381,17 @@ public GridCacheDatabaseSharedManager(GridKernalContext ctx) { ); ioFactory = persistenceCfg.getFileIOFactory(); + + Long cfgCheckpointReadLockTimeout = ctx.config().getDataStorageConfiguration() != null + ? ctx.config().getDataStorageConfiguration().getCheckpointReadLockTimeout() + : null; + + checkpointReadLockTimeout = IgniteSystemProperties.getLong(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT, + cfgCheckpointReadLockTimeout != null + ? cfgCheckpointReadLockTimeout + : (ctx.workersRegistry() != null + ? ctx.workersRegistry().getSystemWorkerBlockedTimeout() + : ctx.config().getFailureDetectionTimeout())); } /** */ @@ -1486,53 +1501,64 @@ private void prepareIndexRebuildFuture(int cacheId) { if (checkpointLock.writeLock().isHeldByCurrentThread()) return; - long timeout = cctx.gridConfig().getFailureDetectionTimeout(); + long timeout = checkpointReadLockTimeout; long start = U.currentTimeMillis(); - long passed; boolean interruped = false; try { for (; ; ) { - if ((passed = U.currentTimeMillis() - start) >= timeout) - failCheckpointReadLock(); - try { - if (!checkpointLock.readLock().tryLock(timeout - passed, TimeUnit.MILLISECONDS)) + if (timeout > 0 && (U.currentTimeMillis() - start) >= timeout) failCheckpointReadLock(); - } - catch (InterruptedException e) { - interruped = true; - continue; - } + try { + if (timeout > 0) { + if (!checkpointLock.readLock().tryLock(timeout - (U.currentTimeMillis() - start), + TimeUnit.MILLISECONDS)) + failCheckpointReadLock(); + } + else + checkpointLock.readLock().lock(); + } + catch (InterruptedException e) { + interruped = true; - if (stopping) { - checkpointLock.readLock().unlock(); + continue; + } - throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping.")); - } + if (stopping) { + checkpointLock.readLock().unlock(); - if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories()) - break; - else { - checkpointLock.readLock().unlock(); + throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping.")); + } - if (U.currentTimeMillis() - start >= timeout) - failCheckpointReadLock(); + if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories()) + break; + else { + checkpointLock.readLock().unlock(); - try { - checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut - .getUninterruptibly(); - } - catch (IgniteFutureTimeoutCheckedException e) { - failCheckpointReadLock(); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to wait for checkpoint begin.", e); + if (timeout > 0 && U.currentTimeMillis() - start >= timeout) + failCheckpointReadLock(); + + try { + checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut + .getUninterruptibly(); + } + catch (IgniteFutureTimeoutCheckedException e) { + failCheckpointReadLock(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for checkpoint begin.", e); + } } } + catch (CheckpointReadLockTimeoutException e) { + log.error(e.getMessage(), e); + + timeout = 0; + } } } finally { @@ -1544,13 +1570,21 @@ private void prepareIndexRebuildFuture(int cacheId) { CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1); } - /** */ - private void failCheckpointReadLock() throws IgniteException { - IgniteException e = new IgniteException("Checkpoint read lock acquisition has been timed out."); + /** + * Invokes critical failure processing. Always throws. + * + * @throws CheckpointReadLockTimeoutException If node was not invalidated as result of handling. + * @throws IgniteException If node was invalidated as result of handling. + */ + private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException, IgniteException { + String msg = "Checkpoint read lock acquisition has been timed out."; - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + IgniteException e = new IgniteException(msg); - throw e; + if (cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e))) + throw e; + + throw new CheckpointReadLockTimeoutException(msg); } /** {@inheritDoc} */ @@ -2882,6 +2916,24 @@ public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteChe cp.cancelOrWaitPartitionDestroy(grpId, partId); } + /** + * Timeout for checkpoint read lock acquisition. + * + * @return Timeout for checkpoint read lock acquisition in milliseconds. + */ + @Override public long checkpointReadLockTimeout() { + return checkpointReadLockTimeout; + } + + /** + * Sets timeout for checkpoint read lock acquisition. + * + * @param val New timeout in milliseconds, non-positive value denotes infinite timeout. + */ + @Override public void checkpointReadLockTimeout(long val) { + checkpointReadLockTimeout = val; + } + /** * Partition destroy queue. */ @@ -4809,4 +4861,15 @@ public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) { super(lastArchivedSegment, log); } } + + /** Indicates checkpoint read lock acquisition failure which did not lead to node invalidation. */ + private static class CheckpointReadLockTimeoutException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private CheckpointReadLockTimeoutException(String msg) { + super(msg); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index f35d15aad05f7..28ce085608d7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -762,6 +762,20 @@ public void checkpointReadUnlock() { // No-op. } + /** + * @return {@code 0} for non-persistent storage. + */ + public long checkpointReadLockTimeout() { + return 0; + } + + /** + * No-op for non-persistent storage. + */ + public void checkpointReadLockTimeout(long val) { + // No-op. + } + /** * No-op for non-persistent storage. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java index b48eff1311719..f31f0e93a78b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java @@ -102,9 +102,10 @@ public FailureContext failureContext() { * Processes failure accordingly to configured {@link FailureHandler}. * * @param failureCtx Failure context. + * @return {@code True} If this very call led to Ignite node invalidation. */ - public void process(FailureContext failureCtx) { - process(failureCtx, hnd); + public boolean process(FailureContext failureCtx) { + return process(failureCtx, hnd); } /** @@ -112,13 +113,14 @@ public void process(FailureContext failureCtx) { * * @param failureCtx Failure context. * @param hnd Failure handler. + * @return {@code True} If this very call led to Ignite node invalidation. */ - public synchronized void process(FailureContext failureCtx, FailureHandler hnd) { + public synchronized boolean process(FailureContext failureCtx, FailureHandler hnd) { assert failureCtx != null; assert hnd != null; if (this.failureCtx != null) // Node already terminating, no reason to process more errors. - return; + return false; U.error(ignite.log(), "Critical system error detected. Will be handled accordingly to configured handler " + "[hnd=" + hnd + ", failureCtx=" + failureCtx + ']', failureCtx.error()); @@ -136,5 +138,7 @@ public synchronized void process(FailureContext failureCtx, FailureHandler hnd) log.error("Ignite node is in invalid state due to a critical failure."); } + + return invalidated; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 25b9cb870cfa3..915075afe7a34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -251,9 +251,8 @@ public static CacheConfiguration readCacheConfiguration(BinaryRawReaderEx in, Cl if (keyCnt > 0) { CacheKeyConfiguration[] keys = new CacheKeyConfiguration[keyCnt]; - for (int i = 0; i < keyCnt; i++) { + for (int i = 0; i < keyCnt; i++) keys[i] = new CacheKeyConfiguration(in.readString(), in.readString()); - } ccfg.setKeyConfiguration(keys); } @@ -662,6 +661,8 @@ public static void readIgniteConfiguration(BinaryRawReaderEx in, IgniteConfigura cfg.setMvccVacuumFrequency(in.readLong()); if (in.readBoolean()) cfg.setMvccVacuumThreadCount(in.readInt()); + if (in.readBoolean()) + cfg.setSystemWorkerBlockedTimeout(in.readLong()); int sqlSchemasCnt = in.readInt(); @@ -1239,6 +1240,12 @@ public static void writeIgniteConfiguration(BinaryRawWriter w, IgniteConfigurati w.writeLong(cfg.getMvccVacuumFrequency()); w.writeBoolean(true); w.writeInt(cfg.getMvccVacuumThreadCount()); + if (cfg.getSystemWorkerBlockedTimeout() != null) { + w.writeBoolean(true); + w.writeLong(cfg.getSystemWorkerBlockedTimeout()); + } else { + w.writeBoolean(false); + } if (cfg.getSqlSchemas() == null) w.writeInt(-1); @@ -1894,21 +1901,22 @@ private static DataStorageConfiguration readDataStorageConfiguration(BinaryRawRe .setConcurrencyLevel(in.readInt()) .setWalAutoArchiveAfterInactivity(in.readLong()); + if (in.readBoolean()) + res.setCheckpointReadLockTimeout(in.readLong()); + int cnt = in.readInt(); if (cnt > 0) { DataRegionConfiguration[] regs = new DataRegionConfiguration[cnt]; - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < cnt; i++) regs[i] = readDataRegionConfiguration(in); - } res.setDataRegionConfigurations(regs); } - if (in.readBoolean()) { + if (in.readBoolean()) res.setDefaultDataRegionConfiguration(readDataRegionConfiguration(in)); - } return res; } @@ -2022,25 +2030,31 @@ private static void writeDataStorageConfiguration(BinaryRawWriter w, DataStorage w.writeInt(cfg.getConcurrencyLevel()); w.writeLong(cfg.getWalAutoArchiveAfterInactivity()); + if (cfg.getCheckpointReadLockTimeout() != null) { + w.writeBoolean(true); + w.writeLong(cfg.getCheckpointReadLockTimeout()); + } + else + w.writeBoolean(false); + if (cfg.getDataRegionConfigurations() != null) { w.writeInt(cfg.getDataRegionConfigurations().length); - for (DataRegionConfiguration d : cfg.getDataRegionConfigurations()) { + for (DataRegionConfiguration d : cfg.getDataRegionConfigurations()) writeDataRegionConfiguration(w, d); - } - } else { - w.writeInt(0); } + else + w.writeInt(0); if (cfg.getDefaultDataRegionConfiguration() != null) { w.writeBoolean(true); writeDataRegionConfiguration(w, cfg.getDefaultDataRegionConfiguration()); - } else { - w.writeBoolean(false); } - } else { - w.writeBoolean(false); + else + w.writeBoolean(false); } + else + w.writeBoolean(false); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index edb9871573382..d2f45c9453fc2 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -8680,6 +8680,17 @@ public static long safeAbs(long i) { return i < 0 ? 0 : i; } + /** + * When {@code long} value given is positive returns that value, otherwise returns provided default value. + * + * @param i Input value. + * @param dflt Default value. + * @return {@code i} if {@code i > 0} and {@code dflt} otherwise. + */ + public static long ensurePositive(long i, long dflt) { + return i <= 0 ? dflt : i; + } + /** * Gets wrapper class for a primitive type. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/FailureHandlingMxBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/FailureHandlingMxBeanImpl.java new file mode 100644 index 0000000000000..61b9afeca74fa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/FailureHandlingMxBeanImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.worker; + +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.mxbean.FailureHandlingMxBean; +import org.jetbrains.annotations.NotNull; + +/** {@inheritDoc} */ +public class FailureHandlingMxBeanImpl implements FailureHandlingMxBean { + /** System worker registry. */ + private final WorkersRegistry workerRegistry; + + /** Database manager. */ + private final IgniteCacheDatabaseSharedManager dbMgr; + + /** + * @param workersRegistry Workers registry. + * @param dbMgr Database manager. + */ + public FailureHandlingMxBeanImpl( + @NotNull WorkersRegistry workersRegistry, + @NotNull IgniteCacheDatabaseSharedManager dbMgr + ) { + this.workerRegistry = workersRegistry; + this.dbMgr = dbMgr; + } + + /** {@inheritDoc} */ + @Override public boolean getLivenessCheckEnabled() { + return workerRegistry.livenessCheckEnabled(); + } + + /** {@inheritDoc} */ + @Override public void setLivenessCheckEnabled(boolean val) { + workerRegistry.livenessCheckEnabled(val); + } + + /** {@inheritDoc} */ + @Override public long getSystemWorkerBlockedTimeout() { + return workerRegistry.getSystemWorkerBlockedTimeout(); + } + + /** {@inheritDoc} */ + @Override public void setSystemWorkerBlockedTimeout(long val) { + workerRegistry.setSystemWorkerBlockedTimeout(val); + } + + /** {@inheritDoc} */ + @Override public long getCheckpointReadLockTimeout() { + return dbMgr.checkpointReadLockTimeout(); + } + + /** {@inheritDoc} */ + @Override public void setCheckpointReadLockTimeout(long val) { + dbMgr.checkpointReadLockTimeout(val); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java index e6abe6e9d1da9..1f082b53ea4dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java @@ -66,16 +66,6 @@ public WorkersControlMXBeanImpl(WorkersRegistry registry) { return true; } - /** {@inheritDoc} */ - @Override public boolean getHealthMonitoringEnabled() { - return workerRegistry.livenessCheckEnabled(); - } - - /** {@inheritDoc} */ - @Override public void setHealthMonitoringEnabled(boolean val) { - workerRegistry.livenessCheckEnabled(val); - } - /** {@inheritDoc} */ @Override public boolean stopThreadByUniqueName(String name) { Thread[] threads = Thread.getAllStackTraces().keySet().stream() diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java index 848bb5925bab7..153b289951f0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java @@ -61,8 +61,10 @@ public class WorkersRegistry implements GridWorkerListener { /** */ private final IgniteBiInClosure workerFailedHnd; - /** Worker heartbeat timeout in milliseconds, when exceeded, worker is considered as blocked. */ - private final long heartbeatTimeout; + /** + * Maximum inactivity period for system worker in milliseconds, when exceeded, worker is considered as blocked. + */ + private volatile long sysWorkerBlockedTimeout; /** Time in milliseconds between successive workers checks. */ private final long checkInterval; @@ -72,15 +74,17 @@ public class WorkersRegistry implements GridWorkerListener { /** * @param workerFailedHnd Closure to invoke on worker failure. - * @param heartbeatTimeout Maximum allowed worker heartbeat interval in milliseconds, should be positive. + * @param sysWorkerBlockedTimeout Maximum allowed worker heartbeat interval in milliseconds, non-positive value denotes + * infinite interval. */ public WorkersRegistry( @NotNull IgniteBiInClosure workerFailedHnd, - long heartbeatTimeout, - IgniteLogger log) { + long sysWorkerBlockedTimeout, + IgniteLogger log + ) { this.workerFailedHnd = workerFailedHnd; - this.heartbeatTimeout = heartbeatTimeout; - this.checkInterval = Math.min(DFLT_CHECK_INTERVAL, heartbeatTimeout); + this.sysWorkerBlockedTimeout = U.ensurePositive(sysWorkerBlockedTimeout, Long.MAX_VALUE); + this.checkInterval = Math.min(DFLT_CHECK_INTERVAL, sysWorkerBlockedTimeout); this.log = log; } @@ -127,15 +131,33 @@ public GridWorker worker(String name) { } /** */ - boolean livenessCheckEnabled() { + public boolean livenessCheckEnabled() { return livenessCheckEnabled; } /** */ - void livenessCheckEnabled(boolean val) { + public void livenessCheckEnabled(boolean val) { livenessCheckEnabled = val; } + /** + * Returns maximum inactivity period for system worker. When exceeded, worker is considered as blocked. + * + * @return Maximum inactivity period for system worker in milliseconds. + */ + public long getSystemWorkerBlockedTimeout() { + return sysWorkerBlockedTimeout == Long.MAX_VALUE ? 0 : sysWorkerBlockedTimeout; + } + + /** + * Sets maximum inactivity period for system worker. When exceeded, worker is considered as blocked. + * + * @param val Maximum inactivity period for system worker in milliseconds. + */ + public void setSystemWorkerBlockedTimeout(long val) { + sysWorkerBlockedTimeout = U.ensurePositive(val, Long.MAX_VALUE); + } + /** {@inheritDoc} */ @Override public void onStarted(GridWorker w) { register(w); @@ -164,7 +186,7 @@ void livenessCheckEnabled(boolean val) { try { lastCheckTs = U.currentTimeMillis(); - long workersToCheck = Math.max(registeredWorkers.size() * checkInterval / heartbeatTimeout, 1); + long workersToCheck = Math.max(registeredWorkers.size() * checkInterval / sysWorkerBlockedTimeout, 1); int workersChecked = 0; @@ -198,7 +220,7 @@ void livenessCheckEnabled(boolean val) { long heartbeatDelay = U.currentTimeMillis() - worker.heartbeatTs(); - if (heartbeatDelay > heartbeatTimeout) { + if (heartbeatDelay > sysWorkerBlockedTimeout) { GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); if (worker0 != null && worker0 == worker) { diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/FailureHandlingMxBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/FailureHandlingMxBean.java new file mode 100644 index 0000000000000..199d7523019ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/FailureHandlingMxBean.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +/** + * MBean that controls critical failure handling. + */ +@MXBeanDescription("MBean that controls critical failure handling.") +public interface FailureHandlingMxBean { + /** */ + @MXBeanDescription("Enable/disable critical workers liveness checking.") + public boolean getLivenessCheckEnabled(); + + /** */ + public void setLivenessCheckEnabled(boolean val); + + /** */ + @MXBeanDescription("Maximum inactivity period for system worker. Critical failure handler fires if exceeded. " + + "Nonpositive value denotes infinite timeout.") + public long getSystemWorkerBlockedTimeout(); + + /** */ + public void setSystemWorkerBlockedTimeout(long val); + + /** */ + @MXBeanDescription("Timeout for checkpoint read lock acquisition. Critical failure handler fires if exceeded. " + + "Nonpositive value denotes infinite timeout.") + public long getCheckpointReadLockTimeout(); + + /** */ + public void setCheckpointReadLockTimeout(long val); +} diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java index 18b0084f5fcd4..b999ab7d716de 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java @@ -47,13 +47,6 @@ public interface WorkersControlMXBean { ) public boolean terminateWorker(String name); - /** */ - @MXBeanDescription("Whether workers check each other's health.") - public boolean getHealthMonitoringEnabled(); - - /** */ - public void setHealthMonitoringEnabled(boolean val); - /** * Stops thread by {@code name}, if exists and unique. * diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlingConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlingConfigurationTest.java new file mode 100644 index 0000000000000..4e5b7535bb3a3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlingConfigurationTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.failure; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.CountDownLatch; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl; +import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.mxbean.FailureHandlingMxBean; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT; + +/** + * Tests configuration parameters related to failure handling. + */ +public class FailureHandlingConfigurationTest extends GridCommonAbstractTest { + /** */ + private Long checkpointReadLockTimeout; + + /** */ + private Long sysWorkerBlockedTimeout; + + /** */ + private CountDownLatch failureLatch; + + /** */ + private class TestFailureHandler extends AbstractFailureHandler { + /** */ + TestFailureHandler() { + failureLatch = new CountDownLatch(1); + } + + /** {@inheritDoc} */ + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + failureLatch.countDown(); + + return false; + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureHandler(new TestFailureHandler()); + + DataRegionConfiguration drCfg = new DataRegionConfiguration(); + drCfg.setPersistenceEnabled(true); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + dsCfg.setDefaultDataRegionConfiguration(drCfg); + + if (checkpointReadLockTimeout != null) + dsCfg.setCheckpointReadLockTimeout(checkpointReadLockTimeout); + + cfg.setDataStorageConfiguration(dsCfg); + + if (sysWorkerBlockedTimeout != null) + cfg.setSystemWorkerBlockedTimeout(sysWorkerBlockedTimeout); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + + sysWorkerBlockedTimeout = null; + checkpointReadLockTimeout = null; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + public void testCfgParamsPropagation() throws Exception { + sysWorkerBlockedTimeout = 30_000L; + checkpointReadLockTimeout = 20_000L; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + WorkersRegistry reg = ignite.context().workersRegistry(); + + IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + + FailureHandlingMxBean mBean = getMBean(); + + assertEquals(sysWorkerBlockedTimeout.longValue(), reg.getSystemWorkerBlockedTimeout()); + assertEquals(checkpointReadLockTimeout.longValue(), dbMgr.checkpointReadLockTimeout()); + + assertEquals(sysWorkerBlockedTimeout.longValue(), mBean.getSystemWorkerBlockedTimeout()); + assertEquals(checkpointReadLockTimeout.longValue(), mBean.getCheckpointReadLockTimeout()); + } + + /** + * @throws Exception If failed. + */ + public void testPartialCfgParamsPropagation() throws Exception { + sysWorkerBlockedTimeout = 30_000L; + checkpointReadLockTimeout = null; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + WorkersRegistry reg = ignite.context().workersRegistry(); + + IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + + FailureHandlingMxBean mBean = getMBean(); + + assertEquals(sysWorkerBlockedTimeout.longValue(), reg.getSystemWorkerBlockedTimeout()); + assertEquals(sysWorkerBlockedTimeout.longValue(), dbMgr.checkpointReadLockTimeout()); + + assertEquals(sysWorkerBlockedTimeout.longValue(), mBean.getSystemWorkerBlockedTimeout()); + assertEquals(sysWorkerBlockedTimeout.longValue(), mBean.getCheckpointReadLockTimeout()); + } + + /** + * @throws Exception If failed. + */ + public void testNegativeParamValues() throws Exception { + sysWorkerBlockedTimeout = -1L; + checkpointReadLockTimeout = -85L; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + WorkersRegistry reg = ignite.context().workersRegistry(); + + IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + + FailureHandlingMxBean mBean = getMBean(); + + assertEquals(0L, reg.getSystemWorkerBlockedTimeout()); + assertEquals(-85L, dbMgr.checkpointReadLockTimeout()); + + assertEquals(0L, mBean.getSystemWorkerBlockedTimeout()); + assertEquals(-85L, mBean.getCheckpointReadLockTimeout()); + } + + /** + * @throws Exception If failed. + */ + public void testOverridingBySysProps() throws Exception { + String prevWorkerProp = System.getProperty(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT); + String prevCheckpointProp = System.getProperty(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT); + + long workerPropVal = 80_000; + long checkpointPropVal = 90_000; + + System.setProperty(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT, String.valueOf(workerPropVal)); + System.setProperty(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT, String.valueOf(checkpointPropVal)); + + try { + sysWorkerBlockedTimeout = 1L; + checkpointReadLockTimeout = 2L; + + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + WorkersRegistry reg = ignite.context().workersRegistry(); + + IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + + FailureHandlingMxBean mBean = getMBean(); + + assertEquals(sysWorkerBlockedTimeout, ignite.configuration().getSystemWorkerBlockedTimeout()); + assertEquals(checkpointReadLockTimeout, + ignite.configuration().getDataStorageConfiguration().getCheckpointReadLockTimeout()); + + assertEquals(workerPropVal, reg.getSystemWorkerBlockedTimeout()); + assertEquals(checkpointPropVal, dbMgr.checkpointReadLockTimeout()); + + assertEquals(workerPropVal, mBean.getSystemWorkerBlockedTimeout()); + assertEquals(checkpointPropVal, mBean.getCheckpointReadLockTimeout()); + } + finally { + if (prevWorkerProp != null) + System.setProperty(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT, prevWorkerProp); + else + System.clearProperty(IGNITE_SYSTEM_WORKER_BLOCKED_TIMEOUT); + + if (prevCheckpointProp != null) + System.setProperty(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT, prevCheckpointProp); + else + System.clearProperty(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT); + } + } + + /** + * @throws Exception If failed. + */ + public void testMBeanParamsChanging() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + FailureHandlingMxBean mBean = getMBean(); + + mBean.setSystemWorkerBlockedTimeout(80_000L); + assertEquals(80_000L, ignite.context().workersRegistry().getSystemWorkerBlockedTimeout()); + + mBean.setCheckpointReadLockTimeout(90_000L); + assertEquals(90_000L, ignite.context().cache().context().database().checkpointReadLockTimeout()); + + assertTrue(mBean.getLivenessCheckEnabled()); + mBean.setLivenessCheckEnabled(false); + assertFalse(ignite.context().workersRegistry().livenessCheckEnabled()); + ignite.context().workersRegistry().livenessCheckEnabled(true); + assertTrue(mBean.getLivenessCheckEnabled()); + } + + /** */ + private FailureHandlingMxBean getMBean() throws Exception { + ObjectName name = U.makeMBeanName(getTestIgniteInstanceName(0), "Kernal", + FailureHandlingMxBeanImpl.class.getSimpleName()); + + MBeanServer srv = ManagementFactory.getPlatformMBeanServer(); + + assertTrue(srv.isRegistered(name)); + + return MBeanServerInvocationHandler.newProxyInstance(srv, name, FailureHandlingMxBean.class, true); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java index e7876f85bdfda..68085db0e25b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java @@ -19,6 +19,7 @@ import java.util.Set; import junit.framework.TestSuite; +import org.apache.ignite.failure.FailureHandlingConfigurationTest; import org.apache.ignite.failure.IoomFailureHandlerTest; import org.apache.ignite.failure.SystemWorkersTerminationTest; import org.apache.ignite.internal.ClusterBaselineNodesMetricsSelfTest; @@ -63,6 +64,7 @@ public static TestSuite suite(@Nullable final Set ignoredTests) throws Ex suite.addTestSuite(ServiceDeploymentOutsideBaselineTest.class); suite.addTestSuite(GridMarshallerMappingConsistencyTest.class); suite.addTestSuite(SystemWorkersTerminationTest.class); + suite.addTestSuite(FailureHandlingConfigurationTest.class); suite.addTestSuite(GridCommandHandlerTest.class); suite.addTestSuite(GridInternalTaskUnusedWalSegmentsTest.class); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index 354a511a5ea76..c26e5a346694e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -204,6 +204,7 @@ public void TestAllConfigurationProperties() Assert.AreEqual(com.UnacknowledgedMessagesBufferSize, resCom.UnacknowledgedMessagesBufferSize); Assert.AreEqual(cfg.FailureDetectionTimeout, resCfg.FailureDetectionTimeout); + Assert.AreEqual(cfg.SystemWorkerBlockedTimeout, resCfg.SystemWorkerBlockedTimeout); Assert.AreEqual(cfg.ClientFailureDetectionTimeout, resCfg.ClientFailureDetectionTimeout); Assert.AreEqual(cfg.LongQueryWarningTimeout, resCfg.LongQueryWarningTimeout); @@ -752,6 +753,7 @@ private static IgniteConfiguration GetCustomConfig() UnacknowledgedMessagesBufferSize = 3450 }, FailureDetectionTimeout = TimeSpan.FromSeconds(3.5), + SystemWorkerBlockedTimeout = TimeSpan.FromSeconds(8.5), ClientFailureDetectionTimeout = TimeSpan.FromMinutes(12.3), LongQueryWarningTimeout = TimeSpan.FromMinutes(1.23), IsActiveOnStart = true, @@ -824,6 +826,7 @@ private static IgniteConfiguration GetCustomConfig() ConcurrencyLevel = 1, PageSize = 8 * 1024, WalAutoArchiveAfterInactivity = TimeSpan.FromMinutes(5), + CheckpointReadLockTimeout = TimeSpan.FromSeconds(9.5), DefaultDataRegionConfiguration = new DataRegionConfiguration { Name = "reg1", diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs index 0a010b4a75c6d..8771c7795d788 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs @@ -234,6 +234,7 @@ internal DataStorageConfiguration(IBinaryRawReader reader) PageSize = reader.ReadInt(); ConcurrencyLevel = reader.ReadInt(); WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan(); + CheckpointReadLockTimeout = reader.ReadTimeSpanNullable(); var count = reader.ReadInt(); @@ -286,6 +287,7 @@ internal void Write(IBinaryRawWriter writer) writer.WriteInt(PageSize); writer.WriteInt(ConcurrencyLevel); writer.WriteTimeSpanAsLong(WalAutoArchiveAfterInactivity); + writer.WriteTimeSpanAsLongNullable(CheckpointReadLockTimeout); if (DataRegionConfigurations != null) { @@ -488,6 +490,11 @@ internal void Write(IBinaryRawWriter writer) [DefaultValue(typeof(TimeSpan), "-00:00:00.001")] public TimeSpan WalAutoArchiveAfterInactivity { get; set; } + ///

+ /// Gets or sets the timeout for checkpoint read lock acquisition. + /// + public TimeSpan? CheckpointReadLockTimeout { get; set; } + /// /// Gets or sets the data region configurations. /// diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index fc6afb6f60c2b..63bf7948d971e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -165,6 +165,9 @@ public class IgniteConfiguration /** */ private TimeSpan? _clientFailureDetectionTimeout; + /** */ + private TimeSpan? _sysWorkerBlockedTimeout; + /** */ private int? _publicThreadPoolSize; @@ -329,6 +332,7 @@ internal void Write(BinaryWriter writer, ClientProtocolVersion srvVer) writer.WriteBooleanNullable(_authenticationEnabled); writer.WriteLongNullable(_mvccVacuumFreq); writer.WriteIntNullable(_mvccVacuumThreadCnt); + writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout); if (SqlSchemas == null) writer.WriteInt(-1); @@ -717,6 +721,7 @@ private void ReadCore(BinaryReader r, ClientProtocolVersion srvVer) _authenticationEnabled = r.ReadBooleanNullable(); _mvccVacuumFreq = r.ReadLongNullable(); _mvccVacuumThreadCnt = r.ReadIntNullable(); + _sysWorkerBlockedTimeout = r.ReadTimeSpanNullable(); int sqlSchemasCnt = r.ReadInt(); @@ -1374,6 +1379,15 @@ public TimeSpan FailureDetectionTimeout set { _failureDetectionTimeout = value; } } + /// + /// Gets or sets the timeout for blocked system workers detection. + /// + public TimeSpan? SystemWorkerBlockedTimeout + { + get { return _sysWorkerBlockedTimeout; } + set { _sysWorkerBlockedTimeout = value; } + } + /// /// Gets or sets the failure detection timeout used by /// and for client nodes. diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 0a5509583aed6..5f4a439f80683 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -1922,6 +1922,13 @@ Inactivity time after which to run WAL segment auto archiving. + + + + Timeout for checkpoint read lock acquisition. + + + @@ -2228,6 +2235,13 @@ + + + + Timeout for blocked system workers detection. + + +