Skip to content

Commit

Permalink
add java api for websocket testkit #21184
Browse files Browse the repository at this point in the history
And additionally adds unit test for WebSocketDirectives #20466
  • Loading branch information
Hawstein authored and johanandren committed Sep 2, 2016
1 parent 7b030bf commit 75dc20b
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.http.javadsl.server.directives;

import akka.NotUsed;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.SecWebSocketProtocol;
import akka.http.javadsl.model.ws.BinaryMessage;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.javadsl.testkit.WSProbe;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class WebSocketDirectivesExamplesTest extends JUnitRouteTest {

@Test
public void testHandleWebSocketMessages() {
//#handleWebSocketMessages
final Flow<Message, Message, NotUsed> greeter = Flow.of(Message.class).mapConcat(msg -> {
if (msg instanceof TextMessage) {
final TextMessage tm = (TextMessage) msg;
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
return Collections.singletonList(ret);
} else if (msg instanceof BinaryMessage) {
final BinaryMessage bm = (BinaryMessage) msg;
bm.getStreamedData().runWith(Sink.ignore(), materializer());
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Unsupported message type!");
}
});

final Route websocketRoute = path("greeter", () ->
handleWebSocketMessages(greeter)
);

// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());

// WS creates a WebSocket request for testing
testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
.assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);

// manually run a WS conversation
wsClient.sendMessage("Peter");
wsClient.expectMessage("Hello Peter!");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));

wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");

wsClient.sendCompletion();
wsClient.expectCompletion();
//#handleWebSocketMessages
}

@Test
public void testHandleWebSocketMessagesForProtocol() {
//#handleWebSocketMessagesForProtocol
final Flow<Message, Message, NotUsed> greeterService = Flow.of(Message.class).mapConcat(msg -> {
if (msg instanceof TextMessage) {
final TextMessage tm = (TextMessage) msg;
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
return Collections.singletonList(ret);
} else if (msg instanceof BinaryMessage) {
final BinaryMessage bm = (BinaryMessage) msg;
bm.getStreamedData().runWith(Sink.ignore(), materializer());
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Unsupported message type!");
}
});

final Flow<Message, Message, NotUsed> echoService = Flow.of(Message.class).buffer(1, OverflowStrategy.backpressure());

final Route websocketMultipleProtocolRoute = path("services", () ->
route(
handleWebSocketMessagesForProtocol(greeterService, "greeter"),
handleWebSocketMessagesForProtocol(echoService, "echo")
)
);

// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());

// WS creates a WebSocket request for testing
testRoute(websocketMultipleProtocolRoute)
.run(WS(Uri.create("/services"), wsClient.flow(), materializer(), Arrays.asList("other", "echo")))
.assertHeaderExists(SecWebSocketProtocol.create("echo"));

wsClient.sendMessage("Peter");
wsClient.expectMessage("Peter");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectMessage(ByteString.fromString("abcdef"));

wsClient.sendMessage("John");
wsClient.expectMessage("John");

wsClient.sendCompletion();
wsClient.expectCompletion();
//#handleWebSocketMessagesForProtocol
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke

Example
-------
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.

.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessages
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke

Example
-------
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.

.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessagesForProtocol
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.http.javadsl.model.headers;

import akka.http.impl.util.Util;

/**
* Model for the `Sec-WebSocket-Protocol` header.
*/
public abstract class SecWebSocketProtocol extends akka.http.scaladsl.model.HttpHeader {
public abstract Iterable<String> getProtocols();

public static SecWebSocketProtocol create(String... protocols) {
return new akka.http.scaladsl.model.headers.Sec$minusWebSocket$minusProtocol(Util.convertArray(protocols));
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.util.ByteString
/**
* Represents a WebSocket message. A message can either be a binary message or a text message.
*/
sealed abstract class Message {
abstract class Message {
/**
* Is this message a text message? If true, [[asTextMessage]] will return this
* text message, if false, [[asBinaryMessage]] will return this binary message.
Expand Down Expand Up @@ -150,4 +150,4 @@ object BinaryMessage {
case sm.ws.BinaryMessage.Strict(data) create(data)
case bm: sm.ws.BinaryMessage create(bm.dataStream.asJava)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,14 @@ private[http] object `Sec-WebSocket-Protocol` extends ModeledCompanion[`Sec-WebS
* INTERNAL API
*/
private[http] final case class `Sec-WebSocket-Protocol`(protocols: immutable.Seq[String])
extends RequestResponseHeader {
extends jm.headers.SecWebSocketProtocol with RequestResponseHeader {
require(protocols.nonEmpty, "Sec-WebSocket-Protocol.protocols must not be empty")
import `Sec-WebSocket-Protocol`.protocolsRenderer
protected[http] def renderValue[R <: Rendering](r: R): r.type = r ~~ protocols
protected def companion = `Sec-WebSocket-Protocol`

/** Java API */
override def getProtocols: Iterable[String] = protocols.asJava
}

// http://tools.ietf.org/html/rfc6455#section-4.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,30 @@

package akka.http.scaladsl.model.ws

import akka.stream.javadsl
import akka.stream.scaladsl.Source
import akka.util.ByteString

//#message-model
/**
* The ADT for WebSocket messages. A message can either be a binary or a text message.
*/
sealed trait Message // FIXME: Why don't we extend akka.http.javadsl.model.ws.Message here?
sealed trait Message extends akka.http.javadsl.model.ws.Message

/**
* Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case
* the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream`
* will return a Source streaming the data as it comes in.
*/
sealed trait TextMessage extends Message {
sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message {
/**
* The contents of this message as a stream.
*/
def textStream: Source[String, _]

/** Java API */
override def getStreamedText: javadsl.Source[String, _] = textStream.asJava
override def asScala: TextMessage = this
}
//#message-model
object TextMessage {
Expand All @@ -36,9 +41,18 @@ object TextMessage {
final case class Strict(text: String) extends TextMessage {
def textStream: Source[String, _] = Source.single(text)
override def toString: String = s"TextMessage.Strict($text)"

/** Java API */
override def getStrictText: String = text
override def isStrict: Boolean = true
}

final case class Streamed(textStream: Source[String, _]) extends TextMessage {
override def toString: String = s"TextMessage.Streamed($textStream)"

/** Java API */
override def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.")
override def isStrict: Boolean = false
}
}

Expand All @@ -48,11 +62,15 @@ object TextMessage {
* will return a Source streaming the data as it comes in.
*/
//#message-model
sealed trait BinaryMessage extends Message {
sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message {
/**
* The contents of this message as a stream.
*/
def dataStream: Source[ByteString, _]

/** Java API */
override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava
override def asScala: BinaryMessage = this
}
//#message-model
object BinaryMessage {
Expand All @@ -66,8 +84,16 @@ object BinaryMessage {
final case class Strict(data: ByteString) extends BinaryMessage {
def dataStream: Source[ByteString, _] = Source.single(data)
override def toString: String = s"BinaryMessage.Strict($data)"

/** Java API */
override def getStrictData: ByteString = data
override def isStrict: Boolean = true
}
final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage {
override def toString: String = s"BinaryMessage.Streamed($dataStream)"

/** Java API */
override def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.")
override def isStrict: Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import akka.stream.Materializer
*
* See `JUnitRouteTest` for an example of a concrete implementation.
*/
abstract class RouteTest extends AllDirectives {
abstract class RouteTest extends AllDirectives with WSTestRequestBuilding {
implicit def system: ActorSystem
implicit def materializer: Materializer
implicit def executionContext: ExecutionContextExecutor = system.dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.http.javadsl.testkit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.javadsl.model.ws.Message
import akka.stream.Materializer
import akka.stream.javadsl.Flow
import akka.stream.scaladsl
import akka.util.ByteString

import akka.http.scaladsl.{ testkit st }

import akka.http.impl.util.JavaMapping.Implicits._

import scala.concurrent.duration._

/**
* A WSProbe is a probe that implements a `Flow[Message, Message, Unit]` for testing
* websocket code.
*
* Requesting elements is handled automatically.
*/
class WSProbe(delegate: st.WSProbe) {

def flow: Flow[Message, Message, Any] = {
val underlying = scaladsl.Flow[Message].map(_.asScala).via(delegate.flow).map(_.asJava)
new Flow[Message, Message, NotUsed](underlying)
}

/**
* Send the given messages out of the flow.
*/
def sendMessage(message: Message): Unit = delegate.sendMessage(message.asScala)

/**
* Send a text message containing the given string out of the flow.
*/
def sendMessage(text: String): Unit = delegate.sendMessage(text)

/**
* Send a binary message containing the given bytes out of the flow.
*/
def sendMessage(bytes: ByteString): Unit = delegate.sendMessage(bytes)

/**
* Complete the output side of the flow.
*/
def sendCompletion(): Unit = delegate.sendCompletion()

/**
* Expect a message on the input side of the flow.
*/
def expectMessage(): Message = delegate.expectMessage()

/**
* Expect a text message on the input side of the flow and compares its payload with the given one.
* If the received message is streamed its contents are collected and then asserted against the given
* String.
*/
def expectMessage(text: String): Unit = delegate.expectMessage(text)

/**
* Expect a binary message on the input side of the flow and compares its payload with the given one.
* If the received message is streamed its contents are collected and then asserted against the given
* ByteString.
*/
def expectMessage(bytes: ByteString): Unit = delegate.expectMessage(bytes)

/**
* Expect no message on the input side of the flow.
*/
def expectNoMessage(): Unit = delegate.expectNoMessage()

/**
* Expect no message on the input side of the flow for the given maximum duration.
*/
def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max)

/**
* Expect completion on the input side of the flow.
*/
def expectCompletion(): Unit = delegate.expectCompletion()

}

object WSProbe {

// A convenient method to create WSProbe with default maxChunks and maxChunkCollectionMills
def create(system: ActorSystem, materializer: Materializer): WSProbe = {
create(system, materializer, 1000, 5000)
}

/**
* Creates a WSProbe to use in tests against websocket handlers.
*
* @param maxChunks The maximum number of chunks to collect for streamed messages.
* @param maxChunkCollectionMills The maximum time in milliseconds to collect chunks for streamed messages.
*/
def create(system: ActorSystem, materializer: Materializer, maxChunks: Int, maxChunkCollectionMills: Long): WSProbe = {
val delegate = st.WSProbe(maxChunks, maxChunkCollectionMills)(system, materializer)
new WSProbe(delegate)
}

}

0 comments on commit 75dc20b

Please sign in to comment.