List of Protocols
This chapter describes the most frequently used protocols and their configuration. Ergonomics ([Ergonomics]) strives to reduce the number of properties that have to be configured by dynamically adjusting them at run time; however, this is not yet in place.
Meanwhile, we recommend that users copy one of the predefined configurations shipped with JGroups, e.g. udp.xml or tcp.xml, and make only minimal changes to it.
This section is work in progress; we strive to update the documentation as we make changes to the code.
Properties available in every protocol
The table below lists properties that are available in all protocols, as they’re defined in the superclass of all protocols, org.jgroups.stack.Protocol.
| Name | Description |
|---|---|
| stats | Whether the protocol should collect protocol-specific runtime statistics. What those statistics are (or whether they even exist) depends on the particular protocol. |
| ergonomics | Turns on ergonomics. See [Ergonomics] for details. |
| id | Gives the protocol a different ID, if needed, so we can have multiple instances of it in the same stack. |
System properties
The table below lists system properties which can be used to override attribute values, mostly in protocols. For example, if we have a config like this:
<UDP bind_addr="127.0.0.1" />, then the bind address will be 127.0.0.1. However, if we run the system with -Djgroups.bind_addr=1.2.3.4 (see below),
the bind address will be 1.2.3.4.
Note that if we use our own variables, like this:
<UDP bind_addr="${my.bind_addr:127.0.0.1}" />, then system property my.bind_addr takes precedence over jgroups.bind_addr. In the above case, the actual bind
address chosen will be:

| System properties set | Bind address picked |
|---|---|
| my.bind_addr (with or without jgroups.bind_addr) | the value of my.bind_addr |
| only jgroups.bind_addr | the value of jgroups.bind_addr |
| none | 127.0.0.1 (the default) |
| Name | Description |
|---|---|
| jgroups.bind_addr | The network interface to be used. Example: -Djgroups.bind_addr=1.2.3.4 |
| jgroups.external_addr | The external bind address to be used. Overrides … |
| jgroups.external_port | The external port to be used by the transport. |
| jgroups.tcp.client_bind_addr | The network interface client sockets in TCP should bind to. Overrides … |
| jgroups.tcpping.initial_hosts | The list of initial hosts in TCPPING. |
| jgroups.udp.mcast_addr | The multicast address used by UDP. |
| jgroups.udp.mcast_port | The multicast port used by UDP. |
| jgroups.udp.ip_ttl | The TTL used by UDP. |
| jgroups.mping.mcast_addr | The multicast address used by MPING. |
| jgroups.mping.mcast_port | The multicast port used by MPING. |
| jgroups.mping.ip_ttl | The TTL used by MPING. |
| jgroups.conf.magic_number_file | The file where the mappings between IDs and classes are defined. Default: … |
| jgroups.conf.protocol_id_file | The file where the mappings between IDs and protocols are defined. Default: … |
| jgroups.name_cache.max_elements | The max number of elements in the NameCache, which holds mappings between addresses and logical names. |
| jgroups.name_cache.max_age | The max age (in milliseconds) a mapping between address and logical name is kept in the NameCache. Note that elements are only evicted if there’s not enough room. |
| jgroups.ipmcast.prefix | Needed if an IPv4 multicast address is used in an IPv6 system. The prefix (default: …) |
| jgroups.use.jdk_logger | If true, the JDK logger (…) is used. |
| jgroups.log_class | The fully qualified name of a class implementing a logger to be used. See [Logging] for details. |
| jgroups.msg.default_headers | The default number of headers in a Message (default: 4). |
Transport
TP is the base class for all transports, e.g. UDP and TCP. All of the properties
defined here are inherited by the subclasses. The properties for TP are:
${TP}
bind_addr can be set to the address of a network interface, e.g. 192.168.1.5.
It can also be set for the entire stack using system property -Djgroups.bind_addr, which
provides a value for bind_addr unless it has already been set in the XML config.
The following special values are also recognized for bind_addr:
- GLOBAL: Picks a global IP address if available. If not, falls back to a SITE_LOCAL IP address.
- SITE_LOCAL: Picks a site-local (non-routable) IP address, e.g. from the 192.168.0.0 or 10.0.0.0 address range.
- LINK_LOCAL: Picks a link-local IP address, from 169.254.1.0 through 169.254.254.255.
- NON_LOOPBACK: Picks any non-loopback address.
- LOOPBACK: Picks a loopback address, e.g. 127.0.0.1.
- match-interface: Picks an address which matches a pattern against the interface name, e.g. match-interface:eth.*
- match-address: Picks an address which matches a pattern against the host address, e.g. match-address:192.168.*
- match-host: Picks an address which matches a pattern against the host name, e.g. match-host:linux.*
- custom: Uses custom code to pick the bind address. The value after custom: needs to be the fully qualified name of a class implementing Supplier<InetAddress>, e.g. bind_addr="custom:com.acme.BindAddressPicker".
An example of setting the bind address in UDP to use a site local address is:
<UDP bind_addr="SITE_LOCAL" />This will pick any address of any interface that’s site-local, e.g. a 192.168.x.x or 10.x.x.x address.
Since 4.0, it is possible to define a list of addresses in bind_addr. Each entry of the list will be tried and the
first entry that works will be used. Example:
<UDP bind_addr="match-interface:eth2,10.5.5.5,match-interface:en.\*,127.0.0.1" />This would try to bind to eth2 first. If not found, then an interface with address 10.5.5.5 would be tried,
then an interface starting with en would be tried. If still not found, we’d bind to 127.0.0.1.
UDP
UDP uses IP multicast for sending messages to all members of a group and UDP datagrams for unicast messages (sent to a single member). When started, it opens a unicast and multicast socket: the unicast socket is used to send/receive unicast messages, whereas the multicast socket sends and receives multicast messages. The channel’s physical address will be the address and port number of the unicast socket.
A protocol stack with UDP as transport protocol is typically used with clusters whose members run in the same subnet. If running across subnets, an admin has to ensure that IP multicast is enabled across subnets. It is often the case that IP multicast is not enabled across subnets. In such cases, the stack has to either use UDP without IP multicasting or other transports such as TCP.
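For example, a minimal UDP transport configuration might look like the sketch below; the values are illustrative, and the mcast_addr, mcast_port and ip_ttl attributes correspond to the jgroups.udp.* system properties listed above:

<UDP bind_addr="SITE_LOCAL"
     mcast_addr="228.6.7.8"
     mcast_port="45566"
     ip_ttl="8"/>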
${UDP}
TCP
Specifying TCP in your protocol stack tells JGroups to use TCP to send messages between cluster members. Instead of using a multicast bus, the cluster members create a mesh of TCP connections.
For example, while UDP sends 1 IP multicast packet when sending a message to a cluster of 10 members, TCP needs to send the message 9 times. It sends the same message to the first member, to the second member, and so on (excluding itself as the message is looped back internally).
This is slow, as the cost of sending a group message is O(n) with TCP, whereas it is O(1) with UDP. As the cost of sending a group message with TCP is a function of the cluster size, it becomes higher with larger clusters.
Note: We recommend using UDP for larger clusters, whenever possible.
${BasicTCP}
${TCP}
TCP_NIO2
TCP_NIO2 is similar to TCP, but uses NIO (= Non blocking IO) to send messages to and receive messages from members. Contrary to TCP, it doesn’t use 1 thread per connection, but handles accepts, connects, reads and writes in a single thread.
All of these operations are guaranteed to never block.
For example, if a read is supposed to receive 1000 bytes but only 700 have been received, the read consumes the 700 bytes, saves them, and later - when the remaining 300 bytes have been received - it is notified to complete the read and returns the 1000 bytes to the application.
Using a single thread is not a problem, as operations will never block. The only potentially blocking operation, namely delivering messages up to the application, is done via the regular or OOB thread pools, as usual.
While TCP and TCP_NIO2 both have the N-1 problem of sending cluster wide messages (contrary to UDP), TCP_NIO2 is able to handle a larger number of connections than TCP, as it doesn’t use the thread-per-connection model, and - contrary to TCP, but similar to UDP - it doesn’t block when sending or receiving messages.
${BasicTCP}
${TCP_NIO2}
Initial membership discovery
The task of the discovery protocol is to find an initial membership, which is used to determine the current coordinator. Once a coordinator is found, the joiner sends a JOIN request to the coordinator.
Discovery is also called periodically by MERGE2 (see [MERGE2]), to see if we have
diverging cluster membership information.
Discovery
Discovery is the superclass for all discovery protocols and therefore its
properties below can be used in any subclass.
Discovery sends a discovery request, and waits for num_initial_members discovery responses, or timeout ms, whichever occurs first, before returning. Note that break_on_coord_rsp="true" will return as soon as we have a response from a coordinator.
${Discovery}
Discovery and local caches
Besides finding the current coordinator in order to send a JOIN request to it, discovery also fetches information about members and adds it to its local caches. This information includes the logical name, UUID and IP address/port of each member. When discovery responses are received, the information in it will be added to the local caches.
Since 3.5 it is possible to define this information in a single file, with each line providing information about one member. The file contents look like this:
m1.1 1 10.240.78.26:7800 T
m2.1 2 10.240.122.252:7800 F
m3.1 3 10.240.199.15:7800 F
This file defines information about 3 members m1.1, m2.1 and m3.1. The first element ("m1.1") is the
logical name. Next comes the UUID (1), followed by the IP address and port (10.240.78.26:7800).
T means that the member is the current coordinator.
The method dumpCache() can be used to write the current cache contents of any member to a file (in the above
format), and addToCache() can be used to add the contents of a file to any member’s cache. These operations
can, for example, be invoked via JMX or probe.sh.
Refer to the section on FILE_PING for more information on how to use these files to speed up
the discovery process.
PING
Initial (dirty) discovery of members. Used to detect the coordinator (oldest member), by mcasting PING requests to an IP multicast address.
Each member responds with a packet {C, A}, where C=coordinator’s address and A=own address. After N milliseconds or M replies, the joiner determines the coordinator from the responses, and sends a JOIN request to it (handled by GMS). If nobody responds, we assume we are the first member of a group.
Unlike TCPPING, PING employs dynamic discovery, meaning that the member does not have to know in advance where other cluster members are.
PING uses the IP multicasting capabilities of the transport to send a discovery request to the cluster. It therefore requires UDP as transport.
${PING}
TCPPING
TCPPING is used with TCP as transport, and uses a static list of cluster members’ addresses. See TCPPING for details.
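A sketch of a TCP-based stack using TCPPING is shown below; the host addresses are placeholders, and port_range extends the number of ports probed per initial host:

<TCP bind_port="7800"/>
<TCPPING initial_hosts="192.168.1.5[7800],192.168.1.6[7800]"
         port_range="1"/>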
${TCPPING}
Note: It is recommended to include the addresses of all cluster members in initial_hosts.
TCPGOSSIP
TCPGOSSIP uses an external GossipRouter to discover the members of a cluster. See TCPGOSSIP for details.
${TCPGOSSIP}
MPING
MPING (= Multicast PING) uses IP multicast to discover the initial membership. It can be used with all transports, but is usually used in combination with TCP. TCP usually requires TCPPING, which has to list all cluster members explicitly, but MPING doesn’t have this requirement. The typical use case for this is when we want TCP as transport, but multicasting for discovery, so we don’t have to define a static list of initial hosts in TCPPING.
MPING uses its own multicast socket for discovery. The properties bind_addr (which can also be set via -Djgroups.bind_addr), mcast_addr and mcast_port can be used to configure it.
Note that MPING requires a separate thread listening on the multicast socket for discovery requests.
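A minimal sketch of TCP as transport with MPING for discovery (the multicast address and port are illustrative):

<TCP bind_port="7800"/>
<MPING bind_addr="SITE_LOCAL"
       mcast_addr="230.5.6.7"
       mcast_port="7555"/>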
${MPING}
FILE_PING
FILE_PING can be used instead of GossipRouter in cases where no external process is desired.
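A minimal FILE_PING configuration might look like the sketch below, assuming a location attribute that points to a directory (or mounted share) accessible by all cluster members:

<FILE_PING location="/mnt/jgroups"/>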
Since 3.5, the way FILE_PING performs discovery has changed. The following paragraphs describe the new mechanism to discover members via FILE_PING or its subclasses (e.g. S3_PING, GOOGLE_PING or GOOGLE_PING2), so this applies to all cloud-based stores as well.
Instead of storing 1 file per member in the file system or cloud store, we only store 1 file for all members. This has the advantage, especially in cloud stores, that the number of reads is not a function of the cluster size, e.g. we don’t have to perform 1000 reads for member discovery in a 1000 node cluster, but just a single read.
This is important, as 1000 times the round trip time of a (REST) call to the cloud store is certainly higher than the cost of a single call. There may also be a charge for calls to the cloud, so a reduced number of calls leads to reduced charges for cloud store access, especially in large clusters.
The current coordinator is always in charge of writing the file; participants never write it, but only read it. When there is a split and we have multiple coordinators, we may also have multiple files.
The name of a file is always UUID.logical_name.list, e.g. 0000-0000-000000000001.m1.1.list, which has
a UUID of 1, a logical name of "m1.1" and the suffix ".list".
Removing a member which crashed or left gracefully
When we have view {A,B,C,D} (A being the coordinator), the file 2f73fcac-aecb-2a98-4300-26ca4b1016d2.A.list might
have the following contents:
C c0a6f4f8-a4a3-60c1-8420-07c81c0256d6 192.168.1.168:7802 F
D 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e 192.168.1.168:7803 F
A 2f73fcac-aecb-2a98-4300-26ca4b1016d2 192.168.1.168:7800 T
B c6afa01d-494f-f340-c0db-9795102ac2a3 192.168.1.168:7801 F
It shows the 4 members with their UUIDs, IP addresses and ports, and the coordinator (A). When we now make C leave (gracefully, or by killing it), the file should have 3 lines, but it doesn’t:
C c0a6f4f8-a4a3-60c1-8420-07c81c0256d6 192.168.1.168:7802 F
D 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e 192.168.1.168:7803 F
A 2f73fcac-aecb-2a98-4300-26ca4b1016d2 192.168.1.168:7800 T
B c6afa01d-494f-f340-c0db-9795102ac2a3 192.168.1.168:7801 F
Indeed, the entry for C is still present! Why?
The reason is that the entry for C is marked as removable, but the entry is not removed straight away, because that would require a call to the store, which might be expensive, or cost money. For instance, if the backend store is cloud based, then the REST call to the cloud store might cost money.
Therefore, removable members are only removed when the logical cache size exceeds its capacity. The capacity is defined
in TP.logical_addr_cache_max_size. Alternatively, if TP.logical_addr_cache_reaper_interval is greater than 0,
then a reaper task will scan the logical cache every logical_addr_cache_reaper_interval milliseconds and remove
elements marked as removable and older than TP.logical_addr_cache_expiration milliseconds.
We can look at the logical cache with JMX or probe (slightly edited):
[belasmac] /Users/bela/jgroups-azure$ probe.sh uuids

#1 (338 bytes):
local_addr=A [ip=192.168.1.168:7800, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=A
uuids=3 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (9 secs old)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (5 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (1 secs old)

#2 (338 bytes):
local_addr=B [ip=192.168.1.168:7801, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=B
uuids=3 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (1 secs old)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (5 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (2 secs old)

#3 (339 bytes):
local_addr=D [ip=192.168.1.168:7803, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=D
uuids=3 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (1 secs old)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (11 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (1 secs old)

3 responses (3 matches, 0 non matches)
This shows that the reaper must have removed the stale entry for C already.
If we start C again and then kill it again and immediately look at the file, then the contents are:
D 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e 192.168.1.168:7803 F
A 2f73fcac-aecb-2a98-4300-26ca4b1016d2 192.168.1.168:7800 T
B c6afa01d-494f-f340-c0db-9795102ac2a3 192.168.1.168:7801 F
C 5b36fe23-b151-6859-3953-97addfa2534d 192.168.1.168:7802 F
We can see that C is still present.
Note: If we restart C a couple of times, the file will actually list multiple Cs. However, each entry is different: only the logical name is the same, but the actual addresses (UUIDs) differ.
Running probe immediately after restarting C, before the reaper kicks in, indeed shows the old C as being removable:
[belasmac] /Users/bela/jgroups-azure$ probe.sh uuids

#1 (423 bytes):
local_addr=A [ip=192.168.1.168:7800, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=A
uuids=4 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (5 secs old)
C: 5b36fe23-b151-6859-3953-97addfa2534d: 192.168.1.168:7802 (5 secs old, removable)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (9 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (11 secs old)

#2 (423 bytes):
local_addr=B [ip=192.168.1.168:7801, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=B
uuids=4 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (15 secs old)
C: 5b36fe23-b151-6859-3953-97addfa2534d: 192.168.1.168:7802 (5 secs old, removable)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (9 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (5 secs old)

#3 (424 bytes):
local_addr=D [ip=192.168.1.168:7803, version=4.0.0-SNAPSHOT, cluster=draw, 3 mbr(s)]
local_addr=D
uuids=4 elements:
A: 2f73fcac-aecb-2a98-4300-26ca4b1016d2: 192.168.1.168:7800 (15 secs old)
C: 5b36fe23-b151-6859-3953-97addfa2534d: 192.168.1.168:7802 (5 secs old, removable)
D: 9db6cf43-138c-d7cb-8eb3-2aa4e7cb5f7e: 192.168.1.168:7803 (5 secs old)
B: c6afa01d-494f-f340-c0db-9795102ac2a3: 192.168.1.168:7801 (11 secs old)

3 responses (3 matches, 0 non matches)
Here, we can see that C is marked as removable. Once its entry is 60 seconds old (logical_addr_cache_expiration), then
the reaper (if configured to run) will remove the element on its next run.
Configuration with a preconfigured bootstrap file
To speed up the discovery process when starting a large cluster, a predefined bootstrap file can be used. Every node then needs to have an entry in the file and its UUID and IP address:port needs to be the same as in the file. For example, when using the following bootstrap file:
m1.1 1 10.240.78.26:7800 T
m2.1 2 10.240.122.252:7800 F
m3.1 3 10.240.199.15:7800 F

the member called "m1.1" needs to have a UUID of 1 and needs to run on host 10.240.78.26 on port 7800. The UUID can be injected via an AddressGenerator (see UPerf for an example).
When a member starts, it loads the bootstrap file, which contains information about all other members, and thus (ideally) never needs to run a discovery process. In the above example, the new joiner also knows that the current coordinator (marked with a 'T') is m1.1, so it can send its JOIN request to that node.
When the coordinator changes, or members not listed in the file join, the current coordinator writes the file again, so all members have access to the updated information when needed.
If a bootstrap discovery file is to be used, it needs to be placed into the file system or cloud store in the correct location and with the right name (see the Discovery section for naming details).
The design is discussed in more detail in CloudBasedDiscovery.txt
Removal of zombie files
By default, a new coordinator C never removes a file created by an old coordinator A. E.g. in {A,B,C,D} (with
coordinator A), if C becomes coordinator on a split {A,B} | {C,D}, then C doesn’t remove A’s file, as there
is no way for C to know whether A crashed or whether A was partitioned away.
Every coordinator P installs a shutdown hook which removes P’s file on termination. However, this doesn’t apply
to a process killed ungracefully, e.g. by kill -9. In this case, no shutdown hook will get called. If we had view
{A,B,C}, and A was killed via kill -9, and B takes over, we’d have files A.list and B.list.
To change this, attribute remove_old_coords_on_view_change can be set to true. In this case, files created by old
coordinators will be removed. In the scenario above, where A crashed, B would remove A.list.
However, if we have a split between {A,B} and {C,D}, C would remove A.list. To prevent this, every coordinator
writes its file again on a view change that has left members or in which the coordinator changed.
There is still a case which can end up with a zombie file that’s never removed: when we have a single member A and
it is killed via kill -9. In this case, file A.list will never get cleaned up and subsequent joiners will ask
A to join, up to GMS.max_join_attempts times.
Zombie cleanup can be solved by setting remove_all_files_on_view_change to true. In this case, a coordinator
removes all files on a view change that has members leaving or changes the coordinator.
Note: Setting remove_old_coords_on_view_change or remove_all_files_on_view_change to true generates more traffic
to the file system or cloud store. If members are always shut down gracefully, or never killed via kill -9, then
it is recommended to set both attributes to false.
${FILE_PING}
JDBC_PING
JDBC_PING uses a DB to store information about cluster nodes used for discovery. All cluster nodes are supposed to be able to access the same DB.
When a node starts, it queries information about existing members from the database, determines the coordinator and then asks the coord to join the cluster. It also inserts information about itself into the table, so others can subsequently find it.
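A sketch of a JDBC_PING configuration; the connection URL, credentials and driver class are placeholders, and the attribute names (connection_url, connection_username, connection_password, connection_driver) are assumptions based on common JDBC_PING configs:

<JDBC_PING connection_url="jdbc:postgresql://dbhost/jgroups"
           connection_username="jgroups"
           connection_password="secret"
           connection_driver="org.postgresql.Driver"/>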
When a node P has crashed, the current coordinator removes P’s information from the DB. However, if there is a network split, then this can be problematic, as crashed members cannot be told from partitioned-away members.
For instance, if we have {A,B,C,D}, and the split creates 2 subclusters {A,B} and {C,D},
then A would remove {C,D} because it thinks they crashed, and - likewise - C would remove {A,B}.
To solve this, every member re-inserts its information into the DB after a view change. So when C and D’s view
changes from {A,B,C,D} to {C,D}, both C and D re-insert their information; the same happens on the other side of the network split.
The re-insertion is governed by attributes info_writer_max_writes_after_view and info_writer_sleep_time: the former
defines the number of times re-insertion should be done (in a timer task) after each view change and the latter is the
sleep time (in ms) between re-insertions.
The value of this is that dead members are removed from the DB (because they cannot do re-insertion), but network splits are handled, too.
Another attribute clear_table_on_view_change governs how zombies are handled. Zombies are table entries for members
which crashed, but weren’t removed for some reason. E.g. if we have a single member A and kill it (via kill -9), then
it won’t get removed from the table.
If clear_table_on_view_change is set to true, then the coordinator clears the table after a view change (instead of
only removing the crashed members), and everybody re-inserts its own information. This attribute can be set to true if
automatic removal of zombies is desired. However, it is costly, therefore if no zombies ever occur (e.g. because processes
are never killed with kill -9), or zombies are removed by a system admin, then it should be set to false.
Note: Processes killed with kill -3 are removed from the DB, as a shutdown handler will be called on kill -3 (but not on kill -9).
${JDBC_PING}
BPING
BPING uses UDP broadcasts to discover other nodes. The default broadcast address (dest) is 255.255.255.255, and should be replaced with a subnet specific broadcast, e.g. 192.168.1.255.
${BPING}
RACKSPACE_PING
RACKSPACE_PING uses Rackspace Cloud Files Storage to discover initial members. Each node writes a small object in a shared Rackspace container. New joiners read all addresses from the container and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding object.
These objects are stored under a container called 'jgroups', and each node will write an object named after the cluster name, plus a "/" followed by the address, thus simulating a hierarchical structure.
${RACKSPACE_PING}
S3_PING
S3_PING uses Amazon S3 to discover initial members. New joiners read all addresses from this bucket and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding file.
It’s designed specifically for members running on Amazon EC2, where multicast traffic is not allowed and thus MPING or PING will not work. When Amazon RDS is preferred over S3, or if a shared database is used, an alternative is to use JDBC_PING.
Each instance uploads a small file to an S3 bucket and each instance reads the files out of this bucket to determine the other members.
There are three different ways to use S3_PING, each having its own tradeoffs between security and ease of use:

- Private buckets, with Amazon AWS credentials given to each instance
- Public readable and writable buckets, with no credentials given to each instance
- Public readable but private writable buckets, with pre-signed URLs given to each instance

Pre-signed URLs are the most secure method, since writing to buckets still requires authorization and you don’t have to pass Amazon AWS credentials to every instance. However, they are also the most complex to set up.
Here’s a configuration example for private buckets with credentials given to each instance:
<S3_PING location="my_bucket" access_key="access_key"
secret_access_key="secret_access_key" timeout="2000"
        num_initial_members="3"/>

Here’s an example for public buckets with no credentials:

<S3_PING location="my_bucket"/>

And finally, here’s an example for public readable buckets with pre-signed URLs:

<S3_PING pre_signed_put_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=it1cUUtgCT9ZJyCJDj2xTAcRTFg%3D"
         pre_signed_delete_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=u4IFPRq%2FL6%2FAohykIW4QrKjR23g%3D"/>

${S3_PING}
AWS_PING
This is a protocol written by Meltmedia, which uses the AWS API. It is not part of JGroups, but can be downloaded at https://github.com/meltmedia/jgroups-aws.
Native S3 PING
This implementation by Zalando uses the AWS SDK. It is not part of JGroups, but can be found at https://github.com/zalando/jgroups-native-s3-ping. This protocol works with JGroups versions 3.x.
There’s a refactored version of AWS_PING that was ported (in 2017) to run on JGroups 4.x at https://github.com/jgroups-extras/native-s3-ping.
GOOGLE_PING
GOOGLE_PING is a subclass of S3_PING and inherits most of the functionality. It uses Google Cloud Storage to store information about individual members.
The snippet below shows a sample config:
<GOOGLE_PING
location="jgroups-bucket"
access_key="GXXXXXX"
secret_access_key="YYYYYY"
timeout="2000" num_initial_members="3"/>This will use a bucket "jgroups-bucket" or create one if it doesn’t exist, then create another folder under it with the cluster name, and finally use 1 object per member in that location for member info.
${GOOGLE_PING}
Note: GOOGLE_PING has been deprecated; GOOGLE_PING2 should be used instead.
GOOGLE_PING2
GOOGLE_PING2 is the successor to GOOGLE_PING and uses Google’s
client library to access Google Cloud Storage (GCS). It is the recommended way to access GCS and the project is hosted at
https://github.com/jgroups-extras/jgroups-google.
DNS_PING
DNS_PING uses DNS A or SRV entries to perform discovery. Initially this protocol was designed for
Kubernetes and OpenShift, but it is suitable for any type of DNS discovery.
Note: In order to enable DNS discovery for an application deployed on Kubernetes/OpenShift, one must create a governing headless Service with a proper selector covering the desired Pods. The Service ensures that DNS entries are populated as soon as Pods are in the Ready state.
The snippet below shows a sample config:
<dns.DNS_PING
dns_address="192.168.0.17"
dns_query="jgroups-dns-ping.myproject.svc.cluster.local" />This will turn on DNS discovery using DNS server at address 192.168.0.17 and DNS query jgroups-dns-ping.myproject.svc.cluster.local.
The dns_address parameter is optional and when it’s missing, the protocol will use the default DNS resolver configured
on the machine.
The dns_query parameter is mandatory. It is used for querying the DNS Server and obtaining information about the cluster members.
The svc.cluster.local part is specific to Kubernetes and OpenShift and might be omitted.
It is also possible to use SRV entries for discovery as shown below:
<dns.DNS_PING
dns_query="_ping._tcp.jgroups-dns-ping.myproject.svc.cluster.local"
dns_record_type="SRV" />Kubernetes SRV entries are created using the following scheme: _my-port-name._my-port-protocol.my-svc.my-namespace.svc.cluster.local.
When the above example is used in Kubernetes or OpenShift, the DNS_PING will form a cluster of all the Pods governed
by a Service named jgroups-dns-ping in myproject Namespace, which expose a TCP port named ping.
For more information, please refer to Kubernetes DNS Admin Guide.
Note: Both KUBE_PING and DNS_PING can be used in Kubernetes/OpenShift. The main difference between them is that KUBE_PING uses the Kubernetes API for discovery, whereas DNS_PING uses DNS entries. Having said that, DNS_PING should be used together with a governing Service, which makes it a perfect fit for Stateful Sets.
A working example of using this protocol can be found at https://github.com/slaskawi/jgroups-dns-ping-example.
${DNS_PING}
SWIFT_PING
SWIFT_PING uses Openstack Swift to discover initial members. Each node writes a small object in a shared container. New joiners read all addresses from the container and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding object.
These objects are stored under a container called 'jgroups' (by default), and each node will write an object named after the cluster name, plus a "/" followed by the address, thus simulating a hierarchical structure.
Currently only Openstack Keystone authentication is supported. Here is a sample configuration block:
<SWIFT_PING timeout="2000"
num_initial_members="3"
auth_type="keystone_v_2_0"
auth_url="http://localhost:5000/v2.0/tokens"
username="demo"
password="password"
tenant="demo" />${SWIFT_PING}
KUBE_PING
This Kubernetes-based discovery protocol can be used with OpenShift [2] and uses Kubernetes to discover cluster
members. KUBE_PING is hosted on jgroups-extras; refer to [1] for details.
AZURE_PING
This is a discovery protocol that allows cluster nodes to run on the Azure cloud [1]. For details refer to [2].
PDC - Persistent Discovery Cache
The Persistent Discovery Cache can be used to cache the results of the discovery process persistently. E.g. if we have TCPPING.initial_hosts configured to include only members A and B, but have a lot more members, then other members can bootstrap themselves and find the right coordinator even when neither A nor B are running.
An example of a TCP-based stack configuration is:
<TCP />
<PDC cache_dir="/tmp/jgroups" />
<TCPPING timeout="2000" num_initial_members="20"
initial_hosts="192.168.1.5[7000]" port_range="0"
return_entire_cache="true"
use_disk_cache="true" />${PDC}
MULTI_PING
This discovery protocol allows for multiple discovery protocols to be used in a configuration. MULTI_PING adds
all discovery protocols underneath it in the stack to a list at initialization time.
When a discovery request is
received from above, all discovery protocols in the list are contacted, either sequentially
(async_discovery=false) or in parallel (async_discovery=true).
A sample configuration is shown below:
<TCP ... />
<TCPPING initial_hosts="127.0.0.1[7800]" port_range="0"/>
<PING />
<MPING/>
<FILE_PING/>
<MULTI_PING async_discovery="true"/>
<MERGE3/>
...

${MULTI_PING}
Merging after a network partition
If a cluster gets split for some reason (e.g. a network partition), this protocol merges the subclusters back into one cluster. It is only run by the coordinator (the oldest member in a cluster), which periodically multicasts its presence and view information. If another coordinator (for the same cluster) receives this message, it will initiate a merge process. Note that this merges subgroups {A,B} and {C,D,E} back into {A,B,C,D,E}, but it does not merge state. The application has to handle the callback to merge state. See [HandlingNetworkPartitions] for suggestions on merging states.
Following a merge, the coordinator of the merged group may deviate from the usual rule that "the coordinator is the member who has been up the longest." During the merge process, the coordinators of the various subgroups need to reach a common decision as to who the new coordinator is. In order to ensure a consistent result, each coordinator combines the addresses of all the members into a list and then sorts it. The first member in the sorted list becomes the coordinator. The sort order is determined by how the address type implements the Comparable interface; JGroups then compares based on the UUIDs. So, take a hypothetical case where two machines are running, with one machine running three separate cluster members and the other running two. If communication between the machines were cut, the following subgroups would form: {A,B} and {C,D,E}. Following the merge, the new view would be {C,D,A,B,E}, with C being the new coordinator.
Note that "A", "B" and so on are just logical names, attached to UUIDs, but the actual sorting is done on the actual UUIDs.
MERGE3
If a cluster gets split for some reason (e.g. a network partition), this protocol merges the subclusters back into one cluster.
All members periodically send an INFO message with their address (UUID), logical name, physical address and ViewId. The ViewId ([ViewId]) is used to see if we have diverging views among the cluster members: periodically, every coordinator looks at the INFO messages received so far and checks if there are any inconsistencies.
If inconsistencies are found, the merge leader will be the member with the lowest address (UUID).
The merge leader then asks the senders of the inconsistent ViewIds for their full views. Once received,
it simply passes a MERGE event up the stack, where the merge will be handled (by GMS) in exactly the same
way as if MERGE2 had generated the MERGE event.
The advantages of MERGE3 are:

- Sending of INFO messages is spread out over time, preventing message peaks which might cause packet loss. This is especially important in large clusters.
- Only 1 merge should be running at any time. There are no competing merges going on.
- An INFO message carries the logical name and physical address of a member. This allows members to update their logical/physical address caches.

On the downside:

- MERGE3 has constant (small) traffic by all members.
- MERGE3 was written for an IP multicast capable transport (UDP), but it also works with other transports (such as TCP), although it isn’t as efficient on TCP as on UDP.
Example
<MERGE3 max_interval="10000" min_interval="5000" check_interval="15000"/>

This means that every member sends out an INFO message at a random interval in the range [5000 .. 10000] ms. Every
15 seconds (check_interval), every coordinator checks if it received a ViewId differing from its own, and initiates
a merge if so.
- We have subclusters {A,B,C}, {D,E} and {F}. The subcluster coordinators are A, D and F.
- The network partition now heals.
- D checks its received ViewIds, and sees entries from itself and A.
  - Since broadcasting of INFO messages is unreliable (as MERGE3 is underneath NAKACK2 in the stack), the last INFO message from F might have been dropped.
- D or A initiates a merge, which results in view {A,B,C,D,E}.
- A bit later, on the next check, F sees that its ViewId diverges from the ViewId sent in an INFO message by C.
- F and A initiate a new merge which results in merge view {A,B,C,D,E,F}.
Increasing check_interval decreases the chance of partial merges (as shown above), but doesn’t entirely eliminate them:
members are not started at exactly the same time, and therefore their check intervals overlap.
If a member’s interval elapsed just after receiving INFO messages from a subset of the subclusters
(e.g. briefly after a partition healed), then we will still have a partial merge.
${MERGE3}
Failure Detection
The task of failure detection is to probe members of a group and see whether they are alive. When a member is suspected of having failed, then a SUSPECT message is sent to all nodes of the cluster. It is not the task of the failure detection layer to exclude a crashed member (this is done by the group membership protocol, GMS), but simply to notify everyone that a node in the cluster is suspected of having crashed.
The SUSPECT message is handled by the GMS protocol of the current coordinator only; all other members ignore it.
FD
Failure detection based on a logical ring and heartbeat messages.
Members form a logical ring; e.g. in view {A,B,C,D}, A pings B, which pings C, which pings D, which pings A.
'Pinging' means sending a heartbeat.
Each member sends this heartbeat every timeout ms to the neighbor to its right. When a member receives a heartbeat, it
sends back an ack. When the ack is received the timestamp of when a member last heard from its neighbor is reset.
When a member doesn’t receive any heartbeat acks from its neighbor for timeout * max_tries ms,
that member is declared suspected, and will be excluded by GMS.
This is done by FD multicasting a SUSPECT(P) message which is handled by the current coordinator by double-checking
the health of P (using VERIFY_SUSPECT) and - if P still doesn’t reply - by excluding P from the membership.
Note that setting msg_counts_as_heartbeat to true causes a message received from P to reset P’s timestamp in the
pinging member.
Example
<FD timeout="3000" max_tries="4" />-
The membership is
{A,B,C,D,E}. -
Now C and D crash at the same time
-
B’s next heartbeats won’t get an ack
-
After roughly 12 seconds (4 * 3 secs), B suspects C
-
B now starts sending heartbeats to D
-
-
A (the coordinator) handles the
SUSPECT©message from B and usesVERIFY_SUSPECTto double-check that C is really dead -
After
VERIFY_SUSPECT.timeoutms, A creates a new view{A,B,D,E}excluding C -
After ca. 12 seconds, B sends a
SUSPECT(D)message to the coordinator, which eventually also excludesD
${FD}
FD_ALL
Failure detection based on a simple heartbeat protocol. Every member periodically multicasts a heartbeat. Every member also maintains a table of all members (minus itself). When data or a heartbeat from P is received, the timestamp for P is reset to the current time. Periodically, each member checks for members from which nothing has been received for longer than timeout ms, and suspects those.
Example
<FD_ALL timeout="12000" interval="3000" timeout_check_interval="2000"/>-
The membership is
{A,B,C,D,E}. -
Every member broadcasts a heartbeat every 3 seconds. When received, the sender’s timestamp in the table is set to the current time
-
Every member also checks every 2 seconds if any member’s timestamp exceeds the timeout and suspects that member if this is the case
-
Now C and D crash at the same time
-
After roughly 12-13 seconds,
Abroadcasts aSUSPECT(C,D)message -
The coordinator (
A) usesVERIFY_SUSPECTto double check ifCandDare dead -
Acreates a new view{A,B,E}which excludesCandD
Note: Contrary to FD, which suspects adjacent crashed members C and D one by one, FD_ALL suspects C and D in
constant time. FD takes N * (timeout * max_tries) ms, whereas FD_ALL takes timeout ms.
${FD_ALL}
FD_ALL2
Similar to FD_ALL, but doesn’t use any timestamps. Instead, a boolean flag is associated with each
member. When a message or heartbeat (sent every interval ms) from P is received, P’s flag is set to true.
The heartbeat checker checks every timeout ms for members whose flag is false, suspects those, and
- when done - resets all flags to false again.
The time it takes to suspect a member is the same as for FD_ALL.
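A minimal sketch, using the same interval and timeout semantics as FD_ALL (the values are illustrative):

<FD_ALL2 interval="3000" timeout="12000"/>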
${FD_ALL2}
FD_SOCK
Failure detection protocol based on a ring of TCP sockets created between cluster members, similar to FD but
not using heartbeat messages.
Each member in a cluster connects to its neighbor (the last member connects to the first), thus forming a ring.
Member B is suspected when its neighbor A detects abnormal closing of its TCP socket
(presumably due to a crash of B). However, if B is about to leave gracefully, it lets its neighbor A
know, so that A doesn’t suspect B.
Example
- The membership is {A,B,C,D,E}.
- Members C and D are killed at the same time.
- B notices that C abnormally closed its TCP socket and broadcasts a SUSPECT(C) message.
- The current coordinator (A) asks VERIFY_SUSPECT to double-check that C is dead.
- Meanwhile, B tries to create a TCP socket to the next-in-line (D) but fails. It therefore broadcasts a SUSPECT(D) message.
- A also handles this message and asks VERIFY_SUSPECT to double-check whether D is dead.
- After VERIFY_SUSPECT can’t verify that C and D are still alive, A creates a new view {A,B,E} and installs it.
- The time taken for FD_SOCK to suspect a member is very small (a few ms).
Note: It is recommended to use FD_SOCK and FD or FD_ALL together in the same stack: FD_SOCK detects killed
nodes immediately, while FD_ALL (with a higher timeout) detects hung members or kernel panics / crashed switches
(which don’t close the TCP connection) after the timeout.
${FD_SOCK}
FD_HOST
To detect the crash or freeze of entire hosts and all of the cluster members running on them, FD_HOST
can be used. It is not meant to be used in isolation, as it doesn’t detect crashed members on the
local host, but in conjunction with other failure detection protocols, such as FD_ALL or FD_SOCK.
FD_HOST can be used when we have multiple cluster members running on a physical box. For example,
if we have members {A,B,C,D} running on host 1 and {M,N,O,P} running on host 2, and host 1 is
powered down, then A, B, C and D are suspected and removed from the cluster together, typically
in one view change.
By default, FD_HOST uses InetAddress.isReachable() to perform liveness checking of other hosts, but
if property cmd is set, then any script or command can be used. FD_HOST will launch the command and
pass the IP address of the host to be checked as an argument. Example: cmd="ping -c 3".
A typical failure detection configuration would look like this:
...
<FD_SOCK/>
<FD_ALL timeout="60000" interval="20000"/>
<FD_HOST interval="10000" timeout="35000" />
...

If we have members {A,B,C} on host 192.168.1.3, {M,N,O} on 192.168.1.4 and {X,Y,Z} on 192.168.1.5, then
the behavior is as follows:
| Scenario | Behavior |
|---|---|
| Any member (say …) … | … |
| Member … | … |
| Host … | Since this is a graceful shutdown, the OS closes all sockets. … |
| The power supply to host … | … |
| Member … | Since this is a graceful leave, none of the failure detection protocols kick in. |
${FD_HOST}
VERIFY_SUSPECT
Verifies that a suspected member is really dead by pinging that member one last time before excluding it, and drops the SUSPECT message if the member does respond.
VERIFY_SUSPECT tries to minimize false suspicions.
The protocol works as follows: it catches SUSPECT events traveling up the stack. Then it verifies that the suspected member is really dead. If yes, it passes the SUSPECT event up the stack, otherwise it discards it. VERIFY_SUSPECT has to be placed somewhere above the failure detection protocol and below the GMS protocol (the receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
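A minimal sketch; the timeout attribute (referred to above as VERIFY_SUSPECT.timeout) defines how long to wait for the suspected member to respond:

<VERIFY_SUSPECT timeout="1500"/>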
${VERIFY_SUSPECT}
Reliable message transmission
NAKACK2
NAKACK2 provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.
It performs lossless and FIFO delivery of multicast messages, using negative acks. E.g. when receiving P:1, P:3 and P:4, a receiver delivers only P:1, asks P for retransmission of message 2, and queues P:3 and P:4. When P:2 is finally received, the receiver delivers P:2, P:3 and P:4 to the application.
Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received.
Note: NAKACK2 can also be configured to send retransmission requests for M to anyone in the cluster, rather than only to the sender of M.
FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.
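A minimal sketch of a NAKACK2 configuration; the attribute names are assumptions based on common JGroups configs (xmit_interval being the retransmission check interval in ms, and use_mcast_xmit controlling whether retransmissions are multicast to the whole cluster):

<pbcast.NAKACK2 xmit_interval="500"
                use_mcast_xmit="false"/>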
${NAKACK2}
UNICAST3
UNICAST3 provides reliable delivery and FIFO (= First In First Out) properties for point-to-point messages between a sender and a receiver.
Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received. UNICAST3 uses a mixture of positive and negative acks (similar to NAKACK2). This reduces the communication overhead required for sending an ack for every message.
FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.
On top of a reliable transport, such as TCP, UNICAST3 is not really needed. However, concurrent delivery of messages from the same sender is prevented by UNICAST3 by acquiring a lock on the sender’s retransmission table, so unless concurrent delivery is desired, UNICAST3 should not be removed from the stack even if TCP is used.
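A minimal sketch, assuming the commonly used attributes xmit_interval (retransmission check interval in ms) and conn_expiry_timeout (0 disables expiry of idle connections):

<UNICAST3 xmit_interval="500"
          conn_expiry_timeout="0"/>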
Details of UNICAST3’s design can be found here: UNICAST3
${UNICAST3}
RSVP
The RSVP protocol is not a reliable delivery protocol per se, but augments reliable protocols such as NAKACK, UNICAST or UNICAST2. It should be placed somewhere above these in the stack.
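A minimal sketch, assuming RSVP’s timeout and resend_interval attributes (how long to wait for acks, and how often to resend the request):

<RSVP timeout="10000" resend_interval="2000"/>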
${RSVP}
Message stability
To serve potential retransmission requests, a member has to store received messages until it is known that every member in the cluster has received them. Message stability for a given message M means that M has been seen by everyone in the cluster.
The stability protocol periodically (or when a certain number of bytes have been received) initiates a consensus protocol, which multicasts a stable message containing the highest message numbers for a given member. This is called a digest.
When everyone has received everybody else’s stable messages, a digest is computed which consists of the minimum sequence numbers of all received digests so far. This is the stability vector, and it contains only message sequence numbers that have been seen by everyone.
This stability vector is then broadcast to the group, and everyone can remove from their retransmission tables messages whose sequence numbers are smaller than the ones received in the stability vector. These messages can then be garbage collected.
STABLE
STABLE garbage collects messages that have been seen by all members of a cluster. Each member has to store all messages because it may be asked to retransmit. Only when we are sure that all members have seen a message can it be removed from the retransmission buffers. STABLE periodically gossips its highest and lowest messages seen. The lowest value is used to compute the min (all lowest seqnos for all members), and messages with a seqno below that min can safely be discarded.
Note that STABLE can also be configured to run when N bytes have been received. This is recommended when sending messages at a high rate, because sending stable messages based on time might accumulate messages faster than STABLE can garbage collect them.
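A minimal sketch; the attribute names are assumptions based on common configs (desired_avg_gossip being the average time in ms between stability rounds, and max_bytes triggering a round after the given number of bytes have been received, as described above):

<pbcast.STABLE desired_avg_gossip="50000"
               max_bytes="4000000"/>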
${STABLE}
Group Membership
Group membership takes care of joining new members, handling leave requests by existing members, and handling SUSPECT messages for crashed members, as emitted by failure detection protocols. The algorithm for joining a new member is essentially:
- loop
- find initial members (discovery)
- if no responses:
- become singleton group and break out of the loop
- else:
- determine the coordinator (oldest member) from the responses
- send JOIN request to coordinator
- wait for JOIN response
- if JOIN response received:
- install view and break out of the loop
- else
- sleep for 5 seconds and continue the loop
pbcast.GMS
${GMS}
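A typical GMS configuration might look like the sketch below; join_timeout (max time in ms to wait for a JOIN response before retrying) and print_local_addr (prints the member’s address at startup) are assumed attribute names:

<pbcast.GMS join_timeout="2000"
            print_local_addr="true"/>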
Joining a new member
Consider the following situation: a new member wants to join a group. The procedure to do so is:
- Multicast an (unreliable) discovery request (ping)
- Wait for n responses or m milliseconds (whichever comes first)
- Every member responds with the address of the coordinator
- If the number of initial responses is > 0: determine the coordinator and start the JOIN protocol
- If the number of initial responses is 0: become coordinator, assuming that no one else is out there
However, the problem is that the initial mcast discovery request might get lost, e.g. when multiple members start at the same time, the outgoing network buffer might overflow, and the mcast packet might get dropped. Nobody receives it and thus the sender will not receive any responses, resulting in an initial membership of 0. This could result in multiple coordinators, and multiple subgroups forming. How can we overcome this problem? There are two solutions:
- Increase the timeout, or the number of responses received. This will only help if the reason for the empty membership was a slow host. If the mcast packet was dropped, this solution won’t help.
- Add the MERGE2 or MERGE3 protocol. This doesn’t actually prevent multiple initial coordinators, but rectifies the problem by merging different subgroups back into one. Note that this might involve state merging, which needs to be done by the application.
Flow control
Flow control takes care of adjusting the rate of a message sender to the rate of the slowest receiver over time. If a sender continuously sends messages at a rate that is faster than the receiver(s), the receivers will either queue up messages, or the messages will get discarded by the receiver(s), triggering costly retransmissions. In addition, there is spurious traffic on the cluster, causing even more retransmissions.
Flow control throttles the sender so the receivers are not overrun with messages.
This is implemented through a credit based system, where each sender has max_credits credits and decrements
them whenever a message is sent. The sender blocks when the credits fall below 0, and only resumes
sending messages when it receives a replenishment message from the receivers.
The receivers maintain a table of credits for all senders and decrement the given sender’s credits as well, when a message is received.
When a sender’s credits drop below a threshold, the receiver will send a replenishment message to
the sender. The threshold is defined by min_bytes or min_threshold.
Note that flow control can be bypassed by setting message flag Message.NO_FC. See [MessageFlags] for details.
The properties for FlowControl are shown below and can be used in MFC and UFC:
${FlowControl}
MFC and UFC
Note: Flow control is implemented by MFC (Multicast Flow Control) and UFC (Unicast Flow Control). The reason
for 2 separate protocols (which have a common superclass, FlowControl) is that multicast flow control should not be
impeded by unicast flow control, and vice versa. Also, performance for the separate implementations could be increased,
and they can be individually omitted.
For example, if no unicast flow control is needed, UFC can be left out of the stack configuration.
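A configuration sketch using both protocols with the attributes described earlier (the values are illustrative only):

<MFC max_credits="4000000" min_threshold="0.40"/>
<UFC max_credits="4000000" min_threshold="0.40"/>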
MFC
MFC has currently no properties other than those inherited by FlowControl (see above).
${MFC}
UFC
UFC has currently no properties other than those inherited by FlowControl (see above).
${UFC}
Non blocking flow control
Contrary to blocking flow control, which blocks senders from sending a message when credits are lacking, non-blocking flow control avoids blocking the sender thread.
Instead, when a sender has insufficient credits to send a message, the message is queued and the control flow returns to the calling thread. When more credits are received, the queued messages are sent.
This means that a JChannel.send(Message) never blocks and - if the transport is also non-blocking (e.g. TCP_NIO2) -
we have a completely non-blocking stack.
However, if the send rate is always faster than the receive (processing) rate, messages will end up in the queues and the queues will grow, leading to memory exhaustion.
It is therefore possible to fall back to blocking the sender threads if the message queues grow beyond a certain limit.
The attribute to bound a queue is max_queue_size, and defines the max number of bytes the accumulated messages can
have. If that size is exceeded, the addition of a message to a queue will block until messages are removed from the queue.
The max_queue_size attribute is per queue, so for unicast messages we have 1 queue per destination and for multicast
messages we have a single queue for all destinations. For example, if max_queue_size is set to 5M (5 million bytes),
and we have members {A,B,C,D}, then on A the queues for B, C and D will have a combined max size of 15MB.
UFC_NB
This is the non-blocking alternative to UFC. It extends UFC, so all attributes from UFC are inherited.
${UFC_NB}
MFC_NB
This is the non-blocking alternative to MFC. It inherits from MFC, so all attributes are inherited.
${MFC_NB}
Fragmentation
FRAG and FRAG2
FRAG and FRAG2 fragment large messages into smaller ones, send the smaller ones, and at the receiver side, the smaller fragments will get assembled into larger messages again, and delivered to the application. FRAG and FRAG2 work for both unicast and multicast messages.
The difference between FRAG and FRAG2 is that FRAG2 does 1 less copy than FRAG, so it is the recommended fragmentation protocol. FRAG serializes a message to know the exact size required (including headers), whereas FRAG2 only fragments the payload (excluding the headers), so it is faster.
The properties of FRAG2 are:
${FRAG2}
Contrary to FRAG, FRAG2 does not need to serialize a message in order to break it into smaller fragments: it looks only at the message’s buffer, which is a byte array anyway. We assume that the size addition for headers and src and dest addresses is minimal when the transport finally has to serialize the message, so we add a constant (by default 200 bytes). Because of the efficiency gained by not having to serialize the message just to determine its size, FRAG2 is generally recommended over FRAG.
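A minimal sketch, assuming the frag_size attribute (the maximum fragment size in bytes); the value is illustrative and should be smaller than the maximum packet size of the transport:

<FRAG2 frag_size="60000"/>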
${FRAG3}
FRAG3 needs only half the memory of FRAG2 to handle fragments and the final full message. See https://issues.jboss.org/browse/JGRP-2154 for details.
Ordering
SEQUENCER
SEQUENCER provides total order for multicast (= group) messages by forwarding messages to the current coordinator, which then sends the messages to the cluster on behalf of the original sender. Because it is always the same sender (whose messages are delivered in FIFO order), a global (or total) order is established.
Sending members add every forwarded message M to a buffer and remove M when they receive it. Should the current coordinator crash, all buffered messages are forwarded to the new coordinator.
${SEQUENCER}
Total Order Anycast (TOA)
A total order anycast is a totally ordered message sent to a subset of the cluster members. TOA intercepts messages with an AnycastMessage (carrying a list of addresses) and handles sending of the message in total order. Say the cluster is {A,B,C,D,E} and the Anycast is to {B,C}.
Skeen’s algorithm is used to send the message: B and C each maintain a logical clock (a counter). When a message is to be sent, TOA contacts B and C and asks them for their counters. B and C return their counters (incrementing them for the next request).
The originator of the message then sets the message’s ID to be the max of all returned counters and sends the message. Receivers then deliver the messages in order of their IDs.
The main use of TOA is currently in Infinispan’s transactional caches with partial replication: it is used to apply transactional modifications in total order, so that no two-phase commit protocol has to be run and no locks have to be acquired.
As shown in "Exploiting Total Order Multicast in Weakly Consistent Transactional Caches", when we have many conflicts by different transactions modifying the same keys, TOM fares better than 2PC.
Note that TOA is experimental (as of 3.1).
${tom.TOA}
State Transfer
pbcast.STATE_TRANSFER
STATE_TRANSFER is the existing state transfer protocol, which transfers byte[] buffers around. However, at the state provider's side, JGroups creates an output stream over the byte[] buffer and passes the output stream to the getState(OutputStream) callback, and at the state requester's side, an input stream is created and passed to the setState(InputStream) callback.
This allows us to continue using STATE_TRANSFER until the new state transfer protocols replace it (perhaps in 4.0).
In order to transfer application state to a joining member of a cluster, STATE_TRANSFER has to load the entire state into memory and send it to the joining member. The major limitation of this approach is that very large states are likely to cause memory exhaustion.
For large states, use either the STATE or the STATE_SOCK protocol. If the state is small, however, STATE_TRANSFER is fine.
${STATE_TRANSFER}
StreamingStateTransfer
StreamingStateTransfer is the superclass of STATE and STATE_SOCK (see below). Its properties are:
${StreamingStateTransfer}
pbcast.STATE
Overview
STATE was renamed from (2.x) STREAMING_STATE_TRANSFER, and refactored to extend a common superclass StreamingStateTransfer. The other state transfer protocol extending StreamingStateTransfer is STATE_SOCK (see STATE_SOCK).
STATE uses a streaming approach to state transfer: the state provider writes its state to the output stream passed to it in the getState(OutputStream) callback; the stream chunks the data and sends each chunk to the state requester in a separate message.
The state requester receives those chunks and feeds them into the input stream from which the state is read by the setState(InputStream) callback.
The advantage compared to STATE_TRANSFER is that state provider and requester only need small (transfer) buffers to keep a part of the state in memory, whereas STATE_TRANSFER needs to copy the entire state into memory.
If, for example, we have a list of 1 million elements, then STATE_TRANSFER would have to create one byte[] buffer out of it and return that buffer, whereas a streaming approach can iterate through the list and write each element to the output stream. Whenever the buffer capacity is reached, a message is sent and the buffer is reused for more data.
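A hedged sketch of such a streaming callback pair, shown on a ReceiverAdapter subclass (the list and its element type are made up; in later versions the Receiver interface offers the same callbacks):
import java.io.*;
import java.util.*;
import org.jgroups.ReceiverAdapter;

public class ListStateExample extends ReceiverAdapter {
    private final List<String> state = new ArrayList<>();

    // State provider: writes the list element by element to the output stream;
    // the state transfer protocol below chunks the stream into messages
    @Override public void getState(OutputStream output) throws Exception {
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(output));
        synchronized(state) {
            out.writeInt(state.size());
            for(String element: state)
                out.writeUTF(element);
        }
        out.flush(); // don't close: the stream is managed by the state transfer protocol
    }

    // State requester: reads the elements back from the input stream
    @Override public void setState(InputStream input) throws Exception {
        DataInputStream in = new DataInputStream(new BufferedInputStream(input));
        int size = in.readInt();
        synchronized(state) {
            state.clear();
            for(int i = 0; i < size; i++)
                state.add(in.readUTF());
        }
    }
}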
Configuration
STATE has currently no properties other than those inherited by StreamingStateTransfer (see above).
STATE_SOCK
STATE_SOCK is also a streaming state transfer protocol, but compared to STATE, it doesn’t send the chunks as messages, but uses a TCP socket connection between state provider and requester to transfer the state.
The state provider creates a server socket at a configurable bind address and port, and the address and port are sent back to a state requester in the state response. The state requester then establishes a socket connection to the server socket and passes the socket’s input stream to the setState(InputStream) callback.
Configuration
The configuration options of STATE_SOCK are listed below:
${STATE_SOCK}
BARRIER
BARRIER is used by some of the state transfer protocols: it lets existing threads complete and blocks new threads, so that the digest and the state can be obtained in one go.
In 3.1, a new mechanism for state transfer will be implemented, eliminating the need for BARRIER. Until then, BARRIER should be used when one of the state transfer protocols is used. BARRIER is part of every default stack which contains a state transfer protocol.
${BARRIER}
pbcast.FLUSH
Flushing forces group members to send all their pending messages prior to a certain event. The process of flushing quiesces the cluster, so that a state transfer or a join can be done. It is also called the stop-the-world model, as nobody is able to send messages while a flush is in progress. Flush is used in:
- State transfer: When a member requests a state transfer, it tells everyone to stop sending messages and waits for everyone's ack. Once all acks have been received, the state is fetched from the coordinator and shipped to the requester. After the requester has received and set the state successfully, it tells everyone to resume sending messages.
- View changes (e.g. a join): Before installing a new view V2, flushing ensures that all messages sent in the current view V1 are indeed delivered in V1, rather than in V2 (in all non-faulty members). This is essentially Virtual Synchrony.
FLUSH is designed as another protocol, positioned just below the channel, i.e. on top of the stack (e.g. above STATE_TRANSFER). The STATE_TRANSFER and GMS protocols request a flush by sending an event up the stack, where it is handled by the FLUSH protocol. Another event is sent back by the FLUSH protocol to let the caller know that the flush has completed. When done (e.g. the view was installed or the state transferred), the protocol sends a message which allows everyone in the cluster to resume sending.
A channel is notified that the FLUSH phase has been started by the Receiver.block() callback.
Read more about flushing in [Flushing].
${FLUSH}
Security
Security is used to prevent (1) non-authorized nodes being able to join a cluster and (2) non-members being able to communicate with cluster members.
(1) is handled by AUTH or SASL, which allow only authenticated nodes to join a cluster.
(2) is handled by the encryption protocol (SYM_ENCRYPT or ASYM_ENCRYPT) which encrypts messages between cluster members such that a non-member cannot understand them.
Encryption
Encryption is based on a shared secret key that all members of a cluster have. The key is either acquired from a shared keystore (symmetric encryption) or a new joiner fetches it from the coordinator via public/private key exchange (asymmetric encryption).
A sender encrypts a message with the shared secret key and the receivers decrypt it with the same secret key.
By default, only the payload of a message is encrypted, but not the other metadata (e.g. headers, destination address, flags etc).
If headers are not encrypted, replay attacks become possible, because the sequence number (seqno) of a message is visible. For example, if a message has seqno 50, an attacker could copy the message and increment its seqno.
To prevent this, the SERIALIZE protocol can be placed on top of SYM_ENCRYPT or ASYM_ENCRYPT. It serializes the entire message into the payload of a new message that’s then encrypted and sent down the stack.
SYM_ENCRYPT
Symmetric encryption is done by SYM_ENCRYPT. The configuration mainly includes attributes that define the keystore, e.g. keystore_name (the name of the keystore, which needs to be found on the classpath), store_password, key_password and alias.
SYM_ENCRYPT uses store type JCEKS (for the differences between JKS and JCEKS, refer to the JDK documentation), whereas keytool defaults to JKS; a keystore generated with the default store type will therefore not be accessible.
To generate a keystore compatible with JCEKS, use the following command line options to keytool:
keytool -genseckey -alias myKey -keypass changeit -storepass changeit -keyalg Blowfish -keysize 56 -keystore defaultStore.keystore -storetype JCEKS
SYM_ENCRYPT could then be configured as follows:
<SYM_ENCRYPT sym_algorithm="AES"
             keystore_name="defaultStore.keystore"
             store_password="changeit"
             alias="myKey"/>
Note that defaultStore.keystore has to be found on the classpath.
Note: Both SYM_ENCRYPT and ASYM_ENCRYPT should be placed directly under NAKACK2 (see the sample configurations, e.g. sym-encrypt.xml or asym-encrypt.xml).
${SYM_ENCRYPT}
ASYM_ENCRYPT
Contrary to SYM_ENCRYPT, the secret key is not fetched from a shared keystore, but from the current coordinator C. After a new member P joins the cluster (passing the join check done by AUTH), P sends a request for the secret key (including P's public key) to C.
C then sends the secret key back to P, encrypted with P’s public key, and P decrypts it with its private key and installs it. From then on, P encrypts and decrypts messages using the secret key.
When a member leaves, C can optionally (based on change_key_on_leave) create a new secret key, and every cluster member
needs to fetch it again, using the public/private key exchange described above.
A stack configured to use asymmetric encryption could look like this:
...
<VERIFY_SUSPECT/>
<ASYM_ENCRYPT
sym_keylength="128"
sym_algorithm="AES/ECB/PKCS5Padding"
asym_keylength="512"
asym_algorithm="RSA"/>
<pbcast.NAKACK2/>
<UNICAST3/>
<pbcast.STABLE/>
<FRAG2/>
<AUTH auth_class="org.jgroups.auth.MD5Token"
auth_value="chris"
token_hash="MD5"/>
<pbcast.GMS join_timeout="2000" />
The configuration snippet shows ASYM_ENCRYPT positioned just below NAKACK2, so that headers of the important retransmission protocols NAKACK2 and UNICAST3 are encrypted, too. Note that AUTH should be part of the configuration, or else unauthenticated nodes would be able to acquire the secret key from the coordinator.
${ASYM_ENCRYPT}
SERIALIZE
This protocol serializes every sent message including all of its metadata into a new message and sends it down. When a message is received, it will be deserialized and then sent up the stack. This can be used by the encryption protocols (see Encryption).
${SERIALIZE}
SSL_KEY_EXCHANGE
ASYM_ENCRYPT uses a built-in key exchange protocol for a requester to fetch the secret group key from the key server (usually the coordinator). Such secret key requests are accompanied by the requester’s public key. The key server encrypts the secret key response with the public key of the requester, and the requester decrypts the response with its private key and can then install the new secret group key to encrypt and decrypt group messages.
This works well; however, it is not immune to man-in-the-middle attacks. If MitM attack prevention is required,
a separate key exchange protocol can be added to the stack. ASYM_ENCRYPT needs to be told to use the key exchange
protocol, which has to be located somewhere beneath it in the stack, by setting use_external_key_exchange to true.
A key exchange protocol needs to extend KeyExchange.
SSL_KEY_EXCHANGE implements MitM-safe key exchange by using SSL sockets and client (and, of course, server)
certification. The key server opens an SSL server socket on a given port and requesters create an SSL client socket and
connect to it, then exchange the secret group key and finally close the connection.
As key requesters and the key server require properly configured certificate chains, trust is established between the two parties and secret group keys can be transmitted securely.
Note: As certificates authenticate the identity of key servers and requesters (usually joining members), AUTH is not needed as a separate protocol and can be removed from the configuration.
Here’s a typical configuration:
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:org:jgroups"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<UDP />
<PING/>
<MERGE3/>
<FD_ALL timeout="8000" interval="3000"/>
<FD_SOCK/>
<VERIFY_SUSPECT/>
<SSL_KEY_EXCHANGE (2)
keystore_name="/home/bela/certs/my-keystore.jks"
keystore_password="password"
/>
<ASYM_ENCRYPT
use_external_key_exchange="true" (1)
sym_keylength="128"
sym_algorithm="AES"
asym_keylength="512"
asym_algorithm="RSA"/>
<pbcast.NAKACK2/>
<UNICAST3/>
<pbcast.STABLE/>
<FRAG2/>
<pbcast.GMS join_timeout="2000" />
</config>
Here SSL_KEY_EXCHANGE is positioned below ASYM_ENCRYPT. The latter is configured to use an external key exchange protocol (1). The former is configured with a keystore and password (2).
${SSL_KEY_EXCHANGE}
AUTH
Authentication is performed by AUTH. Its main use is to make sure only authenticated members can join a cluster. Other scenarios where a check is performed are:
- Merging: make sure only authenticated members can merge into a new cluster
- View installation (if enabled): views and merge views can only be installed by authenticated members
So authentication makes sure that rogue nodes will never be able to become members of a cluster, be it via joining or merging. Note that while AUTH is optional with SYM_ENCRYPT, it is required by ASYM_ENCRYPT: a sanity check prevents a member from starting if ASYM_ENCRYPT is present but AUTH is absent.
AUTH provides pluggable security that defines whether a node should be allowed to join a cluster. AUTH sits below the GMS protocol and listens for JOIN REQUEST messages. When a JOIN REQUEST is received, it tries to find an AuthHeader object, inside of which should be an implementation of the AuthToken object.
AuthToken is an abstract class; implementations are responsible for providing the actual authentication mechanism. Some basic implementations of AuthToken are provided in the org.jgroups.auth package (SimpleToken, MD5Token and X509Token). Effectively, all these implementations do is encrypt a string (found in the JGroups config) and pass it along with the JOIN REQUEST.
When authentication is successful, the message is simply passed up the stack to the GMS protocol. When it fails, the AUTH protocol creates a JOIN RESPONSE message with a failure string and passes it back down the stack. This failure string informs the client of the reason for failure. Clients will then fail to join the group and will throw a SecurityException. If this error string is null then authentication is considered to have passed.
For more information, refer to the JGroups wiki page on AUTH.
${AUTH}
AuthToken implementations
The AuthToken implementations are listed below. Check the javadoc for details.
| Name | Description |
|---|---|
| SimpleToken | Uses a simple string (password) which is shared and sent along with the authentication request |
| FixedMembershipToken | A fixed list of IP address:port pairs. If the requester is not in this list, authentication fails |
| RegexpMembership | Uses a regular expression to match against the IP address or hostname |
| Krb5Token | Uses Kerberos for authentication |
| MD5Token | Uses an MD5 hash of a simple string (similar to SimpleToken above, but hashed instead of plaintext) |
| X509Token | Uses a shared X.509 certificate |
| ChallengeResponseToken | Uses a challenge-response mechanism |
SASL
SASL is an alternative to the AUTH protocol which provides a layer of authentication to JGroups by allowing the use of one of the SASL mechanisms made available by the JDK. SASL sits below the GMS protocol and listens for JOIN / MERGE REQUEST messages. When a JOIN / MERGE REQUEST is received it tries to find a SaslHeader object which contains the initial response required by the chosen SASL mech. This initiates a sequence of challenge/response messages which, if successful, culminates in allowing the new node to join the cluster. The actual validation logic required by the SASL mech must be provided by the user in the form of a standard javax.security.auth.CallbackHandler implementation.
When authentication is successful, the message is simply passed up the stack to the GMS protocol. When it fails, the SASL protocol creates a JOIN / MERGE RESPONSE message with a failure string and passes it back down the stack. This failure string informs the client of the reason for failure. Clients will then fail to join the group and will throw a SecurityException. If this error string is null then authentication is considered to have passed.
SASL can be (minimally) configured as follows:
<config ... >
<UDP />
<PING />
<pbcast.NAKACK />
<UNICAST3 />
<pbcast.STABLE />
<SASL mech="DIGEST-MD5"
client_callback_handler="org.example.ClientCallbackHandler"
server_callback_handler="org.example.ServerCallbackHandler"/>
<pbcast.GMS />
</config>
The mech property specifies the SASL mech you want to use, as defined by RFC-4422. You also need to provide two callback handlers: one used when the node is running as coordinator (server_callback_handler) and one used in all other cases (client_callback_handler). Refer to the JDK's SASL reference guide for more details: http://docs.oracle.com/javase/7/docs/technotes/guides/security/sasl/sasl-refguide.html
The JGroups package comes with a simple properties-based CallbackHandler which can be used when a more complex Kerberos/LDAP approach is not needed. To use it, set both server_callback_handler and client_callback_handler to org.jgroups.auth.sasl.SimpleAuthorizingCallbackHandler. This CallbackHandler can be configured either programmatically, by passing an instance of java.util.Properties containing the appropriate properties to the constructor (see the sketch after the list below), or via standard Java system properties (i.e. set on the command line using the -DpropertyName=propertyValue notation). The following properties are available:
- sasl.credentials.properties - the path to a property file which contains principal/credential mappings represented as principal=password
- sasl.local.principal - the name of the principal that is used to identify the local node. It must exist in the sasl.credentials.properties file
- sasl.roles.properties - (optional) the path to a property file which contains principal/roles mappings represented as principal=role1,role2,role3
- sasl.role - (optional) if present, authorizes joining nodes only if their principal is associated with this role
- sasl.realm - (optional) the name of the realm to use for the SASL mechanisms that require it
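For example, the handler could be configured programmatically as follows (a sketch; the path and principal name are made up):
import java.util.Properties;
import org.jgroups.auth.sasl.SimpleAuthorizingCallbackHandler;

public class SaslHandlerSetup {
    public static SimpleAuthorizingCallbackHandler create() {
        Properties props = new Properties();
        // Hypothetical credentials file and principal name, for illustration only
        props.setProperty("sasl.credentials.properties", "/path/to/credentials.properties");
        props.setProperty("sasl.local.principal", "node1");
        return new SimpleAuthorizingCallbackHandler(props);
    }
}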
${SASL}
Misc
Statistics
STATS exposes various statistics, e.g. the number of received multicast and unicast messages, the number of bytes sent, etc. It should be placed directly over the transport.
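For example (placement sketch only; other protocols omitted):
<UDP/>
<STATS/>
<PING/>
...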
${STATS}
COMPRESS
COMPRESS compresses messages larger than min_size and uncompresses them at the receiver's side. Property compression_level determines how thorough the compression algorithm should be (0: no compression, 9: highest compression).
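A sketch of a COMPRESS configuration (the values are illustrative; min_size is in bytes):
<COMPRESS min_size="10000" compression_level="9"/>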
${COMPRESS}
NAMING
If IpAddressUUIDs are used, then the address/logical_name cache may not be populated for all members. Note that this doesn’t affect correctness, but instead of logical names, the real IP addresses of some members will be printed (e.g. in debug logs).
To prevent this, NAMING can be added to the stack. The typical location is somewhere towards the bottom of the stack,
e.g. above the discovery protocol (e.g. PING).
Note: NAMING is only needed when TP.use_ip_addrs is true.
${NAMING}
RELAY2
RELAY2 provides clustering between different sites (local clusters), for multicast and unicast messages. See [Relay2Advanced] for details.
${RELAY2}
RATE_LIMITER
RATE_LIMITER can be used to set a limit on the data sent per time unit. When sending data, only max_bytes can be sent per time_period milliseconds. E.g. if max_bytes="50M" and time_period="1000", then a sender can only send 50MBytes / sec max.
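The example above as a configuration snippet:
<RATE_LIMITER max_bytes="50M" time_period="1000"/>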
${RATE_LIMITER}
Locking protocols
The locking protocols are based on the common superclass org.jgroups.protocols.Locking, whose properties are:
${Locking}
CENTRAL_LOCK
With CENTRAL_LOCK, the current coordinator of a cluster grants locks, so every node has to communicate with the coordinator to acquire or release a lock. Lock requests by different nodes for the same lock are processed in the order in which they are received.
The coordinator maintains a lock table. To prevent losing the knowledge of who holds which locks, the coordinator can push lock information to a number of backups defined by num_backups. If num_backups is 0, no replication of lock information happens. If num_backups is greater than 0, the coordinator pushes information about acquired and released locks to all backup nodes. Topology changes might create new backup nodes, and lock information is pushed to nodes as they become new backups.
The advantage of CENTRAL_LOCK is that all lock requests are granted in the same order across the cluster.
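Applications typically acquire such locks via LockService (see [LockService]); a minimal sketch, assuming a connected JChannel whose stack contains CENTRAL_LOCK (or CENTRAL_LOCK2):
import java.util.concurrent.locks.Lock;
import org.jgroups.JChannel;
import org.jgroups.blocks.locking.LockService;

public class LockExample {
    public static void doWork(JChannel ch) {
        LockService lockService = new LockService(ch);
        Lock lock = lockService.getLock("my-lock"); // "my-lock" is an arbitrary cluster-wide lock name
        lock.lock();
        try {
            // critical section, protected cluster-wide
        }
        finally {
            lock.unlock();
        }
    }
}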
${CENTRAL_LOCK}
CENTRAL_LOCK2
In CENTRAL_LOCK2, the coordinator (= lock issuer) does not backup its lock table to other member(s), but instead a new coordinator fetches information about held locks and pending lock/unlock requests from existing members, before it starts processing lock requests. See [LockService] for details.
${CENTRAL_LOCK2}
CENTRAL_EXECUTOR
CENTRAL_EXECUTOR is an implementation of Executing which is needed by the ExecutionService.
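A hedged sketch of submitting a task via the ExecutionService (assuming a connected JChannel whose stack contains CENTRAL_EXECUTOR; the Sum task is made up):
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.jgroups.JChannel;
import org.jgroups.blocks.executor.ExecutionService;

public class ExecutorExample {
    // The task has to be serializable, as it may be shipped to and executed on another member
    public static class Sum implements Callable<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        @Override public Integer call() { return 1 + 2; }
    }

    public static Integer run(JChannel ch) throws Exception {
        ExecutionService service = new ExecutionService(ch);
        // Note: at least one member must run an ExecutionRunner to actually execute
        // submitted tasks, e.g. new Thread(new ExecutionRunner(ch)).start();
        Future<Integer> future = service.submit(new Sum());
        return future.get();
    }
}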
${Executing}
${CENTRAL_EXECUTOR}
COUNTER
COUNTER is the implementation of cluster-wide counters, used by the CounterService.
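Applications access such counters via CounterService; a minimal sketch, assuming a connected JChannel whose stack contains COUNTER:
import org.jgroups.JChannel;
import org.jgroups.blocks.atomic.Counter;
import org.jgroups.blocks.atomic.CounterService;

public class CounterExample {
    public static long increment(JChannel ch) {
        CounterService counterService = new CounterService(ch);
        // Returns the existing counter "my-counter", or creates it with an initial value of 0
        Counter counter = counterService.getOrCreateCounter("my-counter", 0);
        return counter.incrementAndGet();
    }
}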
${COUNTER}
SUPERVISOR
SUPERVISOR is a protocol which runs rules that periodically (or triggered by events) check conditions and take corrective action if a condition is not met. Example: org.jgroups.protocols.rules.CheckFDMonitor is a rule which periodically checks whether FD's monitor task is running when the cluster size is greater than 1, and starts it if it is not.
The SUPERVISOR is explained in more detail in [Supervisor].
${SUPERVISOR}
FORK
FORK allows ForkChannels to piggy-back messages on a regular channel. Needs to be placed towards the top of the stack. See [ForkChannel] for details.
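A hedged sketch of creating a ForkChannel on top of a main channel (the fork stack and channel IDs are arbitrary, made-up names):
import org.jgroups.JChannel;
import org.jgroups.fork.ForkChannel;

public class ForkExample {
    public static ForkChannel createFork(JChannel main) throws Exception {
        // The main channel's stack must contain FORK for this constructor to succeed
        ForkChannel fork = new ForkChannel(main, "hijack-stack", "lock-channel");
        fork.connect("ignored"); // the cluster name passed to a fork channel's connect() is ignored
        return fork;
    }
}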
${FORK}
INJECT_VIEW
INJECT_VIEW exposes a managed operation (injectView) capable of injecting a view by parsing the view state from a string.
The string format is A=A/B/C;B=B/C;C=C (where A, B and C are node names); this would inject view [A,B,C] with A as leader on node A, view [B,C] with B as leader on node B, and view [C] on node C.
Note: Calling injectView("A=A/B/C;B=B/C;C=C"), as an example, just on node B would result only in view [B,C] applied to node B.
To inject a view on multiple nodes at once, a tool like Probe can be used, for example:
probe.sh op=INJECT_VIEW.injectView["A=A/B/C;B=B/C;C=C"]
Note: INJECT_VIEW uses logical names to look up real addresses in the logical address cache (located in the transport). This cache is keyed by address and its values are names. This means that, for example, UUIDs 1 and 6 may map to the same name (say "A"). If we now look up the address for "A", either 1 or 6 may be returned, depending on which address mapping was added last to the cache. This means that logical names should be unique, e.g. when running a fork channel.
${INJECT_VIEW}