Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
don't send messages within the singleton ejbs. use the helper
Browse files Browse the repository at this point in the history
mdb and command for execute-op
  • Loading branch information
jmazzitelli committed Jul 14, 2015
1 parent ed2bc2f commit e8c0be0
Show file tree
Hide file tree
Showing 13 changed files with 424 additions and 30 deletions.
6 changes: 0 additions & 6 deletions modules/feed-comm/feed-comm-war/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@
<scope>provided</scope> <!-- the nest provides this -->
</dependency>

<dependency>
<groupId>org.hawkular.bus</groupId>
<artifactId>hawkular-bus-mdb</artifactId>
<scope>provided</scope> <!-- the nest provides this -->
</dependency>

<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.feedcomm.ws;



/**
* Global constants.
*/
public interface Constants {
/**
* A JMS message header that will identify the targeted feed.
*/
String HEADER_FEEDID = "feedId";

/**
* The JNDI name of the bus connection factory.
*/
String CONNECTION_FACTORY_JNDI = "java:/HawkularBusConnectionFactory";

// QUEUES AND TOPICS
String DEST_FEED_EXECUTE_OP = "FeedExecuteOperation";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@ValidIdRange(min = 1, max = 5000)
public interface MsgLogger extends BasicLogger {

MsgLogger LOG = Logger.getMessageLogger(MsgLogger.class, "org.hawkular.feedcom.ws");
MsgLogger LOG = Logger.getMessageLogger(MsgLogger.class, "org.hawkular.feedcomm.ws");

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 1, value = "Feed [%s] provided an invalid command request: [%s]")
Expand All @@ -54,4 +54,8 @@ public interface MsgLogger extends BasicLogger {
@Message(id = 6, value = "Failed to execute command [%s] for UI client session [%s]")
void errorCommandExecutionFailureUIClient(String commandRequest, String sessionId, @Cause Throwable t);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 7, value = "Cannot process an execute-operation message")
void errorCannotProcessExecuteOperationMessage(@Cause Exception e);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.feedcomm.ws;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;

import org.hawkular.bus.common.BasicMessage;
import org.hawkular.feedcomm.api.ApiDeserializer;

/**
* Some convienence methods when working with WebSockets.
*/
public class WebSocketHelper {

private Long asyncTimeout;

public WebSocketHelper() {
this.asyncTimeout = null;
}

/**
* Creates a helper object.
*
* @param timeout number of milliseconds before an asynchronous send will timeout.
* A negative number means no timeout, null means use the WebSocket default timeout.
*/
public WebSocketHelper(Long asyncTimeout) {
this.asyncTimeout = asyncTimeout;
}

/**
* Converts the given message to JSON and sends that JSON text to clients asynchronously.
*
* @param sessions the client sessions where the JSON message will be sent
* @param msg the message to be converted to JSON and sent
*/
public void sendBasicMessageAsync(Collection<Session> sessions, BasicMessage msg) {
String text = ApiDeserializer.toHawkularFormat(msg);
sendTextAsync(sessions, text);
}

/**
* Converts the given message to JSON and sends that JSON text to clients synchronously.
* If you send to multiple sessions, but an error occurred
* trying to deliver to one of the sessions, this method aborts, throws an exception, and the rest
* of the sessions will not get the message.
*
* @param sessions the client sessions where the message will be sent
* @param msg the message to be converted to JSON and sent
* @throws IOException if a problem occurred during delivery of the message to a session.
*/
public void sendBasicMessageSync(Collection<Session> sessions, BasicMessage msg) throws IOException {
String text = ApiDeserializer.toHawkularFormat(msg);
sendTextSync(sessions, text);
}

/**
* Sends text to clients asynchronously.
*
* @param sessions the client sessions where the message will be sent
* @param text the message
*/
public void sendTextAsync(Collection<Session> sessions, String text) {
if (sessions == null || sessions.isEmpty()) {
return;
}

if (text == null) {
throw new IllegalArgumentException("message must not be null");
}

MsgLogger.LOG.infof("Attempting to send async message to [%d] clients: [%s]", sessions.size(), text);

for (Session session : sessions) {
if (session.isOpen()) {
Async asyncRemote = session.getAsyncRemote();
if (this.asyncTimeout != null) {
asyncRemote.setSendTimeout(this.asyncTimeout.longValue());
}
asyncRemote.sendText(text);
}
}

return;
}

/**
* Sends text to clients synchronously. If you send to multiple sessions, but an error occurred
* trying to deliver to one of the sessions, this method aborts, throws an exception, and the rest
* of the sessions will not get the message.
*
* @param sessions the client sessions where the message will be sent
* @param text the message
* @throws IOException if a problem occurred during delivery of the message to a session.
*/
public void sendTextSync(Collection<Session> sessions, String text) throws IOException {
if (sessions == null || sessions.isEmpty()) {
return;
}

if (text == null) {
throw new IllegalArgumentException("message must not be null");
}

MsgLogger.LOG.infof("Attempting to send message to [%d] clients: [%s]", sessions.size(), text);

for (Session session : sessions) {
if (session.isOpen()) {
Basic basicRemote = session.getBasicRemote();
basicRemote.sendText(text);
}
}

return;
}

public void sendBasicMessageAsync(Session session, BasicMessage msg) {
sendBasicMessageAsync(Collections.singletonList(session), msg);
}

public void sendBasicMessageSync(Session session, BasicMessage msg) throws IOException {
sendBasicMessageSync(Collections.singletonList(session), msg);
}

public void sendTextAsync(Session session, String text) {
sendTextAsync(Collections.singletonList(session), text);
}

public void sendTextSync(Session session, String text) throws IOException {
sendTextSync(Collections.singletonList(session), text);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
* An command that comes from a feed.
*/
public interface Command<REQ extends BasicMessage, RESP extends BasicMessage> {

/**
* Performs the command for the feed.
*
* @param request the request that describes what needs to be executed
* @param context some context data that can be useful for the command to be able to execute the request
* @return the results of the command that need to be sent back to the feed.
* @throws Exception if failed to execute the operation
*/
RESP execute(REQ request);
RESP execute(REQ request, CommandContext context) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.feedcomm.ws.command;

import javax.jms.ConnectionFactory;

import org.hawkular.feedcomm.ws.server.ConnectedFeeds;
import org.hawkular.feedcomm.ws.server.ConnectedUIClients;

public class CommandContext {
public final ConnectedFeeds connectedFeeds;
public final ConnectedUIClients connectedUIClients;
public final ConnectionFactory connectionFactory;

public CommandContext(ConnectedFeeds f, ConnectedUIClients ui, ConnectionFactory cf) {
this.connectedFeeds = f;
this.connectedUIClients = ui;
this.connectionFactory = cf;
}

public ConnectedFeeds getConnectedFeeds() {
return connectedFeeds;
}

public ConnectedUIClients getConnectedUIClients() {
return connectedUIClients;
}

public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class EchoCommand implements Command<EchoRequest, EchoResponse> {
public static final Class<EchoRequest> REQUEST_CLASS = EchoRequest.class;

@Override
public EchoResponse execute(EchoRequest echoRequest) {
public EchoResponse execute(EchoRequest echoRequest, CommandContext context) {
String reply = String.format("ECHO [%s]", echoRequest.getEchoMessage());

// return the response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
*/
package org.hawkular.feedcomm.ws.command.ui;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.hawkular.bus.common.ConnectionContextFactory;
import org.hawkular.bus.common.Endpoint;
import org.hawkular.bus.common.MessageId;
import org.hawkular.bus.common.MessageProcessor;
import org.hawkular.bus.common.producer.ProducerConnectionContext;
import org.hawkular.feedcomm.api.ExecuteOperationRequest;
import org.hawkular.feedcomm.api.GenericSuccessResponse;
import org.hawkular.feedcomm.ws.Constants;
import org.hawkular.feedcomm.ws.command.Command;
import org.hawkular.feedcomm.ws.command.CommandContext;

/**
* UI client requesting to execute an operation on a resource managed by a feed.
Expand All @@ -27,8 +38,25 @@ public class ExecuteOperationCommand implements Command<ExecuteOperationRequest,
public static final Class<ExecuteOperationRequest> REQUEST_CLASS = ExecuteOperationRequest.class;

@Override
public GenericSuccessResponse execute(ExecuteOperationRequest request) {
return null;
}
public GenericSuccessResponse execute(ExecuteOperationRequest request, CommandContext context) throws Exception {

// determine what feed needs to be sent the message
String feedId;

// TODO: THIS IS JUST FOR TESTING - JUST PICK A FEED, ANY FEED.
// IN THE FUTURE, WE NEED TO LOOK AT THE RESOURCE ID AND FIGURE OUT THE FEED RESPONSIBLE FOR IT
Set<String> feeds = context.getConnectedFeeds().getAllFeeds();
feedId = feeds.iterator().next();

GenericSuccessResponse response;
try (ConnectionContextFactory ccf = new ConnectionContextFactory(context.getConnectionFactory())) {
Endpoint endpoint = new Endpoint(Endpoint.Type.TOPIC, Constants.DEST_FEED_EXECUTE_OP);
ProducerConnectionContext pcc = ccf.createProducerConnectionContext(endpoint);
Map<String, String> feedIdHeader = Collections.singletonMap(Constants.HEADER_FEEDID, feedId);
MessageId mid = new MessageProcessor().send(pcc, request, feedIdHeader);
response = new GenericSuccessResponse();
response.setMessage("The execution request has been forwarded (id=" + mid + ")");
}
return response;
}
}
Loading

0 comments on commit e8c0be0

Please sign in to comment.