Skip to content

Commit

Permalink
Fixed issues with event processing in cluster nodes.
Browse files Browse the repository at this point in the history
- Repeated failed to process events were logged because the head of
the event queue wasn't being removed. Instead, the queue was traversed
through its iterator and the Iterator.remove() were being called only
upon a success. Hence further processing tasks got the same event that
kept failing.

- It was possible that a REMOVE event was queued while its corresponding
ADD event wasn't yet processed, leading to unnecessary logging of a failed
ADD event, since the object was already deleted from the catalog. Added a
check to verify whether a REMOVE event for the same object is scheduled
before complaining when processing a ADD/CHANGE event whose object is not
found in the catalog.

- The Hazelcast synchronizer was not aware of application lifecycle, so
when a node joined the cluster it starts trying to process events before
the application is fully loaded, and keeps processing events when the
application is shuttting down. Added a 'started' state to HzSynchronizer
that's set to true once the app is ready and to false when it's shutting down,
preventing the attempt to process events when the catalog is in a state of
flux, which is specially problematic with jdbcconfig. This avoids
ConcurrentModificationExceptions to be thrown while the synchronizer tries
to notify the catalog listeners which are being added/removed, and hence
may try to notify a catalog listener that's no longer valid.

- Homogeinized the log messages to contain the current and origin node ids
and a consistent representation of the events.

- Guarded the traversal of catalog listeners using a local copy since the
GeoSever's CatalogImpl.listeners list is not thread safe.
  • Loading branch information
groldan authored and jodygarnett committed Jan 15, 2015
1 parent dab86c0 commit df7eceb
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 87 deletions.
43 changes: 43 additions & 0 deletions src/main/java/org/geoserver/cluster/ConfigChangeEvent.java
@@ -1,5 +1,7 @@
package org.geoserver.cluster; package org.geoserver.cluster;


import static com.google.common.base.Objects.equal;
import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;


Expand Down Expand Up @@ -42,6 +44,8 @@
import org.geoserver.config.impl.LoggingInfoImpl; import org.geoserver.config.impl.LoggingInfoImpl;
import org.geoserver.config.impl.SettingsInfoImpl; import org.geoserver.config.impl.SettingsInfoImpl;


import com.google.common.base.Objects;

/** /**
* Event for * Event for
* @author Justin Deoliveira, OpenGeo * @author Justin Deoliveira, OpenGeo
Expand Down Expand Up @@ -113,6 +117,45 @@ public ConfigChangeEvent(String id, String name, Class<? extends Info> clazz, Ty
this.type = type; this.type = type;
} }


@Override
public String toString() {
StringBuilder sb = new StringBuilder(String.valueOf(type)).append(" ");

Serializable source = getSource();
if (source != null) {
sb.append('(').append(source).append(") ");
}

sb.append("[id:").append(id).append(", name:").append(name).append("]");
return sb.toString();
}

/**
* Equals is based on {@link #getObjectId() id}, {@link #getObjectName() name}, and
* {@link #getChangeType() changeType}. {@link #getObjectClass() class} is left off because it
* can be a proxy class and id/name/type are good enough anyways (given ids are unique, no two
* objects of different class can have the same id).
*/
@Override
public boolean equals(Object o) {
if (!(o instanceof ConfigChangeEvent)) {
return false;
}
ConfigChangeEvent e = (ConfigChangeEvent) o;
return equal(id, e.id) && equal(type, e.type);
}

/**
* Hash code is based on {@link #getObjectId() id}, {@link #getObjectName() name}, and
* {@link #getChangeType() changeType}. {@link #getObjectClass() class} is left off because it
* can be a proxy class and id/name/type are good enough anyways (given ids are unique, no two
* objects of different class can have the same id).
*/
@Override
public int hashCode() {
return Objects.hashCode(ConfigChangeEvent.class, id, name, type);
}

public String getObjectId() { public String getObjectId() {
return id; return id;
} }
Expand Down
24 changes: 22 additions & 2 deletions src/main/java/org/geoserver/cluster/Event.java
@@ -1,6 +1,9 @@
package org.geoserver.cluster; package org.geoserver.cluster;


import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Objects;


/** /**
* Base class for events to be signalled across the cluster. Carries an identifier for the * Base class for events to be signalled across the cluster. Carries an identifier for the
Expand All @@ -10,9 +13,9 @@ public class Event implements Serializable {


/** serialVersionUID */ /** serialVersionUID */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

Serializable source; Serializable source;

/** /**
* Set an identifier for the node on which the event originates. * Set an identifier for the node on which the event originates.
* @param source * @param source
Expand All @@ -29,4 +32,21 @@ public Serializable getSource() {
return source; return source;
} }


@Override
public String toString() {
return new StringBuilder("Event[").append(source).append(']').toString();
}

@Override
public int hashCode() {
return Objects.hashCode(Event.class, source);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Event)) {
return false;
}
return Objects.equal(source, ((Event) o).source);
}
} }
Expand Up @@ -17,7 +17,7 @@ public class GeoServerSynchronizer extends ConfigurationListenerAdapter


protected ClusterConfigWatcher configWatcher; protected ClusterConfigWatcher configWatcher;


public final void initialize(ClusterConfigWatcher configWatcher) { public void initialize(ClusterConfigWatcher configWatcher) {
this.configWatcher = configWatcher; this.configWatcher = configWatcher;
} }


Expand Down
@@ -1,5 +1,7 @@
package org.geoserver.cluster.hazelcast; package org.geoserver.cluster.hazelcast;


import static java.lang.String.format;

import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.util.Iterator; import java.util.Iterator;
Expand All @@ -17,7 +19,6 @@
import org.geoserver.catalog.StyleInfo; import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo; import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogAddEvent; import org.geoserver.catalog.event.CatalogAddEvent;
import org.geoserver.catalog.event.CatalogEvent;
import org.geoserver.catalog.event.CatalogListener; import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.event.CatalogPostModifyEvent; import org.geoserver.catalog.event.CatalogPostModifyEvent;
import org.geoserver.catalog.event.CatalogRemoveEvent; import org.geoserver.catalog.event.CatalogRemoveEvent;
Expand All @@ -35,6 +36,8 @@
import org.geoserver.config.ServiceInfo; import org.geoserver.config.ServiceInfo;
import org.geoserver.config.SettingsInfo; import org.geoserver.config.SettingsInfo;


import com.google.common.collect.ImmutableList;

/** /**
* Synchronizer that converts cluster events and dispatches them the GeoServer config/catalog. * Synchronizer that converts cluster events and dispatches them the GeoServer config/catalog.
* <p> * <p>
Expand All @@ -54,47 +57,30 @@ public EventHzSynchronizer(HzCluster cluster, GeoServer gs) {
protected void processEventQueue(Queue<Event> q) throws Exception { protected void processEventQueue(Queue<Event> q) throws Exception {
Catalog cat = gs.getCatalog(); Catalog cat = gs.getCatalog();
Iterator<Event> it = q.iterator(); Iterator<Event> it = q.iterator();
while (it.hasNext()) { while (it.hasNext() && isStarted()) {
Event e = it.next(); final Event event = it.next();
if (e instanceof ConfigChangeEvent) { it.remove();
ConfigChangeEvent ce = (ConfigChangeEvent) e; LOGGER.fine(format("%s - Processing event %s", nodeId(), event));
if (event instanceof ConfigChangeEvent) {
ConfigChangeEvent ce = (ConfigChangeEvent) event;
Type t = ce.getChangeType(); Type t = ce.getChangeType();
Class<? extends Info> clazz = ce.getObjectInterface(); Class<? extends Info> clazz = ce.getObjectInterface();
String id = ce.getObjectId(); String id = ce.getObjectId();
String name = ce.getObjectName(); String name = ce.getObjectName();


if (CatalogInfo.class.isAssignableFrom(clazz)) { if (CatalogInfo.class.isAssignableFrom(clazz)) {
//catalog event //catalog event
CatalogInfo subj = null; CatalogInfo subj;
if (WorkspaceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getWorkspace(id);
}
else if (NamespaceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getNamespace(id);
}
else if (StoreInfo.class.isAssignableFrom(clazz)) {
subj = cat.getStore(id, (Class<StoreInfo>) clazz);
}
else if (ResourceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getResource(id, (Class<ResourceInfo>) clazz);
}
else if (LayerInfo.class.isAssignableFrom(clazz)) {
subj = cat.getLayer(id);
}
else if (StyleInfo.class.isAssignableFrom(clazz)) {
subj = cat.getStyle(id);
}
else if (LayerGroupInfo.class.isAssignableFrom(clazz)) {
subj = cat.getLayerGroup(id);
}
Method notifyMethod; Method notifyMethod;
CatalogEventImpl evt; CatalogEventImpl evt;
switch(t) { switch(t) {
case ADD: case ADD:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod = CatalogListener.class.getMethod("handleAddEvent", CatalogAddEvent.class); notifyMethod = CatalogListener.class.getMethod("handleAddEvent", CatalogAddEvent.class);
evt = new CatalogAddEventImpl(); evt = new CatalogAddEventImpl();
break; break;
case MODIFY: case MODIFY:
subj = getCatalogInfo(cat, id, clazz);
notifyMethod = CatalogListener.class.getMethod("handlePostModifyEvent", CatalogPostModifyEvent.class); notifyMethod = CatalogListener.class.getMethod("handlePostModifyEvent", CatalogPostModifyEvent.class);
evt = new CatalogPostModifyEventImpl(); evt = new CatalogPostModifyEventImpl();
break; break;
Expand All @@ -108,26 +94,37 @@ else if (LayerGroupInfo.class.isAssignableFrom(clazz)) {
throw new IllegalStateException("Should not happen"); throw new IllegalStateException("Should not happen");
} }


if (subj == null) { if (subj == null) {//can't happen if type == DELETE
ConfigChangeEvent removeEvent = new ConfigChangeEvent(id, name, clazz,
Type.REMOVE);
if (queue.contains(removeEvent)) {
LOGGER.fine(format("%s - Ignoring event %s, a remove is queued.",
nodeId(), event));
continue;
}
//this could be latency in the catalog itself, abort processing since //this could be latency in the catalog itself, abort processing since
// events need to processed in order and further events might depend // events need to processed in order and further events might depend
// on this event // on this event
LOGGER.warning(String.format("Received %s event for (%s, %s) but could" String message = format(
+ " not find in catalog", t.name(), id, clazz.getSimpleName())); "%s - Error processing event %s but object not found in catalog", nodeId(),
return; event);
LOGGER.warning(message);
continue;
} }


evt.setSource(subj); evt.setSource(subj);

try { try {
for (CatalogListener l:cat.getListeners()){ for (CatalogListener l: ImmutableList.copyOf(cat.getListeners())){
// Don't notify self otherwise the event bounces back out into the // Don't notify self otherwise the event bounces back out into the
// cluster. // cluster.
if(l!=this) notifyMethod.invoke(l, evt); if (l != this && isStarted()) {
notifyMethod.invoke(l, evt);
}
} }
} }
catch(Exception ex) { catch(Exception ex) {
LOGGER.log(Level.WARNING, "Event dispatch failed", ex); LOGGER.log(Level.WARNING,
format("%s - Event dispatch failed: %s", nodeId(), event), ex);
} }


} }
Expand Down Expand Up @@ -158,13 +155,32 @@ else if (LayerGroupInfo.class.isAssignableFrom(clazz)) {
if(l!=this) notifyMethod.invoke(l, subj); if(l!=this) notifyMethod.invoke(l, subj);
} }
catch(Exception ex) { catch(Exception ex) {
LOGGER.log(Level.WARNING, "Event dispatch failed", ex); LOGGER.log(Level.WARNING,
format("%s - Event dispatch failed: %s", nodeId(), event), ex);
} }
} }
} }
} }

it.remove();
} }
} }

private CatalogInfo getCatalogInfo(Catalog cat, String id, Class<? extends Info> clazz) {
CatalogInfo subj = null;
if (WorkspaceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getWorkspace(id);
} else if (NamespaceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getNamespace(id);
} else if (StoreInfo.class.isAssignableFrom(clazz)) {
subj = cat.getStore(id, (Class<StoreInfo>) clazz);
} else if (ResourceInfo.class.isAssignableFrom(clazz)) {
subj = cat.getResource(id, (Class<ResourceInfo>) clazz);
} else if (LayerInfo.class.isAssignableFrom(clazz)) {
subj = cat.getLayer(id);
} else if (StyleInfo.class.isAssignableFrom(clazz)) {
subj = cat.getStyle(id);
} else if (LayerGroupInfo.class.isAssignableFrom(clazz)) {
subj = cat.getLayerGroup(id);
}
return subj;
}
} }

0 comments on commit df7eceb

Please sign in to comment.