Skip to content

Commit

Permalink
HSEARCH-1296 Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanne committed Apr 17, 2013
1 parent 5e8bf9b commit 1414939
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 192 deletions.

This file was deleted.

This file was deleted.

Expand Up @@ -24,22 +24,43 @@

package org.hibernate.search.backend.impl.jgroups;

import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

/**
* Channel message sender.
* We use the MessageDispatcher instead of the JChannel to be able to use blocking
* operations (optionally) without having to rely on the RSVP protocol
* being configured on the stack.
*
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2013 Red Hat Inc.
*/
class DispatcherMessageSender extends AbstractMessageSender {
final class DispatcherMessageSender implements MessageSender {

private static final Log log = LoggerFactory.make();

private final MessageDispatcher dispatcher;
private final Channel channel;

DispatcherMessageSender(final MessageDispatcher dispatcher) {
super( dispatcher.getChannel() );
this.dispatcher = dispatcher;
this.channel = dispatcher.getChannel();
}

public Address getAddress() {
return channel.getAddress();
}

public View getView() {
return channel.getView();
}

public void stop() {
Expand All @@ -48,10 +69,28 @@ public void stop() {

public void send(final Message message, final boolean synchronous) throws Exception {
final RequestOptions options = synchronous ? RequestOptions.SYNC() : RequestOptions.ASYNC();
dispatcher.castMessage( null, message, options );
options.setExclusionList( dispatcher.getChannel().getAddress() );
RspList<Object> rspList = dispatcher.castMessage( null, message, options );
//JGroups won't throw these automatically as it would with a JChannel usage,
//so we provide the same semantics by throwing the JGroups specific exceptions
//as appropriate
if ( synchronous ) {
for ( Rsp rsp : rspList.values() ) {
if ( !rsp.wasReceived() ) {
if ( rsp.wasSuspected() ) {
throw log.jgroupsSuspectingPeer( rsp.getSender() );
}
else {
throw log.jgroupsRpcTimeout( rsp.getSender() );
}
}
else {
if ( rsp.hasException() ) {
throw log.jgroupsRemoteException( rsp.getSender(), rsp.getException(), rsp.getException() );
}
}
}
}
}

@Override
public void start() {
}
}

0 comments on commit 1414939

Please sign in to comment.