Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ package com.baeldung.parallelOperationsCollections
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.kotlin.toObservable
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.stream.Collectors


Expand Down Expand Up @@ -46,7 +44,7 @@ class ParallelOperationCollectionsUnitTest {
this.isAdult = this.age >= 18
logger.info(this.toString())
}

private fun Instant.printTotalTime() {
val totalTime = Duration.between(this, Instant.now()).toMillis()
logger.info("Total time taken: {} ms", totalTime)
Expand All @@ -59,7 +57,7 @@ class ParallelOperationCollectionsUnitTest {

val filteredPeople = people
.map { person ->
async {
async(Dispatchers.IO) {
person.setAdult()
person
}
Expand All @@ -79,14 +77,11 @@ class ParallelOperationCollectionsUnitTest {
val startTime = Instant.now()

val filteredPeople = people.asFlow()
.flowOn(Dispatchers.IO)
.flatMapMerge { person ->
flow {
emit(
async {
person.setAdult()
person
}.await()
)
person.setAdult()
emit(person)
}
}
.filter { it.age > 15 }.toList()
Expand All @@ -105,9 +100,11 @@ class ParallelOperationCollectionsUnitTest {
val observable = Observable.fromIterable(people)
.flatMap(
{
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
person.setAdult()
}
Observable.just(it)
.subscribeOn(Schedulers.computation())
.doOnNext { person ->
person.setAdult()
}
}, people.size // Uses maxConcurrency for the number of elements
)
.filter { it.age > 15 }
Expand All @@ -128,9 +125,11 @@ class ParallelOperationCollectionsUnitTest {
val observable = people.toObservable()
.flatMap(
{
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
person.setAdult()
}
Observable.just(it)
.subscribeOn(Schedulers.computation())
.doOnNext { person ->
person.setAdult()
}
}, people.size // Uses maxConcurrency for the number of elements
).filter { it.age > 15 }
.toList()
Expand Down Expand Up @@ -166,21 +165,25 @@ class ParallelOperationCollectionsUnitTest {
val startTime = Instant.now()

val executor = Executors.newFixedThreadPool(people.size)
val futures = people
val futures: List<Future<Person>> = people
.map { person ->
executor.submit(Callable {
person.setAdult()
person
}).get()
})
}

val results = futures
.map { it.get() }
.filter { it.age > 15 }
.sortedBy { it.age }

executor.shutdown()

startTime.printTotalTime()

futures.assertOver15AndSortedByAge()
results.assertOver15AndSortedByAge()
}

}