Skip to content

Commit

Permalink
Fix Hazelcast NotificationDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
NielsCharlier committed Feb 4, 2016
1 parent 4fc6191 commit 2fe972b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 39 deletions.
Expand Up @@ -12,10 +12,9 @@
import org.geoserver.catalog.Catalog; import org.geoserver.catalog.Catalog;
import org.geoserver.cluster.ClusterConfig; import org.geoserver.cluster.ClusterConfig;
import org.geoserver.cluster.ClusterConfigWatcher; import org.geoserver.cluster.ClusterConfigWatcher;
import org.geoserver.config.GeoServerDataDirectory;
import org.geoserver.data.util.IOUtils; import org.geoserver.data.util.IOUtils;
import org.geoserver.platform.GeoServerResourceLoader;
import org.geoserver.platform.resource.Resource; import org.geoserver.platform.resource.Resource;
import org.geoserver.platform.resource.ResourceStore;
import org.geoserver.platform.resource.Resources; import org.geoserver.platform.resource.Resources;
import org.geotools.util.logging.Logging; import org.geotools.util.logging.Logging;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
Expand All @@ -41,7 +40,7 @@ public class HzCluster implements DisposableBean, InitializingBean {
static final String HAZELCAST_FILENAME = "hazelcast.xml"; static final String HAZELCAST_FILENAME = "hazelcast.xml";


HazelcastInstance hz; HazelcastInstance hz;
GeoServerResourceLoader rl; ResourceStore rl;
ClusterConfigWatcher watcher; ClusterConfigWatcher watcher;


private Catalog rawCatalog; private Catalog rawCatalog;
Expand Down Expand Up @@ -120,7 +119,7 @@ public HazelcastInstance getHz() {
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
watcher = loadConfig(); watcher = loadConfig();
if(watcher.get().isEnabled()){ if(watcher.get().isEnabled()){
hz = Hazelcast.newHazelcastInstance(loadHazelcastConfig(rl)); hz = Hazelcast.newHazelcastInstance(loadHazelcastConfig());
CLUSTER = this; CLUSTER = this;
} }
} }
Expand All @@ -136,7 +135,7 @@ public void destroy() throws Exception {
} }
} }


private Config loadHazelcastConfig(GeoServerResourceLoader rl) throws IOException{ private Config loadHazelcastConfig() throws IOException{
Resource hzf = getConfigFile(HAZELCAST_FILENAME, HzCluster.class); Resource hzf = getConfigFile(HAZELCAST_FILENAME, HzCluster.class);
try (InputStream hzIn = hzf.in()) { try (InputStream hzIn = hzf.in()) {
return new XmlConfigBuilder(hzIn).build(); return new XmlConfigBuilder(hzIn).build();
Expand All @@ -148,8 +147,8 @@ private Config loadHazelcastConfig(GeoServerResourceLoader rl) throws IOExceptio
* @param dd * @param dd
* @throws IOException * @throws IOException
*/ */
public void setDataDirectory(GeoServerDataDirectory dd) throws IOException { public void setResourceStore(ResourceStore dd) throws IOException {
rl=dd.getResourceLoader(); rl=dd;
} }


/** /**
Expand Down
@@ -1,13 +1,11 @@
package org.geoserver.cluster.hazelcast; package org.geoserver.cluster.hazelcast;


import java.util.logging.Logger; import java.util.logging.Logger;

import org.geoserver.platform.resource.ResourceNotification; import org.geoserver.platform.resource.ResourceNotification;
import org.geoserver.platform.resource.ResourceNotificationDispatcher; import org.geoserver.platform.resource.ResourceNotificationDispatcher;
import org.geoserver.platform.resource.SimpleResourceNotificationDispatcher; import org.geoserver.platform.resource.SimpleResourceNotificationDispatcher;
import org.geotools.util.logging.Logging; import org.geotools.util.logging.Logging;
import org.springframework.beans.factory.InitializingBean;

import com.google.common.base.Preconditions;
import com.hazelcast.core.ITopic; import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message; import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener; import com.hazelcast.core.MessageListener;
Expand All @@ -19,34 +17,42 @@
* spring configuration file in order for {@link ResourceStore} to find it. * spring configuration file in order for {@link ResourceStore} to find it.
* *
*/ */
public class HzResourceNotificationDispatcher extends SimpleResourceNotificationDispatcher implements InitializingBean, MessageListener<ResourceNotification> { public class HzResourceNotificationDispatcher extends SimpleResourceNotificationDispatcher implements MessageListener<ResourceNotification> {


static final String TOPIC_NAME = "resourceWatcher"; static final String TOPIC_NAME = "resourceWatcher";


private static final Logger LOGGER = Logging.getLogger(HzResourceNotificationDispatcher.class); private static final Logger LOGGER = Logging.getLogger(HzResourceNotificationDispatcher.class);


private HzCluster cluster; private HzCluster cluster;


private ITopic<ResourceNotification> topic() { public HzResourceNotificationDispatcher() {
return cluster.getHz().getTopic(TOPIC_NAME); //lazy loaded cluster
} }


/** public HzResourceNotificationDispatcher(HzCluster cluster) {
* {@code cluster} property to be set in {@code applicationContext.xml}
*/
public void setCluster(HzCluster cluster) {
this.cluster = cluster; this.cluster = cluster;
}

@Override
public void afterPropertiesSet() throws Exception {
Preconditions.checkNotNull(cluster, "HzCluster is not set");
topic().addMessageListener(this); topic().addMessageListener(this);
} }

private ITopic<ResourceNotification> topic() {
if (cluster == null) {
cluster = HzCluster.getInstanceIfAvailable().orNull();
if (cluster != null) {
topic().addMessageListener(this);
}
}
return cluster == null ? null : cluster.getHz().getTopic(TOPIC_NAME);
}


@Override @Override
public void changed(ResourceNotification event) { public void changed(ResourceNotification event) {
topic().publish(event); ITopic<ResourceNotification> topic = topic();
if (topic != null) {
topic.publish(event);
} else {
LOGGER.warning("Failed to publish resource notification, cluster not initialized (yet).");
super.changed(event);
}
} }


@Override @Override
Expand Down
Expand Up @@ -14,7 +14,7 @@
</bean> </bean>
<bean id="hzCluster" <bean id="hzCluster"
class="org.geoserver.cluster.hazelcast.HzCluster"> class="org.geoserver.cluster.hazelcast.HzCluster">
<property name="dataDirectory" ref="dataDirectory"/> <property name="resourceStore" ref="resourceStore"/>
<property name="rawCatalog" ref="rawCatalog"/> <property name="rawCatalog" ref="rawCatalog"/>
</bean> </bean>


Expand All @@ -30,8 +30,5 @@
</bean> </bean>


<!-- override the default one --> <!-- override the default one -->
<!-- TODO Re-enable once GSIP:136 passes --> <bean id="resourceNotificationDispatcher" class="org.geoserver.cluster.hazelcast.HzResourceNotificationDispatcher" />
<!--<bean id="resourceWatcher" class="org.geoserver.cluster.hazelcast.HzResourceWatcher">
<property name="cluster" ref="hzCluster"/>
</bean>-->
</beans> </beans>
Expand Up @@ -13,7 +13,7 @@
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.easymock.IAnswer; import org.easymock.IAnswer;
import org.geoserver.platform.resource.AbstractResourceWatcherTest; import org.geoserver.platform.resource.AbstractResourceNotificationDispatcherTest;
import org.geoserver.platform.resource.ResourceNotification; import org.geoserver.platform.resource.ResourceNotification;
import org.geoserver.platform.resource.ResourceNotificationDispatcher; import org.geoserver.platform.resource.ResourceNotificationDispatcher;


Expand All @@ -28,9 +28,7 @@
* @author Niels Charlier * @author Niels Charlier
* *
*/ */
public class HzResourceWatcherTest extends AbstractResourceWatcherTest { public class HzResourceNotificationDispatcherTest extends AbstractResourceNotificationDispatcherTest {

private HzResourceNotificationDispatcher resourceWatcher;


@Override @Override
protected ResourceNotificationDispatcher initWatcher() throws Exception { protected ResourceNotificationDispatcher initWatcher() throws Exception {
Expand Down Expand Up @@ -66,10 +64,7 @@ public Object answer() throws Throwable {


replay(cluster, topic, hz, hzCluster); replay(cluster, topic, hz, hzCluster);


resourceWatcher = new HzResourceNotificationDispatcher(); return new HzResourceNotificationDispatcher(hzCluster);
resourceWatcher.setCluster(hzCluster);
resourceWatcher.afterPropertiesSet();
return resourceWatcher;
} }


} }
Expand Up @@ -21,7 +21,7 @@
* @author Niels Charlier * @author Niels Charlier
* *
*/ */
public abstract class AbstractResourceWatcherTest { public abstract class AbstractResourceNotificationDispatcherTest {


protected FileSystemResourceStore store; protected FileSystemResourceStore store;
protected ResourceNotificationDispatcher watcher; protected ResourceNotificationDispatcher watcher;
Expand Down
Expand Up @@ -19,7 +19,7 @@
* @author Niels Charlier * @author Niels Charlier
* *
*/ */
public class SimpleResourceNotificationDispatcherTest extends AbstractResourceWatcherTest { public class SimpleResourceNotificationDispatcherTest extends AbstractResourceNotificationDispatcherTest {


@Override @Override
protected ResourceNotificationDispatcher initWatcher() { protected ResourceNotificationDispatcher initWatcher() {
Expand Down

0 comments on commit 2fe972b

Please sign in to comment.