Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unused objects not collected in Flow.transform #3197

Closed
Vladiatro opened this issue Feb 20, 2022 · 1 comment
Closed

Unused objects not collected in Flow.transform #3197

Vladiatro opened this issue Feb 20, 2022 · 1 comment
Assignees
Labels

Comments

@Vladiatro
Copy link

@Vladiatro Vladiatro commented Feb 20, 2022

I produce chunks of data with flow {} and then transform them into smaller parts as in the following example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        generate().flattenToBooleans().collect()
    }
}

fun generate() = flow {
    emit((1..99999).toMutableList())
    delay(99999)
}

fun Flow<MutableList<Int>>.flattenToBooleans() = transform { list ->
    list.forEach { emit(it == 99999) }
}

suspend fun Flow<Boolean>.collect() = collect { isLast ->
    if (isLast) {
        println("the last has been collected")
    }
}

I expect the initial list of 99999 integers to be removed by the GC after it is transformed to booleans, but for some reason, it is still kept in the memory after "the last has been collected" is printed:
image

If I manually clear the list, the objects are removed and everything works normally:

// ...
fun Flow<MutableList<Int>>.flattenToBooleans() = transform { list ->
    list.forEach { emit(it == 99999) }
    list.clear()
}
// ...

image

If I send another list of integers in flow {}, the first one is expectedly removed from the heap, but the new one is still present somewhere in another place:
image

I expect the processed elements to be removed by the GC as soon as possible when using transform to free up the space for the other coroutines. This is the code example where commenting out list.clear() makes the program run out of the heap space with -Xmx500m:

fun main() {
    runBlocking {
        repeat(3) {
            launch {
                generate().flattenToBooleans().collect()
            }
            delay(500)
        }
    }
}

fun generate() = flow {
    emit((1..9999999).toMutableList())
    delay(99999)
}

fun Flow<MutableList<Int>>.flattenToBooleans() = transform { list ->
    list.forEach { emit(it == 9999999) }
    list.clear() // comment this line to make the code fail with -Xmx500m
}

suspend fun Flow<Boolean>.collect() = collect { isLast ->
    if (isLast) {
        println("last collected")
    }
}

Kotlin 1.6.10, coroutines 1.6.0, azul-15.0.6 JVM on an ARM macOS.

@qwwdfsad qwwdfsad added the flow label Feb 21, 2022
@qwwdfsad qwwdfsad self-assigned this Feb 21, 2022
@qwwdfsad
Copy link
Member

@qwwdfsad qwwdfsad commented Feb 21, 2022

Thanks for the detailed report!

Self-contained reproducer:

@Test
fun foo() = runBlocking {
    val flow = flow {
        emit(listOf(239))
        expect(2)
        hang {}
    }
    val j = flow.transform { l -> l.forEach { _ -> emit(42) } }.onEach { expect (1) }.launchIn(this)
    yield()
    expect(3)
    FieldWalker.assertReachableCount(0, j) { it == 239 }
    j.cancelAndJoin()
    finish(4)
}

qwwdfsad added a commit that referenced this issue Feb 21, 2022
…ry leak that regular coroutines (e.g. unsafe flow) are not prone to

Fixes #3197
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

No branches or pull requests

2 participants