Skip to content

Commit

Permalink
initial commit with cdc patches and libs
Browse files Browse the repository at this point in the history
  • Loading branch information
mimbert committed May 21, 2006
1 parent 0b69245 commit f191d38
Show file tree
Hide file tree
Showing 48 changed files with 2,476 additions and 1,553 deletions.
325 changes: 237 additions & 88 deletions build.xml

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ log4j.appender.file.MaxBackupIndex=1
## Layout for the console appender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%-7d{HH:mm:ss,SSS} [%p] %c: %m%n
log4j.appender.console.layout.ConversionPattern=%r [%p] %C{1}.%M(): - %m%n
log4j.appender.console.layout.ConversionPattern=%r [%p] %c{1}: - %m%n

## Layout for the file appender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%-7d{HH:mm:ss,SSS} [%p] %c: %m%n
log4j.appender.file.layout.ConversionPattern=%r [%p] %C{1}.%M(): - %m%n
log4j.appender.file.layout.ConversionPattern=%r [%p] %c{1}: - %m%n



Expand Down
Binary file added lib/commons-logging-cdc.jar
Binary file not shown.
Binary file added lib/concurrent-cdc.jar
Binary file not shown.
Binary file added lib/junit-cdc.jar
Binary file not shown.
Binary file added lib/log4j-cdc.jar
Binary file not shown.
Binary file added lib/xerces-cdc.jar
Binary file not shown.
31 changes: 12 additions & 19 deletions src/org/jgroups/Channel.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: Channel.java,v 1.20 2006/03/27 08:25:26 belaban Exp $
// $Id: Channel.java,v 1.13.4.1 2006/05/21 09:36:58 mimbert Exp $

package org.jgroups;

Expand Down Expand Up @@ -95,7 +95,7 @@ Destroys the channel and its associated resources (e.g., the protocol stack). Af


/** Shuts down the channel without disconnecting if connected, stops all the threads */
abstract protected void shutdown();
abstract void shutdown();


/**
Expand Down Expand Up @@ -306,7 +306,7 @@ public synchronized void addChannelListener(ChannelListener listener) {
if(listener == null)
return;
if(channel_listeners == null)
channel_listeners=new LinkedHashSet();
channel_listeners=new HashSet();
channel_listeners.add(listener);
}

Expand All @@ -324,9 +324,16 @@ public void setReceiver(Receiver r) {
Sets an option. The following options are currently recognized:
<ol>
<li><code>BLOCK</code>. Turn the reception of BLOCK events on/off (value is Boolean).
Default is off
Default is off. If set to on, receiving VIEW events will be set to on, too.
<li><code>VIEW</code>. Turn the reception of VIEW events on/off (value is Boolean).
Default is on.
<li><code>SUSPECT</code>. Turn the reception of SUSPECT events on/off (value is Boolean).
Default is on.
<li><code>LOCAL</code>. Receive its own broadcast messages to the group
(value is Boolean). Default is on.
<li><code>GET_STATE_EVENTS</code>. Turn the reception of GetState events on/off
(value is Boolean). Default is off, which means that no other members can
ask this member for its state (null will be returned).
<li><code>AUTO_RECONNECT</code>. Turn auto-reconnection on/off. If on, when a member if forced out
of a group (EXIT event), then we will reconnect.
<li><code>AUTO_GETSTATE</code>. Turn automatic fetching of state after an auto-reconnect on/off.
Expand Down Expand Up @@ -375,18 +382,6 @@ abstract public boolean getState(Address target, long timeout)
throws ChannelNotConnectedException, ChannelClosedException;


/**
* Fetches a partial state identified by state_id.
* @param target
* @param state_id
* @param timeout
* @return
* @throws ChannelNotConnectedException
* @throws ChannelClosedException
*/
abstract public boolean getState(Address target, String state_id, long timeout)
throws ChannelNotConnectedException, ChannelClosedException;

/**
Retrieve all states of the group members. Will contact all group members to get
the states. When the method returns true, a <code>SetStateEvent</code> will have been
Expand All @@ -398,9 +393,9 @@ abstract public boolean getState(Address target, String state_id, long timeout)
@return boolean True if the state was retrieved successfully, otherwise false.
@exception ChannelNotConnectedException The channel must be connected to
receive messages.
@exception ChannelClosedException The channel is closed and therefore cannot be used
any longer. A new channel has to be created first.
@deprecated Not really needed - we always want to get the state from a single member
*/
abstract public boolean getAllStates(Vector targets, long timeout)
throws ChannelNotConnectedException, ChannelClosedException;
Expand All @@ -414,8 +409,6 @@ abstract public boolean getAllStates(Vector targets, long timeout)
*/
public abstract void returnState(byte[] state);

/** Returns a given substate (state_id of null means return entire state) */
public abstract void returnState(byte[] state, String state_id);


public static String option2String(int option) {
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/ChannelException.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: ChannelException.java,v 1.6 2005/07/17 11:38:05 chrislott Exp $
// $Id: ChannelException.java,v 1.6.4.1 2006/05/21 09:36:58 mimbert Exp $

package org.jgroups;

Expand All @@ -17,7 +17,7 @@ public ChannelException(String reason) {
}

public ChannelException(String reason, Throwable cause) {
super(reason, cause);
super(reason + " - Cause = " + cause.getMessage());
}

}
93 changes: 45 additions & 48 deletions src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: Message.java,v 1.49 2006/05/12 09:35:18 belaban Exp $
// $Id: Message.java,v 1.43.4.1 2006/05/21 09:36:58 mimbert Exp $

package org.jgroups;

Expand Down Expand Up @@ -47,7 +47,7 @@ public class Message implements Externalizable, Streamable {

protected static final Log log=LogFactory.getLog(Message.class);

private static final long serialVersionUID=7966206671974139740L;
static final long serialVersionUID=-1137364035832847034L;

static final byte DEST_SET=1;
static final byte SRC_SET=2;
Expand All @@ -61,22 +61,20 @@ public class Message implements Externalizable, Streamable {

/** Map<Address,Address>. Maintains mappings to canonical addresses */
private static final Map canonicalAddresses=new ConcurrentReaderHashMap();
private static final boolean DISABLE_CANONICALIZATION=Boolean.getBoolean("disable_canonicalization");
private static final boolean DISABLE_CANONICALIZATION;
static {
boolean b;
try {
b=Boolean.getBoolean("disable_canonicalization");
}
catch (java.security.AccessControlException e) {
// this will happen in an applet context
b=false;
}
DISABLE_CANONICALIZATION=b;
}


/** Public constructor
* @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
* it is sent to the group (either to current group or to the group as given
* in the string). If it is a Vector, then it contains a number of addresses
* to which it must be sent. Otherwise, it contains a single destination.<p>
* Addresses are generally untyped (all are of type <em>Object</em>. A channel
* instance must know what types of addresses it expects and downcast
* accordingly.
*/
public Message(Address dest) {
dest_addr=dest;
headers=createHeaders(7);
}

/** Public constructor
* @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
Expand All @@ -91,9 +89,10 @@ public Message(Address dest) {
* not allowed), since we don't copy the contents on clopy() or clone().
*/
public Message(Address dest, Address src, byte[] buf) {
this(dest);
dest_addr=dest;
src_addr=src;
setBuffer(buf);
headers=createHeaders(7);
}

/**
Expand All @@ -114,9 +113,10 @@ public Message(Address dest, Address src, byte[] buf) {
* array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid
*/
public Message(Address dest, Address src, byte[] buf, int offset, int length) {
this(dest);
dest_addr=dest;
src_addr=src;
setBuffer(buf, offset, length);
headers=createHeaders(7);
}


Expand All @@ -134,9 +134,10 @@ public Message(Address dest, Address src, byte[] buf, int offset, int length) {
* (e.g. buf[0]=0 is not allowed), since we don't copy the contents on clopy() or clone().
*/
public Message(Address dest, Address src, Serializable obj) {
this(dest);
dest_addr=dest;
src_addr=src;
setObject(obj);
headers=createHeaders(7);
}


Expand Down Expand Up @@ -186,7 +187,7 @@ public byte[] getRawBuffer() {
* Returns a copy of the buffer if offset and length are used, otherwise a reference.
* @return byte array with a copy of the buffer.
*/
final public byte[] getBuffer() {
public byte[] getBuffer() {
if(buf == null)
return null;
if(offset == 0 && length == buf.length)
Expand All @@ -198,7 +199,7 @@ final public byte[] getBuffer() {
}
}

final public void setBuffer(byte[] b) {
public void setBuffer(byte[] b) {
buf=b;
if(buf != null) {
offset=0;
Expand All @@ -215,7 +216,7 @@ final public void setBuffer(byte[] b) {
* @param offset The initial position
* @param length The number of bytes
*/
final public void setBuffer(byte[] b, int offset, int length) {
public void setBuffer(byte[] b, int offset, int length) {
buf=b;
if(buf != null) {
if(offset < 0 || offset > buf.length)
Expand Down Expand Up @@ -244,7 +245,7 @@ public Map getHeaders() {
return headers;
}

final public void setObject(Serializable obj) {
public void setObject(Serializable obj) {
if(obj == null) return;
try {
ByteArrayOutputStream out_stream=new ByteArrayOutputStream();
Expand All @@ -257,7 +258,7 @@ final public void setObject(Serializable obj) {
}
}

final public Object getObject() {
public Object getObject() {
if(buf == null) return null;
try {
ByteArrayInputStream in_stream=new ByteArrayInputStream(buf, offset, length);
Expand Down Expand Up @@ -333,7 +334,7 @@ protected Object clone() throws CloneNotSupportedException {
}

public Message makeReply() {
return new Message(src_addr);
return new Message(src_addr, null, null);
}


Expand Down Expand Up @@ -367,10 +368,11 @@ public String toString() {

/** Tries to read an object from the message's buffer and prints it */
public String toStringAsObject() {
Object obj;

if(buf == null) return null;
try {
Object obj=getObject();
obj=getObject();
return obj != null ? obj.toString() : "";
}
catch(Exception e) { // it is not an object
Expand All @@ -387,14 +389,6 @@ public String toStringAsObject() {
* determine whether to fragment a message or not. Fragmentation will then serialize the message,
* therefore getting the correct value.
*/


/**
* Returns the exact size of the marshalled message. Uses method size() of each header to compute the size, so if
* a Header subclass doesn't implement size() we will use an approximation. However, most relevant header subclasses
* have size() implemented correctly. (See org.jgroups.tests.SizeTest).
* @return The number of bytes for the marshalled message
*/
public long size() {
long retval=Global.BYTE_SIZE // leading byte
+ length // buffer
Expand Down Expand Up @@ -478,15 +472,18 @@ public void writeExternal(ObjectOutput out) throws IOException {


public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
int len;
boolean destAddressExist=in.readBoolean();
boolean srcAddressExist;
Object key, value;

if(destAddressExist) {
dest_addr=(Address)Marshaller.read(in);
if(!DISABLE_CANONICALIZATION)
dest_addr=canonicalAddress(dest_addr);
}

boolean srcAddressExist=in.readBoolean();
srcAddressExist=in.readBoolean();
if(srcAddressExist) {
src_addr=(Address)Marshaller.read(in);
if(!DISABLE_CANONICALIZATION)
Expand All @@ -501,10 +498,10 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
length=buf.length;
}

int len=in.readInt();
len=in.readInt();
while(len-- > 0) {
Object key=in.readUTF();
Object value=Marshaller.read(in);
key=in.readUTF();
value=Marshaller.read(in);
headers.put(key, value);
}
}
Expand Down Expand Up @@ -638,7 +635,7 @@ public void readFrom(DataInputStream in) throws IOException, IllegalAccessExcept

/* ----------------------------------- Private methods ------------------------------- */

private static void writeHeader(Header value, DataOutputStream out) throws IOException {
private void writeHeader(Header value, DataOutputStream out) throws IOException {
int magic_number;
String classname;
ObjectOutputStream oos=null;
Expand Down Expand Up @@ -670,9 +667,7 @@ private static void writeHeader(Header value, DataOutputStream out) throws IOExc
}
}
catch(ChannelException e) {
IOException io_ex=new IOException("failed writing header");
io_ex.initCause(e);
throw io_ex;
log.error("failed writing the header", e);
}
finally {
if(oos != null)
Expand All @@ -681,7 +676,7 @@ private static void writeHeader(Header value, DataOutputStream out) throws IOExc
}


private static Header readHeader(DataInputStream in) throws IOException {
private Header readHeader(DataInputStream in) throws IOException {
Header hdr;
boolean use_magic_number=in.readBoolean();
int magic_number;
Expand Down Expand Up @@ -710,19 +705,21 @@ private static Header readHeader(DataInputStream in) throws IOException {
}
}
catch(Exception ex) {
IOException io_ex=new IOException("failed reading header");
io_ex.initCause(ex);
throw io_ex;
throw new IOException("failed read header: " + ex.toString());
}
finally {
// if(ois != null) // we cannot close this because other readers depend on it
// ois.close();
}
return hdr;
}

private static Map createHeaders(int size) {
private Map createHeaders(int size) {
return size > 0? new ConcurrentReaderHashMap(size) : new ConcurrentReaderHashMap();
}


private static Map createHeaders(Map m) {
private Map createHeaders(Map m) {
return new ConcurrentReaderHashMap(m);
}

Expand Down
Loading

0 comments on commit f191d38

Please sign in to comment.