Skip to content

Commit

Permalink
IGNITE-541 - Dynamic caching in SpringCacheManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Kulichenko committed Mar 20, 2015
1 parent dbfaf0a commit be25aa3
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 755 deletions.
Expand Up @@ -18,52 +18,28 @@
package org.apache.ignite.cache.spring; package org.apache.ignite.cache.spring;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
import org.springframework.cache.*; import org.springframework.cache.*;
import org.springframework.cache.support.*; import org.springframework.cache.support.*;


import java.io.*;

/** /**
* Spring cache implementation. * Spring cache implementation.
*/ */
class SpringCache implements Cache, Serializable { class SpringCache implements Cache {
/** */
private String name;

/** */
private Ignite ignite;

/** */
private CacheProjection<Object, Object> cache;

/** */ /** */
private IgniteClosure<Object, Object> keyFactory; private final IgniteCache<Object, Object> cache;


/** /**
* @param name Cache name.
* @param ignite Ignite instance.
* @param cache Cache. * @param cache Cache.
* @param keyFactory Key factory.
*/ */
SpringCache(String name, SpringCache(IgniteCache<Object, Object> cache) {
Ignite ignite,
CacheProjection<?, ?> cache,
IgniteClosure<Object, Object> keyFactory)
{
assert cache != null; assert cache != null;


this.name = name; this.cache = cache;
this.ignite = ignite;
this.cache = (CacheProjection<Object, Object>)cache;
this.keyFactory = keyFactory != null ? keyFactory : F.identity();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String getName() { @Override public String getName() {
return name; return cache.getName();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -73,116 +49,42 @@ class SpringCache implements Cache, Serializable {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Cache.ValueWrapper get(Object key) { @Override public Cache.ValueWrapper get(Object key) {
try { Object val = cache.get(key);
Object val = cache.get(keyFactory.apply(key));

return val != null ? new SimpleValueWrapper(val) : null;
return val != null ? new SimpleValueWrapper(val) : null;
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to get value from cache [cacheName=" + cache.name() +
", key=" + key + ']', e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> T get(Object key, Class<T> type) { @Override public <T> T get(Object key, Class<T> type) {
try { Object val = cache.get(key);
Object val = cache.get(keyFactory.apply(key));

if (val != null && type != null && !type.isInstance(val))
if (val != null && type != null && !type.isInstance(val)) throw new IllegalStateException("Cached value is not of required type [cacheName=" + cache.getName() +
throw new IllegalStateException("Cached value is not of required type [cacheName=" + cache.name() + ", key=" + key + ", val=" + val + ", requiredType=" + type + ']');
", key=" + key + ", val=" + val + ", requiredType=" + type + ']');

return (T)val;
return (T)val;
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to get value from cache [cacheName=" + cache.name() +
", key=" + key + ']', e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void put(Object key, Object val) { @Override public void put(Object key, Object val) {
try { cache.put(key, val);
cache.putx(keyFactory.apply(key), val);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put value to cache [cacheName=" + cache.name() +
", key=" + key + ", val=" + val + ']', e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public ValueWrapper putIfAbsent(Object key, Object val) { @Override public ValueWrapper putIfAbsent(Object key, Object val) {
try { Object old = cache.putIfAbsent(key, val);
Object old = cache.putIfAbsent(keyFactory.apply(key), val);

return old != null ? new SimpleValueWrapper(old) : null;
return old != null ? new SimpleValueWrapper(old) : null;
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put value to cache [cacheName=" + cache.name() +
", key=" + key + ", val=" + val + ']', e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void evict(Object key) { @Override public void evict(Object key) {
try { cache.remove(key);
cache.removex(keyFactory.apply(key));
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to remove value from cache [cacheName=" + cache.name() +
", key=" + key + ']', e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void clear() { @Override public void clear() {
try { cache.removeAll();
ignite.compute(cache.gridProjection()).broadcast(new ClearClosure(cache));
}
catch (IgniteException e) {
throw new IgniteException("Failed to clear cache [cacheName=" + cache.name() + ']', e);
}
}

/**
* Closure that removes all entries from cache.
*/
private static class ClearClosure extends CAX implements Externalizable {
/** */
private static final long serialVersionUID = 0L;

/** Cache projection. */
private CacheProjection<Object, Object> cache;

/**
* For {@link Externalizable}.
*/
public ClearClosure() {
// No-op.
}

/**
* @param cache Cache projection.
*/
private ClearClosure(CacheProjection<Object, Object> cache) {
this.cache = cache;
}

/** {@inheritDoc} */
@Override public void applyx() throws IgniteCheckedException {
cache.localRemoveAll();
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(cache);
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cache = (CacheProjection<Object, Object>)in.readObject();
}
} }
} }
Expand Up @@ -19,14 +19,12 @@


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*; import org.jdk8.backport.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
import org.springframework.beans.factory.*; import org.springframework.beans.factory.*;
import org.springframework.cache.CacheManager; import org.springframework.cache.*;


import java.util.*; import java.util.*;
import java.util.concurrent.*;


/** /**
* Implementation of Spring cache abstraction based on Ignite cache. * Implementation of Spring cache abstraction based on Ignite cache.
Expand Down Expand Up @@ -124,13 +122,16 @@
* will try to use default Grid instance (the one with the {@code null} * will try to use default Grid instance (the one with the {@code null}
* name). If it doesn't exist, exception will be thrown. * name). If it doesn't exist, exception will be thrown.
* <h1>Starting Remote Nodes</h1> * <h1>Starting Remote Nodes</h1>
* Remember that the node started inside your application is an entry point * Keep in mind that the node started inside your application is an entry point
* to the whole topology it connects to. You can start as many remote standalone * to the whole topology it connects to. You can start as many remote standalone
* nodes as you need using {@code bin/ignite.{sh|bat}} scripts provided in * nodes as you need using {@code bin/ignite.{sh|bat}} scripts provided in
* Ignite distribution, and all these nodes will participate * Ignite distribution, and all these nodes will participate
* in caching data. * in caching the data.
*/ */
public class SpringCacheManager implements CacheManager, InitializingBean { public class SpringCacheManager implements CacheManager, InitializingBean {
/** Caches map. */
private final ConcurrentMap<String, SpringCache> caches = new ConcurrentHashMap8<>();

/** Grid configuration file path. */ /** Grid configuration file path. */
private String cfgPath; private String cfgPath;


Expand All @@ -140,8 +141,14 @@ public class SpringCacheManager implements CacheManager, InitializingBean {
/** Grid name. */ /** Grid name. */
private String gridName; private String gridName;


/** Dynamic cache configuration template. */
private CacheConfiguration<Object, Object> dynamicCacheCfg;

/** Dynamic near cache configuration template. */
private NearCacheConfiguration<Object, Object> dynamicNearCacheCfg;

/** Ignite instance. */ /** Ignite instance. */
protected Ignite grid; private Ignite ignite;


/** /**
* Gets configuration file path. * Gets configuration file path.
Expand Down Expand Up @@ -197,10 +204,45 @@ public void setGridName(String gridName) {
this.gridName = gridName; this.gridName = gridName;
} }


/**
* Gets dynamic cache configuration template.
*
* @return Dynamic cache configuration template.
*/
public CacheConfiguration<Object, Object> getDynamicCacheConfiguration() {
return dynamicCacheCfg;
}

/**
* Sets dynamic cache configuration template.
*
* @param dynamicCacheCfg Dynamic cache configuration template.
*/
public void setDynamicCacheConfiguration(CacheConfiguration<Object, Object> dynamicCacheCfg) {
this.dynamicCacheCfg = dynamicCacheCfg;
}

/**
* Gets dynamic near cache configuration template.
*
* @return Dynamic near cache configuration template.
*/
public NearCacheConfiguration<Object, Object> getDynamicNearCacheConfiguration() {
return dynamicNearCacheCfg;
}

/**
* Sets dynamic cache configuration template.
*
* @param dynamicNearCacheCfg Dynamic cache configuration template.
*/
public void setDynamicNearCacheConfiguration(NearCacheConfiguration<Object, Object> dynamicNearCacheCfg) {
this.dynamicNearCacheCfg = dynamicNearCacheCfg;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings("IfMayBeConditional")
@Override public void afterPropertiesSet() throws Exception { @Override public void afterPropertiesSet() throws Exception {
assert grid == null; assert ignite == null;


if (cfgPath != null && cfg != null) { if (cfgPath != null && cfg != null) {
throw new IllegalArgumentException("Both 'configurationPath' and 'configuration' are " + throw new IllegalArgumentException("Both 'configurationPath' and 'configuration' are " +
Expand All @@ -210,33 +252,44 @@ public void setGridName(String gridName) {
} }


if (cfgPath != null) if (cfgPath != null)
grid = Ignition.start(cfgPath); ignite = Ignition.start(cfgPath);
else if (cfg != null) else if (cfg != null)
grid = Ignition.start(cfg); ignite = Ignition.start(cfg);
else else
grid = Ignition.ignite(gridName); ignite = Ignition.ignite(gridName);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public org.springframework.cache.Cache getCache(String name) { @Override public org.springframework.cache.Cache getCache(String name) {
assert grid != null; assert ignite != null;


try { SpringCache cache = caches.get(name);
return new SpringCache(name, grid, ((IgniteKernal)grid).cache(name), null);
} if (cache == null) {
catch (IllegalArgumentException ignored) { CacheConfiguration<Object, Object> cacheCfg = dynamicCacheCfg != null ?
return null; new CacheConfiguration<>(dynamicCacheCfg) : new CacheConfiguration<>();

NearCacheConfiguration<Object, Object> nearCacheCfg = dynamicNearCacheCfg != null ?
new NearCacheConfiguration<>(dynamicNearCacheCfg) : null;

cacheCfg.setName(name);

// TODO: IGNITE-541 - Provide near cfg.
cache = new SpringCache(ignite.getOrCreateCache(cacheCfg));

SpringCache old = caches.putIfAbsent(name, cache);

if (old != null)
cache = old;
} }

return cache;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Collection<String> getCacheNames() { @Override public Collection<String> getCacheNames() {
assert grid != null; assert ignite != null;


return F.viewReadOnly(((IgniteKernal)grid).caches(), new IgniteClosure<IgniteCache<?,?>, String>() { return new ArrayList<>(caches.keySet());
@Override public String apply(IgniteCache<?, ?> c) {
return c.getName();
}
});
} }
} }

0 comments on commit be25aa3

Please sign in to comment.