Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import static org.apache.pekko.pattern.PatternsCS.pipe;
import static org.apache.pekko.pattern.Patterns.pipe;

// #single-request-in-actor-example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest;
import org.apache.pekko.http.javadsl.server.*;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.ActorMaterializerSettings;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
Expand Down Expand Up @@ -292,8 +291,7 @@ public void testExtractLog() {
@Test
public void testWithMaterializer() {
// #withMaterializer
final ActorMaterializerSettings settings = ActorMaterializerSettings.create(system());
final ActorMaterializer special = ActorMaterializer.create(settings, system(), "special");
final Materializer special = Materializer.createMaterializer(system());

final Route sample =
path(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import static org.apache.pekko.http.javadsl.server.PathMatchers.*;

Expand Down Expand Up @@ -165,8 +164,8 @@ public void testOnCompleteWithBreaker() throws InterruptedException {
// import static org.apache.pekko.http.javadsl.server.PathMatchers.*;

final int maxFailures = 1;
final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS);
final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS);
final Duration callTimeout = Duration.ofSeconds(5);
final Duration resetTimeout = Duration.ofSeconds(1);
final CircuitBreaker breaker =
CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ import pekko.http.scaladsl.model.headers.`Timeout-Access`
import pekko.http.scaladsl.model._
import pekko.http.impl.util.LogByteStringTools._

import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ Deadline, Duration, DurationLong, FiniteDuration }
import scala.jdk.DurationConverters._
import scala.util.Failure
import scala.util.control.{ NoStackTrace, NonFatal }
Expand Down Expand Up @@ -721,7 +721,12 @@ private[http] object HttpServerBluePrint {
})

private var activeTimers = 0
private def timeout = materializer.settings.subscriptionTimeoutSettings.timeout
private val timeout: FiniteDuration = {
inheritedAttributes.get[ActorAttributes.StreamSubscriptionTimeout] match {
case Some(attr) => attr.timeout
case None => 5.minutes // should not happen
}
}
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object TestServer extends App {
val settings = ActorMaterializerSettings(system)
// .withSyncProcessingLimit(Int.MaxValue)
.withInputBuffer(128, 128)
implicit val fm: ActorMaterializer = ActorMaterializer(settings)
implicit val fm: Materializer = ActorMaterializer(settings)
try {
val binding = Http().newServerAt("localhost", 9001).bindSync {
case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ object ConnectionTestApp {

val clientFlow = Http().superPool[Int]()

val sourceActor = {
val sourceQueue = {
// Our superPool expects (HttpRequest, Int) as input
val source =
Source.actorRef[(HttpRequest, Int)](10000, OverflowStrategy.dropNew).buffer(20000, OverflowStrategy.fail)
Source.queue[(HttpRequest, Int)](10000)
.buffer(20000, OverflowStrategy.fail)
Comment on lines -49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a change in behavior (dropNew -> fail)? Seems fine for a test app though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would https://pekko.apache.org/japi/pekko/1.2/org/apache/pekko/stream/OverflowStrategy.html#backpressure() make sense? OverflowStrategy.dropNew is being removed (already deprecated in 1.x).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would https://pekko.apache.org/japi/pekko/1.2/org/apache/pekko/stream/OverflowStrategy.html#backpressure() make sense? OverflowStrategy.dropNew is being removed (already deprecated in 1.x).

TBH I'm not sure what this means in practice here - would that mean 'offer' would give a return value indicating the backpressure? I think fail is probably a fine choice.

val sink = Sink.foreach[(Try[HttpResponse], Int)] {
case (resp, id) => handleResponse(resp, id)
}
Expand All @@ -55,7 +56,7 @@ object ConnectionTestApp {
}

def sendPoolFlow(uri: Uri, id: Int): Unit = {
sourceActor ! ((buildRequest(uri), id))
sourceQueue.offer((buildRequest(uri), id))
}

def sendPoolFuture(uri: Uri, id: Int): Unit = {
Expand Down
Loading