diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java new file mode 100644 index 0000000000..29adfb9837 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.benchmarks.actors.tcp.async; + +import org.apache.iggy.bench.models.cli.GlobalCliArgs; +import org.apache.iggy.bench.models.common.generator.DataBatch; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Partitioning; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +public final class TcpAsyncPinnedProducerActor { + + private static final long PARTITION_ID = 0L; + + private final GlobalCliArgs globalCliArgs; + private final int actorId; + private final StreamId streamId; + private final TopicId topicId; + private final Partitioning partitioning; + private final DataBatch fullBatch; + private final long targetMessageBatches; + private final long targetDataBytes; + private AsyncIggyTcpClient client; + + public TcpAsyncPinnedProducerActor( + GlobalCliArgs globalCliArgs, + int actorId, + String streamName, + String topicName, + DataBatch fullBatch, + long targetMessageBatches, + long targetDataBytes) { + this.globalCliArgs = globalCliArgs; + this.actorId = actorId; + this.streamId = StreamId.of(streamName); + this.topicId = TopicId.of(topicName); + this.partitioning = Partitioning.partitionId(PARTITION_ID); + this.fullBatch = fullBatch; + this.targetMessageBatches = targetMessageBatches; + this.targetDataBytes = targetDataBytes; + } + + public CompletableFuture run() { + try { + return AsyncIggyTcpClient.builder() + .credentials(globalCliArgs.username(), globalCliArgs.password()) + .buildAndLogin() + .thenCompose(client -> { + this.client = client; + long warmupDeadline = globalCliArgs.warmupTimeMs() <= 0L + ? 0L + : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(globalCliArgs.warmupTimeMs()); + + return sendMessages(warmupDeadline, 0L, 0L) + .handle((ignored, sendFailure) -> { + CompletableFuture closeFuture = client.close(); + + if (sendFailure != null) { + CompletableFuture failedAfterClose = + closeFuture.handle((closeIgnored, closeFailure) -> { + throw new CompletionException(sendFailure); + }); + return failedAfterClose; + } + + return closeFuture; + }) + .thenCompose(future -> future); + }); + } catch (RuntimeException exception) { + return CompletableFuture.failedFuture(exception); + } + } + + private CompletableFuture sendMessages(long warmupDeadline, long sentBatches, long sentBytes) { + if (warmupDeadline > 0L) { + if (System.nanoTime() >= warmupDeadline) { + return sendMessages(0L, 0L, 0L); + } + + return sendBatch(fullBatch, false) + .thenCompose(ignored -> sendMessages(warmupDeadline, sentBatches, sentBytes)); + } + + if (targetDataBytes > 0L ? sentBytes >= targetDataBytes : sentBatches >= targetMessageBatches) { + return CompletableFuture.completedFuture(null); + } + + return sendBatch(fullBatch, true).thenCompose(ignored -> { + long updatedSentBatches = sentBatches + 1L; + long updatedSentBytes = sentBytes + fullBatch.userDataBytes(); + return sendMessages(0L, updatedSentBatches, updatedSentBytes); + }); + } + + private CompletableFuture sendBatch(DataBatch currentBatch, boolean recordMetrics) { + return client.messages().sendMessages(streamId, topicId, partitioning, currentBatch.messages()); + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java new file mode 100644 index 0000000000..303f420742 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.benchmarks.runners.tcp.async; + +import org.apache.iggy.bench.benchmarks.actors.tcp.async.TcpAsyncPinnedProducerActor; +import org.apache.iggy.bench.common.generator.BenchmarkBatchGenerator; +import org.apache.iggy.bench.common.provision.ResourceProvisioner; +import org.apache.iggy.bench.models.cli.GlobalCliArgs; +import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs; +import org.apache.iggy.bench.models.common.generator.DataBatch; +import org.apache.iggy.bench.models.common.provision.ProvisionedResources; + +import java.util.concurrent.CompletableFuture; + +public final class TcpAsyncPinnedProducer { + + private final GlobalCliArgs globalCliArgs; + private final PinnedProducerCliArgs pinnedProducerCliArgs; + private final ResourceProvisioner resourceProvisioner; + private ProvisionedResources provisionedResources; + + TcpAsyncPinnedProducer( + GlobalCliArgs globalCliArgs, + PinnedProducerCliArgs pinnedProducerCliArgs, + ResourceProvisioner resourceProvisioner) { + this.globalCliArgs = globalCliArgs; + this.pinnedProducerCliArgs = pinnedProducerCliArgs; + this.resourceProvisioner = resourceProvisioner; + + provisionResources(); + runBenchmark().join(); + } + + public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) { + this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner()); + } + + private void provisionResources() { + this.provisionedResources = resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs); + } + + private CompletableFuture runBenchmark() { + try { + String topicName = provisionedResources.topicNames().get(0); + var batchGenerator = + new BenchmarkBatchGenerator(globalCliArgs.messageSize(), globalCliArgs.messagesPerBatch()); + DataBatch fullBatch = batchGenerator.generateBatch(); + long targetMessageBatches = globalCliArgs.totalData() > 0L ? 0L : globalCliArgs.messageBatches(); + long targetDataBytes = + globalCliArgs.totalData() > 0L ? globalCliArgs.totalData() / pinnedProducerCliArgs.producers() : 0L; + var actorRuns = new CompletableFuture[pinnedProducerCliArgs.producers()]; + + for (int index = 0; index < pinnedProducerCliArgs.producers(); index++) { + String streamName = provisionedResources.streamNames().get(index); + var actor = new TcpAsyncPinnedProducerActor( + globalCliArgs, + index + 1, + streamName, + topicName, + fullBatch, + targetMessageBatches, + targetDataBytes); + actorRuns[index] = actor.run(); + } + + return CompletableFuture.allOf(actorRuns); + } catch (RuntimeException exception) { + return CompletableFuture.failedFuture(exception); + } + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java deleted file mode 100644 index a366744383..0000000000 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iggy.bench.benchmarks.tcp.async; - -import org.apache.iggy.bench.models.cli.GlobalCliArgs; -import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs; -import org.apache.iggy.bench.models.provision.ProvisionedResources; -import org.apache.iggy.bench.provision.ResourceProvisioner; - -public final class TcpAsyncPinnedProducer { - - private final GlobalCliArgs globalCliArgs; - private final PinnedProducerCliArgs pinnedProducerCliArgs; - private final ResourceProvisioner resourceProvisioner; - - TcpAsyncPinnedProducer( - GlobalCliArgs globalCliArgs, - PinnedProducerCliArgs pinnedProducerCliArgs, - ResourceProvisioner resourceProvisioner) { - this.globalCliArgs = globalCliArgs; - this.pinnedProducerCliArgs = pinnedProducerCliArgs; - this.resourceProvisioner = resourceProvisioner; - } - - public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) { - this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner()); - } - - public ProvisionedResources provisionResources() { - return resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs); - } - - public void run() {} -} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java index 9d58a6fcf4..9485e209c5 100644 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java @@ -19,7 +19,7 @@ package org.apache.iggy.bench.cli; -import org.apache.iggy.bench.benchmarks.tcp.async.TcpAsyncPinnedProducer; +import org.apache.iggy.bench.benchmarks.runners.tcp.async.TcpAsyncPinnedProducer; import org.apache.iggy.bench.models.cli.GlobalCliArgs; import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs; import org.slf4j.Logger; @@ -97,9 +97,7 @@ public Integer call() { pinnedProducerCliArgs.validate(); log.info("Starting the Pinned Producer benchmark..."); - var benchmark = new TcpAsyncPinnedProducer(globalCliArgs, pinnedProducerCliArgs); - benchmark.provisionResources(); - benchmark.run(); + new TcpAsyncPinnedProducer(globalCliArgs, pinnedProducerCliArgs); return ExitCode.OK; } catch (RuntimeException exception) { diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java new file mode 100644 index 0000000000..53bd54ea0e --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.enums; + +public enum ActorKind { + PRODUCER("producer"), + CONSUMER("consumer"), + PRODUCING_CONSUMER("producing_consumer"); + + private final String value; + + ActorKind(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java new file mode 100644 index 0000000000..d186d48206 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.enums; + +public enum BenchmarkKind { + PINNED_PRODUCER("pinned_producer"), + PINNED_CONSUMER("pinned_consumer"), + PINNED_PRODUCER_AND_CONSUMER("pinned_producer_and_consumer"), + BALANCED_PRODUCER("balanced_producer"), + BALANCED_CONSUMER_GROUP("balanced_consumer_group"), + BALANCED_PRODUCER_AND_CONSUMER_GROUP("balanced_producer_and_consumer_group"), + END_TO_END_PRODUCING_CONSUMER("end_to_end_producing_consumer"), + END_TO_END_PRODUCING_CONSUMER_GROUP("end_to_end_producing_consumer_group"); + + private final String value; + + BenchmarkKind(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java new file mode 100644 index 0000000000..fa22d198ce --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.enums; + +public enum GroupKind { + PRODUCERS("producers"), + CONSUMERS("consumers"), + PRODUCERS_AND_CONSUMERS("producers_and_consumers"), + PRODUCING_CONSUMERS("producing_consumers"); + + private final String value; + + GroupKind(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java new file mode 100644 index 0000000000..f71c500f1e --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.enums; + +public enum TransportKind { + TCP("tcp"), + HTTP("http"); + + private final String value; + + TransportKind(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java similarity index 95% rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java index 2a9b2def40..26b3c83424 100644 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iggy.bench.exception; +package org.apache.iggy.bench.common.exception; public class BenchmarkException extends RuntimeException { diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java new file mode 100644 index 0000000000..91ed032bb0 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.generator; + +import org.apache.iggy.bench.models.common.generator.DataBatch; +import org.apache.iggy.message.Message; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; + +public final class BenchmarkBatchGenerator { + + private static final byte[] ALPHANUMERIC = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".getBytes(StandardCharsets.US_ASCII); + + private final int messagesPerBatch; + private final String payloadTemplate; + + public BenchmarkBatchGenerator(int messageSize, int messagesPerBatch) { + this.messagesPerBatch = messagesPerBatch; + this.payloadTemplate = randomPayload(messageSize); + } + + public DataBatch generateBatch() { + return buildBatch(messagesPerBatch); + } + + public DataBatch generateBatch(int messagesInBatch) { + return buildBatch(messagesInBatch); + } + + private DataBatch buildBatch(int messagesInBatch) { + var messages = new ArrayList(messagesInBatch); + long userDataBytes = 0L; + long totalBytes = 0L; + + for (int messageIndex = 0; messageIndex < messagesInBatch; messageIndex++) { + var message = Message.of(payloadTemplate); + messages.add(message); + userDataBytes += message.payload().length; + totalBytes += message.getSize(); + } + + return new DataBatch(messages, userDataBytes, totalBytes); + } + + private static String randomPayload(int messageSize) { + var payload = new byte[messageSize]; + var random = ThreadLocalRandom.current(); + + for (int index = 0; index < messageSize; index++) { + payload[index] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)]; + } + + return new String(payload, StandardCharsets.US_ASCII); + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java new file mode 100644 index 0000000000..a3c7605d27 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.common.provision; + +import org.apache.iggy.bench.common.exception.BenchmarkException; +import org.apache.iggy.bench.models.cli.GlobalCliArgs; +import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs; +import org.apache.iggy.bench.models.common.provision.ProvisionedResources; +import org.apache.iggy.client.blocking.tcp.IggyTcpClient; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.topic.CompressionAlgorithm; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public final class ResourceProvisioner { + + private static final Logger log = LoggerFactory.getLogger(ResourceProvisioner.class); + + public ProvisionedResources provisionResources( + GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) { + var streamNames = new ArrayList(); + var topicNames = List.of("topic-1"); + + try { + try (var client = IggyTcpClient.builder() + .credentials(globalCliArgs.username(), globalCliArgs.password()) + .buildAndLogin()) { + var existingStreamNames = client.streams().getStreams().stream() + .map(stream -> stream.name()) + .toList(); + + for (var i = 1; i <= pinnedProducerCliArgs.producers(); i++) { + var streamName = "bench-stream-" + i; + streamNames.add(streamName); + + if (existingStreamNames.contains(streamName) && !globalCliArgs.reuseStreams()) { + log.info("Deleting pre-existing stream '{}'", streamName); + client.streams().deleteStream(StreamId.of(streamName)); + } + + if (existingStreamNames.contains(streamName) && globalCliArgs.reuseStreams()) { + log.info("Appending to existing stream '{}'", streamName); + } else { + log.info("Creating the test stream '{}'", streamName); + + client.streams().createStream(streamName); + + var maxTopicSize = pinnedProducerCliArgs.maxTopicSize() == 0L + ? "server default" + : Long.toString(pinnedProducerCliArgs.maxTopicSize()); + var messageExpiry = pinnedProducerCliArgs.messageExpiry() == 0L + ? "never" + : Long.toString(pinnedProducerCliArgs.messageExpiry()); + + log.info( + "Creating the test topic '{}' for stream '{}' with max topic size: {}, message expiry: {}", + topicNames.get(0), + streamName, + maxTopicSize, + messageExpiry); + + client.topics() + .createTopic( + StreamId.of(streamName), + 1L, + CompressionAlgorithm.None, + BigInteger.valueOf(pinnedProducerCliArgs.messageExpiry()), + BigInteger.valueOf(pinnedProducerCliArgs.maxTopicSize()), + Optional.empty(), + topicNames.get(0)); + } + } + } + } catch (RuntimeException exception) { + throw new BenchmarkException("Failed to provision benchmark resources.", exception); + } + + return new ProvisionedResources(streamNames, topicNames); + } +} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java index 51e92b41aa..a603e14255 100644 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java @@ -19,7 +19,7 @@ package org.apache.iggy.bench.models.cli; -import org.apache.iggy.bench.exception.BenchmarkException; +import org.apache.iggy.bench.common.exception.BenchmarkException; public record GlobalCliArgs( int messageSize, diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java index dd355a929d..56ee439374 100644 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java @@ -19,7 +19,7 @@ package org.apache.iggy.bench.models.cli; -import org.apache.iggy.bench.exception.BenchmarkException; +import org.apache.iggy.bench.common.exception.BenchmarkException; public record PinnedProducerCliArgs(int streams, int producers, long maxTopicSize, long messageExpiry) { diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java new file mode 100644 index 0000000000..d491c81523 --- /dev/null +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.bench.models.common.generator; + +import org.apache.iggy.message.Message; + +import java.util.List; + +public record DataBatch(List messages, long userDataBytes, long totalBytes) {} diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java similarity index 94% rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java index e5411e24e5..d560c1c4ab 100644 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java +++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iggy.bench.models.provision; +package org.apache.iggy.bench.models.common.provision; import java.util.List; diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java deleted file mode 100644 index dc5c716254..0000000000 --- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iggy.bench.provision; - -import org.apache.iggy.bench.exception.BenchmarkException; -import org.apache.iggy.bench.models.cli.GlobalCliArgs; -import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs; -import org.apache.iggy.bench.models.provision.ProvisionedResources; -import org.apache.iggy.client.blocking.tcp.IggyTcpClient; -import org.apache.iggy.identifier.StreamId; -import org.apache.iggy.topic.CompressionAlgorithm; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public final class ResourceProvisioner { - - private static final Logger log = LoggerFactory.getLogger(ResourceProvisioner.class); - - public ProvisionedResources provisionResources( - GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) { - var streamNames = new ArrayList(); - var topicNames = List.of("topic-1"); - - try (var client = IggyTcpClient.builder() - .credentials(globalCliArgs.username(), globalCliArgs.password()) - .buildAndLogin()) { - var existingStreamNames = client.streams().getStreams().stream() - .map(stream -> stream.name()) - .toList(); - - for (var i = 1; i <= pinnedProducerCliArgs.producers(); i++) { - var streamName = "bench-stream-" + i; - streamNames.add(streamName); - - if (existingStreamNames.contains(streamName) && !globalCliArgs.reuseStreams()) { - log.info("Deleting pre-existing stream '{}'", streamName); - client.streams().deleteStream(StreamId.of(streamName)); - } - - if (existingStreamNames.contains(streamName) && globalCliArgs.reuseStreams()) { - log.info("Appending to existing stream '{}'", streamName); - } else { - log.info("Creating the test stream '{}'", streamName); - - client.streams().createStream(streamName); - - var maxTopicSize = pinnedProducerCliArgs.maxTopicSize() == 0L - ? "server default" - : Long.toString(pinnedProducerCliArgs.maxTopicSize()); - var messageExpiry = pinnedProducerCliArgs.messageExpiry() == 0L - ? "never" - : Long.toString(pinnedProducerCliArgs.messageExpiry()); - - log.info( - "Creating the test topic '{}' for stream '{}' with max topic size: {}, message expiry: {}", - topicNames.get(0), - streamName, - maxTopicSize, - messageExpiry); - - client.topics() - .createTopic( - StreamId.of(streamName), - 1L, - CompressionAlgorithm.None, - BigInteger.valueOf(pinnedProducerCliArgs.messageExpiry()), - BigInteger.valueOf(pinnedProducerCliArgs.maxTopicSize()), - Optional.empty(), - topicNames.get(0)); - } - } - } catch (RuntimeException exception) { - throw new BenchmarkException("Failed to provision benchmark resources.", exception); - } - - return new ProvisionedResources(streamNames, topicNames); - } -}