Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Commit

Permalink
! io: major refactoring of SslTlsSupport, fixes #544
Browse files Browse the repository at this point in the history
Breaking API change because of the added `maxEncryptionChunkSize: Int` parameter on `SslTlsSupport.apply`
  • Loading branch information
sirthias committed Oct 18, 2013
1 parent 0b5ef36 commit 01c4aa9
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 204 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Expand Up @@ -94,7 +94,7 @@ object Build extends Build with DocSupport {


lazy val sprayIO = Project("spray-io", file("spray-io"))
.dependsOn(sprayUtil)
.dependsOn(sprayUtil, sprayHttp)
.settings(sprayModuleSettings: _*)
.settings(osgiSettings(exports = Seq("spray.io", "akka.io")): _*)
.settings(libraryDependencies ++= provided(akkaActor, scalaReflect))
Expand Down
Expand Up @@ -86,7 +86,7 @@ private object HttpClientConnection {
ResponseParsing(parserSettings) >>
RequestRendering(settings) >>
ConnectionTimeouts(idleTimeout) ? (reapingCycle.isFinite && idleTimeout.isFinite) >>
SslTlsSupport(parserSettings.sslSessionInfoHeader) >>
SslTlsSupport(maxEncryptionChunkSize, parserSettings.sslSessionInfoHeader) >>
TickGenerator(reapingCycle) ? (idleTimeout.isFinite || requestTimeout.isFinite)
}

Expand Down
Expand Up @@ -214,7 +214,7 @@ private object HttpServerConnection {
RequestParsing(settings) >>
ResponseRendering(settings) >>
ConnectionTimeouts(idleTimeout) ? (reapingCycle.isFinite && idleTimeout.isFinite) >>
SslTlsSupport(parserSettings.sslSessionInfoHeader) ? sslEncryption >>
SslTlsSupport(maxEncryptionChunkSize, parserSettings.sslSessionInfoHeader) ? sslEncryption >>
TickGenerator(reapingCycle) ? (reapingCycle.isFinite && (idleTimeout.isFinite || requestTimeout.isFinite)) >>
BackPressureHandling(backpressureSettings.get.noAckRate, backpressureSettings.get.readingLowWatermark) ? backpressureSettings.isDefined
}
Expand Down
55 changes: 29 additions & 26 deletions spray-io-tests/src/test/scala/spray/io/SslTlsSupportSpec.scala
Expand Up @@ -21,6 +21,7 @@ import java.net.InetSocketAddress
import java.security.{ KeyStore, SecureRandom }
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.{ ConfigFactory, Config }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.Future
import org.specs2.mutable.Specification
Expand All @@ -41,6 +42,7 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
loglevel = WARNING
io.tcp.trace-logging = off
}""")
val sslTraceLogging = true
implicit val system = ActorSystem(Utils.actorSystemNameFrom(getClass), testConf)
import system.dispatcher

Expand Down Expand Up @@ -74,9 +76,9 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
clientConn.writeLn("Foo")
serverConn.readLn() === "Foo"
serverConn.writeLn("bar")
clientConn.events.expectMsg(Tcp.Received(ByteString("bar\n")))
clientConn.expectReceivedString("bar\n")
serverConn.writeLn("baz")
clientConn.events.expectMsg(Tcp.Received(ByteString("baz\n")))
clientConn.expectReceivedString("baz\n")
clientConn.writeLn("yeah")
serverConn.readLn() === "yeah"

Expand All @@ -95,13 +97,13 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
val serverConn = server.acceptOne()

clientConn.writeLn("Foo")
serverConn.events.expectMsg(Tcp.Received(ByteString("Foo\n")))
serverConn.expectReceivedString("Foo\n")
serverConn.writeLn("bar")
clientConn.readLn() === "bar"
serverConn.writeLn("baz")
clientConn.readLn() === "baz"
clientConn.writeLn("yeah")
serverConn.events.expectMsg(Tcp.Received(ByteString("yeah\n")))
serverConn.expectReceivedString("yeah\n")

val baselineSessionCounts = sessionCounts()
clientConn.close()
Expand All @@ -119,13 +121,13 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
val clientConn = connAttempt.finishConnect()

clientConn.writeLn("Foo")
serverConn.events.expectMsg(Tcp.Received(ByteString("Foo\n")))
serverConn.expectReceivedString("Foo\n")
serverConn.writeLn("bar")
clientConn.events.expectMsg(Tcp.Received(ByteString("bar\n")))
clientConn.expectReceivedString("bar\n")
serverConn.writeLn("baz")
clientConn.events.expectMsg(Tcp.Received(ByteString("baz\n")))
clientConn.expectReceivedString("baz\n")
clientConn.writeLn("yeah")
serverConn.events.expectMsg(Tcp.Received(ByteString("yeah\n")))
serverConn.expectReceivedString("yeah\n")

val baselineSessionCounts = sessionCounts()
clientConn.command(Tcp.Close)
Expand All @@ -147,16 +149,17 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
clientConn.writeLn("Foo")
serverConn.readLn() === "Foo"
serverConn.writeLn("bar")
clientConn.events.expectMsg(Tcp.Received(ByteString("bar\n")))
clientConn.expectReceivedString("bar\n")

clientConn.command(Tcp.ConfirmedClosed)
clientConn.command(Tcp.ConfirmedClose)
serverConn.readLn()
clientConn.events.expectMsg(Tcp.ConfirmedClosed)
TestUtils.verifyActorTermination(clientConn.handler)
serverConn.close()
server.close()
}

"produce a PeerClosed event upon receiving an SSL-level termination sequence" in new TestSetup(blockClosedEvent = true) {
"produce a PeerClosed event upon receiving an SSL-level termination sequence" in new TestSetup {
val server = new JavaSslServer
val connAttempt = attemptSpraySslClientConnection(server.address)
val serverConn = server.acceptOne()
Expand All @@ -165,7 +168,7 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
clientConn.writeLn("Foo")
serverConn.readLn() === "Foo"
serverConn.writeLn("bar")
clientConn.events.expectMsg(Tcp.Received(ByteString("bar\n")))
clientConn.expectReceivedString("bar\n")

serverConn.close()
clientConn.events.expectMsg(Tcp.PeerClosed)
Expand All @@ -178,7 +181,7 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {

val counter = new AtomicInteger

class TestSetup(publishSslSessionInfo: Boolean = false, blockClosedEvent: Boolean = false) extends org.specs2.specification.Scope {
class TestSetup(publishSslSessionInfo: Boolean = false) extends org.specs2.specification.Scope {
implicit val sslContext = createSSLContext("/ssl-test-keystore.jks", "")

def sessionCounts() = (clientSessions().length, serverSessions().length)
Expand Down Expand Up @@ -223,8 +226,18 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
def finishConnect() = new SpraySslClientConnection(connectedProbe.expectMsgType[Tcp.Connected], connectedProbe.sender)
}

class SpraySslClientConnection(connected: Tcp.Connected, connection: ActorRef) {
sealed abstract class SslConnection {
val events = TestProbe()
@tailrec final def expectReceivedString(data: String): Unit = {
data.isEmpty must beFalse
val got = events.expectMsgType[Tcp.Received].data.utf8String
data.startsWith(got) must beTrue
if (got.length < data.length)
expectReceivedString(data drop got.length)
}
}

class SpraySslClientConnection(connected: Tcp.Connected, connection: ActorRef) extends SslConnection {
val handler = system.actorOf(Props(new ConnectionActor[ClientSSLEngineProvider](events.ref, connection, connected)),
"client" + counter.incrementAndGet())
connection ! Tcp.Register(handler, keepOpenOnPeerClosed = true)
Expand All @@ -245,8 +258,7 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
}
}

class SpraySslServerConnection(connected: Tcp.Connected, connection: ActorRef) {
val events = TestProbe()
class SpraySslServerConnection(connected: Tcp.Connected, connection: ActorRef) extends SslConnection {
val handler = system.actorOf(Props(new ConnectionActor[ServerSSLEngineProvider](events.ref, connection, connected)),
"server" + counter.incrementAndGet())
connection ! Tcp.Register(handler, keepOpenOnPeerClosed = true)
Expand All @@ -256,7 +268,7 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {

class ConnectionActor[T <: (PipelineContext Option[SSLEngine])](events: ActorRef, connection: ActorRef,
connected: Tcp.Connected)(implicit engineProvider: T) extends ConnectionHandler {
val pipeline = frontend >> SslTlsSupport(publishSslSessionInfo) >> (filterPeerClosed ? blockClosedEvent)
val pipeline = frontend >> SslTlsSupport(128, publishSslSessionInfo, sslTraceLogging)
def receive = running(connection, pipeline, createSslTlsContext[T](connected))
def frontend: PipelineStage = new PipelineStage {
def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines =
Expand All @@ -269,15 +281,6 @@ class SslTlsSupportSpec extends Specification with NoTimeConversions {
}
}
}
def filterPeerClosed: PipelineStage = new PipelineStage {
def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new Pipelines {
val commandPipeline = commandPL
val eventPipeline: EPL = {
case x: Tcp.ConnectionClosed log.debug("Blocking ConnectionClosed event: " + x)
case ev eventPL(ev)
}
}
}
}

private def sessions(f: SSLContext SSLSessionContext): Seq[SSLSession] = {
Expand Down

0 comments on commit 01c4aa9

Please sign in to comment.