Skip to content

Commit

Permalink
IGNITE-9655-merge - Data loader implementation with allowOverwrite flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Feb 13, 2015
1 parent 270246d commit ff2da20
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 37 deletions.
Expand Up @@ -67,7 +67,6 @@ public static void main(String[] args) throws IgniteException {
// Configure loader. // Configure loader.
ldr.perNodeBufferSize(1024); ldr.perNodeBufferSize(1024);
ldr.perNodeParallelLoadOperations(8); ldr.perNodeParallelLoadOperations(8);
ldr.isolated(true);


for (int i = 0; i < ENTRY_COUNT; i++) { for (int i = 0; i < ENTRY_COUNT; i++) {
ldr.addData(i, Integer.toString(i)); ldr.addData(i, Integer.toString(i));
Expand Down
22 changes: 11 additions & 11 deletions modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
Expand Up @@ -70,11 +70,11 @@
* changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}). * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
* </li> * </li>
* <li> * <li>
* {@link #isolated(boolean)} - defines if data loader will assume that there are no other concurrent * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
* updates and allow data loader choose most optimal concurrent implementation. * updates and allow data loader choose most optimal concurrent implementation.
* </li> * </li>
* <li> * <li>
* {@link #updater(org.apache.ignite.IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries. * {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
* It allows to provide user-defined custom logic to update the cache in the most effective and flexible way. * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
* </li> * </li>
* <li> * <li>
Expand Down Expand Up @@ -103,21 +103,21 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable {


/** /**
* Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache. * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
* Default is {@code false}. * Default is {@code true}.
* *
* @return Flag value. * @return Flag value.
*/ */
public boolean isolated(); public boolean allowOverwrite();


/** /**
* Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache. * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
* Should not be used when custom cache updater set using {@link #updater(org.apache.ignite.IgniteDataLoader.Updater)} method. * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
* Default is {@code false}. * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
* *
* @param isolated Flag value. * @param allowOverwrite Flag value.
* @throws IgniteException If failed. * @throws IgniteException If failed.
*/ */
public void isolated(boolean isolated) throws IgniteException; public void allowOverwrite(boolean allowOverwrite) throws IgniteException;


/** /**
* Gets flag indicating that write-through behavior should be disabled for data loading. * Gets flag indicating that write-through behavior should be disabled for data loading.
Expand Down Expand Up @@ -343,7 +343,7 @@ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, Ig
* *
* @param cancel {@code True} to cancel ongoing loading operations. * @param cancel {@code True} to cancel ongoing loading operations.
* @throws IgniteException If failed to map key to node. * @throws IgniteException If failed to map key to node.
* @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted. * @throws IgniteInterruptedException If thread has been interrupted.
*/ */
public void close(boolean cancel) throws IgniteException, IgniteInterruptedException; public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;


Expand All @@ -359,7 +359,7 @@ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, Ig
@Override public void close() throws IgniteException, IgniteInterruptedException; @Override public void close() throws IgniteException, IgniteInterruptedException;


/** /**
* Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#isolated(boolean)} * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
* property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
* performance custom user-defined implementation may help. * performance custom user-defined implementation may help.
* <p> * <p>
Expand All @@ -372,7 +372,7 @@ interface Updater<K, V> extends Serializable {
* *
* @param cache Cache. * @param cache Cache.
* @param entries Collection of entries. * @param entries Collection of entries.
* @throws org.apache.ignite.IgniteException If failed. * @throws IgniteException If failed.
*/ */
public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException; public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
} }
Expand Down
Expand Up @@ -173,6 +173,29 @@ public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<?
return map != null ? F.first(map.keySet()) : null; return map != null ? F.first(map.keySet()) : null;
} }


/**
* Map single key to primary and backup nodes.
*
* @param cacheName Cache name.
* @param key Key to map.
* @return Affinity nodes, primary first.
* @throws IgniteCheckedException If failed.
*/
public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException {
A.notNull(key, "key");

ClusterNode loc = ctx.discovery().localNode();

if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL)
return Collections.singletonList(loc);

long topVer = ctx.discovery().topologyVersion();

AffinityInfo affInfo = affinityCache(cacheName, topVer);

return primaryAndBackups(affInfo, key);
}

/** /**
* Maps single key to a node on default cache. * Maps single key to a node on default cache.
* *
Expand Down Expand Up @@ -213,7 +236,7 @@ public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<?
* @return Cache affinity. * @return Cache affinity.
*/ */
public <K> CacheAffinityProxy<K> affinityProxy(String cacheName) { public <K> CacheAffinityProxy<K> affinityProxy(String cacheName) {
return new CacheAffinityProxy(cacheName); return new CacheAffinityProxy<>(cacheName);
} }


/** /**
Expand Down Expand Up @@ -458,6 +481,17 @@ private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedExc
return nodes.iterator().next(); return nodes.iterator().next();
} }


/**
* @param aff Affinity function.
* @param key Key to check.
* @return Primary and backup nodes.
*/
private <K> List<ClusterNode> primaryAndBackups(AffinityInfo aff, K key) {
int part = aff.affFunc.partition(aff.mapper.affinityKey(key));

return aff.assignment.get(part);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void printMemoryStats() { @Override public void printMemoryStats() {
X.println(">>>"); X.println(">>>");
Expand Down
Expand Up @@ -18,15 +18,18 @@
package org.apache.ignite.internal.processors.dataload; package org.apache.ignite.internal.processors.dataload;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*; import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*; import org.apache.ignite.events.*;
import org.apache.ignite.internal.*; import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dr.*;
import org.apache.ignite.internal.processors.portable.*; import org.apache.ignite.internal.processors.portable.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
Expand All @@ -51,9 +54,13 @@
/** /**
* Data loader implementation. * Data loader implementation.
*/ */
@SuppressWarnings("unchecked")
public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed { public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed {
/** Isolated updater. */
private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();

/** Cache updater. */ /** Cache updater. */
private Updater<K, V> updater = GridDataLoadCacheUpdaters.individual(); private Updater<K, V> updater = ISOLATED_UPDATER;


/** */ /** */
private byte[] updaterBytes; private byte[] updaterBytes;
Expand Down Expand Up @@ -278,27 +285,21 @@ public IgniteInternalFuture<?> internalFuture() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean isolated() { @Override public boolean allowOverwrite() {
return updater != GridDataLoadCacheUpdaters.individual(); return updater != ISOLATED_UPDATER;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void isolated(boolean isolated) { @Override public void allowOverwrite(boolean allow) {
if (isolated()) if (allow == allowOverwrite())
return; return;


ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes()); ClusterNode node = F.first(ctx.grid().forCacheNodes(cacheName).nodes());


if (node == null) if (node == null)
throw new IgniteException("Failed to get node for cache: " + cacheName); throw new IgniteException("Failed to get node for cache: " + cacheName);


GridCacheAttributes a = U.cacheAttributes(node, cacheName); updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;

assert a != null;

updater = a.atomicityMode() == CacheAtomicityMode.ATOMIC ?
GridDataLoadCacheUpdaters.<K, V>batched() :
GridDataLoadCacheUpdaters.<K, V>groupLocked();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -444,7 +445,7 @@ private void load0(
boolean initPda = ctx.deploy().enabled() && jobPda == null; boolean initPda = ctx.deploy().enabled() && jobPda == null;


for (Map.Entry<K, V> entry : entries) { for (Map.Entry<K, V> entry : entries) {
ClusterNode node; List<ClusterNode> nodes;


try { try {
K key = entry.getKey(); K key = entry.getKey();
Expand All @@ -457,28 +458,30 @@ private void load0(
initPda = false; initPda = false;
} }


node = ctx.affinity().mapKeyToNode(cacheName, key); nodes = nodes(key);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
resFut.onDone(e); resFut.onDone(e);


return; return;
} }


if (node == null) { if (F.isEmpty(nodes)) {
resFut.onDone(new ClusterTopologyCheckedException("Failed to map key to node " + resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
"(no nodes with cache found in topology) [infos=" + entries.size() + "(no nodes with cache found in topology) [infos=" + entries.size() +
", cacheName=" + cacheName + ']')); ", cacheName=" + cacheName + ']'));


return; return;
} }


Collection<Map.Entry<K, V>> col = mappings.get(node); for (ClusterNode node : nodes) {
Collection<Map.Entry<K, V>> col = mappings.get(node);


if (col == null) if (col == null)
mappings.put(node, col = new ArrayList<>()); mappings.put(node, col = new ArrayList<>());


col.add(entry); col.add(entry);
}
} }


for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) { for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
Expand Down Expand Up @@ -551,6 +554,18 @@ private void load0(
} }
} }


/**
* @param key Key to map.
* @return Nodes to send requests to.
* @throws IgniteCheckedException If failed.
*/
private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
GridAffinityProcessor aff = ctx.affinity();

return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
Collections.singletonList(aff.mapKeyToNode(cacheName, key));
}

/** /**
* Performs flush. * Performs flush.
* *
Expand Down Expand Up @@ -1365,4 +1380,48 @@ public Entries0() {
} }
} }
} }

/**
* Isolated updater which only loads entry initial value.
*/
private static class IsolatedUpdater<K, V> implements Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;

/** {@inheritDoc} */
@Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;

GridCacheAdapter<K, V> internalCache = proxy.context().cache();

if (internalCache.isNear())
internalCache = internalCache.context().near().dht();

GridCacheContext<K, V> cctx = internalCache.context();

long topVer = cctx.affinity().affinityTopologyVersion();

GridCacheVersion ver = cctx.versions().next(topVer);

for (Map.Entry<K, V> e : entries) {
try {
GridCacheEntryEx<K, V> entry = internalCache.entryEx(e.getKey(), topVer);

entry.unswap(true, false);

entry.initialValue(e.getValue(), null, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);

cctx.evicts().touch(entry, topVer);
}
catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
// No-op.
}
catch (IgniteCheckedException ex) {
IgniteLogger log = cache.unwrap(Ignite.class).log();

U.error(log, "Failed to set initial value for cache entry: " + e, ex);
}
}
}
}
} }

0 comments on commit ff2da20

Please sign in to comment.