Skip to content

Commit

Permalink
Revert "SystemMaterializer in AkkaSpecWithMaterializer (#3075)" (#3145)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed May 11, 2020
1 parent ce525d9 commit b579ebb
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,41 @@

package akka.http.impl.engine.client

import akka.http.impl.util.{ AkkaSpecWithMaterializer, ExampleHttpContexts }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, headers }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import javax.net.ssl.SSLContext

import scala.concurrent.Await
import scala.concurrent.duration._

class ClientCancellationSpec extends AkkaSpecWithMaterializer {
"Http client connections" must {
"support cancellation in simple outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
})

"support cancellation in pooled outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get)
)
})

"support cancellation in simple outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
pending
testCase(
Http().outgoingConnectionHttps("akka.example.org", 443, settings = settingsWithProxyTransport, connectionContext = ExampleHttpContexts.exampleClientContext)
)
})

"support cancellation in pooled outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps("akka.example.org", 443,
settings = ConnectionPoolSettings(system).withConnectionSettings(settingsWithProxyTransport),
connectionContext = ExampleHttpContexts.exampleClientContext))
.map(_._1.get))
})
import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import akka.http.scaladsl.model.headers

}
class ClientCancellationSpec extends AkkaSpecWithMaterializer {
// TODO document why this explicit materializer is needed here?
val noncheckedMaterializer = SystemMaterializer(system).materializer

class TestSetup {
lazy val binding = Await.result(
"Http client connections" must {
val address = Await.result(
Http().bindAndHandleSync(
{ _ => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0),
{ req => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0)(noncheckedMaterializer),
5.seconds
)
lazy val address = binding.localAddress
).localAddress

lazy val bindingTls = Await.result(
val addressTls = Await.result(
Http().bindAndHandleSync(
{ _ => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
{ req => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
"localhost",
0,
connectionContext = ExampleHttpContexts.exampleServerContext),
connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer),
5.seconds
)
lazy val addressTls = bindingTls.localAddress
).localAddress

def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = {
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
val requests = TestPublisher.probe[HttpRequest]()
val responses = TestSubscriber.probe[HttpResponse]()
Source.fromPublisher(requests).via(connection).runWith(Sink.fromSubscriber(responses))
Expand All @@ -77,15 +47,36 @@ class ClientCancellationSpec extends AkkaSpecWithMaterializer {
responses.expectNext().entity.dataBytes.runWith(Sink.cancelled)
responses.cancel()
requests.expectCancellation()
}

"support cancellation in simple outgoing connection" in {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
}

"support cancellation in pooled outgoing connection" in {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get))
}

binding.terminate(1.second)
bindingTls.terminate(1.second)
"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort))
}

Http().shutdownAllConnectionPools()
"support cancellation in pooled outgoing connection with TLS" in {
pending
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort))
.map(_._1.get))
}

def settingsWithProxyTransport: ClientConnectionSettings =
ClientConnectionSettings(system)
.withTransport(ExampleHttpContexts.proxyTransport(addressTls))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.event.LoggingAdapter
import akka.http.ParsingErrorHandler
import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.impl.util._
import akka.http.javadsl.model
import akka.http.scaladsl.Http.ServerLayer
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.http.impl.util

import akka.stream.{ ActorMaterializer, SystemMaterializer }
import akka.stream.ActorMaterializer
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter

Expand All @@ -16,15 +16,15 @@ abstract class AkkaSpecWithMaterializer(s: String)

def this() = this("")

implicit val materializer = SystemMaterializer(system).materializer
implicit val materializer = ActorMaterializer()

override protected def beforeTermination(): Unit =
// don't log anything during shutdown, especially not AbruptTerminationExceptions
EventFilter.custom { case x => true }.intercept {
// shutdown materializer first, otherwise it will only be shutdown during
// main system guardian being shutdown which will be after the logging has
// reverted to stdout logging that cannot be intercepted
materializer.asInstanceOf[ActorMaterializer].shutdown()
materializer.shutdown()
// materializer shutdown is async but cannot be watched
Thread.sleep(10)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@
package akka.http.impl.util

import java.io.InputStream
import java.net.InetSocketAddress
import java.security.{ KeyStore, SecureRandom }
import java.security.cert.{ Certificate, CertificateFactory }
import java.security.{ SecureRandom, KeyStore }
import java.security.cert.{ CertificateFactory, Certificate }
import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory }

import akka.actor.ActorSystem
import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLParameters, TrustManagerFactory }
import akka.http.scaladsl.{ ClientTransport, Http, HttpsConnectionContext }
import akka.http.scaladsl.HttpsConnectionContext
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import scala.concurrent.Future

/**
* These are HTTPS example configurations that take key material from the resources/key folder.
Expand Down Expand Up @@ -69,14 +62,4 @@ object ExampleHttpContexts {

def loadX509Certificate(resourceName: String): Certificate =
CertificateFactory.getInstance("X.509").generateCertificate(resourceStream(resourceName))

/**
* A client transport that will rewrite the target address to a fixed address. This can be used
* to pretend to connect to akka.example.org which is required to connect to the example server certificate.
*/
def proxyTransport(realAddress: InetSocketAddress): ClientTransport =
new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] =
ClientTransport.TCP.connectTo(realAddress.getHostString, realAddress.getPort, settings)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.http.scaladsl.settings.RoutingSettings
import akka.http.scaladsl.settings.ServerSettings
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.util.FastFuture._
import akka.stream.SystemMaterializer
import akka.stream.{ Materializer, SystemMaterializer }
import akka.testkit.TestKit
import akka.util.ConstantFun
import com.typesafe.config.{ Config, ConfigFactory }
Expand Down

0 comments on commit b579ebb

Please sign in to comment.