diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/ApiPerfTestBase.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/ApiPerfTestBase.java
new file mode 100644
index 0000000000000..267dabd79df74
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/ApiPerfTestBase.java
@@ -0,0 +1,250 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.http.policy.HttpPipelinePolicy;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import javax.net.ssl.SSLException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The Base Performance Test class for API based Perf Tests.
+ * @param the performance test options to use while running the test.
+ */
+public abstract class ApiPerfTestBase extends PerfTestBase {
+ private final reactor.netty.http.client.HttpClient recordPlaybackHttpClient;
+ private final URI testProxy;
+ private final TestProxyPolicy testProxyPolicy;
+ private String recordingId;
+ private long completedOperations;
+
+
+ // Derived classes should use the ConfigureClientBuilder() method by default. If a ClientBuilder does not
+ // follow the standard convention, it can be configured manually using these fields.
+ protected final HttpClient httpClient;
+ protected final Iterable policies;
+
+ /**
+ * Creates an instance of the Http Based Performance test.
+ * @param options the performance test options to use while running the test.
+ * @throws IllegalStateException if an errors is encountered with building ssl context.
+ */
+ public ApiPerfTestBase(TOptions options) {
+ super(options);
+ final SslContext sslContext;
+ if (options.isInsecure()) {
+ try {
+ sslContext = SslContextBuilder.forClient()
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ } catch (SSLException e) {
+ throw new IllegalStateException(e);
+ }
+
+ reactor.netty.http.client.HttpClient nettyHttpClient = reactor.netty.http.client.HttpClient.create()
+ .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+
+ httpClient = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
+ } else {
+ sslContext = null;
+ httpClient = null;
+ }
+
+ if (options.getTestProxies() != null && !options.getTestProxies().isEmpty()) {
+ if (options.isInsecure()) {
+ recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create()
+ .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+ } else {
+ recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create();
+ }
+
+ testProxy = options.getTestProxies().get(parallelIndex % options.getTestProxies().size());
+ testProxyPolicy = new TestProxyPolicy(testProxy);
+ policies = Arrays.asList(testProxyPolicy);
+ } else {
+ recordPlaybackHttpClient = null;
+ testProxy = null;
+ testProxyPolicy = null;
+ policies = null;
+ }
+ }
+
+ /**
+ * Attempts to configure a ClientBuilder using reflection. If a ClientBuilder does not follow the standard convention,
+ * it can be configured manually using the "httpClient" and "policies" fields.
+ * @param clientBuilder The client builder.
+ * @throws IllegalStateException If reflective access to get httpClient or addPolicy methods fail.
+ */
+ protected void configureClientBuilder(Object clientBuilder) {
+ if (httpClient != null || policies != null) {
+ Class> clientBuilderClass = clientBuilder.getClass();
+
+ try {
+ if (httpClient != null) {
+ Method httpClientMethod = clientBuilderClass.getMethod("httpClient", HttpClient.class);
+ httpClientMethod.invoke(clientBuilder, httpClient);
+ }
+
+ if (policies != null) {
+ Method addPolicyMethod = clientBuilderClass.getMethod("addPolicy", HttpPipelinePolicy.class);
+ for (HttpPipelinePolicy policy : policies) {
+ addPolicyMethod.invoke(clientBuilder, policy);
+ }
+ }
+ } catch (ReflectiveOperationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void runAll(long endNanoTime) {
+ completedOperations = 0;
+ lastCompletionNanoTime = 0;
+ long startNanoTime = System.nanoTime();
+ while (System.nanoTime() < endNanoTime) {
+ completedOperations += runTest();
+ lastCompletionNanoTime = System.nanoTime() - startNanoTime;
+ }
+ }
+
+ @Override
+ public Mono runAllAsync(long endNanoTime) {
+ completedOperations = 0;
+ lastCompletionNanoTime = 0;
+ long startNanoTime = System.nanoTime();
+
+ return Flux.just(1)
+ .repeat()
+ .flatMap(i -> runTestAsync(), 1)
+ .doOnNext(v -> {
+ completedOperations += v;
+ lastCompletionNanoTime = System.nanoTime() - startNanoTime;
+ })
+ .takeWhile(i -> System.nanoTime() < endNanoTime)
+ .then();
+ }
+
+ /**
+ * Indicates how many operations were completed in a single run of the test.
+ * Good to be used for batch operations.
+ *
+ * @return the number of successful operations completed.
+ */
+ abstract int runTest();
+
+ /**
+ * Indicates how many operations were completed in a single run of the async test.
+ * Good to be used for batch operations.
+ *
+ * @return the number of successful operations completed.
+ */
+ abstract Mono runTestAsync();
+
+ /**
+ * Stops playback tests.
+ * @return An empty {@link Mono}.
+ */
+ public Mono stopPlaybackAsync() {
+ return recordPlaybackHttpClient
+ .headers(h -> {
+ // The Recording id to track the recording session on the Test Proxy Server.
+ h.set("x-recording-id", recordingId);
+ // Indicates Test Proxy Server to purge the cached recording.
+ h.set("x-purge-inmemory-recording", Boolean.toString(true));
+ })
+ .post()
+ .uri(testProxy.resolve("/playback/stop"))
+ .response()
+ .doOnSuccess(response -> {
+ testProxyPolicy.setMode(null);
+ testProxyPolicy.setRecordingId(null);
+ })
+ .then();
+ }
+
+ private Mono startRecordingAsync() {
+ return Mono.defer(() -> recordPlaybackHttpClient
+ .post()
+ .uri(testProxy.resolve("/record/start"))
+ .response()
+ .doOnNext(response -> {
+ recordingId = response.responseHeaders().get("x-recording-id");
+ }).then());
+ }
+
+ private Mono stopRecordingAsync() {
+ return Mono.defer(() -> recordPlaybackHttpClient
+ .headers(h -> h.set("x-recording-id", recordingId))
+ .post()
+ .uri(testProxy.resolve("/record/stop"))
+ .response()
+ .then());
+ }
+
+ private Mono startPlaybackAsync() {
+ return Mono.defer(() -> recordPlaybackHttpClient
+ .headers(h -> h.set("x-recording-id", recordingId))
+ .post()
+ .uri(testProxy.resolve("/playback/start"))
+ .response()
+ .doOnNext(response -> {
+ recordingId = response.responseHeaders().get("x-recording-id");
+ }).then());
+ }
+
+
+ /**
+ * Records responses and starts tests in playback mode.
+ * @return
+ */
+ @Override
+ Mono postSetupAsync() {
+ if (testProxyPolicy != null) {
+
+ // Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests.
+ return runSyncOrAsync()
+ .then(startRecordingAsync())
+ .then(Mono.defer(() -> {
+ testProxyPolicy.setRecordingId(recordingId);
+ testProxyPolicy.setMode("record");
+ return Mono.empty();
+ }))
+ .then(runSyncOrAsync())
+ .then(stopRecordingAsync())
+ .then(startPlaybackAsync())
+ .then(Mono.defer(() -> {
+ testProxyPolicy.setRecordingId(recordingId);
+ testProxyPolicy.setMode("playback");
+ return Mono.empty();
+ }));
+ }
+ return Mono.empty();
+ }
+
+ private Mono runSyncOrAsync() {
+ return Mono.defer(() -> {
+ if (options.isSync()) {
+ return Mono.fromFuture(CompletableFuture.supplyAsync(() -> runTest())).then();
+ } else {
+ return runTestAsync().then();
+ }
+ });
+ }
+
+ @Override
+ public long getCompletedOperations() {
+ return completedOperations;
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/BatchPerfTest.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/BatchPerfTest.java
new file mode 100644
index 0000000000000..4ab36aa521b30
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/BatchPerfTest.java
@@ -0,0 +1,52 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Represents the abstraction of a Performance test class running operations in batches.
+ *
+ *
+ * The performance test class needs to extend this class. The test class should override {@link BatchPerfTest#runBatch()}
+ * and {@link BatchPerfTest#runBatchAsync()} methods and the synchronous and asynchronous test logic respectively.
+ * To add any test setup and logic the test class should override {@link BatchPerfTest#globalSetupAsync()}
+ * and {@link BatchPerfTest#globalCleanupAsync()} methods .
+ *
+ * @param the options configured for the test.
+ */
+public abstract class BatchPerfTest extends ApiPerfTestBase {
+
+ /**
+ * Creates an instance of Batch performance test.
+ * @param options the options configured for the test.
+ * @throws IllegalStateException if SSL context cannot be created.
+ */
+ public BatchPerfTest(TOptions options) {
+ super(options);
+ }
+
+
+ /**
+ * Run batch operation API perf test.
+ * @return the number of operations successfully completed.
+ */
+ public abstract int runBatch();
+
+ /**
+ * Run batch operation async API perf test.
+ * @return A {@link Mono} containing number of operations successfully completed.
+ */
+ public abstract Mono runBatchAsync();
+
+ @Override
+ int runTest() {
+ return runBatch();
+ }
+
+ @Override
+ Mono runTestAsync() {
+ return runBatchAsync();
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/EventPerfTest.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/EventPerfTest.java
new file mode 100644
index 0000000000000..c09b48c6ce5f2
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/EventPerfTest.java
@@ -0,0 +1,93 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Represents the abstraction of a Performance test class.
+ *
+ *
+ * The performance test class needs to extend this class.
+ * To add any test setup and logic the test class should override {@link EventPerfTest#globalSetupAsync()}
+ * and {@link EventPerfTest#globalCleanupAsync()} methods .
+ *
+ * @param the options configured for the test.
+ */
+public abstract class EventPerfTest extends PerfTestBase {
+
+ private final AtomicLong completedOps;
+
+ private volatile boolean errorRaised;
+
+ private long startTime;
+ private Throwable throwable;
+
+ /**
+ * Creates an instance of performance test.
+ * @param options the options configured for the test.
+ * @throws IllegalStateException if SSL context cannot be created.
+ */
+ public EventPerfTest(TOptions options) {
+ super(options);
+ if (options.getTestProxies() != null && options.getTestProxies().size() > 0) {
+ throw new IllegalStateException("Test Proxies are not supported for Event Perf Tests.");
+ }
+ completedOps = new AtomicLong(0);
+ }
+
+ /**
+ * Indicates an event was raised, and records its count internally.
+ */
+ public void eventRaised() {
+ completedOps.getAndIncrement();
+ lastCompletionNanoTime = System.nanoTime() - startTime;
+ }
+ /**
+ * Indicates an error was raised, and stops the performance test flow.
+ */
+ public void errorRaised(Throwable throwable) {
+ synchronized (this) {
+ errorRaised = true;
+ lastCompletionNanoTime = System.nanoTime() - startTime;
+ this.throwable = throwable;
+ notify();
+ }
+ }
+
+ @Override
+ public void runAll(long endNanoTime) {
+ startTime = System.nanoTime();
+ completedOps.set(0);
+ errorRaised = false;
+ lastCompletionNanoTime = 0;
+
+ synchronized (this) {
+ try {
+ wait((endNanoTime - startTime) / 1000000);
+ } catch (InterruptedException e) { }
+ if (errorRaised) {
+ throw new RuntimeException(throwable);
+ }
+ }
+
+ }
+
+ @Override
+ public Mono runAllAsync(long endNanoTime) {
+ return Mono.fromCallable(() -> {
+ runAll(endNanoTime);
+ return Mono.empty();
+ }).then();
+ }
+
+ @Override
+ public long getCompletedOperations() {
+ return completedOps.get();
+ }
+
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockBatchReceiverTest.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockBatchReceiverTest.java
new file mode 100644
index 0000000000000..5ff80e14336e3
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockBatchReceiverTest.java
@@ -0,0 +1,80 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import com.azure.core.util.IterableStream;
+import com.beust.jcommander.Parameter;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Test class for Mock Batch Receiver.
+ */
+public class MockBatchReceiverTest extends BatchPerfTest {
+ final MockReceiver mockReceiver;
+ /**
+ * Creates an instance of performance test.
+ *
+ * @param options the options configured for the test.
+ * @throws IllegalStateException if SSL context cannot be created.
+ */
+ public MockBatchReceiverTest(MockReceiverOptions options) {
+ super(options);
+ mockReceiver = new MockReceiver();
+ }
+
+ @Override
+ public int runBatch() {
+ return ((Long) mockReceiver.receive(options.minMessageCount, options.maxMessageCount).stream().count())
+ .intValue();
+ }
+
+ @Override
+ public Mono runBatchAsync() {
+ return mockReceiver.receiveAsync(options.minMessageCount, options.maxMessageCount)
+ .count().map(count -> count.intValue());
+ }
+
+ /**
+ * Options class for Mock Receiver Test.
+ */
+ public static class MockReceiverOptions extends PerfStressOptions {
+ @Parameter(names = {"--max-message-count" }, description = "Max messages to Receive")
+ private int maxMessageCount = 10;
+
+ @Parameter(names = {"--min-message-count" }, description = "Min messages to Receive")
+ private int minMessageCount = 0;
+
+ /**
+ * Get Max message count;
+ * @return the max message count.
+ */
+ public int getMaxMessageCount() {
+ return maxMessageCount;
+ }
+
+ /**
+ * Get the Min Message count.
+ * @return the Min
+ */
+ public int getMinMessageCount() {
+ return minMessageCount;
+ }
+ }
+
+ private static class MockReceiver {
+ public IterableStream receive(int minMessageCount, int maxMessageCount) {
+ int returnedMessages = (int) ((Math.random() * (maxMessageCount - minMessageCount)) + minMessageCount);
+
+ return IterableStream.of(IntStream.range(1, returnedMessages).boxed().collect(Collectors.toList()));
+ }
+
+ public Flux receiveAsync(int minMessageCount, int maxMessageCount) {
+ return Flux.fromIterable(receive(minMessageCount, maxMessageCount));
+ }
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockErrorContext.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockErrorContext.java
new file mode 100644
index 0000000000000..ebac59cd00998
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockErrorContext.java
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+/**
+ * Represents Mock Error Context class for {@link MockEventProcessor}
+ */
+public class MockErrorContext {
+ private final Throwable throwable;
+ private final int partition;
+
+ /**
+ * Creates an instance of the Mock Error Context
+ *
+ * @param partition the target partition
+ * @param throwable the error
+ */
+ public MockErrorContext(int partition, Throwable throwable) {
+ this.throwable = throwable;
+ this.partition = partition;
+ }
+
+ /**
+ * Get the error
+ *
+ * @return the throwable error
+ */
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
+ /**
+ * Get the target partition
+ *
+ * @return the target partition
+ */
+ public int getPartition() {
+ return partition;
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventContext.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventContext.java
new file mode 100644
index 0000000000000..b081f8b364108
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventContext.java
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+/**
+ * Represents Mock Event Context for {@link MockEventProcessor}
+ */
+public class MockEventContext {
+ private final String eventData;
+ private final int partition;
+
+ /**
+ * Creates an instance of Mock Event Context
+ * @param partition the target partition
+ * @param eventData the data for the partition
+ */
+ public MockEventContext(int partition, String eventData) {
+ this.partition = partition;
+ this.eventData = eventData;
+ }
+
+ /**
+ * Get the event data.
+ * @return the event data.
+ */
+ public String getEventData() {
+ return eventData;
+ }
+
+ /**
+ * Get the target partition
+ *
+ * @return the target partition
+ */
+ public int getPartition() {
+ return partition;
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessor.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessor.java
new file mode 100644
index 0000000000000..db6e66d8a41a8
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessor.java
@@ -0,0 +1,163 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * Represents a Mock event processor.
+ */
+public class MockEventProcessor {
+ private final Consumer processError;
+ private final Consumer processEvent;
+ private volatile boolean process;
+ private final double maxEventsPerSecondPerPartition;
+ private final int maxEventsPerSecond;
+ private final int partitions;
+ private final Duration errorAfter;
+ private boolean errorRaised;
+ private final ReentrantLock errorLock;
+ private volatile boolean processPartitions;
+
+ private final MockEventContext[] mockEventContexts;
+ private int[] eventsRaised;
+ private long startTime;
+
+ private final AtomicReference> runner = new AtomicReference<>();
+ private final AtomicReference scheduler = new AtomicReference<>();
+
+ /**
+ * Creates an instance of a mock event processor
+ *
+ * @param partitions the number of partitions
+ * @param maxEventsPerSecond the maximum events per second to send, optional.
+ * @param errorAfter the duration after which processor should error out, optional.
+ * @param processError the consumer to process the error.
+ * @param processEvent the consumer to process the event.
+ */
+ public MockEventProcessor(int partitions, int maxEventsPerSecond, Duration errorAfter,
+ Consumer processError, Consumer processEvent) {
+ this.processError = processError;
+ this.processEvent = processEvent;
+ this.partitions = partitions;
+ this.maxEventsPerSecond = maxEventsPerSecond;
+ this.maxEventsPerSecondPerPartition = ((double) maxEventsPerSecond) / partitions;
+ this.errorAfter = errorAfter;
+ this.errorLock = new ReentrantLock();
+ this.processPartitions = true;
+
+ mockEventContexts = new MockEventContext[partitions];
+ IntStream.range(0, partitions).boxed().forEach(integer -> {
+ mockEventContexts[integer] = new MockEventContext(integer, "Hello");
+ });
+ this.eventsRaised = new int[partitions];
+ }
+
+ /**
+ * Starts the event processor.
+ */
+ public synchronized void start() {
+ eventsRaised = new int[eventsRaised.length];
+ process = true;
+ errorRaised = false;
+ processPartitions = true;
+ startTime = System.nanoTime();
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ scheduler.set(executor);
+
+ runner.set(scheduler.get().schedule(this::processEvents,
+ 0l, TimeUnit.MILLISECONDS));
+ }
+
+ private Mono processEvents() {
+ if (processPartitions) {
+ Flux.range(0, partitions)
+ .parallel()
+ .runOn(Schedulers.boundedElastic())
+ .subscribe(integer -> process(integer));
+ processPartitions = false;
+ }
+ return Mono.empty();
+ }
+
+ private void process(int partition) {
+ MockEventContext mockEventContext = mockEventContexts[partition];
+
+ if (maxEventsPerSecond > 0) {
+ while (process) {
+ long elapsedTime = (System.nanoTime() - startTime);
+ if (errorAfter != null && !errorRaised
+ && (errorAfter.compareTo(Duration.ofNanos(elapsedTime)) < 0)) {
+ errorLock.lock();
+ try {
+ if (!errorRaised) {
+ processError(partition, new IllegalStateException("Test Exception"));
+ errorRaised = true;
+ }
+ } finally {
+ errorLock.unlock();
+ }
+ } else {
+ int eventsSent = eventsRaised[partition];
+ double targetEventsSent = ((double) (elapsedTime / 1_000_000_000))
+ * maxEventsPerSecondPerPartition;
+ if (eventsSent < targetEventsSent) {
+ processEvent.accept(mockEventContext);
+ eventsRaised[partition]++;
+ } else {
+ try {
+ Thread.sleep((long) ((1 / maxEventsPerSecondPerPartition) * 1000));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ } else {
+ while (process) {
+ if (errorAfter != null && !errorRaised
+ && (errorAfter.compareTo(Duration.ofNanos((System.nanoTime() - startTime))) < 0)) {
+ errorLock.lock();
+ try {
+ if (!errorRaised) {
+ processError(partition, new IllegalStateException("Test Exception"));
+ errorRaised = true;
+ }
+ } finally {
+ errorLock.unlock();
+ }
+ } else {
+ processEvent.accept(mockEventContext);
+ eventsRaised[partition]++;
+ }
+ }
+ }
+ }
+
+ private void processError(int partition, Throwable throwable) {
+ processError.accept(new MockErrorContext(partition, throwable));
+ stop();
+ }
+
+ /**
+ * Stops the Event Processor.
+ */
+ public synchronized void stop() {
+ runner.get().cancel(true);
+ scheduler.get().shutdown();
+ this.process = false;
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessorTest.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessorTest.java
new file mode 100644
index 0000000000000..729f668f9a5ae
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/MockEventProcessorTest.java
@@ -0,0 +1,90 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import com.beust.jcommander.Parameter;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.function.Consumer;
+
+/**
+ * Represents a Mock Event Processor Test.
+ */
+public class MockEventProcessorTest extends EventPerfTest {
+ private final MockEventProcessor mockEventProcessor;
+
+ /**
+ * Creates an instance of Mock Event Processor Test
+ * @param perfStressOptions the options to used to configure the test.
+ */
+ public MockEventProcessorTest(MockEventProcessorPerfOptions perfStressOptions) {
+ super(perfStressOptions);
+ Consumer errorProcessor = mockErrorContext -> errorRaised(mockErrorContext.getThrowable());
+
+ Consumer eventProcessor = mockEventContext -> eventRaised();
+
+ Duration errorAfter = perfStressOptions.getErrorAfterInSeconds() > 0
+ ? Duration.ofSeconds(perfStressOptions.getErrorAfterInSeconds()) : null;
+
+ mockEventProcessor = new MockEventProcessor(perfStressOptions.getPartitions(), perfStressOptions.getMaxEventsPerSecond(), errorAfter,
+ errorProcessor, eventProcessor);
+ }
+
+ @Override
+ public Mono setupAsync() {
+ return super.setupAsync().then(Mono.defer(() -> {
+ mockEventProcessor.start();
+ return Mono.empty();
+ }));
+ }
+
+ @Override
+ public Mono cleanupAsync() {
+ return Mono.defer(() -> {
+ mockEventProcessor.stop();
+ return Mono.empty();
+ }).then(super.cleanupAsync());
+ }
+
+ /**
+ * Represents the perf options for Mock Event Processor Test.
+ */
+ public static class MockEventProcessorPerfOptions extends PerfStressOptions {
+ @Parameter(names = { "-meps", "--maxEventsPerSecond" }, description = "Maximum Events to send per second.")
+ private int maxEventsPerSecond = 0;
+
+
+ @Parameter(names = { "-ea", "--errorAfter" }, description = "Error After duration in seconds.")
+ private int errorAfterInSeconds = 0;
+
+ @Parameter(names = { "-pt", "--partitions" }, description = "Number of Partitions.")
+ private int partitions = 1;
+
+
+ /**
+ * Get Error after duration in seconds.
+ * @return the error after duration in seconds.
+ */
+ public int getErrorAfterInSeconds() {
+ return errorAfterInSeconds;
+ }
+
+ /**
+ * Get Maximum events per second.
+ * @return the max events per second.
+ */
+ public int getMaxEventsPerSecond() {
+ return maxEventsPerSecond;
+ }
+
+ /**
+ * Get Maximum events per second.
+ * @return the max events per second.
+ */
+ public int getPartitions() {
+ return partitions;
+ }
+ }
+}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
index 155c96c0a9639..5fc590b4d9e37 100644
--- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
/**
* Represents the main program class which reflectively runs and manages the performance tests.
@@ -29,16 +30,13 @@
public class PerfStressProgram {
private static final int NANOSECONDS_PER_SECOND = 1_000_000_000;
- private static int[] completedOperations;
- private static long[] lastCompletionNanoTimes;
-
- private static int getCompletedOperations() {
- return IntStream.of(completedOperations).sum();
+ private static int getCompletedOperations(PerfTestBase>[] tests) {
+ return Stream.of(tests).mapToInt(perfStressTest -> Long.valueOf(perfStressTest.getCompletedOperations()).intValue()).sum();
}
- private static double getOperationsPerSecond() {
- return IntStream.range(0, completedOperations.length)
- .mapToDouble(i -> completedOperations[i] / (((double) lastCompletionNanoTimes[i]) / NANOSECONDS_PER_SECOND))
+ private static double getOperationsPerSecond(PerfTestBase>[] tests) {
+ return IntStream.range(0, tests.length)
+ .mapToDouble(i -> tests[i].getCompletedOperations() / (((double) tests[i].lastCompletionNanoTime) / NANOSECONDS_PER_SECOND))
.sum();
}
@@ -55,9 +53,11 @@ public static void run(Class>[] classes, String[] args) {
try {
classList.add(Class.forName("com.azure.perf.test.core.NoOpTest"));
+ classList.add(Class.forName("com.azure.perf.test.core.MockEventProcessorTest"));
classList.add(Class.forName("com.azure.perf.test.core.ExceptionTest"));
classList.add(Class.forName("com.azure.perf.test.core.SleepTest"));
classList.add(Class.forName("com.azure.perf.test.core.HttpPipelineTest"));
+ classList.add(Class.forName("com.azure.perf.test.core.MockBatchReceiverTest"));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -121,11 +121,11 @@ public static void run(Class> testClass, PerfStressOptions options) {
Disposable setupStatus = printStatus("=== Setup ===", () -> ".", false, false);
Disposable cleanupStatus = null;
- PerfStressTest>[] tests = new PerfStressTest>[options.getParallel()];
+ PerfTestBase>[] tests = new PerfTestBase>[options.getParallel()];
for (int i = 0; i < options.getParallel(); i++) {
try {
- tests[i] = (PerfStressTest>) testClass.getConstructor(options.getClass()).newInstance(options);
+ tests[i] = (PerfTestBase>) testClass.getConstructor(options.getClass()).newInstance(options);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException | NoSuchMethodException e) {
throw new RuntimeException(e);
@@ -138,7 +138,7 @@ public static void run(Class> testClass, PerfStressOptions options) {
boolean startedPlayback = false;
try {
- Flux.just(tests).flatMap(PerfStressTest::setupAsync).blockLast();
+ Flux.just(tests).flatMap(PerfTestBase::setupAsync).blockLast();
setupStatus.dispose();
if (options.getTestProxies() != null && !options.getTestProxies().isEmpty()) {
@@ -147,7 +147,7 @@ public static void run(Class> testClass, PerfStressOptions options) {
try {
ForkJoinPool forkJoinPool = new ForkJoinPool(tests.length);
forkJoinPool.submit(() -> {
- IntStream.range(0, tests.length).parallel().forEach(i -> tests[i].recordAndStartPlayback());
+ IntStream.range(0, tests.length).parallel().forEach(i -> tests[i].postSetupAsync().block());
}).get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
@@ -174,15 +174,21 @@ public static void run(Class> testClass, PerfStressOptions options) {
try {
if (startedPlayback) {
Disposable playbackStatus = printStatus("=== Stop Playback ===", () -> ".", false, false);
- Flux.just(tests).flatMap(PerfStressTest::stopPlaybackAsync).blockLast();
+ Flux.just(tests).flatMap(perfTestBase -> {
+ if (perfTestBase instanceof ApiPerfTestBase) {
+ return ((ApiPerfTestBase>) perfTestBase).stopPlaybackAsync();
+ } else {
+ return Mono.error(new IllegalStateException("Test Proxy not supported."));
+ }
+ }).blockLast();
playbackStatus.dispose();
- }
+ }
} finally {
if (!options.isNoCleanup()) {
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);
-
+
Flux.just(tests).flatMap(t -> t.cleanupAsync()).blockLast();
- }
+ }
}
}
} finally {
@@ -212,18 +218,16 @@ public static void run(Class> testClass, PerfStressOptions options) {
* @throws RuntimeException if the execution fails.
* @throws IllegalStateException if zero operations completed of the performance test.
*/
- public static void runTests(PerfStressTest>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
- completedOperations = new int[parallel];
- lastCompletionNanoTimes = new long[parallel];
+ public static void runTests(PerfTestBase>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000);
int[] lastCompleted = new int[]{0};
Disposable progressStatus = printStatus(
"=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal\t\tAverage", () -> {
- int totalCompleted = getCompletedOperations();
+ int totalCompleted = getCompletedOperations(tests);
int currentCompleted = totalCompleted - lastCompleted[0];
- double averageCompleted = getOperationsPerSecond();
+ double averageCompleted = getOperationsPerSecond(tests);
lastCompleted[0] = totalCompleted;
return String.format("%d\t\t%d\t\t%.2f", currentCompleted, totalCompleted, averageCompleted);
@@ -233,7 +237,7 @@ public static void runTests(PerfStressTest>[] tests, boolean sync, int paralle
if (sync) {
ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
forkJoinPool.submit(() -> {
- IntStream.range(0, parallel).parallel().forEach(i -> runLoop(tests[i], i, endNanoTime));
+ IntStream.range(0, parallel).parallel().forEach(i -> tests[i].runAll(endNanoTime));
}).get();
} else {
@@ -249,7 +253,7 @@ public static void runTests(PerfStressTest>[] tests, boolean sync, int paralle
Flux.range(0, parallel)
.parallel()
.runOn(Schedulers.boundedElastic())
- .flatMap(i -> runLoopAsync(tests[i], i, endNanoTime))
+ .flatMap(i -> tests[i].runAllAsync(endNanoTime))
.then()
.block();
}
@@ -266,11 +270,11 @@ public static void runTests(PerfStressTest>[] tests, boolean sync, int paralle
System.out.println("=== Results ===");
- int totalOperations = getCompletedOperations();
+ int totalOperations = getCompletedOperations(tests);
if (totalOperations == 0) {
throw new IllegalStateException("Zero operations has been completed");
}
- double operationsPerSecond = getOperationsPerSecond();
+ double operationsPerSecond = getOperationsPerSecond(tests);
double secondsPerOperation = 1 / operationsPerSecond;
double weightedAverageSeconds = totalOperations / operationsPerSecond;
@@ -279,29 +283,6 @@ public static void runTests(PerfStressTest>[] tests, boolean sync, int paralle
System.out.println();
}
- private static void runLoop(PerfStressTest> test, int index, long endNanoTime) {
- long startNanoTime = System.nanoTime();
- while (System.nanoTime() < endNanoTime) {
- test.run();
- completedOperations[index]++;
- lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime;
- }
- }
-
- private static Mono runLoopAsync(PerfStressTest> test, int index, long endNanoTime) {
- long startNanoTime = System.nanoTime();
-
- return Flux.just(1)
- .repeat()
- .flatMap(i -> test.runAsync().then(Mono.just(1)), 1)
- .doOnNext(v -> {
- completedOperations[index]++;
- lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime;
- })
- .takeWhile(i -> System.nanoTime() < endNanoTime)
- .then();
- }
-
private static Disposable printStatus(String header, Supplier
* @param the options configured for the test.
*/
-public abstract class PerfStressTest {
- private final reactor.netty.http.client.HttpClient recordPlaybackHttpClient;
- private final URI testProxy;
- private final TestProxyPolicy testProxyPolicy;
- private String recordingId;
-
- protected final TOptions options;
-
- // Derived classes should use the ConfigureClientBuilder() method by default. If a ClientBuilder does not
- // follow the standard convention, it can be configured manually using these fields.
- protected final HttpClient httpClient;
- protected final Iterable policies;
-
- private static final AtomicInteger GLOBAL_PARALLEL_INDEX = new AtomicInteger();
- protected final int parallelIndex;
-
+public abstract class PerfStressTest extends ApiPerfTestBase {
/**
* Creates an instance of performance test.
* @param options the options configured for the test.
* @throws IllegalStateException if SSL context cannot be created.
*/
public PerfStressTest(TOptions options) {
- this.options = options;
- this.parallelIndex = GLOBAL_PARALLEL_INDEX.getAndIncrement();
-
- final SslContext sslContext;
-
- if (options.isInsecure()) {
- try {
- sslContext = SslContextBuilder.forClient()
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
- } catch (SSLException e) {
- throw new IllegalStateException(e);
- }
-
- reactor.netty.http.client.HttpClient nettyHttpClient = reactor.netty.http.client.HttpClient.create()
- .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
-
- httpClient = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
- } else {
- sslContext = null;
- httpClient = null;
- }
-
- if (options.getTestProxies() != null && !options.getTestProxies().isEmpty()) {
- if (options.isInsecure()) {
- recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create()
- .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
- } else {
- recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create();
- }
-
- testProxy = options.getTestProxies().get(parallelIndex % options.getTestProxies().size());
- testProxyPolicy = new TestProxyPolicy(testProxy);
- policies = Arrays.asList(testProxyPolicy);
- } else {
- recordPlaybackHttpClient = null;
- testProxy = null;
- testProxyPolicy = null;
- policies = null;
- }
+ super(options);
}
- /**
- * Attempts to configure a ClientBuilder using reflection. If a ClientBuilder does not follow the standard convention,
- * it can be configured manually using the "httpClient" and "policies" fields.
- * @param clientBuilder The client builder.
- * @throws IllegalStateException If reflective access to get httpClient or addPolicy methods fail.
- */
- protected void configureClientBuilder(Object clientBuilder) {
- if (httpClient != null || policies != null) {
- Class> clientBuilderClass = clientBuilder.getClass();
-
- try {
- if (httpClient != null) {
- Method httpClientMethod = clientBuilderClass.getMethod("httpClient", HttpClient.class);
- httpClientMethod.invoke(clientBuilder, httpClient);
- }
-
- if (policies != null) {
- Method addPolicyMethod = clientBuilderClass.getMethod("addPolicy", HttpPipelinePolicy.class);
- for (HttpPipelinePolicy policy : policies) {
- addPolicyMethod.invoke(clientBuilder, policy);
- }
- }
- } catch (ReflectiveOperationException e) {
- throw new IllegalStateException(e);
- }
- }
+ @Override
+ int runTest() {
+ run();
+ return 1;
}
- /**
- * Runs the setup required prior to running the performance test.
- * @return An empty {@link Mono}
- */
- public Mono globalSetupAsync() {
- return Mono.empty();
- }
-
- /**
- * Runs the setup required prior to running an individual thread in the performance test.
- * @return An empty {@link Mono}
- */
- public Mono setupAsync() {
- return Mono.empty();
- }
-
- /**
- * Records responses and starts tests in playback mode.
- */
- public void recordAndStartPlayback() {
- // Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests.
- runSyncOrAsync();
-
- startRecordingAsync().block();
-
- testProxyPolicy.setRecordingId(recordingId);
- testProxyPolicy.setMode("record");
-
- runSyncOrAsync();
- stopRecordingAsync().block();
- startPlaybackAsync().block();
-
- testProxyPolicy.setRecordingId(recordingId);
- testProxyPolicy.setMode("playback");
- }
-
- private void runSyncOrAsync() {
- if (options.isSync()) {
- run();
- } else {
- runAsync().block();
- }
+ @Override
+ Mono runTestAsync() {
+ return runAsync().then(Mono.just(1));
}
/**
@@ -174,72 +47,4 @@ private void runSyncOrAsync() {
* @return An empty {@link Mono}
*/
public abstract Mono runAsync();
-
- /**
- * Stops playback tests.
- * @return An empty {@link Mono}.
- */
- public Mono stopPlaybackAsync() {
- return recordPlaybackHttpClient
- .headers(h -> {
- h.set("x-recording-id", recordingId);
- h.set("x-purge-inmemory-recording", Boolean.toString(true));
- })
- .post()
- .uri(testProxy.resolve("/playback/stop"))
- .response()
- .doOnSuccess(response -> {
- testProxyPolicy.setMode(null);
- testProxyPolicy.setRecordingId(null);
- })
- .then();
- }
-
- /**
- * Runs the cleanup logic after an individual thread finishes in the performance test.
- * @return An empty {@link Mono}
- */
- public Mono cleanupAsync() {
- return Mono.empty();
- }
-
- /**
- * Runs the cleanup logic after the performance test finishes.
- * @return An empty {@link Mono}
- */
- public Mono globalCleanupAsync() {
- return Mono.empty();
- }
-
- private Mono startRecordingAsync() {
- return recordPlaybackHttpClient
- .post()
- .uri(testProxy.resolve("/record/start"))
- .response()
- .doOnNext(response -> {
- recordingId = response.responseHeaders().get("x-recording-id");
- })
- .then();
- }
-
- private Mono stopRecordingAsync() {
- return recordPlaybackHttpClient
- .headers(h -> h.set("x-recording-id", recordingId))
- .post()
- .uri(testProxy.resolve("/record/stop"))
- .response()
- .then();
- }
-
- private Mono startPlaybackAsync() {
- return recordPlaybackHttpClient
- .headers(h -> h.set("x-recording-id", recordingId))
- .post()
- .uri(testProxy.resolve("/playback/start"))
- .response()
- .doOnNext(response -> {
- recordingId = response.responseHeaders().get("x-recording-id");
- })
- .then();
- }
}
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfTestBase.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfTestBase.java
new file mode 100644
index 0000000000000..0fc4804591006
--- /dev/null
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfTestBase.java
@@ -0,0 +1,94 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.perf.test.core;
+
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Performance Test Base class.
+ * @param the options used to configure the options class.
+ */
+public abstract class PerfTestBase {
+ protected final TOptions options;
+ protected long lastCompletionNanoTime;
+ protected final int parallelIndex;
+
+ private static final AtomicInteger GLOBAL_PARALLEL_INDEX = new AtomicInteger();
+
+ /**
+ * Creates an instance of Perf Test Base class.
+ * @param options options used to configure the perf test.
+ */
+ public PerfTestBase(TOptions options) {
+ this.options = options;
+ this.parallelIndex = GLOBAL_PARALLEL_INDEX.getAndIncrement();
+
+ }
+
+ /**
+ * Runs the setup required prior to running the performance test.
+ * @return An empty {@link Mono}
+ */
+ public Mono globalSetupAsync() {
+ return Mono.empty();
+ }
+
+ /**
+ * Runs the setup required prior to running an individual thread in the performance test.
+ * @return An empty {@link Mono}
+ */
+ public Mono setupAsync() {
+ return Mono.empty();
+ }
+
+ /**
+ * Runs the sync perf test until specified system nano time.
+ * @param endNanoTime the target time to run the performance test for.
+ */
+ public abstract void runAll(long endNanoTime);
+
+ /**
+ * Runs the async perf test until specified system nano time.
+ * @param endNanoTime the target time to run the performance test for.
+ * @return A {@link Mono} containing void.
+ */
+ public abstract Mono runAllAsync(long endNanoTime);
+
+ /**
+ * Runs before cleanup stage.
+ *
+ * @return A {@link Mono} containing void.
+ */
+ Mono preCleanupAsync() {
+ return Mono.empty();
+ }
+
+ /**
+ * Runs after performance test finishes.
+ * @return A {@link Mono} containing void.
+ */
+ public Mono cleanupAsync() {
+ return Mono.empty();
+ }
+
+ /**
+ * Runs before performance test is triggered.
+ * @return A {@link Mono} containing void.
+ */
+ public Mono globalCleanupAsync() {
+ return Mono.empty();
+ }
+
+ Mono postSetupAsync() {
+ return Mono.empty();
+ }
+
+ /**
+ * Get completed operations.
+ * @return the completed operations.
+ */
+ public abstract long getCompletedOperations();
+}