Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
private String execName;

/** Default task execution options. */
private final ThreadLocal<TaskExecutionOptions> opts = ThreadLocal.withInitial(() ->
TaskExecutionOptions.options(prj.nodes()).withExecutor(execName)
);
private final ThreadLocal<TaskExecutionOptions> opts = ThreadLocal.withInitial(() -> enrich(TaskExecutionOptions.options()));

/**
* Required by {@link Externalizable}.
Expand All @@ -85,19 +83,7 @@ public IgniteComputeImpl() {
* @param prj Projection.
*/
public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj) {
this(ctx, prj, false);
}

/**
* @param ctx Kernal context.
* @param prj Projection.
* @param async Async support flag.
*/
private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, boolean async) {
super(async);

this.ctx = ctx;
this.prj = prj;
this(ctx, prj, false, null, null);
}

/**
Expand All @@ -107,19 +93,28 @@ private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, boolea
* @param prj Projection.
* @param async Async support flag.
* @param execName Custom executor name.
* @param opts Optional initial task execution options.
*/
private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, boolean async,
String execName) {
private IgniteComputeImpl(
GridKernalContext ctx,
ClusterGroupAdapter prj,
boolean async,
String execName,
@Nullable TaskExecutionOptions opts
) {
super(async);

this.ctx = ctx;
this.prj = prj;
this.execName = execName;

if (opts != null)
this.opts.set(enrich(TaskExecutionOptions.options(opts)));
}

/** {@inheritDoc} */
@Override protected IgniteCompute createAsyncInstance() {
return new IgniteComputeImpl(ctx, prj, true);
return new IgniteComputeImpl(ctx, prj, true, execName, opts.get());
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1136,6 +1131,16 @@ protected Object readResolve() throws ObjectStreamException {

/** {@inheritDoc} */
@Override public IgniteCompute withExecutor(@NotNull String name) {
return new IgniteComputeImpl(ctx, prj, isAsync(), name);
return new IgniteComputeImpl(ctx, prj, isAsync(), name, opts.get());
}

/** Enriches specified task execution options with those that are bounded to the current compute instance. */
private TaskExecutionOptions enrich(TaskExecutionOptions opts) {
opts.withProjection(prj.nodes());

if (execName != null)
opts.withExecutor(execName);

return opts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ public class TaskExecutionOptions {
/** */
private TaskExecutionOptions() {}

/** */
private TaskExecutionOptions(TaskExecutionOptions other) {
name = other.name;
timeout = other.timeout;
execName = other.execName;
pool = other.pool;
projection = other.projection;
projectionPredicate = other.projectionPredicate;
isFailoverDisabled = other.isFailoverDisabled;
isResultCacheDisabled = other.isResultCacheDisabled;
isSysTask = other.isSysTask;
isAuthDisabled = other.isAuthDisabled;
}

/** */
public static TaskExecutionOptions options() {
return new TaskExecutionOptions();
Expand All @@ -67,6 +81,11 @@ public static TaskExecutionOptions options(Collection<ClusterNode> projection) {
return new TaskExecutionOptions().withProjection(projection);
}

/** */
public static TaskExecutionOptions options(TaskExecutionOptions other) {
return new TaskExecutionOptions(other);
}

/** */
public long timeout() {
return timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ExecutorConfiguration;
Expand All @@ -34,6 +35,7 @@
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
Expand Down Expand Up @@ -221,6 +223,25 @@ public void testAllComputeApiByCustomExecutor() throws Exception {
comp.execute(TestTask.class, null);
}

/** */
@Test
public void testMultipleTaskModifiersWithExecutorName() throws Exception {
String taskName = "test-task-name";

IgniteCompute compute = grid(0).compute().withName(taskName);

// Here we check that withExecutor call does not affect existing compute instances.
compute.withExecutor(EXEC_NAME0);
compute.broadcast(new TestRunnable(taskName, "pub"));

IgniteCompute computeWithExecutor = compute.withName(taskName).withExecutor(EXEC_NAME0);

computeWithExecutor.broadcast(new TestRunnable(taskName, EXEC_NAME0));
computeWithExecutor.broadcast(new TestRunnable(TestRunnable.class.getName(), EXEC_NAME0));
computeWithExecutor.withName(taskName).broadcast(new TestRunnable(taskName, EXEC_NAME0));
computeWithExecutor.withExecutor(EXEC_NAME1).withName(taskName).broadcast(new TestRunnable(taskName, EXEC_NAME1));
}

/**
* Test task
*/
Expand All @@ -247,4 +268,29 @@ static class TestTask extends ComputeTaskSplitAdapter<Object, Object> {
return null;
}
}

/** */
private static class TestRunnable implements IgniteRunnable {
/** */
private final String expTaskName;

/** */
private final String expExecName;

/** */
public TestRunnable(String expTaskName, String expExecName) {
this.expTaskName = expTaskName;
this.expExecName = expExecName;
}

/** */
@TaskSessionResource
ComputeTaskSession ses;

/** {@inheritDoc} */
@Override public void run() {
assertEquals(expTaskName, ses.getTaskName());
assertTrue(Thread.currentThread().getName().contains(expExecName));
}
}
}