Skip to content

Commit

Permalink
Optimization for single key cache 'get' operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Nov 19, 2015
1 parent ba1d563 commit 1f10306
Show file tree
Hide file tree
Showing 56 changed files with 2,908 additions and 632 deletions.
Expand Up @@ -83,6 +83,8 @@
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
Expand Down Expand Up @@ -696,6 +698,16 @@ public GridIoMessageFactory(MessageFactory[] ext) {


break; break;


case 116:
msg = new GridNearSingleGetRequest();

break;

case 117:
msg = new GridNearSingleGetResponse();

break;

// [-3..114] - this // [-3..114] - this
// [120..123] - DR // [120..123] - DR
// [-4..-22] - SQL // [-4..-22] - SQL
Expand Down
Expand Up @@ -599,27 +599,19 @@ public void onKernalStop() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) { @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
A.notNull(key, "key"); A.notNull(key, "key");


return getAllAsync( return (IgniteInternalFuture)getAsync(
Collections.singletonList(key), key,
/*force primary*/false, /*force primary*/false,
/*skip tx*/false, /*skip tx*/false,
/*subj id*/null, /*subj id*/null,
/*task name*/null, /*task name*/null,
/*deserialize portable*/false, /*deserialize portable*/false,
/*skip values*/true, /*skip values*/true,
/*can remap*/true /*can remap*/true
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { );
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> map = fut.get();

assert map.isEmpty() || map.size() == 1 : map.size();

return map.isEmpty() ? false : map.values().iterator().next() != null;
}
});
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -1472,6 +1464,52 @@ private Map<K, V> interceptGet(@Nullable Collection<? extends K> keys, Map<K, V>
return res; return res;
} }


/**
* @param key Key.
* @param forcePrimary Force primary.
* @param skipTx Skip tx.
* @param subjId Subj Id.
* @param taskName Task name.
* @param deserializePortable Deserialize portable.
* @param skipVals Skip values.
* @param canRemap Can remap flag.
* @return Future for the get operation.
*/
protected IgniteInternalFuture<V> getAsync(
final K key,
boolean forcePrimary,
boolean skipTx,
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
final boolean skipVals,
boolean canRemap
) {
return getAllAsync(Collections.singletonList(key),
forcePrimary,
skipTx,
subjId,
taskName,
deserializePortable,
skipVals,
canRemap).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();

assert map.isEmpty() || map.size() == 1 : map.size();

if (skipVals) {
Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);

return (V)(val);
}

return map.get(key);
}
});
}

/** /**
* @param keys Keys. * @param keys Keys.
* @param forcePrimary Force primary. * @param forcePrimary Force primary.
Expand Down Expand Up @@ -1524,7 +1562,7 @@ protected IgniteInternalFuture<Map<K, V>> getAllAsync(
* @return Future for the get operation. * @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection) * @see GridCacheAdapter#getAllAsync(Collection)
*/ */
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
boolean readThrough, boolean readThrough,
boolean checkTx, boolean checkTx,
@Nullable final UUID subjId, @Nullable final UUID subjId,
Expand Down Expand Up @@ -1605,11 +1643,20 @@ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable f


final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();


final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled();

Map<KeyCacheObject, GridCacheVersion> misses = null; Map<KeyCacheObject, GridCacheVersion> misses = null;


for (KeyCacheObject key : keys) { for (KeyCacheObject key : keys) {
while (true) { while (true) {
GridCacheEntryEx entry = entryEx(key); GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);

if (entry == null) {
if (!skipVals && ctx.config().isStatisticsEnabled())
ctx.cache().metrics0().onRead(false);

break;
}


try { try {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null, T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
Expand Down Expand Up @@ -4389,11 +4436,7 @@ public Set<K> primaryKeySet(@Nullable CacheEntryPredicate... filter) {
*/ */
@Nullable public V get(K key, boolean deserializePortable) @Nullable public V get(K key, boolean deserializePortable)
throws IgniteCheckedException { throws IgniteCheckedException {
Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get(); return getAsync(key, deserializePortable).get();

assert map.isEmpty() || map.size() == 1 : map.size();

return map.get(key);
} }


/** /**
Expand All @@ -4409,16 +4452,16 @@ public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializePo
return new GridFinishedFuture<>(e); return new GridFinishedFuture<>(e);
} }


return getAllAsync(Collections.singletonList(key), deserializePortable).chain( String taskName = ctx.kernalContext().job().currentTaskName();
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
@Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();

assert map.isEmpty() || map.size() == 1 : map.size();


return map.get(key); return getAsync(key,
} !ctx.config().isReadFromBackup(),
}); /*skip tx*/false,
null,
taskName,
deserializePortable,
false,
/*can remap*/true);
} }


/** /**
Expand All @@ -4445,10 +4488,10 @@ public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extend
return getAllAsync(keys, return getAllAsync(keys,
!ctx.config().isReadFromBackup(), !ctx.config().isReadFromBackup(),
/*skip tx*/false, /*skip tx*/false,
null, /*subject id*/null,
taskName, taskName,
deserializePortable, deserializePortable,
false, /*skip vals*/false,
/*can remap*/true); /*can remap*/true);
} }


Expand Down
Expand Up @@ -20,11 +20,17 @@
import java.util.Collection; import java.util.Collection;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;


/** /**
* Update future for atomic cache. * Update future for atomic cache.
*/ */
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
* @return Future version.
*/
public GridCacheVersion version();

/** /**
* Gets future that will be completed when it is safe when update is finished on the given version of topology. * Gets future that will be completed when it is safe when update is finished on the given version of topology.
* *
Expand Down
Expand Up @@ -17,11 +17,8 @@


package org.apache.ignite.internal.processors.cache; package org.apache.ignite.internal.processors.cache;


import java.util.Collection;
import java.util.UUID; import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lang.IgniteUuid;


/** /**
Expand All @@ -33,16 +30,6 @@ public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
*/ */
public IgniteUuid futureId(); public IgniteUuid futureId();


/**
* @return Future version.
*/
public GridCacheVersion version();

/**
* @return Involved nodes.
*/
public Collection<? extends ClusterNode> nodes();

/** /**
* Callback for when node left. * Callback for when node left.
* *
Expand Down
Expand Up @@ -34,22 +34,24 @@
import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
Expand Down Expand Up @@ -437,7 +439,7 @@ private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInC
case 50: { case 50: {
GridNearGetResponse res = (GridNearGetResponse)msg; GridNearGetResponse res = (GridNearGetResponse)msg;


GridCacheFuture fut = ctx.mvcc().future(res.version(), res.futureId()); CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());


if (fut == null) { if (fut == null) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand All @@ -448,10 +450,7 @@ private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInC


res.error(res.classError()); res.error(res.classError());


if (fut instanceof GridNearGetFuture) fut.onResult(nodeId, res);
((GridNearGetFuture)fut).onResult(nodeId, res);
else
((GridPartitionedGetFuture)fut).onResult(nodeId, res);
} }


break; break;
Expand Down Expand Up @@ -521,6 +520,43 @@ private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInC


break; break;


case 116: {
GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;

GridNearSingleGetResponse res = new GridNearSingleGetResponse(
ctx.cacheId(),
req.futureId(),
req.topologyVersion(),
null,
false,
req.deployInfo() != null);

res.error(req.classError());

sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
}

break;

case 117: {
GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;

GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());

if (fut == null) {
if (log.isDebugEnabled())
log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');

return;
}

res.error(res.classError());

fut.onResult(nodeId, res);
}

break;

default: default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]"); + msg + "]");
Expand Down
Expand Up @@ -499,15 +499,21 @@ protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheOb


int size = col.size(); int size = col.size();


for (int i = 0 ; i < size; i++) { for (int i = 0 ; i < size; i++)
CacheObject obj = col.get(i); prepareMarshalCacheObject(col.get(i), ctx);
}


if (obj != null) { /**
obj.prepareMarshal(ctx.cacheObjectContext()); * @param obj Object.
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
if (obj != null) {
obj.prepareMarshal(ctx.cacheObjectContext());


if (addDepInfo) if (addDepInfo)
prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
}
} }
} }


Expand Down
Expand Up @@ -17,10 +17,17 @@


package org.apache.ignite.internal.processors.cache; package org.apache.ignite.internal.processors.cache;


import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/** /**
* Distributed future aware of MVCC locking. * Distributed future aware of MVCC locking.
*/ */
public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> { public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> {
/**
* @return Future version.
*/
public GridCacheVersion version();

/** /**
* @param entry Entry which received new owner. * @param entry Entry which received new owner.
* @param owner Owner. * @param owner Owner.
Expand Down

0 comments on commit 1f10306

Please sign in to comment.