Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 16, 2015
1 parent 9f5b287 commit 9cd987c
Show file tree
Hide file tree
Showing 36 changed files with 309 additions and 114 deletions.
Expand Up @@ -40,6 +40,10 @@ public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> {
@CacheNameResource
private String cacheName;

/** */
@CacheStoreSessionResource
private CacheStoreSession ses;

/** Dummy database. */
private Map<Long, Person> dummyDB = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -106,4 +110,11 @@ public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> {

return ses != null ? ses.transaction() : null;
}

/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
}
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.cache.store.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.Transaction;
import org.hibernate.*;
import org.hibernate.cfg.*;
Expand All @@ -42,6 +43,10 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
/** Session factory. */
private SessionFactory sesFactory;

/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -275,4 +280,11 @@ private Session session(@Nullable Transaction tx) {

return ses != null ? ses.transaction() : null;
}

/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
}
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.cache.store.*;
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;

Expand All @@ -38,6 +39,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** Transaction metadata attribute name. */
private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";

/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;

/**
* Constructor.
*
Expand Down Expand Up @@ -69,7 +74,7 @@ private void prepareDb() throws IgniteException {
@Override public void txEnd(boolean commit) {
Transaction tx = transaction();

Map<String, Connection> props = session().properties();
Map<String, Connection> props = ses.properties();

try (Connection conn = props.remove(ATTR_NAME)) {
if (conn != null) {
Expand Down Expand Up @@ -230,7 +235,7 @@ private void prepareDb() throws IgniteException {
*/
private Connection connection(@Nullable Transaction tx) throws SQLException {
if (tx != null) {
Map<Object, Object> props = session().properties();
Map<Object, Object> props = ses.properties();

Connection conn = (Connection)props.get(ATTR_NAME);

Expand Down Expand Up @@ -298,8 +303,6 @@ private Person person(Long id, String firstName, String lastName) {
* @return Current transaction.
*/
@Nullable private Transaction transaction() {
CacheStoreSession ses = session();

return ses != null ? ses.transaction() : null;
}
}
Expand Up @@ -68,7 +68,7 @@
* @param <V> Value type.
* @param <I> Input type.
*/
public abstract class CacheLoadOnlyStoreAdapter<K, V, I> extends CacheStore<K, V> {
public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K, V> {
/**
* Default batch size (number of records read with {@link #inputIterator(Object...)}
* and then submitted to internal pool at a time).
Expand Down
Expand Up @@ -17,10 +17,8 @@

package org.apache.ignite.cache.store;

import org.apache.ignite.*;
import org.apache.ignite.cache.store.jdbc.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;

Expand Down Expand Up @@ -117,14 +115,7 @@
*
* @see CacheStoreSession
*/
public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter<K, V> {
/** */
private CacheStoreSession ses;

/** */
@IgniteInstanceResource
private Ignite ignite;

public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
/**
* Loads all values from underlying persistent storage. Note that keys are not
* passed, so it is up to implementation to figure out what to load. This method
Expand All @@ -144,7 +135,7 @@ public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter
* {@link org.apache.ignite.cache.GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method.
* @throws CacheLoaderException If loading failed.
*/
public abstract void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException;
public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException;

/**
* Tells store to commit or rollback a transaction depending on the value of the {@code 'commit'}
Expand All @@ -155,21 +146,5 @@ public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter
* may bring cache transaction into {@link TransactionState#UNKNOWN} which will
* consequently cause all transacted entries to be invalidated.
*/
public abstract void txEnd(boolean commit) throws CacheWriterException;

/**
* Gets session for current cache operation. Returns {@code null} if store is used with atomic cache.
*
* @return Session for current cache operation.
*/
public CacheStoreSession session() {
return ses;
}

/**
* @return {@link Ignite} instance.
*/
public Ignite ignite() {
return ignite;
}
public void txEnd(boolean commit) throws CacheWriterException;
}
Expand Up @@ -36,7 +36,7 @@
* implementation because it is essentially up to the user to invoke it with
* specific arguments.
*/
public abstract class CacheStoreAdapter<K, V> extends CacheStore<K, V> {
public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> {
/**
* Default empty implementation. This method needs to be overridden only if
* {@link org.apache.ignite.cache.GridCache#loadCache(IgniteBiPredicate, long, Object...)} method
Expand Down
Expand Up @@ -17,12 +17,15 @@

package org.apache.ignite.cache.store;

import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;

import java.util.*;

/**
* Session for the cache store operations.
*
* @see CacheStoreSessionResource
*/
public interface CacheStoreSession {
/**
Expand Down
Expand Up @@ -76,7 +76,7 @@
* <p>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
*/
public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> implements LifecycleAware {
public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** Max attempt write count. */
protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;

Expand All @@ -92,6 +92,14 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl
/** Empty column value. */
protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null };

/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;

/** Auto injected ignite instance. */
@IgniteInstanceResource
private Ignite ignite;

/** Auto-injected logger instance. */
@LoggerResource
protected IgniteLogger log;
Expand Down Expand Up @@ -1230,6 +1238,20 @@ public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThresho
this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
}

/**
* @return Ignite instance.
*/
protected Ignite ignite() {
return ignite;
}

/**
* @return Store session.
*/
protected CacheStoreSession session() {
return ses;
}

/**
* Entry mapping description.
*/
Expand Down
Expand Up @@ -142,6 +142,10 @@ public class CacheJdbcBlobStore<K, V> extends CacheStoreAdapter<K, V> {
/** Flag for schema initialization. */
private boolean initSchema = true;

/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;

/** Log. */
@LoggerResource
private IgniteLogger log;
Expand Down Expand Up @@ -582,4 +586,11 @@ protected <X> X fromBytes(byte[] bytes) throws IgniteCheckedException {

return ses != null ? ses.transaction() : null;
}

/**
* @return Store session.
*/
protected CacheStoreSession session() {
return ses;
}
}
Expand Up @@ -41,7 +41,7 @@
* properties are optional, so users should only change what they need.
*/
@SuppressWarnings("RedundantFieldInitialization")
public class CacheConfiguration extends MutableConfiguration {
public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default size of preload thread pool. */
public static final int DFLT_PRELOAD_THREAD_POOL_SIZE = 2;

Expand Down Expand Up @@ -686,9 +686,9 @@ public void setEvictMaxOverflowRatio(float evictMaxOverflowRatio) {

/**
* Gets eviction filter to specify which entries should not be evicted
* (except explicit evict by calling {@link Entry#evict()}).
* If {@link org.apache.ignite.cache.eviction.CacheEvictionFilter#evictAllowed(Entry)} method returns
* {@code false} then eviction policy will not be notified and entry will
* (except explicit evict by calling {@link IgniteCache#localEvict(Collection)}).
* If {@link org.apache.ignite.cache.eviction.CacheEvictionFilter#evictAllowed(javax.cache.Cache.Entry)} method
* returns {@code false} then eviction policy will not be notified and entry will
* never be evicted.
* <p>
* If not provided, any entry may be evicted depending on
Expand All @@ -697,7 +697,7 @@ public void setEvictMaxOverflowRatio(float evictMaxOverflowRatio) {
* @return Eviction filter or {@code null}.
*/
@SuppressWarnings("unchecked")
public <K, V> CacheEvictionFilter<K, V> getEvictionFilter() {
public CacheEvictionFilter<K, V> getEvictionFilter() {
return (CacheEvictionFilter<K, V>)evictFilter;
}

Expand All @@ -706,7 +706,7 @@ public <K, V> CacheEvictionFilter<K, V> getEvictionFilter() {
*
* @param evictFilter Eviction filter.
*/
public <K, V> void setEvictionFilter(CacheEvictionFilter<K, V> evictFilter) {
public void setEvictionFilter(CacheEvictionFilter<K, V> evictFilter) {
this.evictFilter = evictFilter;
}

Expand All @@ -717,7 +717,7 @@ public <K, V> void setEvictionFilter(CacheEvictionFilter<K, V> evictFilter) {
* When not set, default value is {@link #DFLT_EAGER_TTL}.
* <p>
* <b>Note</b> that this flag only matters for entries expiring based on
* {@link Entry#timeToLive()} value and should not be confused with entry
* {@link javax.cache.expiry.ExpiryPolicy} and should not be confused with entry
* evictions based on configured {@link org.apache.ignite.cache.eviction.CacheEvictionPolicy}.
*
* @return Flag indicating whether Ignite will eagerly remove expired entries.
Expand Down
Expand Up @@ -33,7 +33,7 @@
/**
* Cache store wrapper that ensures that there will be no more that one thread loading value from underlying store.
*/
public class CacheStoreBalancingWrapper<K, V> extends CacheStore<K, V> {
public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
/** */
public static final int DFLT_LOAD_ALL_THRESHOLD = 5;

Expand Down
Expand Up @@ -29,7 +29,7 @@
/**
* Store implementation wrapping {@link CacheLoader} and {@link CacheWriter}.
*/
class GridCacheLoaderWriterStore<K, V> extends CacheStore<K, V> implements LifecycleAware {
class GridCacheLoaderWriterStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** */
private final CacheLoader<K, V> ldr;

Expand Down
Expand Up @@ -587,7 +587,7 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();

for (int i = 0; i < cfgs.length; i++) {
CacheConfiguration cfg = new CacheConfiguration(cfgs[i]);
CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);

// Initialize defaults.
initialize(cfg);
Expand Down

0 comments on commit 9cd987c

Please sign in to comment.