Compositional, streaming I/O library for Scala
Clone or download
mpilquist Merge pull request #1320 from kiambogo/projects
Add fs2-aws and fs2-jms under project reference
Latest commit 026e446 Nov 15, 2018
Permalink
Failed to load latest commit information.
benchmark-macros/src/main/scala/fs2 Fixed Xlint warning in benchmark project May 23, 2017
benchmark/src/main/scala/fs2/benchmark Added Benchmark, fairQueue Sep 14, 2018
core Merge pull request #1319 from SystemFw/groupWithin/leak Nov 13, 2018
docs Remove reference to removed Once Sep 18, 2018
experimental/src/main/scala/fs2/experimental/concurrent Bugfix/queue get (#1308) Nov 6, 2018
io/src Fixed compilation issue due to stricter warnings on 2.12.7 Sep 28, 2018
project Upgraded to latest sbt-travisci to fix bug where build matrix in .tra… Oct 5, 2018
reactive-streams/src Fix doctest for scala 2.11 Sep 14, 2018
site/src Fix concurrency-primitives.md: Oct 24, 2018
testdata Add simple benchmark for utf8Decode Feb 12, 2014
.gitignore Add test-output directory to .gitignore Sep 14, 2018
.sbtopts Bumped max metaspace to 512m Jan 30, 2018
.scalafmt.conf Enabled AvoidInfix rule Jan 4, 2018
.travis.yml Fixed SNAPSHOT publishing Nov 12, 2018
CHANGELOG.md Added link to releases page to CHANGELOG Apr 26, 2018
CODE_OF_CONDUCT.md Updated code of conduct Sep 8, 2018
CONTRIBUTORS.md Add Zainab Ali as a contributor Sep 14, 2018
LICENSE Updated LICENSE to add a reference to Cats due to adaptation of Gen[F… Apr 18, 2018
README.md Add fs2-aws and fs2-jms under project reference Nov 13, 2018
build.sbt Fixed SNAPSHOT publishing Nov 12, 2018
sbt Updates to latest sbt launch script Dec 5, 2017
version.sbt Setting version to 1.0.1-SNAPSHOT Oct 5, 2018

README.md

FS2: Functional Streams for Scala (previously 'Scalaz-Stream')

Build Status Gitter Chat Latest version

Quick links:

About the library

FS2 is a streaming I/O library. The design goals are compositionality, expressiveness, resource safety, and speed. Here's a simple example of its use:

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.{io, text}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object Converter extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    def fahrenheitToCelsius(f: Double): Double =
      (f - 32.0) * (5.0/9.0)

    val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext))
      .compile.drain
      .as(ExitCode.Success)
  }
}

This will construct a program that reads lines incrementally from testdata/fahrenheit.txt, skipping blank lines and commented lines. It then parses temperatures in degrees Fahrenheit, converts these to Celsius, UTF-8 encodes the output, and writes incrementally to testdata/celsius.txt, using constant memory. The input and output files will be closed upon normal termination or if exceptions occur.

Note that this example is specialised to IO for simplicity, but Stream is fully polymorphic in the effect type (the F[_] in Stream[F, A]), as long as F[_] is compatible with the cats-effect typeclasses.

The library supports a number of other interesting use cases:

  • Zipping and merging of streams: A streaming computation may read from multiple sources in a streaming fashion, zipping or merging their elements using an arbitrary function. In general, clients have a great deal of flexibility in what sort of topologies they can define, due to Stream being a first class entity with a very rich algebra of combinators.
  • Dynamic resource allocation: A streaming computation may allocate resources dynamically (for instance, reading a list of files to process from a stream built off a network socket), and the library will ensure these resources get released upon normal termination or if exceptions occur.
  • Nondeterministic and concurrent processing: A computation may read from multiple input streams simultaneously, using whichever result comes back first, and a pipeline of transformations can allow for nondeterminism and queueing at each stage. Due to several concurrency combinators and data structures, streams can be used as light-weight, declarative threads to build complex concurrent behaviour compositionally.

These features mean that FS2 goes beyond streaming I/O to offer a very general and declarative model for arbitrary control flow.

Documentation and getting help

  • The official guide is a good starting point for learning more about the library.
  • The FAQ has frequently asked questions. Feel free to open issues or PRs with additions to the FAQ!
  • Also feel free to come discuss and ask/answer questions in the gitter channel and/or on StackOverflow using the tag FS2. Gitter will generally get you a quicker answer.

Blog posts, talks, and other external resources are listed on the Additional Resources page.

Where to get the latest version

The latest version is 1.0.x. See the badge at the top of the README for the exact version number.

The 1.0 migration guide summarizes the differences between 1.0 and 0.10. To get 1.0.x, add the following to your SBT build:

// available for Scala 2.11, 2.12
libraryDependencies += "co.fs2" %% "fs2-core" % "1.0.0" // For cats 1.4.0 and cats-effect 1.0

// optional I/O library
libraryDependencies += "co.fs2" %% "fs2-io" % "1.0.0"

// optional reactive streams interop
libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "1.0.0"

// optional experimental library
libraryDependencies += "co.fs2" %% "fs2-experimental" % "1.0.0"

The previous stable release is 0.10.6. You may want to first read the 0.10 migration guide if you are upgrading from 0.9 or earlier. To get 0.10, add the following to your SBT build:

// available for Scala 2.11, 2.12
libraryDependencies += "co.fs2" %% "fs2-core" % "0.10.6"

// optional I/O library
libraryDependencies += "co.fs2" %% "fs2-io" % "0.10.6"

The fs2-core library is also supported on Scala.js:

libraryDependencies += "co.fs2" %%% "fs2-core" % "0.10.6"

API docs:

Projects using FS2

If you have a project you'd like to include in this list, either open a PR or let us know in the gitter channel and we'll add a link to it here.

  • circe-fs2: Streaming JSON manipulation with circe.
  • doobie: Pure functional JDBC built on fs2.
  • fs2-aws: FS2 streams to interact with AWS utilities
  • fs2-blobstore: Minimal, idiomatic, stream-based Scala interface for key/value store implementations.
  • fs2-cassandra: Cassandra bindings for fs2.
  • fs2-cron: FS2 streams based on cron expressions.
  • fs2-crypto: TLS support for fs2.
  • fs2-elastic: Simple client for Elasticsearch.
  • fs2-grpc: gRPC implementation for FS2 / Cats Effect.
  • fs2-http: Http server and client library implemented in fs2.
  • fs2-jms: JMS connectors for FS2 streams
  • fs2-kafka: Simple client for Apache Kafka.
  • fs2-mail: Fully asynchronous java non-blocking email client using fs2.
  • fs2-rabbit: RabbitMQ stream-based client built on top of Fs2.
  • fs2-reactive-streams: A reactive streams implementation for fs2.
  • fs2-redis: Redis stream-based client built on top of Fs2 / Cats Effect.
  • fs2-zk: Simple Apache Zookeeper bindings for fs2.
  • http4s: Minimal, idiomatic Scala interface for HTTP services using fs2.
  • scodec-protocols: A library for working with libpcap files. Contains many interesting pipes (e.g., working with time series and playing back streams at various rates).
  • scodec-stream: A library for streaming binary decoding and encoding, built using fs2 and scodec.
  • streamz: A library that supports the conversion of Akka Stream Sources, Flows and Sinks to and from FS2 Streams, Pipes and Sinks, respectively. It also supports the usage of Apache Camel endpoints in FS2 Streams and Akka Stream Sources, Flows and SubFlows.
  • upperbound: A purely functional, interval-based rate limiter with support for backpressure.

Related projects

FS2 has evolved from earlier work on streaming APIs in Scala and Haskell. Some influences:

Presentations, Blogs, etc.

See Additional resources.

Acknowledgments

YourKit

Special thanks to YourKit for supporting this project's ongoing performance tuning efforts with licenses to their excellent product.

Code of Conduct

See the Code of Conduct.