diff --git a/core/src/main/java/com/sun/mail/util/PropUtil.java b/core/src/main/java/com/sun/mail/util/PropUtil.java index 7190e078..e4d29784 100644 --- a/core/src/main/java/com/sun/mail/util/PropUtil.java +++ b/core/src/main/java/com/sun/mail/util/PropUtil.java @@ -16,8 +16,9 @@ package com.sun.mail.util; -import java.util.*; import jakarta.mail.Session; +import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; /** * Utilities to make it easier to get property values. @@ -56,6 +57,20 @@ public static boolean getBooleanProperty(Properties props, return getBoolean(getProp(props, name), def); } + /** + * Get a ScheduledExecutorService valued property. + * + * @param props the properties + * @param name the property name + * @return the property value, null if the property is null + * @throws ClassCastException if the property value's class is + * not {@link java.util.concurrent.ScheduledThreadPoolExecutor } + */ + public static ScheduledExecutorService getScheduledExecutorServiceProperty(Properties props, + String name) { + return getScheduledExecutorService(getProp(props, name)); + } + /** * Get an integer valued property. * @@ -149,6 +164,10 @@ private static int getInt(Object value, int def) { return def; } + private static ScheduledExecutorService getScheduledExecutorService(Object value) { + return (ScheduledExecutorService) value; + } + /** * Interpret the value object as a boolean, * returning def if unable. diff --git a/core/src/main/java/com/sun/mail/util/SocketFetcher.java b/core/src/main/java/com/sun/mail/util/SocketFetcher.java index cce5a88c..0a4cbf3f 100644 --- a/core/src/main/java/com/sun/mail/util/SocketFetcher.java +++ b/core/src/main/java/com/sun/mail/util/SocketFetcher.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Properties; import java.util.StringTokenizer; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -341,7 +342,11 @@ private static Socket createSocket(InetAddress localaddr, int localport, if (writeTimeout != -1) { // wrap original if (logger.isLoggable(Level.FINEST)) logger.finest("set socket write timeout " + writeTimeout); - socket = new WriteTimeoutSocket(socket, writeTimeout); + ScheduledExecutorService executorService = PropUtil.getScheduledExecutorServiceProperty(props, + prefix + ".executor.writetimeout"); + socket = executorService == null ? + new WriteTimeoutSocket(socket, writeTimeout) : + new WriteTimeoutSocket(socket, writeTimeout, executorService); } if (localaddr != null) socket.bind(new InetSocketAddress(localaddr, localport)); diff --git a/core/src/main/java/com/sun/mail/util/WriteTimeoutSocket.java b/core/src/main/java/com/sun/mail/util/WriteTimeoutSocket.java index aa59b830..2e634d16 100644 --- a/core/src/main/java/com/sun/mail/util/WriteTimeoutSocket.java +++ b/core/src/main/java/com/sun/mail/util/WriteTimeoutSocket.java @@ -31,11 +31,14 @@ import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A special Socket that uses a ScheduledExecutorService to @@ -50,16 +53,27 @@ public class WriteTimeoutSocket extends Socket { private final Socket socket; // to schedule task to cancel write after timeout private final ScheduledExecutorService ses; + // flag to indicate whether scheduled executor is provided from outside or + // should be created here in constructor + private final boolean isExternalSes; // the timeout, in milliseconds private final int timeout; public WriteTimeoutSocket(Socket socket, int timeout) throws IOException { this.socket = socket; // XXX - could share executor with all instances? - this.ses = Executors.newScheduledThreadPool(1); + this.ses = createScheduledThreadPool(); + this.isExternalSes = false; this.timeout = timeout; } + public WriteTimeoutSocket(Socket socket, int timeout, ScheduledExecutorService ses) throws IOException { + this.socket = socket; + this.ses = ses; + this.timeout = timeout; + this.isExternalSes = true; + } + public WriteTimeoutSocket(int timeout) throws IOException { this(new Socket(), timeout); } @@ -158,7 +172,7 @@ public InputStream getInputStream() throws IOException { @Override public synchronized OutputStream getOutputStream() throws IOException { // wrap the returned stream to implement write timeout - return new TimeoutOutputStream(socket.getOutputStream(), ses, timeout); + return new TimeoutOutputStream(socket, ses, timeout); } @Override @@ -261,7 +275,8 @@ public void close() throws IOException { try { socket.close(); } finally { - ses.shutdownNow(); + if (!isExternalSes) + ses.shutdownNow(); } } @@ -350,6 +365,14 @@ public Set> supportedOptions() { } return null; } + + private ScheduledThreadPoolExecutor createScheduledThreadPool() { + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + // Without setting setRemoveOnCancelPolicy = true write methods will create garbage that would only be + // reclaimed after the timeout. + scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); + return scheduledThreadPoolExecutor; + } } @@ -359,22 +382,32 @@ public Set> supportedOptions() { * socket (aborting the write) if the timeout expires. */ class TimeoutOutputStream extends OutputStream { + private static final String WRITE_TIMEOUT_MESSAGE = "Write timed out"; + private static final String CANNOT_GET_TIMEOUT_TASK_RESULT_MESSAGE = "Couldn't get result of timeout task"; + private final OutputStream os; private final ScheduledExecutorService ses; - private final Callable timeoutTask; + private final Callable timeoutTask; private final int timeout; private byte[] b1; + private final Socket socket; + // Implement timeout with a scheduled task + private ScheduledFuture sf = null; - public TimeoutOutputStream(OutputStream os0, ScheduledExecutorService ses, - int timeout) throws IOException { - this.os = os0; + public TimeoutOutputStream(Socket socket, ScheduledExecutorService ses, int timeout) throws IOException { + this.os = socket.getOutputStream(); this.ses = ses; this.timeout = timeout; - timeoutTask = new Callable() { + this.socket = socket; + timeoutTask = new Callable() { @Override - public Object call() throws Exception { - os.close(); // close the stream to abort the write - return null; + public String call() throws Exception { + try { + os.close(); // close the stream to abort the write + } catch (Throwable t) { + return t.toString(); + } + return WRITE_TIMEOUT_MESSAGE; } }; } @@ -397,26 +430,61 @@ public synchronized void write(byte[] bs, int off, int len) return; } - // Implement timeout with a scheduled task - ScheduledFuture sf = null; try { - try { - if (timeout > 0) - sf = ses.schedule(timeoutTask, - timeout, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { - // ignore it; Executor was shut down by another thread, - // the following write should fail with IOException - } - os.write(bs, off, len); + try { + if (timeout > 0) + sf = ses.schedule(timeoutTask, + timeout, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + if (!socket.isClosed()) { + throw new IOException("Write aborted due to timeout not enforced", ex); + } + } + + try { + os.write(bs, off, len); + } catch (IOException e) { + if (sf != null && !sf.cancel(true)) { + throw new IOException(handleTimeoutTaskResult(sf), e); + } + throw e; + } } finally { if (sf != null) - sf.cancel(true); - } + sf.cancel(true); + } } @Override public void close() throws IOException { - os.close(); + os.close(); + if (sf != null) { + sf.cancel(true); + } + } + + private String handleTimeoutTaskResult(ScheduledFuture sf) { + boolean wasInterrupted = Thread.interrupted(); + String exceptionMessage = null; + try { + return sf.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + exceptionMessage = String.format("%s %s", e, ses.toString()); + } catch (CancellationException e) { + exceptionMessage = e.toString(); + } catch (InterruptedException e) { + wasInterrupted = true; + exceptionMessage = e.toString(); + } catch (ExecutionException e) { + exceptionMessage = e.getCause() == null ? e.toString() : e.getCause().toString(); + } catch (Exception e) { + exceptionMessage = e.toString(); + } finally { + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } + } + + return String.format("%s. %s", CANNOT_GET_TIMEOUT_TASK_RESULT_MESSAGE, exceptionMessage); } } diff --git a/providers/angus-mail/src/test/java/com/sun/mail/test/ReflectionUtil.java b/providers/angus-mail/src/test/java/com/sun/mail/test/ReflectionUtil.java new file mode 100644 index 00000000..51e0a5d8 --- /dev/null +++ b/providers/angus-mail/src/test/java/com/sun/mail/test/ReflectionUtil.java @@ -0,0 +1,38 @@ +package com.sun.mail.test; + +import java.lang.reflect.Field; + +public final class ReflectionUtil { + private ReflectionUtil() { + throw new UnsupportedOperationException(); + } + + public static Field setFieldValue(Object object, + String fieldName, + Object valueTobeSet) throws NoSuchFieldException, IllegalAccessException { + Field field = getField(object.getClass(), fieldName); + field.setAccessible(true); + field.set(object, valueTobeSet); + return field; + } + + public static Object getPrivateFieldValue(Object object, + String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field field = getField(object.getClass(), fieldName); + field.setAccessible(true); + return field.get(object); + } + + private static Field getField(Class mClass, String fieldName) throws NoSuchFieldException { + try { + return mClass.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + Class superClass = mClass.getSuperclass(); + if (superClass == null) { + throw e; + } else { + return getField(superClass, fieldName); + } + } + } +} diff --git a/providers/angus-mail/src/test/java/com/sun/mail/util/PropUtilTest.java b/providers/angus-mail/src/test/java/com/sun/mail/util/PropUtilTest.java index 6805f580..268b6eb9 100644 --- a/providers/angus-mail/src/test/java/com/sun/mail/util/PropUtilTest.java +++ b/providers/angus-mail/src/test/java/com/sun/mail/util/PropUtilTest.java @@ -16,19 +16,22 @@ package com.sun.mail.util; -import java.util.Properties; import jakarta.mail.Session; -import com.sun.mail.util.PropUtil; +import java.util.HashSet; +import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.junit.Test; /** * Test that the PropUtil methods return the correct values, * especially when defaults and non-String values are considered. */ public class PropUtilTest { + @Test public void testInt() throws Exception { Properties props = new Properties(); @@ -179,4 +182,34 @@ public void testSystemBoolean() throws Exception { System.getProperties().put("testboolean", true); assertTrue(PropUtil.getBooleanSystemProperty("testboolean", false)); } + + @Test + public void testScheduledExecutorWriteTimeout() { + final String executorPropertyName = "test"; + ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(1); + try { + Properties props = new Properties(); + props.put(executorPropertyName, ses); + assertEquals(ses, PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName)); + } finally { + ses.shutdownNow(); + } + } + + @Test + public void testScheduledExecutorWriteTimeoutIsNull() { + final String executorPropertyName = "test"; + Properties props = new Properties(); + + assertEquals(null, PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName)); + } + + @Test(expected = ClassCastException.class) + public void testScheduledExecutorWriteTimeoutWrongType() { + final String executorPropertyName = "test"; + Properties props = new Properties(); + props.put(executorPropertyName, new HashSet<>()); + + PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName); + } } diff --git a/providers/angus-mail/src/test/java/com/sun/mail/util/TimeoutOutputStreamTest.java b/providers/angus-mail/src/test/java/com/sun/mail/util/TimeoutOutputStreamTest.java new file mode 100644 index 00000000..f8e0e2d3 --- /dev/null +++ b/providers/angus-mail/src/test/java/com/sun/mail/util/TimeoutOutputStreamTest.java @@ -0,0 +1,526 @@ +/* + * Copyright (c) 2009, 2022 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package com.sun.mail.util; + +import com.sun.mail.test.ReflectionUtil; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public final class TimeoutOutputStreamTest { + private List serverSockets = new ArrayList<>(); + private List sockets = new ArrayList<>(); + private List scheduledExecutorServices = new ArrayList<>(); + + @After + public void tearDown() { + scheduledExecutorServices.forEach(this::close); + scheduledExecutorServices.clear(); + serverSockets.forEach(this::close); + serverSockets.clear(); + sockets.forEach(this::close); + sockets.clear(); + } + + @Test + public void testWriteSesWithRemoveOnCancelPolicyTrue() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(true); + BlockingQueue queue = ses.getQueue(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 5000); + + timeoutOutputStream.write(new byte[]{0}, 0, 1); + + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(getScheduledFeature(timeoutOutputStream).isCancelled()); + } + + @Test + public void testWriteSesWithRemoveOnCancelPolicyFalse() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + BlockingQueue queue = ses.getQueue(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 5000); + + timeoutOutputStream.write(new byte[]{0}, 0, 1); + + Assert.assertEquals(1, queue.size()); + ScheduledFuture sf = (ScheduledFuture) queue.peek(); + Assert.assertTrue(sf.isCancelled()); + } + + @Test + public void testWriteSesWithRemoveOnCancelPolicyFalseWithoutTimeout() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + BlockingQueue queue = ses.getQueue(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 0); + + timeoutOutputStream.write(new byte[]{0}, 0, 1); + + Assert.assertEquals(0, queue.size()); + Assert.assertNull(getScheduledFeature(timeoutOutputStream)); + } + + @Test + public void testWriteRejectedExecutionException() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + ses.shutdownNow(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + + IOException expectedException = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("Write aborted due to timeout not enforced", expectedException.getMessage()); + Assert.assertFalse(socket.isClosed()); + } + + @Test + public void testWriteSwallowRejectedExecutionException() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + ses.shutdownNow(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + socket.close(); + + IOException expectedException = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("Socket closed", expectedException.getMessage()); + Assert.assertTrue(socket.isClosed()); + } + + @Test + public void testSocketClosedAfterWrite() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 0); + + timeoutOutputStream.write(new byte[]{0}, 0, 1); + + Assert.assertFalse(socket.isClosed()); + Assert.assertNull(getScheduledFeature(timeoutOutputStream)); + } + + @Test + public void testWriteSocketNoTimeoutWithIOException() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + TimeoutOutputStream timeoutOutputStream = + new TimeoutOutputStream(socket, ses, 0); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(0), new IOException("an error"))); + + IOException expectedException = Assert.assertThrows(IOException.class, + () -> timeoutOutputStream.write(new byte[] {1}, 0, 1)); + + Assert.assertEquals("an error", expectedException.getMessage()); + } + + @Test + public void testWriteSocketClosedByWriteTimeout() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + TimeoutOutputStream timeoutOutputStream = + new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getSlowOutputStream(socket.getOutputStream(), Duration.ofSeconds(1), null)); + + IOException expectedException = Assert.assertThrows(IOException.class, + () -> timeoutOutputStream.write(new byte[] {1}, 0, 1)); + + Assert.assertEquals("Write timed out", expectedException.getMessage()); + Assert.assertTrue(socket.isClosed()); + } + + @Test + public void testWriteSocketClosedByWriteTimeoutWithException() throws Exception { + Socket socket = createSocket(); + ScheduledThreadPoolExecutor ses = createSes(); + TimeoutOutputStream timeoutOutputStream = + new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getSlowOutputStream(socket.getOutputStream(), Duration.ofSeconds(1), new RuntimeException("Unknown error"))); + + IOException expectedException = Assert.assertThrows(IOException.class, + () -> timeoutOutputStream.write(new byte[] {1}, 0, 1)); + + Assert.assertEquals("java.lang.RuntimeException: Unknown error", expectedException.getMessage()); + Assert.assertTrue(socket.isClosed()); + } + + @Test + public void testHandleTimeoutTaskResultCancellationException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + CancellationException e = new CancellationException("An exception happened"); + ses.setCancellationException(e); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("java.io.IOException: Couldn't get result of timeout task. java.util.concurrent" + + ".CancellationException: An exception happened", exception.toString()); + Assert.assertEquals("java.io.IOException: any error", exception.getCause().toString()); + } + + @Test + public void testHandleTimeoutTaskResultTimeoutException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + TimeoutException e = new TimeoutException("An exception happened"); + ses.setTimeoutException(e); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertTrue(exception.toString().startsWith( + "java.io.IOException: Couldn't get result of timeout task. java" + + ".util.concurrent.TimeoutException: An exception happened")); + Assert.assertTrue(exception.toString().contains("CustomScheduledThreadPoolExecutor")); + Assert.assertEquals("java.io.IOException: any error", exception.getCause().toString()); + } + + @Test + public void testHandleTimeoutTaskResultExecutionException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + ExecutionException e = new ExecutionException(new RuntimeException("Random exception")); + ses.setExecutionException(e); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("java.io.IOException: Couldn't get result of timeout task." + + " java.lang.RuntimeException: Random exception", exception.toString()); + Assert.assertEquals("java.io.IOException: any error", exception.getCause().toString()); + } + + @Test + public void testHandleTimeoutTaskResultInterruptedException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + InterruptedException e = new InterruptedException("An exception happened"); + ses.setInterruptedException(e); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("java.io.IOException: Couldn't get result of timeout task. " + + "java.lang.InterruptedException: An exception happened", exception.toString()); + Assert.assertEquals("java.io.IOException: any error", exception.getCause().toString()); + } + + @Test + public void testHandleTimeoutTaskResultRuntimeException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + RuntimeException e = new RuntimeException("An exception happened"); + ses.setRuntimeException(e); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("java.io.IOException: Couldn't get result of timeout task. " + + "java.lang.RuntimeException: An exception happened", exception.toString()); + Assert.assertEquals("java.io.IOException: any error", exception.getCause().toString()); + } + + @Test + public void testHandleTimeoutTaskResultWithNoException() throws Exception { + Socket socket = createSocket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + this.scheduledExecutorServices.add(ses); + TimeoutOutputStream timeoutOutputStream = new TimeoutOutputStream(socket, ses, 1); + ReflectionUtil.setFieldValue(timeoutOutputStream, "os", + getOutputStreamIOException(socket.getOutputStream(), Duration.ofSeconds(1), new IOException("any error"))); + + IOException exception = + Assert.assertThrows(IOException.class, () -> timeoutOutputStream.write(new byte[] {0}, 0, 1)); + + Assert.assertEquals("java.io.IOException: some result", exception.toString()); + } + + private int getRandomFreePort() throws IOException { + ServerSocket serverSocket = new ServerSocket(0); + serverSockets.add(serverSocket); + int freePort = serverSocket.getLocalPort(); + + return freePort; + } + + private OutputStream getSlowOutputStream(OutputStream os, Duration delay, RuntimeException exception) { + OutputStream outputStream = new OutputStream() { + @Override + public void write(int i) throws IOException { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + os.write(i); + } + + @Override + public void close() throws IOException { + os.close(); + if (exception != null) { + throw exception; + } + } + }; + + return outputStream; + } + + private OutputStream getOutputStreamIOException(OutputStream os, Duration delay, IOException exception) { + OutputStream outputStream = new OutputStream() { + @Override + public void write(int i) throws IOException { + throw exception; + } + + @Override + public void close() throws IOException { + os.close(); + } + }; + + return outputStream; + } + + private void close(ServerSocket serverSocket) { + if (serverSocket.isClosed()) { + return; + } + + try { + serverSocket.close(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + private void close(Socket socket) { + if (socket.isClosed()) { + return; + } + + try { + socket.close(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + private void close(ScheduledExecutorService ses) { + if (ses.isTerminated()) { + return; + } + + try { + ses.shutdownNow(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + private Socket createSocket() throws IOException { + int port = getRandomFreePort(); + Socket socket = new Socket("localhost", port); + sockets.add(socket); + return socket; + } + + private ScheduledThreadPoolExecutor createSes(boolean removeOnCancelPolicy) { + ScheduledThreadPoolExecutor ses = createSes(); + ses.setRemoveOnCancelPolicy(removeOnCancelPolicy); + return ses; + } + + private ScheduledFuture getScheduledFeature( + TimeoutOutputStream timeoutOutputStream) { + try { + ScheduledFuture sf = + (ScheduledFuture) ReflectionUtil.getPrivateFieldValue(timeoutOutputStream, "sf"); + return sf; + } catch (Exception e) { + throw new RuntimeException("Couldn't extract scheduled feature", e); + } + } + + private ScheduledThreadPoolExecutor createSes() { + ScheduledThreadPoolExecutor ses = new ScheduledThreadPoolExecutor(1); + scheduledExecutorServices.add(ses); + return ses; + } + + private static final class CustomScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private ScheduledFeatureMock scheduledFuture = new ScheduledFeatureMock(); + + public CustomScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + @Override + public ScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit) { + return (ScheduledFuture) scheduledFuture; + } + + public void setCancellationException(CancellationException cancellationException) { + scheduledFuture.setCancellationException(cancellationException); + } + + public void setInterruptedException(InterruptedException interruptedException) { + scheduledFuture.setInterruptedException(interruptedException); + } + + public void setExecutionException(ExecutionException executionException) { + scheduledFuture.setExecutionException(executionException); + } + + public void setTimeoutException(TimeoutException timeoutException) { + scheduledFuture.setTimeoutException(timeoutException); + } + + public void setRuntimeException(RuntimeException runtimeException) { + scheduledFuture.setRuntimeException(runtimeException); + } + } + + private static final class ScheduledFeatureMock implements ScheduledFuture { + private CancellationException cancellationException; + private InterruptedException interruptedException; + private ExecutionException executionException; + private TimeoutException timeoutException; + private RuntimeException runtimeException; + + @Override + public long getDelay(TimeUnit timeUnit) { + return 0; + } + + @Override + public int compareTo(Delayed delayed) { + return 0; + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public String get() throws InterruptedException, ExecutionException { + return null; + } + + @Override + public String get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + if (cancellationException != null) { + throw cancellationException; + } + if (interruptedException != null) { + throw interruptedException; + } + + if (executionException != null) { + throw executionException; + } + + if (timeoutException != null) { + throw timeoutException; + } + + if (runtimeException != null) { + throw runtimeException; + } + + return "some result"; + } + + public void setCancellationException(CancellationException cancellationException) { + this.cancellationException = cancellationException; + } + + public void setInterruptedException(InterruptedException interruptedException) { + this.interruptedException = interruptedException; + } + + public void setExecutionException(ExecutionException executionException) { + this.executionException = executionException; + } + + public void setTimeoutException(TimeoutException timeoutException) { + this.timeoutException = timeoutException; + } + + public void setRuntimeException(RuntimeException runtimeException) { + this.runtimeException = runtimeException; + } + } +} diff --git a/providers/angus-mail/src/test/java/com/sun/mail/util/WriteTimeoutSocketTest.java b/providers/angus-mail/src/test/java/com/sun/mail/util/WriteTimeoutSocketTest.java index d531cbaa..17f6207a 100644 --- a/providers/angus-mail/src/test/java/com/sun/mail/util/WriteTimeoutSocketTest.java +++ b/providers/angus-mail/src/test/java/com/sun/mail/util/WriteTimeoutSocketTest.java @@ -16,6 +16,12 @@ package com.sun.mail.util; +import com.sun.mail.iap.ConnectionException; +import com.sun.mail.imap.IMAPHandler; +import com.sun.mail.test.ReflectionUtil; +import com.sun.mail.test.TestSSLSocketFactory; +import com.sun.mail.test.TestServer; +import com.sun.mail.test.TestSocketFactory; import jakarta.activation.DataHandler; import jakarta.mail.Folder; import jakarta.mail.Message; @@ -25,25 +31,29 @@ import jakarta.mail.StoreClosedException; import jakarta.mail.internet.MimeMessage; import jakarta.mail.util.ByteArrayDataSource; - import java.io.FileDescriptor; import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.net.InetAddress; +import java.net.ServerSocket; import java.net.Socket; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Properties; import java.util.Set; - -import com.sun.mail.imap.IMAPHandler; -import com.sun.mail.test.TestSSLSocketFactory; -import com.sun.mail.test.TestServer; -import com.sun.mail.test.TestSocketFactory; - +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -52,6 +62,9 @@ * Test that write timeouts work. */ public final class WriteTimeoutSocketTest { + private TestServer testServer; + private List scheduledExecutorServices = new ArrayList<>(); + private List writeTimeoutSockets = new ArrayList<>(); // timeout the test in case of deadlock @Rule @@ -61,6 +74,15 @@ public final class WriteTimeoutSocketTest { private static final String data = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + @After + public void tearDown() { + close(testServer); + scheduledExecutorServices.forEach(this::close); + writeTimeoutSockets.forEach(this::close); + scheduledExecutorServices.clear(); + writeTimeoutSockets.clear(); + } + /** * Test write timeouts with plain sockets. */ @@ -227,7 +249,172 @@ else if (j == 63) testFileDescriptor$(new PublicFileSocket2of3()); testFileDescriptor$(new PublicFileSocket3of3()); } - + + @Test + public void testExternalSesIsBeingUsed() throws Exception { + final Properties properties = new Properties(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + scheduledExecutorServices.add(ses); + properties.setProperty("mail.imap.host", "localhost"); + properties.setProperty("mail.imap.writetimeout", "" + TIMEOUT); + properties.put("mail.imap.executor.writetimeout", ses); + + test(properties, false); + + assertFalse(ses.isShutdownNowMethodCalled); + assertTrue(ses.isScheduleMethodCalled); + } + + @Test + public void testRejectedExecutionException() throws Exception { + final Properties properties = new Properties(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + scheduledExecutorServices.add(ses); + ses.shutdownNow(); + properties.setProperty("mail.imap.host", "localhost"); + properties.setProperty("mail.imap.writetimeout", "" + TIMEOUT); + properties.put("mail.imap.executor.writetimeout", ses); + + try { + test(properties, false); + fail("Expected IOException wasn't thrown "); + } catch (MessagingException mex) { + Throwable cause = mex.getCause(); + assertTrue(cause instanceof ConnectionException); + assertTrue(cause.getMessage().contains("java.io.IOException: Write aborted due to timeout not enforced")); + } + } + + @Test + public void testCloseOneSocketDoesntImpactAnother() throws Exception { + WriteTimeoutSocket wts1 = new WriteTimeoutSocket(new Socket(), 10000); + WriteTimeoutSocket wts2 = new WriteTimeoutSocket(new Socket(), 10000); + writeTimeoutSockets.add(wts1); + writeTimeoutSockets.add(wts2); + + ScheduledExecutorService ses1 = + (ScheduledExecutorService) ReflectionUtil.getPrivateFieldValue(wts1, "ses"); + ScheduledExecutorService ses2 = + (ScheduledExecutorService) ReflectionUtil.getPrivateFieldValue(wts2, "ses"); + scheduledExecutorServices.add(ses1); + scheduledExecutorServices.add(ses2); + + assertFalse(ses1.isTerminated()); + assertFalse(ses2.isTerminated()); + + wts1.close(); + assertTrue(ses1.isTerminated()); + assertFalse(ses2.isTerminated()); + } + + @Test + public void testDefaultSesConstructor1() throws Exception { + WriteTimeoutSocket writeTimeoutSocket = new WriteTimeoutSocket(new Socket(), 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testDefaultSesConstructor2() throws Exception { + WriteTimeoutSocket writeTimeoutSocket = new WriteTimeoutSocket(10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testDefaultSesConstructor3() throws Exception { + testServer = getActiveTestServer(false); + + WriteTimeoutSocket writeTimeoutSocket = + new WriteTimeoutSocket("localhost", testServer.getPort(), 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testDefaultSesConstructor4() throws Exception { + testServer = getActiveTestServer(false); + WriteTimeoutSocket writeTimeoutSocket = + new WriteTimeoutSocket(InetAddress.getLocalHost(), testServer.getPort(), 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testDefaultSesConstructor5() throws Exception { + testServer = getActiveTestServer(false); + + WriteTimeoutSocket writeTimeoutSocket = + new WriteTimeoutSocket("localhost", testServer.getPort(), + (InetAddress) null, getRandomFreePort(), 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testDefaultSesConstructor6() throws Exception { + testServer = getActiveTestServer(false); + + WriteTimeoutSocket writeTimeoutSocket = + new WriteTimeoutSocket(InetAddress.getByName("localhost"), testServer.getPort(), + (InetAddress) null, getRandomFreePort(), 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertFalse((Boolean) isExternalSes); + } + + @Test + public void testExternalSesConstructor7() throws Exception { + WriteTimeoutSocket writeTimeoutSocket = + new WriteTimeoutSocket(new Socket(), 10000, new ScheduledThreadPoolExecutor(1)); + writeTimeoutSockets.add(writeTimeoutSocket); + + Object isExternalSes = ReflectionUtil.getPrivateFieldValue(writeTimeoutSocket, "isExternalSes"); + assertTrue(isExternalSes instanceof Boolean); + assertTrue((Boolean) isExternalSes); + } + + @Test + public void testExternalSesOnClose() throws Exception { + Socket socket = new Socket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + WriteTimeoutSocket writeTimeoutSocket = new WriteTimeoutSocket(socket, 10000, ses); + writeTimeoutSockets.add(writeTimeoutSocket); + writeTimeoutSocket.close(); + + assertFalse(ses.isShutdownNowMethodCalled); + } + + @Test + public void testDefaultSesOnClose() throws Exception { + Socket socket = new Socket(); + CustomScheduledThreadPoolExecutor ses = new CustomScheduledThreadPoolExecutor(1); + WriteTimeoutSocket writeTimeoutSocket = new WriteTimeoutSocket(socket, 10000); + writeTimeoutSockets.add(writeTimeoutSocket); + ReflectionUtil.setFieldValue(writeTimeoutSocket, "ses", ses); + + writeTimeoutSocket.close(); + + assertTrue(ses.isShutdownNowMethodCalled); + } + private void testFileDescriptor$(Socket s) throws Exception { try (WriteTimeoutSocket ws = new WriteTimeoutSocket(s, 1000)) { assertNotNull(ws.getFileDescriptor$()); @@ -235,7 +422,63 @@ else if (j == 63) s.close(); } } - + + private TestServer getActiveTestServer(boolean isSSL) { + TestServer server = null; + try { + final TimeoutHandler handler = new TimeoutHandler(); + server = new TestServer(handler, isSSL); + server.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return server; + } + + private int getRandomFreePort() throws IOException { + ServerSocket serverSocket = new ServerSocket(0); + int freePort = serverSocket.getLocalPort(); + serverSocket.close(); + + return freePort; + } + + private void close(ScheduledExecutorService ses) { + if (ses.isTerminated()) { + return; + } + + try { + ses.shutdownNow(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + private void close(TestServer testServer) { + if (testServer == null || !testServer.isAlive()) { + return; + } + + try { + testServer.quit(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + private void close(WriteTimeoutSocket writeTimeoutSocket) { + if (writeTimeoutSocket.isClosed()) { + return; + } + + try { + writeTimeoutSocket.close(); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + private static class PublicFileSocket extends Socket { public FileDescriptor getFileDescriptor$() { return new FileDescriptor(); @@ -270,4 +513,26 @@ public void list(String line) throws IOException { ok(); } } + + private static final class CustomScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private boolean isShutdownNowMethodCalled; + private boolean isScheduleMethodCalled; + + public CustomScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + @Override + public ScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit) { + isScheduleMethodCalled = true; + return super.schedule(callable, delay, unit); + } + + @Override + public List shutdownNow() { + isShutdownNowMethodCalled = true; + return super.shutdownNow(); + } + } } diff --git a/providers/imap/src/main/java/com/sun/mail/imap/package-info.java b/providers/imap/src/main/java/com/sun/mail/imap/package-info.java index 8e9e0794..e3615f57 100644 --- a/providers/imap/src/main/java/com/sun/mail/imap/package-info.java +++ b/providers/imap/src/main/java/com/sun/mail/imap/package-info.java @@ -333,6 +333,24 @@ * * * + * mail.imap.executor.writetimeout + * java.util.concurrent.ScheduledExecutorService + * Provides specific ScheduledExecutorService for mail.imap.writetimeout option. + * The value of mail.imap.writetimeout shouldn't be a null. + * For provided executor pool it is highly recommended to have set up in true + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}. + * Without it, write methods will create garbage that would only be reclaimed after the timeout. + * Be careful with calling {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdownNow()} in your executor, + * it can kill the running tasks. It would be ok to use shutdownNow only when JavaMail sockets are closed. + * This would be all service subclasses ({@link jakarta.mail.Store}/{@link jakarta.mail.Transport}) + * Invoking run {@link java.lang.Runnable#run()} on the returned {@link java.util.concurrent.Future} objects + * would force close the open connections. + * Instead of shutdownNow you can use {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdown()} ()} + * and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#awaitTermination(long, java.util.concurrent.TimeUnit)} ()}. + * + * + * * mail.imap.statuscachetimeout * int * Timeout value in milliseconds for cache of STATUS command response. diff --git a/providers/pop3/src/main/java/com/sun/mail/pop3/package-info.java b/providers/pop3/src/main/java/com/sun/mail/pop3/package-info.java index d7129087..d884d477 100644 --- a/providers/pop3/src/main/java/com/sun/mail/pop3/package-info.java +++ b/providers/pop3/src/main/java/com/sun/mail/pop3/package-info.java @@ -234,6 +234,25 @@ * * * + * mail.pop3.executor.writetimeout + * java.util.concurrent.ScheduledExecutorService + * Provides specific ScheduledExecutorService for mail.pop3.writetimeout option. + * The value of mail.pop3.writetimeout shouldn't be a null. + * For provided executor pool it is highly recommended to have set up in true + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}. + * Without it, write methods will create garbage that would only be reclaimed after the timeout. + * Be careful with calling {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdownNow()} in your executor, + * it can kill the running tasks. It would be ok to use shutdownNow only when JavaMail sockets are closed. + * This would be all service subclasses ({@link jakarta.mail.Store}/{@link jakarta.mail.Transport}) + * Invoking run {@link java.lang.Runnable#run()} on the returned {@link java.util.concurrent.Future} objects + * would force close the open connections. + * Instead of shutdownNow you can use {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdown()} ()} + * and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#awaitTermination(long, java.util.concurrent.TimeUnit)} ()}. + * + * + * + * * mail.pop3.rsetbeforequit * boolean * diff --git a/providers/smtp/src/main/java/com/sun/mail/smtp/package-info.java b/providers/smtp/src/main/java/com/sun/mail/smtp/package-info.java index 3fc68573..a9dd60c8 100644 --- a/providers/smtp/src/main/java/com/sun/mail/smtp/package-info.java +++ b/providers/smtp/src/main/java/com/sun/mail/smtp/package-info.java @@ -219,13 +219,32 @@ * int * Socket write timeout value in milliseconds. * This timeout is implemented by using a - * java.util.concurrent.ScheduledExecutorService per connection + * {@link java.util.concurrent.ScheduledExecutorService} per connection * that schedules a thread to close the socket if the timeout expires. * Thus, the overhead of using this timeout is one thread per connection. * Default is infinite timeout. * * * + * mail.smtp.executor.writetimeout + * java.util.concurrent.ScheduledExecutorService + * Provides specific ScheduledExecutorService for mail.smtp.writetimeout option. + * The value of mail.smtp.writetimeout shouldn't be a null. + * For provided executor pool it is highly recommended to have set up in true + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}. + * Without it, write methods will create garbage that would only be reclaimed after the timeout. + * Be careful with calling {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdownNow()} in your executor, + * it can kill the running tasks. It would be ok to use shutdownNow only when JavaMail sockets are closed. + * This would be all service subclasses ({@link jakarta.mail.Store}/{@link jakarta.mail.Transport}) + * Invoking run {@link java.lang.Runnable#run()} on the returned {@link java.util.concurrent.Future} objects + * would force close the open connections. + * Instead of shutdownNow you can use {@link java.util.concurrent.ScheduledThreadPoolExecutor#shutdown()} ()} + * and + * {@link java.util.concurrent.ScheduledThreadPoolExecutor#awaitTermination(long, java.util.concurrent.TimeUnit)} ()}. + * + * + * + * * mail.smtp.from * String *