Skip to content

Commit

Permalink
- Support for parsing of tshark dumps (https://issues.jboss.org/brows…
Browse files Browse the repository at this point in the history
…e/JGRP-2311)

- Terser rep of Messages (toString())
- Added -mappings option
- Parsing messages from stdin
- Better printing of batches
- Added option -binary-to-ascii
- Parsing of TCP or TCP_NIO2 messages is now supported
- Updated documentation
  • Loading branch information
belaban committed May 6, 2019
1 parent bd3ca78 commit d2b196d
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 67 deletions.
6 changes: 3 additions & 3 deletions conf/jg-magic-map.xml
Expand Up @@ -29,9 +29,9 @@
<class id="24" name="org.jgroups.util.FlagsUUID"/>

<!-- Headers -->
<class id="50" name="org.jgroups.protocols.FD$FdHeader"/>
<class id="51" name="org.jgroups.protocols.FD_SOCK$FdHeader"/>
<class id="52" name="org.jgroups.protocols.FragHeader"/>
<class id="50" name="org.jgroups.protocols.FD$FdHeader"/>
<class id="51" name="org.jgroups.protocols.FD_SOCK$FdHeader"/>
<class id="52" name="org.jgroups.protocols.FragHeader"/>
<class id="53" name="org.jgroups.protocols.PingHeader"/>
<class id="54" name="org.jgroups.protocols.VERIFY_SUSPECT$VerifyHeader"/>
<class id="55" name="org.jgroups.protocols.pbcast.GMS$GmsHeader"/>
Expand Down
143 changes: 143 additions & 0 deletions doc/manual/advanced.adoc
Expand Up @@ -2274,6 +2274,149 @@ it is the default). In this mode, probe will ask the member running at `192.168.
members, and then send the request to all of the returned members.
=== Analyzing wire-format packets
When using a packet analyzer such as https://www.wireshark.org[Wireshark],
https://www.wireshark.org/docs/man-pages/tshark.html[tshark] or https://www.tcpdump.org/[tcpdump], the output is
either a PCAP file (e.g. submitted by customers for post-mortem analysis) or a live stream of raw network packets.
Each packet has a number of headers before the data part which contains the JGroups message (or message batch). E.g.
a UDP packet has an ethernet-, IP- and datagram header; a TCP message contains ethernet and IP headers
plus the TCP information.
To see the JGroups messages, the {{ParseMessages}} program parses the _data_ section of a UDP datagram packet or a
TCP or TCP_NIO2 stream and prints the JGroups messages plus its headers. It can be passed a file, or it can read from
stdin and thus be piped to a packet analyzer and print JGroups data in real-time.
==== tshark
tshark is the command-line version of wireshark. To capture UDP packets for the configuration below, which has nodes
binding to the loopback (127.0.0.1) device and ports starting from 7800, the following command can be used:
[source,xml]
----
<config>
<UDP
bind_addr="127.0.0.1" bind_port="7800"
mcast_addr="228.2.2.2"
mcast_port="${jgroups.udp.mcast_port:45588}">
...
</config>
----
`tshark -l -i en0 -i lo0 -T fields -e data udp and portrange 7800-7801 > jgroups-udp.data`
This captures the _data portion only_ (`-T fields -e data`) on the loopback and `en0` devices and only captures packets
sent to or being sent from ports 7800 and 7801.
To print the packets offline, `java org.jgroups.tests.ParseMessages -file jgroups-udp.data` can be used. This would look
like this (edited):
----
[belasmac] /Users/bela$ jt ParseMessages -file bela.dump6 -tcp
1: [B to 192.168.1.105:7800, 33 bytes, flags=OOB|DONT_BUNDLE|INTERNAL], hdrs: TCPPING: [GET_MBRS_REQ cluster=draw initial_discovery=true], TP: [cluster=draw]
2: [A to B, 33 bytes, flags=OOB|DONT_BUNDLE|INTERNAL], hdrs: TCPPING: [GET_MBRS_RSP cluster=null initial_discovery=false], TP: [cluster=draw]
3: [B to A, 0 bytes, flags=OOB|INTERNAL], hdrs: GMS: GmsHeader[JOIN_REQ]: mbr=B, UNICAST3: DATA, seqno=1, first, TP: [cluster=draw]
4: [A to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_SOCK: I_HAVE_SOCK, mbr=A, sock_addr=192.168.1.105:9000, TP: [cluster=draw]
5: batch to B from A (1 messages):
1: [33 bytes, flags=OOB|DONT_BUNDLE|INTERNAL], hdrs: TCPPING: [GET_MBRS_RSP cluster=null initial_discovery=false]
6: batch to B from A (1 messages):
1: [0 bytes, flags=INTERNAL], hdrs: FD_SOCK: WHO_HAS_SOCK, mbr=B
7: [A to <all>, 57 bytes], hdrs: GMS: GmsHeader[VIEW], NAKACK2: [MSG, seqno=1], TP: [cluster=draw]
8: [A to B, 61 bytes, flags=INTERNAL], hdrs: GMS: GmsHeader[JOIN_RSP], UNICAST3: DATA, seqno=1, first, TP: [cluster=draw]
9: [B to A, 0 bytes, flags=INTERNAL], hdrs: FD_SOCK: I_HAVE_SOCK, mbr=B, sock_addr=192.168.1.105:9001, TP: [cluster=draw]
10: [B to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_SOCK: I_HAVE_SOCK, mbr=B, sock_addr=192.168.1.105:9001, TP: [cluster=draw]
11: [B to A, 0 bytes, flags=INTERNAL], hdrs: FD_SOCK: GET_CACHE, TP: [cluster=draw]
12: [A to B, 54 bytes, flags=INTERNAL], hdrs: FD_SOCK: GET_CACHE_RSP, TP: [cluster=draw]
13: [B to A, 0 bytes, flags=OOB|INTERNAL], hdrs: GMS: GmsHeader[VIEW_ACK], UNICAST3: DATA, seqno=2, TP: [cluster=draw]
14: [A to B, 0 bytes, flags=INTERNAL], hdrs: UNICAST3: ACK, seqno=2, ts=1, TP: [cluster=draw]
15: [B to A, 0 bytes, flags=INTERNAL], hdrs: UNICAST3: ACK, seqno=1, ts=1, TP: [cluster=draw]
16: [A to <all>, 0 bytes, flags=OOB|INTERNAL], hdrs: NAKACK2: [HIGHEST_SEQNO, seqno=1], TP: [cluster=draw]
17: [A to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
18: [B to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
19: [B to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=1], TP: [cluster=draw]
...
45: [B to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=27], TP: [cluster=draw]
46: [A to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=2], TP: [cluster=draw]
...
70: [A to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=25], TP: [cluster=draw]
71: [A to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
72: [B to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
73: [A to <all>, 0 bytes, flags=OOB|INTERNAL], hdrs: NAKACK2: [HIGHEST_SEQNO, seqno=25], TP: [cluster=draw]
74: [B to A, 0 bytes, flags=OOB], hdrs: GMS: GmsHeader[LEAVE_REQ]: mbr=B, UNICAST3: DATA, seqno=3, TP: [cluster=draw]
75: [A to B, 0 bytes, flags=OOB|NO_RELIABILITY|INTERNAL], hdrs: GMS: GmsHeader[LEAVE_RSP], TP: [cluster=draw]
----
This shows a typical conversation between A (coordinator) and B: B sends a discovery request, finds A and asks it to join
(`JOIN_REQ`). A then multicasts (`A to <all>`) a view (`VIEW`) to all members, and sends a `JOIN_RSP` to B.
Later, B leaves by sending a `LEAVE_REQ to A` and A responds with a `LEAVE_RSP` message back to B.
Instead of redirecting the output to a file, it could be piped into `ParseMessages`:
`tshark -l -i en0 -i lo0 -T fields -e data udp and portrange 7800-7801 | java org.jgroups.tests.ParseMessages`.
WARNING: When using *TCP* or *TCP_NIO2* (e.g. `tshark -l -i en0 -i lo0 -T fields -e data tcp and portrange 7800-7801`),
`ParseMessages` has to be started with command line option `-tcp`. This ensures that the length preceding each TCP
JGroups message is not interpreted as something else (e.g. the version). Also, on connection establishment, a cookie
and the local address is sent to the remote peer, and this additional data is also parsed correctly with the
`-tcp` option.
==== Wireshark
When we have captured a number of packets in wireshark, we can select `"File"` -> `"Export Specified Packets..."`:
[[WiresharkScreenshotFig]]
.Screenshot of wireshark
image::./images/wireshark.png[Exporting selected packets in wireshark]
Here, we can export selected number of packets, or all packets. In the example, the exported packets will be written to
a file called `jgroups-tcp.pcapng`.
To parse this file and extract only the data sections, `tshark` can be used:
----
[belasmac] /Users/bela$ tshark -r jgroups-tcp.pcapng -Tfields -edata|jt ParseMessages -tcp
1: [21e26d56-2928-7008-1168-0fcf2d2f2b9d to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=118], TP: [cluster=draw]
...
32: [21e26d56-2928-7008-1168-0fcf2d2f2b9d to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=149], TP: [cluster=draw]
33: [21e26d56-2928-7008-1168-0fcf2d2f2b9d to <all>, 0 bytes, flags=OOB|INTERNAL], hdrs: NAKACK2: [HIGHEST_SEQNO, seqno=149], TP: [cluster=draw]
34: [21e26d56-2928-7008-1168-0fcf2d2f2b9d to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
35: [89a4b2f9-c15e-d276-88a5-a01f8cbde58a to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
----
The `-r` options reads packets from the given input file and the `-Tfields -edata` options extract the data portion
(containing the JGroups messages) only. This is piped to `ParseMessages` (with the `-tcp` option, as the wireshark
data was captured from a TCP connection), which prints the JGroups messages to stdout.
==== tcpdump
`tcpdump` can be used with the `-w <filename>` command line option to save all captured packets to a file in PCAPNG
(wireshark/libpcap) format.
In the example below (edited), however, we're piping (options `-U` `-w -`) the captured packets to
`tshark` which extracts the JGroups portion and in turn pipes this to `ParseMessages`:
----
[belasmac] /Users/bela$ sudo tcpdump -n -U -i lo0 -w - tcp and portrange 7800-7801 |tshark -l -r - -Tfields -edata|jt ParseMessages -tcp
tcpdump: listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
1: [c0b6a011-16f9-699d-dd8c-68f6cc880ddd to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
2: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
3: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=426], TP: [cluster=draw]
4: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 0 bytes, flags=OOB|INTERNAL], hdrs: NAKACK2: [HIGHEST_SEQNO, seqno=426], TP: [cluster=draw]
9: [c0b6a011-16f9-699d-dd8c-68f6cc880ddd to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
10: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
11: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=427], TP: [cluster=draw]
12: [c0b6a011-16f9-699d-dd8c-68f6cc880ddd to 43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c, 44 bytes, flags=OOB|NO_RELIABILITY|INTERNAL], hdrs: STABLE: [STABLE_GOSSIP] view-id= [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c|1], TP: [cluster=draw]
13: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 0 bytes, flags=OOB|INTERNAL], hdrs: NAKACK2: [HIGHEST_SEQNO, seqno=427], TP: [cluster=draw]
14: [c0b6a011-16f9-699d-dd8c-68f6cc880ddd to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
15: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 0 bytes, flags=INTERNAL], hdrs: FD_ALL: heartbeat, TP: [cluster=draw]
16: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=428], TP: [cluster=draw]
17: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=429], TP: [cluster=draw]
18: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=430], TP: [cluster=draw]
19: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=431], TP: [cluster=draw]
20: [43c0d32e-a9ee-cb3d-578d-4af5fcd5c72c to <all>, 13 bytes], hdrs: NAKACK2: [MSG, seqno=432], TP: [cluster=draw]
54 packets captured
134 packets received by filter
0 packets dropped by kernel
----
[[MembershipChangePolicy]]
Expand Down
Binary file added doc/manual/images/wireshark.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 3 additions & 4 deletions src/org/jgroups/Message.java
Expand Up @@ -571,10 +571,9 @@ public Message makeReply() {


public String toString() {
int size=getNumHeaders();
return String.format("[dest: %s sender: %s%s, size=%d bytes%s%s]",
dest, sender,
size > 0? " (" + size + " headers)" : "",
return String.format("[%s to %s, %d bytes%s%s]",
sender,
dest == null? "<all>" : dest,
length,
flags > 0? ", flags=" + flagsToString(flags) : "",
transient_flags > 0? ", transient_flags=" + transientFlagsToString(transient_flags) : "");
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/Connection.java
Expand Up @@ -10,7 +10,7 @@
* Represents a connection to a peer
*/
public abstract class Connection implements Closeable {
protected static final byte[] cookie= { 'b', 'e', 'l', 'a' };
public static final byte[] cookie= { 'b', 'e', 'l', 'a' };
protected Address peer_addr; // address of the 'other end' of the connection
protected long last_access; // timestamp of the last access to this connection (read or write)

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TpHeader.java
Expand Up @@ -41,7 +41,7 @@ public TpHeader(byte[] n) {
public short getMagicId() {return 60;}

public String toString() {
return String.format("[cluster_name=%s]", new String(cluster_name));
return String.format("[cluster=%s]", new String(cluster_name));
}

public byte[] getClusterName() {return cluster_name;}
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/pbcast/GMS.java
Expand Up @@ -1465,6 +1465,7 @@ public String toString() {
case GET_DIGEST_REQ:
sb.append(": mbr=" + mbr);
break;

case MERGE_REQ:
sb.append(": merge_id=" + merge_id);
break;
Expand Down
10 changes: 5 additions & 5 deletions src/org/jgroups/stack/GossipRouter.java
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/**
* Router for TCP based group comunication (using layer TCP instead of UDP). Instead of the TCP
Expand Down Expand Up @@ -87,6 +88,9 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener
// mapping between groups and <member address> - <physical addr / logical name> pairs
protected final ConcurrentMap<String,ConcurrentMap<Address,Entry>> address_mappings=new ConcurrentHashMap<>();

protected static final BiConsumer<Short,Message> MSG_CONSUMER=(version,msg) -> {
System.out.printf("dst=%s src=%s (%d bytes): hdrs= %s\n", msg.dest(), msg.src(), msg.getLength(), msg.printHeaders());
};


public GossipRouter(String bind_addr, int local_port) {
Expand Down Expand Up @@ -324,11 +328,7 @@ protected void handleGetMembersRequest(Address sender, DataInput in) {
}

protected static void dump(GossipData data) {
System.out.println("");
List<Message> messages=Util.parse(data.buffer, data.offset, data.length);
if(messages != null)
for(Message msg : messages)
System.out.printf("dst=%s src=%s (%d bytes): hdrs= %s\n", msg.dest(), msg.src(), msg.getLength(), msg.printHeaders());
Util.parse(data.buffer, data.offset, data.length, MSG_CONSUMER, null, false);
}

@Override
Expand Down
97 changes: 59 additions & 38 deletions src/org/jgroups/util/Util.java
Expand Up @@ -3,6 +3,7 @@
import org.jgroups.*;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.blocks.cs.Connection;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
Expand Down Expand Up @@ -1044,6 +1045,24 @@ public static String byteArrayToHexString(byte[] b, int offset, int length) {
return sb.toString().toUpperCase();
}

public static void binaryToAscii(InputStream in, OutputStream out) throws IOException {
byte[] input=new byte[2];
for(;;) {
input[0]=(byte)in.read();
if(input[0] == '\n' || input[0] == '\r')
continue;
if(input[0] < 0)
break;
input[1]=(byte)in.read();
if(input[1] == '\n' || input[1] < 0)
break;
String tmp=new String(input);
int val=Integer.parseInt(tmp, 16);
char c=(char)val;
out.write(c);
}
}

public static boolean isAsciiString(String str) {
if(str == null) return false;
for(int i=0; i < str.length(); i++) {
Expand Down Expand Up @@ -1220,68 +1239,70 @@ else if(!oob && internal) {
return batches;
}

public static List<Message> parse(byte[] buf, int offset, int length) {
return parse(new ByteArrayInputStream(buf, offset, length));
public static void parse(byte[] buf, int offset, int length, BiConsumer<Short,Message> msg_consumer,
BiConsumer<Short,MessageBatch> batch_consumer, boolean tcp) {
parse(new ByteArrayInputStream(buf, offset, length), msg_consumer, batch_consumer, tcp);
}

public static List<Message> parse(String filename) throws FileNotFoundException {
return parse(new FileInputStream(filename));
public static void parse(String filename, BiConsumer<Short,Message> msg_consumer,
BiConsumer<Short,MessageBatch> batch_consumer, boolean tcp) throws FileNotFoundException {
parse(new FileInputStream(filename), msg_consumer, batch_consumer, tcp);
}

public static List<Message> parse(InputStream input) {
List<Message> retval=new ArrayList<>();
DataInputStream dis=null;
try {
dis=new DataInputStream(input);

short version;
public static void parse(InputStream input, BiConsumer<Short,Message> msg_consumer,
BiConsumer<Short,MessageBatch> batch_consumer, boolean tcp) {
if(msg_consumer == null && batch_consumer == null)
return;
byte[] tmp=new byte[Global.INT_SIZE];
try(DataInputStream dis=new DataInputStream(input)) {
for(;;) {
try {
version=dis.readShort();
}
catch(IOException io_ex) {
break;
// for TCP, we send the length first; this needs to be skipped as it is not part of the JGroups payload
if(tcp) { // TCP / TCP_NIO2
dis.readFully(tmp);
if(Arrays.equals(Connection.cookie, tmp)) {
// connection establishment; parse version (short) and IpAddress
dis.readShort(); // version
dis.readShort(); // address length (only needed by TCP_NIO2)
IpAddress peer=new IpAddress();
peer.readFrom(dis);
continue;
}
else {
// do nothing - the 4 bytes were the length
// int len=Bits.readInt(tmp, 0);
}
}

System.out.println("version = " + version + " (" + Version.print(version) + ")");
short version=dis.readShort();
byte flags=dis.readByte();
// System.out.println("flags: " + Message.flagsToString(flags));

boolean is_message_list=(flags & LIST) == LIST;
boolean multicast=(flags & MULTICAST) == MULTICAST;

if(is_message_list) { // used if message bundling is enabled
final MessageBatch[] batches=Util.readMessageBatch(dis,multicast);
MessageBatch[] batches=Util.readMessageBatch(dis,multicast);
for(MessageBatch batch: batches) {
if(batch != null)
if(batch == null)
continue;
if(batch_consumer != null)
batch_consumer.accept(version, batch);
else {
for(Message msg: batch)
retval.add(msg);
msg_consumer.accept(version, msg);
}
}
}
else {
Message msg=Util.readMessage(dis);
retval.add(msg);
if(msg_consumer != null)
msg_consumer.accept(version, msg);
}
}
return retval;
}
catch(EOFException ignored) {
}
catch(Throwable t) {
t.printStackTrace();
return null;
}
finally {
Util.close(dis);
}
}

public static String dump(byte[] buf, int offset, int length) {
StringBuilder sb=new StringBuilder();
List<Message> msgs=parse(new ByteArrayInputStream(buf, offset, length));
if(msgs != null)
for(Message msg: msgs)
sb.append(String.format("dst=%s src=%s (%d bytes): hdrs= %s\n", msg.dest(), msg.src(), msg.getLength(), msg.printHeaders()));
return sb.toString();
}


public static void writeView(View view,DataOutput out) throws IOException {
Expand Down

0 comments on commit d2b196d

Please sign in to comment.