Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

replaced setState(byte[]) and byte[] getState() callback implementati…

…ons with corresponding streamed versions
  • Loading branch information...
commit 2390a004c0973fb49b9c3b9a870107682c8ed9a0 1 parent ef7416e
@belaban authored
View
4 doc/API_Changes.txt
@@ -102,8 +102,10 @@ API changes in 3.0.0
- State transfer API changes:
---------------------------
+ - Removed byte[] getState() and void setState(byte[] data) callbacks in MessageListener and Receiver. Use the callbacks
+ which use input and output streams instead
-
+
View
35 doc/tutorial/en/code/SimpleChat.java
@@ -4,10 +4,9 @@
import org.jgroups.View;
import org.jgroups.util.Util;
+import java.io.*;
import java.util.List;
import java.util.LinkedList;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
public class SimpleChat extends ReceiverAdapter {
JChannel channel;
@@ -26,35 +25,25 @@ public void receive(Message msg) {
}
}
- public byte[] getState() {
+ public void getState(OutputStream output) throws Exception {
synchronized(state) {
- try {
- return Util.objectToByteBuffer(state);
- }
- catch(Exception e) {
- e.printStackTrace();
- return null;
- }
+ Util.objectToStream(state, new DataOutputStream(output));
}
}
- public void setState(byte[] new_state) {
- try {
- List<String> list=(List<String>)Util.objectFromByteBuffer(new_state);
- synchronized(state) {
- state.clear();
- state.addAll(list);
- }
- System.out.println("received state (" + list.size() + " messages in chat history):");
- for(String str: list) {
- System.out.println(str);
- }
+ public void setState(InputStream input) throws Exception {
+ List<String> list=(List<String>)Util.objectFromStream(new DataInputStream(input));
+ synchronized(state) {
+ state.clear();
+ state.addAll(list);
}
- catch(Exception e) {
- e.printStackTrace();
+ System.out.println("received state (" + list.size() + " messages in chat history):");
+ for(String str: list) {
+ System.out.println(str);
}
}
+
private void start() throws Exception {
channel=new JChannel();
channel.setReceiver(this);
View
2  src/org/jgroups/Channel.java
@@ -377,8 +377,6 @@ public Receiver getReceiver() {
*
* @see MessageListener#getState(java.io.OutputStream)
* @see MessageListener#setState(java.io.InputStream)
- * @see MessageListener#getState()
- * @see MessageListener#setState(byte[])
*
* @return true If the state transfer was successful, false otherwise
* @exception ChannelNotConnectedException The channel must be connected to receive messages.
View
11 src/org/jgroups/MessageListener.java
@@ -13,16 +13,7 @@
* @param msg
*/
void receive(Message msg);
- /**
- * Answers the group state; e.g., when joining.
- * @return byte[]
- */
- byte[] getState();
- /**
- * Sets the group state; e.g., when joining.
- * @param state
- */
- void setState(byte[] state);
+
/**
* Allows an application to write a state through a provided OutputStream. When done, the OutputStream doesn't need
View
11 src/org/jgroups/ReceiverAdapter.java
@@ -11,17 +11,10 @@
public void receive(Message msg) {
}
- public byte[] getState() {
- return null;
+ public void getState(OutputStream output) throws Exception {
}
- public void setState(byte[] state) {
- }
-
- public void getState(OutputStream ostream) throws Exception {
- }
-
- public void setState(InputStream istream) throws Exception {
+ public void setState(InputStream input) throws Exception {
}
public void viewAccepted(View view) {
View
38 src/org/jgroups/blocks/ReplicatedHashMap.java
@@ -536,43 +536,7 @@ public V _replace(K key, V value) {
public void receive(Message msg) {}
- public byte[] getState() {
- K key;
- V val;
- Map<K,V> copy=new HashMap<K,V>();
-
- for(Map.Entry<K,V> entry:entrySet()) {
- key=entry.getKey();
- val=entry.getValue();
- copy.put(key, val);
- }
- try {
- return Util.objectToByteBuffer(copy);
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled())
- log.error("exception marshalling state: " + ex);
- return null;
- }
- }
-
- public void setState(byte[] new_state) {
- HashMap<K,V> new_copy;
-
- try {
- new_copy=(HashMap<K,V>)Util.objectFromByteBuffer(new_state);
- if(new_copy == null)
- return;
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled())
- log.error("exception unmarshalling state: " + ex);
- return;
- }
- _putAll(new_copy);
- state_promise.setResult(Boolean.TRUE);
- }
-
+
public void getState(OutputStream ostream) throws Exception {
K key;
View
35 src/org/jgroups/blocks/ReplicatedTree.java
@@ -10,7 +10,7 @@
import org.jgroups.util.Util;
import javax.management.MBeanServer;
-import java.io.Serializable;
+import java.io.*;
import java.util.*;
@@ -560,35 +560,16 @@ public void receive(Message msg) {
}
}
- /** Return a copy of the current cache (tree) */
- public byte[] getState() {
- try {
- return Util.objectToByteBuffer(root.clone());
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) log.error("exception returning cache: " + ex);
- return null;
- }
+
+ public void getState(OutputStream ostream) throws Exception {
+ Util.objectToStream(root.clone(), new DataOutputStream(ostream));
}
- /** Set the cache (tree) to this value */
- public void setState(byte[] new_state) {
- Node new_root=null;
- Object obj;
- if(new_state == null) {
- if(log.isInfoEnabled()) log.info("new cache is null");
- return;
- }
- try {
- obj=Util.objectFromByteBuffer(new_state);
- new_root=(Node)((Node)obj).clone();
- root=new_root;
- notifyAllNodesCreated(root);
- }
- catch(Throwable ex) {
- if(log.isErrorEnabled()) log.error("could not set cache: " + ex);
- }
+ public void setState(InputStream istream) throws Exception {
+ Object obj=Util.objectFromStream(new DataInputStream(istream));
+ root=(Node)((Node)obj).clone();
+ notifyAllNodesCreated(root);
}
/*-------------------- End of MessageListener ----------------------*/
View
39 src/org/jgroups/demos/Draw.java
@@ -308,15 +308,6 @@ public void viewAccepted(View v) {
}
- public byte[] getState() {
- return panel.getState();
- }
-
- public void setState(byte[] state) {
- panel.setState(state);
- }
-
-
public void getState(OutputStream ostream) throws Exception {
panel.writeState(ostream);
}
@@ -436,36 +427,6 @@ public void componentResized(ComponentEvent e) {
}
- public byte[] getState() {
- byte[] retval=null;
- if(state == null) return null;
- synchronized(state) {
- try {
- retval=Util.objectToByteBuffer(state);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- return retval;
- }
-
- @SuppressWarnings("unchecked")
- public void setState(byte[] buf) {
- synchronized(state) {
- try {
- Map<Point,Color> tmp=(Map<Point,Color>)Util.objectFromByteBuffer(buf);
- state.clear();
- state.putAll(tmp);
- System.out.println("received state: " + buf.length + " bytes, " + state.size() + " entries");
- createOffscreenImage(true);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
-
public void writeState(OutputStream outstream) throws IOException {
if(state == null)
return;
View
32 src/org/jgroups/demos/QuoteServer.java
@@ -8,6 +8,10 @@
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Date;
import java.util.Enumeration;
import java.util.Hashtable;
@@ -48,14 +52,7 @@ public void viewAccepted(View new_view) {
System.out.println("Accepted view (" + new_view.size() + new_view.getMembers() + ')');
}
- public void suspect(Address suspected_mbr) {
- }
-
- public void block() {
- }
- public void unblock() {
- }
public void start() {
try {
@@ -101,27 +98,14 @@ public void printAllStocks() {
System.out.println(stocks);
}
- public void receive(Message msg) {
+ public void getState(OutputStream ostream) throws Exception {
+ Util.objectToStream(stocks, new DataOutputStream(ostream));
}
- public byte[] getState() {
- try {
- return Util.objectToByteBuffer(stocks.clone());
- }
- catch(Exception ex) {
- ex.printStackTrace();
- return null;
- }
+ public void setState(InputStream istream) throws Exception {
+ integrate((Hashtable)Util.objectFromStream(new DataInputStream(istream)));
}
- public void setState(byte[] state) {
- try {
- integrate((Hashtable)Util.objectFromByteBuffer(state));
- }
- catch(Exception ex) {
- ex.printStackTrace();
- }
- }
public static void main(String args[]) {
try {
View
29 src/org/jgroups/demos/StompDraw.java
@@ -333,35 +333,6 @@ public void componentResized(ComponentEvent e) {
}
- public byte[] getState() {
- byte[] retval=null;
- if(state == null) return null;
- synchronized(state) {
- try {
- retval=Util.objectToByteBuffer(state);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- return retval;
- }
-
- @SuppressWarnings("unchecked")
- public void setState(byte[] buf) {
- synchronized(state) {
- try {
- Map<Point,Color> tmp=(Map<Point,Color>)Util.objectFromByteBuffer(buf);
- state.clear();
- state.putAll(tmp);
- System.out.println("received state: " + buf.length + " bytes, " + state.size() + " entries");
- createOffscreenImage(true);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
public void writeState(OutputStream outstream) throws IOException {
synchronized(state) {
View
85 src/org/jgroups/demos/TotalOrder.java
@@ -10,6 +10,10 @@
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@@ -152,52 +156,7 @@ public TotalOrder(String title, long timeout, int num_fields, int field_size, St
this.num=num;
setFont(def_font);
- try {
- channel=new JChannel(props);
- channel.setReceiver(new ReceiverAdapter() {
- public void receive(Message msg) {
- try {
- TotOrderRequest req=new TotOrderRequest();
- ByteBuffer buf=ByteBuffer.wrap(msg.getBuffer());
- req.init(buf);
- processRequest(req);
- }
- catch(Exception e) {
- System.err.println(e);
- }
- }
- public byte[] getState() {
- int[][] copy_of_state=canvas.getCopyOfState();
- try {
- return Util.objectToByteBuffer(copy_of_state);
- }
- catch(Exception e) {
- e.printStackTrace();
- return null;
- }
- }
-
- public void setState(byte[] state) {
- try {
- canvas.setState(Util.objectFromByteBuffer(state));
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
-
- public void viewAccepted(View view) {
- System.out.println("view = " + view);
- }
- });
- channel.connect("TotalOrderGroup");
- channel.getState(null, 8000);
- }
- catch(Exception e) {
- e.printStackTrace();
- System.exit(-1);
- }
start.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
@@ -269,6 +228,42 @@ public void actionPerformed(ActionEvent e) {
s=canvas.getSize();
s.height+=100;
setSize(s);
+
+ try {
+ channel=new JChannel(props);
+ channel.setReceiver(new ReceiverAdapter() {
+ public void receive(Message msg) {
+ try {
+ TotOrderRequest req=new TotOrderRequest();
+ ByteBuffer buf=ByteBuffer.wrap(msg.getBuffer());
+ req.init(buf);
+ processRequest(req);
+ }
+ catch(Exception e) {
+ System.err.println(e);
+ }
+ }
+
+ public void getState(OutputStream output) throws Exception {
+ int[][] copy_of_state=canvas.getCopyOfState();
+ Util.objectToStream(copy_of_state, new DataOutputStream(output));
+ }
+
+ public void setState(InputStream input) throws Exception {
+ canvas.setState(Util.objectFromStream(new DataInputStream(input)));
+ }
+
+ public void viewAccepted(View view) {
+ System.out.println("view = " + view);
+ }
+ });
+ channel.connect("TotalOrderGroup");
+ channel.getState(null, 8000);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
}
View
29 src/org/jgroups/demos/wb/GraphPanel.java
@@ -282,35 +282,6 @@ public void stop() {
}
-
- public byte[] getState() { // return the copy previously saved by saveState()
- try {
- synchronized(nodes) {
- return Util.objectToByteBuffer(nodes);
- }
- }
- catch(Throwable ex) {
- ex.printStackTrace();
- return null;
- }
- }
-
-
- public void setState(byte[] data) throws Exception {
- java.util.List<Node> new_state=(java.util.List<Node>)Util.objectFromByteBuffer(data);
-
- boolean do_repaint=false;
- synchronized(nodes) {
- nodes.clear();
- if(new_state != null) {
- nodes.addAll(new_state);
- do_repaint=true;
- }
- }
- if(do_repaint)
- repaint();
- }
-
public void getState(OutputStream output) throws Exception {
DataOutputStream out=new DataOutputStream(new BufferedOutputStream(output, 1000));
try {
View
13 src/org/jgroups/demos/wb/Whiteboard.java
@@ -36,18 +36,7 @@ public void receive(Message m) {
;
}
- public byte[] getState() {
- return panel.getState();
- }
-
- public void setState(byte[] new_state) {
- try {
- panel.setState(new_state);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
+
public void getState(OutputStream ostream) throws Exception {
panel.getState(ostream);
View
8 tests/junit/org/jgroups/tests/ChannelTestBase.java
@@ -367,19 +367,11 @@ public void block() {
events.append('b');
}
- public byte[] getState() {
- events.append('g');
- return null;
- }
public void getState(OutputStream ostream) throws Exception {
events.append('g');
}
- public void setState(byte[] state) {
- events.append('s');
- }
-
public void setState(InputStream istream) throws Exception {
events.append('s');
}
View
57 tests/junit/org/jgroups/tests/ConcurrentStartupTest.java
@@ -152,65 +152,22 @@ public void viewAccepted(View new_view) {
}
}
- @SuppressWarnings("unchecked")
- public void setState(byte[] state) {
- super.setState(state);
- try{
- List<Address> tmp = (List) Util.objectFromByteBuffer(state);
- synchronized(this.state) {
- this.state.addAll(tmp);
- log.info(channel.getAddress() + ": state is " + this.state);
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- }
-
- public byte[] getState() {
- super.getState();
- List<Address> tmp = null;
- synchronized(state) {
- tmp = new LinkedList<Address>(state);
- try{
- return Util.objectToByteBuffer(tmp);
- }catch(Exception e){
- e.printStackTrace();
- return null;
- }
- }
- }
public void getState(OutputStream ostream) throws Exception {
super.getState(ostream);
- ObjectOutputStream oos = null;
- try{
- oos = new ObjectOutputStream(ostream);
- List<Address> tmp = null;
- synchronized(state) {
- tmp = new LinkedList<Address>(state);
- }
- oos.writeObject(tmp);
- oos.flush();
- }
- finally {
- Util.close(oos);
+ synchronized(state) {
+ Util.objectToStream(state, new DataOutputStream(ostream));
}
}
@SuppressWarnings("unchecked")
public void setState(InputStream istream) throws Exception {
super.setState(istream);
- ObjectInputStream ois = null;
- try{
- ois = new ObjectInputStream(istream);
- List<Address> tmp = (List) ois.readObject();
- synchronized(state){
- state.clear();
- state.addAll(tmp);
- log.info(channel.getAddress() + ": state is " + state);
- }
- }finally{
- Util.close(ois);
+ List<Address> tmp = (List)Util.objectFromStream(new DataInputStream(istream));
+ synchronized(state){
+ state.clear();
+ state.addAll(tmp);
+ log.info(channel.getAddress() + ": state is " + state);
}
}
}
View
5 tests/junit/org/jgroups/tests/FlushTest.java
@@ -7,7 +7,6 @@
import org.jgroups.util.Util;
import org.testng.annotations.Test;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -425,10 +424,6 @@ public String getEventSequence() {
return events.toString();
}
- public byte[] getState() {
- events.append('g');
- return new byte[] { 'b', 'e', 'l', 'a' };
- }
public void getState(OutputStream ostream) throws Exception {
super.getState(ostream);
View
23 tests/junit/org/jgroups/tests/LargeStateTransferTest.java
@@ -121,23 +121,8 @@ public Provider(int size) {
state=new byte[size];
}
- public byte[] getState() {
- return state;
- }
-
public void getState(OutputStream ostream) throws Exception {
- DataOutputStream out=null;
- try{
- out=new DataOutputStream(ostream);
- out.writeInt(state.length);
- out.write(state, 0, state.length);
- }
- finally{
- Util.close(out);
- }
- }
- public void setState(byte[] state) {
- throw new UnsupportedOperationException("not implemented by provider");
+ Util.objectToStream(state, new DataOutputStream(ostream));
}
}
@@ -149,13 +134,7 @@ public Requester(Promise<Integer> p) {
this.promise=p;
}
- public byte[] getState() {
- throw new UnsupportedOperationException("not implemented by requester");
- }
- public void setState(byte[] state) {
- promise.setResult(new Integer(state.length));
- }
public void setState(InputStream istream) throws Exception {
DataInputStream in=null;
int size=0;
View
59 tests/junit/org/jgroups/tests/ReconciliationTest.java
@@ -386,70 +386,23 @@ public void receive(Message msg) {
Object key=modification[0];
Object val=modification[1];
synchronized(data) {
- // System.out.println("****** [" + name + "] received PUT(" +
- // key + ", " + val + ") " + " from " + msg.getSrc() + "
- // *******");
data.put(key, val);
}
}
- public byte[] getState() {
- byte[] state=null;
- synchronized(data) {
- try {
- state=Util.objectToByteBuffer(data);
- }
- catch(Exception e) {
- e.printStackTrace();
- return null;
- }
- }
- return state;
- }
-
- @SuppressWarnings("unchecked")
- public void setState(byte[] state) {
- Map<Object,Object> m;
- try {
- m=(Map<Object,Object>)Util.objectFromByteBuffer(state);
- synchronized(data) {
- data.clear();
- data.putAll(m);
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
public void getState(OutputStream ostream) throws Exception {
- ObjectOutputStream oos=null;
- try {
- oos=new ObjectOutputStream(ostream);
- synchronized(data) {
- oos.writeObject(data);
- }
- oos.flush();
- }
- finally {
- Util.close(oos);
+ synchronized(data) {
+ Util.objectToStream(data, new DataOutputStream(ostream));
}
}
@SuppressWarnings("unchecked")
public void setState(InputStream istream) throws Exception {
- ObjectInputStream ois=null;
- try {
- ois=new ObjectInputStream(istream);
- Map<Object,Object> m=(Map<Object,Object>)ois.readObject();
- synchronized(data) {
- data.clear();
- data.putAll(m);
- }
-
- }
- finally {
- Util.close(ois);
+ Map<Object,Object> m=(Map<Object,Object>)Util.objectFromStream(new DataInputStream(istream));
+ synchronized(data) {
+ data.clear();
+ data.putAll(m);
}
}
View
49 tests/junit/org/jgroups/tests/StateTransferTest.java
@@ -164,60 +164,21 @@ public void receive(Message msg) {
semaphore.release();
}
- public byte[] getState() {
- synchronized(map) {
- try {
- return Util.objectToByteBuffer(map);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- return null;
- }
- @SuppressWarnings("unchecked")
- public void setState(byte[] state) {
- synchronized(map) {
- try {
- Map<Object,Object> tmp=(Map<Object,Object>)Util.objectFromByteBuffer(state);
- map.putAll(tmp);
- log.info(channel.getAddress() + ": received state, map has " + map.size() + " elements");
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
public void getState(OutputStream ostream) throws Exception {
- ObjectOutputStream out=null;
synchronized(map) {
- try {
- out=new ObjectOutputStream(ostream);
- out.writeObject(map);
- out.close();
- }
- finally {
- Util.close(out);
- }
+ Util.objectToStream(map, new DataOutputStream(ostream));
}
}
@SuppressWarnings("unchecked")
public void setState(InputStream istream) throws Exception {
- ObjectInputStream in=null;
+ Map<Object,Object> tmp=(Map<Object,Object>)Util.objectFromStream(new DataInputStream(istream));
synchronized(map) {
- try {
- in=new ObjectInputStream(istream);
- Map<Object,Object> tmp=(Map<Object,Object>)in.readObject();
- Util.close(in);
- map.putAll(tmp);
- log.info(channel.getAddress() + ": received state, map has " + map.size() + " elements");
- }
- finally {
- Util.close(in);
- }
+ map.clear();
+ map.putAll(tmp);
+ log.info(channel.getAddress() + ": received state, map has " + map.size() + " elements");
}
}
View
21 tests/junit/org/jgroups/tests/StateTransferTest2.java
@@ -166,16 +166,6 @@ public Object getReceivedState() {
return received_state;
}
- public byte[] getState() {
- if(get_error)
- throw new RuntimeException("[dummy failure] state could not be serialized");
- try {
- return Util.objectToByteBuffer(state_to_send);
- }
- catch(Exception e) {
- throw new RuntimeException("failed getting state", e);
- }
- }
public void getState(OutputStream ostream) throws Exception {
if(get_error)
@@ -196,17 +186,6 @@ public void setState(InputStream istream) throws Exception {
Util.close(in);
}
}
-
- public void setState(byte[] state) {
- if(set_error)
- throw new RuntimeException("[dummy failure] state could not be set");
- try {
- this.received_state=Util.objectFromByteBuffer(state);
- }
- catch(Exception e) {
- throw new RuntimeException("failed setting state", e);
- }
- }
}
}
View
22 tests/other/org/jgroups/tests/LargeState.java
@@ -87,28 +87,6 @@ public void viewAccepted(View new_view) {
System.out.println("-- view: " + new_view);
}
- public byte[] getState() {
- if(state == null) {
- System.out.println("creating state of " + Util.printBytes(size));
- state=createLargeState(size);
- }
- if(delay > 0)
- Util.sleep(delay);
- if(provider_fails)
- throw new RuntimeException("booom - provider failed");
- System.out.println("--> returning " + Util.printBytes(state.length));
- return state;
- }
-
- public void setState(byte[] state) {
- stop=System.currentTimeMillis();
- if(state != null) {
- this.state=state;
- if(requester_fails)
- throw new RuntimeException("booom - requester failed");
- System.out.println("<-- received " + Util.printBytes(state.length) + " in " + (stop-start) + "ms");
- }
- }
public void setState(InputStream istream) throws Exception {
total_received=0;
Please sign in to comment.
Something went wrong with that request. Please try again.