Skip to content

Commit

Permalink
# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ashutakGG committed Mar 3, 2015
1 parent 6909cc4 commit 9b33b65
Show file tree
Hide file tree
Showing 30 changed files with 93 additions and 95 deletions.
Expand Up @@ -21,8 +21,8 @@
import org.apache.ignite.examples.*;

/**
* Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API.
* {@link IgniteDataLoader} is a lot more efficient to use than standard
* Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
* {@link IgniteDataStreamer} is a lot more efficient to use than standard
* {@code put(...)} operation as it properly buffers cache requests
* together and properly manages load on remote nodes.
* <p>
Expand Down Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws IgniteException {

long start = System.currentTimeMillis();

try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
// Configure loader.
ldr.perNodeBufferSize(1024);
ldr.perNodeParallelLoadOperations(8);
Expand Down
Expand Up @@ -92,7 +92,7 @@ public static void main(String[] args) throws IgniteException {
* @throws IgniteException If failed.
*/
private static void streamData(final Ignite ignite) throws IgniteException {
try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
// Set larger per-node buffer size since our state is relatively small.
ldr.perNodeBufferSize(2048);

Expand Down Expand Up @@ -140,7 +140,7 @@ private static TimerTask scheduleQuery(final Ignite ignite, Timer timer, final i
/**
* Increments value for key.
*/
private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
/** */
private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
@Override public Void process(MutableEntry<Integer, Long> e, Object... args) {
Expand Down
6 changes: 3 additions & 3 deletions modules/core/src/main/java/org/apache/ignite/Ignite.java
Expand Up @@ -42,7 +42,7 @@
* In addition to {@link ClusterGroup} functionality, from here you can get the following:
* <ul>
* <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li>
* <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li>
* <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of data into cache.</li>
* <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li>
* <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li>
* <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li>
Expand Down Expand Up @@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable {
/**
* Gets a new instance of data loader associated with given cache name. Data loader
* is responsible for loading external data into in-memory data grid. For more information
* refer to {@link IgniteDataLoader} documentation.
* refer to {@link IgniteDataStreamer} documentation.
*
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName);
public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName);

/**
* Gets an instance of IGFS - Ignite In-Memory File System, if one is not
Expand Down
Expand Up @@ -74,7 +74,7 @@
* updates and allow data loader choose most optimal concurrent implementation.
* </li>
* <li>
* {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
* {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries.
* It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
* </li>
* <li>
Expand All @@ -87,7 +87,7 @@
* </li>
* </ul>
*/
public interface IgniteDataLoader<K, V> extends AutoCloseable {
public interface IgniteDataStreamer<K, V> extends AutoCloseable {
/** Default max concurrent put operations count. */
public static final int DFLT_MAX_PARALLEL_OPS = 16;

Expand All @@ -111,7 +111,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {

/**
* Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
* Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
* Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
* Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
*
* @param allowOverwrite Flag value.
Expand Down Expand Up @@ -359,12 +359,12 @@ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, Ig
@Override public void close() throws IgniteException, IgniteInterruptedException;

/**
* Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
* Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)}
* property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
* performance custom user-defined implementation may help.
* <p>
* Data loader can be configured to use custom implementation of updater instead of default one using
* {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method.
* {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
*/
interface Updater<K, V> extends Serializable {
/**
Expand Down
Expand Up @@ -2346,7 +2346,7 @@ public <K, V> GridCache<K, V> cache(@Nullable String name) {
}

/** {@inheritDoc} */
@Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
@Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
guard();

try {
Expand Down
Expand Up @@ -3877,7 +3877,7 @@ protected void checkJta() throws IgniteCheckedException {
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();

if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
Expand Down Expand Up @@ -4043,7 +4043,7 @@ public IgniteInternalFuture<?> loadAll(
* @throws IgniteCheckedException If failed.
*/
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.allowOverwrite(true);
ldr.skipStore(true);

Expand Down Expand Up @@ -4086,7 +4086,7 @@ public void localLoad(Collection<? extends K> keys,
final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();

if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);

try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
Expand Down Expand Up @@ -6134,7 +6134,7 @@ private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
final Collection<Map.Entry<K, V>> col;

/** */
final IgniteDataLoaderImpl<K, V> ldr;
final IgniteDataStreamerImpl<K, V> ldr;

/** */
final ExpiryPolicy plc;
Expand All @@ -6145,7 +6145,7 @@ private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
* @param plc Optional expiry policy.
*/
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
IgniteDataLoaderImpl<K, V> ldr,
IgniteDataStreamerImpl<K, V> ldr,
@Nullable ExpiryPolicy plc) {
this.p = p;
this.ldr = ldr;
Expand Down
Expand Up @@ -276,8 +276,8 @@ private GlobalRemoveAllCallable(String cacheName, long topVer) {
else
dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;

try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) {
((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0);
try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) {
((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);

dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());

Expand Down
Expand Up @@ -29,13 +29,13 @@
*/
public class GridDataLoadCacheUpdaters {
/** */
private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual();
private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();

/** */
private static final IgniteDataLoader.Updater BATCHED = new Batched();
private static final IgniteDataStreamer.Updater BATCHED = new Batched();

/** */
private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted();
private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();

/**
* Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
Expand All @@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Single updater.
*/
public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
return INDIVIDUAL;
}

Expand All @@ -55,7 +55,7 @@ public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
*
* @return Batched updater.
*/
public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
return BATCHED;
}

Expand All @@ -66,7 +66,7 @@ public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
*
* @return Batched sorted updater.
*/
public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() {
public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
return BATCHED_SORTED;
}

Expand All @@ -93,7 +93,7 @@ protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K>
/**
* Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
*/
private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> {
private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -120,7 +120,7 @@ private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V>
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;

Expand Down Expand Up @@ -160,7 +160,7 @@ private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> {
private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;

Expand Down
Expand Up @@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
private final boolean skipStore;

/** */
private final IgniteDataLoader.Updater<K, V> updater;
private final IgniteDataStreamer.Updater<K, V> updater;

/**
* @param ctx Context.
Expand All @@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
Collection<Map.Entry<K, V>> col,
boolean ignoreDepOwnership,
boolean skipStore,
IgniteDataLoader.Updater<K, V> updater) {
IgniteDataStreamer.Updater<K, V> updater) {
this.ctx = ctx;
this.log = log;

Expand Down
Expand Up @@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {

/** Data loader. */
@GridToStringExclude
private IgniteDataLoaderImpl dataLdr;
private IgniteDataStreamerImpl dataLdr;

/**
* Default constructor for {@link Externalizable} support.
Expand All @@ -47,7 +47,7 @@ public GridDataLoaderFuture() {
* @param ctx Context.
* @param dataLdr Data loader.
*/
GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
super(ctx);

assert dataLdr != null;
Expand Down
Expand Up @@ -41,7 +41,7 @@
*/
public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
/** Loaders map (access is not supposed to be highly concurrent). */
private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();

/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
Expand All @@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
private Thread flusher;

/** */
private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();

/** Marshaller. */
private final Marshaller marsh;
Expand Down Expand Up @@ -80,7 +80,7 @@ public GridDataLoaderProcessor(GridKernalContext ctx) {
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
IgniteDataStreamerImpl<K, V> ldr = flushQ.take();

if (!busyLock.enterBusy())
return;
Expand Down Expand Up @@ -118,7 +118,7 @@ public GridDataLoaderProcessor(GridKernalContext ctx) {
U.interrupt(flusher);
U.join(flusher, log);

for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
if (log.isDebugEnabled())
log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');

Expand All @@ -142,12 +142,12 @@ public GridDataLoaderProcessor(GridKernalContext ctx) {
* @param compact {@code true} if data loader should transfer data in compact format.
* @return Data loader.
*/
public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to create data loader (grid is stopping).");

try {
final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);

ldrs.add(ldr);

Expand All @@ -173,7 +173,7 @@ public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
return dataLoader(cacheName, true);
}

Expand Down Expand Up @@ -234,7 +234,7 @@ private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
}

Collection<Map.Entry<K, V>> col;
IgniteDataLoader.Updater<K, V> updater;
IgniteDataStreamer.Updater<K, V> updater;

try {
col = marsh.unmarshal(req.collectionBytes(), clsLdr);
Expand Down

0 comments on commit 9b33b65

Please sign in to comment.