Skip to content

Commit

Permalink
ignite-45: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Yakov Zhdanov committed Jan 12, 2015
1 parent 5190b12 commit 5800a75
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 23 deletions.
107 changes: 88 additions & 19 deletions modules/core/src/main/java/org/apache/ignite/IgniteCacheManager.java
Expand Up @@ -10,11 +10,14 @@
package org.apache.ignite; package org.apache.ignite;


import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.*;
import org.gridgain.grid.util.typedef.*;


import javax.cache.*; import javax.cache.*;
import javax.cache.configuration.*; import javax.cache.configuration.*;
import javax.cache.spi.*; import javax.cache.spi.*;
import javax.management.*;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
Expand All @@ -24,7 +27,7 @@
*/ */
public class IgniteCacheManager implements CacheManager { public class IgniteCacheManager implements CacheManager {
/** */ /** */
private final Map<String, Ignite> igniteMap = new HashMap<>(); private final Map<String, IgniteBiTuple<Ignite, IgniteCacheMXBean>> igniteMap = new HashMap<>();


/** */ /** */
private final URI uri; private final URI uri;
Expand All @@ -45,7 +48,7 @@ public class IgniteCacheManager implements CacheManager {
* @param uri Uri. * @param uri Uri.
* @param cachingProvider Caching provider. * @param cachingProvider Caching provider.
* @param clsLdr Class loader. * @param clsLdr Class loader.
* @param props * @param props Properties.
*/ */
public IgniteCacheManager(URI uri, CachingProvider cachingProvider, ClassLoader clsLdr, Properties props) { public IgniteCacheManager(URI uri, CachingProvider cachingProvider, ClassLoader clsLdr, Properties props) {
this.uri = uri; this.uri = uri;
Expand Down Expand Up @@ -102,12 +105,14 @@ public IgniteCacheManager(URI uri, CachingProvider cachingProvider, ClassLoader
} }
} }


Ignite ignite; IgniteCache<K, V> res;


synchronized (igniteMap) { synchronized (igniteMap) {
if (igniteMap.containsKey(cacheName)) if (igniteMap.containsKey(cacheName))
throw new CacheException("Cache already exists [cacheName=" + cacheName + ", manager=" + uri + ']'); throw new CacheException("Cache already exists [cacheName=" + cacheName + ", manager=" + uri + ']');


Ignite ignite;

if (uri.equals(cachingProvider.getDefaultURI())) { if (uri.equals(cachingProvider.getDefaultURI())) {
IgniteConfiguration cfg = new IgniteConfiguration(); IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setGridName("grid-for-" + cacheName); cfg.setGridName("grid-for-" + cacheName);
Expand All @@ -126,26 +131,31 @@ public IgniteCacheManager(URI uri, CachingProvider cachingProvider, ClassLoader
else else
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();


igniteMap.put(cacheName, ignite); res = ignite.jcache(cacheName);

igniteMap.put(cacheName, new T2<>(ignite, new IgniteCacheMXBean(res)));
} }


return ignite.jcache(cacheName); if (((CompleteConfiguration)cacheCfg).isManagementEnabled())
enableManagement(cacheName, true);

return res;
} }


/** /**
* @param cacheName Cache name. * @param cacheName Cache name.
*/ */
private <K, V> IgniteCache<K, V> findCache(String cacheName) { private <K, V> IgniteCache<K, V> findCache(String cacheName) {
Ignite ignite; IgniteBiTuple<Ignite, IgniteCacheMXBean> tuple;


synchronized (igniteMap) { synchronized (igniteMap) {
ignite = igniteMap.get(cacheName); tuple = igniteMap.get(cacheName);
} }


if (ignite == null) if (tuple == null)
return null; return null;


return ignite.jcache(cacheName); return tuple.get1().jcache(cacheName);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -202,8 +212,13 @@ private <K, V> IgniteCache<K, V> findCache(String cacheName) {
*/ */
public boolean isManagedIgnite(Ignite ignite) { public boolean isManagedIgnite(Ignite ignite) {
synchronized (igniteMap) { synchronized (igniteMap) {
return igniteMap.values().contains(ignite); for (IgniteBiTuple<Ignite, IgniteCacheMXBean> tuple : igniteMap.values()) {
if (ignite.equals(tuple.get1()))
return true;
}
} }

return false;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -213,19 +228,48 @@ public boolean isManagedIgnite(Ignite ignite) {
if (cacheName == null) if (cacheName == null)
throw new NullPointerException(); throw new NullPointerException();


Ignite ignite; IgniteBiTuple<Ignite, IgniteCacheMXBean> tuple;


synchronized (igniteMap) { synchronized (igniteMap) {
ignite = igniteMap.remove(cacheName); tuple = igniteMap.remove(cacheName);
} }


if (ignite != null) { if (tuple != null) {
try { try {
ignite.close(); tuple.get1().close();
} }
catch (Exception ignored) { catch (Exception ignored) {


} }

ObjectName objName = getObjectName(cacheName);

MBeanServer mBeanSrv = tuple.get1().configuration().getMBeanServer();

for (ObjectName n : mBeanSrv.queryNames(objName, null)) {
try {
mBeanSrv.unregisterMBean(n);
}
catch (Exception ignored) {

}
}
}
}

/**
* @param cacheName Cache name.
*/
private ObjectName getObjectName(String cacheName) {
String mBeanName = "javax.cache:type=CacheConfiguration,CacheManager="
+ uri.toString().replaceAll(",|:|=|\n", ".")
+ ",Cache=" + cacheName.replaceAll(",|:|=|\n", ".");

try {
return new ObjectName(mBeanName);
}
catch (MalformedObjectNameException e) {
throw new CacheException("Failed to create MBean name: " + mBeanName, e);
} }
} }


Expand All @@ -236,7 +280,32 @@ public boolean isManagedIgnite(Ignite ignite) {
if (cacheName == null) if (cacheName == null)
throw new NullPointerException(); throw new NullPointerException();


throw new UnsupportedOperationException(); IgniteBiTuple<Ignite, IgniteCacheMXBean> tuple;

synchronized (igniteMap) {
tuple = igniteMap.get(cacheName);
}

ObjectName objName = getObjectName(cacheName);
MBeanServer mBeanSrv = tuple.get1().configuration().getMBeanServer();

try {
if (enabled) {
if(mBeanSrv.queryNames(objName, null).isEmpty())
mBeanSrv.registerMBean(tuple.get2(), objName);
}
else {
for (ObjectName n : mBeanSrv.queryNames(objName, null))
mBeanSrv.unregisterMBean(n);

}
}
catch (InstanceAlreadyExistsException | InstanceNotFoundException ignored) {

}
catch (MBeanRegistrationException | NotCompliantMBeanException e) {
throw new CacheException(e);
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -260,15 +329,15 @@ private void ensureNotClosed() throws IllegalStateException {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void close() { @Override public void close() {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
Ignite[] ignites; IgniteBiTuple<Ignite, IgniteCacheMXBean>[] ignites;


synchronized (igniteMap) { synchronized (igniteMap) {
ignites = igniteMap.values().toArray(new Ignite[igniteMap.values().size()]); ignites = igniteMap.values().toArray(new IgniteBiTuple[igniteMap.values().size()]);
} }


for (Ignite ignite : ignites) { for (IgniteBiTuple<Ignite, IgniteCacheMXBean> tuple : ignites) {
try { try {
ignite.close(); tuple.get1().close();
} }
catch (Exception ignored) { catch (Exception ignored) {
// Ignore any exceptions according to javadoc of javax.cache.CacheManager#close() // Ignore any exceptions according to javadoc of javax.cache.CacheManager#close()
Expand Down
Expand Up @@ -747,6 +747,7 @@ public void removeAll(Collection<? extends K> keys) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheManager getCacheManager() { @Override public CacheManager getCacheManager() {
// TODO IGNITE-45 (Support start/close/destroy cache correctly)
IgniteCachingProvider provider = (IgniteCachingProvider)Caching.getCachingProvider( IgniteCachingProvider provider = (IgniteCachingProvider)Caching.getCachingProvider(
IgniteCachingProvider.class.getName(), IgniteCachingProvider.class.getName(),
IgniteCachingProvider.class.getClassLoader()); IgniteCachingProvider.class.getClassLoader());
Expand All @@ -759,14 +760,14 @@ public void removeAll(Collection<? extends K> keys) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void close() { @Override public void close() {
// TODO IGNITE-1. // TODO IGNITE-45 (Support start/close/destroy cache correctly)
throw new UnsupportedOperationException(); getCacheManager().destroyCache(getName());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean isClosed() { @Override public boolean isClosed() {
// TODO IGNITE-1. // TODO IGNITE-45 (Support start/close/destroy cache correctly)
throw new UnsupportedOperationException(); return getCacheManager() == null;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
@@ -0,0 +1,124 @@
/* @java.file.header */

/* _________ _____ __________________ _____
* __ ____/___________(_)______ /__ ____/______ ____(_)_______
* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
*/

package org.apache.ignite.tck;

import com.sun.jmx.mbeanserver.*;

import javax.management.*;

/**
*
*/
public class TCKMBeanServerBuilder extends MBeanServerBuilder {
/** {@inheritDoc} */
@Override public MBeanServer newMBeanServer(String dfltDomain, MBeanServer outer, MBeanServerDelegate delegate) {
MBeanServerDelegate decoratingDelegate = new ServerDelegate(delegate);
return JmxMBeanServer.newMBeanServer(dfltDomain, outer,
decoratingDelegate, false);
}

/**
*
*/
private static class ServerDelegate extends MBeanServerDelegate {
/** */
private final MBeanServerDelegate delegate;

/**
* Constructor
*
* @param delegate the provided delegate
*/
ServerDelegate(MBeanServerDelegate delegate) {
this.delegate = delegate;
}

/** {@inheritDoc} */
@Override public String getSpecificationName() {
return delegate.getSpecificationName();
}

/** {@inheritDoc} */
@Override
public String getSpecificationVersion() {
return delegate.getSpecificationVersion();
}

/** {@inheritDoc} */
@Override
public String getSpecificationVendor() {
return delegate.getSpecificationVendor();
}

/** {@inheritDoc} */
@Override
public String getImplementationName() {
return delegate.getImplementationName();
}

/** {@inheritDoc} */
@Override
public String getImplementationVersion() {
return delegate.getImplementationVersion();
}

/** {@inheritDoc} */
@Override
public String getImplementationVendor() {
return delegate.getImplementationVendor();
}

/** {@inheritDoc} */
@Override
public MBeanNotificationInfo[] getNotificationInfo() {
return delegate.getNotificationInfo();
}

/** {@inheritDoc} */
@Override
public synchronized void addNotificationListener(NotificationListener listener,
NotificationFilter filter,
Object handback) throws
IllegalArgumentException {
delegate.addNotificationListener(listener, filter, handback);
}

/** {@inheritDoc} */
@Override
public synchronized void removeNotificationListener(NotificationListener
listener,
NotificationFilter
filter,
Object handback) throws
ListenerNotFoundException {
delegate.removeNotificationListener(listener, filter, handback);
}

/** {@inheritDoc} */
@Override
public synchronized void removeNotificationListener(NotificationListener
listener) throws
ListenerNotFoundException {
delegate.removeNotificationListener(listener);
}

/** {@inheritDoc} */
@Override
public void sendNotification(Notification notification) {
delegate.sendNotification(notification);
}

/** {@inheritDoc} */
@Override
public synchronized String getMBeanServerId() {
return System.getProperty("org.jsr107.tck.management.agentId");
}
}
}

0 comments on commit 5800a75

Please sign in to comment.