Skip to content

Commit

Permalink
Fix java compile errors (#410)
Browse files Browse the repository at this point in the history
* set correct maven artifactId/groupId

the previous artifactId simplesend was a copy paste error,
leading to failed compilation with maven as two modules with the
same name is not allowed.

* adjust code to azure-eventhubs >1.0.0

the code doesn't compile with these adjustments as the API has changed since 0.15.1
  • Loading branch information
birdayz authored and sjkwak committed Jul 16, 2018
1 parent 0e5fa4a commit e14317f
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 33 deletions.
4 changes: 2 additions & 2 deletions samples/Java/Basic/AdvancedSendOptions/pom.xml
@@ -1,8 +1,8 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>simplesend</groupId>
<groupId>advancedsendoptions</groupId>
<version>1.0.0</version>
<artifactId>simplesend</artifactId>
<artifactId>advancedsendoptions</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Expand Up @@ -6,7 +6,6 @@

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -15,6 +14,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/*
Expand All @@ -24,7 +25,7 @@
public class AutoScaleOnIngress {

public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
throws EventHubException, ExecutionException, InterruptedException, IOException {

// *********************************************************************
// List of variables involved - to achieve desired LOAD / THROUGHPUT UNITS
Expand All @@ -43,12 +44,14 @@ public static void main(String[] args)

final int NO_OF_CONNECTIONS = tus;

final ExecutorService executorService = Executors.newSingleThreadExecutor();

System.out.println();
System.out.print("EventHub Connection String: ");
final String connectionString = System.console().readLine();
System.out.println();

final EventHubClientPool ehClientPool = new EventHubClientPool(NO_OF_CONNECTIONS, connectionString);
final EventHubClientPool ehClientPool = new EventHubClientPool(NO_OF_CONNECTIONS, connectionString, executorService);

ehClientPool.initialize().get();
System.out.println("started sending...");
Expand All @@ -62,26 +65,26 @@ public static void main(String[] args)
for (int batchSize = 0; batchSize < BATCH_SIZE; batchSize++) {
final byte[] payload = new byte[EVENT_SIZE];
Arrays.fill(payload, (byte) 32);
final EventData eventData = new EventData(payload);
final EventData eventData = EventData.create(payload);
eventDataList.add(eventData);
}

for (int concurrentSends = 0; concurrentSends < NO_OF_CONCURRENT_SENDS; concurrentSends++) {
if (sendTasks[concurrentSends] == null || sendTasks[concurrentSends].isDone()) {
sendTasks[concurrentSends] = ehClientPool.send(eventDataList)
.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
System.out.println(String.format("result: %s, latency: %s, batchSize, %s", throwable == null ? "success" : "failure",
Duration.between(beforeSend, Instant.now()).toMillis(), BATCH_SIZE));

if (throwable != null && throwable.getCause() != null) {
System.out.println(String.format("%s :send failed with error: %s",
Instant.now().toString(),
throwable.getCause().getMessage()));
}
.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
System.out.println(String.format("result: %s, latency: %s, batchSize, %s", throwable == null ? "success" : "failure",
Duration.between(beforeSend, Instant.now()).toMillis(), BATCH_SIZE));

if (throwable != null && throwable.getCause() != null) {
System.out.println(String.format("%s :send failed with error: %s",
Instant.now().toString(),
throwable.getCause().getMessage()));
}
});
}
});
}
}

Expand Down
Expand Up @@ -7,9 +7,9 @@
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;


Expand All @@ -19,20 +19,22 @@ public final class EventHubClientPool {
private final String connectionString;
private final Object previouslySentLock = new Object();
private final EventHubClient[] clients;
private final ExecutorService executorService;

private int previouslySent = 0;

EventHubClientPool(final int poolSize, final String connectionString) {
EventHubClientPool(final int poolSize, final String connectionString, ExecutorService executorService) {
this.poolSize = poolSize;
this.connectionString = connectionString;
this.clients = new EventHubClient[this.poolSize];
this.executorService = executorService;
}

public CompletableFuture<Void> initialize() throws IOException, EventHubException {
final CompletableFuture[] createSenders = new CompletableFuture[this.poolSize];
for (int count = 0; count < poolSize; count++) {
final int clientsIndex = count;
createSenders[count] = EventHubClient.createFromConnectionString(this.connectionString).thenAccept(new Consumer<EventHubClient>() {
createSenders[count] = EventHubClient.create(this.connectionString, executorService).thenAccept(new Consumer<EventHubClient>() {
@Override
public void accept(EventHubClient eventHubClient) {
clients[clientsIndex] = eventHubClient;
Expand Down
Expand Up @@ -7,9 +7,9 @@
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;


Expand All @@ -19,20 +19,22 @@ public final class EventHubClientPool {
private final String connectionString;
private final Object previouslySentLock = new Object();
private final EventHubClient[] clients;
private final ExecutorService executorService;

private int previouslySent = 0;

EventHubClientPool(final int poolSize, final String connectionString) {
EventHubClientPool(final int poolSize, final String connectionString, ExecutorService executorService) {
this.poolSize = poolSize;
this.connectionString = connectionString;
this.clients = new EventHubClient[this.poolSize];
this.executorService = executorService;
}

public CompletableFuture<Void> initialize() throws IOException, EventHubException {
final CompletableFuture[] createSenders = new CompletableFuture[this.poolSize];
for (int count = 0; count < poolSize; count++) {
final int clientsIndex = count;
createSenders[count] = EventHubClient.createFromConnectionString(this.connectionString).thenAccept(new Consumer<EventHubClient>() {
createSenders[count] = EventHubClient.create(this.connectionString, executorService).thenAccept(new Consumer<EventHubClient>() {
@Override
public void accept(EventHubClient eventHubClient) {
clients[clientsIndex] = eventHubClient;
Expand Down
Expand Up @@ -7,7 +7,6 @@
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -16,6 +15,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/*
Expand All @@ -42,14 +43,14 @@
public class IngressBenchmark {

public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
throws EventHubException, ExecutionException, InterruptedException, IOException {

final String namespaceName = "----ServiceBusNamespaceName-----";
final String eventHubName = "----EventHubName-----";
final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
final String sasKey = "---SharedAccessSignatureKey----";
final ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);

final ConnectionStringBuilder connStr = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName(eventHubName)
.setSasKeyName(sasKeyName).setSasKey(sasKey);

// ***************************************************************************************************************
// List of variables involved
Expand All @@ -64,19 +65,20 @@ public static void main(String[] args)

final int NO_OF_CONNECTIONS = 10;

final ExecutorService executorService = Executors.newSingleThreadExecutor();

// each EventHubClient reserves its own **PHYSICAL SOCKET**
final EventHubClientPool ehClientPool = new EventHubClientPool(NO_OF_CONNECTIONS, connStr.toString());
final EventHubClientPool ehClientPool = new EventHubClientPool(NO_OF_CONNECTIONS, connStr.toString(), executorService);
ehClientPool.initialize().get();


final CompletableFuture<Void>[] sendTasks = new CompletableFuture[NO_OF_CONCURRENT_SENDS];
for (int perfSample = 0; perfSample < 50000 - NO_OF_CONCURRENT_SENDS + 1; perfSample++) {
final List<EventData> eventDataList = new LinkedList<>();

for (int batchSize = 0; batchSize < BATCH_SIZE; batchSize++) {
final byte[] payload = new byte[EVENT_SIZE];
Arrays.fill(payload, (byte) 32);
final EventData eventData = new EventData(payload);
final EventData eventData = EventData.create(payload);
eventDataList.add(eventData);
}

Expand All @@ -88,12 +90,14 @@ public static void main(String[] args)
sendTasks[concurrentSends] = ehClientPool.send(eventDataList).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
System.out.println(String.format("%s,%s", throwable == null ? "success" : "failure", Duration.between(beforeSend, Instant.now()).toMillis()));
System.out.println(
String.format("%s,%s", throwable == null ? "success" : "failure", Duration.between(beforeSend, Instant.now()).toMillis()));
}
});

if (!isInitializing)
if (!isInitializing) {
break;
}
}
}

Expand Down

0 comments on commit e14317f

Please sign in to comment.