Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
FALCON-1926 Filter out effectively non-falcon related JMS messages …
Browse files Browse the repository at this point in the history
* Falcon to retrieve Oozie JMS notifications that belong to Falcon generated Workflows .. ignoring notifications generated by other WF directly submitted to Oozie.

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: "Balu Vellanki <balu@apache.org>, Ying Zheng <yzheng@hortonworks.com>"

Closes #119 from vramachan/FALCON-1923.OpsCoord
  • Loading branch information
vramachan authored and bvellanki committed May 12, 2016
1 parent 5e26e11 commit 98a904c
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 23 deletions.
Expand Up @@ -34,6 +34,9 @@
public class WorkflowNameBuilder<T extends Entity> {
private static final String PREFIX = "FALCON";

// Oozie JMS message property name that holds the workflow app name
private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";

private T entity;
private Tag tag;
private List<String> suffixes;
Expand Down Expand Up @@ -153,5 +156,9 @@ public static Pair<String, EntityType> getEntityNameAndType(String workflowName)
}
return null;
}

public static String getJMSFalconSelector() {
return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
}
}
}
2 changes: 1 addition & 1 deletion docs/src/site/twiki/Configuration.twiki
Expand Up @@ -99,7 +99,7 @@ Some Falcon features such as late data handling, retries, metadata service, depe
* In Falcon runtime.properties, set *.falcon.jms.notification.enabled to false. This will turn off JMS notification in post-processing.
* Copy notification related properties in oozie/conf/oozie-site.xml to oozie-site.xml of the Oozie installation. Restart Oozie so changes get reflected.

*NOTE : If you disable Falcon post-processing JMS notification and not enable Oozie JMS notification, features such as failure retry, late data handling and metadata service will be disabled for all entities on the server.*
*NOTE : Oozie JMS notification needs to be enabled for features such as failure retry, late data handling and metadata service will be disabled for all entities on the server. Please refer Falcon documentation on how to configure Oozie for Falcon.*

---+++Enabling Falcon Native Scheudler
You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to schedule entities natively on Falcon, you will need to add some additional properties to <verbatim>$FALCON_HOME/conf/startup.properties</verbatim> before starting the Falcon Server. For details on the same, refer to [[FalconNativeScheduler][Falcon Native Scheduler]]
Expand Down
Expand Up @@ -92,7 +92,8 @@ public void startSubscriber() throws FalconException {

topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = topicSession.createTopic(topicName);
topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false);
topicSubscriber.setMessageListener(this);

connection.setExceptionListener(this);
Expand Down
Expand Up @@ -82,6 +82,11 @@ public void setup() throws Exception {
}

public void sendMessages(String topic, WorkflowExecutionContext.Type type)
throws JMSException, FalconException, IOException {
sendMessages(topic, type, true);
}

public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF)
throws JMSException, FalconException, IOException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
Expand All @@ -100,10 +105,10 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type)
message = getMockFalconMessage(i, session);
break;
case WORKFLOW_JOB:
message = getMockOozieMessage(i, session);
message = getMockOozieMessage(i, session, isFalconWF);
break;
case COORDINATOR_ACTION:
message = getMockOozieCoordMessage(i, session);
message = getMockOozieCoordMessage(i, session, isFalconWF);
default:
break;
}
Expand All @@ -112,10 +117,15 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type)
}
}

private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "WORKFLOW_JOB");
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
if (isFalconWF) {
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
} else {
message.setStringProperty("appName", "OozieSampleShellWF");
}
message.setStringProperty("user", "falcon");
switch(i % 4) {
case 0:
Expand All @@ -142,10 +152,15 @@ private Message getMockOozieMessage(int i, Session session) throws FalconExcepti
return message;
}

private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException {
private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF)
throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "COORDINATOR_ACTION");
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
if (isFalconWF) {
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
} else {
message.setStringProperty("appName", "OozieSampleShellWF");
}
message.setStringProperty("user", "falcon");
switch(i % 5) {
case 0:
Expand Down Expand Up @@ -288,4 +303,24 @@ public void tearDown() throws Exception{
broker.stop();
subscriber.closeSubscriber();
}

@Test
public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */);

final BrokerView adminView = broker.getAdminView();

Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
Assert.assertEquals(adminView.getTotalMessageCount(), 0);

Thread.sleep(100);
Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
}

}
Expand Up @@ -48,14 +48,6 @@ public int run(String[] args) throws Exception {
// serialize the context to HDFS under logs dir before sending the message
context.serialize();

boolean systemNotificationEnabled = Boolean.parseBoolean(context.
getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));

if (systemNotificationEnabled) {
LOG.info("Sending Falcon message {} ", context);
invokeFalconMessageProducer(context);
}

String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
boolean userNotificationEnabled = Boolean.parseBoolean(context.
getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
Expand All @@ -80,13 +72,6 @@ private void invokeUserMessageProducer(WorkflowExecutionContext context) throws
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}

private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
.type(JMSMessageProducer.MessageType.FALCON)
.build();
jmsMessageProducer.sendMessage();
}

private void invokeLogProducer(WorkflowExecutionContext context) {
// todo: need to move this out to Falcon in-process
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down

0 comments on commit 98a904c

Please sign in to comment.