Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

MODE-1852 Correct deserialization of events over JGroups channel

JGroups needs to know about the classloader that loads the ModeShape
event classes. The easiest way to do this is to extend ObjectInputStream
to supply and use a classloader for deserialization. This is especially
important in AS7/EAP and OSGi, where a single classloader is not used
for all the components.
  • Loading branch information...
commit b661398a364ade973350d796b7b63edb14c17841 1 parent 04037ff
@panghy panghy authored rhauch committed
View
3  modeshape-jcr/src/main/java/org/modeshape/jcr/JcrRepository.java
@@ -1733,7 +1733,8 @@ protected ChangeBus createBus( RepositoryConfiguration.Clustering clusteringConf
boolean separateThreadForSystemWorkspace ) {
RepositoryChangeBus standaloneBus = new RepositoryChangeBus(executor, systemWorkspaceName,
separateThreadForSystemWorkspace);
- return clusteringConfiguration.isEnabled() ? new ClusteredRepositoryChangeBus(clusteringConfiguration, standaloneBus) : standaloneBus;
+ return clusteringConfiguration.isEnabled() ? new ClusteredRepositoryChangeBus(clusteringConfiguration,
+ standaloneBus) : standaloneBus;
}
void suspendExistingUserTransaction() throws SystemException {
View
59 modeshape-jcr/src/main/java/org/modeshape/jcr/bus/ClusteredRepositoryChangeBus.java
@@ -24,6 +24,13 @@
package org.modeshape.jcr.bus;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jgroups.Address;
import org.jgroups.Channel;
@@ -31,11 +38,10 @@
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
-import org.jgroups.util.Util;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.annotation.ThreadSafe;
-import org.modeshape.common.util.CheckArg;
import org.modeshape.common.logging.Logger;
+import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.RepositoryConfiguration;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.cache.change.ChangeSetListener;
@@ -44,7 +50,7 @@
/**
* Implementation of a {@link ChangeBus} which can run in a cluster, via JGroups. This bus wraps around another bus, to which it
* delegates all "local" processing of events.
- *
+ *
* @author Horia Chiorean
*/
@ThreadSafe
@@ -155,7 +161,7 @@ public boolean hasObservers() {
/**
* Return whether this bus has been {@link #start() started} and not yet {@link #shutdown() shut down}.
- *
+ *
* @return true if {@link #start()} has been called but {@link #shutdown()} has not, or false otherwise
*/
public boolean isStarted() {
@@ -263,12 +269,20 @@ public boolean unregister( ChangeSetListener observer ) {
return delegate.unregister(observer);
}
- protected static byte[] serialize( ChangeSet changes ) throws Exception {
- return Util.objectToByteBuffer(changes);
+ protected byte[] serialize( ChangeSet changes ) throws Exception {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ ObjectOutputStream stream = new ObjectOutputStream(output);
+ stream.writeObject(changes);
+ stream.close();
+ return output.toByteArray();
}
- protected static ChangeSet deserialize( byte[] data ) throws Exception {
- return (ChangeSet)Util.objectFromByteBuffer(data);
+ protected ChangeSet deserialize( byte[] data ) throws Exception {
+ ObjectInputStreamWithClassLoader input = new ObjectInputStreamWithClassLoader(
+ new ByteArrayInputStream(data), getClass().getClassLoader());
+ ChangeSet toReturn = (ChangeSet) input.readObject();
+ input.close();
+ return toReturn;
}
protected final class Receiver extends ReceiverAdapter {
@@ -279,11 +293,11 @@ public void block() {
}
@Override
- public void receive( Message message ) {
+ public void receive( final Message message ) {
if (!hasObservers()) {
return;
}
- // We have at least one observer ...
+ // We have at least one
try {
// Deserialize the changes ...
ChangeSet changes = deserialize(message.getBuffer());
@@ -335,4 +349,29 @@ public void channelDisconnected( Channel channel ) {
isOpen.set(false);
}
}
+
+ /**
+ * ObjectInputStream extention that allows a different class loader to be used when resolving types.
+ */
+ protected final class ObjectInputStreamWithClassLoader extends ObjectInputStream {
+
+ private final ClassLoader cl;
+
+ public ObjectInputStreamWithClassLoader(InputStream in, ClassLoader cl) throws IOException {
+ super(in);
+ this.cl = cl;
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ if (cl == null) {
+ return super.resolveClass(desc);
+ }
+ try {
+ return Class.forName(desc.getName(), false, cl);
+ } catch (ClassNotFoundException ex) {
+ return super.resolveClass(desc);
+ }
+ }
+ }
}
View
6 modeshape-jcr/src/test/java/org/modeshape/jcr/bus/ClusteredRepositoryChangeBusTest.java
@@ -25,6 +25,8 @@
package org.modeshape.jcr.bus;
import static org.hamcrest.core.Is.is;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.MarshallingConfiguration;
import org.junit.AfterClass;
import static org.junit.Assert.assertThat;
import org.junit.BeforeClass;
@@ -37,6 +39,7 @@
import org.modeshape.jcr.clustering.DefaultChannelProvider;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
/**
* Unit test for {@link ClusteredRepositoryChangeBus}
@@ -336,7 +339,8 @@ public void shouldSendChangeSetThroughRealJGroupsCluster() throws Exception {
}
private ClusteredRepositoryChangeBus startNewBus( String name) throws Exception {
- ClusteredRepositoryChangeBus bus = new ClusteredRepositoryChangeBus(createClusteringConfiguration(name), super.createRepositoryChangeBus());
+ ClusteredRepositoryChangeBus bus = new ClusteredRepositoryChangeBus(createClusteringConfiguration(name),
+ super.createRepositoryChangeBus());
bus.start();
buses.add(bus);
return bus;
Please sign in to comment.
Something went wrong with that request. Please try again.