Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2051 PostProcessing needs to send JMS message so that REPL met…
…rics can be added to the GraphDB

Basically, reverted the FALCON-1926 changes.
We will attempt to solve it properly by not sending JMS message from Falcon PostProcessing after 0.10 release.

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: Venkat <n.r.v@gmail.com>, Pallavi Rao <pallavi.rao@inmobi.com>, Praveen Adlakha <adlakha.praveen@gmail.com>, Peeyush<peeyushb@apache.org>

Closes #201 from vramachan/FALCON-2051.PostProcessingNotInvoked.0.10
  • Loading branch information
vramachan authored and peeyushb committed Jun 30, 2016
1 parent 600b6bc commit 0222d390b5abef4b6b7263c84a3d409afac0afd6
Showing 5 changed files with 22 additions and 48 deletions.
@@ -34,9 +34,6 @@
public class WorkflowNameBuilder<T extends Entity> {
private static final String PREFIX = "FALCON";

// Oozie JMS message property name that holds the workflow app name
private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";

private T entity;
private Tag tag;
private List<String> suffixes;
@@ -156,9 +153,5 @@ public static Pair<String, EntityType> getEntityNameAndType(String workflowName)
}
return null;
}

public static String getJMSFalconSelector() {
return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
}
}
}
@@ -103,9 +103,6 @@ Oozie workflow completes. Falcon listens to Oozie notification via JMS. You need
explained below. Falcon post processing feature continues to only send user notifications so enabling Oozie
JMS notification is important.

*NOTE : If Oozie JMS notification is not enabled, the Falcon features such as failure retry, late data handling and metadata
service will be disabled for all entities on the server.*

---+++Enable Oozie JMS notification

* Please add/change the following properties in oozie-site.xml in the oozie installation dir.
@@ -92,8 +92,7 @@ public void startSubscriber() throws FalconException {

topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = topicSession.createTopic(topicName);
topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false);
topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
topicSubscriber.setMessageListener(this);

connection.setExceptionListener(this);
@@ -82,11 +82,6 @@ public void setup() throws Exception {
}

public void sendMessages(String topic, WorkflowExecutionContext.Type type)
throws JMSException, FalconException, IOException {
sendMessages(topic, type, true);
}

public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF)
throws JMSException, FalconException, IOException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
@@ -105,10 +100,10 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type, boole
message = getMockFalconMessage(i, session);
break;
case WORKFLOW_JOB:
message = getMockOozieMessage(i, session, isFalconWF);
message = getMockOozieMessage(i, session);
break;
case COORDINATOR_ACTION:
message = getMockOozieCoordMessage(i, session, isFalconWF);
message = getMockOozieCoordMessage(i, session);
default:
break;
}
@@ -117,15 +112,10 @@ public void sendMessages(String topic, WorkflowExecutionContext.Type type, boole
}
}

private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
throws FalconException, JMSException {
private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "WORKFLOW_JOB");
if (isFalconWF) {
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
} else {
message.setStringProperty("appName", "OozieSampleShellWF");
}
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
message.setStringProperty("user", "falcon");
switch(i % 4) {
case 0:
@@ -152,15 +142,11 @@ private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
return message;
}

private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF)
private Message getMockOozieCoordMessage(int i, Session session)
throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "COORDINATOR_ACTION");
if (isFalconWF) {
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
} else {
message.setStringProperty("appName", "OozieSampleShellWF");
}
message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
message.setStringProperty("user", "falcon");
switch(i % 5) {
case 0:
@@ -292,20 +278,4 @@ public void tearDown() throws Exception{
broker.stop();
subscriber.closeSubscriber();
}

@Test
public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */);

final BrokerView adminView = broker.getAdminView();
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);

Thread.sleep(100);
Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
}

}
@@ -48,6 +48,14 @@ public int run(String[] args) throws Exception {
// serialize the context to HDFS under logs dir before sending the message
context.serialize();

boolean systemNotificationEnabled = Boolean.parseBoolean(context.
getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));

if (systemNotificationEnabled) {
LOG.info("Sending Falcon message {} ", context);
invokeFalconMessageProducer(context);
}

String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
boolean userNotificationEnabled = Boolean.parseBoolean(context.
getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
@@ -72,6 +80,13 @@ private void invokeUserMessageProducer(WorkflowExecutionContext context) throws
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}

private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
.type(JMSMessageProducer.MessageType.FALCON)
.build();
jmsMessageProducer.sendMessage();
}

private void invokeLogProducer(WorkflowExecutionContext context) {
// todo: need to move this out to Falcon in-process
if (UserGroupInformation.isSecurityEnabled()) {

0 comments on commit 0222d39

Please sign in to comment.