Skip to content

Commit

Permalink
Fixed hz-cluster unit test failures. Upgraded Hazelcast from 3.11-BET…
Browse files Browse the repository at this point in the history
…A-1 to 3.11. Upgraded Metrics from 2.1.5 to 4.0.3.
  • Loading branch information
sikeoka committed Dec 13, 2018
1 parent 9ebf8a6 commit 8252045
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 81 deletions.
4 changes: 2 additions & 2 deletions src/community/hz-cluster/pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<version>3.3.1</version> <version>3.3.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.yammer.metrics</groupId> <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-servlet</artifactId> <artifactId>metrics-servlet</artifactId>
<version>${metrics.version}</version> <version>${metrics.version}</version>
</dependency> </dependency>
Expand Down Expand Up @@ -100,7 +100,7 @@
</dependencies> </dependencies>


<properties> <properties>
<metrics.version>2.1.5</metrics.version> <metrics.version>4.0.3</metrics.version>
</properties> </properties>
</project> </project>


Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.geoserver.catalog.StyleInfo; import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WMSLayerInfo; import org.geoserver.catalog.WMSLayerInfo;
import org.geoserver.catalog.WMSStoreInfo; import org.geoserver.catalog.WMSStoreInfo;
import org.geoserver.catalog.WMTSLayerInfo;
import org.geoserver.catalog.WMTSStoreInfo;
import org.geoserver.catalog.WorkspaceInfo; import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.AttributionInfoImpl; import org.geoserver.catalog.impl.AttributionInfoImpl;
import org.geoserver.catalog.impl.CatalogImpl; import org.geoserver.catalog.impl.CatalogImpl;
Expand All @@ -40,6 +42,8 @@
import org.geoserver.catalog.impl.StyleInfoImpl; import org.geoserver.catalog.impl.StyleInfoImpl;
import org.geoserver.catalog.impl.WMSLayerInfoImpl; import org.geoserver.catalog.impl.WMSLayerInfoImpl;
import org.geoserver.catalog.impl.WMSStoreInfoImpl; import org.geoserver.catalog.impl.WMSStoreInfoImpl;
import org.geoserver.catalog.impl.WMTSLayerInfoImpl;
import org.geoserver.catalog.impl.WMTSStoreInfoImpl;
import org.geoserver.catalog.impl.WorkspaceInfoImpl; import org.geoserver.catalog.impl.WorkspaceInfoImpl;
import org.geoserver.config.ContactInfo; import org.geoserver.config.ContactInfo;
import org.geoserver.config.GeoServerInfo; import org.geoserver.config.GeoServerInfo;
Expand Down Expand Up @@ -74,11 +78,13 @@ public class ConfigChangeEvent extends Event {
INTERFACES.put(WorkspaceInfoImpl.class, WorkspaceInfo.class); INTERFACES.put(WorkspaceInfoImpl.class, WorkspaceInfo.class);
INTERFACES.put(DataStoreInfoImpl.class, DataStoreInfo.class); INTERFACES.put(DataStoreInfoImpl.class, DataStoreInfo.class);
INTERFACES.put(WMSStoreInfoImpl.class, WMSStoreInfo.class); INTERFACES.put(WMSStoreInfoImpl.class, WMSStoreInfo.class);
INTERFACES.put(WMTSStoreInfoImpl.class, WMTSStoreInfo.class);
INTERFACES.put(CoverageStoreInfoImpl.class, CoverageStoreInfo.class); INTERFACES.put(CoverageStoreInfoImpl.class, CoverageStoreInfo.class);
INTERFACES.put(StyleInfoImpl.class, StyleInfo.class); INTERFACES.put(StyleInfoImpl.class, StyleInfo.class);
INTERFACES.put(FeatureTypeInfoImpl.class, FeatureTypeInfo.class); INTERFACES.put(FeatureTypeInfoImpl.class, FeatureTypeInfo.class);
INTERFACES.put(CoverageInfoImpl.class, CoverageInfo.class); INTERFACES.put(CoverageInfoImpl.class, CoverageInfo.class);
INTERFACES.put(WMSLayerInfoImpl.class, WMSLayerInfo.class); INTERFACES.put(WMSLayerInfoImpl.class, WMSLayerInfo.class);
INTERFACES.put(WMTSLayerInfoImpl.class, WMTSLayerInfo.class);
INTERFACES.put(MetadataLinkInfoImpl.class, MetadataLinkInfo.class); INTERFACES.put(MetadataLinkInfoImpl.class, MetadataLinkInfo.class);
INTERFACES.put(LayerInfoImpl.class, LayerInfo.class); INTERFACES.put(LayerInfoImpl.class, LayerInfo.class);
INTERFACES.put(LayerGroupInfoImpl.class, LayerGroupInfo.class); INTERFACES.put(LayerGroupInfoImpl.class, LayerGroupInfo.class);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import com.hazelcast.core.Message; import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener; import com.hazelcast.core.MessageListener;
import com.yammer.metrics.Metrics;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import javax.annotation.Nullable; import javax.annotation.Nullable;
Expand Down Expand Up @@ -93,7 +93,7 @@ protected void dispatch(Event e) {
e.setSource(localAddress(cluster.getHz())); e.setSource(localAddress(cluster.getHz()));
topic.publish(e); topic.publish(e);


Metrics.newCounter(getClass(), "dispatched").inc(); incCounter(getClass(), "dispatched");
waitForAck(e); waitForAck(e);
} }


Expand Down Expand Up @@ -157,10 +157,10 @@ private void waitForAck(Event event) {
} }


@Override @Override
protected void processEvent(Event event) throws Exception { protected Future<?> processEvent(Event event) {
Preconditions.checkState(isStarted()); Preconditions.checkState(isStarted());
if (!(event instanceof ConfigChangeEvent)) { if (!(event instanceof ConfigChangeEvent)) {
return; return null;
} }
try { try {
LOGGER.fine(format("%s - Processing event %s", nodeId(), event)); LOGGER.fine(format("%s - Processing event %s", nodeId(), event));
Expand All @@ -176,6 +176,7 @@ protected void processEvent(Event event) throws Exception {
} finally { } finally {
ack(event); ack(event);
} }
return null;
} }


private void processCatalogEvent(final ConfigChangeEvent event) private void processCatalogEvent(final ConfigChangeEvent event)
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
import static java.lang.String.format; import static java.lang.String.format;
import static org.geoserver.cluster.hazelcast.HazelcastUtil.localAddress; import static org.geoserver.cluster.hazelcast.HazelcastUtil.localAddress;


import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.core.ITopic; import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message; import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener; import com.hazelcast.core.MessageListener;
import com.yammer.metrics.Metrics;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
Expand Down Expand Up @@ -56,6 +58,8 @@ public abstract class HzSynchronizer extends GeoServerSynchronizer


protected static Logger LOGGER = Logging.getLogger("org.geoserver.cluster.hazelcast"); protected static Logger LOGGER = Logging.getLogger("org.geoserver.cluster.hazelcast");


private final MetricRegistry registry = new MetricRegistry();

protected final HzCluster cluster; protected final HzCluster cluster;


protected final ITopic<Event> topic; protected final ITopic<Event> topic;
Expand Down Expand Up @@ -96,7 +100,7 @@ public void onMessage(Message<Event> message) {
} }
return; return;
} }
Metrics.newCounter(getClass(), "recieved").inc(); incCounter(getClass(), "recieved");
if (localAddress(cluster.getHz()).equals(event.getSource())) { if (localAddress(cluster.getHz()).equals(event.getSource())) {
if (LOGGER.isLoggable(Level.FINER)) { if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer( LOGGER.finer(
Expand All @@ -113,7 +117,7 @@ public void onMessage(Message<Event> message) {
executor.schedule(new EventWorker(event), syncDelay, TimeUnit.SECONDS); executor.schedule(new EventWorker(event), syncDelay, TimeUnit.SECONDS);
} }


private class EventWorker implements Runnable { private class EventWorker implements Callable<Future<?>> {


private Event event; private Event event;


Expand All @@ -122,17 +126,19 @@ public EventWorker(Event event) {
} }


@Override @Override
public void run() { public Future<?> call() {
if (!isStarted()) { if (!isStarted()) {
return; return null;
} }
Future<?> future = null;
try { try {
processEvent(event); future = processEvent(event);
} catch (Exception e) { } catch (Exception e) {
LOGGER.log(Level.WARNING, format("%s - Event processing failed", nodeId()), e); LOGGER.log(Level.WARNING, format("%s - Event processing failed", nodeId()), e);
} }


Metrics.newCounter(getClass(), "reloads").inc(); incCounter(getClass(), "reloads");
return future;
} }
} }


Expand All @@ -144,7 +150,7 @@ public void run() {
* <p><b>Note:</b> It is the responsibility of subclasses to clear events from the queue as they * <p><b>Note:</b> It is the responsibility of subclasses to clear events from the queue as they
* are processed. * are processed.
*/ */
protected abstract void processEvent(Event event) throws Exception; protected abstract Future<?> processEvent(Event event);


ConfigChangeEvent newChangeEvent(CatalogEvent evt, Type type) { ConfigChangeEvent newChangeEvent(CatalogEvent evt, Type type) {
return newChangeEvent(evt.getSource(), type); return newChangeEvent(evt.getSource(), type);
Expand Down Expand Up @@ -225,6 +231,11 @@ public void handleSettingsRemoved(SettingsInfo settings) {
dispatch(newChangeEvent(settings, Type.REMOVE)); dispatch(newChangeEvent(settings, Type.REMOVE));
} }


/** Increments the counter for the specified class and name by one. */
protected void incCounter(Class<?> clazz, String name) {
this.registry.counter(MetricRegistry.name(clazz, name)).inc();
}

protected String nodeId() { protected String nodeId() {
return HazelcastUtil.nodeId(cluster); return HazelcastUtil.nodeId(cluster);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import static org.geoserver.cluster.hazelcast.HazelcastUtil.localAddress; import static org.geoserver.cluster.hazelcast.HazelcastUtil.localAddress;


import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.Metrics; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -43,28 +44,24 @@ public ReloadHzSynchronizer(HzCluster cluster, GeoServer gs) {
.setDaemon(true) .setDaemon(true)
.setNameFormat("Hz-GeoServer-Reload-%d") .setNameFormat("Hz-GeoServer-Reload-%d")
.build(); .build();
RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.DiscardPolicy();
// a thread pool executor operating out of a blocking queue with maximum of 1 element, which // a thread pool executor operating out of a blocking queue with maximum of 1 element, which
// discards execute requests if the queue is full // discards execute requests if the queue is full
reloadService = reloadService =
new ThreadPoolExecutor( new ThreadPoolExecutor(
1, 1, 1, 0L, TimeUnit.MILLISECONDS, getWorkQueue(), threadFactory);
1, }
0L,
TimeUnit.MILLISECONDS, BlockingQueue<Runnable> getWorkQueue() {
new LinkedBlockingQueue<Runnable>(1), return new LinkedBlockingQueue<>(1);
threadFactory,
rejectionHandler);
} }


@Override @Override
protected void processEvent(Event event) throws Exception { protected Future<?> processEvent(Event event) {
// submit task and return immediately. The task will be ignored if another one is already // submit task and return immediately. The task will be ignored if another one is already
// scheduled // scheduled
reloadService.submit( try {
new Runnable() { return reloadService.submit(
@Override () -> {
public void run() {
// lock during event processing // lock during event processing
eventLock.set(true); eventLock.set(true);
try { try {
Expand All @@ -74,8 +71,14 @@ public void run() {
} finally { } finally {
eventLock.set(false); eventLock.set(false);
} }
} });
}); } catch (RejectedExecutionException e) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(
format("%s - Reload in progress. Ignoring event %s", nodeId(), event));
}
return null;
}
} }


@Override @Override
Expand All @@ -90,6 +93,6 @@ protected void dispatch(Event e) {
e.setSource(localAddress(cluster.getHz())); e.setSource(localAddress(cluster.getHz()));
topic.publish(e); topic.publish(e);


Metrics.newCounter(getClass(), "dispatched").inc(); incCounter(getClass(), "dispatched");
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.geoserver.catalog.StyleInfo; import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WMSLayerInfo; import org.geoserver.catalog.WMSLayerInfo;
import org.geoserver.catalog.WMSStoreInfo; import org.geoserver.catalog.WMSStoreInfo;
import org.geoserver.catalog.WMTSLayerInfo;
import org.geoserver.catalog.WMTSStoreInfo;
import org.geoserver.catalog.WorkspaceInfo; import org.geoserver.catalog.WorkspaceInfo;
import org.geotools.data.DataStore; import org.geotools.data.DataStore;


Expand Down Expand Up @@ -124,12 +126,16 @@ private void proxyVisitory(Object proxy, Method method, CatalogVisitor catalogVi
catalogVisitor.visit((FeatureTypeInfo) proxy); catalogVisitor.visit((FeatureTypeInfo) proxy);
} else if (WMSLayerInfo.class.equals(infoInterface)) { } else if (WMSLayerInfo.class.equals(infoInterface)) {
catalogVisitor.visit((WMSLayerInfo) proxy); catalogVisitor.visit((WMSLayerInfo) proxy);
} else if (WMTSLayerInfo.class.equals(infoInterface)) {
catalogVisitor.visit((WMTSLayerInfo) proxy);
} else if (CoverageStoreInfo.class.equals(infoInterface)) { } else if (CoverageStoreInfo.class.equals(infoInterface)) {
catalogVisitor.visit((CoverageStoreInfo) proxy); catalogVisitor.visit((CoverageStoreInfo) proxy);
} else if (DataStoreInfo.class.equals(infoInterface)) { } else if (DataStoreInfo.class.equals(infoInterface)) {
catalogVisitor.visit((DataStoreInfo) proxy); catalogVisitor.visit((DataStoreInfo) proxy);
} else if (WMSStoreInfo.class.equals(infoInterface)) { } else if (WMSStoreInfo.class.equals(infoInterface)) {
catalogVisitor.visit((WMSStoreInfo) proxy); catalogVisitor.visit((WMSStoreInfo) proxy);
} else if (WMTSStoreInfo.class.equals(infoInterface)) {
catalogVisitor.visit((WMTSStoreInfo) proxy);
} else if (StyleInfo.class.equals(infoInterface)) { } else if (StyleInfo.class.equals(infoInterface)) {
catalogVisitor.visit((StyleInfo) proxy); catalogVisitor.visit((StyleInfo) proxy);
} else if (LayerInfo.class.equals(infoInterface)) { } else if (LayerInfo.class.equals(infoInterface)) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -162,7 +165,7 @@ public Object answer() throws Throwable {
expectLastCall().atLeastOnce(); expectLastCall().atLeastOnce();


executor = createMock(ScheduledExecutorService.class); executor = createMock(ScheduledExecutorService.class);
captureExecutor = new Capture<Runnable>(CaptureType.ALL); captureExecutor = new Capture<>(CaptureType.ALL);
expect(executor.schedule(capture(captureExecutor), anyLong(), (TimeUnit) anyObject())) expect(executor.schedule(capture(captureExecutor), anyLong(), (TimeUnit) anyObject()))
.andStubReturn(null); .andStubReturn(null);
} }
Expand Down Expand Up @@ -192,7 +195,7 @@ MessageListener<Event> getListener() {
protected Capture<CatalogListener> catListenerCapture; protected Capture<CatalogListener> catListenerCapture;
protected Capture<MessageListener<Event>> captureTopicListener; protected Capture<MessageListener<Event>> captureTopicListener;
protected Capture<MessageListener<UUID>> captureAckTopicListener; protected Capture<MessageListener<UUID>> captureAckTopicListener;
protected Capture<Runnable> captureExecutor; protected Capture<Callable<Future<?>>> captureExecutor;
protected Capture<UUID> captureAckTopicPublish; protected Capture<UUID> captureAckTopicPublish;


public List<Object> myMocks() { public List<Object> myMocks() {
Expand Down Expand Up @@ -267,12 +270,19 @@ protected void waitForSync() throws Exception {
// Thread.sleep(SYNC_DELAY*1000+500); // Convert to millis, then add a little extra to be // Thread.sleep(SYNC_DELAY*1000+500); // Convert to millis, then add a little extra to be
// sure // sure


List<Runnable> tasks = captureExecutor.getValues(); List<Callable<Future<?>>> tasks = captureExecutor.getValues();
List<Future<?>> futures = new ArrayList<>(1);


for (Iterator<Runnable> i = tasks.iterator(); i.hasNext(); ) { for (Iterator<Callable<Future<?>>> i = tasks.iterator(); i.hasNext(); ) {
Runnable task = i.next(); Callable<Future<?>> task = i.next();
i.remove(); i.remove();
task.run(); Future<?> future = task.call();
if (future != null) {
futures.add(future);
}
}
for (Future<?> future : futures) {
future.get();
} }
} }


Expand Down

0 comments on commit 8252045

Please sign in to comment.