Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…cubator-ignite into ignite-96

Conflicts:
	modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
  • Loading branch information
Yakov Zhdanov committed Feb 6, 2015
1 parent 8090f07 commit 5df2b65
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 105 deletions.
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.processors.cache;

import org.jetbrains.annotations.*;

import javax.cache.*;

/**
Expand All @@ -29,6 +31,9 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> {
/** */
private final V val;

/** Version. */
private Object ver;

/**
* @param key Key.
* @param val Value.
Expand All @@ -48,6 +53,20 @@ public CacheEntryImpl(K key, V val) {
return val;
}

/**
* @return Version.
*/
@Nullable public Object version() {
return ver;
}

/**
* @param ver Version.
*/
public void version(Object ver) {
this.ver = ver;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> T unwrap(Class<T> cls) {
Expand Down
Expand Up @@ -1304,7 +1304,8 @@ public Set<GridCacheEntryEx<K, V>> allEntries() {

/** {@inheritDoc} */
@Override public Set<Entry<K, V>> primaryEntrySetx(IgnitePredicate<Entry<K, V>>... filter) {
return map.entriesx(F.and(filter, F.<K, V>cachePrimary()));
//return map.entriesx(F.and(filter, F.<K, V>cachePrimary()));
return null; // TODO ignite-96
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3715,9 +3716,24 @@ public CacheMetricsImpl metrics0() {
if (keyCheck)
validateCacheKey(key);

GridCacheEntryEx<K, V> entry = peekEx(key);
try {
GridCacheEntryEx<K, V> e = entry0(key, ctx.discovery().topologyVersion(), false, false);

if (e == null)
return false;

// Delegate to near if dht.
if (e.isDht() && CU.isNearEnabled(ctx)) {
GridCache<K, V> near = ctx.isDht() ? ctx.dht().near() : ctx.near();

return near.isLockedByThread(key) || e.lockedByThread();
}

return entry != null && entry.wrap(false).isLockedByThread();
return e.lockedByThread();
}
catch (GridCacheEntryRemovedException ignore) {
return false;
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -4923,7 +4939,8 @@ public Set<Entry<K, V>> entrySet(@Nullable Collection<? extends K> keys,
*/
public Set<Entry<K, V>> primaryEntrySet(
@Nullable IgnitePredicate<Entry<K, V>>... filter) {
return map.entries(F.and(filter, F.<K, V>cachePrimary()));
// return map.entries(F.and(filter, F.<K, V>cachePrimary()));
return null; // TODO ignite-96
}

/**
Expand All @@ -4939,15 +4956,18 @@ public Set<K> keySet(@Nullable IgnitePredicate<Entry<K, V>>... filter) {
* @return Primary key set.
*/
public Set<K> primaryKeySet(@Nullable IgnitePredicate<Entry<K, V>>... filter) {
return map.keySet(F.and(filter, F.<K, V>cachePrimary()));
// return map.keySet(F.and(filter, F.<K, V>cachePrimary()));
return null;
// TODO ignite-96
}

/**
* @param filter Filters to evaluate.
* @return Primary values.
*/
public Collection<V> primaryValues(@Nullable IgnitePredicate<Entry<K, V>>... filter) {
return map.values(F.and(filter, F.<K, V>cachePrimary()));
// return map.values(F.and(filter, F.<K, V>cachePrimary()));
return null;
}

/**
Expand Down
Expand Up @@ -1237,7 +1237,7 @@ private IgnitePredicate<Entry<K, V>>[] versionFilter(final EvictionInfo info) {
// then we should not evict entry.
return cctx.vararg(new P1<Entry<K, V>>() {
@Override public boolean apply(Entry<K, V> e) {
GridCacheVersion ver = (GridCacheVersion)e.version();
GridCacheVersion ver = (GridCacheVersion)((CacheEntryImpl)e).version();

return info.version().equals(ver) && F.isAll(info.filter());
}
Expand Down
Expand Up @@ -75,12 +75,14 @@ public GridCacheIterator(Iterable<? extends Entry<K, V>> c,
@Override public void remove() {
it.remove();

try {
// try {
// Back remove operation by actual cache.
cur.removex();
}
catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
// cur.removex();
// TODO ignite-96

// }
// catch (IgniteCheckedException e) {
// throw new GridClosureException(e);
// }
}
}
Expand Up @@ -85,12 +85,14 @@ public GridCacheKeySet(GridCacheContext<K, V> ctx, Collection<? extends Entry<K,

map.remove(o);

try {
e.removex();
}
catch (IgniteCheckedException ex) {
throw new IgniteException(ex);
}
// // TODO ignite-96

// try {
// e.removex();
// }
// catch (IgniteCheckedException ex) {
// throw new IgniteException(ex);
// }

return true;
}
Expand Down
Expand Up @@ -167,7 +167,8 @@ public GridCacheMapAdapter(CacheProjection<K, V> prj) {

/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "RedundantCast"})
@Override public Set<Entry<K, V>> entrySet() {
return (Set<Entry<K, V>>)(Set<? extends Entry<K, V>>)prj.entrySet();
@Override public Set<Map.Entry<K, V>> entrySet() {
// return (Set<Map.Entry<K, V>>)(Set<? extends Map.Entry<K, V>>)prj.entrySet();
return null; // TODO ignite-96
}
}
Expand Up @@ -3695,7 +3695,11 @@ protected V saveValueForIndexUnlocked() throws IgniteCheckedException {

@Override public Entry<K, V> wrap(boolean prjAware) {
try {
return new CacheEntryImpl<>(key, rawGetOrUnmarshal(true));
CacheEntryImpl<K, V> entry = new CacheEntryImpl<>(key, rawGetOrUnmarshal(true));

entry.version(ver);

return entry;
}
catch (IgniteCheckedException e) {
throw new RuntimeException("Fixme"); //TODO ignite-96
Expand Down
Expand Up @@ -135,12 +135,13 @@ private void advance() {
if (F.isAll(e, filter) && F.eq(o, e.getValue())) {
it.remove();

try {
e.removex();
}
catch (IgniteCheckedException ex) {
throw new IgniteException(ex);
}
// TODO ignite-96
// try {
// e.removex();
// }
// catch (IgniteCheckedException ex) {
// throw new IgniteException(ex);
// }

rmv = true;
}
Expand Down
Expand Up @@ -868,7 +868,8 @@ private PartitionEntrySet(int partId) {

Entry<K, V> entry = (Entry<K, V>)o;

return partId == entry.partition() && F.eq(entry.peek(), peek(entry.getKey()));
return partId == ctx.affinity().partition(entry.getKey()) &&
F.eq(entry.getValue(), peek(entry.getKey()));
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -958,7 +959,8 @@ private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry<K, V>> partI
if (last == null)
throw new IllegalStateException();

ctx.cache().remove(last.getKey(), last.getValue());
// TODO ignite-96
// ctx.cache().remove(last.getKey(), last.getValue());
}

/**
Expand Down
Expand Up @@ -434,7 +434,9 @@ public Set<K> nearKeySet(@Nullable IgnitePredicate<Entry<K, V>> filter) {

/** {@inheritDoc} */
@Override public Collection<V> primaryValues(@Nullable IgnitePredicate<Entry<K, V>>... filter) {
return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cachePrimary()));
return null;
// return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cachePrimary()));
// TODO ignite-96
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -471,13 +473,6 @@ public boolean evictNearOnly(K key, @Nullable IgnitePredicate<Entry<K, V>>[] fil
return super.compact(key, filter) | dht().compact(key, filter);
}

/** {@inheritDoc} */
@Override public Entry<K, V> entry(K key) {
// We don't try wrap entry from near or dht cache.
// Created object will be wrapped once some method is called.
return new GridPartitionedCacheEntryImpl<>(ctx.projectionPerCall(), ctx, key, null);
}

/**
* Peeks only near cache without looking into DHT cache.
*
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;

import javax.cache.Cache.*;
import java.util.*;
import java.util.concurrent.*;

Expand Down
Expand Up @@ -192,7 +192,9 @@ public GridCacheContinuousQueryHandler() {
@Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> e, boolean recordEvt) {
GridCacheContext<K, V> cctx = cacheContext(ctx);

if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary())
if (cctx.isReplicated() &&
!skipPrimaryCheck &&
!cctx.affinity().primary(cctx.localNode(), e.getKey(), cctx.topology().topologyVersion()))
return;

boolean notify;
Expand Down
Expand Up @@ -394,12 +394,7 @@ public class GridFunc {
private static final IgnitePredicate CACHE_ENTRY_NO_GET_VAL = new IgnitePredicate() {
@SuppressWarnings({"unchecked"})
@Override public boolean apply(Object o) {
try {
return ((Entry)o).get() == null;
}
catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
return ((Entry)o).getValue() == null;
}

@Override public String toString() {
Expand All @@ -411,7 +406,7 @@ public class GridFunc {
private static final IgnitePredicate CACHE_ENTRY_HAS_PEEK_VAL = new IgnitePredicate() {
@SuppressWarnings({"unchecked"})
@Override public boolean apply(Object o) {
return ((Entry)o).peek() != null;
return ((Entry)o).getValue() != null;
}

@Override public String toString() {
Expand All @@ -423,38 +418,14 @@ public class GridFunc {
private static final IgnitePredicate CACHE_ENTRY_NO_PEEK_VAL = new IgnitePredicate() {
@SuppressWarnings({"unchecked"})
@Override public boolean apply(Object o) {
return ((Entry)o).peek() == null;
return ((Entry)o).getValue() == null;
}

@Override public String toString() {
return "Cache entry no-peek-value predicate.";
}
};

/** */
private static final IgnitePredicate CACHE_ENTRY_PRIMARY = new IgnitePredicate() {
@SuppressWarnings({"unchecked"})
@Override public boolean apply(Object o) {
return ((Entry)o).primary();
}

@Override public String toString() {
return "Cache primary entry predicate.";
}
};

/** */
private static final IgnitePredicate CACHE_ENTRY_BACKUP = new IgnitePredicate() {
@SuppressWarnings({"unchecked"})
@Override public boolean apply(Object o) {
return ((Entry)o).backup();
}

@Override public String toString() {
return "Cache backup entry predicate.";
}
};

/** */
private static final IgniteClosure<ClusterNode, UUID> NODE2ID = new IgniteClosure<ClusterNode, UUID>() {
@Override public UUID apply(ClusterNode n) {
Expand Down Expand Up @@ -7210,7 +7181,7 @@ public static boolean contains(Integer[] arr, Integer val) {
* @return Predicate that accepts {@link Entry} value and compares its value
* to the given value.
*/
public static <K, V> IgnitePredicate<Entry<K, V>> mapValue(@Nullable final V val) {
public static <K, V> IgnitePredicate<Map.Entry<K, V>> mapValue(@Nullable final V val) {
return new P1<Map.Entry<K, V>>() {
@Override public boolean apply(Map.Entry<K, V> e) {
return e.getValue().equals(val);
Expand All @@ -7228,7 +7199,7 @@ public static <K, V> IgnitePredicate<Entry<K, V>> mapValue(@Nullable final V val
* @return Predicate that accepts {@code Map.Entry} value and compares its key
* to the given value.
*/
public static <K, V> IgnitePredicate<Entry<K, V>> mapKey(@Nullable final K key) {
public static <K, V> IgnitePredicate<Map.Entry<K, V>> mapKey(@Nullable final K key) {
return new P1<Map.Entry<K, V>>() {
@Override public boolean apply(Map.Entry<K, V> e) {
return e.getKey().equals(key);
Expand Down Expand Up @@ -7835,34 +7806,6 @@ public static <K, V> IgnitePredicate<Entry<K, V>> cacheNoPeekValue() {
return (IgnitePredicate<Entry<K, V>>)CACHE_ENTRY_NO_PEEK_VAL;
}

/**
* Gets predicate which returns {@code true} if {@link org.apache.ignite.cache.Entry#primary()}
* method returns {@code true}.
*
* @param <K> Cache key type.
* @param <V> Cache value type.
* @return Predicate which returns {@code true} if {@link org.apache.ignite.cache.Entry#primary()}
* method returns {@code true}.
*/
@SuppressWarnings({"unchecked"})
public static <K, V> IgnitePredicate<Entry<K, V>> cachePrimary() {
return (IgnitePredicate<Entry<K, V>>)CACHE_ENTRY_PRIMARY;
}

/**
* Gets predicate which returns {@code true} if {@link org.apache.ignite.cache.Entry#primary()}
* method returns {@code false}.
*
* @param <K> Cache key type.
* @param <V> Cache value type.
* @return Predicate which returns {@code true} if {@link org.apache.ignite.cache.Entry#primary()}
* method returns {@code false}.
*/
@SuppressWarnings({"unchecked"})
public static <K, V> IgnitePredicate<Entry<K, V>> cacheBackup() {
return (IgnitePredicate<Entry<K, V>>)CACHE_ENTRY_BACKUP;
}

/**
* Gets predicate which returns true if {@link org.apache.ignite.cache.Entry#get()}
* method returns value that is contained in given collection. Note that if collection
Expand Down Expand Up @@ -7925,7 +7868,7 @@ public static <K, V> IgnitePredicate<Entry<K, V>> cacheContainsPeek(
return isEmpty(vals) ? F.<Entry<K, V>>alwaysFalse() :
new IgnitePredicate<Entry<K, V>>() {
@Override public boolean apply(Entry<K, V> e) {
V v = e.peek();
V v = e.getValue();

assert vals != null;

Expand Down

0 comments on commit 5df2b65

Please sign in to comment.