JS - Channel.produce closes channel immediately #2236

Closed
ScottPierce opened this issue Sep 10, 2020 · 1 comment
@ScottPierce
Contributor

ScottPierce commented Sep 10, 2020

Kotlin 1.4.0
Kotlinx.coroutines 1.3.9

I'm seeing unexpected behavior from GlobalScope.produce, where the channel seems to close immediately. I can only reproduce this on the JS side of my multiplatform project; similar GlobalScope.produce code works fine on the JVM. Switching to a manually created Channel seems to solve the problem.

Below is the context of the usage, the problem, the output, and a workaround. Also here is the project for a full repro:
repro2.zip

Usage

class DeviceRepository internal constructor(
    private val dataSource: DataSource
) : Repository() {
    suspend fun findDevices(): List<Device> = withRepositoryContext {
        println("FindDevices Map")
        dataSource.getConnectedDevices().map { device ->
            async {
                val data = dataSource.getDeviceInfo(device.id)
                println("getDeviceInfo Returned")
                Device(
                    id = device.id,
                    connectionType = device.type,
                    data = data
                )
            }
        }.awaitAll()
    }
}

class CommandLineDataSource : DataSource {
    companion object {
        private val CONNECTED_DEVICES_REGEX = Regex("(\\S+) \\((\\S+)\\)")
    }

    override suspend fun getConnectedDevices(): List<ConnectedDevice> {
        val output = CommandRunner.run("idevice_id")
            .toList()

        println("DataSource: $output")

        return output.map {
            val result = CONNECTED_DEVICES_REGEX.matchEntire(it)!!

            ConnectedDevice(
                id = result.groupValues[1],
                type = result.groupValues[2]
            )
        }
    }
    // ...
}

Problematic Code

internal actual object CommandRunner {
    actual fun run(command: String): Flow<String> {
        return GlobalScope.produce {
            val process = exec(command)

            process.stdout!!.apply {
                setEncoding("utf8")

                on("data") { data: Any ->
                    val message = data.toString()
                    println("Data: $message")
                    launch {
                        send(message)
                    }
                }

                on("end") { data: Any? ->
                    launch {
                        close()
                    }
                }
            }

            process.stderr!!.on("data") { data: Any ->
                val message = data.toString()
                println("Error: $message")
                launch {
                    send(message)
                }
            }

            var processIsExiting = false
            process.on("exit") { _ ->
                processIsExiting = true
                println("Process On Exit")
                launch {
                    close()
                }
            }

            invokeOnClose {
                println("Invoke on Close")
                if (!processIsExiting) {
//                    process.kill()
                }
            }
        }.receiveAsFlow()
    }
}

Output

Notice that the invokeOnClose callback runs immediately, before any data has arrived:

FindDevices Map
Invoke on Close
DataSource: []
Devices: []
Data: 59cbbfb669c0f2c72692ed25d9e31b271c2ef9c4 (USB)

Equivalent code that fixes it:

internal actual object CommandRunner {
    actual fun run(command: String): Flow<String> {
        val channel = Channel<String>()
        val process = exec(command)

        process.stdout!!.apply {
            setEncoding("utf8")

            on("data") { data: Any ->
                val message = data.toString()
                println("On Data: $message")
                GlobalScope.launch {
                    channel.send(message)
                }
            }

            on("end") { data: Any? ->
                println("On End")
                GlobalScope.launch {
                    channel.close()
                }
            }
        }

        process.stderr!!.on("data") { data: Any ->
            val message = data.toString()
            println("Error: $message")
            GlobalScope.launch {
                channel.send(message)
            }
        }

        var processIsExiting = false
        process.on("exit") { _ ->
            processIsExiting = true
            println("Process On Exit")
            GlobalScope.launch {
                channel.close()
            }
        }

        channel.invokeOnClose {
            println("Invoke on Close")
            if (!processIsExiting) {
                process.kill()
            }
        }

        return channel.receiveAsFlow()
    }
}

Output:

FindDevices Map
On Data: 59cbbfb669c0f2c72692ed25d9e31b271c2ef9c4 (USB)

On End
Invoke on Close
DataSource: [59cbbfb669c0f2c72692ed25d9e31b271c2ef9c4 (USB)
]
@dkhalanskyjb
Contributor

This is working as intended.

With the on function, you're saying that, when there's new data available, the following callback should be invoked. The on call doesn't block. So, the coroutine in the produce finishes successfully. From the documentation (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html):

The channel is closed when the coroutine completes.
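
To illustrate the documented behavior, here is a minimal sketch that should run on any platform. FakeEmitter is a hypothetical stand-in for a Node-style event emitter and is not part of any library; the only assumption is that its on function stores the callback and returns immediately.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for a Node-style event emitter; not part of any library.
class FakeEmitter {
    private var listener: ((String) -> Unit)? = null

    // Stores the callback and returns immediately, like on("data") { ... }.
    fun on(callback: (String) -> Unit) { listener = callback }

    fun emit(value: String) { listener?.invoke(value) }
}

// Returns whatever the consumer received before the channel closed.
fun receivedBeforeClose(): List<String> = runBlocking {
    val emitter = FakeEmitter()
    val channel = produce<String> {
        emitter.on { value -> launch { send(value) } }
        // Nothing suspends here, so the block returns, the producer
        // coroutine completes, and the channel is closed.
    }

    val received = mutableListOf<String>()
    for (value in channel) received += value // ends as soon as the channel closes

    // The listener still fires, but the producer scope has already completed,
    // so the launched coroutine never runs and nothing is delivered.
    emitter.emit("too late")
    received
}

fun main() {
    println(receivedBeforeClose()) // prints []
}
```

Registering a callback is not the same as waiting for it: the channel's lifetime follows the produce block, not the listeners registered inside it.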

Your JVM code is a different story entirely: there, the data is read from the stream inside the produce coroutine, so the channel does not close until the data has been read completely.
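
One possible way to keep the producer alive while callbacks deliver data is callbackFlow together with awaitClose, which suspends the block until the channel is closed or the collector cancels. The sketch below is not taken from the repro project: FakeProcess, linesOf and collectAll are hypothetical names, and FakeProcess only imitates a callback-based process by emitting its lines from a separate coroutine. It also assumes kotlinx.coroutines 1.5 or newer for trySend; on 1.3.9 the closest equivalent was offer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical callback-based source standing in for the Node child process.
class FakeProcess(private val scope: CoroutineScope, private val output: List<String>) {
    private var dataListener: ((String) -> Unit)? = null
    private var exitListener: (() -> Unit)? = null
    private var job: Job? = null
    var killed = false
        private set

    fun onData(listener: (String) -> Unit) { dataListener = listener }
    fun onExit(listener: () -> Unit) { exitListener = listener }

    // Emits asynchronously, after the caller has moved on, like real process output.
    fun start() {
        job = scope.launch {
            for (line in output) {
                dataListener?.invoke(line)
                yield()
            }
            exitListener?.invoke()
        }
    }

    fun kill() {
        killed = true
        job?.cancel()
    }
}

fun linesOf(process: FakeProcess): Flow<String> = callbackFlow {
    var exited = false
    process.onData { line -> trySend(line) } // non-suspending send from a callback
    process.onExit {
        exited = true
        close() // closing the channel lets awaitClose return
    }
    process.start()
    // Suspends until the channel is closed or the collector cancels;
    // the cleanup block runs in either case.
    awaitClose { if (!exited) process.kill() }
}

// Collects every line the fake process emits.
fun collectAll(output: List<String>): List<String> = runBlocking {
    linesOf(FakeProcess(this, output)).toList()
}

fun main() {
    println(collectAll(listOf("a", "b"))) // prints [a, b]
}
```

The same idea should also work inside produce itself, since awaitClose is defined on ProducerScope; callbackFlow has the advantage of failing loudly if awaitClose is forgotten.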
