Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 2, 2015
1 parent 60f1967 commit a88b637
Show file tree
Hide file tree
Showing 18 changed files with 171 additions and 101 deletions.
Expand Up @@ -90,9 +90,9 @@ public static void main(String[] args) throws Exception {
* Populates cache in real time with numbers and keeps count for every number. * Populates cache in real time with numbers and keeps count for every number.
* *
* @param g Grid. * @param g Grid.
* @throws IgniteCheckedException If failed. * @throws IgniteException If failed.
*/ */
private static void streamData(final Ignite g) throws IgniteCheckedException { private static void streamData(final Ignite g) throws IgniteException {
try (IgniteDataLoader<Integer, Long> ldr = g.dataLoader(CACHE_NAME)) { try (IgniteDataLoader<Integer, Long> ldr = g.dataLoader(CACHE_NAME)) {
// Set larger per-node buffer size since our state is relatively small. // Set larger per-node buffer size since our state is relatively small.
ldr.perNodeBufferSize(2048); ldr.perNodeBufferSize(2048);
Expand Down
58 changes: 29 additions & 29 deletions modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
Expand Up @@ -18,7 +18,7 @@
package org.apache.ignite; package org.apache.ignite;


import org.apache.ignite.dataload.*; import org.apache.ignite.dataload.*;
import org.apache.ignite.internal.*; import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import java.util.*; import java.util.*;
Expand All @@ -34,14 +34,14 @@
* the loader. * the loader.
* <p> * <p>
* Also note that {@code GridDataLoader} is not the only way to load data into cache. * Also note that {@code GridDataLoader} is not the only way to load data into cache.
* Alternatively you can use {@link org.apache.ignite.cache.GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
* method to load data from underlying data store. You can also use standard * method to load data from underlying data store. You can also use standard
* cache {@code put(...)} and {@code putAll(...)} operations as well, but they most * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
* likely will not perform as well as this class for loading data. And finally, * likely will not perform as well as this class for loading data. And finally,
* data can be loaded from underlying data store on demand, whenever it is accessed - * data can be loaded from underlying data store on demand, whenever it is accessed -
* for this no explicit data loading step is needed. * for this no explicit data loading step is needed.
* <p> * <p>
* {@code GridDataLoader} supports the following configuration properties: * {@code IgniteDataLoader} supports the following configuration properties:
* <ul> * <ul>
* <li> * <li>
* {@link #perNodeBufferSize(int)} - when entries are added to data loader via * {@link #perNodeBufferSize(int)} - when entries are added to data loader via
Expand Down Expand Up @@ -115,9 +115,9 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {
* Default is {@code false}. * Default is {@code false}.
* *
* @param isolated Flag value. * @param isolated Flag value.
* @throws IgniteCheckedException If failed. * @throws IgniteException If failed.
*/ */
public void isolated(boolean isolated) throws IgniteCheckedException; public void isolated(boolean isolated) throws IgniteException;


/** /**
* Gets flag indicating that write-through behavior should be disabled for data loading. * Gets flag indicating that write-through behavior should be disabled for data loading.
Expand Down Expand Up @@ -209,7 +209,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {
* *
* @return Future for this loading process. * @return Future for this loading process.
*/ */
public IgniteInternalFuture<?> future(); public IgniteFuture<?> future();


/** /**
* Optional deploy class for peer deployment. All classes loaded by a data loader * Optional deploy class for peer deployment. All classes loaded by a data loader
Expand All @@ -235,12 +235,12 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {
* *
* @param key Key. * @param key Key.
* @return Future fo this operation. * @return Future fo this operation.
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
*/ */
public IgniteInternalFuture<?> removeData(K key) throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException;


/** /**
* Adds data for loading on remote node. This method can be called from multiple * Adds data for loading on remote node. This method can be called from multiple
Expand All @@ -253,12 +253,12 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {
* @param key Key. * @param key Key.
* @param val Value or {@code null} if respective entry must be removed from cache. * @param val Value or {@code null} if respective entry must be removed from cache.
* @return Future fo this operation. * @return Future fo this operation.
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
*/ */
public IgniteInternalFuture<?> addData(K key, @Nullable V val) throws IgniteCheckedException, IgniteInterruptedCheckedException, public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
IllegalStateException; IllegalStateException;


/** /**
Expand All @@ -271,12 +271,12 @@ public IgniteInternalFuture<?> addData(K key, @Nullable V val) throws IgniteChec
* *
* @param entry Entry. * @param entry Entry.
* @return Future fo this operation. * @return Future fo this operation.
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
*/ */
public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheckedException, IgniteInterruptedCheckedException, public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
IllegalStateException; IllegalStateException;


/** /**
Expand All @@ -292,7 +292,7 @@ public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheck
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
* @return Future for this load operation. * @return Future for this load operation.
*/ */
public IgniteInternalFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;


/** /**
* Adds data for loading on remote node. This method can be called from multiple * Adds data for loading on remote node. This method can be called from multiple
Expand All @@ -307,7 +307,7 @@ public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheck
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
* @return Future for this load operation. * @return Future for this load operation.
*/ */
public IgniteInternalFuture<?> addData(Map<K, V> entries) throws IllegalStateException; public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;


/** /**
* Loads any remaining data, but doesn't close the loader. Data can be still added after * Loads any remaining data, but doesn't close the loader. Data can be still added after
Expand All @@ -318,43 +318,43 @@ public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheck
* another thread to complete flush and exit. If you don't want to wait in this case, * another thread to complete flush and exit. If you don't want to wait in this case,
* use {@link #tryFlush()} method. * use {@link #tryFlush()} method.
* *
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
* @see #tryFlush() * @see #tryFlush()
*/ */
public void flush() throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;


/** /**
* Makes an attempt to load remaining data. This method is mostly similar to {@link #flush}, * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
* with the difference that it won't wait and will exit immediately. * with the difference that it won't wait and will exit immediately.
* *
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on loader. * {@link #close(boolean)} has already been called on loader.
* @see #flush() * @see #flush()
*/ */
public void tryFlush() throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;


/** /**
* Loads any remaining data and closes this loader. * Loads any remaining data and closes this loader.
* *
* @param cancel {@code True} to cancel ongoing loading operations. * @param cancel {@code True} to cancel ongoing loading operations.
* @throws IgniteCheckedException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted.
*/ */
public void close(boolean cancel) throws IgniteCheckedException, IgniteInterruptedCheckedException; public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;


/** /**
* Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method. * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
* <p> * <p>
* The method is invoked automatically on objects managed by the * The method is invoked automatically on objects managed by the
* {@code try-with-resources} statement. * {@code try-with-resources} statement.
* *
* @throws IgniteCheckedException If failed to close data loader. * @throws IgniteException If failed to close data loader.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
*/ */
@Override public void close() throws IgniteCheckedException, IgniteInterruptedCheckedException; @Override public void close() throws IgniteException, IgniteInterruptedException;
} }
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.compute.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dataload.*;
import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
Expand Down Expand Up @@ -3632,7 +3633,9 @@ protected void checkJta() throws IgniteCheckedException {
final long topVer = ctx.affinity().affinityTopologyVersion(); final long topVer = ctx.affinity().affinityTopologyVersion();


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl);
Expand All @@ -3641,6 +3644,9 @@ protected void checkJta() throws IgniteCheckedException {


c.onDone(); c.onDone();
} }
finally {
ldr.closeEx(false);
}
} }
else { else {
// Version for all loaded entries. // Version for all loaded entries.
Expand Down Expand Up @@ -3802,7 +3808,9 @@ public void localLoad(Collection<? extends K> keys) throws IgniteCheckedExceptio
final long topVer = ctx.affinity().affinityTopologyVersion(); final long topVer = ctx.affinity().affinityTopologyVersion();


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0);
Expand All @@ -3811,6 +3819,9 @@ public void localLoad(Collection<? extends K> keys) throws IgniteCheckedExceptio


c.onDone(); c.onDone();
} }
finally {
ldr.closeEx(false);
}
} }
else { else {
// Version for all loaded entries. // Version for all loaded entries.
Expand All @@ -3830,29 +3841,24 @@ public void localLoad(Collection<? extends K> keys) throws IgniteCheckedExceptio
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException {
ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); globalLoadCacheAsync(p, args).get();

IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();

comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));
} }


/** /**
* @param p Predicate. * @param p Predicate.
* @param args Arguments. * @param args Arguments.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
* @return Load cache future.
*/ */
IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws IgniteCheckedException { throws IgniteCheckedException {
ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());


IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);

comp = comp.withAsync();

comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));


return comp.future(); return ctx.kernalContext().closure().callAsync(BROADCAST,
Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args)),
nodes.nodes());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -5547,7 +5553,7 @@ private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
final Collection<Map.Entry<K, V>> col; final Collection<Map.Entry<K, V>> col;


/** */ /** */
final IgniteDataLoader<K, V> ldr; final IgniteDataLoaderImpl<K, V> ldr;


/** */ /** */
final long ttl; final long ttl;
Expand All @@ -5557,7 +5563,7 @@ private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
* @param ldr Loader. * @param ldr Loader.
* @param ttl TTL. * @param ttl TTL.
*/ */
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoader<K, V> ldr, long ttl) { private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoaderImpl<K, V> ldr, long ttl) {
this.p = p; this.p = p;
this.ldr = ldr; this.ldr = ldr;
this.ttl = ttl; this.ttl = ttl;
Expand Down
Expand Up @@ -154,7 +154,7 @@ public GridCacheContext<K, V> context() {


try { try {
if (isAsync()) if (isAsync())
curFut.set(ctx.cache().globalLoadCacheAsync(p, args)); setFuture(ctx.cache().globalLoadCacheAsync(p, args));
else else
ctx.cache().globalLoadCache(p, args); ctx.cache().globalLoadCache(p, args);
} }
Expand Down
Expand Up @@ -63,7 +63,7 @@ public GridAtomicCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, Grid


break; break;
} }
catch (CachePartialUpdateException e) { catch (CachePartialUpdateCheckedException e) {
if (cnt++ == MAX_UPDATE_RETRIES) if (cnt++ == MAX_UPDATE_RETRIES)
throw e; throw e;
else { else {
Expand Down
Expand Up @@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {


/** Data loader. */ /** Data loader. */
@GridToStringExclude @GridToStringExclude
private IgniteDataLoader dataLdr; private IgniteDataLoaderImpl dataLdr;


/** /**
* Default constructor for {@link Externalizable} support. * Default constructor for {@link Externalizable} support.
Expand All @@ -47,7 +47,7 @@ public GridDataLoaderFuture() {
* @param ctx Context. * @param ctx Context.
* @param dataLdr Data loader. * @param dataLdr Data loader.
*/ */
GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoader dataLdr) { GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
super(ctx); super(ctx);


assert dataLdr != null; assert dataLdr != null;
Expand All @@ -60,7 +60,7 @@ public GridDataLoaderFuture() {
checkValid(); checkValid();


if (onCancelled()) { if (onCancelled()) {
dataLdr.close(true); dataLdr.closeEx(true);


return true; return true;
} }
Expand Down
Expand Up @@ -119,12 +119,12 @@ public GridDataLoaderProcessor(GridKernalContext ctx) {
U.interrupt(flusher); U.interrupt(flusher);
U.join(flusher, log); U.join(flusher, log);


for (IgniteDataLoader<?, ?> ldr : ldrs) { for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');


try { try {
ldr.close(cancel); ldr.closeEx(cancel);
} }
catch (IgniteInterruptedCheckedException e) { catch (IgniteInterruptedCheckedException e) {
U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e); U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e);
Expand All @@ -143,7 +143,7 @@ public GridDataLoaderProcessor(GridKernalContext ctx) {
* @param compact {@code true} if data loader should transfer data in compact format. * @param compact {@code true} if data loader should transfer data in compact format.
* @return Data loader. * @return Data loader.
*/ */
public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, boolean compact) { public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
if (!busyLock.enterBusy()) if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to create data loader (grid is stopping)."); throw new IllegalStateException("Failed to create data loader (grid is stopping).");


Expand All @@ -152,7 +152,7 @@ public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, boolean com


ldrs.add(ldr); ldrs.add(ldr);


ldr.future().listenAsync(new CI1<IgniteInternalFuture<?>>() { ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) { @Override public void apply(IgniteInternalFuture<?> f) {
boolean b = ldrs.remove(ldr); boolean b = ldrs.remove(ldr);


Expand Down

0 comments on commit a88b637

Please sign in to comment.