Skip to content

Commit

Permalink
Merge pull request #1 from vuong-idnow/master
Browse files Browse the repository at this point in the history
Upgrade atmosphere-play to use play 2.6
  • Loading branch information
kbarzen-idn committed Dec 11, 2018
2 parents e4686f8 + 69ac74d commit 9619e19
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 129 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ Fork the [samples workspace](https://github.com/Atmosphere/atmosphere-samples/tr

Download [Atmosphere Play!](http://search.maven.org/#search%7Cga%7C1%7Catmosphere-play), use Maven or [sbt](http://ntcoding.blogspot.ca/2013/09/atmosphere-scala-sbt-akka-step-by-step.html)

For Play 2.6.x+:

```xml
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-play</artifactId>
<version>2.4.0</version>
</dependency>
```

For Play 2.5.x+:

```xml
Expand Down
6 changes: 3 additions & 3 deletions module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
<parent>
<groupId>org.atmosphere</groupId>
<artifactId>play-project</artifactId>
<version>2.3.1-SNAPSHOT</version>
<version>2.4.0</version>
<relativePath>../</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-play</artifactId>
<name>Atmosphere Play Module</name>
<version>2.3.1-SNAPSHOT</version>
<version>2.4.0</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
Expand All @@ -19,7 +19,7 @@
</dependency>
<dependency>
<groupId>com.typesafe.play</groupId>
<artifactId>play_2.11</artifactId>
<artifactId>play_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
45 changes: 34 additions & 11 deletions module/src/main/java/org/atmosphere/play/AtmosphereController.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,46 @@
*/
package org.atmosphere.play;

import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import org.apache.commons.lang3.StringUtils;
import org.atmosphere.config.service.Singleton;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.util.IOUtils;
import play.api.libs.streams.ActorFlow;
import play.api.mvc.WebSocket;
import play.api.mvc.WebSocket$;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.LegacyWebSocket;
import play.mvc.Result;
import play.mvc.Results;

import javax.inject.Inject;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@Singleton
public class AtmosphereController extends Controller {
private final AtmosphereFramework framework;
public static final int BUFFER_SIZE = 10;
private final AtmosphereFramework framework;
private final AtmosphereConfig config;
private final AtmospherePlaySessionConverter converter;
private HttpExecutionContext httpExecutionContext;

@SuppressWarnings("unchecked")
public AtmosphereController() throws InstantiationException, IllegalAccessException, Exception {
framework = AtmosphereCoordinator.instance().framework();
@Inject
private ActorSystem actorSystem;

@Inject
private Materializer materializer;

@Inject
public AtmosphereController(HttpExecutionContext ec) throws Exception {
this.httpExecutionContext = ec;
framework = AtmosphereCoordinator.instance().framework();
config = framework.getAtmosphereConfig();

final String playSessionConverter = config.getInitParameter(AtmosphereCoordinator.PLAY_SESSION_CONVERTER);
Expand All @@ -45,13 +66,16 @@ public AtmosphereController() throws InstantiationException, IllegalAccessExcept
}
}

public LegacyWebSocket<String> webSocket() throws Throwable {
return new PlayWebSocket(config, request(), convertedSession()).internal();
public WebSocket webSocket() {
return WebSocket$.MODULE$.accept(
request -> ActorFlow.actorRef(
req -> AtmosphereWebSocketActor.props(req, request, convertedSession(), config), BUFFER_SIZE, OverflowStrategy.dropNew(), actorSystem , materializer),
WebSocket.MessageFlowTransformer$.MODULE$.stringMessageFlowTransformer());
}

public Result http() throws Throwable {
// TODO: Wrong status code on error!
return ok(new PlayAsyncIOWriter(request(), convertedSession(), response()).internal());
public CompletionStage<Result> http() {
return CompletableFuture.supplyAsync(() -> Results.ok().chunked(new PlayAsyncIOWriter(request(), convertedSession(), response()).internal()).as("text/event-stream"), httpExecutionContext.current());

}

protected Map<String, Object> convertedSession() {
Expand All @@ -64,5 +88,4 @@ protected Map<String, Object> convertedSession() {

return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.atmosphere.play;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.util.ByteString;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResponseImpl;
import org.atmosphere.cpr.WebSocketProcessorFactory;
import org.atmosphere.websocket.WebSocketProcessor;
import play.Logger;
import play.api.mvc.RequestHeader;
import play.mvc.Http;

import java.util.Map;

public class AtmosphereWebSocketActor extends AbstractActor {
private static final Logger.ALogger LOG = Logger.of(AtmosphereWebSocketActor.class);
private PlayWebSocket playWebSocket = null;
private WebSocketProcessor webSocketProcessor = null;
private ActorRef actorRef;
private RequestHeader requestHeader;
private Map<String, Object> additionalAttributes;
private AtmosphereConfig atmosphereConfig;

public static Props props(ActorRef actorRef, RequestHeader requestHeader, Map<String, Object> additionalAtts, AtmosphereConfig config) {
return Props.create(AtmosphereWebSocketActor.class, actorRef, requestHeader, additionalAtts, config);
}

public AtmosphereWebSocketActor(ActorRef actorRef, RequestHeader requestHeader, Map<String, Object> additionalAttributes, AtmosphereConfig atmosphereConfig) {
this.actorRef = actorRef;
this.requestHeader = requestHeader;
this.additionalAttributes = additionalAttributes;
this.atmosphereConfig = atmosphereConfig;
}

@Override
public void preStart() {
try {
this.playWebSocket = new PlayWebSocket(actorRef, atmosphereConfig);
this.webSocketProcessor = WebSocketProcessorFactory.getDefault().getWebSocketProcessor(atmosphereConfig.framework());
AtmosphereRequest atmosphereRequest = AtmosphereUtils.request(new Http.RequestImpl(requestHeader), additionalAttributes);
this.webSocketProcessor.open(playWebSocket, atmosphereRequest, AtmosphereResponseImpl.newInstance(atmosphereConfig, atmosphereRequest, playWebSocket));
} catch (Throwable throwable) {
LOG.error("Failed to start the actor ", throwable);
}
}

@Override
public void postStop() {
this.webSocketProcessor.close(playWebSocket, 1002);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, this::handleString)
.match(ByteString.class, this::handleByteString)
.build();
}

private void handleString(String string) {
this.webSocketProcessor.invokeWebSocketProtocol(this.playWebSocket, string);
}

private void handleByteString(ByteString byteString) {
this.webSocketProcessor.invokeWebSocketProtocol(this.playWebSocket, byteString.toString());
}

}
7 changes: 7 additions & 0 deletions module/src/main/java/org/atmosphere/play/OutStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.atmosphere.play;

public interface OutStream {
void write(String message);
void close();

}
98 changes: 59 additions & 39 deletions module/src/main/java/org/atmosphere/play/PlayAsyncIOWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package org.atmosphere.play;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.atmosphere.cpr.AsyncIOWriter;
import org.atmosphere.cpr.AsynchronousProcessor;
import org.atmosphere.cpr.AtmosphereInterceptorWriter;
Expand All @@ -26,60 +32,72 @@
import org.atmosphere.util.ByteArrayAsyncWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import play.api.mvc.Codec;
import play.core.j.JavaResults;
import play.libs.F;
import play.mvc.Http;
import play.mvc.Results;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PlayAsyncIOWriter extends AtmosphereInterceptorWriter implements PlayInternal<Results.Chunks<String>> {
public class PlayAsyncIOWriter extends AtmosphereInterceptorWriter implements PlayInternal<Source<ByteString, ?>> {
public static final int BUFFER_SIZE = 10;
private static final Logger logger = LoggerFactory.getLogger(PlayAsyncIOWriter.class);
private final AtomicInteger pendingWrite = new AtomicInteger();
private final AtomicBoolean asyncClose = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ByteArrayAsyncWriter buffer = new ByteArrayAsyncWriter();
protected Results.Chunks<String> chunks;
protected Results.Chunks.Out<String> out;
private Source<ByteString,? > source;
private OutStream out;
private boolean byteWritten = false;
private long lastWrite = 0;
private boolean resumeOnBroadcast;

public PlayAsyncIOWriter(final Http.Request request, final Map<String, Object> additionalAttributes, final Http.Response response) {
final String[] transport = request.queryString() != null ? request.queryString().get(HeaderConfig.X_ATMOSPHERE_TRANSPORT) : null;

chunks = new Results.Chunks<String>(JavaResults.writeString(Codec.utf_8())) {
@Override
public void onReady(Results.Chunks.Out<String> oout) {
out = oout;
boolean keepAlive = false;

try {
final AtmosphereRequest r = AtmosphereUtils.request(request, additionalAttributes);
if (transport != null && transport.length > 0 && !transport[0].equalsIgnoreCase(HeaderConfig.POLLING_TRANSPORT)) {
out.onDisconnected(() -> _close(r));
}

AtmosphereResponse res = new AtmosphereResponseImpl.Builder()
.asyncIOWriter(PlayAsyncIOWriter.this)
.writeHeader(false)
.request(r).build();
keepAlive = AtmosphereCoordinator.instance().route(r, res);
} catch (Throwable e) {
logger.error("", e);
keepAlive = true;
} finally {
if (!keepAlive) {
out.close();
}
}
}
};
this.source = Source.<ByteString>actorRef(BUFFER_SIZE, OverflowStrategy.dropNew()).mapMaterializedValue(actorRef -> {
out = new OutStream() {
@Override
public void write(String message) {
actorRef.tell(message, ActorRef.noSender());
byteWritten = true;
}

@Override
public void close() {
actorRef.tell(new Status.Success(200), ActorRef.noSender());
byteWritten = false;
}
};
boolean keepAlive = false;

try {
final AtmosphereRequest r = AtmosphereUtils.request(request, additionalAttributes);

AtmosphereResponse res = new AtmosphereResponseImpl.Builder()
.asyncIOWriter(PlayAsyncIOWriter.this)
.writeHeader(false)
.request(r).build();
keepAlive = AtmosphereCoordinator.instance().route(r, res);
} catch (Throwable e) {
logger.error("", e);
keepAlive = true;
} finally {
if (!keepAlive) {
out.close();
}
}

return NotUsed.getInstance();
})
.watchTermination((arg1, arg2) -> {
try {
if (transport != null && transport.length > 0 && !transport[0].equalsIgnoreCase(HeaderConfig.POLLING_TRANSPORT)) {
_close(AtmosphereUtils.request(request, additionalAttributes));
}
} catch (Throwable ignored) {
}
return NotUsed.getInstance();
});

// TODO: Configuring headers in Atmosphere won't work as the onReady is asynchronously called.
// TODO: Some Broadcaster's Cache won't work as well.
Expand All @@ -88,8 +106,8 @@ public void onReady(Results.Chunks.Out<String> oout) {
}
}

public Results.Chunks<String> internal() {
return chunks;
public Source<ByteString, ?> internal() {
return source;
}

public boolean isClosed() {
Expand Down Expand Up @@ -182,6 +200,8 @@ public void close(AtmosphereResponse r) throws IOException {
}

asyncClose.set(true);
out.close();
if (byteWritten) {
out.close();
}
}
}

0 comments on commit 9619e19

Please sign in to comment.