Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 6, 2015
1 parent 7a997ad commit b1959a3
Show file tree
Hide file tree
Showing 8 changed files with 933 additions and 29 deletions.
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import javax.cache.*;
import java.util.*;

/**
*
*/
public class CacheEntryImpl0<K, V> implements Cache.Entry<K, V> {
/** */
private final Map.Entry<K, V> e;

/**
* @param e Entry.
*/
public CacheEntryImpl0(Map.Entry<K, V> e) {
this.e = e;
}

/** {@inheritDoc} */
@Override public K getKey() {
return e.getKey();
}

/** {@inheritDoc} */
@Override public V getValue() {
return e.getValue();
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> T unwrap(Class<T> cls) {
if (!cls.equals(getClass()))
throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);

return (T)this;
}

/** {@inheritDoc} */
public String toString() {
return "CacheEntry [key=" + getKey() + ", val=" + getValue() + ']';
}
}
Expand Up @@ -123,7 +123,8 @@ public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>
/** Weak reference. */
private final WeakReference<WeakQueryFutureIterator<T>> weakRef;

CacheIteratorConverter<T, V> convert;
/** */
private final CacheIteratorConverter<T, V> convert;

/** Init flag. */
private boolean init;
Expand All @@ -136,6 +137,7 @@ public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>

/**
* @param fut GridCacheQueryFuture to iterate.
* @param convert Converter.
*/
WeakQueryFutureIterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
this.fut = fut;
Expand Down
Expand Up @@ -663,7 +663,50 @@ public IgniteInternalFuture<Boolean> containsKeyAsync(K key, @Nullable IgnitePre

PeekModes modes = parsePeekModes(peekModes);

return null;
List<Iterator<Cache.Entry<K, V>>> its = new ArrayList<>();

if (ctx.isLocal()) {
modes.primary = true;
modes.backup = true;

if (modes.heap)
its.add(iterator(map.entries0().iterator(), !ctx.keepPortable()));
}
else if (modes.heap) {
if (modes.near && ctx.isNear())
its.add(ctx.near().nearEntriesIterator());

if (modes.primary || modes.backup) {
GridDhtCacheAdapter<K, V> cache = ctx.isNear() ? ctx.near().dht() : ctx.dht();

its.add(cache.localEntriesIterator(modes.primary, modes.backup));
}
}

// Swap and offheap are disabled for near cache.
if (modes.primary || modes.backup) {
long topVer = ctx.affinity().affinityTopologyVersion();

GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();

if (modes.swap)
its.add(swapMgr.swapIterator(modes.primary, modes.backup, topVer));

if (modes.offheap)
its.add(swapMgr.offheapIterator(modes.primary, modes.backup, topVer));
}

final Iterator<Cache.Entry<K, V>> it = F.flatIterators(its);

return new Iterable<Cache.Entry<K, V>>() {
@Override public Iterator<Cache.Entry<K, V>> iterator() {
return it;
}

public String toString() {
return "CacheLocalEntries []";
}
};
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -5273,16 +5316,113 @@ protected void validateCacheKeys(Iterable<?> keys) {
}
}

/**
* @param it Internal entry iterator.
* @param deserializePortable Deserialize portable flag.
* @return Public API iterator.
*/
protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<GridCacheEntryEx<K, V>> it,
final boolean deserializePortable) {
return new Iterator<Cache.Entry<K, V>>() {
{
advance();
}

/** */
private Cache.Entry<K, V> next;

@Override public boolean hasNext() {
return next != null;
}

@Override public Cache.Entry<K, V> next() {
if (next == null)
throw new NoSuchElementException();

Cache.Entry<K, V> e = next;

advance();

return e;
}

@Override public void remove() {
throw new UnsupportedOperationException();
}

/**
* Switch to next entry.
*/
private void advance() {
next = null;

while (it.hasNext()) {
GridCacheEntryEx<K, V> entry = it.next();

try {
V val = entry.innerGet(
null,
false,
false,
false,
true,
false,
false,
false,
null,
null,
null,
null,
null);

if (val == null)
continue;

K key = entry.key();

if (deserializePortable && ctx.portableEnabled()) {
key = (K)ctx.unwrapPortableIfNeeded(key, true);
val = (V)ctx.unwrapPortableIfNeeded(val, true);
}

next = new CacheEntryImpl<>(key, val);

break;
}
catch (IgniteCheckedException e) {
throw U.convertToCacheException(e);
}
catch (GridCacheEntryRemovedException ignore) {
// No-op.
}
catch (GridCacheFilterFailedException ignore) {
assert false;
}
}
}
};
}

/**
*
*/
private static class PeekModes {
/** */
boolean near;

/** */
boolean primary;

/** */
boolean backup;

/** */
boolean heap;

/** */
boolean offheap;

/** */
boolean swap;

/** {@inheritDoc} */
Expand Down

0 comments on commit b1959a3

Please sign in to comment.