Skip to content

Commit

Permalink
ignite-gg-10882 - WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Jun 29, 2016
1 parent 715824c commit a48f10a
Show file tree
Hide file tree
Showing 35 changed files with 465 additions and 868 deletions.
55 changes: 36 additions & 19 deletions modules/core/src/main/java/org/apache/ignite/IgniteCache.java
Expand Up @@ -67,8 +67,8 @@
* Main entry point for all <b>Data Grid APIs.</b> You can get a named cache by calling {@link Ignite#cache(String)} * Main entry point for all <b>Data Grid APIs.</b> You can get a named cache by calling {@link Ignite#cache(String)}
* method. * method.
* <h1 class="header">Functionality</h1> * <h1 class="header">Functionality</h1>
* This API extends {@link javax.cache.Cache} API which contains {@code JCache (JSR107)} cache functionality * This API extends {@link Cache} API which contains {@code JCache (JSR107)} cache functionality
* and documentation. In addition to {@link javax.cache.Cache} functionality this API provides: * and documentation. In addition to {@link Cache} functionality this API provides:
* <ul> * <ul>
* <li>Ability to perform basic atomic Map-like operations available on {@code JCache} API.</li> * <li>Ability to perform basic atomic Map-like operations available on {@code JCache} API.</li>
* <li>Ability to bulk load cache via {@link #loadCache(IgniteBiPredicate, Object...)} method. * <li>Ability to bulk load cache via {@link #loadCache(IgniteBiPredicate, Object...)} method.
Expand All @@ -95,7 +95,7 @@
* @param <K> Cache key type. * @param <K> Cache key type.
* @param <V> Cache value type. * @param <V> Cache value type.
*/ */
public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncSupport { public interface IgniteCache<K, V> extends Cache<K, V>, IgniteAsyncSupport {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withAsync(); @Override public IgniteCache<K, V> withAsync();


Expand Down Expand Up @@ -136,6 +136,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
*/ */
public IgniteCache<K, V> withNoRetries(); public IgniteCache<K, V> withNoRetries();


/**
* Gets an instance of {@code IgniteCache} that will be allowed to execute cache operations (read, write)
* regardless of partition loss policy.
*
* @return Cache without partition loss protection.
*/
public IgniteCache<K, V> withPartitionRecover();

/** /**
* Returns cache that will operate with binary objects. * Returns cache that will operate with binary objects.
* <p> * <p>
Expand Down Expand Up @@ -225,7 +233,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* previous value. * previous value.
* <p> * <p>
* If write-through is enabled, the stored value will be persisted to {@link CacheStore} * If write-through is enabled, the stored value will be persisted to {@link CacheStore}
* via {@link CacheStore#write(javax.cache.Cache.Entry)} method. * via {@link CacheStore#write(Cache.Entry)} method.
* <h2 class="header">Transactions</h2> * <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction * This method is transactional and will enlist the entry into ongoing transaction
* if there is one. * if there is one.
Expand Down Expand Up @@ -320,7 +328,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public void localEvict(Collection<? extends K> keys); public void localEvict(Collection<? extends K> keys);


/** /**
* Peeks at in-memory cached value using default optinal peek mode. * Peeks at in-memory cached value using default optional peek mode.
* <p> * <p>
* This method will not load value from any persistent store or from a remote node. * This method will not load value from any persistent store or from a remote node.
* <h2 class="header">Transactions</h2> * <h2 class="header">Transactions</h2>
Expand Down Expand Up @@ -689,10 +697,10 @@ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments);


/** /**
* Invokes an {@link CacheEntryProcessor} against the {@link javax.cache.Cache.Entry} specified by * Invokes an {@link CacheEntryProcessor} against the {@link Cache.Entry} specified by
* the provided key. If an {@link javax.cache.Cache.Entry} does not exist for the specified key, * the provided key. If an {@link Cache.Entry} does not exist for the specified key,
* an attempt is made to load it (if a loader is configured) or a surrogate * an attempt is made to load it (if a loader is configured) or a surrogate
* {@link javax.cache.Cache.Entry}, consisting of the key with a null value is used instead. * {@link Cache.Entry}, consisting of the key with a null value is used instead.
* <p> * <p>
* An instance of entry processor must be stateless as it may be invoked multiple times on primary and * An instance of entry processor must be stateless as it may be invoked multiple times on primary and
* backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always
Expand Down Expand Up @@ -731,11 +739,11 @@ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends
EntryProcessor<K, V, T> entryProcessor, Object... args); EntryProcessor<K, V, T> entryProcessor, Object... args);


/** /**
* Invokes an {@link CacheEntryProcessor} against the set of {@link javax.cache.Cache.Entry}s * Invokes an {@link CacheEntryProcessor} against the set of {@link Cache.Entry}s
* specified by the set of keys. * specified by the set of keys.
* <p> * <p>
* If an {@link javax.cache.Cache.Entry} does not exist for the specified key, an attempt is made * If an {@link Cache.Entry} does not exist for the specified key, an attempt is made
* to load it (if a loader is configured) or a surrogate {@link javax.cache.Cache.Entry}, * to load it (if a loader is configured) or a surrogate {@link Cache.Entry},
* consisting of the key and a value of null is provided. * consisting of the key and a value of null is provided.
* <p> * <p>
* The order that the entries for the keys are processed is undefined. * The order that the entries for the keys are processed is undefined.
Expand All @@ -748,7 +756,7 @@ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends
* {@link Map} of {@link EntryProcessorResult}s, one result per key. Should the * {@link Map} of {@link EntryProcessorResult}s, one result per key. Should the
* {@link CacheEntryProcessor} or Caching implementation throw an exception, the * {@link CacheEntryProcessor} or Caching implementation throw an exception, the
* exception is wrapped and re-thrown when a call to * exception is wrapped and re-thrown when a call to
* {@link javax.cache.processor.EntryProcessorResult#get()} is made. * {@link EntryProcessorResult#get()} is made.
* <p> * <p>
* An instance of entry processor must be stateless as it may be invoked multiple times on primary and * An instance of entry processor must be stateless as it may be invoked multiple times on primary and
* backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always
Expand Down Expand Up @@ -783,7 +791,7 @@ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
* For distributed caches, if called on clients, stops client cache, if called on a server node, * For distributed caches, if called on clients, stops client cache, if called on a server node,
* just closes this cache instance and does not destroy cache data. * just closes this cache instance and does not destroy cache data.
* <p> * <p>
* After cache instance is closed another {@link IgniteCache} instance for the same * After cache instance is closed another {@code IgniteCache} instance for the same
* cache can be created using {@link Ignite#cache(String)} method. * cache can be created using {@link Ignite#cache(String)} method.
*/ */
@Override public void close(); @Override public void close();
Expand Down Expand Up @@ -867,15 +875,24 @@ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
public IgniteFuture<?> active(boolean active); public IgniteFuture<?> active(boolean active);


/** /**
* Lost partitions. * Gets a collection of lost partition IDs.
*
* @return Lost paritions. * @return Lost paritions.
*/ */
public Set<Integer> lostPartitions(); public Collection<Integer> lostPartitions();


/** /**
* Unmarks partitions as lost. * Clear data in lost partitions. This method is neither transactional nor atomic. A user must make
* @param partitions Partitions to recover. * sure there are no concurrent updates to the lost partitions while this method is in progress.
* @return Future that will be done when state is changed. *
* @return Future that will be done when data is cleared.
*/
public IgniteFuture<?> clearLostPartitions();

/**
* Clears partition's lost state and moves cache to a normal mode.
*
* @return Future that will be done when partition state is reset.
*/ */
public IgniteFuture<?> recoverPartitions(Set<Integer> partitions); public IgniteFuture<?> resetLostPartitions();
} }
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cache;

import org.apache.ignite.IgniteCache;

/**
* Partition loss policy. Defines how cache will behave in a case when one or more partitions are lost
* because of a node(s) failure.
* <p>
* All <code>*_SAFE</code> policies prevent a user from interaction with partial data in lost partitions until
* {@link IgniteCache#resetLostPartitions()} method is called. <code>*_ALL</code> policies allow working with
* partial data in lost partitions.
* <p>
* <code>READ_ONLY_*</code> and <code>READ_WRITE_*</code> policies do not automatically change partition state
* and thus do not change rebalancing assignments for such partitions.
*/
public enum PartitionLossPolicy {
/**
* All writes to the cache will be failed with an exception, reads will only be allowed for keys in
* non-lost partitions. Reads from lost partitions will be failed with an exception.
*/
READ_ONLY_SAFE,

/**
* All writes to the cache will be failed with an exception. All reads will proceed as if all partitions
* were in a consistent state. The result of reading from a lost partition is undefined and may be different
* on different nodes in the cluster.
*/
READ_ONLY_ALL,

/**
* All reads and writes will be allowed for keys in valid partitions. All reads and writes for keys
* in lost partitions will be failed with an exception.
*/
READ_WRITE_SAFE,

/**
* All reads and writes will proceed as if all partitions were in a consistent state. The result of reading
* from a lost partition is undefined and may be different on different nodes in the cluster.
*/
READ_WRITE_ALL,

/**
* If a partition is lost, clear all intermediate partition data and reset it's state.
*/
IGNORE_SAFE,

/**
* If partition is lost, reset it's state and do not clear intermediate data. The result of reading from
* a previously lost and not cleared partition is undefined and may be different on different nodes in the
* cluster.
*/
IGNORE_ALL
}
Expand Up @@ -144,7 +144,7 @@ public PageMemoryNoStoreImpl(
DirectMemoryProvider directMemoryProvider, DirectMemoryProvider directMemoryProvider,
GridCacheSharedContext<?, ?> sharedCtx, GridCacheSharedContext<?, ?> sharedCtx,
int pageSize int pageSize
) { ) {
assert log != null || sharedCtx != null; assert log != null || sharedCtx != null;


this.log = sharedCtx != null ? sharedCtx.logger(PageMemoryNoStoreImpl.class) : log; this.log = sharedCtx != null ? sharedCtx.logger(PageMemoryNoStoreImpl.class) : log;
Expand Down
Expand Up @@ -429,7 +429,7 @@ else if (!req.clientStartOnly()) {
else else
initStartedCacheOnCoordinator(fut, cacheId); initStartedCacheOnCoordinator(fut, cacheId);
} }
else if (req.modify()) { else if (req.activation()) {
if (!crd || !lateAffAssign) { if (!crd || !lateAffAssign) {
GridCacheContext cacheCtx = cctx.cacheContext(cacheId); GridCacheContext cacheCtx = cctx.cacheContext(cacheId);


Expand Down

0 comments on commit a48f10a

Please sign in to comment.