Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ask timeout from Timeout to Duration in typed javadsl. #25975

Merged
merged 2 commits into from Nov 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -14,6 +14,7 @@
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class ActorContextAskTest extends JUnitSuite {
Expand Down Expand Up @@ -42,7 +43,7 @@ public void provideASafeAsk() {
final Behavior<Object> snitch = Behaviors.setup((ActorContext<Object> context) -> {
context.ask(Pong.class,
pingPong,
new Timeout(3, TimeUnit.SECONDS),
Duration.ofSeconds(3),
(ActorRef<Pong> ref) -> new Ping(ref),
(pong, exception) -> {
if (pong != null) return pong;
Expand Down
Expand Up @@ -11,7 +11,6 @@
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.junit.ClassRule;
import org.scalatest.junit.JUnitSuite;
import akka.util.Timeout;
import org.junit.Test;

import akka.actor.typed.*;
Expand Down Expand Up @@ -42,7 +41,7 @@ static final class Stop {
static final class CustomTerminationMessage implements Message {
}

final Timeout timeout = Timeout.create(Duration.ofSeconds(5));
final Duration timeout = Duration.ofSeconds(5);

final Behavior<Stop> exitingActor = receive((context, message) -> {
System.out.println("Stopping!");
Expand Down
Expand Up @@ -8,17 +8,16 @@
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import akka.util.Timeout;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class ReceptionistApiTest {

public void compileOnlyApiTest() {
// some dummy prerequisites
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(3);
final ActorRef<String> service = null;
final ServiceKey<String> key = ServiceKey.create(String.class, "id");
final ActorSystem<Void> system = null;
Expand Down
Expand Up @@ -10,7 +10,6 @@
import akka.actor.typed.Props;
import akka.actor.typed.javadsl.*;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.util.Timeout;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
Expand Down Expand Up @@ -382,7 +381,7 @@ public static Behavior<DaveProtocol> daveBehavior(final ActorRef<HalCommand> hal

// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS);
final Duration timeout = Duration.ofSeconds(3);

context.ask(
HalResponse.class,
Expand Down Expand Up @@ -448,7 +447,7 @@ public void askAndPrint(ActorSystem<Object> system, ActorRef<CookieCommand> cook
GiveMeCookies::new,
// asking someone requires a timeout and a scheduler, if the timeout hits without response
// the ask is failed with a TimeoutException
Timeout.apply(3, TimeUnit.SECONDS),
Duration.ofSeconds(3),
system.scheduler());

result.whenComplete((cookies, failure) -> {
Expand Down
Expand Up @@ -46,7 +46,7 @@ public static void main(String[] args) throws Exception {
//#system-spawn
final ActorSystem<SpawnProtocol> system =
ActorSystem.create(HelloWorldMain.main, "hello");
final Timeout timeout = Timeout.create(Duration.ofSeconds(3));
final Duration timeout = Duration.ofSeconds(3);

CompletionStage<ActorRef<HelloWorld.Greet>> greeter = AskPattern.ask(
system,
Expand Down
Expand Up @@ -5,6 +5,7 @@
package akka.actor.typed
package internal

import java.time.Duration
import java.util.function.{ Function ⇒ JFunction }
import java.util.ArrayList
import java.util.Optional
Expand All @@ -16,7 +17,6 @@ import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.util.Timeout
Expand Down Expand Up @@ -90,11 +90,11 @@ import akka.util.JavaDurationConverters._
}

// Java API impl
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
this.ask(target)(createRequest.apply) {
case Success(message) ⇒ applyToResponse.apply(message, null)
case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex)
}(responseTimeout, ClassTag[Res](resClass))
}(responseTimeout.asScala, ClassTag[Res](resClass))
}

private[akka] override def spawnMessageAdapter[U](f: U ⇒ T, name: String): ActorRef[U] =
Expand Down
Expand Up @@ -186,7 +186,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/
def setReceiveTimeout(d: Duration, msg: T): Unit
def setReceiveTimeout(timeout: Duration, msg: T): Unit

/**
* Cancel the sending of receive timeout notifications.
Expand Down Expand Up @@ -275,7 +275,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
def ask[Req, Res](
resClass: Class[Res],
target: RecipientRef[Req],
responseTimeout: Timeout,
responseTimeout: Duration,
createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit

Expand Down
Expand Up @@ -5,12 +5,13 @@
package akka.actor.typed
package javadsl

import java.time.Duration
import java.util.concurrent.CompletionStage

import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.japi.function.{ Function ⇒ JFunction }
import akka.util.Timeout
import akka.util.JavaDurationConverters._

import scala.compat.java8.FutureConverters._

Expand All @@ -29,6 +30,6 @@ import scala.compat.java8.FutureConverters._
*
*/
object AskPattern {
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] =
(actor.?(message.apply)(timeout, scheduler)).toJava
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava
}
Expand Up @@ -167,7 +167,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
def setReceiveTimeout(timeout: FiniteDuration, msg: T): Unit

/**
* Cancel the sending of receive timeout notifications.
Expand Down
Expand Up @@ -13,11 +13,10 @@
import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.SideEffect;
import akka.util.Timeout;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static akka.actor.typed.javadsl.AskPattern.ask;

Expand Down Expand Up @@ -266,7 +265,7 @@ static class Response {
}

static ActorRef<Request> sideEffectProcessor = TestInbox.<Request>create().getRef();
static Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
static Duration timeout = Duration.ofSeconds(1);

private static void performSideEffect(ActorRef<AcknowledgeSideEffect> sender, int correlationId, String data, Scheduler scheduler) {
CompletionStage<Response> what = ask(sideEffectProcessor, (ActorRef<Response> ar) -> new Request(correlationId, data, ar), timeout, scheduler);
Expand Down