Skip to content

Commit

Permalink
IGNITE-45 - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Goncharuk committed Mar 10, 2015
1 parent 1cd95ae commit a6a5e48
Show file tree
Hide file tree
Showing 36 changed files with 746 additions and 610 deletions.
21 changes: 20 additions & 1 deletion modules/core/src/main/java/org/apache/ignite/Ignite.java
Expand Up @@ -157,7 +157,7 @@ public interface Ignite extends AutoCloseable {


/** /**
* Creates new {@link ExecutorService} which will execute all submitted * Creates new {@link ExecutorService} which will execute all submitted
* {@link java.util.concurrent.Callable} and {@link Runnable} jobs on nodes in this grid projection. * {@link Callable} and {@link Runnable} jobs on nodes in this grid projection.
* This essentially * This essentially
* creates a <b><i>Distributed Thread Pool</i></b> that can be used as a * creates a <b><i>Distributed Thread Pool</i></b> that can be used as a
* replacement for local thread pools. * replacement for local thread pools.
Expand Down Expand Up @@ -186,6 +186,25 @@ public interface Ignite extends AutoCloseable {
*/ */
public IgniteScheduler scheduler(); public IgniteScheduler scheduler();


/**
* Dynamically starts new cache with the given cache configuration.
*
* @param cacheCfg Cache configuration to use.
*/
public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg);

public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg,
@Nullable NearCacheConfiguration<K, V> nearCfg);

public <K, V> IgniteCache<K, V> createCache(@Nullable NearCacheConfiguration<K, V> nearCfg);

/**
* Stops dynamically started cache.
*
* @param cacheName Cache name to stop.
*/
public void destroyCache(String cacheName);

/** /**
* Gets an instance of {@link IgniteCache} API. {@code IgniteCache} is a fully-compatible * Gets an instance of {@link IgniteCache} API. {@code IgniteCache} is a fully-compatible
* implementation of {@code JCache (JSR 107)} specification. * implementation of {@code JCache (JSR 107)} specification.
Expand Down
Expand Up @@ -343,6 +343,11 @@ public final class IgniteSystemProperties {
*/ */
public static final String IGNITE_EXCEPTION_REGISTRY_MAX_SIZE = "IGNITE_EXCEPTION_REGISTRY_MAX_SIZE"; public static final String IGNITE_EXCEPTION_REGISTRY_MAX_SIZE = "IGNITE_EXCEPTION_REGISTRY_MAX_SIZE";


/**
* Property controlling default behavior of cache client flag.
*/
public static final String IGNITE_CACHE_CLIENT = "IGNITE_CACHE_CLIENT";

/** /**
* Enforces singleton. * Enforces singleton.
*/ */
Expand Down
18 changes: 18 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/Ignition.java
Expand Up @@ -131,6 +131,24 @@ public static boolean isDaemon() {
return IgnitionEx.isDaemon(); return IgnitionEx.isDaemon();
} }


/**
* Sets client mode flag.
*
* @param clientMode Client mode flag.
*/
public static void setClientMode(boolean clientMode) {
IgnitionEx.setClientMode(clientMode);
}

/**
* Gets client mode flag.
*
* @return Client mode flag.
*/
public static boolean isClientMode() {
return IgnitionEx.isClientMode();
}

/** /**
* Gets state of grid default grid. * Gets state of grid default grid.
* *
Expand Down
Expand Up @@ -23,8 +23,12 @@
import org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction;
import org.apache.ignite.cache.eviction.*; import org.apache.ignite.cache.eviction.*;
import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.indexing.*; import org.apache.ignite.spi.indexing.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


Expand Down Expand Up @@ -163,6 +167,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default value for 'readFromBackup' flag. */ /** Default value for 'readFromBackup' flag. */
public static final boolean DFLT_READ_FROM_BACKUP = true; public static final boolean DFLT_READ_FROM_BACKUP = true;


/** Filter that accepts only server nodes. */
public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);

return attr != null && !attr;
}
};

/** Filter that accepts all nodes. */
public static final IgnitePredicate<ClusterNode> ALL_NODES = F.alwaysTrue();

/** Cache name. */ /** Cache name. */
private String name; private String name;


Expand Down Expand Up @@ -322,6 +338,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Collection of type metadata. */ /** Collection of type metadata. */
private Collection<CacheTypeMetadata> typeMeta; private Collection<CacheTypeMetadata> typeMeta;


/** Node filter specifying nodes on which this cache should be deployed. */
private IgnitePredicate<ClusterNode> nodeFilter;

/** Empty constructor (all values are initialized to their defaults). */ /** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() { public CacheConfiguration() {
/* No-op. */ /* No-op. */
Expand Down Expand Up @@ -379,6 +398,7 @@ public CacheConfiguration(CompleteConfiguration<K, V> cfg) {
name = cc.getName(); name = cc.getName();
nearStartSize = cc.getNearStartSize(); nearStartSize = cc.getNearStartSize();
nearEvictPlc = cc.getNearEvictionPolicy(); nearEvictPlc = cc.getNearEvictionPolicy();
nodeFilter = cc.getNodeFilter();
preloadMode = cc.getPreloadMode(); preloadMode = cc.getPreloadMode();
preloadBatchSize = cc.getPreloadBatchSize(); preloadBatchSize = cc.getPreloadBatchSize();
preloadDelay = cc.getPreloadPartitionedDelay(); preloadDelay = cc.getPreloadPartitionedDelay();
Expand Down Expand Up @@ -541,6 +561,24 @@ public void setNearEvictionPolicy(@Nullable CacheEvictionPolicy nearEvictPlc) {
this.nearEvictPlc = nearEvictPlc; this.nearEvictPlc = nearEvictPlc;
} }


/**
* Gets filter which determines on what nodes the cache should be started.
*
* @return Predicate specifying on which nodes the cache should be started.
*/
public IgnitePredicate<ClusterNode> getNodeFilter() {
return nodeFilter;
}

/**
* Sets filter which determines on what nodes the cache should be started.
*
* @param nodeFilter Predicate specifying on which nodes the cache should be started.
*/
public void setNodeFilter(IgnitePredicate<ClusterNode> nodeFilter) {
this.nodeFilter = nodeFilter;
}

/** /**
* Gets flag indicating whether eviction is synchronized between primary and * Gets flag indicating whether eviction is synchronized between primary and
* backup nodes on partitioned cache. If this parameter is {@code true} and * backup nodes on partitioned cache. If this parameter is {@code true} and
Expand Down
Expand Up @@ -321,7 +321,10 @@ public class IgniteConfiguration {
private CacheConfiguration[] cacheCfg; private CacheConfiguration[] cacheCfg;


/** Client cache configurations. */ /** Client cache configurations. */
private ClientCacheConfiguration[] clientCacheCfg; private NearCacheConfiguration[] nearCacheCfg;

/** Client mode flag. */
private Boolean clientMode;


/** Transactions configuration. */ /** Transactions configuration. */
private TransactionConfiguration txCfg = new TransactionConfiguration(); private TransactionConfiguration txCfg = new TransactionConfiguration();
Expand Down Expand Up @@ -1569,17 +1572,40 @@ public void setCacheConfiguration(CacheConfiguration... cacheCfg) {
} }


/** /**
* Gets configuration (descriptors) for all client caches. * Gets configuration (descriptors) for all near caches.
* *
* @return Client cache configurations. * @return Client cache configurations.
*/ */
public ClientCacheConfiguration[] getClientCacheConfiguration() { public NearCacheConfiguration[] getNearCacheConfiguration() {
return clientCacheCfg; return nearCacheCfg;
} }


/**
* Sets configuration for all near caches.
*
* @param nearCacheCfg Near cache configurations.
*/
@SuppressWarnings({"ZeroLengthArrayAllocation"}) @SuppressWarnings({"ZeroLengthArrayAllocation"})
public void setClientCacheConfiguration(ClientCacheConfiguration... clientCacheCfg) { public void setNearCacheConfiguration(NearCacheConfiguration... nearCacheCfg) {
this.clientCacheCfg = clientCacheCfg == null ? new ClientCacheConfiguration[0] : clientCacheCfg; this.nearCacheCfg = nearCacheCfg == null ? new NearCacheConfiguration[0] : nearCacheCfg;
}

/**
* Gets client mode flag.
*
* @return Client mode flag.
*/
public Boolean isClientMode() {
return clientMode;
}

/**
* Sets client mode flag.
*
* @param clientMode Client mode flag.
*/
public void setClientMode(boolean clientMode) {
this.clientMode = clientMode;
} }


/** /**
Expand Down
Expand Up @@ -28,7 +28,7 @@
/** /**
* Client cache configuration. * Client cache configuration.
*/ */
public class ClientCacheConfiguration<K, V> extends MutableConfiguration<K, V> { public class NearCacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Cache name. */ /** Cache name. */
private String name; private String name;


Expand All @@ -47,14 +47,14 @@ public class ClientCacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** /**
* Empty constructor. * Empty constructor.
*/ */
public ClientCacheConfiguration() { public NearCacheConfiguration() {
// No-op. // No-op.
} }


/** /**
* @param cfg Configuration to copy. * @param cfg Configuration to copy.
*/ */
public ClientCacheConfiguration(CompleteConfiguration<K, V> cfg) { public NearCacheConfiguration(CompleteConfiguration<K, V> cfg) {
super(cfg); super(cfg);


// Preserve alphabetic order. // Preserve alphabetic order.
Expand All @@ -67,8 +67,8 @@ public ClientCacheConfiguration(CompleteConfiguration<K, V> cfg) {
nearEvictPlc = ccfg.getNearEvictionPolicy(); nearEvictPlc = ccfg.getNearEvictionPolicy();
nearStartSize = ccfg.getNearStartSize(); nearStartSize = ccfg.getNearStartSize();
} }
else if (cfg instanceof ClientCacheConfiguration) { else if (cfg instanceof NearCacheConfiguration) {
ClientCacheConfiguration ccfg = (ClientCacheConfiguration)cfg; NearCacheConfiguration ccfg = (NearCacheConfiguration)cfg;


evictNearSync = ccfg.isEvictNearSynchronized(); evictNearSync = ccfg.isEvictNearSynchronized();
name = ccfg.getName(); name = ccfg.getName();
Expand Down
Expand Up @@ -21,10 +21,10 @@
import org.apache.ignite.cluster.*; import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*; import org.apache.ignite.events.*;
import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.continuous.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
Expand All @@ -44,7 +44,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** Default callback. */ /** Default callback. */
private static final P2<UUID, Event> DFLT_CALLBACK = new P2<UUID, Event>() { private static final IgniteBiPredicate<UUID,Event> DFLT_CALLBACK = new P2<UUID, Event>() {
@Override public boolean apply(UUID uuid, Event e) { @Override public boolean apply(UUID uuid, Event e) {
return true; return true;
} }
Expand Down Expand Up @@ -129,7 +129,9 @@ public GridEventConsumeHandler() {
ctx.continuous().stopRoutine(routineId); ctx.continuous().stopRoutine(routineId);
} }
else { else {
ClusterNode node = ctx.discovery().node(nodeId); GridDiscoveryManager disco = ctx.discovery();

ClusterNode node = disco.node(nodeId);


if (node != null) { if (node != null) {
try { try {
Expand All @@ -138,7 +140,7 @@ public GridEventConsumeHandler() {
if (evt instanceof CacheEvent) { if (evt instanceof CacheEvent) {
String cacheName = ((CacheEvent)evt).cacheName(); String cacheName = ((CacheEvent)evt).cacheName();


if (ctx.config().isPeerClassLoadingEnabled() && U.hasCache(node, cacheName)) { if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
wrapper.p2pMarshal(ctx.config().getMarshaller()); wrapper.p2pMarshal(ctx.config().getMarshaller());


wrapper.cacheName = cacheName; wrapper.cacheName = cacheName;
Expand Down
Expand Up @@ -1185,6 +1185,8 @@ private void fillNodeAttributes() throws IgniteCheckedException {


add(ATTR_JVM_PID, U.jvmPid()); add(ATTR_JVM_PID, U.jvmPid());


add(ATTR_CLIENT_MODE, cfg.isClientMode());

// Build a string from JVM arguments, because parameters with spaces are split. // Build a string from JVM arguments, because parameters with spaces are split.
SB jvmArgs = new SB(512); SB jvmArgs = new SB(512);


Expand Down Expand Up @@ -1919,7 +1921,7 @@ private void ackSystemProperties() {


if (log.isDebugEnabled()) if (log.isDebugEnabled())
for (Object key : U.asIterable(System.getProperties().keys())) for (Object key : U.asIterable(System.getProperties().keys()))
log.debug("System property [" + key + '=' + System.getProperty((String) key) + ']'); log.debug("System property [" + key + '=' + System.getProperty((String)key) + ']');
} }


/** /**
Expand Down Expand Up @@ -2250,6 +2252,31 @@ public <K, V> GridCache<K, V> cache(@Nullable String name) {
} }
} }


/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) {
// TODO: implement.
return null;
}

/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg,
@Nullable NearCacheConfiguration<K, V> nearCfg) {
// TODO: implement.
return null;
}

/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(@Nullable NearCacheConfiguration<K, V> nearCfg) {
// TODO: implement.
return null;
}

/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
// TODO: implement.

}

/** /**
* @return Public caches. * @return Public caches.
*/ */
Expand Down
Expand Up @@ -130,7 +130,7 @@ public final class IgniteNodeAttributes {
public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject"; public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject";


/** Cache interceptors. */ /** Cache interceptors. */
public static final String ATTR_CACHE_INTERCEPTORS = ATTR_PREFIX + ".cache.interceptors"; public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client";


/** /**
* Enforces singleton. * Enforces singleton.
Expand Down

0 comments on commit a6a5e48

Please sign in to comment.