Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ParMapN for IO up to arity 9 #1951

Merged
merged 7 commits into from Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IO.kt
Expand Up @@ -50,7 +50,7 @@ typealias IOProcF<A> = ((Either<Throwable, A>) -> Unit) -> IOOf<Unit>
@Suppress("StringLiteralDuplication")
sealed class IO<out A> : IOOf<A> {

companion object : IOParMap2, IOParMap3, IORacePair, IORaceTriple {
companion object : IOParMap, IORacePair, IORaceTriple {

/**
* Delay a suspended effect.
Expand Down
315 changes: 315 additions & 0 deletions modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt
@@ -0,0 +1,315 @@
package arrow.fx

import arrow.core.Either
import arrow.core.Left
import arrow.core.None
import arrow.core.Option
import arrow.core.Right
import arrow.core.Tuple2
import arrow.core.Tuple3
import arrow.core.extensions.option.applicative.applicative
import arrow.core.internal.AtomicRefW
import arrow.core.nonFatalOrThrow
import arrow.core.none
import arrow.core.some
import arrow.fx.internal.IOForkedStart
import arrow.fx.internal.Platform
import kotlin.coroutines.CoroutineContext
import arrow.core.extensions.option.applicativeError.handleError
import arrow.core.internal.AtomicBooleanW

/** Mix-in to enable `parMapN` 2-arity on IO's companion directly. */
interface IOParMap {

fun <A, B, C> parMapN(fa: IOOf<A>, fb: IOOf<B>, f: (A, B) -> C): IO<C> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, f)

fun <A, B, C, D> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, f: (A, B, C) -> D): IO<D> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, f)

fun <A, B, C, D, E> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, f: (A, B, C, D) -> E): IOOf<E> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, f)

/**
* @see parMapN
*/
fun <A, B, C, D, E, G> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, f: (A, B, C, D, E) -> G): IO<G> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, f)

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, f: (A, B, C, D, E, G) -> H): IO<H> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, f)

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, f: (A, B, C, D, E, G, H) -> I): IO<I> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, f)

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I, J> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, fi: IOOf<I>, f: (A, B, C, D, E, G, H, I) -> J): IO<J> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, fi, f)

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I, J, K> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, fi: IOOf<I>, fj: IOOf<J>, f: (A, B, C, D, E, G, H, I, J) -> K): IO<K> =
IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, fi, fj, f)

fun <A, B, C> parMapN(ctx: CoroutineContext, fa: IOOf<A>, fb: IOOf<B>, f: (A, B) -> C): IO<C> = IO.Async(true) { conn, cb ->
// Used to store Throwable, Either<A, B> or empty (null). (No sealed class used for a slightly better performing ParMap2)
val state = AtomicRefW<Any?>(null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: How would a sealed class negatively impact performance? Is it the extra wrapping of the throwable and either cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very valid question! It's only slightly if simply used for parMapN but parMapN is what powers traverse so it can potentially be infinite wrappings if you traverse over Sequence.

The slightly more complex code is hidden & private.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Btw: traverse over an infinite sequence with parApplicative won't even start any code because it will fold the entire sequence first, you need to have lazyAp to actually execute code before folding is complete 🙈

This is not even true atm because it just stackoverflows, which is a bug (I'll open a ticket, because I have no idea how to fix it...). But still even without the stackoverflow parApplicative cannot be used on infinite sequences.

Copy link
Member Author

@nomisRev nomisRev Jan 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a fix for IO's parallel ops not being stack-safe a week ago. So with this change, and your lazyAp change why wouldn't it work?

#1928

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this:

override fun <A, B> Kind<F, A>.lazyAp(ff: () -> Kind<F, (A) -> B>): Kind<F, B> = ap(ff())

Without properly implementing this method the applicative cannot short-circuit before the entire IO is constructed. Only after executing ap fully for every element it will start evaluating the IO. If you don't want this behaviour you need to defer calling the argument in lazyAp somehow. E.q. Option does this by first checking if it is Some or None and runs the function only if it is Some. Same for Either, Validated and the normal IO applicative


val connA = IOConnection()
val connB = IOConnection()

conn.pushPair(connA, connB)

fun complete(a: A, b: B) {
conn.pop()
cb(try {
Either.Right(f(a, b))
} catch (e: Throwable) {
Either.Left(e.nonFatalOrThrow())
})
}

fun sendError(other: IOConnection, e: Throwable) = when (state.getAndSet(e)) {
is Throwable -> Unit // Do nothing we already finished
else -> other.cancel().fix().unsafeRunAsync { r ->
conn.pop()
cb(Left(r.fold({ e2 -> Platform.composeErrors(e, e2) }, { e })))
}
}

IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA ->
resultA.fold({ e ->
sendError(connB, e)
}, { a ->
when (val oldState = state.getAndSet(Left(a))) {
null -> Unit // Wait for B
is Throwable -> Unit // ParMapN already failed and A was cancelled.
is Either.Left<*> -> Unit // Already state.getAndSet
is Either.Right<*> -> complete(a, (oldState as Either.Right<B>).b)
}
})
}

IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB ->
resultB.fold({ e ->
sendError(connA, e)
}, { b ->
when (val oldState = state.getAndSet(Right(b))) {
null -> Unit // Wait for A
is Throwable -> Unit // ParMapN already failed and B was cancelled.
is Either.Right<*> -> Unit // IO cannot finish twice
is Either.Left<*> -> complete((oldState as Either.Left<A>).a, b)
}
})
}
}

fun <A, B, C, D> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
f: (A, B, C) -> D
): IO<D> = IO.Async(true) { conn, cb ->

val state: AtomicRefW<Option<Tuple3<Option<A>, Option<B>, Option<C>>>> = AtomicRefW(None)
val active = AtomicBooleanW(true)

val connA = IOConnection()
val connB = IOConnection()
val connC = IOConnection()

// Composite cancelable that cancels all ops.
// NOTE: conn.pop() called when cb gets called below in complete.
conn.push(connA.cancel(), connB.cancel(), connC.cancel())

fun complete(a: A, b: B, c: C) {
conn.pop()
val result: Either<Throwable, D> = try {
Either.Right(f(a, b, c))
} catch (e: Throwable) {
Either.Left(e.nonFatalOrThrow())
}
cb(result)
}

fun tryComplete(result: Option<Tuple3<Option<A>, Option<B>, Option<C>>>): Unit =
result.fold({ Unit }, { (a, b, c) -> Option.applicative().map(a, b, c) { (a, b, c) -> complete(a, b, c) } })

fun sendError(other: IOConnection, other2: IOConnection, e: Throwable) =
if (active.getAndSet(false)) { // We were already cancelled so don't do anything.
other.cancel().fix().unsafeRunAsync { r1 ->
other2.cancel().fix().unsafeRunAsync { r2 ->
conn.pop()
cb(Left(r1.fold({ e2 ->
r2.fold({ e3 -> Platform.composeErrors(e, e2, e3) }, { Platform.composeErrors(e, e2) })
}, {
r2.fold({ e3 -> Platform.composeErrors(e, e3) }, { e })
})))
}
}
} else Unit

IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA ->
resultA.fold({ e ->
sendError(connB, connC, e)
}, { a ->
tryComplete(state.updateAndGet { current ->
current
.map { it.copy(a = a.some()) }
.handleError { Tuple3(a.some(), none(), none()) }
})
})
}

IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB ->
resultB.fold({ e ->
sendError(connA, connC, e)
}, { b ->
tryComplete(state.updateAndGet { current ->
current
.map { it.copy(b = b.some()) }
.handleError { Tuple3(none(), b.some(), none()) }
})
})
}

IORunLoop.startCancelable(IOForkedStart(fc, ctx), connC) { resultC ->
resultC.fold({ e ->
sendError(connA, connB, e)
}, { c ->
tryComplete(state.updateAndGet { current ->
current
.map { it.copy(c = c.some()) }
.handleError { Tuple3(none(), none(), c.some()) }
})
})
}
}

/**
* @see parMapN
*/
fun <A, B, C, D, E> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
f: (A, B, C, D) -> E
): IOOf<E> = parMapN(ctx,
parMapN(ctx, fa, fb, ::Tuple2),
parMapN(ctx, fc, fd, ::Tuple2)
) { (a, b), (c, d) ->
f(a, b, c, d)
}

/**
* @see parMapN
*/
fun <A, B, C, D, E, G> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
fe: IOOf<E>,
f: (A, B, C, D, E) -> G
): IO<G> = parMapN(ctx,
parMapN(ctx, fa, fb, fc, ::Tuple3),
parMapN(ctx, fd, fe, ::Tuple2)
) { (a, b, c), (d, e) ->
f(a, b, c, d, e)
}

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
fe: IOOf<E>,
fg: IOOf<G>,
f: (A, B, C, D, E, G) -> H
): IO<H> = parMapN(ctx,
parMapN(ctx, fa, fb, fc, ::Tuple3),
parMapN(ctx, fd, fe, fg, ::Tuple3)
) { (a, b, c), (d, e, g) ->
f(a, b, c, d, e, g)
}

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
fe: IOOf<E>,
fg: IOOf<G>,
fh: IOOf<H>,
f: (A, B, C, D, E, G, H) -> I
): IO<I> = parMapN(ctx,
parMapN(ctx, fa, fb, fc, ::Tuple3),
parMapN(ctx, fd, fe, ::Tuple2),
parMapN(ctx, fg, fh, ::Tuple2)) { (a, b, c), (d, e), (g, h) ->
f(a, b, c, d, e, g, h)
}

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I, J> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
fe: IOOf<E>,
fg: IOOf<G>,
fh: IOOf<H>,
fi: IOOf<I>,
f: (A, B, C, D, E, G, H, I) -> J
): IO<J> = parMapN(ctx,
parMapN(ctx, fa, fb, fc, ::Tuple3),
parMapN(ctx, fd, fe, fg, ::Tuple3),
parMapN(ctx, fh, fi, ::Tuple2)) { (a, b, c), (d, e, g), (h, i) ->
f(a, b, c, d, e, g, h, i)
}

/**
* @see parMapN
*/
fun <A, B, C, D, E, G, H, I, J, K> parMapN(
ctx: CoroutineContext,
fa: IOOf<A>,
fb: IOOf<B>,
fc: IOOf<C>,
fd: IOOf<D>,
fe: IOOf<E>,
fg: IOOf<G>,
fh: IOOf<H>,
fi: IOOf<I>,
fj: IOOf<J>,
f: (A, B, C, D, E, G, H, I, J) -> K
): IO<K> = parMapN(ctx,
parMapN(ctx, fa, fb, fc, ::Tuple3),
parMapN(ctx, fd, fe, fg, ::Tuple3),
parMapN(ctx, fh, fi, fj, ::Tuple3)) { (a, b, c), (d, e, g), (h, i, j) ->
f(a, b, c, d, e, g, h, i, j)
}
}
70 changes: 0 additions & 70 deletions modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap2.kt

This file was deleted.