Skip to content

Commit

Permalink
Write timeout socket / Add the ability to provide own scheduled executor
Browse files Browse the repository at this point in the history
  • Loading branch information
egor-ponomarev authored and lukasj committed Dec 22, 2022
1 parent 2dc4167 commit bcace35
Show file tree
Hide file tree
Showing 10 changed files with 1,051 additions and 41 deletions.
21 changes: 20 additions & 1 deletion core/src/main/java/com/sun/mail/util/PropUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package com.sun.mail.util;

import java.util.*;
import jakarta.mail.Session;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;

/**
* Utilities to make it easier to get property values.
Expand Down Expand Up @@ -56,6 +57,20 @@ public static boolean getBooleanProperty(Properties props,
return getBoolean(getProp(props, name), def);
}

/**
* Get a ScheduledExecutorService valued property.
*
* @param props the properties
* @param name the property name
* @return the property value, null if the property is null
* @throws ClassCastException if the property value's class is
* not {@link java.util.concurrent.ScheduledThreadPoolExecutor }
*/
public static ScheduledExecutorService getScheduledExecutorServiceProperty(Properties props,
String name) {
return getScheduledExecutorService(getProp(props, name));
}

/**
* Get an integer valued property.
*
Expand Down Expand Up @@ -149,6 +164,10 @@ private static int getInt(Object value, int def) {
return def;
}

private static ScheduledExecutorService getScheduledExecutorService(Object value) {
return (ScheduledExecutorService) value;
}

/**
* Interpret the value object as a boolean,
* returning def if unable.
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/com/sun/mail/util/SocketFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -341,7 +342,11 @@ private static Socket createSocket(InetAddress localaddr, int localport,
if (writeTimeout != -1) { // wrap original
if (logger.isLoggable(Level.FINEST))
logger.finest("set socket write timeout " + writeTimeout);
socket = new WriteTimeoutSocket(socket, writeTimeout);
ScheduledExecutorService executorService = PropUtil.getScheduledExecutorServiceProperty(props,
prefix + ".executor.writetimeout");
socket = executorService == null ?
new WriteTimeoutSocket(socket, writeTimeout) :
new WriteTimeoutSocket(socket, writeTimeout, executorService);
}
if (localaddr != null)
socket.bind(new InetSocketAddress(localaddr, localport));
Expand Down
120 changes: 94 additions & 26 deletions core/src/main/java/com/sun/mail/util/WriteTimeoutSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A special Socket that uses a ScheduledExecutorService to
Expand All @@ -50,16 +53,27 @@ public class WriteTimeoutSocket extends Socket {
private final Socket socket;
// to schedule task to cancel write after timeout
private final ScheduledExecutorService ses;
// flag to indicate whether scheduled executor is provided from outside or
// should be created here in constructor
private final boolean isExternalSes;
// the timeout, in milliseconds
private final int timeout;

public WriteTimeoutSocket(Socket socket, int timeout) throws IOException {
this.socket = socket;
// XXX - could share executor with all instances?
this.ses = Executors.newScheduledThreadPool(1);
this.ses = createScheduledThreadPool();
this.isExternalSes = false;
this.timeout = timeout;
}

public WriteTimeoutSocket(Socket socket, int timeout, ScheduledExecutorService ses) throws IOException {
this.socket = socket;
this.ses = ses;
this.timeout = timeout;
this.isExternalSes = true;
}

public WriteTimeoutSocket(int timeout) throws IOException {
this(new Socket(), timeout);
}
Expand Down Expand Up @@ -158,7 +172,7 @@ public InputStream getInputStream() throws IOException {
@Override
public synchronized OutputStream getOutputStream() throws IOException {
// wrap the returned stream to implement write timeout
return new TimeoutOutputStream(socket.getOutputStream(), ses, timeout);
return new TimeoutOutputStream(socket, ses, timeout);
}

@Override
Expand Down Expand Up @@ -261,7 +275,8 @@ public void close() throws IOException {
try {
socket.close();
} finally {
ses.shutdownNow();
if (!isExternalSes)
ses.shutdownNow();
}
}

Expand Down Expand Up @@ -350,6 +365,14 @@ public Set<SocketOption<?>> supportedOptions() {
}
return null;
}

private ScheduledThreadPoolExecutor createScheduledThreadPool() {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
// Without setting setRemoveOnCancelPolicy = true write methods will create garbage that would only be
// reclaimed after the timeout.
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
return scheduledThreadPoolExecutor;
}
}


Expand All @@ -359,22 +382,32 @@ public Set<SocketOption<?>> supportedOptions() {
* socket (aborting the write) if the timeout expires.
*/
class TimeoutOutputStream extends OutputStream {
private static final String WRITE_TIMEOUT_MESSAGE = "Write timed out";
private static final String CANNOT_GET_TIMEOUT_TASK_RESULT_MESSAGE = "Couldn't get result of timeout task";

private final OutputStream os;
private final ScheduledExecutorService ses;
private final Callable<Object> timeoutTask;
private final Callable<String> timeoutTask;
private final int timeout;
private byte[] b1;
private final Socket socket;
// Implement timeout with a scheduled task
private ScheduledFuture<String> sf = null;

public TimeoutOutputStream(OutputStream os0, ScheduledExecutorService ses,
int timeout) throws IOException {
this.os = os0;
public TimeoutOutputStream(Socket socket, ScheduledExecutorService ses, int timeout) throws IOException {
this.os = socket.getOutputStream();
this.ses = ses;
this.timeout = timeout;
timeoutTask = new Callable<Object>() {
this.socket = socket;
timeoutTask = new Callable<String>() {
@Override
public Object call() throws Exception {
os.close(); // close the stream to abort the write
return null;
public String call() throws Exception {
try {
os.close(); // close the stream to abort the write
} catch (Throwable t) {
return t.toString();
}
return WRITE_TIMEOUT_MESSAGE;
}
};
}
Expand All @@ -397,26 +430,61 @@ public synchronized void write(byte[] bs, int off, int len)
return;
}

// Implement timeout with a scheduled task
ScheduledFuture<Object> sf = null;
try {
try {
if (timeout > 0)
sf = ses.schedule(timeoutTask,
timeout, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ex) {
// ignore it; Executor was shut down by another thread,
// the following write should fail with IOException
}
os.write(bs, off, len);
try {
if (timeout > 0)
sf = ses.schedule(timeoutTask,
timeout, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ex) {
if (!socket.isClosed()) {
throw new IOException("Write aborted due to timeout not enforced", ex);
}
}

try {
os.write(bs, off, len);
} catch (IOException e) {
if (sf != null && !sf.cancel(true)) {
throw new IOException(handleTimeoutTaskResult(sf), e);
}
throw e;
}
} finally {
if (sf != null)
sf.cancel(true);
}
sf.cancel(true);
}
}

@Override
public void close() throws IOException {
os.close();
os.close();
if (sf != null) {
sf.cancel(true);
}
}

private String handleTimeoutTaskResult(ScheduledFuture<String> sf) {
boolean wasInterrupted = Thread.interrupted();
String exceptionMessage = null;
try {
return sf.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
exceptionMessage = String.format("%s %s", e, ses.toString());
} catch (CancellationException e) {
exceptionMessage = e.toString();
} catch (InterruptedException e) {
wasInterrupted = true;
exceptionMessage = e.toString();
} catch (ExecutionException e) {
exceptionMessage = e.getCause() == null ? e.toString() : e.getCause().toString();
} catch (Exception e) {
exceptionMessage = e.toString();
} finally {
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}

return String.format("%s. %s", CANNOT_GET_TIMEOUT_TASK_RESULT_MESSAGE, exceptionMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.sun.mail.test;

import java.lang.reflect.Field;

public final class ReflectionUtil {
private ReflectionUtil() {
throw new UnsupportedOperationException();
}

public static Field setFieldValue(Object object,
String fieldName,
Object valueTobeSet) throws NoSuchFieldException, IllegalAccessException {
Field field = getField(object.getClass(), fieldName);
field.setAccessible(true);
field.set(object, valueTobeSet);
return field;
}

public static Object getPrivateFieldValue(Object object,
String fieldName) throws NoSuchFieldException, IllegalAccessException {
Field field = getField(object.getClass(), fieldName);
field.setAccessible(true);
return field.get(object);
}

private static Field getField(Class mClass, String fieldName) throws NoSuchFieldException {
try {
return mClass.getDeclaredField(fieldName);
} catch (NoSuchFieldException e) {
Class superClass = mClass.getSuperclass();
if (superClass == null) {
throw e;
} else {
return getField(superClass, fieldName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@

package com.sun.mail.util;

import java.util.Properties;
import jakarta.mail.Session;
import com.sun.mail.util.PropUtil;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Test that the PropUtil methods return the correct values,
* especially when defaults and non-String values are considered.
*/
public class PropUtilTest {

@Test
public void testInt() throws Exception {
Properties props = new Properties();
Expand Down Expand Up @@ -179,4 +182,34 @@ public void testSystemBoolean() throws Exception {
System.getProperties().put("testboolean", true);
assertTrue(PropUtil.getBooleanSystemProperty("testboolean", false));
}

@Test
public void testScheduledExecutorWriteTimeout() {
final String executorPropertyName = "test";
ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(1);
try {
Properties props = new Properties();
props.put(executorPropertyName, ses);
assertEquals(ses, PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName));
} finally {
ses.shutdownNow();
}
}

@Test
public void testScheduledExecutorWriteTimeoutIsNull() {
final String executorPropertyName = "test";
Properties props = new Properties();

assertEquals(null, PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName));
}

@Test(expected = ClassCastException.class)
public void testScheduledExecutorWriteTimeoutWrongType() {
final String executorPropertyName = "test";
Properties props = new Properties();
props.put(executorPropertyName, new HashSet<>());

PropUtil.getScheduledExecutorServiceProperty(props, executorPropertyName);
}
}
Loading

0 comments on commit bcace35

Please sign in to comment.