Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Sources/NIO/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,19 @@ final class SocketChannel: BaseSocketChannel<Socket> {

assert(self.isActive)
pipeline.fireChannelRead0(NIOAny(buffer))
if mayGrow && i < maxMessagesPerRead {
result = .some

if buffer.writableBytes > 0 {
// If we did not fill the whole buffer with read(...) we should stop reading and wait until we get notified again.
// Otherwise chances are good that the next read(...) call will either read nothing or only a very small amount of data.
// Also this will allow us to call fireChannelReadComplete() which may give the user the chance to flush out all pending
// writes.
return result
} else if mayGrow && i < maxMessagesPerRead {
// if the ByteBuffer may grow on the next allocation due we used all the writable bytes we should allocate a new `ByteBuffer` to allow ramping up how much data
// we are able to read on the next read operation.
buffer = recvAllocator.buffer(allocator: allocator)
}
result = .some
} else {
if inputShutdown {
// We received a EOF because we called shutdown on the fd by ourself, unregister from the Selector and return
Expand Down
13 changes: 11 additions & 2 deletions Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,7 @@ public class ChannelTests: XCTestCase {

private let promise: EventLoopPromise<Void>
private var readRequested = false
private var channelReadCalled = false

init(_ promise: EventLoopPromise<Void>) {
self.promise = promise
Expand All @@ -1617,8 +1618,16 @@ public class ChannelTests: XCTestCase {
ctx.read()
}

public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
XCTAssertFalse(channelReadCalled)
channelReadCalled = true
ctx.read()
}

public func channelInactive(ctx: ChannelHandlerContext) {
XCTAssertTrue(readRequested, "Should only be called after a read was requested")
XCTAssertTrue(channelReadCalled, "channelRead(...) should have been called before channel became inactive")

promise.succeed(result: ())
}
}
Expand Down Expand Up @@ -1963,12 +1972,12 @@ public class ChannelTests: XCTestCase {
}

func channelReadComplete(ctx: ChannelHandlerContext) {
XCTFail("channelReadComplete unexpected")
XCTAssertEqual(.read, self.state)
self.state = .readComplete
}

func errorCaught(ctx: ChannelHandlerContext, error: Error) {
XCTAssertEqual(.read, self.state)
XCTAssertEqual(.readComplete, self.state)
self.state = .error
ctx.close(promise: nil)
}
Expand Down