In [1]:
interp.load.module(ammonite.ops.Path("/Users/cshao/DeepLearning.scala/jupyter-script.sc"))

In [2]:
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
import scala.util.{Failure, Success, Try, DynamicVariable}
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.event.{Logging, LogSource}
import akka.pattern.ask
import akka.util.Timeout
import scalaz.syntax.all._
import com.thoughtworks.tryt._
import com.thoughtworks.tryt.covariant._
import com.thoughtworks.continuation._
import com.thoughtworks.future._
import com.thoughtworks.raii.asynchronous._
import com.thoughtworks.raii.covariant._
import scala.concurrent.duration.{SECONDS, FiniteDuration}

case class Dispatch(remoteContinuationBuffer: Array[Byte])

case class Receipt[A](receipt: Try[A])

class RemoteActor extends Actor {

  import Remote.{RemoteContinuation, actorSystemStore}

  override def receive: Receive = {
    case Dispatch(remoteContinuationBuffer) =>
      val remoteContinuation: RemoteContinuation[Unit] = actorSystemStore.withValue(context.system) {
        new ObjectInputStream(new ByteArrayInputStream(remoteContinuationBuffer))
          .readObject()
          .asInstanceOf[RemoteContinuation[Unit]]
      }
      val value: Try[Unit] = Success(())
      val actorSystem = context.system.asInstanceOf[Remote].actorSystem
      val release: UnitContinuation[Unit] = UnitContinuation.delay {
        actorSystem.stop(self)
      }
      val remoteContinuationParameter: Resource[UnitContinuation, Try[Unit]] = Resource(value, release)
      remoteContinuation(remoteContinuationParameter)
      sender ! Receipt(value)
  }
}

class Remote(val actorSystem: ActorSystem)(implicit val timeout: Timeout) extends Serializable {

  import Remote._
  import actorSystem.dispatcher

  val log = {
    implicit val logSource: LogSource[Remote] = new LogSource[Remote] {
      override def genString(t: Remote): String = toString
    }
    Logging(actorSystem, this)
  }

  log.info(s"remote context $this constructed")

  def jump: Do[Unit] = Do.async { (remoteContinuation: RemoteContinuation[Unit]) =>
    {
      log.info(s"jump of $this is invoked")

      val newActor = actorSystem.actorOf(Props(new RemoteActor))
      val remoteContinuationBuffer = {
        val byteArrayOutputStream = new ByteArrayOutputStream()
        new ObjectOutputStream(byteArrayOutputStream).writeObject(remoteContinuation)
        byteArrayOutputStream.toByteArray
      }

      (newActor ? Dispatch(remoteContinuationBuffer)).onComplete {
        case Success(Receipt(receipt)) =>
          receipt match {
            case Success(returnedActor) => log.info(s"remote execution returned from $returnedActor")
            case Failure(err)           => log.error(s"remote execution failed with $err")
          }
        case Failure(err) => log.error(s"akka messaging failed with $err")
      }

    }
  }

  def writeReplace: Any = {
    log.info(s"writeReplace of $this is invoked")
    RemoteProxy
  }
}

case object Remote {
  type RemoteContinuation[A] = (Resource[UnitContinuation, Try[A]]) => Unit

  @transient
  val actorSystemStore: DynamicVariable[ActorSystem] = new DynamicVariable[ActorSystem](null)

  object RemoteProxy extends Serializable {
    def readResolve: Any = {
      val actorSystem = actorSystemStore.value
      implicit val timeout: Timeout = FiniteDuration(10, SECONDS)
      val remote = new Remote(actorSystem)
      val log = {
        implicit val logSource: LogSource[Remote] = new LogSource[Remote] {
          override def genString(t: Remote): String = toString
        }
        Logging(remote.actorSystem, remote)
      }
      log.info(s"readResolve of $remote is invoked")
      remote
    }
  }

  def apply(makeActorSystem: => ActorSystem)(implicit timeout: Timeout): Do[Remote] = {
    Do.resource {
      val actorSystem = makeActorSystem
      Resource(
        new Remote(actorSystem),
        UnitContinuation.delay {
          import actorSystem.dispatcher
          val Future(TryT(tryFinalizer)) = actorSystem.terminate.toThoughtworksFuture
          val log = {
            implicit val logSource: LogSource[Remote.this.type] = new LogSource[Remote.this.type] {
              override def genString(t: Remote.this.type): String = toString
            }
            Logging(actorSystem, this)
          }
          tryFinalizer.map {
            case Success(_)   => log.info(s"actorSystem $actorSystem terminated")
            case Failure(err) => log.error(s"termination of actorSystem $actorSystem failed with $err")
          }
        }.join
      )
    }
  }
}


[32mimport [39m[36mjava.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
[39m
[32mimport [39m[36mscala.util.{Failure, Success, Try, DynamicVariable}
[39m
[32mimport [39m[36makka.actor.{Actor, ActorRef, ActorSystem, Props}
[39m
[32mimport [39m[36makka.event.{Logging, LogSource}
[39m
[32mimport [39m[36makka.pattern.ask
[39m
[32mimport [39m[36makka.util.Timeout
[39m
[32mimport [39m[36mscalaz.syntax.all._
[39m
[32mimport [39m[36mcom.thoughtworks.tryt._
[39m
[32mimport [39m[36mcom.thoughtworks.tryt.covariant._
[39m
[32mimport [39m[36mcom.thoughtworks.continuation._
[39m
[32mimport [39m[36mcom.thoughtworks.future._
[39m
[32mimport [39m[36mcom.thoughtworks.raii.asynchronous._
[39m
[32mimport [39m[36mcom.thoughtworks.raii.covariant._
[39m
[32mimport [39m[36mscala.concurrent.duration.{SECONDS, FiniteDuration}

[39m
defined [32mclass[39m [36mDispatch[39m
defined [32mclass[39m [36mReceipt[39m
de

In [3]:
import scala.concurrent.duration.{SECONDS, FiniteDuration}
import com.typesafe.config.{Config, ConfigFactory}
import com.thoughtworks.each.Monadic._

val config: Config = ConfigFactory.parseString("""
      akka {
        loglevel = "DEBUG"
        actor {
          provider = remote
          debug {
            receive = on
          }
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2552
          }
       }
      }
    """)

def remoteSpec: Double = {
    sys.props("sun.io.serialization.extendedDebugInfo") = "true"
    
    implicit val timeout: Timeout = FiniteDuration(10, SECONDS)
    
    val m: Do[Double] = monadic[Do] {
        val remote = Remote(ActorSystem("RemoteSpecActorSystem", config)).each
        remote.jump.each
        val x = Do.now(2.0).each
        remote.jump.each
        val y = Do.now(3.0).each
        remote.jump.each
        x * y
    }
    
    m.run.blockingAwait
}

[32mimport [39m[36mscala.concurrent.duration.{SECONDS, FiniteDuration}
[39m
[32mimport [39m[36mcom.typesafe.config.{Config, ConfigFactory}
[39m
[32mimport [39m[36mcom.thoughtworks.each.Monadic._

[39m
[36mconfig[39m: [32mcom[39m.[32mtypesafe[39m.[32mconfig[39m.[32mConfig[39m = Config(SimpleConfigObject({"akka":{"actor":{"debug":{"receive":"on"},"provider":"remote"},"loglevel":"DEBUG","remote":{"enabled-transports":["akka.remote.netty.tcp"],"netty":{"tcp":{"hostname":"127.0.0.1","port":2552}}}}}))
defined [32mfunction[39m [36mremoteSpec[39m

In [4]:
remoteSpec

[DEBUG] [08/29/2017 17:57:32.404] [pool-6-thread-1] [EventStream(akka://RemoteSpecActorSystem)] logger log1-Logging$DefaultLogger started
[DEBUG] [08/29/2017 17:57:32.405] [pool-6-thread-1] [EventStream(akka://RemoteSpecActorSystem)] Default Loggers started
[INFO] [08/29/2017 17:57:32.503] [pool-6-thread-1] [akka.remote.Remoting] Starting remoting
[INFO] [08/29/2017 17:57:32.630] [pool-6-thread-1] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteSpecActorSystem@127.0.0.1:2552]
[INFO] [08/29/2017 17:57:32.634] [pool-6-thread-1] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteSpecActorSystem@127.0.0.1:2552]
[INFO] [08/29/2017 17:57:32.658] [pool-6-thread-1] [$sess.cmd1Wrapper$Helper$Remote$$anon$1@54204d3e] remote context $sess.cmd1Wrapper$Helper$Remote@2ea069bf constructed
[INFO] [08/29/2017 17:57:32.667] [pool-6-thread-1] [$sess.cmd1Wrapper$Helper$Remote$$anon$1@54204d3e] jump of $sess.cmd1Wrapper$Helper$Remote@2ea069bf is i

: 