Skip to content

Commit

Permalink
got end-to-end streaming to work from UI->server->bus->server->feed
Browse files Browse the repository at this point in the history
  • Loading branch information
jmazzitelli committed Aug 5, 2015
1 parent 1ed1ac4 commit 120e5f9
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Map;

import javax.jms.JMSException;
Expand Down Expand Up @@ -444,7 +445,9 @@ protected Message createMessageWithBinaryData(ConnectionContext context, BasicMe
BinaryData messagePlusBinaryData = new BinaryData(basicMessage.toJSON().getBytes(), inputStream);

// we are using a ActiveMQ-specific feature that allows us to stream blobs
Message msg = ((ActiveMQSession) session).createBlobMessage(messagePlusBinaryData);
// for some unknown reason, ActiveMQ doesn't allow RA-obtained sessions to create BlobMessages.
// Need to play games to get the real ActiveMQ session so we can create BlobMessage.
Message msg = getActiveMQSession(session).createBlobMessage(messagePlusBinaryData);

// if the basicMessage has headers, use those first
Map<String, String> basicMessageHeaders = basicMessage.getHeaders();
Expand All @@ -464,4 +467,20 @@ protected Message createMessageWithBinaryData(ConnectionContext context, BasicMe

return msg;
}

protected ActiveMQSession getActiveMQSession(Session session) {
if (session instanceof ActiveMQSession) {
return (ActiveMQSession) session;
}

// This is probably a session obtained from the resource adapter, which is really a proxy.
// It has a non-public method called "getSession" that gets the session we want, so use reflection to get it.
try {
Method m = session.getClass().getDeclaredMethod("getSession");
m.setAccessible(true);
return (ActiveMQSession) m.invoke(session);
} catch (Exception e) {
throw new IllegalStateException("Not running with ActiveMQ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

/**
* A message listener that expects to receive a JSON-encoded BasicMessage or one of its subclasses. Implementors need
* only implement the method that takes an BasicRecord or one of its subclasses; the JSON decoding is handled for you.
* not worry about the JSON decoding as it is handled for you
*
* Subclasses must override one and only one of the {@link #onBasicMessage(BasicMessageWithExtraData)} or
* {@link #onBasicMessage(BasicMessage)} methods.
*
* This processes fire-and-forget requests - that is, the request message is processed with no response being sent back
* to the sender.
Expand Down Expand Up @@ -55,7 +58,28 @@ public void onMessage(Message message) {
/**
* Subclasses implement this method to process the received message.
*
* If subclasses would rather just receive the {@link BasicMessage}, it can do so by
* overriding {@link #onBasicMessage(BasicMessage)} and leaving this method as-is (that is,
* do NOT override this method).
*
* @param msgWithExtraData the basic message received with any extra data that came with it
*/
protected abstract void onBasicMessage(BasicMessageWithExtraData<T> msgWithExtraData);
protected void onBasicMessage(BasicMessageWithExtraData<T> msgWithExtraData) {
onBasicMessage(msgWithExtraData.getBasicMessage()); // delegate
}

/**
* Subclasses can implement this method rather than {@link #onBasicMessage(BasicMessageWithExtraData)}
* if they only expect to receive a {@link BasicMessage} with no additional data.
*
* If this method is overridden by subclasses, then the {@link #onBasicMessage(BasicMessageWithExtraData)}
* should not be.
*
* This base implementation is a no-op.
*
* @param basicMessage the basic message received
*/
protected void onBasicMessage(T basicMessage) {
// no op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
/**
* A listener that processes an incoming request that will require a response sent back to the sender of the request.
*
* Subclasses must override one and only one of the {@link #onBasicMessage(BasicMessageWithExtraData)} or
* {@link #onBasicMessage(BasicMessage)} methods.
*
* @author John Mazzitelli
*
* @param <T> the type of the incoming request message
Expand Down Expand Up @@ -137,8 +140,30 @@ public void onMessage(Message message) {
/**
* Subclasses implement this method to process the received message.
*
* If subclasses would rather just receive the {@link BasicMessage}, it can do so by
* overriding the onBasicMessage method that just takes the message type as a parameter
* and leaving this method as-is (that is, do NOT override this method).
*
* @param msgWithExtraData the basic message received with any extra data that came with it
* @return the response message - this will be forwarded to the sender of the request message
*/
protected abstract U onBasicMessage(BasicMessageWithExtraData<T> msgWithExtraData);
protected U onBasicMessage(BasicMessageWithExtraData<T> msgWithExtraData) {
return onBasicMessage(msgWithExtraData.getBasicMessage());
}

/**
* Subclasses can implement this method rather than {@link #onBasicMessage(BasicMessageWithExtraData)}
* if they only expect to receive a {@link BasicMessage} with no additional data.
*
* If this method is overridden by subclasses, then the {@link #onBasicMessage(BasicMessageWithExtraData)}
* should not be.
*
* This base implementation is a no-op.
*
* @param basicMessage the basic message received
* @return the response message - this will be forwarded to the sender of the request message
*/
protected U onBasicMessage(T basicMessage) {
return null; // no op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"type": "object",
"extends": {
"type": "object",
"javaType": "org.hawkular.bus.common.BasicMessage"
},
"javaType": "org.hawkular.feedcomm.api.FileUploadRequest",
"description": "A simple request to upload a file. The actual file data must be appended to this JSON message.",
"additionalProperties": false,
"properties": {
"feedId": {
"type": "string"
},
"destinationFileName": {
"type": "string"
}
},
"required": ["destinationFileName"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hawkular.feedcomm.ws.command.GenericErrorResponseCommand;
import org.hawkular.feedcomm.ws.command.feed.ExecuteOperationResponseCommand;
import org.hawkular.feedcomm.ws.command.ui.ExecuteOperationCommand;
import org.hawkular.feedcomm.ws.command.ui.FileUploadCommand;
import org.hawkular.feedcomm.ws.server.ValidCommandsMap;

/**
Expand Down Expand Up @@ -58,10 +59,13 @@ public interface Constants {
ValidCommandsMap VALID_COMMANDS_FROM_UI = new ValidCommandsMap()
.put(EchoCommand.REQUEST_CLASS.getName(), EchoCommand.class)
.put(GenericErrorResponseCommand.REQUEST_CLASS.getName(), GenericErrorResponseCommand.class)
.put(FileUploadCommand.REQUEST_CLASS.getName(), FileUploadCommand.class)
.put(ExecuteOperationCommand.REQUEST_CLASS.getName(), ExecuteOperationCommand.class);

// QUEUES AND TOPICS
Endpoint DEST_FEED_EXECUTE_OP = new Endpoint(Type.QUEUE, "FeedExecuteOperation");
Endpoint DEST_FEED_FILE_UPLOAD = new Endpoint(Type.QUEUE, "FeedFileUpload");

Endpoint DEST_UICLIENT_EXECUTE_OP_RESPONSE = new Endpoint(Type.QUEUE, "UIClientExecuteOperationResponse");

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ void errorCommandExecutionFailureUIClient(String commandRequest, String uiClient
void warnReceivedGenericErrorResponse(String errorMessage, String stackTrack);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 9, value = "Feed [%s] session opened")
void infoFeedSessionOpened(String feedId);
@Message(id = 9, value = "Feed [%s] session opened [%s]")
void infoFeedSessionOpened(String feedId, String sessionId);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 10, value = "Failed to add message listeners for feed [%s]. Closing session [%s]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.hawkular.feedcomm.ws;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;

import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.RemoteEndpoint.Basic;
Expand Down Expand Up @@ -136,60 +138,56 @@ public void sendTextSync(Collection<Session> sessions, String text) throws IOExc
}

/**
* Sends binary data to clients asynchronously.
* Sends binary data to a client asynchronously.
*
* @param sessions the client sessions where the message will be sent
* @param binaryData the binary data to send
* @param session the client session where the message will be sent
* @param inputStream the binary data to send
* @param threadPool where the job will be submitted so it can execute asynchronously
*/
public void sendBinaryAsync(Collection<Session> sessions, ByteBuffer binaryData) {
if (sessions == null || sessions.isEmpty()) {
public void sendBinaryAsync(Session session, InputStream inputStream, ExecutorService threadPool) {
if (session == null) {
return;
}

if (binaryData == null) {
throw new IllegalArgumentException("binaryData must not be null");
if (inputStream == null) {
throw new IllegalArgumentException("inputStream must not be null");
}

MsgLogger.LOG.infof("Attempting to send async binary data to [%d] clients", sessions.size());
MsgLogger.LOG.infof("Attempting to send async binary data to client [%s]", session.getId());

for (Session session : sessions) {
if (session.isOpen()) {
Async asyncRemote = session.getAsyncRemote();
if (this.asyncTimeout != null) {
asyncRemote.setSendTimeout(this.asyncTimeout.longValue());
}
asyncRemote.sendBinary(binaryData);
if (session.isOpen()) {
if (this.asyncTimeout != null) {
// TODO: what to do with timeout?
}

CopyStreamRunnable runnable = new CopyStreamRunnable(session, inputStream);
threadPool.execute(runnable);
}

return;
}

/**
* Sends binary data to clients synchronously. If you send to multiple sessions, but an error occurred
* trying to deliver to one of the sessions, this method aborts, throws an exception, and the rest
* of the sessions will not get the message.
* Sends binary data to a client synchronously.
*
* @param sessions the client sessions where the message will be sent
* @param binaryData the binary data to send
* @param session the client where the message will be sent
* @param inputStream the binary data to send
* @throws IOException if a problem occurred during delivery of the data to a session.
*/
public void sendBinarySync(Collection<Session> sessions, ByteBuffer binaryData) throws IOException {
if (sessions == null || sessions.isEmpty()) {
public void sendBinarySync(Session session, InputStream inputStream) throws IOException {
if (session == null) {
return;
}

if (binaryData == null) {
throw new IllegalArgumentException("binaryData must not be null");
if (inputStream == null) {
throw new IllegalArgumentException("inputStream must not be null");
}

MsgLogger.LOG.infof("Attempting to send message to [%d] clients: [%s]", sessions.size(), binaryData);
MsgLogger.LOG.infof("Attempting to send binary data to client [%s]", session.getId());

for (Session session : sessions) {
if (session.isOpen()) {
Basic basicRemote = session.getBasicRemote();
basicRemote.sendBinary(binaryData);
}
if (session.isOpen()) {
long size = new CopyStreamRunnable(session, inputStream).copyInputToOutput();
MsgLogger.LOG.infof("Finished sending binary data to client [%s]: size=[%s]", session.getId(), size);
}

return;
Expand All @@ -211,11 +209,50 @@ public void sendTextSync(Session session, String text) throws IOException {
sendTextSync(Collections.singletonList(session), text);
}

public void sendBinaryAsync(Session session, ByteBuffer binaryData) {
sendBinaryAsync(Collections.singletonList(session), binaryData);
}
private class CopyStreamRunnable implements Runnable {
private final Session session;
private final InputStream inputStream;

public void sendBinarySync(Session session, ByteBuffer binaryData) throws IOException {
sendBinarySync(Collections.singletonList(session), binaryData);
public CopyStreamRunnable(Session session, InputStream inputStream) {
this.session = session;
this.inputStream = inputStream;
}

@Override
public void run() {
try {
long size = copyInputToOutput();
MsgLogger.LOG.infof("Finished sending async binary data to client [%s]: size=[%s]", session.getId(),
size);
} catch (Exception e) {
MsgLogger.LOG.errorf(e, "Failed sending async binary data to client [%s].", session.getId());
}
}

public long copyInputToOutput() throws IOException {
Basic basicRemote = session.getBasicRemote();
OutputStream outputStream = basicRemote.getSendStream();

try {
// slurp the input stream data and send directly to the output stream
byte[] buf = new byte[4096];
long totalBytesCopied = 0L;
while (true) {
int numRead = inputStream.read(buf);
if (numRead == -1) {
break;
}
outputStream.write(buf, 0, numRead);
totalBytesCopied += numRead;
}
return totalBytesCopied;
} finally {
try {
outputStream.close();
} finally {
inputStream.close();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.feedcomm.ws.command.ui;

import java.util.Collections;
import java.util.Map;

import org.hawkular.bus.common.BinaryData;
import org.hawkular.bus.common.ConnectionContextFactory;
import org.hawkular.bus.common.Endpoint;
import org.hawkular.bus.common.MessageId;
import org.hawkular.bus.common.MessageProcessor;
import org.hawkular.bus.common.producer.ProducerConnectionContext;
import org.hawkular.feedcomm.api.FileUploadRequest;
import org.hawkular.feedcomm.api.GenericSuccessResponse;
import org.hawkular.feedcomm.ws.Constants;
import org.hawkular.feedcomm.ws.MsgLogger;
import org.hawkular.feedcomm.ws.command.Command;
import org.hawkular.feedcomm.ws.command.CommandContext;

/**
* UI client requesting to send a file to a remote feed.
*/
public class FileUploadCommand implements Command<FileUploadRequest, GenericSuccessResponse> {
public static final Class<FileUploadRequest> REQUEST_CLASS = FileUploadRequest.class;

@Override
public GenericSuccessResponse execute(FileUploadRequest request, BinaryData binaryData, CommandContext context)
throws Exception {

try (ConnectionContextFactory ccf = new ConnectionContextFactory(context.getConnectionFactory())) {
String feedId = request.getFeedId();
Endpoint endpoint = Constants.DEST_FEED_FILE_UPLOAD;
ProducerConnectionContext pcc = ccf.createProducerConnectionContext(endpoint);
Map<String, String> feedIdHeader = Collections.singletonMap(Constants.HEADER_FEEDID, feedId);
MessageId mid = new MessageProcessor().sendWithBinaryData(pcc, request, binaryData, feedIdHeader);
MsgLogger.LOG.debugf("File upload request placed on bus. mid=[%s], request=[%s]", mid, request);
}

return null;
}
}

0 comments on commit 120e5f9

Please sign in to comment.