Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 2, 2015
1 parent d89ef5b commit a1cc0a9
Show file tree
Hide file tree
Showing 20 changed files with 205 additions and 159 deletions.
Expand Up @@ -36,7 +36,7 @@ public interface IgniteDataLoadCacheUpdater<K, V> extends Serializable {
* *
* @param cache Cache. * @param cache Cache.
* @param entries Collection of entries. * @param entries Collection of entries.
* @throws IgniteCheckedException If failed. * @throws IgniteException If failed.
*/ */
public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteCheckedException; public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
} }
Expand Up @@ -519,10 +519,17 @@ private void add(Map<String, Object> attrs, String name, @Nullable Serializable
*/ */
@SuppressWarnings({"CatchGenericClass"}) @SuppressWarnings({"CatchGenericClass"})
private void notifyLifecycleBeans(LifecycleEventType evt) throws IgniteCheckedException { private void notifyLifecycleBeans(LifecycleEventType evt) throws IgniteCheckedException {
if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
for (LifecycleBean bean : cfg.getLifecycleBeans()) for (LifecycleBean bean : cfg.getLifecycleBeans())
if (bean != null) if (bean != null) {
bean.onLifecycleEvent(evt); try {
bean.onLifecycleEvent(evt);
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
}
} }


/** /**
Expand Down Expand Up @@ -986,7 +993,7 @@ else if (X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedExcep
nodes = nodes0.size(); nodes = nodes0.size();
cpus = metrics.getTotalCpus(); cpus = metrics.getTotalCpus();
} }
catch (IgniteCheckedException ignore) { catch (IgniteException ignore) {
// No-op. // No-op.
} }


Expand Down
Expand Up @@ -1066,7 +1066,7 @@ public IgniteCache<K, V> flagOn(CacheFlag flag) {
* @return Cache exception. * @return Cache exception.
*/ */
private CacheException cacheException(IgniteCheckedException e) { private CacheException cacheException(IgniteCheckedException e) {
return U.convertCacheException(e); return U.convertToCacheException(e);
} }


/** /**
Expand Down
Expand Up @@ -98,10 +98,10 @@ public static <K, V> IgniteDataLoadCacheUpdater<K, V> groupLocked() {
* @param cache Cache. * @param cache Cache.
* @param rmvCol Keys to remove. * @param rmvCol Keys to remove.
* @param putMap Entries to put. * @param putMap Entries to put.
* @throws IgniteCheckedException If failed. * @throws IgniteException If failed.
*/ */
protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol, protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol,
Map<K, V> putMap) throws IgniteCheckedException { Map<K, V> putMap) {
assert rmvCol != null || putMap != null; assert rmvCol != null || putMap != null;


// Here we assume that there are no key duplicates, so the following calls are valid. // Here we assume that there are no key duplicates, so the following calls are valid.
Expand All @@ -120,8 +120,7 @@ private static class Individual<K, V> implements IgniteDataLoadCacheUpdater<K, V
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
throws IgniteCheckedException {
assert cache != null; assert cache != null;
assert !F.isEmpty(entries); assert !F.isEmpty(entries);


Expand All @@ -148,8 +147,7 @@ private static class Batched<K, V> implements IgniteDataLoadCacheUpdater<K, V> {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
throws IgniteCheckedException {
assert cache != null; assert cache != null;
assert !F.isEmpty(entries); assert !F.isEmpty(entries);


Expand Down Expand Up @@ -189,8 +187,7 @@ private static class BatchedSorted<K, V> implements IgniteDataLoadCacheUpdater<K
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
throws IgniteCheckedException {
assert cache != null; assert cache != null;
assert !F.isEmpty(entries); assert !F.isEmpty(entries);


Expand Down Expand Up @@ -230,8 +227,7 @@ private static class GroupLocked<K, V> implements IgniteDataLoadCacheUpdater<K,
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
throws IgniteCheckedException {
assert cache != null; assert cache != null;
assert !F.isEmpty(entries); assert !F.isEmpty(entries);


Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;


import java.util.*; import java.util.*;


Expand All @@ -36,42 +37,46 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoadCacheUpda
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) {
throws IgniteCheckedException { try {
String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName();


GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context(); GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context();
IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class);
GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);


assert !F.isEmpty(col); assert !F.isEmpty(col);


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); log.debug("Running DR put job [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']');


IgniteInternalFuture<?> f = cache.context().preloader().startFuture(); IgniteInternalFuture<?> f = cache.context().preloader().startFuture();


if (!f.isDone()) if (!f.isDone())
f.get(); f.get();


for (Map.Entry<K, V> entry0 : col) { for (Map.Entry<K, V> entry0 : col) {
GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0; GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0;


entry.unmarshal(ctx.config().getMarshaller()); entry.unmarshal(ctx.config().getMarshaller());


K key = entry.key(); K key = entry.key();


GridCacheDrInfo<V> val = entry.value() != null ? entry.expireTime() != 0 ? GridCacheDrInfo<V> val = entry.value() != null ? entry.expireTime() != 0 ?
new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) :
new GridCacheDrInfo<>(entry.value(), entry.version()) : null; new GridCacheDrInfo<>(entry.value(), entry.version()) : null;


if (val == null) if (val == null)
cache.removeAllDr(Collections.singletonMap(key, entry.version())); cache.removeAllDr(Collections.singletonMap(key, entry.version()));
else else
cache.putAllDr(Collections.singletonMap(key, val)); cache.putAllDr(Collections.singletonMap(key, val));
} }


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']'); log.debug("DR put job finished [nodeId=" + ctx.localNodeId() + ", cacheName=" + cacheName + ']');
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
} }
} }
Expand Up @@ -815,7 +815,7 @@ boolean onMasterNodeLeft() {
log.debug("Successfully executed GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + log.debug("Successfully executed GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
"[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']'); "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']');
} }
catch (IgniteException e) { catch (Exception e) {
U.error(log, "Failed to execute GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + U.error(log, "Failed to execute GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
"[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']', e); "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']', e);
} }
Expand Down
Expand Up @@ -655,7 +655,7 @@ public static IgniteException convertException(IgniteCheckedException e) {
* @param e Ignite checked exception. * @param e Ignite checked exception.
* @return Ignite runtime exception. * @return Ignite runtime exception.
*/ */
@Nullable public static CacheException convertCacheException(IgniteCheckedException e) { @Nullable public static CacheException convertToCacheException(IgniteCheckedException e) {
if (e instanceof CachePartialUpdateCheckedException) if (e instanceof CachePartialUpdateCheckedException)
return new CachePartialUpdateException((CachePartialUpdateCheckedException)e); return new CachePartialUpdateException((CachePartialUpdateCheckedException)e);


Expand Down
50 changes: 36 additions & 14 deletions modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
Expand Up @@ -30,21 +30,43 @@
*/ */
public interface IgniteFuture<V> extends Future<V> { public interface IgniteFuture<V> extends Future<V> {
/** /**
* @throws IgniteException * Synchronously waits for completion of the computation and
* @throws IgniteInterruptedException * returns computation result.
*
* @return Computation result.
* @throws IgniteInterruptedException Subclass of {@link IgniteException} thrown if the wait was interrupted.
* @throws IgniteFutureCancelledException Subclass of {@link IgniteException} throws if computation was cancelled.
* @throws IgniteException If computation failed.
*/ */
@Override public V get() throws IgniteException, IgniteInterruptedException; @Override public V get() throws IgniteException;


public V get(long timeout); /**
* Synchronously waits for completion of the computation for
* up to the timeout specified and returns computation result.
* This method is equivalent to calling {@link #get(long, TimeUnit) get(long, TimeUnit.MILLISECONDS)}.
*
* @param timeout The maximum time to wait in milliseconds.
* @return Computation result.
* @throws IgniteInterruptedException Subclass of {@link IgniteException} thrown if the wait was interrupted.
* @throws IgniteFutureCancelledException Subclass of {@link IgniteException} throws if computation was cancelled.
* @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteException} thrown if the wait was timed out.
* @throws IgniteException If computation failed.
*/
public V get(long timeout)throws IgniteException;


/** /**
* @param timeout * Synchronously waits for completion of the computation for
* @param unit * up to the timeout specified and returns computation result.
* @return *
* @throws IgniteException * @param timeout The maximum time to wait.
* @throws IgniteInterruptedException * @param unit The time unit of the {@code timeout} argument.
* @return Computation result.
* @throws IgniteInterruptedException Subclass of {@link IgniteException} thrown if the wait was interrupted.
* @throws IgniteFutureCancelledException Subclass of {@link IgniteException} throws if computation was cancelled.
* @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteException} thrown if the wait was timed out.
* @throws IgniteException If computation failed.
*/ */
@Override public V get(long timeout, TimeUnit unit) throws IgniteException, IgniteInterruptedException, IgniteFutureTimeoutException; @Override public V get(long timeout, TimeUnit unit) throws IgniteException;


/** /**
* Cancels this future. * Cancels this future.
Expand Down Expand Up @@ -77,7 +99,7 @@ public interface IgniteFuture<V> extends Future<V> {
* immediately notified within the same thread. * immediately notified within the same thread.
* <p> * <p>
* Default value is {@code false}. To change the default, set * Default value is {@code false}. To change the default, set
* {@link IgniteSystemProperties#GG_FUT_SYNC_NOTIFICATION} system property to {@code true}. * {@link IgniteSystemProperties#IGNITE_FUT_SYNC_NOTIFICATION} system property to {@code true}.
* *
* @param syncNotify Flag to turn on or off synchronous listener notification. * @param syncNotify Flag to turn on or off synchronous listener notification.
*/ */
Expand All @@ -91,7 +113,7 @@ public interface IgniteFuture<V> extends Future<V> {
* immediately notified within the same thread. * immediately notified within the same thread.
* <p> * <p>
* Default value is {@code false}. To change the default, set * Default value is {@code false}. To change the default, set
* {@link IgniteSystemProperties#GG_FUT_SYNC_NOTIFICATION} system property to {@code true}. * {@link IgniteSystemProperties#IGNITE_FUT_SYNC_NOTIFICATION} system property to {@code true}.
* *
* @return Synchronous listener notification flag. * @return Synchronous listener notification flag.
*/ */
Expand All @@ -106,7 +128,7 @@ public interface IgniteFuture<V> extends Future<V> {
* started the future, or in a different thread). * started the future, or in a different thread).
* <p> * <p>
* Default value is {@code false}. To change the default, set * Default value is {@code false}. To change the default, set
* {@link IgniteSystemProperties#GG_FUT_CONCURRENT_NOTIFICATION} system property to {@code true}. * {@link IgniteSystemProperties#IGNITE_FUT_CONCURRENT_NOTIFICATION} system property to {@code true}.
* *
* @param concurNotify Flag to turn on or off concurrent listener notification. * @param concurNotify Flag to turn on or off concurrent listener notification.
*/ */
Expand All @@ -121,7 +143,7 @@ public interface IgniteFuture<V> extends Future<V> {
* started the future, or in a different thread). * started the future, or in a different thread).
* <p> * <p>
* Default value is {@code false}. To change the default, set * Default value is {@code false}. To change the default, set
* {@link IgniteSystemProperties#GG_FUT_CONCURRENT_NOTIFICATION} system property to {@code true}. * {@link IgniteSystemProperties#IGNITE_FUT_CONCURRENT_NOTIFICATION} system property to {@code true}.
* *
* @return Concurrent listener notification flag * @return Concurrent listener notification flag
*/ */
Expand Down
Expand Up @@ -101,7 +101,7 @@ public interface LifecycleBean {
* This method is called when lifecycle event occurs. * This method is called when lifecycle event occurs.
* *
* @param evt Lifecycle event. * @param evt Lifecycle event.
* @throws IgniteCheckedException Thrown in case of any errors. * @throws IgniteException Thrown in case of any errors.
*/ */
public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException; public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException;
} }
Expand Up @@ -73,7 +73,7 @@ public void testEmptyProjection() throws Exception {


assert false; assert false;
} }
catch (ClusterGroupEmptyCheckedException e) { catch (ClusterGroupEmptyException e) {
info("Caught expected exception: " + e); info("Caught expected exception: " + e);
} }
} }
Expand Down
Expand Up @@ -94,7 +94,7 @@ private static class TestLifecycleBean extends TestLifecycleAware implements Lif
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { @Override public void onLifecycleEvent(LifecycleEventType evt) {
// No-op. // No-op.
} }
} }
Expand Down
Expand Up @@ -250,7 +250,7 @@ private LifeCycleBaseBean() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { @Override public void onLifecycleEvent(LifecycleEventType evt) {
callsCntr.get(evt).incrementAndGet(); callsCntr.get(evt).incrementAndGet();
} }


Expand Down Expand Up @@ -282,10 +282,10 @@ private LifeCycleExceptionBean(LifecycleEventType errType, boolean gridErr) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { @Override public void onLifecycleEvent(LifecycleEventType evt) {
if (evt == errType) { if (evt == errType) {
if (gridErr) if (gridErr)
throw new IgniteCheckedException("Expected exception for event: " + evt) { throw new IgniteException("Expected exception for event: " + evt) {
@Override public void printStackTrace(PrintStream s) { @Override public void printStackTrace(PrintStream s) {
// No-op. // No-op.
} }
Expand Down
Expand Up @@ -128,7 +128,7 @@ public void testLargeObjects() throws Exception {


int cnt = 10000; int cnt = 10000;


populate(grid(0).<Integer, byte[]>cache(null), cnt, KBSIZE); populate(grid(0).<Integer, byte[]>jcache(null), cnt, KBSIZE);


int gridCnt = 3; int gridCnt = 3;


Expand Down Expand Up @@ -165,11 +165,11 @@ public void testLargeObjectsWithLifeCycleBean() throws Exception {
@IgniteInstanceResource @IgniteInstanceResource
private Ignite ignite; private Ignite ignite;


@Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { @Override public void onLifecycleEvent(LifecycleEventType evt) {
if (evt == LifecycleEventType.AFTER_GRID_START) { if (evt == LifecycleEventType.AFTER_GRID_START) {
GridCache<Integer, byte[]> c = ignite.cache(null); IgniteCache<Integer, byte[]> c = ignite.jcache(null);


if (c.putxIfAbsent(-1, new byte[1])) { if (c.putIfAbsent(-1, new byte[1])) {
populate(c, cnt, KBSIZE); populate(c, cnt, KBSIZE);


info(">>> POPULATED GRID <<<"); info(">>> POPULATED GRID <<<");
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testLargeObjectsWithLifeCycleBean() throws Exception {
* @param kbSize Size in KB. * @param kbSize Size in KB.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
private void populate(GridCache<Integer, byte[]> c, int cnt, int kbSize) throws IgniteCheckedException { private void populate(IgniteCache<Integer, byte[]> c, int cnt, int kbSize) {
for (int i = 0; i < cnt; i++) for (int i = 0; i < cnt; i++)
c.put(i, value(kbSize)); c.put(i, value(kbSize));
} }
Expand Down

0 comments on commit a1cc0a9

Please sign in to comment.