Skip to content

Commit

Permalink
Another round of update based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Hien Luu committed Jul 1, 2015
1 parent ee7b0ad commit 41a1bb9
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,21 @@ private static int validateBasedOnStatus(Props jobProps,
JOB_CALLBACK_BODY_TEMPLATE.replaceFirst(STATUS_TOKEN, jobStatus.name()
.toLowerCase());

for (int i = 1; i <= maxNumCallback; i++) {
for (int i = 0; i <= maxNumCallback; i++) {
// callback url
String callbackUrlKey =
jobCallBackUrl.replaceFirst(SEQUENCE_TOKEN, Integer.toString(i));
String callbackUrlValue = jobProps.get(callbackUrlKey);

// sequence number should start at 1, this is to check for sequence
// number that starts a 0
if (i == 0) {
if (callbackUrlValue != null) {
errors.add("Sequence number starts at 1, not 0");
}
continue;
}

if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
break;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ public void noJobCallbackProps() {
Assert.assertEquals(0, errors.size());
}

@Test
public void sequenceStartWithZeroProps() {
Props jobProps = new Props();
Set<String> errors = new HashSet<String>();

jobProps.put("job.notification."
+ JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".0.url",
"http://www.linkedin.com");

jobProps.put("job.notification."
+ JobCallbackStatusEnum.COMPLETED.name().toLowerCase() + ".1.url",
"http://www.linkedin.com");

Assert.assertEquals(1, JobCallbackValidator.validate("bogusJob",
serverProps, jobProps, errors));

Assert.assertEquals(1, errors.size());
}

@Test
public void oneGetJobCallback() {
Props jobProps = new Props();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.log.Log;
import org.mortbay.thread.QueuedThreadPool;

import azkaban.execapp.event.JobCallbackManager;
Expand Down Expand Up @@ -79,7 +78,7 @@ public class AzkabanExecutorServer {
public static final String METRIC_INTERVAL =
"executor.metric.milisecinterval.";
public static final int DEFAULT_PORT_NUMBER = 12321;
public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;

private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
private static final int DEFAULT_THREAD_NUMBER = 50;
Expand Down Expand Up @@ -113,14 +112,17 @@ public AzkabanExecutorServer(Props props) throws Exception {

boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
logger.info("Setting up connector with stats on: " + isStatsOn);
for (Connector connector : server.getConnectors()) {

for (Connector connector : server.getConnectors()) {
connector.setStatsOn(isStatsOn);
logger.info(String.format("Jetty connector name: %s, default header buffer size: %d",
connector.getName(), connector.getHeaderBufferSize()));
connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize", DEFAULT_HEADER_BUFFER_SIZE));
logger.info(String.format("Jetty connector name: %s, (if) new header buffer size: %d",
connector.getName(), connector.getHeaderBufferSize()));
logger.info(String.format(
"Jetty connector name: %s, default header buffer size: %d",
connector.getName(), connector.getHeaderBufferSize()));
connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
DEFAULT_HEADER_BUFFER_SIZE));
logger.info(String.format(
"Jetty connector name: %s, (if) new header buffer size: %d",
connector.getName(), connector.getHeaderBufferSize()));
}

Context root = new Context(server, "/", Context.SESSIONS);
Expand Down Expand Up @@ -217,11 +219,20 @@ private void configureMetricReports() throws MetricException {
logger.info("Completed configuring Metric Reports");
}

// initialize event emitter
// AsyncEventEmitterFactory.getInstance().initialize(props);

}

/**
* Load a custom class, which is provided by a configuration
* CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
*
* This method will try to instantiate an instance of this custom class and
* with given properties as the argument in the constructor.
*
* Basically the custom class must have a constructor that takes an argument
* with type Properties.
*
* @param props
*/
private void loadCustomJMXAttributeProcessor(Props props) {
String jmxAttributeEmitter =
props.get(CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class JobCallbackManager implements EventListener {
{ SUCCESS, FAILURE, COMPLETED };

public static void initialize(Props props) {
if (isInitialized) {
logger.info("Already initialized");
return;
}

logger.info("Initializing");
instance = new JobCallbackManager(props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public static void initialize(Props props) {
if (props == null) {
throw new NullPointerException("props argument can't be null");
}

if (isInitialized) {
return;
}

instance = new JobCallbackRequestMaker(props);
isInitialized = true;
logger.info("Initialization for " + JobCallbackRequestMaker.class.getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void handleEvent(Event event) {
ExecutableNode node = jobRunner.getNode();

if (logger.isDebugEnabled()) {
logger.info("*** got " + event.getType() + " " + node.getId() + " "
logger.debug("*** got " + event.getType() + " " + node.getId() + " "
+ event.getRunner().getClass().getName() + " status: "
+ node.getStatus());
}
Expand Down

0 comments on commit 41a1bb9

Please sign in to comment.