Skip to content

Commit

Permalink
Merge pull request #3151 from jrudolph/redo-ClientServerSpec-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed May 13, 2020
2 parents 78e321a + ecc7d43 commit 8d08ff3
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
def track(remaining: Iterator[Future[Done]]): Unit =
if (remaining.hasNext) remaining.next().onComplete(_ => track(remaining))
else shutdownCompletedPromise.trySuccess(Done)
track(poolStatus.keys.map(_.shutdown()).toIterator)
track(poolStatus.keys.map(_.shutdown()).iterator)

case HasBeenShutdown(pool, reason) =>
poolInterfaces.get(pool).foreach { gateway =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,71 @@

package akka.http.impl.engine.client

import javax.net.ssl.SSLContext
import akka.http.impl.util.{ AkkaSpecWithMaterializer, ExampleHttpContexts }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, headers }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import akka.http.scaladsl.model.headers

class ClientCancellationSpec extends AkkaSpecWithMaterializer {
// TODO document why this explicit materializer is needed here?
val noncheckedMaterializer = SystemMaterializer(system).materializer

"Http client connections" must {
val address = Await.result(
"support cancellation in simple outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
})

"support cancellation in pooled outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get)
)
})

"support cancellation in simple outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
pending
testCase(
Http().outgoingConnectionHttps("akka.example.org", 443, settings = settingsWithProxyTransport, connectionContext = ExampleHttpContexts.exampleClientContext)
)
})

"support cancellation in pooled outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps("akka.example.org", 443,
settings = ConnectionPoolSettings(system).withConnectionSettings(settingsWithProxyTransport),
connectionContext = ExampleHttpContexts.exampleClientContext))
.map(_._1.get))
})

}

class TestSetup {
lazy val binding = Await.result(
Http().bindAndHandleSync(
{ req => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0)(noncheckedMaterializer),
{ _ => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0),
5.seconds
).localAddress
)
lazy val address = binding.localAddress

val addressTls = Await.result(
lazy val bindingTls = Await.result(
Http().bindAndHandleSync(
{ req => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
{ _ => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
"localhost",
0,
connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer),
connectionContext = ExampleHttpContexts.exampleServerContext),
5.seconds
).localAddress
)
lazy val addressTls = bindingTls.localAddress

def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = {
val requests = TestPublisher.probe[HttpRequest]()
val responses = TestSubscriber.probe[HttpResponse]()
Source.fromPublisher(requests).via(connection).runWith(Sink.fromSubscriber(responses))
Expand All @@ -47,36 +77,15 @@ class ClientCancellationSpec extends AkkaSpecWithMaterializer {
responses.expectNext().entity.dataBytes.runWith(Sink.cancelled)
responses.cancel()
requests.expectCancellation()
}

"support cancellation in simple outgoing connection" in {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
}

"support cancellation in pooled outgoing connection" in {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get))
}

"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort))
}
binding.terminate(1.second)
bindingTls.terminate(1.second)

"support cancellation in pooled outgoing connection with TLS" in {
pending
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort))
.map(_._1.get))
Http().shutdownAllConnectionPools()
}

def settingsWithProxyTransport: ClientConnectionSettings =
ClientConnectionSettings(system)
.withTransport(ExampleHttpContexts.proxyTransport(addressTls))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.event.LoggingAdapter
import akka.http.ParsingErrorHandler
import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.impl.util._
import akka.http.javadsl.model
import akka.http.scaladsl.Http.ServerLayer
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.http.impl.util

import akka.stream.ActorMaterializer
import akka.stream.{ ActorMaterializer, SystemMaterializer }
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter

Expand All @@ -16,15 +16,15 @@ abstract class AkkaSpecWithMaterializer(s: String)

def this() = this("")

implicit val materializer = ActorMaterializer()
implicit val materializer = SystemMaterializer(system).materializer

override protected def beforeTermination(): Unit =
// don't log anything during shutdown, especially not AbruptTerminationExceptions
EventFilter.custom { case x => true }.intercept {
// shutdown materializer first, otherwise it will only be shutdown during
// main system guardian being shutdown which will be after the logging has
// reverted to stdout logging that cannot be intercepted
materializer.shutdown()
materializer.asInstanceOf[ActorMaterializer].shutdown()
// materializer shutdown is async but cannot be watched
Thread.sleep(10)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@
package akka.http.impl.util

import java.io.InputStream
import java.security.{ SecureRandom, KeyStore }
import java.security.cert.{ CertificateFactory, Certificate }
import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory }
import java.net.InetSocketAddress
import java.security.{ KeyStore, SecureRandom }
import java.security.cert.{ Certificate, CertificateFactory }

import akka.http.scaladsl.HttpsConnectionContext
import akka.actor.ActorSystem
import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLParameters, TrustManagerFactory }
import akka.http.scaladsl.{ ClientTransport, Http, HttpsConnectionContext }
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import scala.concurrent.Future

/**
* These are HTTPS example configurations that take key material from the resources/key folder.
Expand Down Expand Up @@ -62,4 +69,14 @@ object ExampleHttpContexts {

def loadX509Certificate(resourceName: String): Certificate =
CertificateFactory.getInstance("X.509").generateCertificate(resourceStream(resourceName))

/**
* A client transport that will rewrite the target address to a fixed address. This can be used
* to pretend to connect to akka.example.org which is required to connect to the example server certificate.
*/
def proxyTransport(realAddress: InetSocketAddress): ClientTransport =
new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] =
ClientTransport.TCP.connectTo(realAddress.getHostString, realAddress.getPort, settings)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ class ClientServerSpec extends AkkaSpecWithMaterializer(
try {
(1 to 10).foreach(runOnce)
} finally server.foreach(_.unbind())
Http().shutdownAllConnectionPools().futureValue
}

"complete a request/response when the request side immediately closes the connection after sending the request" in Utils.assertAllStagesStopped {
Expand Down Expand Up @@ -639,7 +640,8 @@ Host: example.com
.futureValue
.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).futureValue.utf8String shouldEqual entity

serverBinding.unbind()
serverBinding.unbind().futureValue
Http().shutdownAllConnectionPools().futureValue
}

class CloseDelimitedTLSSetup {
Expand Down Expand Up @@ -776,6 +778,7 @@ Host: example.com
}

Await.result(binding.unbind(), 10.seconds)
Http().shutdownAllConnectionPools().futureValue
}

"report idle timeout on request entity stream for stalled client" in Utils.assertAllStagesStopped {
Expand All @@ -798,7 +801,8 @@ Host: example.com
dataProbe.expectUtf8EncodedString("test")
dataProbe.expectError() should be(an[HttpIdleTimeoutException])

Await.result(binding.unbind(), 10.seconds)
binding.unbind().futureValue
Http().shutdownAllConnectionPools().futureValue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.http.scaladsl.settings.RoutingSettings
import akka.http.scaladsl.settings.ServerSettings
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.util.FastFuture._
import akka.stream.{ Materializer, SystemMaterializer }
import akka.stream.SystemMaterializer
import akka.testkit.TestKit
import akka.util.ConstantFun
import com.typesafe.config.{ Config, ConfigFactory }
Expand Down

0 comments on commit 8d08ff3

Please sign in to comment.