Skip to content
Go to file

Latest commit


Git stats


Failed to load latest commit information.
Latest commit message
Commit time

Build status Dependencies License GRPCAkkaStreamGenerator GRPCAkkaStreamRuntime

GRPC Akka stream

Use Akka-stream's building blocks (Flow) to implement GRPC services instead of Java's StreamObservers.

All service method can be implemented as a Flow[In, Out, NotUsed].

Note that this library only exposes an AkkaStream interface but still relies on the default Netty implementation provided by grpc-java.


You need to enable sbt-protoc plugin to generate source code for the proto definitions. You can do it by adding a protoc.sbt file into your project folder with the following lines:

import scalapb.compiler.Version.{grpcJavaVersion, scalapbVersion}
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.17")

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb"   %% "compilerplugin"          % scalapbVersion,
  "io.grpc"                %  "grpc-netty"              % grpcJavaVersion,
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.1.1"

Here we add a dependency to the GRPC Akkastream protobuf generator.

Then we need to trigger the generation from the build.sbt:

PB.targets in Compile := Seq(
  // compile your proto files into scala source files
  scalapb.gen() -> (sourceManaged in Compile).value,
  // generate the GRPC Akka stream source code
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies += "beyondthelines" %% "grpcakkastreamruntime" % "0.1.1"


You're now ready to implement your GRPC service using Akka-streams Flow.

To implement your service's business logic you simply extend the GRPCAkkaStream generated trait.

E.g. for the RouteGuide service:

class RouteGuideAkkaStreamService(features: Seq[Feature]) extends RouteGuideGrpcAkkaStream.RouteGuide {
  // Unary call
  override def getFeature: Flow[Point, Feature, NotUsed] = ???
  // Server streaming
  override def listFeatures: Flow[Rectangle, Feature, NotUsed] = ???
  // Client streaming
  override def recordRoute: Flow[Point, RouteSummary, NotUsed] = ???
  // Bidi streaming
  override def routeChat: Observable[RouteNote, RouteNote, NotUsed] = ???

The server creation is similar except you need to provide an Akka-streams' Materializer instead of an ExecutionContext when binding the service

val system = ActorSystem("GRPC")
implicit val materializer = ActorMaterializer.create(system)
val server = ServerBuilder
      new RouteGuideAkkaStreamService(features) // the service implemented above

Akka-streams' Flows are also available on the client side:

val channel = ManagedChannelBuilder
  .forAddress("localhost", 8980)

val stub = RouteGuideGrpcAkkaStream.stub(channel) // only an async stub is provided

// Unary call
  .single(Point(408031728, -748645385))
// Server streaming
val rectangle = Rectangle(
  lo = Some(Point(408031728, -748645385)),
  hi = Some(Point(413700272, -742135189))
// Client streaming
  .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
// Bidi streaming
  RouteNote(message = "First message", location = Some(Point(0, 0))),
  RouteNote(message = "Second message", location = Some(Point(0, 1))),
  RouteNote(message = "Third message", location = Some(Point(1, 0))),
  RouteNote(message = "Fourth message", location = Some(Point(1, 1)))
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)


Use GRPC services with the Akka-stream API




No packages published

Contributors 4



You can’t perform that action at this time.