Skip to content

Commit

Permalink
#367: replaced homebrew async result handling with CompletableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
bbottema committed Jan 2, 2022
1 parent 30f3923 commit 214b14b
Show file tree
Hide file tree
Showing 17 changed files with 10,367 additions and 239 deletions.
Expand Up @@ -7,7 +7,6 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.simplejavamail.api.internal.batchsupport.LifecycleDelegatingTransport;
import org.simplejavamail.api.mailer.AsyncResponse;
import org.simplejavamail.api.mailer.config.OperationalConfig;
import org.simplejavamail.internal.batchsupport.concurrent.NonJvmBlockingThreadPoolExecutor;
import org.simplejavamail.internal.modules.BatchModule;
Expand All @@ -17,6 +16,7 @@
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand All @@ -41,9 +41,8 @@ public class BatchSupport implements BatchModule {
/**
* @see BatchModule#executeAsync(String, Runnable)
*/
@NotNull
@Override
public AsyncResponse executeAsync(@NotNull final String processName, @NotNull final Runnable operation) {
public CompletableFuture<Void> executeAsync(@NotNull final String processName, @NotNull final Runnable operation) {
return AsyncOperationHelper.executeAsync(processName, operation);
}

Expand All @@ -52,7 +51,7 @@ public AsyncResponse executeAsync(@NotNull final String processName, @NotNull fi
*/
@NotNull
@Override
public AsyncResponse executeAsync(@NotNull final ExecutorService executorService, @NotNull final String processName, @NotNull final Runnable operation) {
public CompletableFuture<Void> executeAsync(@NotNull final ExecutorService executorService, @NotNull final String processName, @NotNull final Runnable operation) {
return AsyncOperationHelper.executeAsync(executorService, processName, operation);
}

Expand Down

This file was deleted.

Expand Up @@ -13,6 +13,7 @@
import org.simplejavamail.api.mailer.config.ServerConfig;
import org.simplejavamail.api.mailer.config.TransportStrategy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
Expand Down Expand Up @@ -48,16 +49,16 @@ public interface Mailer {
* <p>
* Note: synchronizes on the thread for sending mails so that we don't get into race condition conflicts with emails actually being sent.
*
* @return An AsyncResponse in case of async == true, otherwise <code>null</code>.
* @return A {@link CompletableFuture} that is completed immediately if not <em>async</em>.
*/
AsyncResponse testConnection(boolean async);
@NotNull CompletableFuture<Void> testConnection(boolean async);

/**
* Delegates to {@link #sendMail(Email, boolean)}, with <code>async = false</code>. This method returns only when the email has been processed by
* the target SMTP server.
* @return AsyncResponse if the email was configured to be sent asynchronously.
* @return A {@link CompletableFuture} that is completed immediately if not <em>async</em>.
*/
@Nullable AsyncResponse sendMail(Email email);
@NotNull CompletableFuture<Void> sendMail(Email email);

/**
* Processes an {@link Email} instance into a completely configured {@link Message}.
Expand All @@ -78,14 +79,12 @@ public interface Mailer {
* @param email The information for the email to be sent.
* @param async If false, this method blocks until the mail has been processed completely by the SMTP server. If true, a new thread is started to
* send the email and this method returns immediately.
* @return A {@link AsyncResponse} or null if not <em>async</em>.
* @return A {@link CompletableFuture} that is completed immediately if not <em>async</em>.
* @throws MailException Can be thrown if an email isn't validating correctly, or some other problem occurs during connection, sending etc.
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
* @see #validate(Email)
*/
@Nullable
// FIXME replace with Optional when Java 8?
AsyncResponse sendMail(Email email, @SuppressWarnings("SameParameterValue") boolean async);
@NotNull CompletableFuture<Void> sendMail(Email email, @SuppressWarnings("SameParameterValue") boolean async);

/**
* Validates an {@link Email} instance. Validation fails if the subject is missing, content is missing, or no recipients are defined or that
Expand Down
Expand Up @@ -4,10 +4,10 @@
import jakarta.mail.Transport;
import org.jetbrains.annotations.NotNull;
import org.simplejavamail.api.internal.batchsupport.LifecycleDelegatingTransport;
import org.simplejavamail.api.mailer.AsyncResponse;
import org.simplejavamail.api.mailer.config.OperationalConfig;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

Expand All @@ -23,14 +23,13 @@ public interface BatchModule {
*
* @see java.util.concurrent.Executors#newSingleThreadExecutor()
*/
@NotNull
AsyncResponse executeAsync(@NotNull String processName, @NotNull Runnable operation);
CompletableFuture<Void> executeAsync(@NotNull String processName, @NotNull Runnable operation);

/**
* Executes using the given ExecutorService, which is left running after the thread finishes running.
*/
@NotNull
AsyncResponse executeAsync(@NotNull ExecutorService executorService, @NotNull String processName, @NotNull Runnable operation);
CompletableFuture<Void> executeAsync(@NotNull ExecutorService executorService, @NotNull String processName, @NotNull Runnable operation);

/**
* @return A NonJvmBlockingThreadPoolExecutor instance that by default doesn't block the JVM from exiting
Expand Down
@@ -1,76 +1,41 @@
package org.simplejavamail.internal.util.concurrent;

import org.jetbrains.annotations.NotNull;
import org.simplejavamail.api.mailer.AsyncResponse;
import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.simplejavamail.internal.util.Preconditions.assumeTrue;
import static org.slf4j.LoggerFactory.getLogger;

/**
* Util that facilitates running a concurrent operation while supporting {@link AsyncResponse}.
* Util that facilitates running a concurrent operation with CompletableFuture support.
*/
@SuppressWarnings("SameParameterValue")
public class AsyncOperationHelper {

private static final Logger LOGGER = getLogger(AsyncOperationHelper.class);

// TODO Lombok
private AsyncOperationHelper() {
}

/**
* Executes using a single-execution ExecutorService, which shutdown immediately after the thread finishes.
* Executes using a single-execution ExecutorService, which is shutdown immediately after the operation finishes.
*
* @see Executors#newSingleThreadExecutor()
*/
public static AsyncResponse executeAsync(final @NotNull String processName,
final @NotNull Runnable operation) {
return executeAsync(newSingleThreadExecutor(), processName, operation, true);
public static CompletableFuture<Void> executeAsync(final @NotNull String processName, final @NotNull Runnable operation) {
final ExecutorService executorService = newSingleThreadExecutor();
return runAsync(new NamedRunnable(processName, operation), executorService)
.thenRun(executorService::shutdown);
}

/**
* Executes using the given ExecutorService, which is left running after the thread finishes running.
*
* @see Executors#newSingleThreadExecutor()
*/
public static AsyncResponse executeAsync(final @NotNull ExecutorService executorService,
final @NotNull String processName,
final @NotNull Runnable operation) {
return executeAsync(executorService, processName, operation, false);
}

private static AsyncResponse executeAsync(final @NotNull ExecutorService executorService,
final @NotNull String processName,
final @NotNull Runnable operation,
final boolean shutDownExecutorService) {
// atomic reference is needed to be able to smuggle the asyncResponse
// into the Runnable which is passed itself to the asyncResponse.
final AtomicReference<AsyncResponseImpl> asyncResponseRef = new AtomicReference<>();
public static CompletableFuture<Void> executeAsync(final @NotNull ExecutorService executorService, final @NotNull String processName, final @NotNull Runnable operation) {
assumeTrue(!executorService.isShutdown(), "cannot send async email, executor service is already shut down!");
asyncResponseRef.set(new AsyncResponseImpl(executorService.submit(new NamedRunnable(processName) {
@Override
public void run() {
// by the time the code reaches here, the user would have configured the appropriate handlers
try {
operation.run();
asyncResponseRef.get().delegateSuccessHandling();
} catch (Exception e) {
LOGGER.error("Failed to run " + processName, e);
asyncResponseRef.get().delegateExceptionHandling(e);
throw e; // trigger the returned Future's exception handle
} finally {
if (shutDownExecutorService) {
executorService.shutdown();
}
}
}

})));
return asyncResponseRef.get();
return runAsync(new NamedRunnable(processName, operation), executorService);
}
}
}

This file was deleted.

@@ -1,15 +1,40 @@
package org.simplejavamail.internal.util.concurrent;

public abstract class NamedRunnable implements Runnable {
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;

private final String name;
import static org.slf4j.LoggerFactory.getLogger;

protected NamedRunnable(final String name) {
this.name = name;

/**
* This Runnable is smart in the sense that it can shutdown the
*/
// TODO LOMBOK
public class NamedRunnable implements Runnable {

private static final Logger LOGGER = getLogger(NamedRunnable.class);

@NotNull private final String processName;
@NotNull private final Runnable operation;

protected NamedRunnable(@NotNull String processName, @NotNull Runnable operation) {
this.processName = processName;
this.operation = operation;
}

@Override
public void run() {
// by the time the code reaches here, the user would have configured the appropriate handlers
try {
operation.run();
} catch (Exception e) {
LOGGER.error("Failed to run " + processName, e);
throw e;
}
}

@Override
public String toString() {
return name;
return processName;
}
}
2 changes: 1 addition & 1 deletion modules/core-test-module/pom.xml
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>2.9.1</version>
<version>3.21.0</version>
</dependency>
</dependencies>
</project>

0 comments on commit 214b14b

Please sign in to comment.