Skip to content

Commit

Permalink
Fixing cause of ConcurrentModificationException within memory check c…
Browse files Browse the repository at this point in the history
…ode (#410)
  • Loading branch information
tgianos committed Oct 13, 2016
1 parent 56de185 commit e4a5214
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,56 +239,58 @@ public String coordinateJob(
}

synchronized (this) {
log.info("Checking if can run job {} on this node", jobRequest.getId());
final int maxSystemMemory = this.jobsProperties.getMemory().getMaxSystemMemory();
final int usedMemory = this.jobMetricsService.getUsedMemory();
if (usedMemory + memory <= maxSystemMemory) {
log.info(
"Job {} can run on this node as only {}/{} MB are used and requested {} MB",
jobId,
usedMemory,
maxSystemMemory,
memory
);
try {
log.info("Scheduling job {} for submission", jobRequest.getId());
final Future<?> task = this.taskExecutor.submit(
new JobLauncher(
this.jobSubmitterService,
jobRequest,
cluster,
command,
applications,
memory,
this.registry
)
try {
log.info("Checking if can run job {} on this node", jobRequest.getId());
final int maxSystemMemory = this.jobsProperties.getMemory().getMaxSystemMemory();
final int usedMemory = this.jobMetricsService.getUsedMemory();
if (usedMemory + memory <= maxSystemMemory) {
log.info(
"Job {} can run on this node as only {}/{} MB are used and requested {} MB",
jobId,
usedMemory,
maxSystemMemory,
memory
);
try {
log.info("Scheduling job {} for submission", jobRequest.getId());
final Future<?> task = this.taskExecutor.submit(
new JobLauncher(
this.jobSubmitterService,
jobRequest,
cluster,
command,
applications,
memory,
this.registry
)
);

// Tell the system a new job has been scheduled so any actions can be taken
log.info("Publishing job scheduled event for job {}", jobId);
this.eventPublisher.publishEvent(new JobScheduledEvent(jobId, task, memory, this));
} catch (final TaskRejectedException e) {
final String errorMsg = "Unable to launch job due to exception: " + e.getMessage();
this.jobPersistenceService.updateJobStatus(jobId, JobStatus.FAILED, errorMsg);
throw new GenieServerException(errorMsg, e);
// Tell the system a new job has been scheduled so any actions can be taken
log.info("Publishing job scheduled event for job {}", jobId);
this.eventPublisher.publishEvent(new JobScheduledEvent(jobId, task, memory, this));
return jobId;
} catch (final TaskRejectedException e) {
throw new GenieServerException(
"Unable to launch job due to exception: " + e.getMessage(),
e
);
}
} else {
throw new GenieServerUnavailableException(
"Job "
+ jobId
+ " can't run on this node only "
+ usedMemory
+ "/"
+ maxSystemMemory
+ " MB are used and requested "
+ memory
+ " MB"
);
}
return jobId;
} else {
this.jobPersistenceService.updateJobStatus(jobId,
JobStatus.FAILED,
"Unable to run job due to lack of available memory on host."
);
throw new GenieServerUnavailableException(
"Job "
+ jobId
+ " can't run on this node only "
+ usedMemory
+ "/"
+ maxSystemMemory
+ " MB are used and requested "
+ memory
+ " MB"
);
} catch (final Exception e) {
this.jobPersistenceService.updateJobStatus(jobId, JobStatus.FAILED, e.getMessage());
throw e;
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public GenieExceptionMapper(final Registry registry) {
*/
@ExceptionHandler(GenieException.class)
public void handleGenieException(
final HttpServletResponse response,
final GenieException e
final HttpServletResponse response,
final GenieException e
) throws IOException {
if (e instanceof GenieBadRequestException) {
this.badRequestRate.increment();
Expand Down Expand Up @@ -118,8 +118,8 @@ public void handleGenieException(
*/
@ExceptionHandler(ConstraintViolationException.class)
public void handleConstraintViolation(
final HttpServletResponse response,
final ConstraintViolationException cve
final HttpServletResponse response,
final ConstraintViolationException cve
) throws IOException {
final StringBuilder builder = new StringBuilder();
if (cve.getConstraintViolations() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ public int getNumActiveJobs() {
*/
@Override
public int getUsedMemory() {
return this.jobMemories.values().stream().reduce((a, b) -> a + b).orElse(0);
// Synchronized to avoid concurrent modification exception
synchronized (this.jobMemories) {
return this.jobMemories.values().stream().reduce((a, b) -> a + b).orElse(0);
}
}

private void scheduleMonitor(final JobExecution jobExecution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.springframework.http.HttpStatus;

import javax.servlet.http.HttpServletResponse;
import javax.validation.ConstraintViolationException;
Expand All @@ -41,6 +42,7 @@
* Tests for the exception mapper.
*
* @author tgianos
* @since 3.0.0
*/
@Category(UnitTest.class)
public class GenieExceptionMapperUnitTests {
Expand Down Expand Up @@ -128,6 +130,7 @@ public void canHandleGenieExceptions() throws IOException {
public void canHandleConstraintViolationExceptions() throws IOException {
this.mapper.handleConstraintViolation(response, new ConstraintViolationException("cve", null));
Mockito.verify(this.constraintViolationRate, Mockito.times(1)).increment();
Mockito.verify(this.response, Mockito.times(1)).sendError(Mockito.anyInt(), Mockito.anyString());
Mockito.verify(this.response, Mockito.times(1))
.sendError(Mockito.eq(HttpStatus.PRECONDITION_FAILED.value()), Mockito.anyString());
}
}

0 comments on commit e4a5214

Please sign in to comment.