Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Commit

Permalink
Deprecate parTraverseN/parSequenceN with semaphore limit as Long
Browse files Browse the repository at this point in the history
  • Loading branch information
satyamagarwal committed Dec 14, 2020
1 parent aff2bec commit e0f85f5
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
15 changes: 15 additions & 0 deletions PULL_REQUEST_TEMPLATE
@@ -0,0 +1,15 @@
## Issue
[n](https://github.com/arrow-kt/arrow-fx/issues/n)

## Status
**READY/IN DEVELOPMENT/HOLD**

## Description
<!-- A few sentences describing the overall goals of the pull request's commits. -->

## Todos
- [ ] Tests
- [ ] Documentation

## Related PRs
* None
Expand Up @@ -23,9 +23,18 @@ import kotlin.coroutines.EmptyCoroutineContext
*
* Cancelling this operation cancels all running tasks
*/
@Deprecated("use parSequenceN with n as Int type", ReplaceWith("parSequenceN", "arrow.fx.coroutines.parSequenceN"))
suspend fun <A> Iterable<suspend () -> A>.parSequenceN(n: Long): List<A> =
parSequenceN(Dispatchers.Default, n)

/**
* Sequences all tasks in [n] parallel processes on [Dispatchers.Default] and return the result.
*
* Cancelling this operation cancels all running tasks
*/
suspend fun <A> Iterable<suspend () -> A>.parSequenceN(n: Int): List<A> =
parSequenceN(Dispatchers.Default, n)

/**
* Sequences all tasks in [n] parallel processes and return the result.
*
Expand All @@ -35,8 +44,25 @@ suspend fun <A> Iterable<suspend () -> A>.parSequenceN(n: Long): List<A> =
*
* Cancelling this operation cancels all running tasks
*/
@Deprecated("use parSequenceN with n as Int type", ReplaceWith("parTraverseN", "arrow.fx.coroutines.parSequenceN"))
suspend fun <A> Iterable<suspend () -> A>.parSequenceN(ctx: CoroutineContext = EmptyCoroutineContext, n: Long): List<A> {
val s = Semaphore(n.toInt())
val s = Semaphore(Math.toIntExact(n))
return parTraverse(ctx) {
s.withPermit { it.invoke() }
}
}

/**
* Sequences all tasks in [n] parallel processes and return the result.
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [ctx] argument.
* If the combined context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* **WARNING** If the combined context has a single threaded [ContinuationInterceptor], this function will not run in parallel.
*
* Cancelling this operation cancels all running tasks
*/
suspend fun <A> Iterable<suspend () -> A>.parSequenceN(ctx: CoroutineContext = EmptyCoroutineContext, n: Int): List<A> {
val s = Semaphore(n)
return parTraverse(ctx) {
s.withPermit { it.invoke() }
}
Expand Down Expand Up @@ -103,9 +129,17 @@ suspend fun <A> Iterable<suspend () -> A>.parSequence(ctx: CoroutineContext = Em
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [Dispatchers.Default].
* Cancelling this operation cancels all running tasks.
*/
@Deprecated("use parTraverseN with n as Int type", ReplaceWith("parTraverseN", "arrow.fx.coroutines.parTraverseN"))
suspend fun <A, B> Iterable<A>.parTraverseN(n: Long, f: suspend (A) -> B): List<B> =
parTraverseN(Dispatchers.Default, n, f)

/**
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [Dispatchers.Default].
* Cancelling this operation cancels all running tasks.
*/
suspend fun <A, B> Iterable<A>.parTraverseN(n: Int, f: suspend (A) -> B): List<B> =
parTraverseN(Dispatchers.Default, n, f)

/**
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [ctx].
*
Expand All @@ -115,12 +149,33 @@ suspend fun <A, B> Iterable<A>.parTraverseN(n: Long, f: suspend (A) -> B): List<
*
* Cancelling this operation cancels all running tasks.
*/
@Deprecated("use parTraverseN with n as Int type", ReplaceWith("parTraverseN", "arrow.fx.coroutines.parTraverseN"))
suspend fun <A, B> Iterable<A>.parTraverseN(
ctx: CoroutineContext = EmptyCoroutineContext,
n: Long,
f: suspend (A) -> B
): List<B> {
val s = Semaphore(n.toInt())
val s = Semaphore(Math.toIntExact(n))
return parTraverse(ctx) { a ->
s.withPermit { f(a) }
}
}

/**
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [ctx].
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [ctx] argument.
* If the combined context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* **WARNING** If the combined context has a single threaded [ContinuationInterceptor], this function will not run in parallel.
*
* Cancelling this operation cancels all running tasks.
*/
suspend fun <A, B> Iterable<A>.parTraverseN(
ctx: CoroutineContext = EmptyCoroutineContext,
n: Int,
f: suspend (A) -> B
): List<B> {
val s = Semaphore(n)
return parTraverse(ctx) { a ->
s.withPermit { f(a) }
}
Expand Down
@@ -1,6 +1,7 @@
package arrow.fx.coroutines

import arrow.core.Either
import io.kotest.assertions.throwables.shouldThrowExactly
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
Expand Down Expand Up @@ -73,6 +74,17 @@ class ParTraverseTest : ArrowFxSpec(spec = {
}
}

"parTraverseN throws validation error as semaphore limit is greater than Int max value" {
val ref = Atomic(0)
shouldThrowExactly<ArithmeticException> {
(0 until 100).parTraverseN(Long.MAX_VALUE) {
ref.update { it + 1 }
}

ref.get() shouldBe 100
}
}

"parTraverseN can traverse effect full computations" {
val ref = Atomic(0)
(0 until 100).parTraverseN(5) {
Expand Down Expand Up @@ -160,4 +172,24 @@ class ParTraverseTest : ArrowFxSpec(spec = {
val l = (0 until count).parTraverseN(20) { it }
l shouldBe (0 until count).toList()
}

"parSequenceN throws validation error as semaphore limit is greater than Int max value" {
val ref = Atomic(0)
shouldThrowExactly<ArithmeticException> {
(0 until 100)
.map { suspend { ref.update { it + 1 } } }
.parSequenceN(Long.MAX_VALUE)

ref.get() shouldBe 100
}
}

"parSequenceN can traverse effect full computations" {
val ref = Atomic(0)
(0 until 100)
.map { suspend { ref.update { it + 1 } } }
.parSequenceN(5)

ref.get() shouldBe 100
}
})

0 comments on commit e0f85f5

Please sign in to comment.