diff --git a/infographic/arrow-infographic.txt b/infographic/arrow-infographic.txt index dd0dd8845a4..fe6001a9437 100644 --- a/infographic/arrow-infographic.txt +++ b/infographic/arrow-infographic.txt @@ -8,62 +8,62 @@ #fill: #64B5F6 #.typeclasses: fill=#64B5F6 visual=database bold #.instances: fill=#B9F6CA visual=class italic bold dashed +[Applicative]<-[ApplicativeError] [Functor]<-[Applicative] +[MonadDefer]<-[Async] +[Async]<-[Effect] +[MonadError]<-[MonadDefer] [ApplicativeError]<-[MonadError] [Monad]<-[MonadError] -[Functor]<-[Comonad] -[Applicative]<-[ApplicativeError] -[Functor]<-[Traverse] -[Foldable]<-[Traverse] [Applicative]<-[Monad] [Semigroup]<-[Monoid] [Monad]<-[Bimonad] [Comonad]<-[Bimonad] +[Functor]<-[Comonad] +[Functor]<-[Traverse] +[Foldable]<-[Traverse] [SemigroupK]<-[MonoidK] -[MonadError]<-[MonadDefer] -[MonadDefer]<-[Async] -[Async]<-[Effect] -[Monad]<-[MonadFilter] -[FunctorFilter]<-[MonadFilter] -[Monad]<-[MonadWriter] -[Functor]<-[FunctorFilter] +[MonoidK]<-[MonoidK Instances|ListKMonoidKInstance|OptionTMonoidKInstance|SequenceKMonoidKInstance|SetKMonoidKInstance|WriterTMonoidKInstance] +[SemigroupK]<-[SemigroupK Instances|EitherSemigroupKInstance|ListKSemigroupKInstance|NonEmptyListSemigroupKInstance|OptionTSemigroupKInstance|SequenceKSemigroupKInstance|SetKSemigroupKInstance|StateTSemigroupKInstance|ValidatedSemigroupKInstance|WriterTSemigroupKInstance] +[Bimonad]<-[Bimonad Instances|EvalBimonadInstance|Function0BimonadInstance|IdBimonadInstance|NonEmptyListBimonadInstance] +[Foldable]<-[Foldable Instances|ConstFoldableInstance|EitherFoldableInstance|IdFoldableInstance|OptionFoldableInstance|TryFoldableInstance|Tuple2FoldableInstance|CoproductFoldableInstance|IorFoldableInstance|ListKFoldableInstance|MapKFoldableInstance|NonEmptyListFoldableInstance|OptionTFoldableInstance|SequenceKFoldableInstance|SetKFoldableInstance|SortedMapKFoldableInstance|ValidatedFoldableInstance] +[Traverse]<-[Traverse Instances|ConstTraverseInstance|EitherTraverseInstance|IdTraverseInstance|OptionTraverseInstance|TryTraverseInstance|Tuple2TraverseInstance|CoproductTraverseInstance|IorTraverseInstance|ListKTraverseInstance|MapKTraverseInstance|NonEmptyListTraverseInstance|OptionTTraverseInstance|SequenceKTraverseInstance|SortedMapKTraverseInstance|ValidatedTraverseInstance] +[Eq]<-[Eq Instances|ConstEqInstance|EitherEqInstance|IdEqInstance|OptionEqInstance|TryEqInstance|Tuple10EqInstance|Tuple2EqInstance|Tuple3EqInstance|Tuple4EqInstance|Tuple5EqInstance|Tuple6EqInstance|Tuple7EqInstance|Tuple8EqInstance|Tuple9EqInstance|IorEqInstance|ListKEqInstance|MapKEqInstance|NonEmptyListEqInstance|SequenceKEqInstance|SetKEqInstance|ValidatedEqInstance] +[Show]<-[Show Instances|ConstShowInstance|EitherShowInstance|IdShowInstance|OptionShowInstance|TryShowInstance|Tuple10ShowInstance|Tuple2ShowInstance|Tuple3ShowInstance|Tuple4ShowInstance|Tuple5ShowInstance|Tuple6ShowInstance|Tuple7ShowInstance|Tuple8ShowInstance|Tuple9ShowInstance|IorShowInstance|ListKShowInstance|MapKShowInstance|NonEmptyListShowInstance|SequenceKShowInstance|SetKShowInstance|SortedMapKShowInstance|ValidatedShowInstance] +[Comonad]<-[Comonad Instances|EvalComonadInstance|Function0ComonadInstance|IdComonadInstance|Tuple2ComonadInstance|CofreeComonadInstance|CoproductComonadInstance|NonEmptyListComonadInstance] [Monad]<-[MonadReader] +[MonadReader]<-[MonadReader Instances|Function1MonadReaderInstance|KleisliMonadReaderInstance] +[Functor]<-[FunctorFilter] +[FunctorFilter]<-[FunctorFilter Instances|ListKFunctorFilterInstance|OptionTFunctorFilterInstance] [Traverse]<-[TraverseFilter] [FunctorFilter]<-[TraverseFilter] -[Monad]<-[MonadState] +[TraverseFilter]<-[TraverseFilter Instances|ConstTraverseFilterInstance|OptionTraverseFilterInstance|OptionTTraverseFilterInstance] [MonadFilter]<-[MonadCombine] [Alternative]<-[MonadCombine] +[MonadCombine]<-[MonadCombine Instances|ListKMonadCombineInstance|StateTMonadCombineInstance] +[Monad]<-[MonadState] +[MonadState]<-[MonadState Instances|StateTMonadStateInstance] +[Monad]<-[MonadFilter] +[FunctorFilter]<-[MonadFilter] +[MonadFilter]<-[MonadFilter Instances|ListKMonadFilterInstance|OptionMonadFilterInstance|WriterTMonadFilterInstance] +[Monad]<-[MonadWriter] +[MonadWriter]<-[MonadWriter Instances|WriterTMonadWriterInstance] +[FilterIndex]<-[FilterIndex Instances|ListKFilterIndexInstance|MapKFilterIndexInstance|NonEmptyListFilterIndexInstance|SequenceKFilterIndexInstance] +[Index]<-[Index Instances|ListKIndexInstance|MapKIndexInstance|NonEmptyListIndexInstance|SequenceKIndexInstance] +[At]<-[At Instances|MapKAtInstance|SetKAtInstance] +[Each]<-[Each Instances|EitherEachInstance|ListKEachInstance|MapKEachInstance|NonEmptyListEachInstance|OptionEachInstance|SequenceKEachInstance|TryEachInstance] [Recursive]<-[Birecursive] [Corecursive]<-[Birecursive] -[MonoidK]<-[MonoidK Instances|OptionTMonoidKInstance|ListKMonoidKInstance|SetKMonoidKInstance|SequenceKMonoidKInstance|WriterTMonoidKInstance] -[SemigroupK]<-[SemigroupK Instances|EitherSemigroupKInstance|OptionTSemigroupKInstance|SequenceKSemigroupKInstance|WriterTSemigroupKInstance|StateTSemigroupKInstance|NonEmptyListSemigroupKInstance|SetKSemigroupKInstance|ValidatedSemigroupKInstance|ListKSemigroupKInstance] -[Eq]<-[Eq Instances|Tuple7EqInstance|Tuple5EqInstance|Tuple2EqInstance|EitherEqInstance|Tuple9EqInstance|Tuple3EqInstance|TryEqInstance|Tuple6EqInstance|ConstEqInstance|IdEqInstance|OptionEqInstance|Tuple10EqInstance|Tuple4EqInstance|Tuple8EqInstance|NonEmptyListEqInstance|ListKEqInstance|SetKEqInstance|IorEqInstance|SequenceKEqInstance|ValidatedEqInstance|MapKEqInstance] -[Show]<-[Show Instances|Tuple3ShowInstance|IdShowInstance|OptionShowInstance|Tuple8ShowInstance|ConstShowInstance|EitherShowInstance|Tuple4ShowInstance|TryShowInstance|Tuple2ShowInstance|Tuple9ShowInstance|Tuple6ShowInstance|Tuple5ShowInstance|Tuple10ShowInstance|Tuple7ShowInstance|IorShowInstance|MapKShowInstance|SetKShowInstance|SortedMapKShowInstance|SequenceKShowInstance|ListKShowInstance|ValidatedShowInstance|NonEmptyListShowInstance] -[Bimonad]<-[Bimonad Instances|EvalBimonadInstance|Function0BimonadInstance|IdBimonadInstance|NonEmptyListBimonadInstance] -[FilterIndex]<-[FilterIndex Instances|SequenceKFilterIndexInstance|MapKFilterIndexInstance|ListKFilterIndexInstance|NonEmptyListFilterIndexInstance] -[Index]<-[Index Instances|MapKIndexInstance|SequenceKIndexInstance|NonEmptyListIndexInstance|ListKIndexInstance|NonEmptyListFilterIndexInstance|MapKFilterIndexInstance|SequenceKFilterIndexInstance|ListKFilterIndexInstance] -[At]<-[At Instances|MapKAtInstance|SetKAtInstance] -[Each]<-[Each Instances|ListKEachInstance|OptionEachInstance|EitherEachInstance|TryEachInstance|SequenceKEachInstance|NonEmptyListEachInstance|MapKEachInstance] -[Semigroup]<-[Semigroup Instances|ConstSemigroupInstance|OptionSemigroupInstance|SortedMapKSemigroupInstance|ListKSemigroupInstance|MapKSemigroupInstance|SetKSemigroupInstance|SequenceKSemigroupInstance|NonEmptyListSemigroupInstance|IOSemigroupInstance|IOMonoidInstance|EitherSemigroupInstance|TrySemigroupInstance] -[Monoid]<-[Monoid Instances|OptionMonoidInstance|ConstMonoidInstance|Tuple2MonoidInstance|SequenceKMonoidInstance|SortedMapKMonoidInstance|ListKMonoidInstance|MapKMonoidInstance|SetKMonoidInstance|IOMonoidInstance|TryMonoidInstance|EitherMonoidInstance] -[Traverse]<-[Traverse Instances|EitherTraverseInstance|OptionTraverseInstance|Tuple2TraverseInstance|TryTraverseInstance|IdTraverseInstance|ConstTraverseInstance|IorTraverseInstance|MapKTraverseInstance|ValidatedTraverseInstance|CoproductTraverseInstance|ListKTraverseInstance|SequenceKTraverseInstance|OptionTTraverseInstance|NonEmptyListTraverseInstance|SortedMapKTraverseInstance|FlowableKTraverseInstance|ObservableKTraverseInstance|FluxKTraverseInstance] -[Foldable]<-[Foldable Instances|ConstFoldableInstance|IdFoldableInstance|EitherFoldableInstance|TryFoldableInstance|OptionFoldableInstance|Tuple2FoldableInstance|OptionTFoldableInstance|MapKFoldableInstance|SetKFoldableInstance|ListKFoldableInstance|NonEmptyListFoldableInstance|IorFoldableInstance|SequenceKFoldableInstance|SortedMapKFoldableInstance|CoproductFoldableInstance|ValidatedFoldableInstance|MaybeKFoldableInstance|FlowableKFoldableInstance|ObservableKFoldableInstance|FluxKFoldableInstance] -[MonadDefer]<-[MonadDefer Instances|IOMonadDeferInstance|DeferredKMonadDeferInstance|ObservableKMonadDeferInstance|MaybeKMonadDeferInstance|SingleKMonadDeferInstance|FlowableKMonadDeferInstance|FluxKMonadDeferInstance|MonoKMonadDeferInstance] -[Async]<-[Async Instances|IOAsyncInstance|DeferredKAsyncInstance|MaybeKAsyncInstance|SingleKAsyncInstance|ObservableKAsyncInstance|FlowableKAsyncInstance|MonoKAsyncInstance|FluxKAsyncInstance] -[Effect]<-[Effect Instances|IOEffectInstance|DeferredKEffectInstance|MaybeKEffectInstance|ObservableKEffectInstance|SingleKEffectInstance|FlowableKEffectInstance|FluxKEffectInstance|MonoKEffectInstance] -[MonadError]<-[MonadError Instances|EitherMonadErrorInstance|TryMonadErrorInstance|OptionMonadErrorInstance|StateTMonadErrorInstance|KleisliMonadErrorInstance|IOMonadErrorInstance|DeferredKMonadErrorInstance|MaybeKMonadErrorInstance|ObservableKMonadErrorInstance|SingleKMonadErrorInstance|FlowableKMonadErrorInstance|FluxKMonadErrorInstance|MonoKMonadErrorInstance] -[ApplicativeError]<-[ApplicativeError Instances|OptionApplicativeErrorInstance|TryApplicativeErrorInstance|EitherApplicativeErrorInstance|StateTApplicativeErrorInstance|KleisliApplicativeErrorInstance|ValidatedApplicativeErrorInstance|IOApplicativeErrorInstance|DeferredKApplicativeErrorInstance|SingleKApplicativeErrorInstance|ObservableKApplicativeErrorInstance|MaybeKApplicativeErrorInstance|FlowableKApplicativeErrorInstance|FluxKApplicativeErrorInstance|MonoKApplicativeErrorInstance] -[Comonad]<-[Comonad Instances|Tuple2ComonadInstance|IdComonadInstance|Function0ComonadInstance|EvalComonadInstance|CoproductComonadInstance|NonEmptyListComonadInstance|CofreeComonadInstance] -[Applicative]<-[Applicative Instances|Function1ApplicativeInstance|IdApplicativeInstance|Tuple2ApplicativeInstance|Function0ApplicativeInstance|ConstApplicativeInstance|OptionApplicativeInstance|EvalApplicativeInstance|EitherApplicativeInstance|TryApplicativeInstance|ListKApplicativeInstance|ValidatedApplicativeInstance|IorApplicativeInstance|WriterTApplicativeInstance|NonEmptyListApplicativeInstance|KleisliApplicativeInstance|StateTApplicativeInstance|SequenceKApplicativeInstance|OptionTApplicativeInstance|IOApplicativeInstance|FreeApplicativeInstance|FreeApplicativeApplicativeInstance|DeferredKApplicativeInstance|MaybeKApplicativeInstance|ObservableKApplicativeInstance|FlowableKApplicativeInstance|SingleKApplicativeInstance|FluxKApplicativeInstance|MonoKApplicativeInstance] -[Monad]<-[Monad Instances|Function1MonadInstance|Function0MonadInstance|EitherMonadInstance|IdMonadInstance|TryMonadInstance|Tuple2MonadInstance|EvalMonadInstance|OptionMonadInstance|NonEmptyListMonadInstance|OptionTMonadInstance|WriterTMonadInstance|SequenceKMonadInstance|IorMonadInstance|StateTMonadInstance|KleisliMonadInstance|ListKMonadInstance|IOMonadInstance|FreeMonadInstance|DeferredKMonadInstance|SingleKMonadInstance|MaybeKMonadInstance|FlowableKMonadInstance|ObservableKMonadInstance|FluxKMonadInstance|MonoKMonadInstance] -[MonadFilter]<-[MonadFilter Instances|WriterTMonadFilterInstance|OptionMonadFilterInstance|ListKMonadFilterInstance] -[MonadWriter]<-[MonadWriter Instances|WriterTMonadWriterInstance] -[FunctorFilter]<-[FunctorFilter Instances|ListKFunctorFilterInstance|OptionTFunctorFilterInstance] -[MonadReader]<-[MonadReader Instances|Function1MonadReaderInstance|KleisliMonadReaderInstance] -[TraverseFilter]<-[TraverseFilter Instances|OptionTraverseFilterInstance|OptionTTraverseFilterInstance|ConstTraverseFilterInstance] -[MonadState]<-[MonadState Instances|StateTMonadStateInstance] -[MonadCombine]<-[MonadCombine Instances|ListKMonadCombineInstance|StateTMonadCombineInstance] -[Corecursive]<-[Corecursive Instances|MuCorecursiveInstance|FixCorecursiveInstance|NuCorecursiveInstance] [Birecursive]<-[Birecursive Instances|FixBirecursiveInstance|MuBirecursiveInstance|NuBirecursiveInstance] -[Recursive]<-[Recursive Instances|MuRecursiveInstance|FixRecursiveInstance|NuRecursiveInstance] -[Functor]<-[Functor Instances|ConstFunctorInstance|OptionFunctorInstance|Function1FunctorInstance|Tuple2FunctorInstance|TryFunctorInstance|IdFunctorInstance|EvalFunctorInstance|EitherFunctorInstance|Function0FunctorInstance|MapKFunctorInstance|SortedMapKFunctorInstance|CoproductFunctorInstance|OptionTFunctorInstance|IorFunctorInstance|StateTFunctorInstance|ValidatedFunctorInstance|SequenceKFunctorInstance|WriterTFunctorInstance|ListKFunctorInstance|KleisliFunctorInstance|NonEmptyListFunctorInstance|IOFunctorInstance|CofreeFunctorInstance|YonedaFunctorInstance|FreeFunctorInstance|CoyonedaFunctorInstance|FreeApplicativeFunctorInstance|DeferredKFunctorInstance|MaybeKFunctorInstance|SingleKFunctorInstance|FlowableKFunctorInstance|ObservableKFunctorInstance|MonoKFunctorInstance|FluxKFunctorInstance|IntListPatternFunctorInstance] \ No newline at end of file +[Corecursive]<-[Corecursive Instances|FixCorecursiveInstance|MuCorecursiveInstance|NuCorecursiveInstance] +[Recursive]<-[Recursive Instances|FixRecursiveInstance|MuRecursiveInstance|NuRecursiveInstance] +[Async]<-[Async Instances|IOAsyncInstance] +[Effect]<-[Effect Instances|IOEffectInstance] +[MonadDefer]<-[MonadDefer Instances|IOMonadDeferInstance] +[ApplicativeError]<-[ApplicativeError Instances|IOApplicativeErrorInstance|EitherApplicativeErrorInstance|OptionApplicativeErrorInstance|TryApplicativeErrorInstance|KleisliApplicativeErrorInstance|StateTApplicativeErrorInstance|ValidatedApplicativeErrorInstance] +[MonadError]<-[MonadError Instances|IOMonadErrorInstance|EitherMonadErrorInstance|OptionMonadErrorInstance|TryMonadErrorInstance|KleisliMonadErrorInstance|StateTMonadErrorInstance] +[Monoid]<-[Monoid Instances|IOMonoidInstance|ConstMonoidInstance|EitherMonoidInstance|OptionMonoidInstance|TryMonoidInstance|Tuple2MonoidInstance|ListKMonoidInstance|MapKMonoidInstance|SequenceKMonoidInstance|SetKMonoidInstance|SortedMapKMonoidInstance] +[Semigroup]<-[Semigroup Instances|IOMonoidInstance|IOSemigroupInstance|ConstSemigroupInstance|EitherSemigroupInstance|OptionSemigroupInstance|TrySemigroupInstance|ListKSemigroupInstance|MapKSemigroupInstance|NonEmptyListSemigroupInstance|SequenceKSemigroupInstance|SetKSemigroupInstance|SortedMapKSemigroupInstance] +[Applicative]<-[Applicative Instances|IOApplicativeInstance|ConstApplicativeInstance|EitherApplicativeInstance|EvalApplicativeInstance|Function0ApplicativeInstance|Function1ApplicativeInstance|IdApplicativeInstance|OptionApplicativeInstance|TryApplicativeInstance|Tuple2ApplicativeInstance|FreeApplicativeApplicativeInstance|FreeApplicativeInstance|IorApplicativeInstance|KleisliApplicativeInstance|ListKApplicativeInstance|NonEmptyListApplicativeInstance|OptionTApplicativeInstance|SequenceKApplicativeInstance|StateTApplicativeInstance|ValidatedApplicativeInstance|WriterTApplicativeInstance] +[Functor]<-[Functor Instances|IOFunctorInstance|ConstFunctorInstance|EitherFunctorInstance|EvalFunctorInstance|Function0FunctorInstance|Function1FunctorInstance|IdFunctorInstance|OptionFunctorInstance|TryFunctorInstance|Tuple2FunctorInstance|CofreeFunctorInstance|CoyonedaFunctorInstance|FreeApplicativeFunctorInstance|FreeFunctorInstance|YonedaFunctorInstance|CoproductFunctorInstance|IorFunctorInstance|KleisliFunctorInstance|ListKFunctorInstance|MapKFunctorInstance|NonEmptyListFunctorInstance|OptionTFunctorInstance|SequenceKFunctorInstance|SortedMapKFunctorInstance|StateTFunctorInstance|ValidatedFunctorInstance|WriterTFunctorInstance] +[Monad]<-[Monad Instances|IOMonadInstance|EitherMonadInstance|EvalMonadInstance|Function0MonadInstance|Function1MonadInstance|IdMonadInstance|OptionMonadInstance|TryMonadInstance|Tuple2MonadInstance|FreeMonadInstance|IorMonadInstance|KleisliMonadInstance|ListKMonadInstance|NonEmptyListMonadInstance|OptionTMonadInstance|SequenceKMonadInstance|StateTMonadInstance|WriterTMonadInstance] \ No newline at end of file diff --git a/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IO.kt b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IO.kt index bcfd08f3d2d..ac889a29883 100644 --- a/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IO.kt +++ b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IO.kt @@ -36,6 +36,9 @@ sealed class IO : IOOf { } } + fun async(ctx: CoroutineContext, f: () -> A): IO = + IO.unit.continueOn(ctx).flatMap { Pure(f()) } + val unit: IO = just(Unit) @@ -55,6 +58,8 @@ sealed class IO : IOOf { is Either.Right -> IO.just(it.b) } } + + /* For parMap, look into IOParallel */ } abstract fun map(f: (A) -> B): IO diff --git a/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOParallel.kt b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOParallel.kt new file mode 100644 index 00000000000..daf8c2119af --- /dev/null +++ b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/IOParallel.kt @@ -0,0 +1,52 @@ +package arrow.effects + +import arrow.core.Tuple2 +import arrow.core.Tuple3 +import arrow.effects.internal.parMap2 +import arrow.effects.internal.parMap3 +import kotlin.coroutines.experimental.CoroutineContext + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, f: (A, B) -> C): IO = + IO.async(IO.effect().parMap2(ctx, ioA, ioB, f, /* see parMap2 notes on this parameter */ { it.fix().unsafeRunSync() })) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, f: (A, B, C) -> D): IO = + IO.async(IO.effect().parMap3(ctx, ioA, ioB, ioC, f, /* see parMap2 notes on this parameter */ { it.fix().unsafeRunSync() })) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, f: (A, B, C, D) -> E): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ::Tuple2), + parallelMapN(ctx, ioC, ioD, ::Tuple2), + { ab, cd -> f(ab.a, ab.b, cd.a, cd.b) }) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, ioE: IO, f: (A, B, C, D, E) -> F): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ioC, ::Tuple3), + parallelMapN(ctx, ioD, ioE, ::Tuple2), + { abc, de -> f(abc.a, abc.b, abc.c, de.a, de.b) }) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, ioE: IO, ioF: IO, f: (A, B, C, D, E, F) -> G): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ioC, ::Tuple3), + parallelMapN(ctx, ioD, ioE, ioF, ::Tuple3), + { abc, def -> f(abc.a, abc.b, abc.c, def.a, def.b, def.c) }) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, ioE: IO, ioF: IO, ioG: IO, f: (A, B, C, D, E, F, G) -> H): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ioC, ::Tuple3), + parallelMapN(ctx, ioD, ioE, ::Tuple2), + parallelMapN(ctx, ioF, ioG, ::Tuple2), + { abc, de, fg -> f(abc.a, abc.b, abc.c, de.a, de.b, fg.a, fg.b) }) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, ioE: IO, ioF: IO, ioG: IO, ioH: IO, f: (A, B, C, D, E, F, G, H) -> I): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ioC, ::Tuple3), + parallelMapN(ctx, ioD, ioE, ioF, ::Tuple3), + parallelMapN(ctx, ioG, ioH, ::Tuple2), + { abc, def, gh -> f(abc.a, abc.b, abc.c, def.a, def.b, def.c, gh.a, gh.b) }) + +fun IO.Companion.parallelMapN(ctx: CoroutineContext, ioA: IO, ioB: IO, ioC: IO, ioD: IO, ioE: IO, ioF: IO, ioG: IO, ioH: IO, ioI: IO, f: (A, B, C, D, E, F, G, H, I) -> J): IO = + parallelMapN(ctx, + parallelMapN(ctx, ioA, ioB, ioC, ::Tuple3), + parallelMapN(ctx, ioD, ioE, ioF, ::Tuple3), + parallelMapN(ctx, ioG, ioH, ioI, ::Tuple3), + { abc, def, ghi -> f(abc.a, abc.b, abc.c, def.a, def.b, def.c, ghi.a, ghi.b, ghi.c) }) diff --git a/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/ParallelUtils.kt b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/ParallelUtils.kt new file mode 100644 index 00000000000..d81204bd0a0 --- /dev/null +++ b/modules/effects/arrow-effects/src/main/kotlin/arrow/effects/internal/ParallelUtils.kt @@ -0,0 +1,161 @@ +package arrow.effects.internal + +import arrow.Kind +import arrow.core.* +import arrow.effects.typeclasses.Effect +import arrow.effects.typeclasses.Proc +import kotlin.coroutines.experimental.Continuation +import kotlin.coroutines.experimental.CoroutineContext +import kotlin.coroutines.experimental.startCoroutine +import kotlin.coroutines.experimental.suspendCoroutine + +/* See par3 */ +internal fun Effect.parMap2(ctx: CoroutineContext, ioA: Kind, ioB: Kind, f: (A, B) -> C, + /* start is used because this should return Tuple3, but there's no good implementation of Future before Java8 */ + start: (Kind) -> Unit): Proc = { cc -> + val a: suspend () -> Either = { + suspendCoroutine { ca: Continuation> -> + start(ioA.map { it.left() }.runAsync { + it.fold({ invoke { ca.resumeWithException(it) } }, { invoke { ca.resume(it) } }) + }) + } + } + val b: suspend () -> Either = { + suspendCoroutine { ca: Continuation> -> + start(ioB.map { it.right() }.runAsync { + it.fold({ invoke { ca.resumeWithException(it) } }, { invoke { ca.resume(it) } }) + }) + } + } + val parCont = parContinuation(ctx, f, asyncIOContinuation(ctx, cc)) + a.startCoroutine(parCont) + b.startCoroutine(parCont) +} + +/* Parallelization is only provided in pairs and triples. + * Every time you start 4+ elements, each pair or triple has to be combined with another one at the same depth. + * Elements at higher depths that are synchronous can prevent elements at a lower depth from starting. + * Thus, we need to provide solutions for even and uneven amounts of IOs for all to be started at the same depth. */ +internal fun Effect.parMap3(ctx: CoroutineContext, ioA: Kind, ioB: Kind, ioC: Kind, f: (A, B, C) -> D, + /* start is used because this should return Tuple4, but there's no good implementation of Future before Java8 */ + start: (Kind) -> Unit): Proc = { cc -> + val a: suspend () -> Treither = { + suspendCoroutine { ca: Continuation> -> + start(ioA.map { Treither.Left(it) }.runAsync { + it.fold({ invoke { ca.resumeWithException(it) } }, { invoke { ca.resume(it) } }) + }) + } + } + val b: suspend () -> Treither = { + suspendCoroutine { ca: Continuation> -> + start(ioB.map { Treither.Middle(it) }.runAsync { + it.fold({ invoke { ca.resumeWithException(it) } }, { invoke { ca.resume(it) } }) + }) + } + } + val c: suspend () -> Treither = { + suspendCoroutine { ca: Continuation> -> + start(ioC.map { Treither.Right(it) }.runAsync { + it.fold({ invoke { ca.resumeWithException(it) } }, { invoke { ca.resume(it) } }) + }) + } + } + val triCont = triContinuation(ctx, f, asyncIOContinuation(ctx, cc)) + a.startCoroutine(triCont) + b.startCoroutine(triCont) + c.startCoroutine(triCont) +} + +private fun parContinuation(ctx: CoroutineContext, f: (A, B) -> C, cc: Continuation): Continuation> = + object : Continuation> { + override val context: CoroutineContext = ctx + + var intermediate: Tuple2 = null toT null + + override fun resume(value: Either) = + synchronized(this) { + val resA = intermediate.a + val resB = intermediate.b + value.fold({ a -> + intermediate = a toT resB + if (resB != null) { + cc.resume(f(a, resB)) + } + }, { b -> + intermediate = resA toT b + if (resA != null) { + cc.resume(f(resA, b)) + } + }) + } + + override fun resumeWithException(exception: Throwable) { + cc.resumeWithException(exception) + } + } + +private fun triContinuation(ctx: CoroutineContext, f: (A, B, C) -> D, cc: Continuation): Continuation> = + object : Continuation> { + override val context: CoroutineContext = ctx + + var intermediate: Tuple3 = Tuple3(null, null, null) + + override fun resume(value: Treither) = + synchronized(this) { + val resA = intermediate.a + val resB = intermediate.b + val resC = intermediate.c + value.fold({ a -> + intermediate = Tuple3(a, resB, resC) + if (resB != null && resC != null) { + cc.resume(f(a, resB, resC)) + } + }, { b -> + intermediate = Tuple3(resA, b, resC) + if (resA != null && resC != null) { + cc.resume(f(resA, b, resC)) + } + }, { c -> + intermediate = Tuple3(resA, resB, c) + if (resA != null && resB != null) { + cc.resume(f(resA, resB, c)) + } + }) + } + + override fun resumeWithException(exception: Throwable) { + cc.resumeWithException(exception) + } + } + +private sealed class Treither { + data class Left(val a: A) : Treither() { + override fun fold(fa: (A) -> D, fb: (B) -> D, fc: (C) -> D) = + fa(a) + } + + data class Middle(val b: B) : Treither() { + override fun fold(fa: (A) -> D, fb: (B) -> D, fc: (C) -> D) = + fb(b) + } + + data class Right(val c: C) : Treither() { + override fun fold(fa: (A) -> D, fb: (B) -> D, fc: (C) -> D) = + fc(c) + } + + abstract fun fold(fa: (A) -> D, fb: (B) -> D, fc: (C) -> D): D +} + +private fun asyncIOContinuation(ctx: CoroutineContext, cc: (Either) -> Unit): Continuation = + object : Continuation { + override val context: CoroutineContext = ctx + + override fun resume(value: A) { + cc(value.right()) + } + + override fun resumeWithException(exception: Throwable) { + cc(exception.left()) + } + } diff --git a/modules/effects/arrow-effects/src/test/kotlin/arrow/effects/IOTest.kt b/modules/effects/arrow-effects/src/test/kotlin/arrow/effects/IOTest.kt index c3164413b50..c2821482491 100644 --- a/modules/effects/arrow-effects/src/test/kotlin/arrow/effects/IOTest.kt +++ b/modules/effects/arrow-effects/src/test/kotlin/arrow/effects/IOTest.kt @@ -13,6 +13,7 @@ import io.kotlintest.KTestJUnitRunner import io.kotlintest.matchers.fail import io.kotlintest.matchers.shouldBe import io.kotlintest.matchers.shouldEqual +import kotlinx.coroutines.experimental.newSingleThreadContext import org.junit.runner.RunWith @RunWith(KTestJUnitRunner::class) @@ -242,6 +243,80 @@ class IOTest : UnitSpec() { result shouldBe None } + "parallel execution makes all IOs start at the same time" { + val order = mutableListOf() + + fun makePar(num: Long) = + IO.async(newSingleThreadContext("$num")) { + // Sleep according to my number + Thread.sleep(num * 20) + }.map { + // Add myself to order list + order.add(num) + num + } + + val result = + IO.parallelMapN(newSingleThreadContext("all"), makePar(6), makePar(3), makePar(2), makePar(4), makePar(1), makePar(5)) + { six, tree, two, four, one, five -> listOf(six, tree, two, four, one, five) } + .unsafeRunSync() + + result shouldBe listOf(6L, 3, 2, 4, 1, 5) + order.toList() shouldBe listOf(1L, 2, 3, 4, 5, 6) + } + + "parallel execution preserves order for synchronous IOs" { + val order = mutableListOf() + + fun IO.order() = + map { + order.add(it) + it + } + + fun makePar(num: Long) = + IO.async(newSingleThreadContext("$num")) { + // Sleep according to my number + Thread.sleep(num * 20) + num + }.order() + + val result = + IO.parallelMapN(newSingleThreadContext("all"), makePar(6), IO.just(1L).order(), makePar(4), IO.defer { IO.just(2L) }.order(), makePar(5), IO { 3L }.order()) + { six, tree, two, four, one, five -> listOf(six, tree, two, four, one, five) } + .unsafeRunSync() + + result shouldBe listOf(6L, 1, 4, 2, 5, 3) + order.toList() shouldBe listOf(1L, 2, 3, 4, 5, 6) + } + + "parallel mapping is done in the expected CoroutineContext" { + fun makePar(num: Long) = + IO.async(newSingleThreadContext("$num")) { + // Sleep according to my number + Thread.sleep(num * 20) + num + } + + val result = + IO.parallelMapN(newSingleThreadContext("all"), makePar(6), IO.just(1L), makePar(4), IO.defer { IO.just(2L) }, makePar(5), IO { 3L }) + { _, _, _, _, _, _ -> + Thread.currentThread().name + }.unsafeRunSync() + + result shouldBe "all" + } + + "parallel IO#defer, IO#suspend and IO#async are run in the expected CoroutineContext" { + val result = + IO.parallelMapN(newSingleThreadContext("here"), + IO { Thread.currentThread().name }, IO.defer { IO.just(Thread.currentThread().name) }, IO.async { it(Thread.currentThread().name.right()) }, + ::Tuple3) + .unsafeRunSync() + + result shouldBe Tuple3("here", "here", "here") + } + "IO.binding should for comprehend over IO" { val result = IO.monad().binding { val x = IO.just(1).bind()