diff --git a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java index e6d0a8f77..739a8a8f1 100644 --- a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java @@ -49,7 +49,7 @@ import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; -import static org.apache.pekko.pattern.PatternsCS.pipe; +import static org.apache.pekko.pattern.Patterns.pipe; // #single-request-in-actor-example diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java index 6fc3c2cf5..fb520207a 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java @@ -32,8 +32,7 @@ import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest; import org.apache.pekko.http.javadsl.server.*; import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.stream.ActorMaterializer; -import org.apache.pekko.stream.ActorMaterializerSettings; +import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; @@ -292,8 +291,7 @@ public void testExtractLog() { @Test public void testWithMaterializer() { // #withMaterializer - final ActorMaterializerSettings settings = ActorMaterializerSettings.create(system()); - final ActorMaterializer special = ActorMaterializer.create(settings, system(), "special"); + final Materializer special = Materializer.createMaterializer(system()); final Route sample = path( diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java index 65af9b1f0..bcd878781 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java @@ -27,7 +27,6 @@ import org.apache.pekko.testkit.javadsl.TestKit; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import static org.apache.pekko.http.javadsl.server.PathMatchers.*; @@ -165,8 +164,8 @@ public void testOnCompleteWithBreaker() throws InterruptedException { // import static org.apache.pekko.http.javadsl.server.PathMatchers.*; final int maxFailures = 1; - final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS); - final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS); + final Duration callTimeout = Duration.ofSeconds(5); + final Duration resetTimeout = Duration.ofSeconds(1); final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout); diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index 9019443c6..c9603f25a 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -45,9 +45,9 @@ import pekko.http.scaladsl.model.headers.`Timeout-Access` import pekko.http.scaladsl.model._ import pekko.http.impl.util.LogByteStringTools._ -import scala.concurrent.{ ExecutionContext, Future, Promise } -import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration } import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.duration.{ Deadline, Duration, DurationLong, FiniteDuration } import scala.jdk.DurationConverters._ import scala.util.Failure import scala.util.control.{ NoStackTrace, NonFatal } @@ -721,7 +721,12 @@ private[http] object HttpServerBluePrint { }) private var activeTimers = 0 - private def timeout = materializer.settings.subscriptionTimeoutSettings.timeout + private val timeout: FiniteDuration = { + inheritedAttributes.get[ActorAttributes.StreamSubscriptionTimeout] match { + case Some(attr) => attr.timeout + case None => 5.minutes // should not happen + } + } private def addTimeout(s: SubscriptionTimeout): Unit = { if (activeTimers == 0) setKeepGoing(true) activeTimers += 1 diff --git a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala index 357531e14..aa465d22a 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala @@ -44,7 +44,7 @@ object TestServer extends App { val settings = ActorMaterializerSettings(system) // .withSyncProcessingLimit(Int.MaxValue) .withInputBuffer(128, 128) - implicit val fm: ActorMaterializer = ActorMaterializer(settings) + implicit val fm: Materializer = ActorMaterializer(settings) try { val binding = Http().newServerAt("localhost", 9001).bindSync { case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) => diff --git a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala index 38f0dd444..01835c620 100644 --- a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala +++ b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala @@ -43,10 +43,11 @@ object ConnectionTestApp { val clientFlow = Http().superPool[Int]() - val sourceActor = { + val sourceQueue = { // Our superPool expects (HttpRequest, Int) as input val source = - Source.actorRef[(HttpRequest, Int)](10000, OverflowStrategy.dropNew).buffer(20000, OverflowStrategy.fail) + Source.queue[(HttpRequest, Int)](10000) + .buffer(20000, OverflowStrategy.fail) val sink = Sink.foreach[(Try[HttpResponse], Int)] { case (resp, id) => handleResponse(resp, id) } @@ -55,7 +56,7 @@ object ConnectionTestApp { } def sendPoolFlow(uri: Uri, id: Int): Unit = { - sourceActor ! ((buildRequest(uri), id)) + sourceQueue.offer((buildRequest(uri), id)) } def sendPoolFuture(uri: Uri, id: Int): Unit = {