diff --git a/src/org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER.java b/src/org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER.java index fc0f34614ee..80b7e0f5b50 100644 --- a/src/org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER.java +++ b/src/org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER.java @@ -13,7 +13,9 @@ import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -25,10 +27,9 @@ /** * STREAMING_STATE_TRANSFER, as its name implies, allows a * streaming state transfer between two channel instances. - * *

* - * Major advantage of this approach is that transfering application state to a + * Major advantage of this approach is that transferring application state to a * joining member of a group does not entail loading of the complete application * state into memory. Application state, for example, might be located entirely * on some form of disk based storage. The default STATE_TRANSFER @@ -37,8 +38,14 @@ * does not. Thus STREAMING_STATE_TRANSFER protocol is able to * transfer application state that is very large (>1Gb) without a likelihood of * such transfer resulting in OutOfMemoryException. + *

* + * STREAMING_STATE_TRANSFER allows use of either default channel transport or + * separate tcp sockets for state transfer. If firewalls are not a concern then separate tcp sockets + * should be used as they offer faster state transfer. Transport for state transfer is selected using + * use_default_transport boolean property. *

+ * * * Channel instance can be configured with either * STREAMING_STATE_TRANSFER or STATE_TRANSFER but @@ -65,51 +72,80 @@ */ public class STREAMING_STATE_TRANSFER extends Protocol { - private final static String NAME = "STREAMING_STATE_TRANSFER"; + private final static String NAME="STREAMING_STATE_TRANSFER"; - private Address local_addr = null; - - @GuardedBy("members") - private final Vector

members = new Vector
(); + /* ----------------------------- Properties ------------------------------------ */ /* - * set to true while waiting for a STATE_RSP + * The interface (NIC) used to accept state requests */ - private boolean waiting_for_state_response = false; + private InetAddress bind_addr; /* - * JMX statistics - * + * The port listening for state requests. Default value of 0 binds to any (ephemeral) port */ - private AtomicInteger num_state_reqs = new AtomicInteger(0); + private int bind_port=0; - private AtomicLong num_bytes_sent = new AtomicLong(0); + /* + * Maximum number of pool threads serving state requests. Default is 5 + */ + private int max_pool=5; - private volatile double avg_state_size = 0; + /* + * Keep alive for pool threads serving state requests. Default is 20000 msec + */ + private long pool_thread_keep_alive = 20*1000; /* - * properties - * + * Buffer size for state transfer. Default is 8 KB */ - private InetAddress bind_addr; + private int socket_buffer_size=8 * 1024; + + /* + * If true default transport will be used for state transfer rather than seperate TCP sockets. + * Default is false. + */ + boolean use_default_transport = false; + + + /* --------------------------------------------- JMX statistics ------------------------------------------------------ */ - private int bind_port = 0; + private final AtomicInteger num_state_reqs=new AtomicInteger(0); - private int max_pool = 5; + private final AtomicLong num_bytes_sent=new AtomicLong(0); - private long pool_thread_keep_alive; + private volatile double avg_state_size=0; - private int socket_buffer_size = 8 * 1024; - private volatile boolean flushProtocolInStack = false; + /* --------------------------------------------- Fields ------------------------------------------------------ */ + private Address local_addr=null; + + @GuardedBy("members") + private final Vector
members; + /* - * plumbing to provide state - * + * BlockingQueue used for transfer of state Message(s) if default transport is used + * Only state recipient uses this queue + */ + private final BlockingQueue stateQueue; + + /* + * Runnable that listens for state requests and spawns threads to serve those requests + * if socket transport is used */ private StateProviderThreadSpawner spawner; - public STREAMING_STATE_TRANSFER(){} + /* + * Set to true if FLUSH protocol is detected in protocol stack + */ + private AtomicBoolean flushProtocolInStack= new AtomicBoolean(false); + + + public STREAMING_STATE_TRANSFER() { + members=new Vector
(); + stateQueue = new LinkedBlockingQueue(); + } public final String getName() { return NAME; @@ -128,7 +164,7 @@ public double getAverageStateSize() { } public Vector requiredDownServices() { - Vector retval = new Vector(); + Vector retval=new Vector(); retval.addElement(new Integer(Event.GET_DIGEST)); retval.addElement(new Integer(Event.SET_DIGEST)); return retval; @@ -138,7 +174,7 @@ public void resetStats() { super.resetStats(); num_state_reqs.set(0); num_bytes_sent.set(0); - avg_state_size = 0; + avg_state_size=0; } public boolean setProperties(Properties props) { @@ -167,6 +203,7 @@ public boolean setProperties(Properties props) { log.error("(bind_addr): host " + e.getLocalizedMessage() + " not known"); return false; } + use_default_transport = Util.parseBoolean(props, "use_default_transport", false); bind_port = Util.parseInt(props, "start_port", 0); socket_buffer_size = Util.parseInt(props, "socket_buffer_size", 8 * 1024); // 8K max_pool = Util.parseInt(props, "max_pool", 5); @@ -182,7 +219,7 @@ public boolean setProperties(Properties props) { public void init() throws Exception {} public void start() throws Exception { - Map map = new HashMap(); + Map map=new HashMap(); map.put("state_transfer", Boolean.TRUE); map.put("protocol_class", getClass().getName()); up_prot.up(new Event(Event.CONFIG, map)); @@ -190,113 +227,120 @@ public void start() throws Exception { public void stop() { super.stop(); - waiting_for_state_response = false; - if(spawner != null){ + if(spawner != null) { spawner.stop(); } } public Object up(Event evt) { - switch(evt.getType()){ - - case Event.MSG: - Message msg = (Message) evt.getArg(); - StateHeader hdr = (StateHeader) msg.getHeader(getName()); - if(hdr != null){ - switch(hdr.type){ - case StateHeader.STATE_REQ: - handleStateReq(hdr); - break; - case StateHeader.STATE_RSP: - handleStateRsp(hdr); - break; - default: - if(log.isErrorEnabled()) - log.error("type " + hdr.type + " not known in StateHeader"); - break; + switch(evt.getType()) { + + case Event.MSG: + Message msg=(Message)evt.getArg(); + StateHeader hdr=(StateHeader)msg.getHeader(getName()); + if(hdr != null) { + switch(hdr.type) { + case StateHeader.STATE_REQ: + handleStateReq(hdr); + break; + case StateHeader.STATE_RSP: + handleStateRsp(hdr); + break; + case StateHeader.STATE_PART: + try { + stateQueue.put(msg); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + break; + default: + if(log.isErrorEnabled()) + log.error("type " + hdr.type + " not known in StateHeader"); + break; + } + return null; } - return null; - } - break; + break; - case Event.SET_LOCAL_ADDRESS: - local_addr = (Address) evt.getArg(); - break; + case Event.SET_LOCAL_ADDRESS: + local_addr=(Address)evt.getArg(); + break; - case Event.TMP_VIEW: - case Event.VIEW_CHANGE: - handleViewChange((View) evt.getArg()); - break; + case Event.TMP_VIEW: + case Event.VIEW_CHANGE: + handleViewChange((View)evt.getArg()); + break; - case Event.CONFIG: - Map config = (Map) evt.getArg(); - if(bind_addr == null && (config != null && config.containsKey("bind_addr"))){ - bind_addr = (InetAddress) config.get("bind_addr"); - if(log.isDebugEnabled()) - log.debug("using bind_addr from CONFIG event " + bind_addr); - } - if(config != null && config.containsKey("state_transfer")){ - log.error("Protocol stack cannot contain two state transfer protocols. Remove either one of them"); - } - break; + case Event.CONFIG: + Map config=(Map)evt.getArg(); + if(bind_addr == null && (config != null && config.containsKey("bind_addr"))) { + bind_addr=(InetAddress)config.get("bind_addr"); + if(log.isDebugEnabled()) + log.debug("using bind_addr from CONFIG event " + bind_addr); + } + if(config != null && config.containsKey("state_transfer")) { + log.error("Protocol stack cannot contain two state transfer protocols. Remove either one of them"); + } + break; } return up_prot.up(evt); } public Object down(Event evt) { - switch(evt.getType()){ - - case Event.TMP_VIEW: - case Event.VIEW_CHANGE: - handleViewChange((View) evt.getArg()); - break; - - case Event.GET_STATE: - StateTransferInfo info = (StateTransferInfo) evt.getArg(); - Address target; - if(info.target == null){ - target = determineCoordinator(); - }else{ - target = info.target; - if(target.equals(local_addr)){ - if(log.isErrorEnabled()) - log.error("GET_STATE: cannot fetch state from myself !"); - target = null; + switch(evt.getType()) { + + case Event.TMP_VIEW: + case Event.VIEW_CHANGE: + handleViewChange((View)evt.getArg()); + break; + + case Event.GET_STATE: + StateTransferInfo info=(StateTransferInfo)evt.getArg(); + Address target; + if(info.target == null) { + target=determineCoordinator(); } - } - if(target == null){ - if(log.isDebugEnabled()) - log.debug("GET_STATE: first member (no state)"); - up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo())); - }else{ - Message state_req = new Message(target, null, null); - state_req.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, - local_addr, - info.state_id)); - if(log.isDebugEnabled()) - log.debug("GET_STATE: asking " + target - + " for state, passing down a SUSPEND_STABLE event, timeout=" - + info.timeout); + else { + target=info.target; + if(target.equals(local_addr)) { + if(log.isErrorEnabled()) + log.error("GET_STATE: cannot fetch state from myself !"); + target=null; + } + } + if(target == null) { + if(log.isDebugEnabled()) + log.debug("GET_STATE: first member (no state)"); + up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo())); + } + else { + Message state_req=new Message(target, null, null); + state_req.putHeader(getName(), new StateHeader(StateHeader.STATE_REQ, + local_addr, + info.state_id)); + if(log.isDebugEnabled()) + log.debug("GET_STATE: asking " + target + + " for state, passing down a SUSPEND_STABLE event, timeout=" + + info.timeout); + + down_prot.down(new Event(Event.SUSPEND_STABLE, new Long(info.timeout))); + down_prot.down(new Event(Event.MSG, state_req)); + } + return null; // don't pass down any further ! - down_prot.down(new Event(Event.SUSPEND_STABLE, new Long(info.timeout))); - waiting_for_state_response = true; - down_prot.down(new Event(Event.MSG, state_req)); - } - return null; // don't pass down any further ! + case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED: + if(log.isDebugEnabled()) + log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received,passing down a RESUME_STABLE event"); - case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED: - if(log.isDebugEnabled()) - log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received,passing down a RESUME_STABLE event"); - - down_prot.down(new Event(Event.RESUME_STABLE)); - return null; - case Event.CONFIG: - Map config = (Map) evt.getArg(); - if(config != null && config.containsKey("flush_supported")){ - flushProtocolInStack = true; - } - break; + down_prot.down(new Event(Event.RESUME_STABLE)); + return null; + case Event.CONFIG: + Map config=(Map)evt.getArg(); + if(config != null && config.containsKey("flush_supported")) { + flushProtocolInStack.set(true); + } + break; } @@ -317,7 +361,7 @@ public Object down(Event evt) { * @return true if use of digests is required, false otherwise */ private boolean isDigestNeeded() { - return !flushProtocolInStack; + return !flushProtocolInStack.get(); } private void respondToStateRequester(String id, Address stateRequester, boolean open_barrier) throws IOException{ @@ -329,49 +373,46 @@ private void respondToStateRequester(String id, Address stateRequester, boolean Thread t = getThreadFactory().newThread(spawner,"STREAMING_STATE_TRANSFER server socket acceptor"); t.start(); } - - Digest digest = null; - if(isDigestNeeded()){ - if(log.isDebugEnabled()) - log.debug("passing down GET_DIGEST"); - digest = (Digest) down_prot.down(Event.GET_DIGEST_EVT); - } - - Message state_rsp = new Message(stateRequester); - StateHeader hdr = new StateHeader(StateHeader.STATE_RSP, - local_addr, - spawner.getServerSocketAddress(), - digest, - id); - state_rsp.putHeader(NAME, hdr); - + + Digest digest=isDigestNeeded()? (Digest)down_prot.down(Event.GET_DIGEST_EVT) : null; + + Message state_rsp=new Message(stateRequester); + StateHeader hdr=new StateHeader(StateHeader.STATE_RSP, + local_addr, + use_default_transport?null:spawner.getServerSocketAddress(), + digest, + id); + state_rsp.putHeader(getName(), hdr); + if(log.isDebugEnabled()) log.debug("Responding to state requester " + state_rsp.getDest() + " with address " - + spawner.getServerSocketAddress() + + (use_default_transport?null:spawner.getServerSocketAddress()) + " and digest " + digest); down_prot.down(new Event(Event.MSG, state_rsp)); - if(stats){ + if(stats) { num_state_reqs.incrementAndGet(); } - if(open_barrier) - down_prot.down(new Event(Event.OPEN_BARRIER)); + down_prot.down(new Event(Event.OPEN_BARRIER)); + + if(use_default_transport){ + openAndProvideOutputStreamToStateRecipient(stateRequester, id); + } } private ThreadPoolExecutor setupThreadPool() { - ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, - max_pool, - pool_thread_keep_alive, - TimeUnit.MILLISECONDS, - new SynchronousQueue()); - - - ThreadFactory factory = new ThreadFactory() { + ThreadPoolExecutor threadPool=new ThreadPoolExecutor(0, + max_pool, + pool_thread_keep_alive, + TimeUnit.MILLISECONDS, + new SynchronousQueue()); + + ThreadFactory factory=new ThreadFactory() { public Thread newThread(final Runnable command) { - return getThreadFactory().newThread(command, "STREAMING_STATE_TRANSFER sender"); + return getThreadFactory().newThread(command, "STREAMING_STATE_TRANSFER sender"); } }; threadPool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(threadPool.getRejectedExecutionHandler())); @@ -380,9 +421,9 @@ public Thread newThread(final Runnable command) { } private Address determineCoordinator() { - synchronized(members){ - for(Address member:members){ - if(!local_addr.equals(member)){ + synchronized(members) { + for(Address member:members) { + if(!local_addr.equals(member)) { return member; } } @@ -391,45 +432,30 @@ private Address determineCoordinator() { } private void handleViewChange(View v) { - Address old_coord; - Vector
new_members = v.getMembers(); - boolean send_up_null_state_rsp = false; - - synchronized(members){ - old_coord = (!members.isEmpty() ? members.firstElement() : null); + Vector
new_members=v.getMembers(); + synchronized(members) { members.clear(); members.addAll(new_members); - - if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)){ - send_up_null_state_rsp = true; - } - } - - if(send_up_null_state_rsp){ - log.warn("discovered that the state provider (" + old_coord - + ") crashed; will return null state to application"); } } private void handleStateReq(StateHeader hdr) { - Address sender = hdr.sender; - String id = hdr.state_id; - if(sender == null){ + Address sender=hdr.sender; + String id=hdr.state_id; + if(sender == null) { if(log.isErrorEnabled()) log.error("sender is null !"); return; - } + } - if(isDigestNeeded()) // FLUSH protocol is not present - { - down_prot.down(new Event(Event.CLOSE_BARRIER)); // drain (and block) - // incoming msgs - // until after state - // has been returned + if(isDigestNeeded()) { + down_prot.down(new Event(Event.CLOSE_BARRIER)); + // drain (and block) incoming msgs until after state has been returned } - try{ - respondToStateRequester(id,sender,isDigestNeeded()); - }catch(Throwable t){ + try { + respondToStateRequester(id, sender, isDigestNeeded()); + } + catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed fetching state from application", t); if(isDigestNeeded()) @@ -437,31 +463,78 @@ private void handleStateReq(StateHeader hdr) { } } - void handleStateRsp(StateHeader hdr) { - Digest tmp_digest = hdr.my_digest; - - waiting_for_state_response = false; - if(isDigestNeeded()){ - if(tmp_digest == null){ + void handleStateRsp(final StateHeader hdr) { + Digest tmp_digest=hdr.my_digest; + if(isDigestNeeded()) { + if(tmp_digest == null) { if(log.isWarnEnabled()) log.warn("digest received from " + hdr.sender + " is null, skipping setting digest !"); - }else{ + } + else { down_prot.down(new Event(Event.SET_DIGEST, tmp_digest)); } } - connectToStateProvider(hdr); + if(use_default_transport){ + //have to use another thread to read state while state recipient + //has to accept state messages from state provider + Thread t = getThreadFactory().newThread(new Runnable() { + public void run() { + openAndProvideInputStreamToStateProvider(hdr); + } + }, "STREAMING_STATE_TRANSFER state reader"); + t.start(); + } else{ + connectToStateProvider(hdr); + } + } + + private void openAndProvideInputStreamToStateProvider(StateHeader hdr) { + String tmp_state_id=hdr.getStateId(); + StateInputStream wrapper=null; + StateTransferInfo sti=null; + + try { + wrapper = new StateInputStream(); + sti = new StateTransferInfo(hdr.sender, new BufferedInputStream(wrapper,socket_buffer_size), tmp_state_id); + up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); + } catch (IOException e) { + // pass null stream up so that JChannel.getState() returns false + log.error("Could not provide state recipient with appropriate stream",e); + InputStream is = null; + sti = new StateTransferInfo(hdr.sender, is, tmp_state_id); + up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); + } finally { + Util.close(wrapper); + } + } + + public void openAndProvideOutputStreamToStateRecipient(Address stateRequester, String state_id) { + StateOutputStream wrapper=null; + try { + wrapper=new StateOutputStream(stateRequester,state_id); + StateTransferInfo sti=new StateTransferInfo(stateRequester, new BufferedOutputStream(wrapper,socket_buffer_size), state_id); + up_prot.up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti)); + } + catch(IOException e) { + if(log.isWarnEnabled()) { + log.warn("StateOutputStream could not be given to application", e); + } + } + finally { + Util.close(wrapper); + } } private void connectToStateProvider(StateHeader hdr) { - IpAddress address = hdr.bind_addr; - String tmp_state_id = hdr.getStateId(); - StreamingInputStreamWrapper wrapper = null; - StateTransferInfo sti = null; - Socket socket = new Socket(); - try{ + IpAddress address=hdr.bind_addr; + String tmp_state_id=hdr.getStateId(); + StreamingInputStreamWrapper wrapper=null; + StateTransferInfo sti=null; + Socket socket=new Socket(); + try { socket.bind(new InetSocketAddress(bind_addr, 0)); - int bufferSize = socket.getReceiveBufferSize(); + int bufferSize=socket.getReceiveBufferSize(); socket.setReceiveBufferSize(socket_buffer_size); if(log.isDebugEnabled()) log.debug("Connecting to state provider " + address.getIpAddress() @@ -479,31 +552,33 @@ private void connectToStateProvider(StateHeader hdr) { + " passing inputstream up..."); // write out our state_id and address - ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); + ObjectOutputStream out=new ObjectOutputStream(socket.getOutputStream()); out.writeObject(tmp_state_id); out.writeObject(local_addr); - wrapper = new StreamingInputStreamWrapper(socket); - sti = new StateTransferInfo(hdr.sender, wrapper, tmp_state_id); - up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); - }catch(IOException e){ - if(log.isWarnEnabled()){ + wrapper=new StreamingInputStreamWrapper(socket); + sti=new StateTransferInfo(hdr.sender, wrapper, tmp_state_id); + up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); + } + catch(IOException e) { + if(log.isWarnEnabled()) { log.warn("State reader socket thread spawned abnormaly", e); } // pass null stream up so that JChannel.getState() returns false - InputStream is = null; - sti = new StateTransferInfo(hdr.sender, is, tmp_state_id); - up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); - }finally{ - if(!socket.isConnected()){ + InputStream is=null; + sti=new StateTransferInfo(hdr.sender, is, tmp_state_id); + up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti)); + } + finally { + if(!socket.isConnected()) { if(log.isWarnEnabled()) - log.warn("Could not connect to state provider. Closing socket..."); + log.warn("Could not connect to state provider. Closing socket..."); } Util.close(wrapper); Util.close(socket); - } - } + } + } /* * ------------------------ End of Private Methods @@ -511,33 +586,33 @@ private void connectToStateProvider(StateHeader hdr) { */ private class StateProviderThreadSpawner implements Runnable { - ExecutorService pool; + private final ExecutorService pool; - ServerSocket serverSocket; + private final ServerSocket serverSocket; - IpAddress address; + private final IpAddress address; Thread runner; - volatile boolean running = true; + private volatile boolean running=true; - public StateProviderThreadSpawner(ExecutorService pool,ServerSocket stateServingSocket){ + public StateProviderThreadSpawner(ExecutorService pool,ServerSocket stateServingSocket) { super(); - this.pool = pool; - this.serverSocket = stateServingSocket; - this.address = new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr, - serverSocket.getLocalPort()); + this.pool=pool; + this.serverSocket=stateServingSocket; + this.address=new IpAddress(STREAMING_STATE_TRANSFER.this.bind_addr, + serverSocket.getLocalPort()); } public void run() { - runner = Thread.currentThread(); - for(;running;){ - try{ + runner=Thread.currentThread(); + for(;running;) { + try { if(log.isDebugEnabled()) log.debug("StateProviderThreadSpawner listening at " + getServerSocketAddress() + "..."); - final Socket socket = serverSocket.accept(); + final Socket socket=serverSocket.accept(); pool.execute(new Runnable() { public void run() { if(log.isDebugEnabled()) @@ -549,11 +624,12 @@ public void run() { } }); - }catch(IOException e){ - if(log.isWarnEnabled()){ + } + catch(IOException e) { + if(log.isWarnEnabled()) { // we get this exception when we close server socket // exclude that case - if(serverSocket != null && !serverSocket.isClosed()){ + if(serverSocket != null && !serverSocket.isClosed()) { log.warn("Spawning socket from server socket finished abnormaly", e); } } @@ -566,20 +642,20 @@ public IpAddress getServerSocketAddress() { } public void stop() { - running = false; - try{ - if(serverSocket != null && !serverSocket.isClosed()){ - serverSocket.close(); - } - }catch(IOException e){ - }finally{ + running=false; + try { + serverSocket.close(); + } + catch(Exception ignored) {} + finally { if(log.isDebugEnabled()) log.debug("Waiting for StateProviderThreadSpawner to die ... "); - if(runner != null){ - try{ - runner.join(3000); - }catch(InterruptedException ignored){ + if(runner != null) { + try { + runner.join(Global.THREAD_SHUTDOWN_WAIT_TIME); + } + catch(InterruptedException ignored) { Thread.currentThread().interrupt(); } } @@ -588,10 +664,11 @@ public void stop() { log.debug("Shutting the thread pool down... "); pool.shutdownNow(); - try{ + try { pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS); - }catch(InterruptedException ignored){ + } + catch(InterruptedException ignored) { Thread.currentThread().interrupt(); } } @@ -602,10 +679,10 @@ public void stop() { private class StateProviderHandler { public void process(Socket socket) { - StreamingOutputStreamWrapper wrapper = null; - ObjectInputStream ois = null; - try{ - int bufferSize = socket.getSendBufferSize(); + StreamingOutputStreamWrapper wrapper=null; + ObjectInputStream ois=null; + try { + int bufferSize=socket.getSendBufferSize(); socket.setSendBufferSize(socket_buffer_size); if(log.isDebugEnabled()) log.debug("Running on " + Thread.currentThread() @@ -618,43 +695,46 @@ public void process(Socket socket) { + " and was reset to " + socket.getSendBufferSize() + ", passing outputstream up... "); - - ois = new ObjectInputStream(socket.getInputStream()); - String state_id = (String) ois.readObject(); - Address stateRequester = (Address) ois.readObject(); - wrapper = new StreamingOutputStreamWrapper(socket); - StateTransferInfo sti = new StateTransferInfo(stateRequester, wrapper, state_id); + + ois=new ObjectInputStream(socket.getInputStream()); + String state_id=(String)ois.readObject(); + Address stateRequester=(Address)ois.readObject(); + wrapper=new StreamingOutputStreamWrapper(socket); + StateTransferInfo sti=new StateTransferInfo(stateRequester, wrapper, state_id); up_prot.up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti)); - }catch(IOException e){ - if(log.isWarnEnabled()){ + } + catch(IOException e) { + if(log.isWarnEnabled()) { log.warn("State writer socket thread spawned abnormaly", e); } - }catch(ClassNotFoundException e){ + } + catch(ClassNotFoundException e) { // thrown by ois.readObject() // should never happen since String/Address are core classes - }finally{ - if(!socket.isConnected()){ + } + finally { + if(!socket.isConnected()) { if(log.isWarnEnabled()) - log.warn("Could not receive connection from state receiver. Closing socket..."); - } - Util.close(wrapper); - Util.close(socket); + log.warn("Could not receive connection from state receiver. Closing socket..."); + } + Util.close(wrapper); + Util.close(socket); } } } - private class StreamingInputStreamWrapper extends InputStream { + private class StreamingInputStreamWrapper extends InputStream { - private InputStream delegate; - - private Socket inputStreamOwner; - - private final AtomicBoolean closed = new AtomicBoolean(false); + private final InputStream delegate; + + private final Socket inputStreamOwner; + + private final AtomicBoolean closed=new AtomicBoolean(false); - public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException{ - super(); - this.inputStreamOwner = inputStreamOwner; - this.delegate = new BufferedInputStream(inputStreamOwner.getInputStream()); + public StreamingInputStreamWrapper(Socket inputStreamOwner) throws IOException { + super(); + this.inputStreamOwner=inputStreamOwner; + this.delegate=new BufferedInputStream(inputStreamOwner.getInputStream()); } public int available() throws IOException { @@ -662,14 +742,14 @@ public int available() throws IOException { } public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - if (log.isDebugEnabled()) { + if(closed.compareAndSet(false, true)) { + if(log.isDebugEnabled()) { log.debug("State reader is closing the socket "); } Util.close(delegate); Util.close(inputStreamOwner); up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); - down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); + down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); } } @@ -703,34 +783,35 @@ public long skip(long n) throws IOException { } private class StreamingOutputStreamWrapper extends OutputStream { - private Socket outputStreamOwner; + private final Socket outputStreamOwner; - private OutputStream delegate; - - private final AtomicBoolean closed = new AtomicBoolean(false); + private final OutputStream delegate; - private long bytesWrittenCounter = 0; + private final AtomicBoolean closed=new AtomicBoolean(false); - public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException{ + private long bytesWrittenCounter=0; + + public StreamingOutputStreamWrapper(Socket outputStreamOwner) throws IOException { super(); - this.outputStreamOwner = outputStreamOwner; - this.delegate = new BufferedOutputStream(outputStreamOwner.getOutputStream()); + this.outputStreamOwner=outputStreamOwner; + this.delegate=new BufferedOutputStream(outputStreamOwner.getOutputStream()); } public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - if (log.isDebugEnabled()) { + if(closed.compareAndSet(false, true)) { + if(log.isDebugEnabled()) { log.debug("State writer is closing the socket "); } Util.close(delegate); Util.close(outputStreamOwner); up_prot.up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); - down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); + down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); if (stats) { - avg_state_size = num_bytes_sent.addAndGet(bytesWrittenCounter) / num_state_reqs.doubleValue(); - } + avg_state_size = num_bytes_sent.addAndGet(bytesWrittenCounter) + / num_state_reqs.doubleValue(); + } } } @@ -740,67 +821,200 @@ public void flush() throws IOException { public void write(byte[] b, int off, int len) throws IOException { delegate.write(b, off, len); - bytesWrittenCounter += len; + bytesWrittenCounter+=len; } public void write(byte[] b) throws IOException { delegate.write(b); - if(b != null){ - bytesWrittenCounter += b.length; + if(b != null) { + bytesWrittenCounter+=b.length; } } public void write(int b) throws IOException { delegate.write(b); - bytesWrittenCounter += 1; + bytesWrittenCounter+=1; } } + + private class StateInputStream extends InputStream { + + private final AtomicBoolean closed; + + public StateInputStream() throws IOException { + super(); + this.closed = new AtomicBoolean(false); + } + + public void close() throws IOException { + if(closed.compareAndSet(false, true)) { + if(log.isDebugEnabled()) { + log.debug("State reader is closing the stream"); + } + stateQueue.clear(); + up(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); + down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED)); + super.close(); + } + } - public static class StateHeader extends Header implements Streamable { - public static final byte STATE_REQ = 1; + public int read() throws IOException { + if(closed.get()) return -1; + final byte[] array = new byte[1]; + return read(array); + } - public static final byte STATE_RSP = 2; + public int read(byte[] b, int off, int len) throws IOException { + if(closed.get()) return -1; + Message m = null; + try { + m = stateQueue.take(); + StateHeader hdr=(StateHeader)m.getHeader(getName()); + if(hdr.type == StateHeader.STATE_PART){ + return readAndTransferPayload(m,b,off,len); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return -1; + } + return -1; + } + + private int readAndTransferPayload(Message m, byte[] b, int off, int len) { + byte[] buffer = m.getBuffer(); + if (log.isDebugEnabled()) { + log.debug(local_addr + " reading chunk of state " + + "byte[] b=" + b.length + ", off=" + off + ", buffer.length=" + + buffer.length); + } + System.arraycopy(buffer, 0, b, off, buffer.length); + return buffer.length; + } + + public int read(byte[] b) throws IOException { + if(closed.get()) return -1; + return read(b,0,b.length); + } + } + + private class StateOutputStream extends OutputStream { + + private final Address stateRequester; + private final String state_id; + private final AtomicBoolean closed; + private long bytesWrittenCounter=0; + + public StateOutputStream(Address stateRequester, String state_id) throws IOException { + super(); + this.stateRequester=stateRequester; + this.state_id=state_id; + this.closed=new AtomicBoolean(false); + } + + public void close() throws IOException { + if(closed.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug("State writer " + local_addr + + " is closing the output stream for state_id " + + state_id); + } + up(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); + down(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED)); + if (stats) { + avg_state_size = num_bytes_sent.addAndGet(bytesWrittenCounter) + / num_state_reqs.doubleValue(); + } + super.close(); + } + } + + public void write(byte[] b, int off, int len) throws IOException { + if(closed.get()) throw closed(); + sendMessage(b,off,len); + } - long id = 0; // state transfer ID (to separate multiple state + public void write(byte[] b) throws IOException { + if(closed.get()) throw closed(); + sendMessage(b, 0, b.length); + } - // transfers at the same time) + public void write(int b) throws IOException { + if(closed.get()) throw closed(); + byte buf [] = new byte[1]; + write(buf); + } + + private void sendMessage(byte[] b, int off, int len) throws IOException{ + Message m = new Message(stateRequester); + m.putHeader(getName(), new StateHeader(StateHeader.STATE_PART,local_addr,state_id)); + m.setBuffer(b, off, len); + bytesWrittenCounter+=(len-off); + if (Thread.interrupted()) { + throw interrupted((int)bytesWrittenCounter); + } + down_prot.down(new Event(Event.MSG, m)); + if (log.isDebugEnabled()) { + log.debug(local_addr + " sent chunk of state to " + + stateRequester + "byte[] b=" + b.length + ", off=" + + off + ", len=" + len); + } + } + + private IOException closed() { + return new IOException("The output stream is closed"); + } - byte type = 0; + private InterruptedIOException interrupted(int cnt) { + final InterruptedIOException ex = new InterruptedIOException(); + ex.bytesTransferred = cnt; + return ex; + } + } + + public static class StateHeader extends Header implements Streamable { + public static final byte STATE_REQ=1; + + public static final byte STATE_RSP=2; + + public static final byte STATE_PART=3; + + long id=0; // state transfer ID (to separate multiple state transfers at the same time) + + byte type=0; Address sender; // sender of state STATE_REQ or STATE_RSP - Digest my_digest = null; // digest of sender (if type is STATE_RSP) + Digest my_digest=null; // digest of sender (if type is STATE_RSP) - IpAddress bind_addr = null; + IpAddress bind_addr=null; - String state_id = null; // for partial state transfer + String state_id=null; // for partial state transfer - public StateHeader(){ // for externalization - } + public StateHeader() {} // for externalization - public StateHeader(byte type,Address sender,String state_id){ - this.type = type; - this.sender = sender; - this.state_id = state_id; + public StateHeader(byte type,Address sender,String state_id) { + this.type=type; + this.sender=sender; + this.state_id=state_id; } - public StateHeader(byte type,Address sender,long id,Digest digest){ - this.type = type; - this.sender = sender; - this.id = id; - this.my_digest = digest; + public StateHeader(byte type,Address sender,long id,Digest digest) { + this.type=type; + this.sender=sender; + this.id=id; + this.my_digest=digest; } public StateHeader(byte type, Address sender, IpAddress bind_addr, Digest digest, - String state_id){ - this.type = type; - this.sender = sender; - this.my_digest = digest; - this.bind_addr = bind_addr; - this.state_id = state_id; + String state_id) { + this.type=type; + this.sender=sender; + this.my_digest=digest; + this.bind_addr=bind_addr; + this.state_id=state_id; } public int getType() { @@ -818,10 +1032,10 @@ public String getStateId() { public boolean equals(Object o) { StateHeader other; - if(sender != null && o != null){ + if(sender != null && o != null) { if(!(o instanceof StateHeader)) return false; - other = (StateHeader) o; + other=(StateHeader)o; return sender.equals(other.sender) && id == other.id; } return false; @@ -829,13 +1043,13 @@ public boolean equals(Object o) { public int hashCode() { if(sender != null) - return sender.hashCode() + (int) id; + return sender.hashCode() + (int)id; else - return (int) id; + return (int)id; } public String toString() { - StringBuilder sb = new StringBuilder(); + StringBuilder sb=new StringBuilder(); sb.append("type=").append(type2Str(type)); if(sender != null) sb.append(", sender=").append(sender).append(" id=").append(id); @@ -845,13 +1059,15 @@ public String toString() { } static String type2Str(int t) { - switch(t){ - case STATE_REQ: - return "STATE_REQ"; - case STATE_RSP: - return "STATE_RSP"; - default: - return ""; + switch(t) { + case STATE_REQ: + return "STATE_REQ"; + case STATE_RSP: + return "STATE_RSP"; + case STATE_PART: + return "STATE_PART"; + default: + return ""; } } @@ -864,13 +1080,13 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(state_id); } - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - sender = (Address) in.readObject(); - id = in.readLong(); - type = in.readByte(); - my_digest = (Digest) in.readObject(); - bind_addr = (IpAddress) in.readObject(); - state_id = in.readUTF(); + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + sender=(Address)in.readObject(); + id=in.readLong(); + type=in.readByte(); + my_digest=(Digest)in.readObject(); + bind_addr=(IpAddress)in.readObject(); + state_id=in.readUTF(); } public void writeTo(DataOutputStream out) throws IOException { @@ -885,28 +1101,28 @@ public void writeTo(DataOutputStream out) throws IOException { public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { - type = in.readByte(); - id = in.readLong(); - sender = Util.readAddress(in); - my_digest = (Digest) Util.readStreamable(Digest.class, in); - bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, in); - state_id = Util.readString(in); + type=in.readByte(); + id=in.readLong(); + sender=Util.readAddress(in); + my_digest=(Digest)Util.readStreamable(Digest.class, in); + bind_addr=(IpAddress)Util.readStreamable(IpAddress.class, in); + state_id=Util.readString(in); } public int size() { - int retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type + int retval=Global.LONG_SIZE + Global.BYTE_SIZE; // id and type - retval += Util.size(sender); + retval+=Util.size(sender); - retval += Global.BYTE_SIZE; // presence byte for my_digest + retval+=Global.BYTE_SIZE; // presence byte for my_digest if(my_digest != null) - retval += my_digest.serializedSize(); - - retval += Util.size(bind_addr); + retval+=my_digest.serializedSize(); + + retval+=Util.size(bind_addr); - retval += Global.BYTE_SIZE; // presence byte for state_id + retval+=Global.BYTE_SIZE; // presence byte for state_id if(state_id != null) - retval += state_id.length() + 2; + retval+=state_id.length() + 2; return retval; } }