Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Sources/NIO/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class EmbeddedChannelCore: ChannelCore {
}

var outboundBuffer: [IOData] = []
var pendingOutboundBuffer: [(IOData, EventLoopPromise<Void>?)] = []
var inboundBuffer: [NIOAny] = []

func localAddress0() throws -> SocketAddress {
Expand Down Expand Up @@ -215,11 +216,16 @@ class EmbeddedChannelCore: ChannelCore {
return
}

addToBuffer(buffer: &outboundBuffer, data: data)
promise?.succeed(result: ())
self.pendingOutboundBuffer.append((data, promise))
}

func flush0() {
let pendings = self.pendingOutboundBuffer
self.pendingOutboundBuffer.removeAll()
for dataAndPromise in pendings {
self.addToBuffer(buffer: &self.outboundBuffer, data: dataAndPromise.0)
dataAndPromise.1?.succeed(result: ())
}
}

func read0() {
Expand Down Expand Up @@ -268,7 +274,7 @@ public class EmbeddedChannel: Channel {
try close().wait()
(self.eventLoop as! EmbeddedEventLoop).run()
try throwIfErrorCaught()
return !channelcore.outboundBuffer.isEmpty || !channelcore.inboundBuffer.isEmpty
return !channelcore.outboundBuffer.isEmpty || !channelcore.inboundBuffer.isEmpty || !channelcore.pendingOutboundBuffer.isEmpty
}

private var _pipeline: ChannelPipeline!
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOWebSocket/WebSocketFrameDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ public final class WebSocketFrameDecoder: ByteToMessageDecoder {
let frame = WebSocketFrame(fin: true,
opcode: .connectionClose,
data: data)
_ = ctx.write(self.wrapOutboundOut(frame)).then {
ctx.close()
ctx.writeAndFlush(self.wrapOutboundOut(frame)).whenComplete {
ctx.close(promise: nil)
}
ctx.fireErrorCaught(error)
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOHTTP1Tests/HTTPResponseCompressorTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class HTTPResponseCompressorTest: XCTestCase {
try sendRequest(acceptEncoding: nil, channel: channel)
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok)
let writePromise: EventLoopPromise<Void> = channel.eventLoop.newPromise()
channel.write(NIOAny(HTTPServerResponsePart.head(head)), promise: writePromise)
channel.writeAndFlush(NIOAny(HTTPServerResponsePart.head(head)), promise: writePromise)
channel.pipeline.removeHandlers()
try writePromise.futureResult.wait()
}
Expand Down
76 changes: 38 additions & 38 deletions Tests/NIOHTTP1Tests/HTTPServerPipelineHandlerTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Unblock by sending a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// Two requests should have made it through now.
XCTAssertEqual(self.readRecorder.reads,
Expand All @@ -147,8 +147,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Now send the last response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// Now all three.
XCTAssertEqual(self.readRecorder.reads,
Expand Down Expand Up @@ -176,8 +176,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertEqual(self.readCounter.readCount, 1)

// Send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// This should have automatically triggered a call to read(), but only one.
XCTAssertEqual(self.readCounter.readCount, 2)
Expand All @@ -201,8 +201,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertEqual(self.readCounter.readCount, 1)

// Send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// This should have not triggered a call to read.
XCTAssertEqual(self.readCounter.readCount, 1)
Expand All @@ -213,8 +213,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertEqual(self.readCounter.readCount, 1)

// Now send in the last response, and see the read go through.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())
XCTAssertEqual(self.readCounter.readCount, 2)
}

Expand All @@ -228,8 +228,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertEqual(self.readCounter.readCount, 1)

// Now the server sends a response immediately.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// We're still moving forward and can read.
XCTAssertEqual(self.readCounter.readCount, 1)
Expand Down Expand Up @@ -261,8 +261,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Unblock by sending a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// Two requests should have made it through now.
XCTAssertEqual(self.readRecorder.reads,
Expand All @@ -272,8 +272,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Now send the last response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// Now the half-closure should be delivered.
XCTAssertEqual(self.readRecorder.reads,
Expand Down Expand Up @@ -301,8 +301,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Unblock by sending a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// The second request head, followed by the half-close, should have made it through.
XCTAssertEqual(self.readRecorder.reads,
Expand Down Expand Up @@ -331,15 +331,15 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertEqual(self.readCounter.readCount, 1)

// Send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// This should have not triggered a call to read.
XCTAssertEqual(self.readCounter.readCount, 1)

// Now send in the last response. This should also not issue a read.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())
XCTAssertEqual(self.readCounter.readCount, 1)
}

Expand All @@ -356,8 +356,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Unblock by sending a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// Two requests should have made it through now. Still no half-closure.
XCTAssertEqual(self.readRecorder.reads,
Expand Down Expand Up @@ -495,8 +495,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Now send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// No further events should have happened.
XCTAssertEqual(self.readRecorder.reads,
Expand All @@ -520,8 +520,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertTrue(self.channel.isActive)

// Now send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

// still missing the request .end
XCTAssertTrue(self.channel.isActive)
Expand Down Expand Up @@ -552,8 +552,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertTrue(self.channel.isActive)

// Now send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

XCTAssertFalse(self.channel.isActive)

Expand All @@ -577,14 +577,14 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
.channelRead(HTTPServerRequestPart.end(nil))])

// Now send the response .head.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())

XCTAssertTrue(self.channel.isActive)
self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
XCTAssertTrue(self.channel.isActive)

// Now send the response .end.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())
XCTAssertFalse(self.channel.isActive)

XCTAssertEqual([HTTPServerResponsePart.head(self.responseHead),
Expand All @@ -603,7 +603,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
[.channelRead(HTTPServerRequestPart.head(self.requestHead))])

// Now send the response .head.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())

XCTAssertTrue(self.channel.isActive)
self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
Expand All @@ -618,7 +618,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertTrue(self.channel.isActive)

// Response .end.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())
XCTAssertFalse(self.channel.isActive)

XCTAssertEqual([HTTPServerResponsePart.head(self.responseHead),
Expand All @@ -639,14 +639,14 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
[.channelRead(HTTPServerRequestPart.head(self.requestHead))])

// Now send the response .head.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())

XCTAssertTrue(self.channel.isActive)
self.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
XCTAssertTrue(self.channel.isActive)

// Response .end.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())
XCTAssertTrue(self.channel.isActive)

// Request .end.
Expand Down Expand Up @@ -685,8 +685,8 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
XCTAssertTrue(self.channel.isActive)

// Now send a response.
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.write(HTTPServerResponsePart.end(nil)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.head(self.responseHead)).wait())
XCTAssertNoThrow(try channel.writeAndFlush(HTTPServerResponsePart.end(nil)).wait())

XCTAssertFalse(self.channel.isActive)

Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/ChannelPipelineTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class ChannelPipelineTest: XCTestCase {

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
ctx.write(self.wrapOutboundOut(data.map { $0 * -1 }), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(data.map { $0 * -1 }), promise: nil)
ctx.fireChannelRead(self.wrapInboundOut(data))
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ public class ChannelTests: XCTestCase {
try pipeline.eventLoop.submit { () -> Channel in
XCTAssertTrue(pipeline.channel is DeadChannel)
return pipeline.channel
}.wait().write(NIOAny(())).wait()
}.wait().writeAndFlush(NIOAny(())).wait()
XCTFail("shouldn't have been reached")
} catch let e as ChannelError where e == .ioOnClosedChannel {
// OK
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/EmbeddedChannelTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extension EmbeddedChannelTest {
("testEmbeddedChannelAndPipelineAndChannelCoreShareTheEventLoop", testEmbeddedChannelAndPipelineAndChannelCoreShareTheEventLoop),
("testSendingIncorrectDataOnEmbeddedChannel", testSendingIncorrectDataOnEmbeddedChannel),
("testActiveWhenConnectPromiseFiresAndInactiveWhenClosePromiseFires", testActiveWhenConnectPromiseFiresAndInactiveWhenClosePromiseFires),
("testWriteWithoutFlushDoesNotWrite", testWriteWithoutFlushDoesNotWrite),
]
}
}
Expand Down
16 changes: 15 additions & 1 deletion Tests/NIOTests/EmbeddedChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class EmbeddedChannelTest: XCTestCase {
let channel = EmbeddedChannel()

do {
try channel.write(NIOAny(5)).wait()
try channel.writeAndFlush(NIOAny(5)).wait()
XCTFail("Did not throw")
} catch ChannelError.writeDataUnsupported {
// All good
Expand Down Expand Up @@ -174,4 +174,18 @@ class EmbeddedChannelTest: XCTestCase {
channel.close(promise: closePromise)
try closePromise.futureResult.wait()
}

func testWriteWithoutFlushDoesNotWrite() throws {
let channel = EmbeddedChannel()

var buf = ByteBufferAllocator().buffer(capacity: 1)
buf.write(bytes: [1])
let writeFuture = channel.write(buf)
XCTAssertNil(channel.readOutbound())
XCTAssertFalse(writeFuture.isFulfilled)
channel.flush()
XCTAssertNotNil(channel.readOutbound())
XCTAssertTrue(writeFuture.isFulfilled)
XCTAssertNoThrow(try XCTAssertFalse(channel.finish()))
}
}
3 changes: 2 additions & 1 deletion Tests/NIOTests/SocketChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public class SocketChannelTest : XCTestCase {

let serverChannel = try ServerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait()
do {
try serverChannel.write("test").wait()
try serverChannel.writeAndFlush("test").wait()
XCTFail("did not throw")
} catch let err as ChannelError where err == .operationUnsupported {
// expected
}
Expand Down
6 changes: 3 additions & 3 deletions Tests/NIOWebSocketTests/EndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ extension EmbeddedChannel {
func writeString(_ string: String) -> EventLoopFuture<Void> {
var buffer = self.allocator.buffer(capacity: string.utf8.count)
buffer.write(string: string)
return self.write(buffer)
return self.writeAndFlush(buffer)
}
}

Expand Down Expand Up @@ -332,10 +332,10 @@ class EndToEndTests: XCTestCase {

// Let's send a frame or two, to confirm that this works.
let dataFrame = WebSocketFrame(fin: true, opcode: .binary, data: data)
XCTAssertNoThrow(try client.write(dataFrame).wait())
XCTAssertNoThrow(try client.writeAndFlush(dataFrame).wait())

let pingFrame = WebSocketFrame(fin: true, opcode: .ping, data: client.allocator.buffer(capacity: 0))
XCTAssertNoThrow(try client.write(pingFrame).wait())
XCTAssertNoThrow(try client.writeAndFlush(pingFrame).wait())
interactInMemory(client, server)

XCTAssertEqual(recorder.frames, [dataFrame, pingFrame])
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOWebSocketTests/WebSocketFrameDecoderTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class WebSocketFrameDecoderTest: XCTestCase {
}

private func frameForFrame(_ frame: WebSocketFrame) -> WebSocketFrame? {
self.encoderChannel.write(frame, promise: nil)
self.encoderChannel.writeAndFlush(frame, promise: nil)

while case .some(.byteBuffer(let d)) = self.encoderChannel.readOutbound() {
XCTAssertNoThrow(try self.decoderChannel.writeInbound(d))
Expand Down
Loading