Flow builder does not work from suspend fun main (IllegalStateException) #2044

Closed
elizarov opened this issue May 21, 2020 · 1 comment
elizarov commented May 21, 2020

The bug was introduced by #2028. Run this code:

import kotlinx.coroutines.flow.*

suspend fun main() {
    flow {
        emit(1)
    }.collect {
        println(it)
    }
}

Result: Exception in thread "main" java.lang.IllegalStateException: Context cannot be checked for liveness because it does not have a job: EmptyCoroutineContext

The larger problem is that implementation details of the flow now leak outside of it: a flow that is cancellable, whether via the flow { ... } builder or the cancellable() operator, cannot be collected from an empty context.

The proposed solution is to fix ensureActive so that it does not throw on an empty context.
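
A minimal sketch of what such a lenient check could look like (not the actual patch): liveness is verified only when the context carries a Job, and a context without a Job is treated as having nothing to cancel. The name ensureActiveIfJobPresent is hypothetical, chosen to avoid clashing with the library's own ensureActive; Job, Job.ensureActive() and EmptyCoroutineContext are existing APIs.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.ensureActive
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical helper (not the library's API): a liveness check that tolerates a missing Job.
fun CoroutineContext.ensureActiveIfJobPresent() {
    // No Job in the context means there is nothing that could have been cancelled.
    this[Job]?.ensureActive()
}

fun main() {
    // Empty context, as seen from suspend fun main: returns normally instead of throwing.
    EmptyCoroutineContext.ensureActiveIfJobPresent()

    val job = Job()
    job.ensureActiveIfJobPresent() // active job: returns normally

    job.cancel()
    try {
        job.ensureActiveIfJobPresent()
    } catch (e: CancellationException) {
        println("cancelled job is still detected")
    }
}
```

With a check of this shape, cancellation is still honoured whenever a Job exists, while collection from a job-less context simply proceeds.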

elizarov changed the title from "Flow does not work from suspend fun main" to "Flow builder does not work from suspend fun main (IllegalStateException)" on May 21, 2020
@fluidsonic

I just ran into this issue when using my library fluid-mongo from suspend fun main().
It wraps a reactive stream as a Flow using kotlinx-coroutines-reactive.

Wrapping everything in coroutineScope { … } works around the issue.
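
Applied to the reproducer from the issue description, the workaround might look like the sketch below. It assumes that the failure comes only from the missing Job: coroutineScope puts a Job into the context before the flow is collected. The helper name collectInsideScope is made up for illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects the reproducer flow inside a scope that has a Job.
suspend fun collectInsideScope(): List<Int> = coroutineScope {
    val seen = mutableListOf<Int>()
    flow {
        emit(1)
    }.collect {
        seen += it
    }
    seen
}

suspend fun main() {
    // suspend fun main starts with an empty context; the Job comes from coroutineScope.
    println(collectInsideScope())
}
```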


Exception in thread "main" java.lang.IllegalStateException: Context cannot be checked for liveness because it does not have a job: EmptyCoroutineContext
	at kotlinx.coroutines.JobKt__JobKt.ensureActive(Job.kt:603)
	at kotlinx.coroutines.JobKt.ensureActive(Unknown Source)
	at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:88)
	at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:122)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:111)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:308)
	at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:395)
	at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.completeResumeReceive(AbstractChannel.kt:872)
	at kotlinx.coroutines.channels.ArrayChannel.offerInternal(ArrayChannel.kt:83)
	at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:147)
	at kotlinx.coroutines.reactive.ReactiveSubscriber.onNext(ReactiveFlow.kt:117)
	at com.mongodb.reactivestreams.client.internal.AbstractSubscription.onNext(AbstractSubscription.java:152)
	at com.mongodb.reactivestreams.client.internal.AbstractSubscription.processResultsQueue(AbstractSubscription.java:227)
	at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:182)
	at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.lambda$requestMoreData$1(MongoIterableSubscription.java:99)
	at com.mongodb.internal.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:191)
	at com.mongodb.internal.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:134)
	at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85)
	at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.lambda$requestInitialData$0(MongoIterableSubscription.java:50)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:92)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.operation.FindOperation$3.onResult(FindOperation.java:727)
	at com.mongodb.internal.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:411)
	at com.mongodb.internal.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:251)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:84)
	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$2.onResult(DefaultConnectionPool.java:517)
	at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:398)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:375)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:676)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:643)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:513)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:230)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:213)
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
	at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:568)
	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:136)
	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:109)
	at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:510)
	at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:75)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:633)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:618)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:513)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:230)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:213)
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:443)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:193)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:215)
	at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:830)

elizarov assigned elizarov and unassigned qwwdfsad on Jul 2, 2020
elizarov mentioned this issue on Jul 16, 2020
elizarov added a commit that referenced this issue Sep 11, 2020
This is leftover from #2044 fix.

Fixes #2241
elizarov added a commit that referenced this issue Sep 11, 2020
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020