Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
name := "asyncstreams"

version := "0.5-SNAPSHOT"
version := "1.0"

scalaVersion := "2.11.8"
scalaVersion := "2.12.2"

parallelExecution in ThisBuild := false

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")

val versions = Map(
"monix" -> "2.2.3"
"monix" -> "2.2.4"
)

libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % "7.2.9",
"com.twitter" %% "util-core" % "6.41.0" % Test,
"org.scalaz" %% "scalaz-core" % "7.2.11",
"io.monix" %% "monix-eval" % versions("monix") % Test,
"io.monix" %% "monix-scalaz-72" % versions("monix") % Test,
"org.scalatest" %% "scalatest" % "3.0.1" % Test
//"com.twitter" %% "util-core" % "6.43.0" % Test,
//"io.catbird" %% "catbird-util" % "0.14.0" % Test, //cats instances for util-core
//"com.codecommit" %% "shims-core" % "1.0-b0e5152" % Test,
"org.scalatest" %% "scalatest" % "3.0.3" % Test
)
13 changes: 0 additions & 13 deletions copying.txt

This file was deleted.

2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 0.13.13
sbt.version = 0.13.15
1 change: 0 additions & 1 deletion project/plugins.sbt

This file was deleted.

11 changes: 6 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
asyncstreams [![Release](https://jitpack.io/v/danslapman/asyncstreams.svg)](https://jitpack.io/#danslapman/asyncstreams)
=========

**Note: 0.4 release is outdated, use master-SNAPSHOT for now**

asyncstreams is a monadic asynchronous stream library. It allows you to write stateful asynchronous algorithms
that emits elements into a stream:

```scala
val stream = generateS(0) {
val stream = genS(0) {
for {
s <- getS[Int]
if s < 3
Expand All @@ -20,15 +22,14 @@ See more examples in tests.

asyncstreams is tested to work with:
- standard scala futures
- twitter futures (with some [instances](https://github.com/danslapman/asyncstreams/blob/master/src/test/scala/asyncstreams/twitterFutures/TwitterInstances.scala))
- monix tasks
- monix tasks (WIP, there are some issues)

asyncstreams is available via jitpack:

```
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "0.4"
libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "master-SNAPSHOT"
```

asyncstreams is based on [scala-async](https://github.com/iboltaev/scala-async) ideas.
asyncstreams initially based on [scala-async](https://github.com/iboltaev/scala-async) ideas.
38 changes: 38 additions & 0 deletions src/main/scala/asyncstreams/ASImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package asyncstreams

import scala.language.higherKinds
import scalaz.MonadError

trait ASImpl[F[+_]] {
def empty[A]: AsyncStream[F, A]
def collectLeft[A, B](s: AsyncStream[F, A])(init: B)(f: (B, A) => B): F[B]
def fromIterable[T](it: Iterable[T]): AsyncStream[F, T]
def takeWhile[T](s: AsyncStream[F, T])(p: T => Boolean): AsyncStream[F, T]
def isEmpty[T](s: AsyncStream[F, T]): F[Boolean]
}

class ASImplForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends ASImpl[F] {
import scalaz.syntax.monadError._

override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])

override def collectLeft[A, B](s: AsyncStream[F, A])(init: B)(f: (B, A) => B): F[B] = {
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
d.flatMap(step => impl(step.rest.data, acc.map(b => f(b, step.value)))).handleError(_ => acc)

impl(s.data, init.point[F])
}

override def fromIterable[T](it: Iterable[T]): AsyncStream[F, T] = AsyncStream {
if (it.nonEmpty) Step(it.head, fromIterable(it.tail)).point[F] else ze.zeroElement.raiseError[F, AsyncStream[F, T]#SStep]
}

override def takeWhile[T](s: AsyncStream[F, T])(p: (T) => Boolean): AsyncStream[F, T] = AsyncStream {
s.data.map {
case step if !p(step.value) => throw ze.zeroElement
case step => Step(step.value, takeWhile(step.rest)(p))
}
}

override def isEmpty[T](s: AsyncStream[F, T]): F[Boolean] = s.data.map(_ => false).handleError(_ => true.point[F])
}
20 changes: 20 additions & 0 deletions src/main/scala/asyncstreams/ASMonadPlusForMonadError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package asyncstreams

import scala.language.higherKinds
import scalaz.syntax.monadError._
import scalaz.{MonadError, MonadPlus}

class ASMonadPlusForMonadError[F[+_]](implicit fmp: MonadError[F, Throwable], ze: ZeroError[Throwable, F]) extends MonadPlus[AsyncStream[F, ?]] {
override def bind[A, B](fa: AsyncStream[F, A])(f: (A) => AsyncStream[F, B]): AsyncStream[F, B] = AsyncStream {
fa.data.flatMap(step => f(step.value).data.map(step2 => Step(step2.value, plus(step2.rest, bind(step.rest)(f)))))
.handleError(_ => fmp.raiseError(ze.zeroElement))
}

override def plus[A](a: AsyncStream[F, A], b: => AsyncStream[F, A]): AsyncStream[F, A] = AsyncStream {
a.data.map(step => Step(step.value, plus(step.rest, b))).handleError(_ => b.data)
}

override def point[A](a: => A): AsyncStream[F, A] = AsyncStream(Step(a, empty[A]).point[F])

override def empty[A]: AsyncStream[F, A] = AsyncStream(ze.zeroElement.raiseError[F, AsyncStream[F, A]#SStep])
}
35 changes: 35 additions & 0 deletions src/main/scala/asyncstreams/ASStateTOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package asyncstreams

import scala.language.higherKinds
import scalaz.syntax.monad._
import scalaz.{IndexedStateT, Monad, MonadPlus, MonadState, StateT}

class ASStateTOps[F[+_]: Monad](implicit methods: ASImpl[F]) {
def foreach[A, S](stream: AsyncStream[F, A])(f: A => StateT[F, S, _]): StateT[F, S, Unit] = StateT { s =>
methods.collectLeft(stream)(s.point[F])((fS, a) => fS.flatMap(s2 => f(a)(s2).map(_._1)))
.flatMap(identity).map((_, ()))
}

def isEmpty[A, S](stream: AsyncStream[F, A]): StateT[F, S, Boolean] = StateT { s =>
stream.isEmpty.map((s, _))
}

def isEmpty[A, S](f: S => AsyncStream[F, A])(implicit ms: MonadState[IndexedStateT[F, S, S, ?], S]): StateT[F, S, Boolean] = {
ms.get >>= ((s: S) => isEmpty(f(s)))
}

def notEmpty[A, S](stream: AsyncStream[F, A]): StateT[F, S, Boolean] = StateT { s =>
stream.nonEmpty.map((s, _))
}

def notEmpty[A, S](f: S => AsyncStream[F, A])(implicit ms: MonadState[IndexedStateT[F, S, S, ?], S]): StateT[F, S, Boolean] = {
ms.get >>= ((s: S) => notEmpty(f(s)))
}

def get[A, S](stream: AsyncStream[F, A]): StateT[F, S, (AsyncStream[F, A], A)] = StateT { s =>
stream.data.map(step => (s, (step.rest, step.value)))
}

def genS[S, A](start: S)(gen: StateT[F, S, A])(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] =
AsyncStream.generate(start)(gen.run)
}
89 changes: 34 additions & 55 deletions src/main/scala/asyncstreams/AsyncStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,52 @@ import scala.annotation.unchecked.{uncheckedVariance => uV}
import scala.collection.GenIterable
import scala.collection.generic.CanBuildFrom
import scala.language.higherKinds
import scalaz.Monad
import scalaz.syntax.monad._
import scalaz.syntax.monadPlus._
import scalaz.{Monad, MonadPlus}

case class AsyncStream[F[+_]: Monad, A](data: F[Step[A, AsyncStream[F, A]]]) {
import AsyncStream._
class AsyncStream[F[+_]: Monad, A](val data: F[Step[A, AsyncStream[F, A]]]) {
type SStep = Step[A, AsyncStream[F, A]]

def foldLeft[B](start: B)(f: (B, A) => B): F[B] = {
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
d.flatMap {
case END => acc
case step => impl(step.rest.data, acc map (b => f(b, step.value)))
}

impl(data, start.point[F])
}

def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, A, Col[A @uV]]): F[Col[A]] =
foldLeft(cbf())((col, el) => col += el).map(_.result())


def takeWhile(p: A => Boolean): AsyncStream[F, A] =
new AsyncStream[F, A](data map {
case END => END
case step if !p(step.value) => END
case step => Step(step.value, step.rest.takeWhile(p))
})
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, A, Col[A @uV]], methods: ASImpl[F]): F[Col[A]] =
methods.collectLeft(this)(cbf())((col, el) => col += el).map(_.result())

def takeWhile(p: A => Boolean)(implicit impl: ASImpl[F]): AsyncStream[F, A] = impl.takeWhile(this)(p)

def take(n: Int): AsyncStream[F, A] =
if (n <= 0) nil
else AsyncStream(data.map {
case END => END
case p => Step(p.value, p.rest.take(n - 1))
})
def take(n: Int)(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] =
if (n <= 0) smp.empty
else AsyncStream {
data.map(p => Step(p.value, p.rest.take(n - 1)))
}

def foreach[U](f: (A) => U): F[Unit] =
foldLeft(())((_: Unit, a: A) => {f(a); ()})
def foreach[U](f: (A) => U)(implicit methods: ASImpl[F]): F[Unit] =
methods.collectLeft(this)(())((_: Unit, a: A) => {f(a); ()})

def foreachF[U](f: (A) => F[U]): F[Unit] =
foldLeft(().point[F])((fu: F[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
def foreachF[U](f: (A) => F[U])(implicit impl: ASImpl[F]): F[Unit] =
impl.collectLeft(this)(().point[F])((fu: F[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)

def flatten[B](implicit asIterable: A => GenIterable[B]): AsyncStream[F, B] = {
val streamChunk = (p: Step[A, AsyncStream[F, A]]) =>
concat(generate(asIterable(p.value))(it => if (it.nonEmpty) (it.head, it.tail).point[F] else ENDF[F]), p.rest.flatten)
def flatten[B](implicit asIterable: A => GenIterable[B], smp: MonadPlus[AsyncStream[F, ?]], impl: ASImpl[F]): AsyncStream[F, B] = {
def streamChunk(step: AsyncStream[F, A]#SStep): AsyncStream[F, B] =
impl.fromIterable(asIterable(step.value).seq) <+> step.rest.flatten

AsyncStream(data.flatMap {
case END => ENDF[F]
case step => streamChunk(step).data
})
AsyncStream(data.flatMap(step => streamChunk(step).data))
}

def isEmpty(implicit impl: ASImpl[F]): F[Boolean] = impl.isEmpty(this)
def nonEmpty(implicit impl: ASImpl[F]): F[Boolean] = impl.isEmpty(this).map(!_)
}

object AsyncStream {
def nil[F[+_]: Monad, A]: AsyncStream[F, A] = AsyncStream(ENDF[F])
def single[F[+_]: Monad, A](item: A): AsyncStream[F, A] =
AsyncStream(Step(item, nil[F, A]).point[F])
def apply[F[+_]: Monad, A](data: => F[Step[A, AsyncStream[F, A]]]): AsyncStream[F, A] = new AsyncStream(data)
def asyncNil[F[+_]: Monad, A](implicit impl: ASImpl[F]): AsyncStream[F, A] = impl.empty

def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(A, S)]): AsyncStream[F, A] =
AsyncStream(gen(start).map {
case END => END
case (el, rest) => Step(el, generate(rest)(gen))
})
private[asyncstreams] def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(S, A)])(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, A] = AsyncStream {
gen(start).map((stateEl: (S, A)) => Step(stateEl._2, generate(stateEl._1)(gen)))
}

def concat[F[+_]: Monad, A](s1: AsyncStream[F, A], s2: AsyncStream[F, A]): AsyncStream[F, A] =
new AsyncStream[F, A](s1.data.flatMap {
case END => s2.data
case step => Step(step.value, concat(step.rest, s2)).point[F]
})
}
def unfold[F[+_]: Monad, T](start: T)(makeNext: T => T)(implicit smp: MonadPlus[AsyncStream[F, ?]]): AsyncStream[F, T] =
generate(start)(s => (makeNext(s), s).point[F])

implicit class AsyncStreamOps[F[+_]: Monad, A](stream: => AsyncStream[F, A]) {
def ~::(el: A) = AsyncStream(Step(el, stream).point[F])
}
}
51 changes: 0 additions & 51 deletions src/main/scala/asyncstreams/AsyncStreamMonad.scala

This file was deleted.

37 changes: 0 additions & 37 deletions src/main/scala/asyncstreams/FState.scala

This file was deleted.

Loading