diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/IgniteInternalWrapper.java b/modules/commons/src/main/java/org/apache/ignite/internal/IgniteInternalWrapper.java index 61248fe781145..26216fa576369 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/IgniteInternalWrapper.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/IgniteInternalWrapper.java @@ -21,4 +21,15 @@ public interface IgniteInternalWrapper { /** @return Wrapped object. */ public T delegate(); + + /** + * @param target Object to unwrap. + * @return Original object. + */ + public static Object unwrap(Object target) { + while (target instanceof IgniteInternalWrapper) + target = ((IgniteInternalWrapper)target).delegate(); + + return target; + } } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java index 197cc8dcd7a1b..c44f2c453afff 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java @@ -43,5 +43,4 @@ public OperationContextAwareInClosure(IgniteInClosure delegate, OperationCont public static IgniteInClosure wrap(IgniteInClosure delefate) { return wrap(delefate, OperationContextAwareInClosure::new); } - } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java index 381eecfca09ac..0b4d48aa4aea3 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java @@ -22,8 +22,8 @@ import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.internal.thread.context.OperationContextSnapshot; -/** */ -public abstract class OperationContextAwareWrapper implements IgniteInternalWrapper { +/** Represents wrapper containing an arbitrary object along with {@link OperationContextSnapshot}. */ +public class OperationContextAwareWrapper implements IgniteInternalWrapper { /** */ protected final T delegate; @@ -31,23 +31,30 @@ public abstract class OperationContextAwareWrapper implements IgniteInternalW protected final OperationContextSnapshot snapshot; /** */ + public OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) { + assert delegate != null; + + this.delegate = delegate; + this.snapshot = snapshot; + } + + /** {@inheritDoc} */ @Override public T delegate() { return delegate; } /** */ - protected OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) { - this.delegate = delegate; - this.snapshot = snapshot; + public OperationContextSnapshot contextSnapshot() { + return snapshot; } /** */ - protected static T wrap(T delegate, BiFunction wrapper) { + public static T wrap(T delegate, BiFunction wrapper) { return wrap(delegate, wrapper, false); } /** */ - protected static T wrap(T delegate, BiFunction wrapper, boolean ignoreEmptyContext) { + public static T wrap(T delegate, BiFunction wrapper, boolean ignoreEmptyContext) { if (delegate == null || delegate instanceof OperationContextAwareWrapper) return delegate; diff --git a/modules/commons/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/commons/src/main/java/org/apache/ignite/thread/IgniteThread.java index 173b66a84a142..cabee1ac152c3 100644 --- a/modules/commons/src/main/java/org/apache/ignite/thread/IgniteThread.java +++ b/modules/commons/src/main/java/org/apache/ignite/thread/IgniteThread.java @@ -19,9 +19,13 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import static org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable.wrapIfContextNotEmpty; + /** * This class adds some necessary plumbing on top of the {@link Thread} class. * Specifically, it adds: @@ -30,6 +34,7 @@ *
  • Dedicated parent thread group
  • *
  • Backing interrupted flag
  • *
  • Name of the grid this thread belongs to
  • + *
  • Automatic capturing of {@link OperationContext} of parent thread
  • * * Note: this class is intended for internal use only. */ @@ -76,13 +81,18 @@ public IgniteThread(String igniteInstanceName, String threadName) { * @param r Runnable to execute. */ public IgniteThread(String igniteInstanceName, String threadName, Runnable r) { - this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED); + this(igniteInstanceName, threadName, wrapIfContextNotEmpty(r), GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED); } /** * Creates grid thread with given name for a given Ignite instance with specified * thread group. * + * Note: This constructor creates a thread that does NOT automatically acquire the parent thread's Operation + * Context, ensuring that no Operation Context is attached to it at the start of execution. It is used in Ignite + * thread pools and worker threads, which rely on this behavior to avoid unnecessary wrapping + * (see {@link OperationContextAwareWrapper}) + * * @param igniteInstanceName Name of the Ignite instance this thread is created for. * @param threadName Name of thread. * @param r Runnable to execute. @@ -101,20 +111,6 @@ public IgniteThread(String igniteInstanceName, String threadName, Runnable r, in this.plc = plc; } - /** - * @param igniteInstanceName Name of the Ignite instance this thread is created for. - * @param threadGrp Thread group. - * @param threadName Name of thread. - */ - protected IgniteThread(String igniteInstanceName, ThreadGroup threadGrp, String threadName) { - super(threadGrp, threadName); - - this.igniteInstanceName = igniteInstanceName; - this.compositeRwLockIdx = GRP_IDX_UNASSIGNED; - this.stripe = -1; - this.plc = GridIoPolicy.UNDEFINED; - } - /** * @return Related {@link GridIoPolicy} for internal Ignite pools. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index c46f28b0eda4e..744a844f88e2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -320,7 +320,7 @@ public interface GridKernalContext extends Iterable { * * @return Data streamer processor. */ - public DataStreamProcessor dataStream(); + public DataStreamProcessor dataStream(); /** * Gets event continuous processor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 83b9e3defc35e..5d779302d1f7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -838,8 +838,8 @@ else if (!(comp instanceof DiscoveryNodeValidationProcessor } /** {@inheritDoc} */ - @Override public DataStreamProcessor dataStream() { - return (DataStreamProcessor)dataLdrProc; + @Override public DataStreamProcessor dataStream() { + return dataLdrProc; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b94f0df3ca02c..d62afb99c957b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1098,7 +1098,7 @@ public void start( startProcessor(new GridTaskProcessor(ctx)); startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); startProcessor(createComponent(IgniteRestProcessor.class, ctx)); - startProcessor(new DataStreamProcessor<>(ctx)); + startProcessor(new DataStreamProcessor(ctx)); startProcessor(new GridContinuousProcessor(ctx)); startProcessor(new DataStructuresProcessor(ctx)); startProcessor(createComponent(PlatformProcessor.class, ctx)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index dbed8016c2895..0f82736eabb97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -31,12 +31,10 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -97,7 +95,9 @@ import org.apache.ignite.internal.systemview.NodeAttributeViewWalker; import org.apache.ignite.internal.systemview.NodeMetricsViewWalker; import org.apache.ignite.internal.thread.OomExceptionHandler; +import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -116,6 +116,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -182,8 +183,6 @@ import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; import static org.apache.ignite.internal.processors.security.SecurityUtils.isSecurityCompatibilityMode; import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext; -import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext; -import static org.apache.ignite.internal.processors.security.SecurityUtils.withRemoteSecurityContext; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.nodeConsistentIds; import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP; @@ -235,15 +234,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** Discovery event worker. */ private final DiscoveryWorker discoWrk = new DiscoveryWorker(); - /** Discovery event notyfier worker. */ + /** Discovery event notifier worker. */ private final DiscoveryMessageNotifierWorker discoNtfWrk = new DiscoveryMessageNotifierWorker(); /** Network segment check worker. */ private SegmentCheckWorker segChkWrk; - /** Network segment check thread. */ - private IgniteThread segChkThread; - /** Last logged topology. */ private final GridAtomicLong lastLoggedTop = new GridAtomicLong(); @@ -763,8 +759,6 @@ else if (customMsg instanceof ChangeGlobalStateMessage) { } } - SecurityContext secCtx = remoteSecurityContext(ctx); - // If this is a local join event, just save it and do not notify listeners. if (locJoinEvt) { if (gridStartTime == 0) @@ -876,8 +870,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) { discoCache0, notification.getTopSnapshot(), null, - notification.getSpanContainer(), - secCtx + notification.getSpanContainer() ) ); } @@ -898,8 +891,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) { node, discoCache, notification.getTopSnapshot(), customMsg, - notification.getSpanContainer(), - secCtx + notification.getSpanContainer() ) ); @@ -912,8 +904,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) { discoCache, notification.getTopSnapshot(), stateFinishMsg, - notification.getSpanContainer(), - secCtx + notification.getSpanContainer() ) ); @@ -1062,7 +1053,7 @@ private void waitForLastStateChangeEventFuture() { } }); - new DiscoveryMessageNotifierThread(discoNtfWrk).start(); + discoNtfWrk.start(); startSpi(); @@ -1079,11 +1070,7 @@ private void waitForLastStateChangeEventFuture() { if (hasRslvrs && segChkFreq > 0) { segChkWrk = new SegmentCheckWorker(); - segChkThread = U.newThread(segChkWrk); - - segChkThread.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); - - segChkThread.start(); + segChkWrk.start(); } locNode = spi.getLocalNode(); @@ -1091,7 +1078,7 @@ private void waitForLastStateChangeEventFuture() { checkAttributes(discoCache().remoteNodes()); // Start discovery worker. - U.newThread(discoWrk).start(); + discoWrk.start(); if (log.isDebugEnabled()) log.debug(startInfo()); @@ -1736,7 +1723,7 @@ private static String nodeDescription(ClusterNode node) { if (segChkWrk != null) { segChkWrk.cancel(); - U.join(segChkThread, log); + U.join(segChkWrk, log); } if (!locJoin.isDone()) @@ -2372,8 +2359,7 @@ public void clientCacheStartEvent(UUID reqId, null, Collections.emptyList(), new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose), - null, - remoteSecurityContext(ctx) + null ) ); } @@ -2394,8 +2380,7 @@ public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) { discoCache, discoCache.nodeMap.values(), null, - null, - remoteSecurityContext(ctx) + null ) ); } @@ -2730,25 +2715,23 @@ public ClusterNode historicalNode(UUID nodeId) { } /** Worker for network segment checks. */ - private class SegmentCheckWorker extends GridWorker { + private class SegmentCheckWorker extends IgniteLinkedBlockingQueueProcessor { /** */ - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - - /** - * - */ private SegmentCheckWorker() { - super(ctx.igniteInstanceName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log); + super(ctx.igniteInstanceName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log, ctx.workersRegistry()); assert hasRslvrs; assert segChkFreq > 0; } - /** - * - */ + /** */ public void scheduleSegmentCheck() { - queue.add(new Object()); + addToQueue(new Object()); + } + + /** {@inheritDoc} */ + @Override public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + return new OomExceptionHandler(ctx); } /** {@inheritDoc} */ @@ -2756,59 +2739,57 @@ public void scheduleSegmentCheck() { long lastChkNanos = 0; while (!isCancelled()) { - Object req = queue.poll(2000, MILLISECONDS); + OperationContextAwareWrapper contextualReq = pollQueuedElement(2000, MILLISECONDS); long nowNanos = System.nanoTime(); // Check frequency if segment check has not been requested. - if (req == null && (segChkFreq == 0 || U.nanosToMillis(nowNanos - lastChkNanos) <= segChkFreq)) { + if (contextualReq == null && (segChkFreq == 0 || U.nanosToMillis(nowNanos - lastChkNanos) <= segChkFreq)) { if (log.isDebugEnabled()) log.debug("Skipping segment check as it has not been requested and it is not time to check."); continue; } - // We should always check segment if it has been explicitly - // requested (on any node failure or leave). - assert req != null || U.nanosToMillis(nowNanos - lastChkNanos) > segChkFreq; + try (Scope ignored = OperationContext.restoreSnapshot(contextualReq.contextSnapshot())) { + // We should always check segment if it has been explicitly + // requested (on any node failure or leave). + assert contextualReq != null || U.nanosToMillis(nowNanos - lastChkNanos) > segChkFreq; - // Drain queue. - while (queue.poll() != null) { - // No-op. - } + drainQueue(e -> {}); - if (lastSegChkRes.get()) { - boolean segValid = ctx.segmentation().isValidSegment(); + if (lastSegChkRes.get()) { + boolean segValid = ctx.segmentation().isValidSegment(); - lastChkNanos = nowNanos; + lastChkNanos = nowNanos; - if (!segValid) { - ClusterNode node = getSpi().getLocalNode(); + if (!segValid) { + ClusterNode node = getSpi().getLocalNode(); - Collection locNodeOnlyTop = Collections.singleton(node); + Collection locNodeOnlyTop = Collections.singleton(node); - discoWrk.addEvent( - new NotificationEvent( - EVT_NODE_SEGMENTED, - AffinityTopologyVersion.NONE, - node, - createDiscoCache( + discoWrk.addEvent( + new NotificationEvent( + EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, - ctx.state().clusterState(), node, - locNodeOnlyTop), - locNodeOnlyTop, - null, - null, - remoteSecurityContext(ctx) - ) - ); + createDiscoCache( + AffinityTopologyVersion.NONE, + ctx.state().clusterState(), + node, + locNodeOnlyTop), + locNodeOnlyTop, + null, + null + ) + ); - lastSegChkRes.set(false); - } + lastSegChkRes.set(false); + } - if (log.isDebugEnabled()) - log.debug("Segment has been checked [requested=" + (req != null) + ", valid=" + segValid + ']'); + if (log.isDebugEnabled()) + log.debug("Segment has been checked [isSegValid=" + segValid + ']'); + } } } } @@ -2837,68 +2818,50 @@ public DiscoveryMessageNotifierThread(GridWorker worker) { } } - /** - * - */ - private class DiscoveryMessageNotifierWorker extends GridWorker { - /** Queue. */ - private final BlockingQueue> queue = new LinkedBlockingQueue<>(); - - /** - * Default constructor. - */ + /** */ + private class DiscoveryMessageNotifierWorker extends IgniteLinkedBlockingQueueProcessor, Runnable>> { + /** Default constructor. */ protected DiscoveryMessageNotifierWorker() { super(ctx.igniteInstanceName(), "disco-notifier-worker", GridDiscoveryManager.this.log, ctx.workersRegistry()); } - /** - * - */ - private void body0() throws InterruptedException { - T2 notification; + /** {@inheritDoc} */ + @Override public IgniteThread createWorkerThread(GridWorker worker) { + return new DiscoveryMessageNotifierThread(worker); + } - blockingSectionBegin(); + /** */ + private void body0() throws InterruptedException { + OperationContextAwareWrapper, Runnable>> contextualNotification = takeQueuedElement(); - try { - notification = queue.take(); - } - finally { - blockingSectionEnd(); - } + try (Scope ignored = OperationContext.restoreSnapshot(contextualNotification.contextSnapshot())) { + T2, Runnable> notification = contextualNotification.delegate(); - try { - notification.get2().run(); - } - finally { - notification.get1().onDone(); + try { + notification.get2().run(); + } + finally { + notification.get1().onDone(); + } } } - /** - * @param cmd Command. - */ - public synchronized void submit(GridFutureAdapter notificationFut, Runnable cmd) { + /** @param cmd Command. */ + public synchronized void submit(GridFutureAdapter notificationFut, Runnable cmd) { if (isCancelled()) { notificationFut.onDone(); return; } - queue.add(new T2<>(notificationFut, cmd)); + addToQueue(new T2<>(notificationFut, cmd)); } - /** - * Cancel thread execution and completes all notification futures. - */ + /** * Cancel thread execution and completes all notification futures. */ @Override public synchronized void cancel() { super.cancel(); - while (!queue.isEmpty()) { - T2 notification = queue.poll(); - - if (notification != null) - notification.get1().onDone(); - } + drainQueue(n -> n.get1().onDone()); } /** {@inheritDoc} */ @@ -2952,9 +2915,6 @@ private static class NotificationEvent { /** Span container. */ SpanContainer spanContainer; - /** Security context. */ - SecurityContext secCtx; - /** * @param type Type. * @param topVer Topology version. @@ -2971,8 +2931,7 @@ public NotificationEvent( DiscoCache discoCache, Collection topSnapshot, @Nullable DiscoveryCustomMessage customMsg, - SpanContainer spanContainer, - SecurityContext secCtx + SpanContainer spanContainer ) { this.type = type; this.topVer = topVer; @@ -2981,18 +2940,14 @@ public NotificationEvent( this.topSnapshot = topSnapshot; this.customMsg = customMsg; this.spanContainer = spanContainer; - this.secCtx = secCtx; } } /** Worker for discovery events. */ - private class DiscoveryWorker extends GridWorker { + private class DiscoveryWorker extends IgniteLinkedBlockingQueueProcessor { /** */ private DiscoCache discoCache; - /** Event queue. */ - private final BlockingQueue evts = new LinkedBlockingQueue<>(); - /** Restart process handler. */ private final RestartProcessFailureHandler restartProcHnd = new RestartProcessFailureHandler(); @@ -3083,7 +3038,7 @@ void addEvent(NotificationEvent notificationEvt) { if (notificationEvt.type == EVT_CLIENT_NODE_DISCONNECTED) discoWrk.disconnectEvtFut = new GridFutureAdapter(); - evts.add(notificationEvt); + addToQueue(notificationEvt); } /** {@inheritDoc} */ @@ -3114,18 +3069,11 @@ void addEvent(NotificationEvent notificationEvt) { /** @throws InterruptedException If interrupted. */ private void body0() throws InterruptedException { - NotificationEvent evt; - - blockingSectionBegin(); + OperationContextAwareWrapper contextualEvt = takeQueuedElement(); - try { - evt = evts.take(); - } - finally { - blockingSectionEnd(); - } + try (Scope ignored = OperationContext.restoreSnapshot(contextualEvt.contextSnapshot())) { + NotificationEvent evt = contextualEvt.delegate(); - try (Scope ignored = withRemoteSecurityContext(ctx, evt.secCtx)) { int type = evt.type; AffinityTopologyVersion topVer = evt.topVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AbstractCachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AbstractCachePartitionExchangeWorkerTask.java deleted file mode 100644 index c83297db50d19..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AbstractCachePartitionExchangeWorkerTask.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.processors.security.SecurityContext; -import org.jetbrains.annotations.Nullable; - -/** */ -public abstract class AbstractCachePartitionExchangeWorkerTask implements CachePartitionExchangeWorkerTask { - /** Security context in which current task must be executed. */ - @Nullable private final SecurityContext secCtx; - - /** */ - protected AbstractCachePartitionExchangeWorkerTask(@Nullable SecurityContext secCtx) { - this.secCtx = secCtx; - } - - /** {@inheritDoc} */ - @Override @Nullable public SecurityContext securityContext() { - return secCtx; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java index e8c1b12981ffa..f4c1392ac2f30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.processors.security.SecurityContext; -import org.jetbrains.annotations.Nullable; - /** * Cache partition exchange worker task marker interface. */ @@ -28,10 +25,4 @@ public interface CachePartitionExchangeWorkerTask { * @return {@code False} if exchange merge should stop if this task is found in exchange worker queue. */ boolean skipForExchangeMerge(); - - /** - * @return Security context in which current task must be executed. - * {@code null} means that the task is to run in the context of a local node, - */ - @Nullable SecurityContext securityContext(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeTask.java index 682b9437aae4d..d44f8b191e9bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeTask.java @@ -17,23 +17,17 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.internal.S; /** * Cache statistics mode change task for exchange worker. */ -public class CacheStatisticsModeChangeTask extends AbstractCachePartitionExchangeWorkerTask { +public class CacheStatisticsModeChangeTask implements CachePartitionExchangeWorkerTask { /** Discovery message. */ private final CacheStatisticsModeChangeMessage msg; - /** - * @param secCtx Security context in which current task must be executed. - * @param msg Message. - */ - public CacheStatisticsModeChangeTask(SecurityContext secCtx, CacheStatisticsModeChangeMessage msg) { - super(secCtx); - + /** @param msg Message. */ + public CacheStatisticsModeChangeTask(CacheStatisticsModeChangeMessage msg) { assert msg != null; this.msg = msg; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryTask.java index e7c6165ebbce3..24e48876157dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryTask.java @@ -17,21 +17,15 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.internal.S; /** Client cache change dummy task for exchange worker. */ -public class ClientCacheChangeDummyDiscoveryTask extends AbstractCachePartitionExchangeWorkerTask { +public class ClientCacheChangeDummyDiscoveryTask implements CachePartitionExchangeWorkerTask { /** Discovery message. */ private final ClientCacheChangeDummyDiscoveryMessage msg; - /** - * @param secCtx Security context in which current task must be executed. - * @param msg Message. - */ - public ClientCacheChangeDummyDiscoveryTask(SecurityContext secCtx, ClientCacheChangeDummyDiscoveryMessage msg) { - super(secCtx); - + /** @param msg Message. */ + public ClientCacheChangeDummyDiscoveryTask(ClientCacheChangeDummyDiscoveryMessage msg) { assert msg != null; this.msg = msg; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java index 6b94151bfdf48..73cc69a8dcaac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java @@ -17,11 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext; /** * @@ -30,9 +26,6 @@ class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements Cache /** */ private final GridCacheSharedContext cctx; - /** Security context. */ - @Nullable private final SecurityContext secCtx; - /** * @param cctx Context. * @param timeout Timeout. @@ -41,7 +34,6 @@ class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements Cache super(timeout); this.cctx = cctx; - secCtx = remoteSecurityContext(cctx.kernalContext()); } /** {@inheritDoc} */ @@ -49,11 +41,6 @@ class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements Cache return true; } - /** {@inheritDoc} */ - @Override @Nullable public SecurityContext securityContext() { - return secCtx; - } - /** {@inheritDoc} */ @Override public void onTimeout() { if (!cctx.kernalContext().isStopping()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a49cadcbb1962..8013ca6c96509 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -105,11 +104,12 @@ import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; +import org.apache.ignite.internal.thread.context.OperationContext; import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.GridStringBuilder; @@ -129,11 +129,11 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.metric.MetricRegistry; -import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -163,8 +163,6 @@ import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION; import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION_HISTOGRAM; import static org.apache.ignite.internal.processors.metric.GridMetricManager.REBALANCED; -import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext; -import static org.apache.ignite.internal.processors.security.SecurityUtils.withRemoteSecurityContext; import static org.apache.ignite.internal.processors.task.TaskExecutionOptions.options; import static org.apache.ignite.internal.processors.tracing.SpanType.EXCHANGE_FUTURE; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.nodeIds; @@ -679,10 +677,8 @@ else if (customMsg instanceof WalStateAbstractMessage // Notify indexing engine about node leave so that we can re-map coordinator accordingly. if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { - SecurityContext secCtx = remoteSecurityContext(cctx.kernalContext()); - - exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(secCtx, evt.eventNode())); - exchWorker.addCustomTask(new WalStateNodeLeaveExchangeTask(secCtx, evt.eventNode())); + exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode())); + exchWorker.addCustomTask(new WalStateNodeLeaveExchangeTask(evt.eventNode())); } } @@ -744,7 +740,7 @@ public AffinityTopologyVersion onKernalStart(boolean active, boolean reconnect) else if (reconnect) reconnectExchangeFut.onDone(); - new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start(); + exchWorker.start(); if (reconnect) { if (fut != null) { @@ -855,7 +851,7 @@ public static Object rebalanceTopic(int idx) { // Finish all exchange futures. ExchangeFutureSet exchFuts0 = exchFuts; - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + for (CachePartitionExchangeWorkerTask task : exchWorker) { if (task instanceof GridDhtPartitionsExchangeFuture) ((GridDhtPartitionsExchangeFuture)task).onDone(stopErr); } @@ -2449,7 +2445,7 @@ public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, Grid if (exchWorker.waitForExchangeFuture(resVer)) return true; - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + for (CachePartitionExchangeWorkerTask task : exchWorker) { if (task instanceof GridDhtPartitionsExchangeFuture) { GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; @@ -2529,7 +2525,7 @@ public boolean mergeExchangesOnCoordinator( synchronized (curFut.mutex()) { int awaited = 0; - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + for (CachePartitionExchangeWorkerTask task : exchWorker) { if (task instanceof GridDhtPartitionsExchangeFuture) { GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; @@ -2628,7 +2624,7 @@ private void waitForTestVersion(AffinityTopologyVersion exchMergeTestWaitVer, Gr while (U.currentTimeMillis() < end) { boolean found = false; - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + for (CachePartitionExchangeWorkerTask task : exchWorker) { if (task instanceof GridDhtPartitionsExchangeFuture) { GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; @@ -2750,11 +2746,7 @@ public BooleanMetricImpl clusterRebalancedMetric() { * Exchange future thread. All exchanges happen only by one thread and next * exchange will not start until previous one completes. */ - private class ExchangeWorker extends GridWorker { - /** Future queue. */ - private final LinkedBlockingDeque futQ = - new LinkedBlockingDeque<>(); - + private class ExchangeWorker extends IgniteLinkedBlockingQueueProcessor { /** */ private AffinityTopologyVersion lastFutVer; @@ -2771,8 +2763,12 @@ private class ExchangeWorker extends GridWorker { * Constructor. */ private ExchangeWorker() { - super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log, - cctx.kernalContext().workersRegistry()); + super( + cctx.igniteInstanceName(), + "exchange-worker", + GridCachePartitionExchangeManager.this.log, + cctx.kernalContext().workersRegistry() + ); } /** @@ -2780,7 +2776,7 @@ private ExchangeWorker() { */ void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) { if (!hasPendingExchange()) - futQ.add(new RebalanceReassignExchangeTask(remoteSecurityContext(cctx.kernalContext()), exchId, fut)); + addToQueue(new RebalanceReassignExchangeTask(exchId, fut)); } /** @@ -2790,7 +2786,7 @@ void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeF IgniteInternalFuture forceRebalance(GridDhtPartitionExchangeId exchId) { GridCompoundFuture fut = new GridCompoundFuture<>(CU.boolReducer()); - futQ.add(new ForceRebalanceExchangeTask(remoteSecurityContext(cctx.kernalContext()), exchId, fut)); + addToQueue(new ForceRebalanceExchangeTask(exchId, fut)); return fut; } @@ -2799,10 +2795,9 @@ IgniteInternalFuture forceRebalance(GridDhtPartitionExchangeId exchId) * @param caches Caches to stop. */ IgniteInternalFuture deferStopCachesOnClientReconnect(Collection caches) { - StopCachesOnClientReconnectExchangeTask task = - new StopCachesOnClientReconnectExchangeTask(remoteSecurityContext(cctx.kernalContext()), caches); + StopCachesOnClientReconnectExchangeTask task = new StopCachesOnClientReconnectExchangeTask(caches); - futQ.add(task); + addToQueue(task); return task; } @@ -2813,7 +2808,7 @@ IgniteInternalFuture deferStopCachesOnClientReconnect(Collection= 0) { fut0.finishMerged(resVer, exchFut); - futQ.remove(fut0); + removeQueuedElement(fut0); } else break; @@ -2895,7 +2890,7 @@ void addCustomTask(CachePartitionExchangeWorkerTask task) { assert !isExchangeTask(task); - futQ.offer(task); + addToQueue(task); } /** @@ -2918,8 +2913,8 @@ void processCustomTask(CachePartitionExchangeWorkerTask task) { * @return Whether pending exchange future exists. */ boolean hasPendingExchange() { - if (!futQ.isEmpty()) { - for (CachePartitionExchangeWorkerTask task : futQ) { + if (!isQueueEmpty()) { + for (CachePartitionExchangeWorkerTask task : this) { if (isExchangeTask(task)) return true; } @@ -2932,8 +2927,8 @@ boolean hasPendingExchange() { * @return Whether pending exchange future triggered by non client node exists. */ boolean hasPendingServerExchange() { - if (!futQ.isEmpty()) { - for (CachePartitionExchangeWorkerTask task : futQ) { + if (!isQueueEmpty()) { + for (CachePartitionExchangeWorkerTask task : this) { if (task instanceof GridDhtPartitionsExchangeFuture) { if (((GridDhtPartitionsExchangeFuture)task).changedAffinity()) return true; @@ -2948,12 +2943,12 @@ boolean hasPendingServerExchange() { * Dump debug info. */ void dumpExchangeDebugInfo() { - U.warn(log, "First " + DIAGNOSTIC_WARN_LIMIT + " pending exchange futures [total=" + futQ.size() + ']'); + U.warn(log, "First " + DIAGNOSTIC_WARN_LIMIT + " pending exchange futures [total=" + queueSize() + ']'); if (DIAGNOSTIC_WARN_LIMIT > 0) { int cnt = 0; - for (CachePartitionExchangeWorkerTask task : futQ) { + for (CachePartitionExchangeWorkerTask task : this) { if (task instanceof GridDhtPartitionsExchangeFuture) { U.warn(log, ">>> " + ((GridDhtPartitionsExchangeFuture)task).shortInfo()); @@ -3006,7 +3001,7 @@ private void body0() throws InterruptedException, IgniteCheckedException { cnt++; - CachePartitionExchangeWorkerTask task = null; + OperationContextAwareWrapper contextualTask = null; try { boolean preloadFinished = true; @@ -3038,16 +3033,14 @@ private void body0() throws InterruptedException, IgniteCheckedException { if (isCancelled()) Thread.currentThread().interrupt(); - blockingSectionBegin(); + contextualTask = pollQueuedElement(timeout, MILLISECONDS); - task = futQ.poll(timeout, MILLISECONDS); - - blockingSectionEnd(); - - if (task == null) + if (contextualTask == null) continue; // Main while loop. - try (Scope ignored = withRemoteSecurityContext(cctx.kernalContext(), task.securityContext())) { + try (Scope ignored = OperationContext.restoreSnapshot(contextualTask.contextSnapshot())) { + CachePartitionExchangeWorkerTask task = contextualTask.delegate(); + if (!isExchangeTask(task)) { processCustomTask(task); @@ -3334,7 +3327,7 @@ else if (task instanceof ForceRebalanceExchangeTask) { } catch (IgniteCheckedException e) { U.error(log, "Failed to wait for completion of partition map exchange " + - "(preloading will not start): " + task, e); + "(preloading will not start): " + contextualTask == null ? null : contextualTask.delegate(), e); throw e; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 223dac5edaa5d..a82a8a9179c5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -220,7 +220,6 @@ import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; -import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext; import static org.apache.ignite.internal.util.IgniteUtils.doInParallel; /** @@ -393,18 +392,18 @@ public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(Di SchemaAbstractDiscoveryMessage msg0 = (SchemaAbstractDiscoveryMessage)msg; if (msg0.exchange()) - return new SchemaExchangeWorkerTask(remoteSecurityContext(ctx), msg0); + return new SchemaExchangeWorkerTask(msg0); } else if (msg instanceof ClientCacheChangeDummyDiscoveryMessage) { ClientCacheChangeDummyDiscoveryMessage msg0 = (ClientCacheChangeDummyDiscoveryMessage)msg; - return new ClientCacheChangeDummyDiscoveryTask(remoteSecurityContext(ctx), msg0); + return new ClientCacheChangeDummyDiscoveryTask(msg0); } else if (msg instanceof CacheStatisticsModeChangeMessage) { CacheStatisticsModeChangeMessage msg0 = (CacheStatisticsModeChangeMessage)msg; if (msg0.initial()) - return new CacheStatisticsModeChangeTask(remoteSecurityContext(ctx), msg0); + return new CacheStatisticsModeChangeTask(msg0); } return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java index 0c9bb3f5ac063..77dfc34e32e7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java @@ -18,25 +18,21 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.internal.S; /** * Exchange task to handle node leave for WAL state manager. */ -public class WalStateNodeLeaveExchangeTask extends AbstractCachePartitionExchangeWorkerTask { +public class WalStateNodeLeaveExchangeTask implements CachePartitionExchangeWorkerTask { /** Node that has left the grid. */ private final ClusterNode node; /** * Constructor. * - * @param secCtx Security context in which current task must be executed. * @param node Node that has left the grid. */ - public WalStateNodeLeaveExchangeTask(SecurityContext secCtx, ClusterNode node) { - super(secCtx); - + public WalStateNodeLeaveExchangeTask(ClusterNode node) { assert node != null; this.node = node; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java index 961f4c539cdc2..95a524c4077a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -41,12 +39,15 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; @@ -119,7 +120,7 @@ void start() throws IgniteCheckedException { writer = new BinaryMetadataAsyncWriter(); - U.newThread(writer).start(); + writer.start(); } /** @@ -392,12 +393,7 @@ public static boolean enabled(IgniteConfiguration cfg) { /** * */ - private class BinaryMetadataAsyncWriter extends GridWorker { - /** - * Queue of write tasks submitted for execution. - */ - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - + private class BinaryMetadataAsyncWriter extends IgniteLinkedBlockingQueueProcessor { /** * Write operation tasks prepared for writing (but not yet submitted to execution (actual writing). */ @@ -405,8 +401,7 @@ private class BinaryMetadataAsyncWriter extends GridWorker { /** */ BinaryMetadataAsyncWriter() { - super(ctx.igniteInstanceName(), "binary-metadata-writer", - BinaryMetadataFileStore.this.log, ctx.workersRegistry()); + super(ctx.igniteInstanceName(), "binary-metadata-writer", BinaryMetadataFileStore.this.log, ctx.workersRegistry()); } /** @@ -427,7 +422,7 @@ synchronized void startTaskAsync(int typeId, int typeVer) { ", typeVersion=" + typeVer + ']' ); - queue.add(task); + addToQueue(task); } else { if (log.isDebugEnabled()) @@ -443,7 +438,7 @@ synchronized void startTaskAsync(int typeId, int typeVer) { @Override public synchronized void cancel() { super.cancel(); - queue.clear(); + clearQueue(); IgniteCheckedException err = new IgniteCheckedException("Operation has been cancelled (node is stopping)."); @@ -479,27 +474,23 @@ synchronized void startTaskAsync(int typeId, int typeVer) { /** */ private void body0() throws InterruptedException { - OperationTask task; + OperationContextAwareWrapper contextualTask = takeQueuedElement(); - blockingSectionBegin(); - - try { - task = queue.take(); + try (Scope ignored = OperationContext.restoreSnapshot(contextualTask.contextSnapshot())) { + OperationTask task = contextualTask.delegate(); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug( "Starting write operation for" + " [typeId=" + task.typeId() + ", typeVer=" + task.typeVersion() + ']' ); + } task.execute(BinaryMetadataFileStore.this); - } - finally { - blockingSectionEnd(); - } - finishWriteFuture(task.typeId(), task.typeVersion(), task); + finishWriteFuture(task.typeId(), task.typeVersion(), task); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java index 91df187060d82..a9312591ca237 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java @@ -18,13 +18,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.AbstractCachePartitionExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; /** * A task for finishing preloading future in exchange worker thread. */ -public class FinishPreloadingTask extends AbstractCachePartitionExchangeWorkerTask { +public class FinishPreloadingTask implements CachePartitionExchangeWorkerTask { /** Topology version. */ private final AffinityTopologyVersion topVer; @@ -35,12 +34,9 @@ public class FinishPreloadingTask extends AbstractCachePartitionExchangeWorkerTa private final long rebalanceId; /** - * @param secCtx Security context in which current task must be executed. * @param topVer Topology version. */ - public FinishPreloadingTask(SecurityContext secCtx, AffinityTopologyVersion topVer, int grpId, long rebalanceId) { - super(secCtx); - + public FinishPreloadingTask(AffinityTopologyVersion topVer, int grpId, long rebalanceId) { this.grpId = grpId; this.topVer = topVer; this.rebalanceId = rebalanceId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java index 294f5609b3053..dfa0e1e0fb335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java @@ -17,14 +17,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import org.apache.ignite.internal.processors.cache.AbstractCachePartitionExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.util.future.GridCompoundFuture; /** * */ -public class ForceRebalanceExchangeTask extends AbstractCachePartitionExchangeWorkerTask { +public class ForceRebalanceExchangeTask implements CachePartitionExchangeWorkerTask { /** */ private final GridDhtPartitionExchangeId exchId; @@ -32,17 +31,10 @@ public class ForceRebalanceExchangeTask extends AbstractCachePartitionExchangeWo private final GridCompoundFuture forcedRebFut; /** - * @param secCtx Security context in which current task must be executed. * @param exchId Exchange ID. * @param forcedRebFut Rebalance future. */ - public ForceRebalanceExchangeTask( - SecurityContext secCtx, - GridDhtPartitionExchangeId exchId, - GridCompoundFuture forcedRebFut - ) { - super(secCtx); - + public ForceRebalanceExchangeTask(GridDhtPartitionExchangeId exchId, GridCompoundFuture forcedRebFut) { assert exchId != null; assert forcedRebFut != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 733f9b0adc462..59d3df1e45657 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -108,7 +108,6 @@ import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.internal.processors.metric.GridMetricManager; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.tracing.NoopSpan; import org.apache.ignite.internal.processors.tracing.Span; @@ -146,7 +145,6 @@ import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent; import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation; -import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext; import static org.apache.ignite.internal.util.IgniteUtils.doInParallel; import static org.apache.ignite.internal.util.IgniteUtils.doInParallelUninterruptibly; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id; @@ -281,9 +279,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Cache change requests. */ private ExchangeActions exchActions; - /** Security context. */ - @Nullable private final SecurityContext secCtx; - /** */ private final IgniteLogger exchLog; @@ -432,8 +427,6 @@ public GridDhtPartitionsExchangeFuture( log = cctx.logger(getClass()); exchLog = cctx.logger(EXCHANGE_LOG); - secCtx = remoteSecurityContext(cctx.kernalContext()); - timeBag = new TimeBag(log.isInfoEnabled()); initFut = new GridFutureAdapter() { @@ -483,11 +476,6 @@ public GridCacheSharedContext sharedContext() { return false; } - /** {@inheritDoc} */ - @Override @Nullable public SecurityContext securityContext() { - return secCtx; - } - /** * @return Exchange context. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java index 454856f237c59..6f6fcf7b5f063 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import org.apache.ignite.internal.processors.cache.AbstractCachePartitionExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; /** * */ -public class RebalanceReassignExchangeTask extends AbstractCachePartitionExchangeWorkerTask { +public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask { /** */ private final GridDhtPartitionExchangeId exchId; @@ -31,17 +30,13 @@ public class RebalanceReassignExchangeTask extends AbstractCachePartitionExchang private final GridDhtPartitionsExchangeFuture exchFut; /** - * @param secCtx Security context in which current task must be executed. * @param exchId Exchange ID. * @param exchFut Exchange future. */ public RebalanceReassignExchangeTask( - SecurityContext secCtx, GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut ) { - super(secCtx); - assert exchId != null; assert exchFut != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java index e65a11f085c7a..eeb57df28f740 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/StopCachesOnClientReconnectExchangeTask.java @@ -20,30 +20,20 @@ import java.util.Collection; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; /** * */ -public class StopCachesOnClientReconnectExchangeTask extends GridFutureAdapter - implements CachePartitionExchangeWorkerTask { +public class StopCachesOnClientReconnectExchangeTask extends GridFutureAdapter implements CachePartitionExchangeWorkerTask { /** */ @GridToStringInclude private final Collection stoppedCaches; - /** Security context. */ - @Nullable private final SecurityContext secCtx; - - /** - * @param secCtx Security context. - * @param stoppedCaches Collection of stopped caches. - */ - public StopCachesOnClientReconnectExchangeTask(@Nullable SecurityContext secCtx, Collection stoppedCaches) { - this.secCtx = secCtx; + /** @param stoppedCaches Collection of stopped caches. */ + public StopCachesOnClientReconnectExchangeTask(Collection stoppedCaches) { this.stoppedCaches = stoppedCaches; } @@ -52,11 +42,6 @@ public StopCachesOnClientReconnectExchangeTask(@Nullable SecurityContext secCtx, return false; } - /** {@inheritDoc} */ - @Override @Nullable public SecurityContext securityContext() { - return secCtx; - } - /** * @return Collection of stopped caches. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 4866664f2ecab..621cabfad03fa 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -70,12 +70,12 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManagerImpl; import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridStripedReadWriteLock; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.GridWorkerPool; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.maintenance.MaintenanceRegistry; import org.apache.ignite.maintenance.MaintenanceTask; @@ -87,6 +87,7 @@ import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID; import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.TMP_SUFFIX; +import static org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor.newCachedThreadPool; /** * File page store manager. @@ -239,9 +240,14 @@ public FilePageStoreManager(GridKernalContext ctx) { }; if (cleanFiles) { - cleanupAsyncExecutor.async(doShutdown); + try { + cleanupAsyncExecutor.async(doShutdown); - U.log(log, "Cache stores cleanup started asynchronously"); + U.log(log, "Cache stores cleanup started asynchronously"); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to start cache stores cleanup asynchronously", e); + } } else doShutdown.run(); @@ -924,7 +930,7 @@ protected static class LongOperationAsyncExecutor { private final IgniteLogger log; /** */ - private Set workers = new GridConcurrentHashSet<>(); + private final GridWorkerPool workerPool; /** */ private static final AtomicLong workerCounter = new AtomicLong(0); @@ -932,17 +938,17 @@ protected static class LongOperationAsyncExecutor { /** */ public LongOperationAsyncExecutor(String igniteInstanceName, IgniteLogger log) { this.igniteInstanceName = igniteInstanceName; - this.log = log; + + workerPool = new GridWorkerPool(newCachedThreadPool("async-file-store-cleanup-worker", igniteInstanceName), log); } /** - * Executes long operation in dedicated thread. Uses write lock as such operations can't run - * simultaneously. + * Executes long operation asynchronously. Uses write lock as such operations can't run simultaneously. * * @param runnable long operation */ - public void async(Runnable runnable) { + public void async(Runnable runnable) throws IgniteCheckedException { String workerName = "async-file-store-cleanup-task-" + workerCounter.getAndIncrement(); GridWorker worker = new GridWorker(igniteInstanceName, workerName, log) { @@ -954,17 +960,11 @@ public void async(Runnable runnable) { } finally { readWriteLock.writeLock().unlock(); - - workers.remove(this); } } }; - workers.add(worker); - - Thread asyncTask = U.newThread(worker); - - asyncTask.start(); + workerPool.execute(worker); } /** @@ -990,7 +990,7 @@ public T afterAsyncCompletion(IgniteOutClosure closure) { * Cancels async tasks. */ public void awaitAsyncTaskCompletion(boolean cancel) { - U.awaitForWorkersStop(workers, cancel, log); + workerPool.join(cancel); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 7cebeaca6a894..48525f5d697d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -470,7 +470,7 @@ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFie } /** {@inheritDoc} */ - @Override public DataStreamProcessor dataStream() { + @Override public DataStreamProcessor dataStream() { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifier.java index 57a219fc128e1..9b8ec045ce001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifier.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; @@ -166,7 +167,7 @@ void checkForNewVersion(IgniteLogger log, boolean first) { log = log.getLogger(getClass()); try { - cmd.set(new UpdateChecker(log, first)); + cmd.set(OperationContextAwareRunnable.wrapIfContextNotEmpty(new UpdateChecker(log, first))); } catch (RejectedExecutionException e) { U.error(log, "Failed to schedule a thread due to execution rejection (safely ignoring): " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 9cdb2f424a3ca..011f3ab4a4925 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.UUID; -import java.util.concurrent.DelayQueue; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; @@ -35,6 +34,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.thread.OomExceptionHandler; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -42,7 +44,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteDelayQueueProcessor; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -56,18 +58,15 @@ /** * Data stream processor. */ -public class DataStreamProcessor extends GridProcessorAdapter { +public class DataStreamProcessor extends GridProcessorAdapter { /** Loaders map (access is not supposed to be highly concurrent). */ private Collection ldrs = new GridConcurrentHashSet<>(); /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - /** Flushing thread. */ - private Thread flusher; - - /** */ - private final DelayQueue> flushQ = new DelayQueue<>(); + /** Data Streamer flusher. */ + private final DataStreamerFlusher flusher = new DataStreamerFlusher(); /** Marshaller. */ private final Marshaller marsh; @@ -93,31 +92,6 @@ public DataStreamProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - flusher = U.newThread(new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) { - @Override protected void body() throws InterruptedException { - while (!isCancelled()) { - DataStreamerImpl ldr = flushQ.take(); - - if (!busyLock.enterBusy()) - return; - - try { - if (ldr.isClosed()) - continue; - - ldr.tryFlush(); - - flushQ.offer(ldr); - } - finally { - busyLock.leaveBusy(); - } - } - } - }); - - flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx)); - flusher.start(); if (log.isDebugEnabled()) @@ -131,7 +105,7 @@ public DataStreamProcessor(GridKernalContext ctx) { busyLock.block(); - U.interrupt(flusher); + U.cancel(flusher); U.join(flusher, log); for (DataStreamerImpl ldr : ldrs) { @@ -163,12 +137,12 @@ public DataStreamProcessor(GridKernalContext ctx) { * @param cacheName Cache name ({@code null} for default cache). * @return Data loader. */ - public DataStreamerImpl dataStreamer(@Nullable String cacheName) { + public DataStreamerImpl dataStreamer(@Nullable String cacheName) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to create data streamer (grid is stopping)."); try { - final DataStreamerImpl ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ); + final DataStreamerImpl ldr = new DataStreamerImpl<>(ctx, cacheName); ldrs.add(ldr); @@ -190,6 +164,16 @@ public DataStreamerImpl dataStreamer(@Nullable String cacheName) { } } + /** */ + public void scheduleAutoFlush(DataStreamerImpl dataStreamer) { + flusher.addToQueue(dataStreamer); + } + + /** */ + public void stopAutoFlush(DataStreamerImpl dataStreamer) { + flusher.removeQueuedElement(dataStreamer); + } + /** * @param nodeId Sender ID. * @param req Request. @@ -272,7 +256,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { clsLdr = dep.classLoader(); } - StreamReceiver updater; + StreamReceiver updater; try { updater = U.unmarshal(marsh, req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config())); @@ -303,7 +287,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { */ private void localUpdate(final UUID nodeId, final DataStreamerRequest req, - final StreamReceiver updater, + final StreamReceiver updater, final Object topic) { final boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater); @@ -461,4 +445,41 @@ public static byte ioPolicy(@Nullable IgniteClosure rslvr, Cl X.println(">>> Data streamer processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']'); X.println(">>> ldrsSize: " + ldrs.size()); } + + /** */ + private class DataStreamerFlusher extends IgniteDelayQueueProcessor> { + /** */ + public DataStreamerFlusher() { + super(ctx.igniteInstanceName(), "grid-data-loader-flusher", DataStreamProcessor.this.log, ctx.workersRegistry()); + } + + /** {@inheritDoc} */ + @Override public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + return new OomExceptionHandler(ctx); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + while (!isCancelled()) { + OperationContextAwareWrapper> contextualLdr = takeQueuedElement(); + + if (!busyLock.enterBusy()) + return; + + try (Scope ignored = OperationContext.restoreSnapshot(contextualLdr.contextSnapshot())) { + DataStreamerImpl ldr = contextualLdr.delegate(); + + if (ldr.isClosed()) + continue; + + ldr.tryFlush(); + + addToQueue(ldr); + } + finally { + busyLock.leaveBusy(); + } + } + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a86b6240ed06a..54049682e642d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -270,9 +269,6 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** */ private volatile long lastFlushTime = U.currentTimeMillis(); - /** */ - private final DelayQueue> flushQ; - /** */ private boolean skipStore; @@ -300,12 +296,10 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** * @param ctx Grid kernal context. * @param cacheName Cache name. - * @param flushQ Flush queue. */ public DataStreamerImpl( final GridKernalContext ctx, - @Nullable final String cacheName, - DelayQueue> flushQ + @Nullable final String cacheName ) { assert ctx != null; @@ -325,7 +319,6 @@ public DataStreamerImpl( } this.cacheName = cacheName; - this.flushQ = flushQ; discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -583,9 +576,9 @@ public IgniteInternalFuture internalFuture() { this.autoFlushFreq = autoFlushFreq; if (autoFlushFreq != 0 && old == 0) - flushQ.add(this); + ctx.dataStream().scheduleAutoFlush(this); else if (autoFlushFreq == 0) - flushQ.remove(this); + ctx.dataStream().stopAutoFlush(this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 4fd610343ad87..3e584ed0e4842 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -113,6 +113,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL; import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS; import static org.apache.ignite.internal.GridTopic.TOPIC_TASK; +import static org.apache.ignite.internal.IgniteInternalWrapper.unwrap; import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; @@ -122,7 +123,6 @@ import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; -import static org.apache.ignite.internal.processors.security.SecurityUtils.unwrap; import static org.apache.ignite.internal.processors.task.GridTaskProcessor.resolveTaskClass; import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL; import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java index 7d3db0d5d192a..6129b9d3c305b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java @@ -23,18 +23,19 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.function.Consumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.COMMON_KEY_PREFIX; @@ -44,7 +45,7 @@ import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey; /** */ -public class DmsDataWriterWorker extends GridWorker { +public class DmsDataWriterWorker extends IgniteLinkedBlockingQueueProcessor> { /** */ public static final byte[] DUMMY_VALUE = {}; @@ -54,9 +55,6 @@ public class DmsDataWriterWorker extends GridWorker { /** */ private static final Object AWAIT = new Object(); - /** */ - private final LinkedBlockingQueue> updateQueue = new LinkedBlockingQueue<>(); - /** */ private final DmsLocalMetaStorageLock lock; @@ -85,12 +83,12 @@ public DmsDataWriterWorker( DmsLocalMetaStorageLock lock, Consumer errorHnd ) { - super(igniteInstanceName, "dms-writer", log); + super(igniteInstanceName, "dms-writer", log, null); this.lock = lock; this.errorHnd = errorHnd; // Put restore task to the queue, so it will be executed on worker start. - updateQueue.offer(newDmsTask(this::restore)); + addToQueue(newDmsTask(this::restore)); } /** */ @@ -99,10 +97,10 @@ public void setMetaStorage(ReadWriteMetastorage metastorage) { } /** Start new distributed metastorage worker thread. */ - public void start() { + @Override public void start() { isCancelled.set(false); - new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start(); + super.start(); } /** @@ -121,7 +119,7 @@ public void suspend(IgniteInternalFuture compFut) { else { latch = new CountDownLatch(1); - updateQueue.offer((RunnableFuture)(suspendFut = new FutureTask<>(() -> AWAIT))); + addToQueue((RunnableFuture)(suspendFut = new FutureTask<>(() -> AWAIT))); compFut.listen(() -> latch.countDown()); } @@ -129,7 +127,7 @@ public void suspend(IgniteInternalFuture compFut) { /** */ public void update(DistributedMetaStorageHistoryItem histItem) { - updateQueue.offer(newDmsTask(() -> { + addToQueue(newDmsTask(() -> { metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem); workerDmsVer = workerDmsVer.nextVersion(histItem); @@ -146,7 +144,7 @@ public void update(DistributedMetaStorageClusterNodeData fullNodeData) { assert fullNodeData.fullData != null; assert fullNodeData.hist != null; - updateQueue.offer(newDmsTask(() -> { + addToQueue(newDmsTask(() -> { metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE); doCleanup(); @@ -172,19 +170,19 @@ public void update(DistributedMetaStorageClusterNodeData fullNodeData) { /** */ public void removeHistItem(long ver) { - updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver)))); + addToQueue(newDmsTask(() -> metastorage.remove(historyItemKey(ver)))); } /** */ public void cancel(boolean halt) throws InterruptedException { if (halt) { - updateQueue.clear(); + clearQueue(); if (suspendFut instanceof RunnableFuture) ((Runnable)suspendFut).run(); } - updateQueue.offer(new FutureTask<>(() -> STOP)); + addToQueue(new FutureTask<>(() -> STOP)); latch.countDown(); isCancelled.set(true); @@ -199,18 +197,22 @@ public void cancel(boolean halt) throws InterruptedException { @Override protected void body() { while (true) { try { - RunnableFuture curTask = updateQueue.take(); + OperationContextAwareWrapper> contextualCurTask = takeQueuedElement(); - curTask.run(); + try (Scope ignored = OperationContext.restoreSnapshot(contextualCurTask.contextSnapshot())) { + RunnableFuture curTask = contextualCurTask.delegate(); - // Result will be null for any runnable executed tasks over metastorage and non-null for system DMS tasks. - Object res = U.get(curTask); + curTask.run(); - if (res == STOP) - break; + // Result will be null for any runnable executed tasks over metastorage and non-null for system DMS tasks. + Object res = U.get(curTask); - if (res == AWAIT) - latch.await(); + if (res == STOP) + break; + + if (res == AWAIT) + latch.await(); + } } catch (InterruptedException ignore) { } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java index b7f4fe06edcaa..c16a7bacee219 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java @@ -17,27 +17,23 @@ package org.apache.ignite.internal.processors.query.schema; -import org.apache.ignite.internal.processors.cache.AbstractCachePartitionExchangeWorkerTask; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; -import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.internal.S; /** * Cache schema change task for exchange worker. */ -public class SchemaExchangeWorkerTask extends AbstractCachePartitionExchangeWorkerTask { +public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTask { /** Message. */ private final SchemaAbstractDiscoveryMessage msg; /** * Constructor. * - * @param secCtx Security context in which current task must be executed. * @param msg Message. */ - public SchemaExchangeWorkerTask(SecurityContext secCtx, SchemaAbstractDiscoveryMessage msg) { - super(secCtx); - + public SchemaExchangeWorkerTask(SchemaAbstractDiscoveryMessage msg) { assert msg != null; this.msg = msg; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java index 3870e7a913399..668a2c62b52f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java @@ -18,15 +18,14 @@ package org.apache.ignite.internal.processors.query.schema; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.cache.AbstractCachePartitionExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; /** * Node leave exchange worker task. */ -public class SchemaNodeLeaveExchangeWorkerTask extends AbstractCachePartitionExchangeWorkerTask { +public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchangeWorkerTask { /** Node. */ @GridToStringInclude private final ClusterNode node; @@ -34,12 +33,9 @@ public class SchemaNodeLeaveExchangeWorkerTask extends AbstractCachePartitionExc /** * Constructor. * - * @param secCtx Security context in which current task must be executed. * @param node Node. */ - public SchemaNodeLeaveExchangeWorkerTask(SecurityContext secCtx, ClusterNode node) { - super(secCtx); - + public SchemaNodeLeaveExchangeWorkerTask(ClusterNode node) { this.node = node; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java index 0911df2a584eb..b0529868019a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteDomainCombiner; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; -import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; @@ -59,6 +58,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import static org.apache.ignite.internal.IgniteInternalWrapper.unwrap; import static org.apache.ignite.internal.util.IgniteUtils.IGNITE_PKG; import static org.apache.ignite.internal.util.IgniteUtils.packageName; @@ -166,19 +166,6 @@ public static SecurityContext nodeSecurityContext(Marshaller marsh, ClassLoader } } - /** - * @return Current security context if it is different from local node security context, otherwise {@code null}. - * @see #withRemoteSecurityContext(GridKernalContext, SecurityContext) - */ - public static SecurityContext remoteSecurityContext(GridKernalContext ctx) { - IgniteSecurity security = ctx.security(); - - if (!security.enabled() || security.isDefaultContext()) - return null; - - return security.securityContext(); - } - /** @return Current security subject ID if security is enabled, otherwise null. */ public static UUID securitySubjectId(GridKernalContext ctx) { IgniteSecurity security = ctx.security(); @@ -196,21 +183,6 @@ public static UUID securitySubjectId(GridCacheSharedContext cctx) { return securitySubjectId(cctx.kernalContext()); } - /** - * Sets specified security context as current if it differs from the {@code null}. - * {@code null} means that security context of the local node is specified or security is disabled so no security - * context change is needed. - * Note that this method is safe to use only when it is known to be called in the security context of the local node - * (e.g. in system workers). - * @return {@link Scope} instance if new security context is set, otherwise {@code null}. - */ - public static Scope withRemoteSecurityContext(GridKernalContext ctx, SecurityContext secCtx) { - if (secCtx == null) - return null; - - return ctx.security().withContext(secCtx); - } - /** * Computes a result in a privileged action. * @@ -244,11 +216,6 @@ public static boolean isSystemType(GridKernalContext ctx, Object target, boolean return ctx.security().isSystemType(target.getClass()); } - /** */ - public static Object unwrap(Object target) { - return target instanceof IgniteInternalWrapper ? ((IgniteInternalWrapper)target).delegate() : target; - } - /** * @param cls Class instance. * @return Whether specified class is in Ignite package. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java index 15e8afb2487db..f32965ce79254 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -44,10 +43,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -125,7 +127,7 @@ public class ServiceDeploymentManager { void startProcessing() { assert depWorker.runner() == null : "Method shouldn't be called twice during lifecycle;"; - new IgniteThread(ctx.igniteInstanceName(), "services-deployment-worker", depWorker).start(); + depWorker.start(); } /** @@ -145,7 +147,7 @@ void stopProcessing(IgniteCheckedException stopErr) { U.join(depWorker, log); - depWorker.tasksQueue.clear(); + depWorker.clearQueue(); pendingEvts.clear(); @@ -252,7 +254,7 @@ private void addTask(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersi task.onEvent(evt, topVer, depActions); - depWorker.tasksQueue.add(task); + depWorker.addToQueue(task); } /** @@ -440,14 +442,10 @@ private class ServiceCommunicationListener implements GridMessageListener { /** * Services deployment worker. */ - private class ServicesDeploymentWorker extends GridWorker { - /** Queue to process. */ - private final LinkedBlockingQueue tasksQueue = new LinkedBlockingQueue<>(); - + private class ServicesDeploymentWorker extends IgniteLinkedBlockingQueueProcessor { /** {@inheritDoc} */ private ServicesDeploymentWorker() { - super(ctx.igniteInstanceName(), "services-deployment-worker", - ServiceDeploymentManager.this.log, ctx.workersRegistry()); + super(ctx.igniteInstanceName(), "services-deployment-worker", ServiceDeploymentManager.this.log, ctx.workersRegistry()); } /** {@inheritDoc} */ @@ -455,65 +453,60 @@ private ServicesDeploymentWorker() { Throwable err = null; try { - ServiceDeploymentTask task; - while (!isCancelled()) { onIdle(); - blockingSectionBegin(); - - try { - task = tasksQueue.take(); - } - finally { - blockingSectionEnd(); - } + OperationContextAwareWrapper contextualTask = takeQueuedElement(); if (isCancelled()) Thread.currentThread().interrupt(); - task.init(); + try (Scope ignored = OperationContext.restoreSnapshot(contextualTask.contextSnapshot())) { + ServiceDeploymentTask task = contextualTask.delegate(); - final long dumpTimeout = 2 * ctx.config().getNetworkTimeout(); + task.init(); - long dumpCnt = 0; - long nextDumpTime = 0; + final long dumpTimeout = 2 * ctx.config().getNetworkTimeout(); - while (true) { - try { - blockingSectionBegin(); + long dumpCnt = 0; + long nextDumpTime = 0; + while (true) { try { - task.waitForComplete(dumpTimeout); - } - finally { - blockingSectionEnd(); - } + blockingSectionBegin(); - taskPostProcessing(task); + try { + task.waitForComplete(dumpTimeout); + } + finally { + blockingSectionEnd(); + } - break; - } - catch (IgniteFutureTimeoutCheckedException ignored) { - if (isCancelled()) - return; + taskPostProcessing(task); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignoredErr) { + if (isCancelled()) + return; - if (nextDumpTime <= U.currentTimeMillis()) { - log.warning("Failed to wait service deployment process or timeout had been" + - " reached, timeout=" + dumpTimeout + - (log.isDebugEnabled() ? ", task=" + task : ", taskDepId=" + task.deploymentId())); + if (nextDumpTime <= U.currentTimeMillis()) { + log.warning("Failed to wait service deployment process or timeout had been" + + " reached, timeout=" + dumpTimeout + + (log.isDebugEnabled() ? ", task=" + task : ", taskDepId=" + task.deploymentId())); - long nextTimeout = dumpTimeout * (2 + dumpCnt++); + long nextTimeout = dumpTimeout * (2 + dumpCnt++); - nextDumpTime = U.currentTimeMillis() + Math.min(nextTimeout, dfltDumpTimeoutLimit); + nextDumpTime = U.currentTimeMillis() + Math.min(nextTimeout, dfltDumpTimeoutLimit); + } } - } - catch (ClusterTopologyServerNotFoundException e) { - U.error(log, e); + catch (ClusterTopologyServerNotFoundException e) { + U.error(log, e); - taskPostProcessing(task); + taskPostProcessing(task); - break; + break; + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 16c91b86edf53..d21de03723432 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -106,13 +106,13 @@ import static org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT; import static org.apache.ignite.internal.GridTopic.TOPIC_JOB; import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL; +import static org.apache.ignite.internal.IgniteInternalWrapper.unwrap; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED; import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED; import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED; import static org.apache.ignite.internal.processors.security.SecurityUtils.authorizeAll; -import static org.apache.ignite.internal.processors.security.SecurityUtils.unwrap; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id; import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL; import static org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java index 7f7b730126a07..72e1bd0f28b35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java @@ -37,4 +37,9 @@ public interface GridTimeoutObject { * Timeout callback. */ void onTimeout(); + + /** The name of the timeout object. Used to distinguish objects belonging to different subsystems. */ + default String name() { + return getClass().getName(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index 03c72d413adae..11f513e758d55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.timeout; import java.io.Closeable; -import java.util.Comparator; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -26,6 +25,10 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.X; @@ -35,9 +38,11 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; +import static org.apache.ignite.internal.IgniteInternalWrapper.unwrap; /** * Detects timeout events and processes them. @@ -47,24 +52,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { private final TimeoutWorker timeoutWorker; /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator() { - /** {@inheritDoc} */ - @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) { - int res = Long.compare(o1.endTime(), o2.endTime()); - - if (res != 0) - return res; - - res = o1.timeoutId().compareTo(o2.timeoutId()); - - if (res != 0) - return res; - - // There can be an intersection between timeouts and ids for different subsystems. - return o1.getClass().getName().compareTo(o2.getClass().getName()); - } - }); + private final GridConcurrentSkipListSet timeoutObjs = new GridConcurrentSkipListSet<>(); /** */ private final Object mux = new Object(); @@ -105,11 +93,11 @@ public boolean addTimeoutObject(GridTimeoutObject timeoutObj) { // Timeout will never happen. return false; - boolean added = timeoutObjs.add(timeoutObj); + boolean added = timeoutObjs.add(new TimeoutObjectWrapper(timeoutObj, OperationContext.createSnapshot())); assert added : "Duplicate timeout object found: " + timeoutObj; - if (timeoutObjs.firstx() == timeoutObj) { + if (unwrap(timeoutObjs.firstx()) == timeoutObj) { synchronized (mux) { mux.notify(); // No need to notifyAll since we only have one thread. } @@ -143,7 +131,7 @@ public CancelableTask schedule(Runnable task, long delay, long period) { * @return {@code True} if timeout object was removed. */ public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) { - return timeoutObjs.remove(timeoutObj); + return timeoutObjs.remove(new TimeoutObjectWrapper(timeoutObj)); } /** @@ -225,10 +213,10 @@ private class TimeoutWorker extends GridWorker { onIdle(); - for (Iterator iter = timeoutObjs.iterator(); iter.hasNext(); ) { - GridTimeoutObject timeoutObj = iter.next(); + for (Iterator iter = timeoutObjs.iterator(); iter.hasNext(); ) { + TimeoutObjectWrapper timeoutObj = iter.next(); - if (timeoutObj.endTime() <= now) { + if (timeoutObj.delegate().endTime() <= now) { try { boolean rmvd = timeoutObjs.remove(timeoutObj); @@ -262,10 +250,10 @@ private class TimeoutWorker extends GridWorker { // synchronization block, so we don't miss out // on thread notification events sent from // 'addTimeoutObject(..)' method. - GridTimeoutObject first = timeoutObjs.firstx(); + TimeoutObjectWrapper first = timeoutObjs.firstx(); if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); + long waitTime = first.delegate().endTime() - U.currentTimeMillis(); if (waitTime > 0) { blockingSectionBegin(); @@ -432,4 +420,65 @@ private static class WaitFutureTimeoutObject extends GridTimeoutObjectAdapter { return S.toString(WaitFutureTimeoutObject.class, this); } } + + /** */ + private static class TimeoutObjectWrapper + extends OperationContextAwareWrapper + implements Comparable { + /** */ + TimeoutObjectWrapper(GridTimeoutObject delegate) { + super(delegate, null); + } + + /** */ + TimeoutObjectWrapper(GridTimeoutObject delegate, OperationContextSnapshot snapshot) { + super(delegate, snapshot); + } + + /** */ + void onTimeout() { + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + delegate.onTimeout(); + } + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridTimeoutProcessor.TimeoutObjectWrapper o) { + int res = Long.compare(delegate.endTime(), o.delegate.endTime()); + + if (res != 0) + return res; + + res = delegate.timeoutId().compareTo(o.delegate.timeoutId()); + + if (res != 0) + return res; + + // There can be an intersection between timeouts and ids for different subsystems. + return delegate.name().compareTo(o.delegate.name()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TimeoutObjectWrapper timeoutObj = (TimeoutObjectWrapper)o; + + return delegate.equals(timeoutObj.delegate); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return delegate.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.toString(); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/AsynchronousQueueProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/AsynchronousQueueProcessor.java new file mode 100644 index 0000000000000..925063719a690 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/AsynchronousQueueProcessor.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.worker; + +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; +import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.thread.IgniteThread.GRP_IDX_UNASSIGNED; + +/** + * Represents a single-threaded, asynchronous queue processor. It automatically captures the {@link OperationContext} + * attached to the thread that submitted the item for processing and restores it before processing actually begins in the + * worker thread. + * + * @param Type of items to be processed. + * @param Type of wrapper over processing item that are stored in the underlying queue. + */ +public abstract class AsynchronousQueueProcessor> extends GridWorker implements Iterable { + /** */ + private final BlockingQueue workerQueue; + + /** */ + private Thread workerThread; + + /** */ + protected AsynchronousQueueProcessor( + @Nullable String igniteInstanceName, + String workerThreadName, + IgniteLogger log, + @Nullable WorkersRegistry workerReg, + BlockingQueue workerQueue + ) { + super(igniteInstanceName, workerThreadName, log, workerReg); + + this.workerQueue = workerQueue; + } + + /** */ + protected abstract W wrapQueueElement(T delegate, OperationContextSnapshot snapshot); + + /** */ + public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + return null; + } + + /** */ + public IgniteThread createWorkerThread(GridWorker worker) { + return new IgniteThread(igniteInstanceName(), name(), worker, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED); + } + + /** */ + public void start() { + synchronized (this) { + if (workerThread != null) + return; + + workerThread = createWorkerThread(this); + + Thread.UncaughtExceptionHandler errHnd = uncaughtExceptionHandler(); + + if (errHnd != null) + workerThread.setUncaughtExceptionHandler(errHnd); + + workerThread.start(); + } + } + + /** {@inheritDoc} */ + @Override protected void cleanup() { + synchronized (this) { + workerThread = null; + } + } + + /** */ + public boolean addToQueue(@NotNull T t) { + assert !OperationContextAwareWrapper.class.isAssignableFrom(t.getClass()); + + return workerQueue.add(wrapQueueElement(t, OperationContext.createSnapshot())); + } + + /** */ + public boolean removeQueuedElement(Object o) { + return workerQueue.removeIf(w -> o.equals(w.delegate())); + } + + /** */ + public void clearQueue() { + workerQueue.clear(); + } + + /** */ + public int queueSize() { + return workerQueue.size(); + } + + /** */ + public boolean isQueueEmpty() { + return workerQueue.isEmpty(); + } + + /** */ + @Nullable protected OperationContextAwareWrapper takeQueuedElement() throws InterruptedException { + blockingSectionBegin(); + + try { + return workerQueue.take(); + } + finally { + blockingSectionEnd(); + } + } + + /** */ + @Nullable protected OperationContextAwareWrapper pollQueuedElement( + long timeout, + @NotNull TimeUnit unit + ) throws InterruptedException { + blockingSectionBegin(); + + try { + return workerQueue.poll(timeout, unit); + } + finally { + blockingSectionEnd(); + } + } + + /** {@inheritDoc} */ + @Override public @NotNull Iterator iterator() { + Iterator iter = workerQueue.iterator(); + + return new Iterator<>() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public T next() { + return iter.next().delegate(); + } + }; + } + + /** */ + public void drainQueue(Consumer consumer) { + W element; + + while (true) { + element = workerQueue.poll(); + + if (element == null) + break; + + consumer.accept(element.delegate()); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java index 85959950f330a..e19e2170aa1db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java @@ -49,7 +49,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** Finish mark. */ private volatile boolean finished; - /** Whether or not this runnable is cancelled. */ + /** Whether this runnable is canceled. */ protected final AtomicBoolean isCancelled = new AtomicBoolean(); /** Actual thread runner. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerPool.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerPool.java index e99c112f3ab55..83f8fd8a30872 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerPool.java @@ -99,19 +99,17 @@ public void execute(final GridWorker w) throws IgniteCheckedException { * before waiting for them to finish. */ public void join(boolean cancel) { - if (cancel) - U.cancel(workers); + for (GridWorker worker : workers) { + try { + if (cancel) + U.cancel(worker); - // Record current interrupted status of calling thread. - boolean interrupted = Thread.interrupted(); - - try { - U.join(workers, log); - } - finally { - // Reset interrupted flag on calling thread. - if (interrupted) - Thread.currentThread().interrupt(); + U.join(worker, log); + } + catch (Exception e) { + if (log != null) + log.warning("Failed to stop grid worker [" + worker.name() + ']', e); + } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteDelayQueueProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteDelayQueueProcessor.java new file mode 100644 index 0000000000000..c7eb5a35ffaeb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteDelayQueueProcessor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.worker; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalWrapper; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; +import org.apache.ignite.internal.worker.WorkersRegistry; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** */ +public abstract class IgniteDelayQueueProcessor + extends AsynchronousQueueProcessor.QueueElementWrapper> { + /** */ + protected IgniteDelayQueueProcessor( + String igniteInstanceName, + String workerThreadName, + IgniteLogger log, + @Nullable WorkersRegistry workerReg + ) { + super(igniteInstanceName, workerThreadName, log, workerReg, new DelayQueue<>()); + } + + /** {@inheritDoc} */ + @Override protected QueueElementWrapper wrapQueueElement(T delegate, OperationContextSnapshot snapshot) { + return delegate == null ? null : new QueueElementWrapper(delegate, snapshot); + } + + /** */ + protected class QueueElementWrapper extends OperationContextAwareWrapper implements Delayed { + /** */ + protected QueueElementWrapper(T delegate, OperationContextSnapshot snapshot) { + super(delegate, snapshot); + } + + /** {@inheritDoc} */ + @Override public long getDelay(@NotNull TimeUnit unit) { + return delegate.getDelay(unit); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull Delayed o) { + return delegate.compareTo((Delayed)IgniteInternalWrapper.unwrap(o)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + IgniteDelayQueueProcessor.QueueElementWrapper w = (IgniteDelayQueueProcessor.QueueElementWrapper)o; + + return delegate.equals(w.delegate); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return delegate.hashCode(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteLinkedBlockingQueueProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteLinkedBlockingQueueProcessor.java new file mode 100644 index 0000000000000..6b7195c5dfeb5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteLinkedBlockingQueueProcessor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.worker; + +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; +import org.apache.ignite.internal.worker.WorkersRegistry; + +/** */ +public abstract class IgniteLinkedBlockingQueueProcessor extends AsynchronousQueueProcessor> { + /** */ + protected IgniteLinkedBlockingQueueProcessor( + String igniteInstanceName, + String workerThreadName, + IgniteLogger log, + WorkersRegistry workerReg + ) { + super(igniteInstanceName, workerThreadName, log, workerReg, new LinkedBlockingQueue<>()); + } + + /** {@inheritDoc} */ + @Override protected OperationContextAwareWrapper wrapQueueElement(T delegate, OperationContextSnapshot snapshot) { + return delegate == null ? null : new OperationContextAwareWrapper<>(delegate, snapshot); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3417cbe3a415f..86e16566894f9 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -68,7 +68,6 @@ import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; -import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.internal.ClusterStateProvider; @@ -776,11 +775,7 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { this.srvLsnr.communicationWorker(commWorker); this.nioSrvWrapper.communicationWorker(commWorker); - new IgniteSpiThread(igniteInstanceName, commWorker.name(), log) { - @Override protected void body() { - commWorker.run(); - } - }.start(); + commWorker.start(); // Ack start. if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java index 46ecf4e4d106f..988866459f8ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationWorker.java @@ -21,9 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -36,6 +34,9 @@ import org.apache.ignite.internal.IgniteTooManyOpenFilesException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.failure.FailureProcessor; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; @@ -43,9 +44,12 @@ import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.communication.tcp.AttributeNames; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; +import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; @@ -54,7 +58,7 @@ /** * Works with connections states. */ -public class CommunicationWorker extends GridWorker { +public class CommunicationWorker extends IgniteLinkedBlockingQueueProcessor { /** Worker name. */ public static final String WORKER_NAME = "tcp-comm-worker"; @@ -64,9 +68,6 @@ public class CommunicationWorker extends GridWorker { /** Attributes. */ private final AttributeNames attrs; - /** */ - private final BlockingQueue q = new LinkedBlockingQueue<>(); - /** Client pool. */ private final ConnectionClientPool clientPool; @@ -132,11 +133,20 @@ public CommunicationWorker( this.spiName = spiName; } + /** {@inheritDoc} */ + @Override public IgniteThread createWorkerThread(GridWorker worker) { + return new IgniteSpiThread(igniteInstanceName(), name(), log) { + @Override protected void body() { + worker.run(); + } + }; + } + /** * @param sesInfo Disconnected session information. */ public void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) { - boolean add = q.add(sesInfo); + boolean add = addToQueue(sesInfo); assert add; } @@ -159,19 +169,16 @@ public void stop() { try { while (!isCancelled()) { - DisconnectedSessionInfo disconnectData; - - blockingSectionBegin(); - - try { - disconnectData = q.poll(cfg.idleConnectionTimeout(), TimeUnit.MILLISECONDS); - } - finally { - blockingSectionEnd(); + OperationContextAwareWrapper contextualDisconnectData = pollQueuedElement( + cfg.idleConnectionTimeout(), + TimeUnit.MILLISECONDS + ); + + if (contextualDisconnectData != null) { + try (Scope ignored = OperationContext.restoreSnapshot(contextualDisconnectData.contextSnapshot())) { + processDisconnect(contextualDisconnectData.delegate()); + } } - - if (disconnectData != null) - processDisconnect(disconnectData); else processIdle(); diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java index b67e055d68fbd..71d3ea56d0b4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.cache.affinity; -import java.util.Queue; import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteDataStreamer; @@ -33,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -257,20 +256,15 @@ public void testStartStopServalServersWithClisnt() throws Exception { }); } - /** - * Waiting for exchanges beginning. - * - * @param ignite Ignite. - */ + /** Waiting for exchanges beginning. */ private void waitForExchnagesBegin(GridCachePartitionExchangeManager exchangeManager, int exchanges) { - GridWorker exchWorker = U.field(exchangeManager, "exchWorker"); - Queue exchnageQueue = U.field(exchWorker, "futQ"); + IgniteLinkedBlockingQueueProcessor exchWorker = U.field(exchangeManager, "exchWorker"); try { assertTrue(GridTestUtils.waitForCondition(() -> { int exFuts = 0; - for (CachePartitionExchangeWorkerTask task : exchnageQueue) { + for (CachePartitionExchangeWorkerTask task : exchWorker) { if (task instanceof GridDhtPartitionsExchangeFuture) exFuts++; } diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java index a71ab3010b4ce..53e29d5d96294 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java @@ -116,7 +116,7 @@ static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask { * Default constructor. */ ExchangeWorkerFailureTask() { - super(null, new SchemaAbstractDiscoveryMessage(null) { + super(new SchemaAbstractDiscoveryMessage(null) { @Override public boolean exchange() { return false; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java index ca22dca7ffc6a..13b04ac63b307 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -96,8 +95,6 @@ public void testMetastorageUpdateDuringSnapshot() throws Exception { DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(), DistributedMetaStorageImpl.class, "worker"); - LinkedBlockingQueue> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class, - "updateQueue"); RunnableFuture testTask = new FutureTask<>(() -> { U.await(latch); @@ -105,9 +102,9 @@ public void testMetastorageUpdateDuringSnapshot() throws Exception { return null; }); - queue.offer(testTask); + worker.addToQueue(testTask); - assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout())); + assertTrue(GridTestUtils.waitForCondition(() -> worker.queueSize() > 10, getTestTimeout())); ignite.context().cache().context().exchange() .registerExchangeAwareComponent(new PartitionsExchangeAware() { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java index 0bd42eae51f90..caaa4abbe3bc0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalWrapper; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.job.GridJobWorker; import org.apache.ignite.internal.processors.job.JobWorkerInterruptionTimeoutObject; @@ -284,8 +285,10 @@ private void cancelBeforeStartWitchChecks(GridJobWorker jobWorker) { * @param n Node. * @return Value of {@code GridTimeoutProcessor#timeoutObjs}. */ - private static GridConcurrentSkipListSet timeoutObjects(IgniteEx n) { - return getFieldValue(n.context().timeout(), "timeoutObjs"); + private static Collection timeoutObjects(IgniteEx n) { + GridConcurrentSkipListSet res = getFieldValue(n.context().timeout(), "timeoutObjs"); + + return F.viewReadOnly(res, o -> (GridTimeoutObject)IgniteInternalWrapper.unwrap(o)); } /** @@ -294,7 +297,7 @@ private static GridConcurrentSkipListSet timeoutObjects(Ignit * @return Collection of {@link JobWorkerInterruptionTimeoutObject} for {@code jobWorker}. */ private static Collection jobWorkerInterrupters( - GridConcurrentSkipListSet timeoutObjects, + Collection timeoutObjects, GridJobWorker jobWorker ) { return timeoutObjects.stream() diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java index 6f0ba81c90aea..f2539ea542812 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorkerTest.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -290,17 +288,14 @@ public void testRestore3() throws Exception { public void testHalt() throws Exception { CountDownLatch latch = new CountDownLatch(1); - LinkedBlockingQueue> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class, - "updateQueue"); - metastorage = new MyReadWriteMetaStorageMock() { @Override public void writeRaw(String key, byte[] data) { try { - assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 3, getTestTimeout())); + assertTrue(GridTestUtils.waitForCondition(() -> worker.queueSize() == 3, getTestTimeout())); latch.countDown(); - assertTrue(GridTestUtils.waitForCondition(() -> queue.size() == 1, getTestTimeout())); + assertTrue(GridTestUtils.waitForCondition(() -> worker.queueSize() == 1, getTestTimeout())); } catch (Exception ignore) { } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index e67b307884f6f..6a377688917e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -25,9 +25,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -36,19 +38,28 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper; import org.apache.ignite.internal.thread.pool.IgniteForkJoinPool; import org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor; import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.IgniteDelayQueueProcessor; +import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.thread.IgniteThread; import org.junit.Test; +import org.springframework.lang.NonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; @@ -676,6 +687,127 @@ public void testCompletableFutureContextPropagation() throws Exception { allFut.get(getTestTimeout(), MILLISECONDS); } + /** */ + @Test + public void testTimeoutWorker() throws Exception { + startGrid(0); + + GridTimeoutProcessor timeoutProc = grid(0).context().timeout(); + + List scheduledTasks = new ArrayList<>(); + + try { + BiConsumerX checks = (s, i) -> { + assertTrue(timeoutProc.addTimeoutObject(AttributeValueChecker.createTimeoutObject(s, i))); + scheduledTasks.add(timeoutProc.schedule(new AttributeValueChecker(s, i), 100, 100)); + }; + + execute(checks); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + finally { + scheduledTasks.forEach(GridTimeoutProcessor.CancelableTask::close); + } + } + + /** */ + @Test + public void testIgniteThread() throws Exception { + List threads = new ArrayList<>(); + + try { + BiConsumerX checks = (s, i) -> + threads.add(new IgniteThread("test", "test", new AttributeValueChecker(s, i))); + + execute(checks); + + threads.forEach(IgniteThread::start); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + finally { + threads.forEach(IgniteThread::interrupt); + + for (IgniteThread thread : threads) + thread.join(); + } + } + + /** */ + @Test + public void testContextAwareQueue() throws Exception { + IgniteLinkedBlockingQueueProcessor proc = + new IgniteLinkedBlockingQueueProcessor<>("test", "test", log, null) { + @Override protected void body() throws InterruptedException { + while (!isCancelled()) { + OperationContextAwareWrapper w = pollQueuedElement(100, MILLISECONDS); + + if (w == null) + continue; + + try (Scope ignored0 = OperationContext.set(STR_ATTR, "test", INT_ATTR, 5)) { + try (Scope ignored1 = OperationContext.restoreSnapshot(w.contextSnapshot())) { + w.delegate().run(); + } + + checkAttributeValues("test", 5); + } + } + + } + }; + + try { + proc.start(); + + execute((s, i) -> proc.addToQueue(new AttributeValueChecker(s, i))); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + finally { + U.cancel(proc); + U.join(proc); + } + } + + /** */ + @Test + public void testContextAwareDelayQueue() throws Exception { + IgniteDelayQueueProcessor proc = new IgniteDelayQueueProcessor<>("test", "test", log, null) { + @Override protected void body() { + try { + while (!isCancelled()) { + OperationContextAwareWrapper w = takeQueuedElement(); + + try (Scope ignored0 = OperationContext.set(STR_ATTR, "test", INT_ATTR, 5)) { + try (Scope ignored1 = OperationContext.restoreSnapshot(w.contextSnapshot())) { + w.delegate().checker.run(); + } + + checkAttributeValues("test", 5); + } + } + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + }; + + try { + proc.start(); + + execute((s, i) -> proc.addToQueue(new TestDelayedObject(new AttributeValueChecker(s, i)))); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + finally { + U.cancel(proc); + U.join(proc); + } + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool); @@ -792,10 +924,32 @@ public AttributeValueChecker(String expStrAttrVal, Integer expIntAttrVal) { /** */ static void assertAllCreatedChecksPassed() throws Exception { for (AttributeValueChecker check : CHECKS) { - check.get(1000, MILLISECONDS); + check.get(5_000, MILLISECONDS); } } + /** */ + static GridTimeoutObject createTimeoutObject(String strAttrVal, int intAttrVal) { + AttributeValueChecker checker = new AttributeValueChecker(strAttrVal, intAttrVal); + + IgniteUuid id = IgniteUuid.randomUuid(); + long endTime = System.currentTimeMillis() + 1000; + + return new GridTimeoutObject() { + @Override public IgniteUuid timeoutId() { + return id; + } + + @Override public long endTime() { + return endTime; + } + + @Override public void onTimeout() { + checker.run(); + } + }; + } + /** */ static IgniteClosure, Integer> createClosure(String strAttrVal, int intAttrVal) { AttributeValueChecker checker = new AttributeValueChecker(strAttrVal, intAttrVal); @@ -898,6 +1052,27 @@ static BiConsumer createBiConsumer(String strAttrVal, int intAttrVa } } + /** */ + private static class TestDelayedObject implements Delayed { + /** */ + private final AttributeValueChecker checker; + + /** */ + private TestDelayedObject(AttributeValueChecker checker) { + this.checker = checker; + } + + /** {@inheritDoc} */ + @Override public long getDelay(@NonNull TimeUnit unit) { + return 0; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NonNull Delayed o) { + return 0; + } + } + /** */ private interface BiConsumerX { /** */