Skip to content

Commit

Permalink
improved "loadtest" scenario on DittoClientUsageExamples
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jan 29, 2021
1 parent ad864f9 commit 2abe5f3
Showing 1 changed file with 95 additions and 63 deletions.
Expand Up @@ -19,9 +19,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.UUID;
Expand All @@ -32,6 +35,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.eclipse.ditto.client.changes.ChangeAction;
import org.eclipse.ditto.client.configuration.BasicAuthenticationConfiguration;
Expand Down Expand Up @@ -78,8 +82,8 @@ public final class DittoClientUsageExamples {

private static final Logger LOGGER = LoggerFactory.getLogger(DittoClientUsageExamples.class);

private static final String PROPERTIES_FILE = "ditto-client-starter-local.properties"; // for local development
// private static final String PROPERTIES_FILE = "ditto-client-starter-sandbox.properties";
// private static final String PROPERTIES_FILE = "ditto-client-starter-local.properties"; // for local development
private static final String PROPERTIES_FILE = "ditto-client-starter-aws-dev.properties";
private static final String PROXY_HOST;
private static final String PROXY_PORT;
private static final String DITTO_ENDPOINT_URL;
Expand Down Expand Up @@ -147,56 +151,56 @@ public static void main(final String... args) throws ExecutionException, Interru
useTwinCommandsAndEvents(client, client2);
System.out.println("\n\nFinished with TWIN commands/events demo");
}

if (shouldNotSkip("live.examples")) {
client.live().startConsumption().get();
client2.live().startConsumption().get();

System.out.println("\n\nAbout to continue with LIVE commands/events demo:");
promptEnterKey();

useLiveCommands(client, client2);
System.out.println("\n\nFinished with LIVE commands/events demo");

System.out.println("\n\nAbout to continue with LIVE messages demo:");
promptEnterKey();

useLiveMessages(client, client2);
System.out.println("\n\nFinished with LIVE messages demo");
Thread.sleep(500);
}

if (shouldNotSkip("search.examples")) {
System.out.println("\n\nAbout to continue with search commands:");
promptEnterKey();
useSearchCommands(client);
System.out.println("\n\nFinished with SEARCH commands demo");
Thread.sleep(500);
}
//
// if (shouldNotSkip("live.examples")) {
// client.live().startConsumption().get();
// client2.live().startConsumption().get();
//
// System.out.println("\n\nAbout to continue with LIVE commands/events demo:");
// promptEnterKey();
//
// useLiveCommands(client, client2);
// System.out.println("\n\nFinished with LIVE commands/events demo");
//
// System.out.println("\n\nAbout to continue with LIVE messages demo:");
// promptEnterKey();
//
// useLiveMessages(client, client2);
// System.out.println("\n\nFinished with LIVE messages demo");
// Thread.sleep(500);
// }
//
// if (shouldNotSkip("search.examples")) {
// System.out.println("\n\nAbout to continue with search commands:");
// promptEnterKey();
// useSearchCommands(client);
// System.out.println("\n\nFinished with SEARCH commands demo");
// Thread.sleep(500);
// }

if (shouldNotSkip("load.test")) {
System.out.println("\n\nAbout to continue with small load test:");
promptEnterKey();

final int loadTestThings = 100;
final int loadTestCount = 10;
final int loadTestThings = 1000;
final int loadTestCount = 500;
subscribeForLoadTestUpdateChanges(client2, loadTestCount * loadTestThings, false);
performLoadTestUpdate(client, loadTestCount, loadTestThings, false);
performLoadTestRead(client, loadTestCount, true);
performLoadTestUpdate(client, loadTestCount, loadTestThings, Duration.ofMillis(20), true);
// performLoadTestRead(client, loadTestCount, true);
Thread.sleep(1000);
}

if (shouldNotSkip("policies.examples")) {
System.out.println("\n\nAbout to continue with policy example:");
promptEnterKey();
addNewSubjectToExistingPolicy(client);
System.out.println("\n\nFinished with policy example");
}

client.destroy();
client2.destroy();
System.out.println("\n\nDittoClientUsageExamples successfully completed!");
System.exit(0);
// if (shouldNotSkip("policies.examples")) {
// System.out.println("\n\nAbout to continue with policy example:");
// promptEnterKey();
// addNewSubjectToExistingPolicy(client);
// System.out.println("\n\nFinished with policy example");
// }

// client.destroy();
// client2.destroy();
// System.out.println("\n\nDittoClientUsageExamples successfully completed!");
// System.exit(0);
}

private static void addNewSubjectToExistingPolicy(final DittoClient client)
Expand Down Expand Up @@ -495,8 +499,12 @@ private static void useLiveMessages(final DittoClient backendClient, final Ditto
private static void performLoadTestUpdate(final DittoClient client,
final int updateCount,
final int thingCount,
final Duration delayBetweenUpdates,
final boolean log) {

LOGGER.info("performLoadTestUpdate: About to create '{}' things and doing '{}' updates on each created " +
"thing being a total of '{}' updates...", thingCount, updateCount, thingCount*updateCount);

final JsonObject attributesExample = JsonFactory.newObjectBuilder()
.set("maker", "ACME Inc.")
.set("VIN", UUID.randomUUID().toString())
Expand All @@ -511,6 +519,7 @@ private static void performLoadTestUpdate(final DittoClient client,
.build();

final ExecutorService executorService = Executors.newFixedThreadPool(16);
final List<ThingId> thingIds = new ArrayList<>();
for (int k = 1; k <= thingCount; k++) {
final int thingIdx = k;
executorService.execute(() ->
Expand All @@ -528,41 +537,64 @@ private static void performLoadTestUpdate(final DittoClient client,
if (log) {
LOGGER.info("performLoadTestUpdate: Created new thing: {}", thing);
}
thingIds.add(thingId);
});
}

final long startTs = System.nanoTime();
final AtomicInteger integer = new AtomicInteger(updateCount);
for (int i = updateCount; i >= 0; i--) {
final int counter = i;
while (thingIds.size() < thingCount) {
try {
LOGGER.info("Waiting for things to be created... currently created: '{}'", thingIds.size());
Thread.sleep(1000);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

LOGGER.info("performLoadTestUpdate: starting with attribute updates...");

final long startTs = System.nanoTime();
final AtomicInteger integer = new AtomicInteger(thingCount * updateCount);
IntStream.range(0, updateCount)
.forEach(counter -> thingIds.forEach(thingId -> executorService.execute(() -> {
final long startTs2 = System.nanoTime();
client.twin().forId(thingId).putAttribute("counter", counter,
Options.Modify.responseRequired(false)).whenComplete((_void, throwable) ->
{
if (throwable != null) {
LOGGER.warn("performLoadTestUpdate: Updating attribute failed: {}", throwable.getMessage());
LOGGER.debug("performLoadTestUpdate: Updating attribute failed: {}",
throwable.getMessage());
} else {
final double duration = getDuration(startTs2);
if (log) {
LOGGER.info("performLoadTestUpdate: Single update request ({}) latency: {}ms",
LOGGER.debug("performLoadTestUpdate: Single update request ({}) latency: {}ms",
counter, duration);
}
integer.decrementAndGet();
}
integer.decrementAndGet();
});
}

while (integer.get() > 0) {
try {
Thread.sleep(10);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
if (!delayBetweenUpdates.isZero()) {
try {
TimeUnit.MILLISECONDS.sleep(delayBetweenUpdates.toMillis());
} catch (final InterruptedException e) {
// ignore
}
}
}
final double duration = getDuration(startTs);
LOGGER.info("performLoadTestUpdate: Finished updating '{}' attributes after {}ms - " +
"that are ~{}req/s", updateCount, duration, (int) (updateCount / duration * 1000));
});
})));

while (integer.get() > 0) {
try {
Thread.sleep(10);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
final double duration = getDuration(startTs);
LOGGER.info("performLoadTestUpdate: Finished updating '{}' attributes in '{}' things - a total of '{}' " +
"updates after {}ms - that are ~{}req/s", updateCount, thingCount, thingCount*updateCount,
duration, (int) (thingCount*updateCount / duration * 1000));
}

private static void subscribeForLoadTestUpdateChanges(final DittoClient client, final int count,
Expand Down Expand Up @@ -728,7 +760,7 @@ public static MessagingProvider createMessagingProvider() {
AuthenticationProviders.dummy(DummyAuthenticationConfiguration.newBuilder()
.dummyUsername(DITTO_DUMMY_AUTH_USER)
.build());
} else if (DITTO_OAUTH_CLIENT_ID != null) {
} else if (DITTO_OAUTH_CLIENT_ID != null && !DITTO_OAUTH_CLIENT_ID.isEmpty()) {
final ClientCredentialsAuthenticationConfiguration.ClientCredentialsAuthenticationConfigurationBuilder
authenticationConfigurationBuilder =
ClientCredentialsAuthenticationConfiguration.newBuilder()
Expand Down

0 comments on commit 2abe5f3

Please sign in to comment.