Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,15 @@
public interface IgniteInternalWrapper<T> {
/** @return Wrapped object. */
public T delegate();

/**
* @param target Object to unwrap.
* @return Original object.
*/
public static Object unwrap(Object target) {
while (target instanceof IgniteInternalWrapper)
target = ((IgniteInternalWrapper<?>)target).delegate();

return target;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,4 @@ public OperationContextAwareInClosure(IgniteInClosure<E> delegate, OperationCont
public static <E> IgniteInClosure<E> wrap(IgniteInClosure<E> delefate) {
return wrap(delefate, OperationContextAwareInClosure::new);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,39 @@
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextSnapshot;

/** */
public abstract class OperationContextAwareWrapper<T> implements IgniteInternalWrapper<T> {
/** Represents wrapper containing an arbitrary object along with {@link OperationContextSnapshot}. */
public class OperationContextAwareWrapper<T> implements IgniteInternalWrapper<T> {
/** */
protected final T delegate;

/** */
protected final OperationContextSnapshot snapshot;

/** */
public OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) {
assert delegate != null;

Check warning on line 35 in modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this assert with a proper check.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ5HdbVRwJX1QOFT5NUX&open=AZ5HdbVRwJX1QOFT5NUX&pullRequest=13148

this.delegate = delegate;
this.snapshot = snapshot;
}

/** {@inheritDoc} */
@Override public T delegate() {
return delegate;
}

/** */
protected OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) {
this.delegate = delegate;
this.snapshot = snapshot;
public OperationContextSnapshot contextSnapshot() {
return snapshot;
}

/** */
protected static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper) {
public static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper) {
return wrap(delegate, wrapper, false);
}

/** */
protected static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
public static <T> T wrap(T delegate, BiFunction<T, OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
if (delegate == null || delegate instanceof OperationContextAwareWrapper)
return delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;

import static org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable.wrapIfContextNotEmpty;

/**
* This class adds some necessary plumbing on top of the {@link Thread} class.
* Specifically, it adds:
Expand All @@ -30,6 +34,7 @@
* <li>Dedicated parent thread group</li>
* <li>Backing interrupted flag</li>
* <li>Name of the grid this thread belongs to</li>
* <li>Automatic capturing of {@link OperationContext} of parent thread</li>
* </ul>
* <b>Note</b>: this class is intended for internal use only.
*/
Expand Down Expand Up @@ -76,13 +81,18 @@ public IgniteThread(String igniteInstanceName, String threadName) {
* @param r Runnable to execute.
*/
public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
this(igniteInstanceName, threadName, wrapIfContextNotEmpty(r), GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
}

/**
* Creates grid thread with given name for a given Ignite instance with specified
* thread group.
*
* <b>Note</b>: This constructor creates a thread that does NOT automatically acquire the parent thread's Operation
* Context, ensuring that no Operation Context is attached to it at the start of execution. It is used in Ignite
* thread pools and worker threads, which rely on this behavior to avoid unnecessary wrapping
* (see {@link OperationContextAwareWrapper})
*
* @param igniteInstanceName Name of the Ignite instance this thread is created for.
* @param threadName Name of thread.
* @param r Runnable to execute.
Expand All @@ -101,20 +111,6 @@ public IgniteThread(String igniteInstanceName, String threadName, Runnable r, in
this.plc = plc;
}

/**
* @param igniteInstanceName Name of the Ignite instance this thread is created for.
* @param threadGrp Thread group.
* @param threadName Name of thread.
*/
protected IgniteThread(String igniteInstanceName, ThreadGroup threadGrp, String threadName) {
super(threadGrp, threadName);

this.igniteInstanceName = igniteInstanceName;
this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
this.stripe = -1;
this.plc = GridIoPolicy.UNDEFINED;
}

/**
* @return Related {@link GridIoPolicy} for internal Ignite pools.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*
* @return Data streamer processor.
*/
public <K, V> DataStreamProcessor<K, V> dataStream();
public DataStreamProcessor dataStream();

/**
* Gets event continuous processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,8 @@ else if (!(comp instanceof DiscoveryNodeValidationProcessor
}

/** {@inheritDoc} */
@Override public <K, V> DataStreamProcessor<K, V> dataStream() {
return (DataStreamProcessor<K, V>)dataLdrProc;
@Override public DataStreamProcessor dataStream() {
return dataLdrProc;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ public void start(
startProcessor(new GridTaskProcessor(ctx));
startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
startProcessor(createComponent(IgniteRestProcessor.class, ctx));
startProcessor(new DataStreamProcessor<>(ctx));
startProcessor(new DataStreamProcessor(ctx));
startProcessor(new GridContinuousProcessor(ctx));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
Expand Down
Loading
Loading