diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java index c58be64e2..f0d6073b2 100644 --- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java +++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java @@ -34,6 +34,9 @@ public class WorkflowNameBuilder { private static final String PREFIX = "FALCON"; + // Oozie JMS message property name that holds the workflow app name + private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName"; + private T entity; private Tag tag; private List suffixes; @@ -153,5 +156,9 @@ public static Pair getEntityNameAndType(String workflowName) } return null; } + + public static String getJMSFalconSelector() { + return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR); + } } } diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki index 0df094fa8..8cf2a64bb 100644 --- a/docs/src/site/twiki/Configuration.twiki +++ b/docs/src/site/twiki/Configuration.twiki @@ -99,7 +99,7 @@ Some Falcon features such as late data handling, retries, metadata service, depe * In Falcon runtime.properties, set *.falcon.jms.notification.enabled to false. This will turn off JMS notification in post-processing. * Copy notification related properties in oozie/conf/oozie-site.xml to oozie-site.xml of the Oozie installation. Restart Oozie so changes get reflected. -*NOTE : If you disable Falcon post-processing JMS notification and not enable Oozie JMS notification, features such as failure retry, late data handling and metadata service will be disabled for all entities on the server.* +*NOTE : Oozie JMS notification needs to be enabled for features such as failure retry, late data handling and metadata service will be disabled for all entities on the server. Please refer Falcon documentation on how to configure Oozie for Falcon.* ---+++Enabling Falcon Native Scheudler You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to schedule entities natively on Falcon, you will need to add some additional properties to $FALCON_HOME/conf/startup.properties before starting the Falcon Server. For details on the same, refer to [[FalconNativeScheduler][Falcon Native Scheduler]] diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 90bbdd3a4..8b48e9351 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -92,7 +92,8 @@ public void startSubscriber() throws FalconException { topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic destination = topicSession.createTopic(topicName); - topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID); + topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID, + WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false); topicSubscriber.setMessageListener(this); connection.setExceptionListener(this); diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java index 5c53a3ec3..0ba94643e 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java @@ -82,6 +82,11 @@ public void setup() throws Exception { } public void sendMessages(String topic, WorkflowExecutionContext.Type type) + throws JMSException, FalconException, IOException { + sendMessages(topic, type, true); + } + + public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF) throws JMSException, FalconException, IOException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); Connection connection = connectionFactory.createConnection(); @@ -100,10 +105,10 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type) message = getMockFalconMessage(i, session); break; case WORKFLOW_JOB: - message = getMockOozieMessage(i, session); + message = getMockOozieMessage(i, session, isFalconWF); break; case COORDINATOR_ACTION: - message = getMockOozieCoordMessage(i, session); + message = getMockOozieCoordMessage(i, session, isFalconWF); default: break; } @@ -112,10 +117,15 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type) } } - private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException { + private Message getMockOozieMessage(int i, Session session, boolean isFalconWF) + throws FalconException, JMSException { TextMessage message = session.createTextMessage(); message.setStringProperty("appType", "WORKFLOW_JOB"); - message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); + if (isFalconWF) { + message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); + } else { + message.setStringProperty("appName", "OozieSampleShellWF"); + } message.setStringProperty("user", "falcon"); switch(i % 4) { case 0: @@ -142,10 +152,15 @@ private Message getMockOozieMessage(int i, Session session) throws FalconExcepti return message; } - private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException { + private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF) + throws FalconException, JMSException { TextMessage message = session.createTextMessage(); message.setStringProperty("appType", "COORDINATOR_ACTION"); - message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); + if (isFalconWF) { + message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); + } else { + message.setStringProperty("appName", "OozieSampleShellWF"); + } message.setStringProperty("user", "falcon"); switch(i % 5) { case 0: @@ -288,4 +303,24 @@ public void tearDown() throws Exception{ broker.stop(); subscriber.closeSubscriber(); } + + @Test + public void testJMSMessagesFromOozieForNonFalconWF() throws Exception { + sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */); + + final BrokerView adminView = broker.getAdminView(); + + Assert.assertEquals(adminView.getTotalDequeueCount(), 0); + Assert.assertEquals(adminView.getTotalEnqueueCount(), 10); + Assert.assertEquals(adminView.getTotalConsumerCount(), 2); + Assert.assertEquals(adminView.getTotalMessageCount(), 0); + + Thread.sleep(100); + Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class)); + Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class)); + Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class)); + Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class)); + Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class)); + } + } diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java index 3bdfe7351..496189664 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java @@ -48,14 +48,6 @@ public int run(String[] args) throws Exception { // serialize the context to HDFS under logs dir before sending the message context.serialize(); - boolean systemNotificationEnabled = Boolean.parseBoolean(context. - getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true")); - - if (systemNotificationEnabled) { - LOG.info("Sending Falcon message {} ", context); - invokeFalconMessageProducer(context); - } - String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL); boolean userNotificationEnabled = Boolean.parseBoolean(context. getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true")); @@ -80,13 +72,6 @@ private void invokeUserMessageProducer(WorkflowExecutionContext context) throws jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS); } - private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception { - JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context) - .type(JMSMessageProducer.MessageType.FALCON) - .build(); - jmsMessageProducer.sendMessage(); - } - private void invokeLogProducer(WorkflowExecutionContext context) { // todo: need to move this out to Falcon in-process if (UserGroupInformation.isSecurityEnabled()) {