Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nio http server transport #29587

Merged
merged 81 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from 74 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
17c904a
Add server context
Tim-Brooks Jan 17, 2018
6c8cd44
WIP
Tim-Brooks Jan 17, 2018
d9d995b
Do not extend autocloseable
Tim-Brooks Jan 18, 2018
88bca9a
But keep close method
Tim-Brooks Jan 18, 2018
c4db506
Remove multiple context getters
Tim-Brooks Jan 18, 2018
4c3bf37
WIP
Tim-Brooks Jan 18, 2018
dfd1901
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Jan 18, 2018
c4e1c50
Merge branch 'master' into layer_http
Tim-Brooks Feb 5, 2018
b5a374a
Do not depend on netty module
Tim-Brooks Feb 5, 2018
408c87c
Pull over tests
Tim-Brooks Feb 5, 2018
0f74aa0
Work on bytes producer
Tim-Brooks Feb 6, 2018
44be919
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Feb 24, 2018
913028e
Comments
Tim-Brooks Feb 26, 2018
df53494
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Feb 28, 2018
9072a45
WIP
Tim-Brooks Mar 5, 2018
5c89990
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Mar 20, 2018
8bccda3
WIP
Tim-Brooks Mar 21, 2018
26bdadf
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Mar 21, 2018
99ad1fd
Work on refactoring
Tim-Brooks Mar 21, 2018
3540e5c
Continue op refactor
Tim-Brooks Mar 21, 2018
bcc776a
WIP
Tim-Brooks Mar 22, 2018
1e86182
Work on fixing tests
Tim-Brooks Mar 22, 2018
c6d01b7
Work on tests
Tim-Brooks Mar 24, 2018
234e965
get simple tests passing
Tim-Brooks Mar 24, 2018
4e9ab9d
WIP
Tim-Brooks Mar 26, 2018
6b16068
Close write producer
Tim-Brooks Mar 26, 2018
c3c9c4f
Move bytes writer
Tim-Brooks Mar 26, 2018
fe4c961
Move producer
Tim-Brooks Mar 26, 2018
8f5622d
Remove imports
Tim-Brooks Mar 26, 2018
e7fc228
Merge remote-tracking branch 'upstream/master' into layer_http
Tim-Brooks Apr 3, 2018
0bd3810
Remove http stuff
Tim-Brooks Apr 3, 2018
6bd2d91
Extract stuff to super
Tim-Brooks Apr 4, 2018
c761bce
Fix tests
Tim-Brooks Apr 4, 2018
e196257
Fix warning
Tim-Brooks Apr 5, 2018
35daf28
Merge remote-tracking branch 'upstream/master' into remove_http
Tim-Brooks Apr 5, 2018
ac7ba90
Changes based on review
Tim-Brooks Apr 11, 2018
b1047b0
Work on netty adaptor
Tim-Brooks Apr 12, 2018
10c75aa
Start http adapting work
Tim-Brooks Apr 12, 2018
21dbd7e
Work on implementing http
Tim-Brooks Apr 13, 2018
4fe6870
Work on server transport
Tim-Brooks Apr 13, 2018
70f50c8
Work on config
Tim-Brooks Apr 14, 2018
3e56692
WIP
Tim-Brooks Apr 16, 2018
a94d6f6
WIP
Tim-Brooks Apr 16, 2018
e0aa6bf
Work on setting up transport
Tim-Brooks Apr 16, 2018
56b0cf5
Make basic http tests pass
Tim-Brooks Apr 17, 2018
1c2e5d6
Wip
Tim-Brooks Apr 18, 2018
86438d1
Fix issue
Tim-Brooks Apr 18, 2018
7efb126
Fix checkstyle
Tim-Brooks Apr 18, 2018
a96dc7f
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks Apr 18, 2018
8dfe123
Fix checkstyle
Tim-Brooks Apr 18, 2018
9c77802
Fix forbidden
Tim-Brooks Apr 18, 2018
30301b0
headers
Tim-Brooks Apr 18, 2018
dfdd446
Update setting
Tim-Brooks Apr 18, 2018
b3dc74c
Fix third party audit
Tim-Brooks Apr 18, 2018
eeeb8fd
Fix tests
Tim-Brooks Apr 19, 2018
e4380f2
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks Apr 19, 2018
6a73065
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks Apr 25, 2018
c18f87f
Update
Tim-Brooks Apr 25, 2018
4960e16
Fix issue
Tim-Brooks Apr 25, 2018
bb486b3
Fix compile issue
Tim-Brooks Apr 26, 2018
0c7d64a
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 1, 2018
b29b80c
Cleanups
Tim-Brooks May 2, 2018
94c748b
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 2, 2018
be40373
Move tests to abstract tests class
Tim-Brooks May 2, 2018
996b557
Remove unnecessary methods
Tim-Brooks May 2, 2018
683142c
WIP
Tim-Brooks May 3, 2018
ec56f62
Fix warning
Tim-Brooks May 3, 2018
95debd9
Fix warning
Tim-Brooks May 3, 2018
bd81563
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 4, 2018
dd3f129
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 4, 2018
6f64963
Close and wait
Tim-Brooks May 7, 2018
29a942f
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 8, 2018
2a40a6c
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 9, 2018
839f071
Actualy close the channel
Tim-Brooks May 10, 2018
570c6ed
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 11, 2018
f66801f
Work on read write tests
Tim-Brooks May 14, 2018
e2564c0
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 15, 2018
52ef47e
Changes from review
Tim-Brooks May 15, 2018
c6d084e
A few more tests
Tim-Brooks May 15, 2018
408b845
Remove unused imports
Tim-Brooks May 15, 2018
3f01084
Merge remote-tracking branch 'upstream/master' into add_http_back
Tim-Brooks May 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,13 @@
package org.elasticsearch.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BytesChannelContext extends SocketChannelContext {

private final ReadConsumer readConsumer;
private final InboundChannelBuffer channelBuffer;
private final LinkedList<BytesWriteOperation> queued = new LinkedList<>();
private final AtomicBoolean isClosing = new AtomicBoolean(false);

public BytesChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler,
ReadConsumer readConsumer, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler);
this.readConsumer = readConsumer;
this.channelBuffer = channelBuffer;
ReadWriteHandler handler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, handler, channelBuffer);
}

@Override
Expand All @@ -56,55 +44,30 @@ public int read() throws IOException {

channelBuffer.incrementIndex(bytesRead);

int bytesConsumed = Integer.MAX_VALUE;
while (bytesConsumed > 0 && channelBuffer.getIndex() > 0) {
bytesConsumed = readConsumer.consumeReads(channelBuffer);
channelBuffer.release(bytesConsumed);
}
handleReadBytes();

return bytesRead;
}

@Override
public void sendMessage(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
if (isClosing.get()) {
listener.accept(null, new ClosedChannelException());
return;
}

BytesWriteOperation writeOperation = new BytesWriteOperation(this, buffers, listener);
SocketSelector selector = getSelector();
if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation);
return;
}

selector.queueWriteInChannelBuffer(writeOperation);
}

@Override
public void queueWriteOperation(WriteOperation writeOperation) {
getSelector().assertOnSelectorThread();
queued.add((BytesWriteOperation) writeOperation);
}

@Override
public void flushChannel() throws IOException {
getSelector().assertOnSelectorThread();
int ops = queued.size();
if (ops == 1) {
singleFlush(queued.pop());
} else if (ops > 1) {
multiFlush();
boolean lastOpCompleted = true;
FlushOperation flushOperation;
while (lastOpCompleted && (flushOperation = getPendingFlush()) != null) {
try {
if (singleFlush(flushOperation)) {
currentFlushOperationComplete();
} else {
lastOpCompleted = false;
}
} catch (IOException e) {
currentFlushOperationFailed(e);
throw e;
}
}
}

@Override
public boolean hasQueuedWriteOps() {
getSelector().assertOnSelectorThread();
return queued.isEmpty() == false;
}

@Override
public void closeChannel() {
if (isClosing.compareAndSet(false, true)) {
Expand All @@ -117,51 +80,12 @@ public boolean selectorShouldClose() {
return isPeerClosed() || hasIOException() || isClosing.get();
}

@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (channel.isOpen()) {
IOException channelCloseException = null;
try {
super.closeFromSelector();
} catch (IOException e) {
channelCloseException = e;
}
// Set to true in order to reject new writes before queuing with selector
isClosing.set(true);
channelBuffer.close();
for (BytesWriteOperation op : queued) {
getSelector().executeFailedListener(op.getListener(), new ClosedChannelException());
}
queued.clear();
if (channelCloseException != null) {
throw channelCloseException;
}
}
}

private void singleFlush(BytesWriteOperation headOp) throws IOException {
try {
int written = flushToChannel(headOp.getBuffersToWrite());
headOp.incrementIndex(written);
} catch (IOException e) {
getSelector().executeFailedListener(headOp.getListener(), e);
throw e;
}

if (headOp.isFullyFlushed()) {
getSelector().executeListener(headOp.getListener(), null);
} else {
queued.push(headOp);
}
}

private void multiFlush() throws IOException {
boolean lastOpCompleted = true;
while (lastOpCompleted && queued.isEmpty() == false) {
BytesWriteOperation op = queued.pop();
singleFlush(op);
lastOpCompleted = op.isFullyFlushed();
}
/**
* Returns a boolean indicating if the operation was fully flushed.
*/
private boolean singleFlush(FlushOperation flushOperation) throws IOException {
int written = flushToChannel(flushOperation.getBuffersToWrite());
flushOperation.incrementIndex(written);
return flushOperation.isFullyFlushed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

public abstract class BytesWriteHandler implements ReadWriteHandler {

private static final List<FlushOperation> EMPTY_LIST = Collections.emptyList();

public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
if (message instanceof ByteBuffer[]) {
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);
} else {
throw new IllegalArgumentException("This channel only supports messages that are of type: " + ByteBuffer[].class);
}
}

public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
assert writeOperation instanceof FlushReadyWrite : "Write operation must be flush ready";
return Collections.singletonList((FlushReadyWrite) writeOperation);
}

public List<FlushOperation> pollFlushOperations() {
return EMPTY_LIST;
}

public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@
import java.util.Arrays;
import java.util.function.BiConsumer;

public class BytesWriteOperation implements WriteOperation {
public class FlushOperation {

private final SocketChannelContext channelContext;
private final BiConsumer<Void, Throwable> listener;
private final ByteBuffer[] buffers;
private final int[] offsets;
private final int length;
private int internalIndex;

public BytesWriteOperation(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
this.channelContext = channelContext;
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
this.listener = listener;
this.buffers = buffers;
this.offsets = new int[buffers.length];
Expand All @@ -46,16 +44,10 @@ public BytesWriteOperation(SocketChannelContext channelContext, ByteBuffer[] buf
length = offset;
}

@Override
public BiConsumer<Void, Throwable> getListener() {
return listener;
}

@Override
public SocketChannelContext getChannel() {
return channelContext;
}

public boolean isFullyFlushed() {
assert length >= internalIndex : "Should never have an index that is greater than the length [length=" + length + ", index="
+ internalIndex + "]";
Expand Down Expand Up @@ -84,5 +76,4 @@ public ByteBuffer[] getBuffersToWrite() {

return postIndexBuffers;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio;

import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

public class FlushReadyWrite extends FlushOperation implements WriteOperation {

private final SocketChannelContext channelContext;
private final ByteBuffer[] buffers;

FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
super(buffers, listener);
this.channelContext = channelContext;
this.buffers = buffers;
}

@Override
public SocketChannelContext getChannel() {
return channelContext;
}

@Override
public ByteBuffer[] getObject() {
return buffers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio;

import java.io.IOException;
import java.util.List;
import java.util.function.BiConsumer;

/**
* Implements the application specific logic for handling inbound and outbound messages for a channel.
*/
public interface ReadWriteHandler {

/**
* This method is called when a message is queued with a channel. It can be called from any thread.
* This method should validate that the message is a valid type and return a write operation object
* to be queued with the channel
*
* @param context the channel context
* @param message the message
* @param listener the listener to be called when the message is sent
* @return the write operation to be queued
*/
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any chance we can make this typesafe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I have thought about and I don't think it is very easy to do in this PR. The ReadWriteHandler would need to be parameterized. And then the channel context would need to be parameterized. And then the channel.

I think down the line it might be more possible. With implementing an "http" channel type that is a subclass of a niochannel (similar to how we have a tcp channel type). But I think that would be a lot more code and better to do as follow-up work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++


/**
* This method is called on the event loop thread. It should serialize a write operation object to bytes
* that can be flushed to the raw nio channel.
*
* @param writeOperation to be converted to bytes
* @return the operations to flush the bytes to the channel
*/
List<FlushOperation> writeToBytes(WriteOperation writeOperation);

/**
* Returns any flush operations that are ready to flush. This exists as a way to check if any flush
* operations were produced during a read call.
*
* @return flush operations
*/
List<FlushOperation> pollFlushOperations();

/**
* This method handles bytes that have been read from the network. It should return the number of bytes
* consumed so that they can be released.
*
* @param channelBuffer of bytes read from the network
* @return the number of bytes consumed
* @throws IOException if an exception occurs
*/
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;

void close() throws IOException;
}
Loading