Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 29, 2015
1 parent aad2750 commit f4ca658
Show file tree
Hide file tree
Showing 20 changed files with 182 additions and 97 deletions.
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.compute.*; import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*; import org.apache.ignite.events.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.client.*; import org.apache.ignite.client.*;
import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.*;
Expand Down Expand Up @@ -200,7 +201,7 @@ public void testTimedOutTask() throws Exception {
return null; return null;
} }
}, },
ComputeTaskTimeoutException.class, ComputeTaskTimeoutCheckedException.class,
null null
); );


Expand Down
Expand Up @@ -32,14 +32,14 @@ public interface ComputeTaskFuture<R> extends IgniteFuture<R> {
/** /**
* {@inheritDoc} * {@inheritDoc}
* *
* @throws ComputeTaskTimeoutException If task execution timed out. * @throws org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException If task execution timed out.
*/ */
@Override public R get(); @Override public R get();


/** /**
* {@inheritDoc} * {@inheritDoc}
* *
* @throws ComputeTaskTimeoutException If task execution timed out. * @throws org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException If task execution timed out.
*/ */
@Override public R get(long timeout, TimeUnit unit); @Override public R get(long timeout, TimeUnit unit);


Expand Down
Expand Up @@ -17,8 +17,45 @@


package org.apache.ignite.compute; package org.apache.ignite.compute;


import org.apache.ignite.*;
import org.jetbrains.annotations.*;

/** /**
* TODO * This exception indicates that task execution timed out. It is thrown from
* {@link org.apache.ignite.compute.ComputeTaskFuture#get()} method.
*/ */
public class ComputeTaskTimeoutException { public class ComputeTaskTimeoutException extends IgniteException {
/** */
private static final long serialVersionUID = 0L;

/**
* Creates task timeout exception with given task execution ID and
* error message.
*
* @param msg Error message.
*/
public ComputeTaskTimeoutException(String msg) {
super(msg);
}

/**
* Creates new task timeout exception given throwable as a cause and
* source of error message.
*
* @param cause Non-null throwable cause.
*/
public ComputeTaskTimeoutException(Throwable cause) {
this(cause.getMessage(), cause);
}

/**
* Creates task timeout exception with given task execution ID,
* error message and optional nested exception.
*
* @param msg Error message.
* @param cause Optional nested exception (can be {@code null}).
*/
public ComputeTaskTimeoutException(String msg, @Nullable Throwable cause) {
super(msg, cause);
}
} }
Expand Up @@ -18,8 +18,8 @@
package org.apache.ignite.compute.gridify.aop; package org.apache.ignite.compute.gridify.aop;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.compute.gridify.*; import org.apache.ignite.compute.gridify.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.gridify.*; import org.apache.ignite.internal.util.gridify.*;
import java.lang.reflect.*; import java.lang.reflect.*;
Expand Down Expand Up @@ -136,7 +136,7 @@ protected Object execute(IgniteCompute compute, Class<?> cls, GridifyRangeArgume
end = Long.MAX_VALUE; end = Long.MAX_VALUE;


if (now > end) if (now > end)
throw new ComputeTaskTimeoutException("Timeout occurred while waiting for completion."); throw new ComputeTaskTimeoutCheckedException("Timeout occurred while waiting for completion.");


Collection<?> res = compute.withTimeout(timeout == 0 ? 0L : (end - now)).execute( Collection<?> res = compute.withTimeout(timeout == 0 ? 0L : (end - now)).execute(
new GridifyDefaultRangeTask(cls, nodeFilter, threshold, splitSize, false), new GridifyDefaultRangeTask(cls, nodeFilter, threshold, splitSize, false),
Expand Down
Expand Up @@ -18,8 +18,8 @@
package org.apache.ignite.compute.gridify.aop; package org.apache.ignite.compute.gridify.aop;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.compute.gridify.*; import org.apache.ignite.compute.gridify.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.gridify.*; import org.apache.ignite.internal.util.gridify.*;
import java.lang.reflect.*; import java.lang.reflect.*;
Expand Down Expand Up @@ -134,7 +134,7 @@ protected Object execute(Method mtd, IgniteCompute compute, Class<?> cls, Gridif


while (true) { while (true) {
if (now > end) if (now > end)
throw new ComputeTaskTimeoutException("Timeout occurred while waiting for completion."); throw new ComputeTaskTimeoutCheckedException("Timeout occurred while waiting for completion.");


GridifyRangeArgument taskArg = createGridifyArgument(arg, res); GridifyRangeArgument taskArg = createGridifyArgument(arg, res);


Expand Down
Expand Up @@ -506,6 +506,13 @@ protected Object readResolve() throws ObjectStreamException {
return prj.compute(); return prj.compute();
} }


/** {@inheritDoc} */
@Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
assert fut instanceof ComputeTaskInternalFuture : fut;

return ((ComputeTaskInternalFuture<R>)fut).publicFuture();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public <R> ComputeTaskFuture<R> future() { @Override public <R> ComputeTaskFuture<R> future() {
return (ComputeTaskFuture<R>)super.future(); return (ComputeTaskFuture<R>)super.future();
Expand Down
Expand Up @@ -18,8 +18,8 @@
package org.apache.ignite.internal.executor; package org.apache.ignite.internal.executor;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
Expand Down Expand Up @@ -213,7 +213,7 @@ protected Object readResolve() throws ObjectStreamException {
try { try {
fut.get(end - now); fut.get(end - now);
} }
catch (ComputeTaskTimeoutException e) { catch (ComputeTaskTimeoutCheckedException e) {
U.error(log, "Failed to get task result: " + fut, e); U.error(log, "Failed to get task result: " + fut, e);


return false; return false;
Expand Down Expand Up @@ -242,9 +242,11 @@ protected Object readResolve() throws ObjectStreamException {
try { try {
comp.call(task); comp.call(task);


return addFuture(comp.<T>future()); IgniteFutureImpl<T> fut = (IgniteFutureImpl<T>)comp.future();

return addFuture(fut.internalFuture());
} }
catch (IgniteCheckedException e) { catch (IgniteException e) {
// Should not be thrown since uses asynchronous execution. // Should not be thrown since uses asynchronous execution.
return addFuture(new GridFinishedFutureEx<T>(e)); return addFuture(new GridFinishedFutureEx<T>(e));
} }
Expand All @@ -261,7 +263,9 @@ protected Object readResolve() throws ObjectStreamException {
try { try {
comp.run(task); comp.run(task);


IgniteInternalFuture<T> fut = comp.future().chain(new CX1<IgniteInternalFuture<?>, T>() { IgniteInternalFuture<T> fut0 = ((IgniteFutureImpl<T>)comp.future()).internalFuture();

IgniteInternalFuture<T> fut = fut0.chain(new CX1<IgniteInternalFuture<?>, T>() {
@Override public T applyx(IgniteInternalFuture<?> fut) throws IgniteCheckedException { @Override public T applyx(IgniteInternalFuture<?> fut) throws IgniteCheckedException {
fut.get(); fut.get();


Expand All @@ -271,7 +275,7 @@ protected Object readResolve() throws ObjectStreamException {


return addFuture(fut); return addFuture(fut);
} }
catch (IgniteCheckedException e) { catch (IgniteException e) {
// Should not be thrown since uses asynchronous execution. // Should not be thrown since uses asynchronous execution.
return addFuture(new GridFinishedFutureEx<T>(e)); return addFuture(new GridFinishedFutureEx<T>(e));
} }
Expand All @@ -288,9 +292,11 @@ protected Object readResolve() throws ObjectStreamException {
try { try {
comp.run(task); comp.run(task);


return addFuture(comp.future()); IgniteFutureImpl<?> fut = (IgniteFutureImpl<?>)comp.future();

return addFuture(fut.internalFuture());
} }
catch (IgniteCheckedException e) { catch (IgniteException e) {
// Should not be thrown since uses asynchronous execution. // Should not be thrown since uses asynchronous execution.
return addFuture(new GridFinishedFutureEx<>(e)); return addFuture(new GridFinishedFutureEx<>(e));
} }
Expand Down Expand Up @@ -376,7 +382,7 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, lo
try { try {
fut.get(end - now); fut.get(end - now);
} }
catch (ComputeTaskTimeoutException ignore) { catch (ComputeTaskTimeoutCheckedException ignore) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Timeout occurred during getting task result: " + fut); log.debug("Timeout occurred during getting task result: " + fut);


Expand Down Expand Up @@ -715,7 +721,7 @@ private class TaskFutureWrapper<T> implements Future<T> {


throw e2; throw e2;
} }
catch (ComputeTaskTimeoutException e) { catch (ComputeTaskTimeoutCheckedException e) {
throw new ExecutionException("Task execution timed out during waiting for task result: " + fut, e); throw new ExecutionException("Task execution timed out during waiting for task result: " + fut, e);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand Down
Expand Up @@ -23,11 +23,11 @@
import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.datastructures.*;
import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*; import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*; import org.apache.ignite.fs.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
Expand Down Expand Up @@ -1314,7 +1314,7 @@ private boolean clear(GridCacheVersion obsoleteVer, K key,
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("All remote nodes left while cache clear [cacheName=" + name() + "]"); log.debug("All remote nodes left while cache clear [cacheName=" + name() + "]");
} }
catch (ComputeTaskTimeoutException e) { catch (ComputeTaskTimeoutCheckedException e) {
U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
"'networkTimeout' configuration property) [cacheName=" + name() + "]"); "'networkTimeout' configuration property) [cacheName=" + name() + "]");


Expand Down Expand Up @@ -3832,7 +3832,7 @@ void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... ar
* @param args Arguments. * @param args Arguments.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws IgniteCheckedException { throws IgniteCheckedException {
ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());


Expand Down Expand Up @@ -4170,7 +4170,7 @@ private int globalSize(boolean primaryOnly) throws IgniteCheckedException {


return primaryOnly ? primarySize() : size(); return primaryOnly ? primarySize() : size();
} }
catch (ComputeTaskTimeoutException e) { catch (ComputeTaskTimeoutCheckedException e) {
U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
"'networkTimeout' configuration property) [cacheName=" + name() + "]"); "'networkTimeout' configuration property) [cacheName=" + name() + "]");


Expand Down

0 comments on commit f4ca658

Please sign in to comment.