From ebbee8304e0861ea8d1121ec13cb0d2b2438eed1 Mon Sep 17 00:00:00 2001 From: vuong-idnow Date: Thu, 6 Dec 2018 15:39:56 +0700 Subject: [PATCH] Upgrade play 2.6 --- README.md | 10 ++ module/pom.xml | 6 +- .../atmosphere/play/AtmosphereController.java | 45 ++++++--- .../play/AtmosphereWebSocketActor.java | 71 ++++++++++++++ .../java/org/atmosphere/play/OutStream.java | 7 ++ .../atmosphere/play/PlayAsyncIOWriter.java | 98 +++++++++++-------- .../org/atmosphere/play/PlayWebSocket.java | 63 +++--------- .../play/AtmosphereHttpRequestHandler.scala | 49 +++++----- pom.xml | 6 +- 9 files changed, 226 insertions(+), 129 deletions(-) create mode 100644 module/src/main/java/org/atmosphere/play/AtmosphereWebSocketActor.java create mode 100644 module/src/main/java/org/atmosphere/play/OutStream.java diff --git a/README.md b/README.md index d96969f..a3a6b32 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,16 @@ Fork the [samples workspace](https://github.com/Atmosphere/atmosphere-samples/tr Download [Atmosphere Play!](http://search.maven.org/#search%7Cga%7C1%7Catmosphere-play), use Maven or [sbt](http://ntcoding.blogspot.ca/2013/09/atmosphere-scala-sbt-akka-step-by-step.html) +For Play 2.6.x+: + +```xml + + org.atmosphere + atmosphere-play + 2.4.0 + +``` + For Play 2.5.x+: ```xml diff --git a/module/pom.xml b/module/pom.xml index 1dc5826..e4aecc7 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -3,14 +3,14 @@ org.atmosphere play-project - 2.3.1-SNAPSHOT + 2.4.0 ../ 4.0.0 org.atmosphere atmosphere-play Atmosphere Play Module - 2.3.1-SNAPSHOT + 2.4.0 jar @@ -19,7 +19,7 @@ com.typesafe.play - play_2.11 + play_2.12 org.slf4j diff --git a/module/src/main/java/org/atmosphere/play/AtmosphereController.java b/module/src/main/java/org/atmosphere/play/AtmosphereController.java index 8f91423..6da7e5c 100644 --- a/module/src/main/java/org/atmosphere/play/AtmosphereController.java +++ b/module/src/main/java/org/atmosphere/play/AtmosphereController.java @@ -15,25 +15,46 @@ */ package org.atmosphere.play; +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import akka.stream.OverflowStrategy; import org.apache.commons.lang3.StringUtils; +import org.atmosphere.config.service.Singleton; import org.atmosphere.cpr.AtmosphereConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.util.IOUtils; +import play.api.libs.streams.ActorFlow; +import play.api.mvc.WebSocket; +import play.api.mvc.WebSocket$; +import play.libs.concurrent.HttpExecutionContext; import play.mvc.Controller; -import play.mvc.LegacyWebSocket; import play.mvc.Result; +import play.mvc.Results; +import javax.inject.Inject; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +@Singleton public class AtmosphereController extends Controller { - private final AtmosphereFramework framework; + public static final int BUFFER_SIZE = 10; + private final AtmosphereFramework framework; private final AtmosphereConfig config; private final AtmospherePlaySessionConverter converter; + private HttpExecutionContext httpExecutionContext; - @SuppressWarnings("unchecked") - public AtmosphereController() throws InstantiationException, IllegalAccessException, Exception { - framework = AtmosphereCoordinator.instance().framework(); + @Inject + private ActorSystem actorSystem; + + @Inject + private Materializer materializer; + + @Inject + public AtmosphereController(HttpExecutionContext ec) throws Exception { + this.httpExecutionContext = ec; + framework = AtmosphereCoordinator.instance().framework(); config = framework.getAtmosphereConfig(); final String playSessionConverter = config.getInitParameter(AtmosphereCoordinator.PLAY_SESSION_CONVERTER); @@ -45,13 +66,16 @@ public AtmosphereController() throws InstantiationException, IllegalAccessExcept } } - public LegacyWebSocket webSocket() throws Throwable { - return new PlayWebSocket(config, request(), convertedSession()).internal(); + public WebSocket webSocket() { + return WebSocket$.MODULE$.accept( + request -> ActorFlow.actorRef( + req -> AtmosphereWebSocketActor.props(req, request, convertedSession(), config), BUFFER_SIZE, OverflowStrategy.dropNew(), actorSystem , materializer), + WebSocket.MessageFlowTransformer$.MODULE$.stringMessageFlowTransformer()); } - public Result http() throws Throwable { - // TODO: Wrong status code on error! - return ok(new PlayAsyncIOWriter(request(), convertedSession(), response()).internal()); + public CompletionStage http() { + return CompletableFuture.supplyAsync(() -> Results.ok().chunked(new PlayAsyncIOWriter(request(), convertedSession(), response()).internal()).as("text/event-stream"), httpExecutionContext.current()); + } protected Map convertedSession() { @@ -64,5 +88,4 @@ protected Map convertedSession() { return result; } - } diff --git a/module/src/main/java/org/atmosphere/play/AtmosphereWebSocketActor.java b/module/src/main/java/org/atmosphere/play/AtmosphereWebSocketActor.java new file mode 100644 index 0000000..fdfdcfd --- /dev/null +++ b/module/src/main/java/org/atmosphere/play/AtmosphereWebSocketActor.java @@ -0,0 +1,71 @@ +package org.atmosphere.play; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.util.ByteString; +import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResponseImpl; +import org.atmosphere.cpr.WebSocketProcessorFactory; +import org.atmosphere.websocket.WebSocketProcessor; +import play.Logger; +import play.api.mvc.RequestHeader; +import play.mvc.Http; + +import java.util.Map; + +public class AtmosphereWebSocketActor extends AbstractActor { + private static final Logger.ALogger LOG = Logger.of(AtmosphereWebSocketActor.class); + private PlayWebSocket playWebSocket = null; + private WebSocketProcessor webSocketProcessor = null; + private ActorRef actorRef; + private RequestHeader requestHeader; + private Map additionalAttributes; + private AtmosphereConfig atmosphereConfig; + + public static Props props(ActorRef actorRef, RequestHeader requestHeader, Map additionalAtts, AtmosphereConfig config) { + return Props.create(AtmosphereWebSocketActor.class, actorRef, requestHeader, additionalAtts, config); + } + + public AtmosphereWebSocketActor(ActorRef actorRef, RequestHeader requestHeader, Map additionalAttributes, AtmosphereConfig atmosphereConfig) { + this.actorRef = actorRef; + this.requestHeader = requestHeader; + this.additionalAttributes = additionalAttributes; + this.atmosphereConfig = atmosphereConfig; + } + + @Override + public void preStart() { + try { + this.playWebSocket = new PlayWebSocket(actorRef, atmosphereConfig); + this.webSocketProcessor = WebSocketProcessorFactory.getDefault().getWebSocketProcessor(atmosphereConfig.framework()); + AtmosphereRequest atmosphereRequest = AtmosphereUtils.request(new Http.RequestImpl(requestHeader), additionalAttributes); + this.webSocketProcessor.open(playWebSocket, atmosphereRequest, AtmosphereResponseImpl.newInstance(atmosphereConfig, atmosphereRequest, playWebSocket)); + } catch (Throwable throwable) { + LOG.error("Failed to start the actor ", throwable); + } + } + + @Override + public void postStop() { + this.webSocketProcessor.close(playWebSocket, 1002); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(String.class, this::handleString) + .match(ByteString.class, this::handleByteString) + .build(); + } + + private void handleString(String string) { + this.webSocketProcessor.invokeWebSocketProtocol(this.playWebSocket, string); + } + + private void handleByteString(ByteString byteString) { + this.webSocketProcessor.invokeWebSocketProtocol(this.playWebSocket, byteString.toString()); + } + +} diff --git a/module/src/main/java/org/atmosphere/play/OutStream.java b/module/src/main/java/org/atmosphere/play/OutStream.java new file mode 100644 index 0000000..1b33d90 --- /dev/null +++ b/module/src/main/java/org/atmosphere/play/OutStream.java @@ -0,0 +1,7 @@ +package org.atmosphere.play; + +public interface OutStream { + void write(String message); + void close(); + +} diff --git a/module/src/main/java/org/atmosphere/play/PlayAsyncIOWriter.java b/module/src/main/java/org/atmosphere/play/PlayAsyncIOWriter.java index d346b3a..5ef36e9 100644 --- a/module/src/main/java/org/atmosphere/play/PlayAsyncIOWriter.java +++ b/module/src/main/java/org/atmosphere/play/PlayAsyncIOWriter.java @@ -15,6 +15,12 @@ */ package org.atmosphere.play; +import akka.NotUsed; +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Source; +import akka.util.ByteString; import org.atmosphere.cpr.AsyncIOWriter; import org.atmosphere.cpr.AsynchronousProcessor; import org.atmosphere.cpr.AtmosphereInterceptorWriter; @@ -26,60 +32,72 @@ import org.atmosphere.util.ByteArrayAsyncWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import play.api.mvc.Codec; -import play.core.j.JavaResults; -import play.libs.F; import play.mvc.Http; -import play.mvc.Results; import java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -public class PlayAsyncIOWriter extends AtmosphereInterceptorWriter implements PlayInternal> { +public class PlayAsyncIOWriter extends AtmosphereInterceptorWriter implements PlayInternal> { + public static final int BUFFER_SIZE = 10; private static final Logger logger = LoggerFactory.getLogger(PlayAsyncIOWriter.class); private final AtomicInteger pendingWrite = new AtomicInteger(); private final AtomicBoolean asyncClose = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final ByteArrayAsyncWriter buffer = new ByteArrayAsyncWriter(); - protected Results.Chunks chunks; - protected Results.Chunks.Out out; + private Source source; + private OutStream out; private boolean byteWritten = false; private long lastWrite = 0; private boolean resumeOnBroadcast; public PlayAsyncIOWriter(final Http.Request request, final Map additionalAttributes, final Http.Response response) { final String[] transport = request.queryString() != null ? request.queryString().get(HeaderConfig.X_ATMOSPHERE_TRANSPORT) : null; - - chunks = new Results.Chunks(JavaResults.writeString(Codec.utf_8())) { - @Override - public void onReady(Results.Chunks.Out oout) { - out = oout; - boolean keepAlive = false; - - try { - final AtmosphereRequest r = AtmosphereUtils.request(request, additionalAttributes); - if (transport != null && transport.length > 0 && !transport[0].equalsIgnoreCase(HeaderConfig.POLLING_TRANSPORT)) { - out.onDisconnected(() -> _close(r)); - } - - AtmosphereResponse res = new AtmosphereResponseImpl.Builder() - .asyncIOWriter(PlayAsyncIOWriter.this) - .writeHeader(false) - .request(r).build(); - keepAlive = AtmosphereCoordinator.instance().route(r, res); - } catch (Throwable e) { - logger.error("", e); - keepAlive = true; - } finally { - if (!keepAlive) { - out.close(); - } - } - } - }; + this.source = Source.actorRef(BUFFER_SIZE, OverflowStrategy.dropNew()).mapMaterializedValue(actorRef -> { + out = new OutStream() { + @Override + public void write(String message) { + actorRef.tell(message, ActorRef.noSender()); + byteWritten = true; + } + + @Override + public void close() { + actorRef.tell(new Status.Success(200), ActorRef.noSender()); + byteWritten = false; + } + }; + boolean keepAlive = false; + + try { + final AtmosphereRequest r = AtmosphereUtils.request(request, additionalAttributes); + + AtmosphereResponse res = new AtmosphereResponseImpl.Builder() + .asyncIOWriter(PlayAsyncIOWriter.this) + .writeHeader(false) + .request(r).build(); + keepAlive = AtmosphereCoordinator.instance().route(r, res); + } catch (Throwable e) { + logger.error("", e); + keepAlive = true; + } finally { + if (!keepAlive) { + out.close(); + } + } + + return NotUsed.getInstance(); + }) + .watchTermination((arg1, arg2) -> { + try { + if (transport != null && transport.length > 0 && !transport[0].equalsIgnoreCase(HeaderConfig.POLLING_TRANSPORT)) { + _close(AtmosphereUtils.request(request, additionalAttributes)); + } + } catch (Throwable ignored) { + } + return NotUsed.getInstance(); + }); // TODO: Configuring headers in Atmosphere won't work as the onReady is asynchronously called. // TODO: Some Broadcaster's Cache won't work as well. @@ -88,8 +106,8 @@ public void onReady(Results.Chunks.Out oout) { } } - public Results.Chunks internal() { - return chunks; + public Source internal() { + return source; } public boolean isClosed() { @@ -182,6 +200,8 @@ public void close(AtmosphereResponse r) throws IOException { } asyncClose.set(true); - out.close(); + if (byteWritten) { + out.close(); + } } } diff --git a/module/src/main/java/org/atmosphere/play/PlayWebSocket.java b/module/src/main/java/org/atmosphere/play/PlayWebSocket.java index 22da1ee..b945db8 100644 --- a/module/src/main/java/org/atmosphere/play/PlayWebSocket.java +++ b/module/src/main/java/org/atmosphere/play/PlayWebSocket.java @@ -15,63 +15,40 @@ */ package org.atmosphere.play; +import akka.actor.ActorRef; +import akka.actor.Status; import org.atmosphere.cpr.AtmosphereConfig; -import org.atmosphere.cpr.AtmosphereRequest; -import org.atmosphere.cpr.AtmosphereResponseImpl; -import org.atmosphere.cpr.WebSocketProcessorFactory; -import org.atmosphere.websocket.WebSocketProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import play.libs.F; -import play.mvc.Http; -import play.mvc.LegacyWebSocket; -import play.mvc.WebSocket; - import java.io.IOException; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -public class PlayWebSocket extends org.atmosphere.websocket.WebSocket implements PlayInternal> { +public class PlayWebSocket extends org.atmosphere.websocket.WebSocket { private static final Logger logger = LoggerFactory.getLogger(PlayWebSocket.class); - private WebSocket.Out out; - private WebSocket.In in; - private final LegacyWebSocket w; + private OutStream out; + private final AtomicBoolean firstWrite = new AtomicBoolean(false); - private final WebSocketProcessor webSocketProcessor; - public PlayWebSocket(final AtmosphereConfig config, final Http.Request request, final Map additionalAttributes) { + public PlayWebSocket(ActorRef actorRef, final AtmosphereConfig config) { super(config); - - webSocketProcessor = WebSocketProcessorFactory.getDefault().getWebSocketProcessor(config.framework()); - w = new LegacyWebSocket() { + out = new OutStream() { @Override - public void onReady(WebSocket.In iin, WebSocket.Out oout) { - out = oout; - in = iin; - in.onClose(() -> webSocketProcessor.close(PlayWebSocket.this, 1002)); - in.onMessage(message -> webSocketProcessor.invokeWebSocketProtocol(PlayWebSocket.this, message)); - AtmosphereRequest r = null; - try { - r = AtmosphereUtils.request(request, additionalAttributes); - } catch (Throwable t) { - logger.error("", t); - } + public void write(String message) { + actorRef.tell(message, ActorRef.noSender()); + } - try { - webSocketProcessor.open(PlayWebSocket.this, r, AtmosphereResponseImpl.newInstance(config, r, PlayWebSocket.this)); - } catch (IOException e) { - logger.error("", e); - out.close(); - } + @Override + public void close() { + actorRef.tell(new Status.Success(200), ActorRef.noSender()); } }; } - public LegacyWebSocket internal() { - return w; + @Override + public boolean isOpen() { + return true; } /** @@ -100,12 +77,6 @@ public org.atmosphere.websocket.WebSocket write(byte[] data, int offset, int len return this; } - @Override - public boolean isOpen() { - //return in.isOpen(); - return true; - } - /** * {@inheritDoc} */ @@ -114,5 +85,3 @@ public void close() { out.close(); } } - - diff --git a/module/src/main/scala/org/atmosphere/play/AtmosphereHttpRequestHandler.scala b/module/src/main/scala/org/atmosphere/play/AtmosphereHttpRequestHandler.scala index 7f2ddcd..4019509 100644 --- a/module/src/main/scala/org/atmosphere/play/AtmosphereHttpRequestHandler.scala +++ b/module/src/main/scala/org/atmosphere/play/AtmosphereHttpRequestHandler.scala @@ -1,16 +1,16 @@ package org.atmosphere.play -import java.util.concurrent.CompletableFuture - -import com.google.inject.Inject +import javax.inject.Inject import play.api.http._ import play.api.inject.Injector -import play.api.mvc.{Handler, RequestHeader} +import play.api.mvc.{BodyParser, Handler, RequestHeader} import play.api.routing.Router import play.core.j._ -import play.api.mvc.BodyParser import play.mvc.Http.RequestBody +import scala.concurrent.ExecutionContext.Implicits.global + + class AtmosphereHttpRequestHandler @Inject()(components: JavaHandlerComponents, router: Router, @@ -18,6 +18,7 @@ class AtmosphereHttpRequestHandler @Inject()(components: JavaHandlerComponents, configuration: HttpConfiguration, filters: HttpFilters, injector: Injector) + extends JavaCompatibleHttpRequestHandler(router, errorHandler, configuration, filters, components) { override def routeRequest(request: RequestHeader) = { @@ -33,36 +34,41 @@ class AtmosphereHttpRequestHandler @Inject()(components: JavaHandlerComponents, upgradeHeader.map("websocket".equalsIgnoreCase) .getOrElse(connectionHeaders.contains("upgrade".equalsIgnoreCase _)) + def dispatch(request: RequestHeader): Option[Handler] = + dispatch(request, classOf[AtmosphereController]) + + def dispatch(request: RequestHeader, controllerClass: Class[_ <: AtmosphereController]): Option[Handler] = { + val upgradeHeader = request.headers.get("Upgrade") + val connectionHeaders = Option(request.headers.get("Connection")).map(_.toSeq).getOrElse(Seq.empty) + dispatch(request.path, controllerClass, upgradeHeader, connectionHeaders) + } + private def dispatch(requestPath: String, controllerClass: Class[_ <: AtmosphereController], upgradeHeader: Option[String], connectionHeaders: Seq[String]): Option[Handler] = { - if (!AtmosphereCoordinator.instance.matchPath(requestPath)) - None - - else { - val controller: AtmosphereController = - Option(injector.instanceOf(controllerClass)).getOrElse(controllerClass.newInstance) - + if (AtmosphereCoordinator.instance.matchPath(requestPath)) { + val controller: AtmosphereController = Option(injector.instanceOf(controllerClass)).getOrElse(controllerClass.newInstance) // Netty fail to decode headers separated by a ',' val javaAction = if (isWsSupported(upgradeHeader, connectionHeaders)) - JavaWebSocket.ofString(controller.webSocket) + controller.webSocket else new JavaAction(components) { - val annotations = new JavaActionAnnotations(controllerClass, controllerClass.getMethod("http")) + val annotations = new JavaActionAnnotations(controllerClass, controllerClass.getMethod("http"), new ActionCompositionConfiguration()) val parser = javaBodyParserToScala(injector.instanceOf(annotations.parser)) - - def invocation = CompletableFuture.completedFuture(controller.http) + def invocation = controller.http } Some(javaAction) } + else { + None + } } def javaBodyParserToScala(parser: play.mvc.BodyParser[_]): BodyParser[RequestBody] = BodyParser { request => val accumulator = parser.apply(new play.core.j.RequestHeaderImpl(request)).asScala() - import play.api.libs.iteratee.Execution.Implicits.trampoline accumulator.map { javaEither => if (javaEither.left.isPresent) { Left(javaEither.left.get().asScala()) @@ -72,13 +78,4 @@ class AtmosphereHttpRequestHandler @Inject()(components: JavaHandlerComponents, } } - def dispatch(request: RequestHeader): Option[Handler] = - dispatch(request, classOf[AtmosphereController]) - - def dispatch(request: RequestHeader, controllerClass: Class[_ <: AtmosphereController]): Option[Handler] = { - val upgradeHeader = request.headers.get("Upgrade") - val connectionHeaders = Option(request.headers.get("Connection")).map(_.toSeq).getOrElse(Seq.empty) - dispatch(request.path, controllerClass, upgradeHeader, connectionHeaders) - } - } \ No newline at end of file diff --git a/pom.xml b/pom.xml index cd08a94..3eed941 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.atmosphere play-project Atmosphere Play Project - 2.3.1-SNAPSHOT + 2.4.0 pom Atmosphere running on top of Play! @@ -46,8 +46,8 @@ com.typesafe.play - play_2.11 - 2.5.4 + play_2.12 + 2.6.20 provided