Skip to content

Commit

Permalink
Upgrade fs2 to 1.0.0-M5
Browse files Browse the repository at this point in the history
  • Loading branch information
kubukoz authored and travisbrown committed Sep 7, 2018
1 parent af06849 commit fb02783
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val compilerOptions = Seq(
)

val circeVersion = "0.9.3"
val fs2Version = "1.0.0-M1"
val fs2Version = "1.0.0-M5"
val previousCirceFs2Version = "0.9.0"

val baseSettings = Seq(
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/io/circe/fs2/ParsingPipe.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.circe.fs2

import _root_.fs2.{ Pipe, Pull, Segment, Stream }
import _root_.fs2.{ Chunk, Pipe, Pull, RaiseThrowable, Stream }
import _root_.jawn.{ AsyncParser, ParseException }
import io.circe.{ Json, ParsingFailure }
import io.circe.jawn.CirceSupportParser

private[fs2] abstract class ParsingPipe[F[_], S] extends Pipe[F, S, Json] {
protected[this] val raiseThrowable: RaiseThrowable[F]

protected[this] def parsingMode: AsyncParser.Mode

protected[this] def parseWith(parser: AsyncParser[Json])(in: S): Either[ParseException, Seq[Json]]
Expand All @@ -16,9 +18,9 @@ private[fs2] abstract class ParsingPipe[F[_], S] extends Pipe[F, S, Json] {
s.pull.uncons1.flatMap {
case Some((s, str)) => parseWith(p)(s) match {
case Left(error) =>
Pull.raiseError(ParsingFailure(error.getMessage, error))
Pull.raiseError(ParsingFailure(error.getMessage, error))(raiseThrowable)
case Right(js) =>
Pull.output(Segment.seq(js)) >> doneOrLoop(p)(str)
Pull.output(Chunk.seq(js)) >> doneOrLoop(p)(str)
}
case None => Pull.done
}
Expand Down
34 changes: 19 additions & 15 deletions src/main/scala/io/circe/fs2/package.scala
Original file line number Diff line number Diff line change
@@ -1,40 +1,44 @@
package io.circe

import _root_.fs2.{ Pipe, Segment, Stream }
import _root_.jawn.{ AsyncParser, ParseException }
import _root_.fs2.{Chunk, Pipe, RaiseThrowable, Stream}
import _root_.jawn.{AsyncParser, ParseException}
import io.circe.jawn.CirceSupportParser

package object fs2 {
final def stringArrayParser[F[_]]: Pipe[F, String, Json] = stringParser(AsyncParser.UnwrapArray)
final def stringArrayParser[F[_] : RaiseThrowable]: Pipe[F, String, Json] = stringParser(AsyncParser.UnwrapArray)

final def stringStreamParser[F[_]]: Pipe[F, String, Json] = stringParser(AsyncParser.ValueStream)
final def stringStreamParser[F[_] : RaiseThrowable]: Pipe[F, String, Json] = stringParser(AsyncParser.ValueStream)

final def byteArrayParser[F[_]]: Pipe[F, Byte, Json] = byteParser(AsyncParser.UnwrapArray)
final def byteArrayParser[F[_] : RaiseThrowable]: Pipe[F, Byte, Json] = byteParser(AsyncParser.UnwrapArray)

final def byteStreamParser[F[_]]: Pipe[F, Byte, Json] = byteParser(AsyncParser.ValueStream)
final def byteStreamParser[F[_] : RaiseThrowable]: Pipe[F, Byte, Json] = byteParser(AsyncParser.ValueStream)

final def byteArrayParserS[F[_]]: Pipe[F, Segment[Byte, Unit], Json] = byteParserS(AsyncParser.UnwrapArray)
final def byteArrayParserC[F[_] : RaiseThrowable]: Pipe[F, Chunk[Byte], Json] = byteParserC(AsyncParser.UnwrapArray)

final def byteStreamParserS[F[_]]: Pipe[F, Segment[Byte, Unit], Json] = byteParserS(AsyncParser.ValueStream)
final def byteStreamParserC[F[_] : RaiseThrowable]: Pipe[F, Chunk[Byte], Json] = byteParserC(AsyncParser.ValueStream)

final def stringParser[F[_]](mode: AsyncParser.Mode)(implicit F: RaiseThrowable[F]): Pipe[F, String, Json] = new ParsingPipe[F, String] {
override protected[this] val raiseThrowable: RaiseThrowable[F] = F

final def stringParser[F[_]](mode: AsyncParser.Mode): Pipe[F, String, Json] = new ParsingPipe[F, String] {
protected[this] final def parseWith(p: AsyncParser[Json])(in: String): Either[ParseException, Seq[Json]] =
p.absorb(in)(CirceSupportParser.facade)

protected[this] val parsingMode: AsyncParser.Mode = mode
}

final def byteParserS[F[_]](mode: AsyncParser.Mode): Pipe[F, Segment[Byte, Unit], Json] =
new ParsingPipe[F, Segment[Byte, Unit]] {
protected[this] final def parseWith(p: AsyncParser[Json])(in: Segment[Byte, Unit]): Either[ParseException, Seq[Json]] =
p.absorb(in.force.toArray)(CirceSupportParser.facade)
final def byteParserC[F[_]](mode: AsyncParser.Mode)(implicit F: RaiseThrowable[F]): Pipe[F, Chunk[Byte], Json] =
new ParsingPipe[F, Chunk[Byte]] {
override protected[this] val raiseThrowable: RaiseThrowable[F] = F

protected[this] final def parseWith(p: AsyncParser[Json])(in: Chunk[Byte]): Either[ParseException, Seq[Json]] =
p.absorb(in.toArray)(CirceSupportParser.facade)

protected[this] val parsingMode: AsyncParser.Mode = mode
}

final def byteParser[F[_]](mode: AsyncParser.Mode): Pipe[F, Byte, Json] = _.segments.through(byteParserS(mode))
final def byteParser[F[_] : RaiseThrowable](mode: AsyncParser.Mode): Pipe[F, Byte, Json] = _.chunks.through(byteParserC(mode))

final def decoder[F[_], A](implicit decode: Decoder[A]): Pipe[F, Json, A] =
final def decoder[F[_] : RaiseThrowable, A](implicit decode: Decoder[A]): Pipe[F, Json, A] =
_.flatMap { json =>
decode(json.hcursor) match {
case Left(df) => Stream.raiseError(df)
Expand Down
22 changes: 11 additions & 11 deletions src/test/scala/io/circe/fs2/Fs2Suite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,24 @@ class Fs2Suite extends CirceSuite {
}
}

"byteArrayParserS" should "parse bytes wrapped in array" in {
"byteArrayParserC" should "parse bytes wrapped in array" in {
testParser(AsyncParser.UnwrapArray,
_.through(text.utf8Encode).segments.through(byteArrayParserS))
_.through(text.utf8Encode).chunks.through(byteArrayParserC))
}

"byteStreamParserS" should "parse bytes delimited by new lines" in {
"byteStreamParserC" should "parse bytes delimited by new lines" in {
testParser(AsyncParser.ValueStream,
_.through(text.utf8Encode).segments.through(byteStreamParserS))
_.through(text.utf8Encode).chunks.through(byteStreamParserC))
}


"byteParserS" should "parse single value" in {
"byteParserC" should "parse single value" in {
forAll { (foo: Foo) =>
val stream = serializeFoos(AsyncParser.SingleValue, Stream.emit(foo))
assert(stream
.through(text.utf8Encode)
.segments
.through(byteParserS(AsyncParser.SingleValue))
.chunks
.through(byteParserC(AsyncParser.SingleValue))
.compile.toVector.attempt.unsafeRunSync() === Right(Vector(foo.asJson)))
}
}
Expand Down Expand Up @@ -103,12 +103,12 @@ class Fs2Suite extends CirceSuite {
testParsingFailure(_.through(text.utf8Encode).through(byteStreamParser))
}

"byteArrayParserS" should "return ParsingFailure" in {
testParsingFailure(_.through(text.utf8Encode).segments.through(byteArrayParserS))
"byteArrayParserC" should "return ParsingFailure" in {
testParsingFailure(_.through(text.utf8Encode).chunks.through(byteArrayParserC))
}

"byteStreamParserS" should "return ParsingFailure" in {
testParsingFailure(_.through(text.utf8Encode).segments.through(byteStreamParserS))
"byteStreamParserC" should "return ParsingFailure" in {
testParsingFailure(_.through(text.utf8Encode).chunks.through(byteStreamParserC))
}

"decoder" should "return DecodingFailure" in
Expand Down

0 comments on commit fb02783

Please sign in to comment.