Skip to content

Commit

Permalink
Merge branch 'master' into js
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Aug 17, 2021
2 parents d3b6e6e + 510800e commit 717ab29
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 8 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v12
uses: olafurpg/setup-scala@v13
with:
java-version: ${{ matrix.java }}

Expand All @@ -49,10 +49,10 @@ jobs:
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Check that workflows are up to date
run: sbt --client '++${{ matrix.scala }}; githubWorkflowCheck'
run: sbt ++${{ matrix.scala }} githubWorkflowCheck

- name: Test
run: sbt --client '++${{ matrix.scala }}; clean; coverage; test; coverageReport; scalafmtCheckAll'
run: sbt ++${{ matrix.scala }} clean coverage test coverageReport scalafmtCheckAll

- name: Upload code coverage
uses: codecov/codecov-action@e156083f13aff6830c92fc5faa23505779fbf649
uses: codecov/codecov-action@e156083f13aff6830c92fc5faa23505779fbf649
2 changes: 1 addition & 1 deletion .github/workflows/clean.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ jobs:
printf "Deleting '%s' #%d, %'d bytes\n" $name ${ARTCOUNT[$name]} $size
ghapi -X DELETE $REPO/actions/artifacts/$id
done
done
done
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ val compilerOptions = Seq(
)

val circeVersion = "0.15.0-M1"
val fs2Version = "3.0.6"
val fs2Version = "3.1.0"
val jawnVersion = "1.2.0"
val previousCirceFs2Version = "0.13.0"

Expand Down
20 changes: 20 additions & 0 deletions fs2/src/main/scala/io/circe/fs2/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.circe.jawn.CirceSupportParser
import org.typelevel.jawn.{ AsyncParser, ParseException }

import scala.collection.Seq
import cats.ApplicativeError

package object fs2 {
private[this] val supportParser: CirceSupportParser = new CirceSupportParser(None, true)
Expand Down Expand Up @@ -40,11 +41,30 @@ package object fs2 {

/* Parse a stream of raw bytes into a stream of `Json` values using the given async parsing `mode`.
 * Re-chunks the byte stream and delegates to the chunk-based `byteParserC`. */
final def byteParser[F[_]: Sync](mode: AsyncParser.Mode): Pipe[F, Byte, Json] =
  bytes => bytes.chunks.through(byteParserC(mode))

/* Decode a stream of `Json` values into a stream of `A`.
 *
 * Each element is decoded lazily and emitted as a singleton, so any chunk structure of the
 * input stream is lost ("de-chunking"), which can have performance implications. If preserving
 * chunks matters, see `chunkDecoder` — with the caveats documented there.
 *
 * A `DecodingFailure` on any element terminates the stream with that error.
 */
final def decoder[F[_]: RaiseThrowable, A](implicit decode: Decoder[A]): Pipe[F, Json, A] =
  _.flatMap { json =>
    // Either#fold: left (failure) raises, right (success) emits a single element.
    decode(json.hcursor).fold(failure => Stream.raiseError[F](failure), a => Stream.emit(a))
  }

/* Like `decoder`, but preserves the chunk structure of the input stream.
 *
 * Keeping the original chunks is more performant than emitting singletons. The trade-off is
 * that laziness is per-chunk rather than per-element: for example,
 * `stream.chunkN(199).through(chunkDecoder[F, A]).take(400)` decodes 597 elements
 * (three full chunks) in order to accumulate 400 `A`s.
 *
 * A `DecodingFailure` on any element fails the effect via `ApplicativeError`.
 */
final def chunkDecoder[F[_], A](implicit decode: Decoder[A], ev: ApplicativeError[F, Throwable]): Pipe[F, Json, A] =
  _.evalMapChunk { json =>
    decode(json.hcursor).fold(failure => ev.raiseError[A](failure), a => ev.pure(a))
  }
}
39 changes: 39 additions & 0 deletions fs2/src/test/scala/io/circe/fs2/Fs2Suite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ class Fs2Suite extends CirceSuite {
)
}.check().map(r => assert(r.passed))

// chunkDecoder must round-trip every serialized Foo, whatever the generated chunking.
"chunkDecoder" should "decode enumerated JSON values" in
  PropF.forAllF { (fooStdStream: StdStream[Foo], fooVector: Vector[Foo]) =>
    val expected = (fooStdStream ++ fooVector).toVector
    val source = serializeFoos(AsyncParser.UnwrapArray, fooStream(fooStdStream, fooVector))

    source
      .through(stringArrayParser)
      .through(chunkDecoder[IO, Foo])
      .compile
      .toVector
      .attempt
      .map(r => assert(r === Right(expected)))
  }.check().map(r => assert(r.passed))

// chunkDecoder must not re-chunk: output chunk sizes mirror the input's.
"chunkDecoder" should "maintain chunk size" in
  PropF.forAllF { (fooStdStream: StdStream[Foo], fooVector: Vector[Foo]) =>
    val minChunk = 4
    // Force the parsed JSON stream into chunks of at least `minChunk` elements.
    val jsons = serializeFoos(AsyncParser.UnwrapArray, fooStream(fooStdStream, fooVector))
      .through(stringArrayParser)
      .chunkMin(minChunk)
      .flatMap(c => Stream.chunk(c))

    jsons.through(chunkDecoder[IO, Foo]).chunks.map(_.size).compile.toList.map { sizes =>
      // Enough input for at least one full chunk: the first output chunk keeps its size.
      if (sizes.sum >= minChunk)
        assert(sizes.head == minChunk)
      else
        // Too little input: at most a single (undersized) chunk is emitted.
        assert(sizes.length <= 1)
    }
  }.check().map(r => assert(r.passed))

"stringArrayParser" should "return ParsingFailure" in {
testParsingFailure(_.through(stringArrayParser))
}
Expand Down Expand Up @@ -175,6 +198,22 @@ class Fs2Suite extends CirceSuite {
}
}.check().map(r => assert(r.passed))

// Decoding Foo JSON into an incompatible type must surface a DecodingFailure.
"chunkDecoder" should "return DecodingFailure" in
  PropF.forAllF { (fooStdStream: StdStream[Foo], fooVector: Vector[Foo]) =>
    // A local ADT with no values matching the serialized Foo shape.
    sealed trait Foo2
    case class Bar2(x: String) extends Foo2

    whenever(fooStdStream.nonEmpty && fooVector.nonEmpty) {
      serializeFoos(AsyncParser.UnwrapArray, fooStream(fooStdStream, fooVector))
        .through(stringArrayParser)
        .through(chunkDecoder[IO, Foo2])
        .compile
        .toVector
        .attempt
        .map(r => assert(r.swap.toOption.exists(_.isInstanceOf[DecodingFailure])))
    }
  }.check().map(r => assert(r.passed))

private def testParser(mode: AsyncParser.Mode, through: Pipe[IO, String, Json]) =
PropF.forAllF { (fooStdStream: StdStream[Foo], fooVector: Vector[Foo]) =>
val stream = serializeFoos(mode, fooStream(fooStdStream, fooVector))
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.12.0")
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.13.0")
addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0")
addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.9.2")
Expand All @@ -9,4 +9,4 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.8.2")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.1.0")
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.6.0")
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.7.0")

0 comments on commit 717ab29

Please sign in to comment.