From b8572483ed0b677cef3116f331f52fe4558a95a6 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 01:11:40 +0100 Subject: [PATCH 1/8] old way to get subscriptionTimeoutSettings not accessible any more --- .../pekko/http/impl/engine/server/HttpServerBluePrint.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index 9019443c6..8d4374513 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -721,7 +721,11 @@ private[http] object HttpServerBluePrint { }) private var activeTimers = 0 - private def timeout = materializer.settings.subscriptionTimeoutSettings.timeout + private val timeout = { + import scala.jdk.DurationConverters._ + materializer.system.settings.config.getDuration( + "pekko.stream.materializer.stream-ref.subscription-timeout").toScala + } private def addTimeout(s: SubscriptionTimeout): Unit = { if (activeTimers == 0) setKeepGoing(true) activeTimers += 1 From b3054293980d68317e3797950f219ddf79f83e88 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 01:14:20 +0100 Subject: [PATCH 2/8] ActorMaterializer class not accessible --- .../test/scala/org/apache/pekko/http/scaladsl/TestServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala index 357531e14..aa465d22a 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/scaladsl/TestServer.scala @@ -44,7 +44,7 @@ object TestServer extends App { val settings = ActorMaterializerSettings(system) // .withSyncProcessingLimit(Int.MaxValue) .withInputBuffer(128, 128) - implicit val fm: ActorMaterializer = ActorMaterializer(settings) + implicit val fm: Materializer = ActorMaterializer(settings) try { val binding = Http().newServerAt("localhost", 9001).bindSync { case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) => From 7f1e45ca44682f7f8a3ef33c06a5f32a58eb71a4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 01:28:01 +0100 Subject: [PATCH 3/8] deprecated Source.actorRef function removed readability import temp compile workaround Update ConnectionTestApp.scala Update ConnectionTestApp.scala --- .../pekko/http/scaladsl/server/ConnectionTestApp.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala index 38f0dd444..01835c620 100644 --- a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala +++ b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/server/ConnectionTestApp.scala @@ -43,10 +43,11 @@ object ConnectionTestApp { val clientFlow = Http().superPool[Int]() - val sourceActor = { + val sourceQueue = { // Our superPool expects (HttpRequest, Int) as input val source = - Source.actorRef[(HttpRequest, Int)](10000, OverflowStrategy.dropNew).buffer(20000, OverflowStrategy.fail) + Source.queue[(HttpRequest, Int)](10000) + .buffer(20000, OverflowStrategy.fail) val sink = Sink.foreach[(Try[HttpResponse], Int)] { case (resp, id) => handleResponse(resp, id) } @@ -55,7 +56,7 @@ object ConnectionTestApp { } def sendPoolFlow(uri: Uri, id: Int): Unit = { - sourceActor ! ((buildRequest(uri), id)) + sourceQueue.offer((buildRequest(uri), id)) } def sendPoolFuture(uri: Uri, id: Int): Unit = { From 361a6fe71e7ebb0aea70445791545dc796b8e995 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 02:00:53 +0100 Subject: [PATCH 4/8] Update HttpClientExampleDocTest.java --- .../test/java/docs/http/javadsl/HttpClientExampleDocTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java index e6d0a8f77..739a8a8f1 100644 --- a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java @@ -49,7 +49,7 @@ import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.http.javadsl.model.HttpRequest; import org.apache.pekko.http.javadsl.model.HttpResponse; -import static org.apache.pekko.pattern.PatternsCS.pipe; +import static org.apache.pekko.pattern.Patterns.pipe; // #single-request-in-actor-example From 02db061544ef42439efc4cf5cf1323e9bdab5453 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 02:30:05 +0100 Subject: [PATCH 5/8] use Materializer instead of ActorMaterializer --- .../server/directives/BasicDirectivesExamplesTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java index 6fc3c2cf5..fb520207a 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/BasicDirectivesExamplesTest.java @@ -32,8 +32,7 @@ import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest; import org.apache.pekko.http.javadsl.server.*; import org.apache.pekko.japi.pf.PFBuilder; -import org.apache.pekko.stream.ActorMaterializer; -import org.apache.pekko.stream.ActorMaterializerSettings; +import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; import org.apache.pekko.util.ByteString; @@ -292,8 +291,7 @@ public void testExtractLog() { @Test public void testWithMaterializer() { // #withMaterializer - final ActorMaterializerSettings settings = ActorMaterializerSettings.create(system()); - final ActorMaterializer special = ActorMaterializer.create(settings, system(), "special"); + final Materializer special = Materializer.createMaterializer(system()); final Route sample = path( From 828bdeba6406cd092b9f436377323126f0bf7112 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 03:18:25 +0100 Subject: [PATCH 6/8] Update FutureDirectivesExamplesTest.java --- .../server/directives/FutureDirectivesExamplesTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java b/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java index 65af9b1f0..bcd878781 100644 --- a/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java +++ b/docs/src/test/java/docs/http/javadsl/server/directives/FutureDirectivesExamplesTest.java @@ -27,7 +27,6 @@ import org.apache.pekko.testkit.javadsl.TestKit; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import static org.apache.pekko.http.javadsl.server.PathMatchers.*; @@ -165,8 +164,8 @@ public void testOnCompleteWithBreaker() throws InterruptedException { // import static org.apache.pekko.http.javadsl.server.PathMatchers.*; final int maxFailures = 1; - final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS); - final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS); + final Duration callTimeout = Duration.ofSeconds(5); + final Duration resetTimeout = Duration.ofSeconds(1); final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout); From fd7f859cc40fa2fb066ff7ca8e801bbeff12f075 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 16 Sep 2025 11:38:38 +0100 Subject: [PATCH 7/8] try using attributes to get the timeout --- .../http/impl/engine/server/HttpServerBluePrint.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index 8d4374513..84266b366 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -721,10 +721,11 @@ private[http] object HttpServerBluePrint { }) private var activeTimers = 0 - private val timeout = { - import scala.jdk.DurationConverters._ - materializer.system.settings.config.getDuration( - "pekko.stream.materializer.stream-ref.subscription-timeout").toScala + private val timeout: FiniteDuration = { + inheritedAttributes.get[ActorAttributes.StreamSubscriptionTimeout] match { + case Some(attr) => attr.timeout + case None => 5.minutes // should not happen + } } private def addTimeout(s: SubscriptionTimeout): Unit = { if (activeTimers == 0) setKeepGoing(true) From 8522840252b9408ee7d664937f2770f47ad3b419 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 18 Sep 2025 01:29:34 +0100 Subject: [PATCH 8/8] Update HttpServerBluePrint.scala Update HttpServerBluePrint.scala --- .../pekko/http/impl/engine/server/HttpServerBluePrint.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index 84266b366..c9603f25a 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -45,9 +45,9 @@ import pekko.http.scaladsl.model.headers.`Timeout-Access` import pekko.http.scaladsl.model._ import pekko.http.impl.util.LogByteStringTools._ -import scala.concurrent.{ ExecutionContext, Future, Promise } -import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration } import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.duration.{ Deadline, Duration, DurationLong, FiniteDuration } import scala.jdk.DurationConverters._ import scala.util.Failure import scala.util.control.{ NoStackTrace, NonFatal }