Skip to content

Conversation

hangga
Copy link
Contributor

@hangga hangga commented Jun 25, 2024

No description provided.

}
}
.flowOn(Dispatchers.IO)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand the purpose of this? I had a look at https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html

Should we put the flowOn call right after the flow call, rather than after the flatMapMerge? https://stackoverflow.com/questions/75884997/what-is-default-dispatcher-kotlin-flow-function

If not, what is the difference between those two approaches?

Copy link
Contributor Author

@hangga hangga Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, I've added it.
Because in this unit test we use runBlocking, so without .flowOn(Dispatchers.IO), then I'm afraid flow will be run in the same context as the coroutine wrapper (in this case runBlocking)
If the operations in a stream are to be executed one by one on the same thread, make it sequential.

Copy link
Contributor

@theangrydev theangrydev Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangga Sorry I wasn't clear. My question is, what is the difference between solutions A and B below:

A:

 val filteredPeople = people.asFlow()
            .flatMapMerge { person ->
                flow {
                    emit(
                        async {
                            person.setAdult()
                            person
                        }.await()
                    )
                    person.setAdult()
                    emit(person)
                }
            }
            .flowOn(Dispatchers.IO)
            .filter { it.age > 15 }.toList()
            .sortedBy { it.age }

B:

 val filteredPeople = people.asFlow()
            .flatMapMerge { person ->
                flow {
                    emit(
                        async {
                            person.setAdult()
                            person
                        }.await()
                    )
                    person.setAdult()
                    emit(person)
                }.flowOn(Dispatchers.IO)
            }
            .filter { it.age > 15 }.toList()
            .sortedBy { it.age }
            

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, @theangrydev. I misunderstood your question.

As I understand it, code A uses the Dispatchers.IO context for the entire flow, which includes flatMapMerge, filter, and subsequent operations. This is more efficient if all operations require execution in IO.

Code B only uses the Dispatchers.IO context inside flatMapMerge, while filter and subsequent operations will be executed in the default context or the context where the Flow was originally created.

Since in unit tests we always use runBlocking to run coroutines, I currently choose to use code A so that the entire flow (including filter and subsequent operations) is executed in IO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But now, I also remove async {} inside emit because I think there is no need to create new coroutines inside flow anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangga

In A, can/should we put the flowOn(Dispatchers.IO) call before the flatMapMerge?

It's not very intuitive that the entire flow uses Dispatchers.IO when the flowOn call is made after the flarMapMerge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, you're right.
It makes more sense to call flowOn(Dispatchers.IO) before the flatMapMerge.
Thanks.

Comment on lines 163 to 184
@Test
fun `using ExecutorService for parallel operations`() {
logger.info("Using ExecutorService")
val startTime = Instant.now()

val executor = Executors.newFixedThreadPool(people.size)
val futures = people
.map { person ->
executor.submit(Callable {
person.setAdult()
person
}).get()
}
.filter { it.age > 15 }
.sortedBy { it.age }

executor.shutdown()

startTime.printTotalTime()

futures.assertOver15AndSortedByAge()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially show a solution where we collect a List<Future<T>> and transform that to a List<T>

Any reason for not showing this kind of solution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean to be like this?

    val executor = Executors.newFixedThreadPool(people.size)
    val futures: List<Future<Person>> = people
        .map { person ->
            executor.submit(Callable {
                person.setAdult()
                person
            })
        }

    // List<Future<Person>> to List<Person>
    val results = futures
        .map { it.get() }
        .filter { it.age > 15 }
        .sortedBy { it.age }

    executor.shutdown()

    results.assertOver15AndSortedByAge()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangga yes that's what I had in mind. I've used an approach similar to this in the past in plain Java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After you agreed, I am more confident now. Because before, I was not sure about what I was doing.

.subscribeOn(Schedulers.computation())
.doOnNext { person ->
person.setAdult()
Thread.sleep(1500)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the sleep calls

.subscribeOn(Schedulers.computation())
.doOnNext { person ->
person.setAdult()
Thread.sleep(1500)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the sleep calls

@@ -59,7 +54,8 @@ class ParallelOperationCollectionsUnitTest {

val filteredPeople = people
.map { person ->
async {
async(Dispatchers.IO) {
Thread.sleep(1500)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the sleep calls

@@ -150,6 +150,7 @@ class ParallelOperationCollectionsUnitTest {
val filteredPeople = people.parallelStream()
.map { person ->
person.setAdult()
Thread.sleep(1500)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the sleep calls

@hangga hangga requested a review from theangrydev July 27, 2024 03:43
@theangrydev theangrydev merged commit 90d8330 into Baeldung:master Jul 29, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants