Skip to content

Commit

Permalink
Merge branches 'ignite-394' and 'sprint-2' of https://git-wip-us.apac…
Browse files Browse the repository at this point in the history
…he.org/repos/asf/incubator-ignite into ignite-394

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
	modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
  • Loading branch information
Yakov Zhdanov committed Mar 11, 2015
1 parent 00fd3c3 commit 0c9f8eb
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 20 deletions.
Expand Up @@ -18,7 +18,7 @@
package org.apache.ignite.codegen; package org.apache.ignite.codegen;


import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.extensions.communication.*;
Expand Down
Expand Up @@ -3732,10 +3732,10 @@ protected void checkJta() throws IgniteCheckedException {
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataStream().dataLoader(ctx.namex()); IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());


try { try {
ldr.updater(new IgniteDrDataStreamerCacheUpdater<K, V>()); ldr.updater(new IgniteDrDataStreamerCacheUpdater());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc);


Expand Down Expand Up @@ -3883,7 +3883,8 @@ public IgniteInternalFuture<?> loadAll(
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException { private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { try (final IgniteDataStreamerImpl<KeyCacheObject, CacheObject> ldr =
ctx.kernalContext().<KeyCacheObject, CacheObject>dataStream().dataStreamer(ctx.namex())) {
ldr.allowOverwrite(true); ldr.allowOverwrite(true);
ldr.skipStore(true); ldr.skipStore(true);


Expand Down Expand Up @@ -3925,10 +3926,10 @@ public void localLoad(Collection<? extends K> keys,
Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());


try { try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); ldr.updater(new IgniteDrDataStreamerCacheUpdater());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0);


Expand Down
Expand Up @@ -20,7 +20,7 @@
import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.extensions.communication.*;
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.cacheobject.*; import org.apache.ignite.internal.processors.cacheobject.*;
import org.apache.ignite.internal.processors.dataload.*;
import org.apache.ignite.internal.processors.dr.*; import org.apache.ignite.internal.processors.dr.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
Expand Down Expand Up @@ -1354,7 +1353,7 @@ private DataStreamerPda(Object... objs) {
* Isolated updater which only loads entry initial value. * Isolated updater which only loads entry initial value.
*/ */
private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>, private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>,
GridDataLoadCacheUpdaters.InternalUpdater { IgniteDataStreamerCacheUpdaters.InternalUpdater {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.dataload.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
Expand Down Expand Up @@ -142,7 +141,7 @@ public IgniteDataStreamerProcessor(GridKernalContext ctx) {
* @param cacheName Cache name ({@code null} for default cache). * @param cacheName Cache name ({@code null} for default cache).
* @return Data loader. * @return Data loader.
*/ */
public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName) { public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
if (!busyLock.enterBusy()) if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to create data streamer (grid is stopping)."); throw new IllegalStateException("Failed to create data streamer (grid is stopping).");


Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.dataload.*;
import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;
Expand Down Expand Up @@ -146,6 +145,6 @@ class IgniteDataStreamerUpdateJob implements GridPlainCallable<Object> {
* @return {@code True} if need to unwrap internal entries. * @return {@code True} if need to unwrap internal entries.
*/ */
private boolean unwrapEntries() { private boolean unwrapEntries() {
return !(updater instanceof GridDataLoadCacheUpdaters.InternalUpdater); return !(updater instanceof IgniteDataStreamerCacheUpdaters.InternalUpdater);
} }
} }
Expand Up @@ -23,7 +23,6 @@
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dataload.*;
import org.apache.ignite.internal.processors.datastream.*; import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
Expand All @@ -33,7 +32,7 @@
/** /**
* Data center replication cache updater for data streamer. * Data center replication cache updater for data streamer.
*/ */
public class IgniteDrDataStreamerCacheUpdater<K, V> implements IgniteDataStreamer.Updater<K, V>, public class IgniteDrDataStreamerCacheUpdater implements IgniteDataStreamer.Updater<KeyCacheObject, CacheObject>,
IgniteDataStreamerCacheUpdaters.InternalUpdater { IgniteDataStreamerCacheUpdaters.InternalUpdater {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;
Expand Down
Expand Up @@ -21,8 +21,6 @@
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
Expand Down Expand Up @@ -127,7 +125,7 @@ public void testAddDataFromMap() throws Exception {


Ignite g0 = grid(0); Ignite g0 = grid(0);


IgniteDataStreamerImpl<Integer, String> dataLdr = g0.dataStreamer(null); IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);


Map<Integer, String> map = U.newHashMap(KEYS_COUNT); Map<Integer, String> map = U.newHashMap(KEYS_COUNT);


Expand Down
Expand Up @@ -882,10 +882,10 @@ public void testCustomUserUpdater() throws Exception {
startGrid(2); startGrid(2);
startGrid(3); startGrid(3);


try (IgniteDataLoader<String, TestObject> ldr = ignite.dataLoader(null)) { try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) {
ldr.allowOverwrite(true); ldr.allowOverwrite(true);


ldr.updater(new IgniteDataLoader.Updater<String, TestObject>() { ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() {
@Override public void update(IgniteCache<String, TestObject> cache, @Override public void update(IgniteCache<String, TestObject> cache,
Collection<Map.Entry<String, TestObject>> entries) { Collection<Map.Entry<String, TestObject>> entries) {
for (Map.Entry<String, TestObject> e : entries) { for (Map.Entry<String, TestObject> e : entries) {
Expand Down

0 comments on commit 0c9f8eb

Please sign in to comment.