Skip to content

Commit

Permalink
Merge pull request #45 from RxBroadcast/pp-causal-order
Browse files Browse the repository at this point in the history
Add ping pong test for causal order UDP broadcasts
  • Loading branch information
whymarrh committed Mar 28, 2017
2 parents 1868bcb + b707094 commit 5bda9bf
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ script:
- scripts/test rx.broadcast.integration.BasicOrderUdpBroadcastTest
- scripts/test-table-tennis rx.broadcast.integration.pp.PingPongUdpBasicOrder
- scripts/test-table-tennis rx.broadcast.integration.pp.PingPongUdpSingleSourceFifoOrder
- scripts/test-table-tennis rx.broadcast.integration.pp.PingPongUdpCausalOrder
deploy:
provider: script
script: scripts/deploy
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/rx/broadcast/VectorTimestamp.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ final class VectorTimestamp {

private long[] timestamps;

@SuppressWarnings("unused")
VectorTimestamp() {

}

VectorTimestamp(final long[] ids, final long[] timestamps) {
if (ids.length != timestamps.length) {
throw new IllegalArgumentException("IDs and timestamps must contain the same number of elements");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rx.broadcast.integration.pp;

import org.junit.Test;
import rx.Observable;
import rx.broadcast.Broadcast;
import rx.broadcast.CausalOrder;
import rx.broadcast.UdpBroadcast;
import rx.observers.TestSubscriber;

import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:AvoidInlineConditionals"})
public class PingPongUdpCausalOrder {
private static final int MESSAGE_COUNT = 100;

private static final long TIMEOUT = 30;

/**
* Receive a PING and respond with a PONG.
* @throws SocketException if the socket could not be opened, or the socket could not bind to the given port.
* @throws UnknownHostException if no IP address for the host machine could be found.
*/
@Test
public final void recv() throws SocketException, UnknownHostException {
final int port = System.getProperty("port") != null
? Integer.valueOf(System.getProperty("port"))
: 54321;
final int destinationPort = System.getProperty("destinationPort") != null
? Integer.valueOf(System.getProperty("destinationPort"))
: 12345;
final DatagramSocket socket = new DatagramSocket(port);
final InetAddress destination = System.getProperty("destination") != null
? InetAddress.getByName(System.getProperty("destination"))
: InetAddress.getByName("localhost");
final Broadcast broadcast = new UdpBroadcast<>(socket, destination, destinationPort, new CausalOrder<>());
final TestSubscriber<Ping> subscriber = new TestSubscriber<>();

broadcast.valuesOfType(Ping.class)
.doOnNext(System.out::println)
.concatMap(ping ->
broadcast.send(new Pong(ping.value))
// Once we've sent the response, we can emit the PING value to the subscriber.
// The cast here is a hack to allow us to concatenate a PING onto the stream.
// Where this is an `Observable<Void>` we know we won't get anything that needs to be casted.
.cast(Ping.class)
.concatWith(Observable.just(ping))
.doOnCompleted(() -> System.out.println("Sent PONG")))
.take(MESSAGE_COUNT)
.subscribe(subscriber);

subscriber.awaitTerminalEventAndUnsubscribeOnTimeout(TIMEOUT, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(MESSAGE_COUNT);
}

/**
* Send a set PING messages to the receiver, expecting PONG messages in response.
* @param args the command line arguments passed to the program
* @throws SocketException if the socket could not be opened, or the socket could not bind to the given port.
* @throws UnknownHostException if no IP address for the host machine could be found.
*/
public static void main(final String[] args) throws InterruptedException, SocketException, UnknownHostException {
final int port = System.getProperty("port") != null
? Integer.valueOf(System.getProperty("port"))
: 54321;
final int destinationPort = System.getProperty("destinationPort") != null
? Integer.valueOf(System.getProperty("destinationPort"))
: 12345;
final DatagramSocket socket = new DatagramSocket(port);
final InetAddress destination = System.getProperty("destination") != null
? InetAddress.getByName(System.getProperty("destination"))
: InetAddress.getByName("localhost");
final Broadcast broadcast = new UdpBroadcast<>(socket, destination, destinationPort, new CausalOrder<>());

Observable.range(1, MESSAGE_COUNT)
.map(Ping::new)
.doOnNext(System.out::println)
.concatMap(value ->
broadcast.send(value)
.doOnCompleted(() -> System.out.printf("Sent %s%n", value))
.cast(Pong.class)
.concatWith(broadcast.valuesOfType(Pong.class).first()))
.timeout(TIMEOUT, TimeUnit.SECONDS)
.toBlocking()
.subscribe(pong ->
System.out.printf("Received %s%n", pong));
}
}

0 comments on commit 5bda9bf

Please sign in to comment.