Skip to content

Commit

Permalink
+str #24229 move SinkRef / SourceRef to akka.stream
Browse files Browse the repository at this point in the history
+str #24229 remove protobuf changes, which do not need to be made in this PR

docs

moved things

config object

subscription timeout confifmed working, also, attributes

document attributes for sub timeout

tests for the source also failing when it should

additional demand test

implemented protection from materializing "in cycles"; would be nice in
types but that breaks the niceness of use of the types
SinkRef/SourceRef...

cleanup

no idle timeout built in, can use the Timeout stages

more docs

simplest change to prevent exposing SinkRef => SourceRef => SinkRef cycle

Things to decide:
 * is it ok to require using `getSource` / `getSink` as Java API, is there better naming?
 * where should the constructors go? I'd say just in regular javadsl/scaladsl `Source`/ `Sink` objects

move constructors to {javadsl,scaladsl}.{Source,Sink} companion objects

Remove now useless "canMaterialize" field

Separate stage (implementation) from ref (wrapped actor ref) to make it clearer what is serialized

Clarify that partner refs are not optional in on-the-wire interfaces

minor cleanup in SourceRefStage

Renamed the stages but questionable if that really helps ;)

cleanups, better docs

cleanup, fix docs compilation

fix mima

got rid of Futures in the materialized values of stream refs
  • Loading branch information
ktoso committed Jan 22, 2018
1 parent bc6861f commit 7c75abb
Show file tree
Hide file tree
Showing 42 changed files with 2,696 additions and 4,248 deletions.
8 changes: 2 additions & 6 deletions akka-actor/src/main/scala/akka/actor/dungeon/Children.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._
import akka.serialization.SerializationExtension
import akka.util.{ Helpers, Unsafe }
import akka.util.{ Unsafe, Helpers }
import akka.serialization.SerializerWithStringManifest
import java.util.Optional

import akka.event.Logging

private[akka] object Children {
val GetNobody = () Nobody
}
Expand Down Expand Up @@ -194,8 +192,7 @@ private[akka] trait Children { this: ActorCell ⇒

protected def getAllChildStats: immutable.Iterable[ChildRestartStats] = childrenRefs.stats

override def getSingleChild(name: String): InternalActorRef = {

override def getSingleChild(name: String): InternalActorRef =
if (name.indexOf('#') == -1) {
// optimization for the non-uid case
getChildByName(name) match {
Expand All @@ -210,7 +207,6 @@ private[akka] trait Children { this: ActorCell ⇒
case _ getFunctionRefOrNobody(childName, uid)
}
}
}

protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
@tailrec def removeChild(ref: ActorRef): ChildrenContainer = {
Expand Down
2 changes: 2 additions & 0 deletions akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ trait PipeToSupport {
*
* {{{
* import akka.pattern.pipe
* // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor
*
* Future { doExpensiveCalc() } pipeTo nextActor
*
Expand All @@ -91,6 +92,7 @@ trait PipeToSupport {
*
* {{{
* import akka.pattern.pipe
* // requires implicit ExecutionContext, e.g. by importing `context.dispatcher` inside an Actor
*
* Future { doExpensiveCalc() } pipeTo nextActor
*
Expand Down
Loading

0 comments on commit 7c75abb

Please sign in to comment.