Skip to content

Commit

Permalink
renaming wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Yakov Zhdanov committed Mar 11, 2015
1 parent 0c9f8eb commit b2c679e
Show file tree
Hide file tree
Showing 27 changed files with 146 additions and 153 deletions.
Expand Up @@ -66,7 +66,7 @@ public static void main(String[] args) throws IgniteException {
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) {
// Configure loader. // Configure loader.
stmr.perNodeBufferSize(1024); stmr.perNodeBufferSize(1024);
stmr.perNodeParallelStreamOperations(8); stmr.perNodeParallelOperations(8);


for (int i = 0; i < ENTRY_COUNT; i++) { for (int i = 0; i < ENTRY_COUNT; i++) {
stmr.addData(i, Integer.toString(i)); stmr.addData(i, Integer.toString(i));
Expand Down
Expand Up @@ -140,7 +140,7 @@ public static void main(String[] args) throws Exception {


MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);


gen.generateAndWrite(IgniteDataLoaderEntry.class); gen.generateAndWrite(DataStreamerEntry.class);


// gen.generateAndWrite(GridDistributedLockRequest.class); // gen.generateAndWrite(GridDistributedLockRequest.class);
// gen.generateAndWrite(GridDistributedLockResponse.class); // gen.generateAndWrite(GridDistributedLockResponse.class);
Expand Down
Expand Up @@ -52,7 +52,7 @@
* value. * value.
* </li> * </li>
* <li> * <li>
* {@link #perNodeParallelStreamOperations(int)} - sometimes data may be added * {@link #perNodeParallelOperations(int)} - sometimes data may be added
* to the data streamer via {@link #addData(Object, Object)} method faster than it can * to the data streamer via {@link #addData(Object, Object)} method faster than it can
* be put in cache. In this case, new buffered stream messages are sent to remote nodes * be put in cache. In this case, new buffered stream messages are sent to remote nodes
* before responses from previous ones are received. This could cause unlimited heap * before responses from previous ones are received. This could cause unlimited heap
Expand Down Expand Up @@ -102,15 +102,17 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
public String cacheName(); public String cacheName();


/** /**
* Gets flag value indicating that this data streamer assumes that there are no other concurrent updates to the cache. * Gets flag value indicating that this data streamer assumes that
* there are no other concurrent updates to the cache.
* Default is {@code false}. * Default is {@code false}.
* *
* @return Flag value. * @return Flag value.
*/ */
public boolean allowOverwrite(); public boolean allowOverwrite();


/** /**
* Sets flag indicating that this data streamer should assume that there are no other concurrent updates to the cache. * Sets flag indicating that this data streamer should assume
* that there are no other concurrent updates to the cache.
* Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method. * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
* Default is {@code false}. When this flag is set, updates will not be propagated to the cache store. * Default is {@code false}. When this flag is set, updates will not be propagated to the cache store.
* *
Expand Down Expand Up @@ -154,22 +156,22 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
public void perNodeBufferSize(int bufSize); public void perNodeBufferSize(int bufSize);


/** /**
* Gets maximum number of parallel stream operations for a single node. * Gets maximum number of parallel update operations for a single node.
* *
* @return Maximum number of parallel stream operations for a single node. * @return Maximum number of parallel stream operations for a single node.
*/ */
public int perNodeParallelStreamOperations(); public int perNodeParallelOperations();


/** /**
* Sets maximum number of parallel stream operations for a single node. * Sets maximum number of parallel update operations for a single node.
* <p> * <p>
* This method should be called prior to {@link #addData(Object, Object)} call. * This method should be called prior to {@link #addData(Object, Object)} call.
* <p> * <p>
* If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
* *
* @param parallelOps Maximum number of parallel stream operations for a single node. * @param parallelOps Maximum number of parallel stream operations for a single node.
*/ */
public void perNodeParallelStreamOperations(int parallelOps); public void perNodeParallelOperations(int parallelOps);


/** /**
* Gets automatic flush frequency. Essentially, this is the time after which the * Gets automatic flush frequency. Essentially, this is the time after which the
Expand Down Expand Up @@ -284,7 +286,7 @@ public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, I
* @throws IgniteInterruptedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
* @throws IllegalStateException If grid has been concurrently stopped or * @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer. * {@link #close(boolean)} has already been called on streamer.
* @see #allowOverwrite() * @see #allowOverwrite()
*/ */
public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException, public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
IllegalStateException; IllegalStateException;
Expand All @@ -298,7 +300,7 @@ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, Ig
* the streamer. * the streamer.
* <p> * <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance * then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}) * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true})
* *
* @param entries Collection of entries to be streamed. * @param entries Collection of entries to be streamed.
Expand All @@ -318,7 +320,7 @@ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, Ig
* the streamer. * the streamer.
* <p> * <p>
* Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default) * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default)
* then data streamer will not overwrite existing cache entries for better performance * then data streamer will not overwrite existing cache entries for better performance
* (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true}) * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true})
* *
* @param entries Map to be streamed. * @param entries Map to be streamed.
Expand Down
Expand Up @@ -247,7 +247,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* *
* @return Data streamer processor. * @return Data streamer processor.
*/ */
public <K, V> IgniteDataStreamerProcessor<K, V> dataStream(); public <K, V> DataStreamerProcessor<K, V> dataStream();


/** /**
* Gets file system processor. * Gets file system processor.
Expand Down
Expand Up @@ -200,7 +200,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable


/** */ /** */
@GridToStringInclude @GridToStringInclude
private IgniteDataStreamerProcessor dataLdrProc; private DataStreamerProcessor dataLdrProc;


/** */ /** */
@GridToStringInclude @GridToStringInclude
Expand Down Expand Up @@ -457,8 +457,8 @@ else if (comp instanceof GridAffinityProcessor)
affProc = (GridAffinityProcessor)comp; affProc = (GridAffinityProcessor)comp;
else if (comp instanceof GridRestProcessor) else if (comp instanceof GridRestProcessor)
restProc = (GridRestProcessor)comp; restProc = (GridRestProcessor)comp;
else if (comp instanceof IgniteDataStreamerProcessor) else if (comp instanceof DataStreamerProcessor)
dataLdrProc = (IgniteDataStreamerProcessor)comp; dataLdrProc = (DataStreamerProcessor)comp;
else if (comp instanceof IgfsProcessorAdapter) else if (comp instanceof IgfsProcessorAdapter)
igfsProc = (IgfsProcessorAdapter)comp; igfsProc = (IgfsProcessorAdapter)comp;
else if (comp instanceof GridOffHeapProcessor) else if (comp instanceof GridOffHeapProcessor)
Expand Down Expand Up @@ -671,8 +671,8 @@ public void addHelper(Object helper) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override public <K, V> IgniteDataStreamerProcessor<K, V> dataStream() { @Override public <K, V> DataStreamerProcessor<K, V> dataStream() {
return (IgniteDataStreamerProcessor<K, V>)dataLdrProc; return (DataStreamerProcessor<K, V>)dataLdrProc;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -69,7 +69,7 @@ public enum GridTopic {
TOPIC_IGFS, TOPIC_IGFS,


/** */ /** */
TOPIC_DATALOAD, TOPIC_DATASTREAM,


/** */ /** */
TOPIC_STREAM, TOPIC_STREAM,
Expand Down
Expand Up @@ -755,7 +755,7 @@ public void start(final IgniteConfiguration cfg,
startProcessor(new GridTaskProcessor(ctx)); startProcessor(new GridTaskProcessor(ctx));
startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
startProcessor(new GridRestProcessor(ctx)); startProcessor(new GridRestProcessor(ctx));
startProcessor(new IgniteDataStreamerProcessor(ctx)); startProcessor(new DataStreamerProcessor(ctx));
startProcessor(new GridStreamProcessor(ctx)); startProcessor(new GridStreamProcessor(ctx));
startProcessor((GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); startProcessor((GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
startProcessor(new GridContinuousProcessor(ctx)); startProcessor(new GridContinuousProcessor(ctx));
Expand Down
Expand Up @@ -385,12 +385,12 @@ public GridIoMessageFactory(MessageFactory[] ext) {
break; break;


case 62: case 62:
msg = new GridDataLoadRequest(); msg = new DataStreamerRequest();


break; break;


case 63: case 63:
msg = new GridDataLoadResponse(); msg = new DataStreamerResponse();


break; break;


Expand Down Expand Up @@ -525,7 +525,7 @@ public GridIoMessageFactory(MessageFactory[] ext) {
break; break;


case 95: case 95:
msg = new IgniteDataLoaderEntry(); msg = new DataStreamerEntry();


break; break;


Expand Down
Expand Up @@ -3732,7 +3732,7 @@ protected void checkJta() throws IgniteCheckedException {
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());


try { try {
ldr.updater(new IgniteDrDataStreamerCacheUpdater()); ldr.updater(new IgniteDrDataStreamerCacheUpdater());
Expand Down Expand Up @@ -3883,18 +3883,18 @@ public IgniteInternalFuture<?> loadAll(
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException { private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
try (final IgniteDataStreamerImpl<KeyCacheObject, CacheObject> ldr = try (final DataStreamerImpl<KeyCacheObject, CacheObject> ldr =
ctx.kernalContext().<KeyCacheObject, CacheObject>dataStream().dataStreamer(ctx.namex())) { ctx.kernalContext().<KeyCacheObject, CacheObject>dataStream().dataStreamer(ctx.namex())) {
ldr.allowOverwrite(true); ldr.allowOverwrite(true);
ldr.skipStore(true); ldr.skipStore(true);


final Collection<IgniteDataLoaderEntry> col = new ArrayList<>(ldr.perNodeBufferSize()); final Collection<DataStreamerEntry> col = new ArrayList<>(ldr.perNodeBufferSize());


Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);


ctx.store().loadAllFromStore(null, keys0, new CIX2<KeyCacheObject, Object>() { ctx.store().loadAllFromStore(null, keys0, new CIX2<KeyCacheObject, Object>() {
@Override public void applyx(KeyCacheObject key, Object val) { @Override public void applyx(KeyCacheObject key, Object val) {
col.add(new IgniteDataLoaderEntry(key, ctx.toCacheObject(val))); col.add(new DataStreamerEntry(key, ctx.toCacheObject(val)));


if (col.size() == ldr.perNodeBufferSize()) { if (col.size() == ldr.perNodeBufferSize()) {
ldr.addDataInternal(col); ldr.addDataInternal(col);
Expand Down Expand Up @@ -3926,7 +3926,7 @@ public void localLoad(Collection<? extends K> keys,
Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex()); DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());


try { try {
ldr.updater(new IgniteDrDataStreamerCacheUpdater()); ldr.updater(new IgniteDrDataStreamerCacheUpdater());
Expand Down Expand Up @@ -5882,7 +5882,7 @@ private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCac
final Collection<GridCacheRawVersionedEntry> col; final Collection<GridCacheRawVersionedEntry> col;


/** */ /** */
final IgniteDataStreamerImpl<K, V> ldr; final DataStreamerImpl<K, V> ldr;


/** */ /** */
final ExpiryPolicy plc; final ExpiryPolicy plc;
Expand All @@ -5893,7 +5893,7 @@ private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCac
* @param plc Optional expiry policy. * @param plc Optional expiry policy.
*/ */
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
IgniteDataStreamerImpl<K, V> ldr, DataStreamerImpl<K, V> ldr,
@Nullable ExpiryPolicy plc) { @Nullable ExpiryPolicy plc) {
this.p = p; this.p = p;
this.ldr = ldr; this.ldr = ldr;
Expand Down
Expand Up @@ -283,11 +283,11 @@ private GlobalRemoveAllCallable(String cacheName, long topVer) {
else else
dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;


try (IgniteDataStreamerImpl<KeyCacheObject, Object> dataLdr = try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
(IgniteDataStreamerImpl)ignite.dataStreamer(cacheName)) { (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0); ((DataStreamerImpl)dataLdr).maxRemapCount(0);


dataLdr.updater(IgniteDataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); dataLdr.updater(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());


for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
if (!locPart.isEmpty() && locPart.primary(topVer)) { if (!locPart.isEmpty() && locPart.primary(topVer)) {
Expand Down
Expand Up @@ -32,7 +32,7 @@
/** /**
* Raw versioned entry. * Raw versioned entry.
*/ */
public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry implements public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implements
GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable { GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;
Expand Down
Expand Up @@ -27,7 +27,7 @@
/** /**
* Bundled factory for cache updaters. * Bundled factory for cache updaters.
*/ */
public class IgniteDataStreamerCacheUpdaters { public class DataStreamerCacheUpdaters {
/** */ /** */
private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual(); private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();


Expand Down
Expand Up @@ -28,7 +28,7 @@
/** /**
* *
*/ */
public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message { public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand All @@ -43,15 +43,15 @@ public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, CacheObj
/** /**
* *
*/ */
public IgniteDataLoaderEntry() { public DataStreamerEntry() {
// No-op. // No-op.
} }


/** /**
* @param key Key. * @param key Key.
* @param val Value. * @param val Value.
*/ */
public IgniteDataLoaderEntry(KeyCacheObject key, CacheObject val) { public DataStreamerEntry(KeyCacheObject key, CacheObject val) {
this.key = key; this.key = key;
this.val = val; this.val = val;
} }
Expand Down Expand Up @@ -165,6 +165,6 @@ public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(IgniteDataLoaderEntry.class, this); return S.toString(DataStreamerEntry.class, this);
} }
} }
Expand Up @@ -18,34 +18,25 @@
package org.apache.ignite.internal.processors.datastream; package org.apache.ignite.internal.processors.datastream;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;


/** /**
* Data streamer future. * Data streamer future.
*/ */
class IgniteDataStreamerFuture extends GridFutureAdapter<Object> { class DataStreamerFuture extends GridFutureAdapter<Object> {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** Data loader. */ /** Data loader. */
@GridToStringExclude @GridToStringExclude
private IgniteDataStreamerImpl dataLdr; private DataStreamerImpl dataLdr;


/** /**
* Default constructor for {@link java.io.Externalizable} support.
*/
public IgniteDataStreamerFuture() {
// No-op.
}

/**
* @param ctx Context.
* @param dataLdr Data streamer. * @param dataLdr Data streamer.
*/ */
IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) { DataStreamerFuture(DataStreamerImpl dataLdr) {
assert dataLdr != null; assert dataLdr != null;


this.dataLdr = dataLdr; this.dataLdr = dataLdr;
Expand All @@ -64,6 +55,6 @@ public IgniteDataStreamerFuture() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(IgniteDataStreamerFuture.class, this, super.toString()); return S.toString(DataStreamerFuture.class, this, super.toString());
} }
} }

0 comments on commit b2c679e

Please sign in to comment.