Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import kotlinx.atomicfu.update
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.sync.Mutex
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

Expand All @@ -35,6 +36,7 @@ internal abstract class AbstractBufferedReadChannel(
private val currSegment: AtomicRef<Segment?> = atomic(null)

private val readOp: AtomicRef<CancellableContinuation<Boolean>?> = atomic(null)
private val readOpMutex = Mutex(locked = false)

private val _closed: AtomicRef<ClosedSentinel?> = atomic(null)
protected val closed: ClosedSentinel?
Expand All @@ -52,33 +54,34 @@ internal abstract class AbstractBufferedReadChannel(
get() = _availableForRead.value

/**
* Suspend reading until at least [requested] bytes are available to read or the channel is closed.
* If the requested amount can be fulfilled immediately this function will return without suspension.
* Suspend reading until at least one byte is available to read or the channel is closed. If there is already at
* least one byte available, this function will return without suspension.
*/
protected suspend fun readSuspend(requested: Int): Boolean {
protected suspend fun readSuspend(): Boolean {
// can fulfill immediately without suspension
if (availableForRead >= requested) return true
if (availableForRead > 0) return true

readOpMutex.lock()

closed?.let { closed ->
// if already closed - rethrow
closed.cause?.let { rethrowClosed(it) }

// no more data is coming
return availableForRead >= requested
readOpMutex.unlock()
return availableForRead > 0
}

return suspendCancellableCoroutine { cont ->
setReadContinuation(cont)
if (availableForRead > 0) {
readOpMutex.unlock()
return true
}
}

private fun setReadContinuation(cont: CancellableContinuation<Boolean>) {
val success = readOp.compareAndSet(null, cont)
check(success) { "Read operation already in progress" }
}

private fun resumeRead() {
readOp.getAndSet(null)?.resume(true)
return suspendCancellableCoroutine { cont ->
val success = readOp.compareAndSet(null, cont)
readOpMutex.unlock()
check(success) { "Read operation already in progress" }
}
}

/**
Expand Down Expand Up @@ -183,7 +186,7 @@ internal abstract class AbstractBufferedReadChannel(
var remaining = length

do {
if (!readSuspend(1)) {
if (!readSuspend()) {
throw ClosedReceiveChannelException("Unexpeced EOF: expected $remaining more bytes")
}

Expand All @@ -204,7 +207,7 @@ internal abstract class AbstractBufferedReadChannel(
}

private suspend fun readAvailableSuspend(dest: ByteArray, offset: Int, length: Int): Int {
if (!readSuspend(1)) {
if (!readSuspend()) {
return -1
}
return readAvailable(dest, offset, length)
Expand All @@ -225,11 +228,24 @@ internal abstract class AbstractBufferedReadChannel(
// advertise bytes available
_availableForRead.getAndAdd(bytesIn.size)

resumeRead()
readOpMutex.withSpinLock {
readOp.getAndSet(null)?.resume(true)
}
}

private inline fun <T> Mutex.withSpinLock(block: () -> T): T {
while (!tryLock()) {
// spin
}
return try {
block()
} finally {
unlock()
}
}

override suspend fun awaitContent() {
readSuspend(1)
readSuspend()
}

override fun cancel(cause: Throwable?): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal class BufferedReadChannelImpl(
}

private suspend fun readAvailableSuspend(dest: ByteBuffer): Int {
if (!readSuspend(1)) {
if (!readSuspend()) {
return -1
}
return readAvailable(dest)
Expand Down