Skip to content

Commit

Permalink
Jetty 9.4.x 1803 proposal 0 - single ReservedThreadExecutor (#2119)
Browse files Browse the repository at this point in the history
Issue #1803 - Review usage of scheduling strategies

Use a single ReservedThreadExecutor built into the QueuedThreadPool
via new interface TryExecutor.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
gregw authored and sbordet committed Feb 8, 2018
1 parent 9a787b1 commit ab5fc29
Show file tree
Hide file tree
Showing 20 changed files with 204 additions and 184 deletions.
Expand Up @@ -39,7 +39,6 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;

public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Expand Down Expand Up @@ -69,36 +68,19 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);

ReservedThreadExecutor reservedExecutor = provideReservedThreadExecutor(client, executor);

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, reservedExecutor, endPoint,
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
}

protected ReservedThreadExecutor provideReservedThreadExecutor(HTTP2Client client, Executor executor)
{
synchronized (this)
{
ReservedThreadExecutor reservedExecutor = client.getBean(ReservedThreadExecutor.class);
if (reservedExecutor == null)
{
// TODO: see HTTP2Connection.FillableCallback
reservedExecutor = new ReservedThreadExecutor(executor, 0);
client.addManaged(reservedExecutor);
}
return reservedExecutor;
}
}

private class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
private final Session.Listener listener;

private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.http2.parser.Parser;
Expand All @@ -35,7 +36,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.TryExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;

public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
Expand All @@ -51,14 +52,15 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final int bufferSize;
private final ExecutionStrategy strategy;

public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor.getExecutor());
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);
// TODO HTTP2 cannot use EWYK without fix for #1803
this.strategy = new EatWhatYouKill(producer, new TryExecutor.NoTryExecutor(executor));
LifeCycle.start(strategy);
}

Expand Down
Expand Up @@ -12,7 +12,6 @@
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="128"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="524288"/></Set>
<Set name="initialSessionRecvWindow"><Property name="jetty.http2.initialSessionRecvWindow" default="1048576"/></Set>
<Set name="reservedThreads"><Property name="jetty.http2.reservedThreads" default="-1"/></Set>
</New>
</Arg>
</Call>
Expand Down
4 changes: 0 additions & 4 deletions jetty-http2/http2-server/src/main/config/modules/http2.mod
Expand Up @@ -27,7 +27,3 @@ etc/jetty-http2.xml
## Initial session receive window (client to server)
# jetty.http2.initialSessionRecvWindow=1048576
## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads)
# jetty.http2.reservedThreads=-1
Expand Up @@ -35,7 +35,6 @@
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;

@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
Expand All @@ -49,7 +48,6 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private int reservedThreads;

public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
Expand Down Expand Up @@ -143,20 +141,23 @@ public void setStreamIdleTimeout(long streamIdleTimeout)
}

/**
* @see ReservedThreadExecutor
* @return The number of reserved threads
* @return -1
* @deprecated
*/
@ManagedAttribute("The number of threads reserved for high priority tasks")
@Deprecated
public int getReservedThreads()
{
return reservedThreads;
return -1;
}

/**
* @param threads ignored
* @deprecated
*/
@Deprecated
public void setReservedThreads(int threads)
{
// TODO: see also HTTP2Connection.FillableCallback.
// TODO: currently disabled since the only value that works is 0.
// this.reservedThreads = threads;
throw new UnsupportedOperationException();
}

public HttpConfiguration getHttpConfiguration()
Expand All @@ -183,30 +184,14 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());

ReservedThreadExecutor executor = provideReservedThreadExecutor(connector);

ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
}

protected ReservedThreadExecutor provideReservedThreadExecutor(Connector connector)
{
synchronized (this)
{
ReservedThreadExecutor executor = getBean(ReservedThreadExecutor.class);
if (executor == null)
{
executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads());
addManaged(executor);
}
return executor;
}
}

protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);

protected ServerParser newServerParser(Connector connector, ServerParser.Listener listener)
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.http.BadMessageException;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;

public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
Expand Down Expand Up @@ -94,7 +94,7 @@ public static boolean isSupportedProtocol(String protocol)
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;

public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
Expand Down
39 changes: 19 additions & 20 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
Expand Up @@ -48,7 +48,6 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;

Expand Down Expand Up @@ -77,7 +76,7 @@ public ManagedSelector(SelectorManager selectorManager, int id)
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
_strategy = new EatWhatYouKill(producer,executor);
addBean(_strategy,true);
setStopTimeout(5000);
}
Expand Down Expand Up @@ -136,17 +135,17 @@ protected void doStop() throws Exception

/**
* Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()}
* @param action
* @param update The selector update to apply at next wakeup
*/
public void submit(SelectorUpdate action)
public void submit(SelectorUpdate update)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", action, this);
LOG.debug("Queued change {} on {}", update, this);

Selector selector = null;
synchronized(ManagedSelector.this)
{
_updates.offer(action);
_updates.offer(update);

if (_selecting)
{
Expand Down Expand Up @@ -265,15 +264,15 @@ public void dump(Appendable out, String indent) throws IOException
{
Selector selector = _selector;
List<String> keys = null;
List<SelectorUpdate> actions = null;
List<SelectorUpdate> updates = null;
if (selector != null && selector.isOpen())
{
DumpKeys dump = new DumpKeys();
String actionsAt;
String updatesAt;
synchronized(ManagedSelector.this)
{
actionsAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
actions = new ArrayList<>(_updates);
updatesAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
updates = new ArrayList<>(_updates);
_updates.addFirst(dump);
_selecting = false;
}
Expand All @@ -282,7 +281,7 @@ public void dump(Appendable out, String indent) throws IOException
String keysAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
if (keys==null)
keys = Collections.singletonList("No dump keys retrieved");
dumpBeans(out, indent, Arrays.asList(new DumpableCollection("actions @ "+actionsAt, actions),
dumpBeans(out, indent, Arrays.asList(new DumpableCollection("updates @ "+updatesAt, updates),
new DumpableCollection("keys @ "+keysAt, keys)));
}
else
Expand All @@ -295,7 +294,7 @@ public void dump(Appendable out, String indent) throws IOException
public String toString()
{
Selector selector = _selector;
return String.format("%s id=%s keys=%d selected=%d actions=%d",
return String.format("%s id=%s keys=%d selected=%d updates=%d",
super.toString(),
_id,
selector != null && selector.isOpen() ? selector.keys().size() : -1,
Expand Down Expand Up @@ -377,16 +376,16 @@ private void processUpdates()
_updateable.clear();

Selector selector;
int actions;
int updates;
synchronized(ManagedSelector.this)
{
actions = _updates.size();
_selecting = actions==0;
updates = _updates.size();
_selecting = updates==0;
selector = _selecting?null:_selector;
}

if (LOG.isDebugEnabled())
LOG.debug("actions {}",actions);
LOG.debug("updates {}",updates);

if (selector != null)
selector.wakeup();
Expand All @@ -405,18 +404,18 @@ private boolean select()
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{} selected", selector, selected, selector.keys().size());

int actions;
int updates;
synchronized(ManagedSelector.this)
{
// finished selecting
_selecting = false;
actions = _updates.size();
updates = _updates.size();
}

_keys = selector.selectedKeys();
_cursor = _keys.iterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys, {} actions", selector, _keys.size(), actions);
LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);

return true;
}
Expand Down Expand Up @@ -489,7 +488,7 @@ private void updateKeys()
{
// Do update keys for only previously selected keys.
// This will update only those keys whose selection did not cause an
// updateKeys action to be submitted.
// updateKeys update to be submitted.
for (SelectionKey key : _keys)
updateKey(key);
_keys.clear();
Expand Down

0 comments on commit ab5fc29

Please sign in to comment.