Skip to content
This repository has been archived by the owner on Jun 1, 2022. It is now read-only.

Commit

Permalink
Make soldierOn loop sleep configurable (#420)
Browse files Browse the repository at this point in the history
* Make soldierOn loop configurable.
* Added a test that sends the same amount via two SimpleStreamSenders - one with a long sleep time, one with a short sleep time, that compares the two total send times.
* Set default sleep time in SendMoneyAggregator to 10ms
* fixed test heavy sleeper sleep duration
* fix the test order in SimpleStreamSenderIT

Signed-off-by: nkramer44 <noah.ph.kramer@gmail.com>
  • Loading branch information
nkramer44 committed Jan 30, 2020
1 parent 549c030 commit 16f1781
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class SimpleStreamSender implements StreamSender {
private final StreamEncryptionService streamEncryptionService;
private final ExecutorService executorService;
private final StreamConnectionManager streamConnectionManager;
private final Optional<UnsignedLong> sendPacketSleep;

/**
* Required-args Constructor.
Expand All @@ -96,6 +97,16 @@ public SimpleStreamSender(final Link link) {
this(new JavaxStreamEncryptionService(), link);
}

/**
* Required-args Constructor.
*
* @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer.
* @param sendPacketSleep amount of time for a thread to sleep before sending more packets
*/
public SimpleStreamSender(final Link link, final Optional<UnsignedLong> sendPacketSleep) {
this(new JavaxStreamEncryptionService(), link, sendPacketSleep);
}

/**
* Required-args Constructor.
*
Expand All @@ -105,7 +116,22 @@ public SimpleStreamSender(final Link link) {
* @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer.
*/
public SimpleStreamSender(final StreamEncryptionService streamEncryptionService, final Link link) {
this(streamEncryptionService, link, newDefaultExecutor());
this(streamEncryptionService, link, newDefaultExecutor(), Optional.empty());
}

/**
* Required-args Constructor.
*
* @param streamEncryptionService An instance of {@link StreamEncryptionService} used to encrypt and decrypted
* end-to-end STREAM packet data (i.e., packets that should only be visible between
* sender and receiver).
* @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer.
* @param sendPacketSleep amount of time for a thread to sleep before sending more packets
*/
public SimpleStreamSender(final StreamEncryptionService streamEncryptionService,
final Link link,
final Optional<UnsignedLong> sendPacketSleep) {
this(streamEncryptionService, link, newDefaultExecutor(), sendPacketSleep);
}

/**
Expand All @@ -120,25 +146,44 @@ public SimpleStreamSender(final StreamEncryptionService streamEncryptionService,
public SimpleStreamSender(
final StreamEncryptionService streamEncryptionService, final Link link, ExecutorService executorService
) {
this(streamEncryptionService, link, executorService, new StreamConnectionManager());
this(streamEncryptionService, link, executorService, new StreamConnectionManager(), Optional.empty());
}

/**
* Required-args Constructor.
*
* @param streamEncryptionService A {@link StreamEncryptionService} used to encrypt and decrypted end-to-end STREAM
* @param streamEncryptionService An instance of {@link StreamEncryptionService} used to encrypt and decrypted
* end-to-end STREAM packet data (i.e., packets that should only be visible between
* sender and receiver).
* @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer.
* @param executorService executorService to run the payments
* @param sendPacketSleep amount of time for a thread to sleep before sending more packets
*/
public SimpleStreamSender(
final StreamEncryptionService streamEncryptionService,
final Link link,
final ExecutorService executorService,
final Optional<UnsignedLong> sendPacketSleep
) {
this(streamEncryptionService, link, executorService, new StreamConnectionManager(), sendPacketSleep);
}

/**
* Required-args Constructor.
* @param streamEncryptionService A {@link StreamEncryptionService} used to encrypt and decrypted end-to-end STREAM
* packet data (i.e., packets that should only be visible between sender and
* receiver).
* @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer.
* @param executorService A {@link ExecutorService} to run the payments.
* @param streamConnectionManager A {@link StreamConnectionManager} that manages connections for all senders and
* receivers in this JVM.
* @param sendPacketSleep amount of time for a thread to sleep before sending more packets
*/
public SimpleStreamSender(
final StreamEncryptionService streamEncryptionService,
final Link link,
final ExecutorService executorService,
final StreamConnectionManager streamConnectionManager
final StreamEncryptionService streamEncryptionService,
final Link link,
final ExecutorService executorService,
final StreamConnectionManager streamConnectionManager,
final Optional<UnsignedLong> sendPacketSleep
) {
this.streamEncryptionService = Objects.requireNonNull(streamEncryptionService);
this.link = Objects.requireNonNull(link);
Expand All @@ -147,6 +192,7 @@ public SimpleStreamSender(
// created using {@link ThreadPoolExecutor} constructors.
this.executorService = Objects.requireNonNull(executorService);
this.streamConnectionManager = Objects.requireNonNull(streamConnectionManager);
this.sendPacketSleep = Objects.requireNonNull(sendPacketSleep);
}

private static ExecutorService newDefaultExecutor() {
Expand All @@ -162,17 +208,18 @@ public CompletableFuture<SendMoneyResult> sendMoney(final SendMoneyRequest reque
Objects.requireNonNull(request);

final StreamConnection streamConnection = this.streamConnectionManager.openConnection(
StreamConnectionId.from(request.destinationAddress(), request.sharedSecret())
StreamConnectionId.from(request.destinationAddress(), request.sharedSecret())
);

return new SendMoneyAggregator(
this.executorService,
streamConnection,
StreamCodecContextFactory.oer(),
this.link,
new AimdCongestionController(),
this.streamEncryptionService,
request
this.executorService,
streamConnection,
StreamCodecContextFactory.oer(),
this.link,
new AimdCongestionController(),
this.streamEncryptionService,
request,
this.sendPacketSleep
).send();
}

Expand Down Expand Up @@ -251,6 +298,8 @@ static class SendMoneyAggregator {

private final AtomicBoolean unrecoverableErrorEncountered;

private long sendPacketSleep;

/**
* Required-args Constructor.
*
Expand All @@ -271,7 +320,8 @@ static class SendMoneyAggregator {
final Link link,
final CongestionController congestionController,
final StreamEncryptionService streamEncryptionService,
final SendMoneyRequest request
final SendMoneyRequest request,
final Optional<UnsignedLong> sendPacketSleep
) {
this.executorService = Objects.requireNonNull(executorService);
this.streamConnection = Objects.requireNonNull(streamConnection);
Expand All @@ -298,6 +348,9 @@ static class SendMoneyAggregator {
this.receiverDenomination = Optional.empty();

this.unrecoverableErrorEncountered = new AtomicBoolean(false);

Objects.requireNonNull(sendPacketSleep);
this.sendPacketSleep = sendPacketSleep.orElse(UnsignedLong.valueOf(10)).longValue();
}

/**
Expand Down Expand Up @@ -471,7 +524,7 @@ private void sendMoneyPacketized() {
try {
// Don't send any more, but wait a bit for outstanding requests to complete so we don't cycle needlessly in
// a while loop that doesn't do anything useful.
Thread.sleep(100);
Thread.sleep(sendPacketSleep);
} catch (InterruptedException e) {
throw new StreamSenderException(e.getMessage(), e);
}
Expand Down Expand Up @@ -585,6 +638,13 @@ void schedule(
paymentTracker.rollback(prepareAmounts, false);
}
}
else {
logger.info("timeout reached, not sending packet");
congestionController.reject(preparePacket.getAmount(), InterledgerRejectPacket.builder()
.code(InterledgerErrorCode.F99_APPLICATION_ERROR)
.message(String.format("Timeout reached before packet could be sent", preparePacket))
.build());
}
});
} catch (RejectedExecutionException e) {
// If we get here, it means the task was unable to be scheduled, so we need to unwind the congestion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setUp() {
.build();
this.sendMoneyAggregator = new SendMoneyAggregator(
executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock,
streamEncryptionServiceMock, request);
streamEncryptionServiceMock, request, Optional.empty());

defaultPrepareAmounts = PrepareAmounts.from(samplePreparePacket(), sampleStreamPacket());
}
Expand Down Expand Up @@ -224,7 +224,7 @@ public void failureToSchedulePutsMoneyBack() {
ExecutorService executor = mock(ExecutorService.class);
this.sendMoneyAggregator = new SendMoneyAggregator(
executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock,
streamEncryptionServiceMock, request);
streamEncryptionServiceMock, request, Optional.empty());

when(executor.submit(any(Runnable.class))).thenThrow(new RejectedExecutionException());

Expand Down Expand Up @@ -365,7 +365,7 @@ public void breakLoopToPreventOversend() throws Exception {

this.sendMoneyAggregator = new SendMoneyAggregator(
executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock,
streamEncryptionServiceMock, request);
streamEncryptionServiceMock, request, Optional.empty());

setSoldierOnBooleans(false, false, true);
when(streamConnectionMock.nextSequence()).thenReturn(UnsignedLong.ONE);
Expand Down Expand Up @@ -490,7 +490,7 @@ public void stopSendingWhenUnrecoverableErrorEncountered() throws Exception {

this.sendMoneyAggregator = new SendMoneyAggregator(
executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock,
streamEncryptionServiceMock, request);
streamEncryptionServiceMock, request, Optional.empty());

when(congestionControllerMock.hasInFlight()).thenAnswer(new Answer<Boolean>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static okhttp3.CookieJar.NO_COOKIES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

import org.interledger.codecs.ilp.InterledgerCodecContextFactory;
import org.interledger.core.InterledgerAddress;
Expand Down Expand Up @@ -34,9 +35,11 @@
import okhttp3.logging.HttpLoggingInterceptor;
import org.assertj.core.data.Offset;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -53,6 +56,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -65,6 +69,7 @@
* Integration tests for {@link SimpleStreamSender} that connects to a running ILP Connector using the information
* supplied in this link, and initiates a STREAM payment.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class SimpleStreamSenderIT {

private static final String AUTH_TOKEN = "password";
Expand Down Expand Up @@ -139,6 +144,7 @@ public void setUp() throws IOException {
RustNodeAccount sender = accountBuilder()
.username(SENDER_ACCOUNT_USERNAME)
.ilpAddress(SENDER_ADDRESS)
.routingRelation(RustNodeAccount.RoutingRelation.CHILD)
.build();

nodeClient.createAccount(sender);
Expand Down Expand Up @@ -310,6 +316,50 @@ public void sendMoneyMultiPacket() {
logger.info("Payment Sent: {}", sendMoneyResult);
}

/**
* Two calls to {@link SimpleStreamSender#sendMoney(SendMoneyRequest)}} that involves multiple packets in parallel.
* First call is to a {@link SimpleStreamSender} with the default sleep time (100ms)
* Second call is to a {@link SimpleStreamSender} with a shorter sleep time (5ms)
*/
@Test
public void sendMoneyMultiPacketDifferentSleepTimes() {
final SendMoneyResult heavySleeperResult = sendMoneyWithConfiguredSleep(Optional.of(UnsignedLong.valueOf(100)), 1000001);
final SendMoneyResult lightSleeperResult = sendMoneyWithConfiguredSleep(Optional.of(UnsignedLong.valueOf(5)), 1000002);

logger.info("Heavy sleeper took {} to send {} packets.", heavySleeperResult.sendMoneyDuration(), heavySleeperResult.totalPackets());
logger.info("Light sleeper took {} to send {} packets.", lightSleeperResult.sendMoneyDuration(), lightSleeperResult.totalPackets());
assertThat(heavySleeperResult.sendMoneyDuration()).isGreaterThan(lightSleeperResult.sendMoneyDuration());
}

private SendMoneyResult sendMoneyWithConfiguredSleep(Optional<UnsignedLong> sleepTime, int streamConnectionId) {
final UnsignedLong paymentAmount = UnsignedLong.valueOf(100000);

StreamSender heavySleeperSender = new SimpleStreamSender(
new JavaxStreamEncryptionService(), link, sleepTime
);

final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(streamConnectionId);

final SendMoneyResult heavySleeperResult = heavySleeperSender.sendMoney(
SendMoneyRequest.builder()
.sourceAddress(SENDER_ADDRESS)
.amount(paymentAmount)
.denomination(Denominations.XRP)
.destinationAddress(connectionDetails.destinationAddress())
.sharedSecret(connectionDetails.sharedSecret())
.paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator()))
.build()
).join();

assertThat(heavySleeperResult.amountDelivered()).isEqualTo(paymentAmount);
assertThat(heavySleeperResult.originalAmount()).isEqualTo(paymentAmount);
assertThat(heavySleeperResult.numFulfilledPackets()).isCloseTo(8, Offset.offset(1));
assertThat(heavySleeperResult.numRejectPackets()).isEqualTo(0);

logger.info("Payment Sent via sender with sleep = {} : {}", sleepTime.orElse(UnsignedLong.valueOf(100)), heavySleeperResult);
return heavySleeperResult;
}

/**
* Multiple calls to {@link SimpleStreamSender#sendMoney(SendMoneyRequest)}} that involves multiple packets in
* parallel, but using different accounts for each Stream, and thus a different Connection.
Expand Down Expand Up @@ -362,38 +412,46 @@ public void sendMoneyMultiThreadedToSeparateAccounts() throws InterruptedExcepti
public void sendMoneyHonorsTimeout() {
final UnsignedLong paymentAmount = UnsignedLong.valueOf(10000000);

// using a sleepy executor here to make sure race condition is handled properly where timeout is reached
// after submitting a sendPacketized task to the executor but before the task is executed
StreamSender streamSender = new SimpleStreamSender(
new JavaxStreamEncryptionService(), link
new JavaxStreamEncryptionService(), link, new SleepyExecutorService(Executors.newFixedThreadPool(5), 5)
);

String username = "sendMoneyHonorsTimeout";
InterledgerAddress address = HOST_ADDRESS.with(username);
RustNodeAccount rustNodeAccount = accountBuilder()
.username(username)
.ilpAddress(address)
.maxPacketAmount(BigInteger.valueOf(100))
.amountPerMinuteLimit(BigInteger.valueOf(1))
.packetsPerMinuteLimit(BigInteger.valueOf(1))
.build();

final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(rustNodeAccount);

final SendMoneyResult sendMoneyResult = streamSender
.sendMoney(
SendMoneyRequest.builder()
.sourceAddress(SENDER_ADDRESS)
.amount(paymentAmount)
.denomination(Denominations.XRP)
.destinationAddress(connectionDetails.destinationAddress())
.sharedSecret(connectionDetails.sharedSecret())
.paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator()))
.timeout(Duration.ofMillis(100))
.build()
).join();

assertThat(sendMoneyResult.successfulPayment()).isFalse();
try {
// loop to test different timeout amounts
for (int i = 0; i < 10; i++) {
final SendMoneyResult sendMoneyResult = streamSender
.sendMoney(
SendMoneyRequest.builder()
.sourceAddress(SENDER_ADDRESS)
.amount(paymentAmount)
.denomination(Denominations.XRP)
.destinationAddress(connectionDetails.destinationAddress())
.sharedSecret(connectionDetails.sharedSecret())
.paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator()))
.timeout(Duration.ofMillis(10 + i * 10))
.build()
).get();
assertThat(sendMoneyResult.successfulPayment()).isFalse();

logger.info("Payment Sent: {}", sendMoneyResult);

logger.info("Payment Sent: {}", sendMoneyResult);
}
} catch (ExecutionException | InterruptedException e) {
logger.error("Error getting completeable future");
logger.error("Error getting completeable future: " + e.toString() + " cause: " + e.getCause());
fail();
}
}

@Test(expected = NoExchangeRateException.class)
Expand Down Expand Up @@ -513,6 +571,7 @@ public void sendMoneyWithWrongLinkPassword() throws IOException {
nodeClient.createAccount(accountBuilder()
.username(connectorAccountUsername)
.ilpAddress(HOST_ADDRESS.with(connectorAccountUsername))
.routingRelation(RustNodeAccount.RoutingRelation.CHILD)
.build());

final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000);
Expand Down
Loading

0 comments on commit 16f1781

Please sign in to comment.