Permalink
Browse files

ns

  • Loading branch information...
1 parent ba52a61 commit 12a1a03c2d592d26420ceb2f79667999188236f2 Bela Ban committed Jul 15, 2011
Showing with 39 additions and 23 deletions.
  1. +11 −11 src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
  2. +28 −12 tests/other/org/jgroups/tests/LargeState.java
@@ -317,9 +317,9 @@ private Address determineCoordinator() {
}
private void handleViewChange(View v) {
- Address old_coord;
+ Address old_coord;
List<Address> new_members=v.getMembers();
- boolean send_up_null_state_rsp=false;
+ boolean send_up_exception=false;
synchronized(members) {
old_coord=(!members.isEmpty()? members.get(0) : null);
@@ -330,20 +330,20 @@ private void handleViewChange(View v) {
// Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not
// null is not handled ! (Usually we get the state from the coordinator)
// http://jira.jboss.com/jira/browse/JGRP-148
- if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)) {
- send_up_null_state_rsp=true;
- }
+ if(waiting_for_state_response && old_coord != null && !members.contains(old_coord))
+ send_up_exception=true;
}
- if(send_up_null_state_rsp) {
+ if(send_up_exception) {
if(log.isWarnEnabled())
- log.warn(local_addr + ": discovered that the state provider (" + old_coord
- + ") left; returning null state to application");
- StateHeader hdr=new StateHeader(StateHeader.STATE_RSP, null);
- handleStateRsp(hdr, null); // sends up null GET_STATE_OK
+ log.warn(local_addr + ": discovered that the state provider (" + old_coord + ") left");
+ waiting_for_state_response=false;
+ Exception ex=new EOFException("state provider " + old_coord + " left");
+ StateTransferResult result=new StateTransferResult(ex);
+ up_prot.up(new Event(Event.GET_STATE_OK, result));
+ openBarrierAndResumeStable();
}
-
synchronized(state_requesters) {
boolean was_empty=state_requesters.isEmpty();
state_requesters.removeAll(new_members);
@@ -35,12 +35,15 @@
boolean provider=true, provider_fails=false, requester_fails=false;
int size=100000;
int total_received=0;
+ long delay=0;
- public void start(boolean provider, int size, String props, boolean provider_fails, boolean requester_fails) throws Exception {
+ public void start(boolean provider, int size, String props, boolean provider_fails,
+ boolean requester_fails, long delay) throws Exception {
this.provider=provider;
this.provider_fails=provider_fails;
this.requester_fails=requester_fails;
+ this.delay=delay;
channel=new JChannel(props);
channel.setReceiver(this);
channel.connect("TestChannel");
@@ -58,11 +61,9 @@ public void start(boolean provider, int size, String props, boolean provider_fai
Util.sleep(10000);
}
}
- System.out.println("Getting state");
start=System.currentTimeMillis();
try {
- rc=channel.getState(null, 0);
- System.out.println("getState(), rc=" + rc);
+ channel.getState(null, 0);
}
catch(Exception ex) {
ex.printStackTrace();
@@ -88,12 +89,14 @@ public void viewAccepted(View new_view) {
public byte[] getState() {
if(state == null) {
- System.out.println("creating state of " + size + " bytes");
+ System.out.println("creating state of " + Util.printBytes(size));
state=createLargeState(size);
- if(provider_fails)
- throw new RuntimeException("booom - provider failed");
}
- System.out.println("--> returning state: " + state.length + " bytes");
+ if(delay > 0)
+ Util.sleep(delay);
+ if(provider_fails)
+ throw new RuntimeException("booom - provider failed");
+ System.out.println("--> returning " + Util.printBytes(state.length));
return state;
}
@@ -103,7 +106,7 @@ public void setState(byte[] state) {
this.state=state;
if(requester_fails)
throw new RuntimeException("booom - requester failed");
- System.out.println("<-- Received byte[] state, size=" + Util.printBytes(state.length) + " (took " + (stop-start) + "ms)");
+ System.out.println("<-- received " + Util.printBytes(state.length) + " in " + (stop-start) + "ms");
}
}
@@ -115,35 +118,44 @@ public void setState(InputStream istream) throws Exception {
received=istream.read(buf);
if(received < 0)
break;
+ if(delay > 0)
+ Util.sleep(delay);
total_received+=received;
if(requester_fails)
throw new Exception("booom - requester failed");
}
stop=System.currentTimeMillis();
- System.out.println("<-- Received stream state, size=" + Util.printBytes(total_received) + " (took " + (stop-start) + "ms)");
+ System.out.println("<-- received " + Util.printBytes(total_received) + " in " + (stop-start) + "ms");
}
public void getState(OutputStream ostream) throws Exception {
int frag_size=size / 10;
+ long bytes=0;
for(int i=0; i < 10; i++) {
byte[] buf=new byte[frag_size];
ostream.write(buf);
+ bytes+=buf.length;
if(provider_fails)
throw new Exception("booom - provider failed");
+ if(delay > 0)
+ Util.sleep(delay);
}
int remaining=size - (10 * frag_size);
if(remaining > 0) {
byte[] buf=new byte[remaining];
ostream.write(buf);
+ bytes+=buf.length;
}
+ System.out.println("--> wrote " + Util.printBytes(bytes));
}
public static void main(String[] args) {
boolean provider=false, provider_fails=false, requester_fails=false;
int size=1024 * 1024;
String props=null;
+ long delay=0;
for(int i=0; i < args.length; i++) {
if("-help".equals(args[i])) {
@@ -170,13 +182,17 @@ public static void main(String[] args) {
props=args[++i];
continue;
}
+ if("-delay".equals(args[i])) {
+ delay=Long.parseLong(args[++i]);
+ continue;
+ }
help();
return;
}
try {
- new LargeState().start(provider, size, props, provider_fails, requester_fails);
+ new LargeState().start(provider, size, props, provider_fails, requester_fails, delay);
}
catch(Exception e) {
e.printStackTrace();
@@ -185,7 +201,7 @@ public static void main(String[] args) {
static void help() {
System.out.println("LargeState [-help] [-size <size of state in bytes] [-provider] " +
- "[-props <properties>] [-provider_fails] [-requester_fails]");
+ "[-props <properties>] [-provider_fails] [-requester_fails] [-delay <ms>]");
}
}

0 comments on commit 12a1a03

Please sign in to comment.