Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "SystemMaterializer in AkkaSpecWithMaterializer (#3075)" #3145

Merged
merged 1 commit into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,41 @@

package akka.http.impl.engine.client

import akka.http.impl.util.{ AkkaSpecWithMaterializer, ExampleHttpContexts }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, headers }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import javax.net.ssl.SSLContext

import scala.concurrent.Await
import scala.concurrent.duration._

class ClientCancellationSpec extends AkkaSpecWithMaterializer {
"Http client connections" must {
"support cancellation in simple outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
})

"support cancellation in pooled outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get)
)
})

"support cancellation in simple outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
pending
testCase(
Http().outgoingConnectionHttps("akka.example.org", 443, settings = settingsWithProxyTransport, connectionContext = ExampleHttpContexts.exampleClientContext)
)
})

"support cancellation in pooled outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps("akka.example.org", 443,
settings = ConnectionPoolSettings(system).withConnectionSettings(settingsWithProxyTransport),
connectionContext = ExampleHttpContexts.exampleClientContext))
.map(_._1.get))
})
import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import akka.http.scaladsl.model.headers

}
class ClientCancellationSpec extends AkkaSpecWithMaterializer {
// TODO document why this explicit materializer is needed here?
val noncheckedMaterializer = SystemMaterializer(system).materializer

class TestSetup {
lazy val binding = Await.result(
"Http client connections" must {
val address = Await.result(
Http().bindAndHandleSync(
{ _ => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0),
{ req => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0)(noncheckedMaterializer),
5.seconds
)
lazy val address = binding.localAddress
).localAddress

lazy val bindingTls = Await.result(
val addressTls = Await.result(
Http().bindAndHandleSync(
{ _ => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
{ req => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
"localhost",
0,
connectionContext = ExampleHttpContexts.exampleServerContext),
connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer),
5.seconds
)
lazy val addressTls = bindingTls.localAddress
).localAddress

def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = {
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
val requests = TestPublisher.probe[HttpRequest]()
val responses = TestSubscriber.probe[HttpResponse]()
Source.fromPublisher(requests).via(connection).runWith(Sink.fromSubscriber(responses))
Expand All @@ -77,15 +47,36 @@ class ClientCancellationSpec extends AkkaSpecWithMaterializer {
responses.expectNext().entity.dataBytes.runWith(Sink.cancelled)
responses.cancel()
requests.expectCancellation()
}

"support cancellation in simple outgoing connection" in {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
}

"support cancellation in pooled outgoing connection" in {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get))
}

binding.terminate(1.second)
bindingTls.terminate(1.second)
"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort))
}

Http().shutdownAllConnectionPools()
"support cancellation in pooled outgoing connection with TLS" in {
pending
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort))
.map(_._1.get))
}

def settingsWithProxyTransport: ClientConnectionSettings =
ClientConnectionSettings(system)
.withTransport(ExampleHttpContexts.proxyTransport(addressTls))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.event.LoggingAdapter
import akka.http.ParsingErrorHandler
import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.impl.util._
import akka.http.javadsl.model
import akka.http.scaladsl.Http.ServerLayer
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.http.impl.util

import akka.stream.{ ActorMaterializer, SystemMaterializer }
import akka.stream.ActorMaterializer
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter

Expand All @@ -16,15 +16,15 @@ abstract class AkkaSpecWithMaterializer(s: String)

def this() = this("")

implicit val materializer = SystemMaterializer(system).materializer
implicit val materializer = ActorMaterializer()

override protected def beforeTermination(): Unit =
// don't log anything during shutdown, especially not AbruptTerminationExceptions
EventFilter.custom { case x => true }.intercept {
// shutdown materializer first, otherwise it will only be shutdown during
// main system guardian being shutdown which will be after the logging has
// reverted to stdout logging that cannot be intercepted
materializer.asInstanceOf[ActorMaterializer].shutdown()
materializer.shutdown()
// materializer shutdown is async but cannot be watched
Thread.sleep(10)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@
package akka.http.impl.util

import java.io.InputStream
import java.net.InetSocketAddress
import java.security.{ KeyStore, SecureRandom }
import java.security.cert.{ Certificate, CertificateFactory }
import java.security.{ SecureRandom, KeyStore }
import java.security.cert.{ CertificateFactory, Certificate }
import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory }

import akka.actor.ActorSystem
import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLParameters, TrustManagerFactory }
import akka.http.scaladsl.{ ClientTransport, Http, HttpsConnectionContext }
import akka.http.scaladsl.HttpsConnectionContext
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import scala.concurrent.Future

/**
* These are HTTPS example configurations that take key material from the resources/key folder.
Expand Down Expand Up @@ -69,14 +62,4 @@ object ExampleHttpContexts {

def loadX509Certificate(resourceName: String): Certificate =
CertificateFactory.getInstance("X.509").generateCertificate(resourceStream(resourceName))

/**
* A client transport that will rewrite the target address to a fixed address. This can be used
* to pretend to connect to akka.example.org which is required to connect to the example server certificate.
*/
def proxyTransport(realAddress: InetSocketAddress): ClientTransport =
new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] =
ClientTransport.TCP.connectTo(realAddress.getHostString, realAddress.getPort, settings)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.http.scaladsl.settings.RoutingSettings
import akka.http.scaladsl.settings.ServerSettings
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.util.FastFuture._
import akka.stream.SystemMaterializer
import akka.stream.{ Materializer, SystemMaterializer }
import akka.testkit.TestKit
import akka.util.ConstantFun
import com.typesafe.config.{ Config, ConfigFactory }
Expand Down