Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize Job Coordination #319

Merged
merged 2 commits into from Jul 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -136,30 +136,40 @@ public String coordinateJob(
.withId(jobRequest.getId())
.withTags(jobRequest.getTags());

if (this.canRunJob()) {
jobBuilder
.withStatus(JobStatus.INIT)
.withStatusMsg("Job Accepted and in initialization phase.");
// TODO: if this throws exception the job will never be marked failed
this.jobPersistenceService.createJob(jobBuilder.build());
try {
final Future<?> task
= this.taskExecutor.submit(new JobLauncher(this.jobSubmitterService, jobRequest, this.registry));
synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix looks correct for really coordinating the max number of jobs on an instance. Instead of synchronizing and getting the count, can't we have a thread pool with a queue capacity of max jobs? If the number goes above that, the pool will reject the task and you can either return a failure or redirect to another instance.

final int numActiveJobs = this.jobCountService.getNumJobs();
if (numActiveJobs < this.maxRunningJobs) {
jobBuilder
.withStatus(JobStatus.INIT)
.withStatusMsg("Job Accepted and in initialization phase.");
// TODO: if this throws exception the job will never be marked failed
this.jobPersistenceService.createJob(jobBuilder.build());
try {
final Future<?> task = this.taskExecutor.submit(
new JobLauncher(this.jobSubmitterService, jobRequest, this.registry)
);

// Tell the system a new job has been scheduled so any actions can be taken
this.eventPublisher.publishEvent(new JobScheduledEvent(jobRequest.getId(), task, this));
} catch (final TaskRejectedException e) {
final String errorMsg = "Unable to launch job due to exception: " + e.getMessage();
this.jobPersistenceService.updateJobStatus(jobRequest.getId(), JobStatus.FAILED, errorMsg);
throw new GenieServerException(errorMsg, e);
// Tell the system a new job has been scheduled so any actions can be taken
this.eventPublisher.publishEvent(new JobScheduledEvent(jobRequest.getId(), task, this));
} catch (final TaskRejectedException e) {
final String errorMsg = "Unable to launch job due to exception: " + e.getMessage();
this.jobPersistenceService.updateJobStatus(jobRequest.getId(), JobStatus.FAILED, errorMsg);
throw new GenieServerException(errorMsg, e);
}
return jobRequest.getId();
} else {
jobBuilder
.withStatus(JobStatus.FAILED)
.withStatusMsg("Unable to run job due to host being too busy during request.");
this.jobPersistenceService.createJob(jobBuilder.build());
throw new GenieServerUnavailableException(
"Running ("
+ numActiveJobs
+ ") when max running jobs is ("
+ this.maxRunningJobs
+ ") on this host. Unable to run job."
);
}
return jobRequest.getId();
} else {
jobBuilder
.withStatus(JobStatus.FAILED)
.withStatusMsg("Unable to run job due to host being too busy during request.");
this.jobPersistenceService.createJob(jobBuilder.build());
throw new GenieServerUnavailableException("Reached max running jobs on this host. Unable to run job.");
}
}

Expand All @@ -172,14 +182,4 @@ public String coordinateJob(
public void killJob(@NotBlank final String jobId) throws GenieException {
// Pure delegation: hand the job id to the job kill service; a GenieException declared by this method propagates to the caller.
this.jobKillService.killJob(jobId);
}


/**
* Check whether the job count reported by the job count service is below the configured maximum.
* Synchronized to make sure only one request thread at a time is checking whether they can run.
* NOTE(review): the monitor is released as soon as this method returns, so whatever the caller does
* with the result (e.g. persisting and submitting the job) is not covered by this lock.
*
* @return true if the current number of jobs is strictly less than maxRunningJobs, false otherwise
*/
private synchronized boolean canRunJob() {
// Strict comparison: a count equal to maxRunningJobs yields false.
return this.jobCountService.getNumJobs() < this.maxRunningJobs;
}
}
Expand Up @@ -49,7 +49,7 @@ public JobCountServiceImpl(@NotNull final JobSearchService jobSearchService, @No
* {@inheritDoc}
*/
@Override
public int getNumJobs() {
public synchronized int getNumJobs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to synchronize this method?

return this.jobSearchService.getAllRunningJobExecutionsOnHost(this.hostName).size();
}
}
Expand Up @@ -201,7 +201,7 @@ public synchronized void onJobFinished(final JobFinishedEvent event) {
*
* @return the number of jobs currently running on this node
*/
public int getNumJobs() {
public synchronized int getNumJobs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to synchronize this?

return this.jobMonitors.size() + this.scheduledJobs.size();
}

Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.springframework.core.io.ResourceLoader;
import org.springframework.hateoas.MediaTypes;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
Expand Down Expand Up @@ -353,20 +354,20 @@ private void createAllCommands() throws Exception {
*/
@After
public void cleanup() throws Exception {
this.jobRequestRepository.deleteAll();
this.jobRepository.deleteAll();
this.jobExecutionRepository.deleteAll();
this.clusterRepository.deleteAll();
this.commandRepository.deleteAll();
this.applicationRepository.deleteAll();

log.error("HEALTH ENDPOINT DATA: {}", this.mvc
.perform(
MockMvcRequestBuilders
.get("/actuator/health")
.contentType(MediaType.APPLICATION_JSON)
)
.andReturn().getResponse().getContentAsString());

this.jobRequestRepository.deleteAll();
this.jobRepository.deleteAll();
this.jobExecutionRepository.deleteAll();
this.clusterRepository.deleteAll();
this.commandRepository.deleteAll();
this.applicationRepository.deleteAll();
}

/**
Expand Down Expand Up @@ -766,16 +767,33 @@ public void testSubmitJobMethodKillOnTimeout() throws Exception {
.withDisableLogArchival(true)
.build();

// final MvcResult result = this.mvc
// .perform(
// MockMvcRequestBuilders
// .post(JOBS_API)
// .contentType(MediaType.APPLICATION_JSON)
// .content(OBJECT_MAPPER.writeValueAsBytes(jobRequest))
// )
// .andExpect(MockMvcResultMatchers.status().isAccepted())
// .andExpect(MockMvcResultMatchers.header().string(HttpHeaders.LOCATION, Matchers.notNullValue()))
// .andReturn();

final MvcResult result = this.mvc
.perform(
MockMvcRequestBuilders
.post(JOBS_API)
.contentType(MediaType.APPLICATION_JSON)
.content(OBJECT_MAPPER.writeValueAsBytes(jobRequest))
)
.andExpect(MockMvcResultMatchers.status().isAccepted())
.andExpect(MockMvcResultMatchers.header().string(HttpHeaders.LOCATION, Matchers.notNullValue()))
.andReturn();
).andReturn();

if (result.getResponse().getStatus() != HttpStatus.ACCEPTED.value()) {
log.error(
"RESPONSE WASN'T 202 IT WAS: {} AND THE MESSAGE IS: {}",
result.getResponse().getStatus(),
result.getResponse().getContentAsString()
);
Assert.fail();
}

final String jobId = this.getIdFromLocation(result.getResponse().getHeader(HttpHeaders.LOCATION));
final String statusEndpoint = JOBS_API + "/" + jobId + "/status";
Expand Down Expand Up @@ -819,16 +837,33 @@ public void testSubmitJobMethodFailure() throws Exception {
.withDisableLogArchival(true)
.build();

// final MvcResult result = this.mvc
// .perform(
// MockMvcRequestBuilders
// .post(JOBS_API)
// .contentType(MediaType.APPLICATION_JSON)
// .content(OBJECT_MAPPER.writeValueAsBytes(jobRequest))
// )
// .andExpect(MockMvcResultMatchers.status().isAccepted())
// .andExpect(MockMvcResultMatchers.header().string(HttpHeaders.LOCATION, Matchers.notNullValue()))
// .andReturn();

final MvcResult result = this.mvc
.perform(
MockMvcRequestBuilders
.post(JOBS_API)
.contentType(MediaType.APPLICATION_JSON)
.content(OBJECT_MAPPER.writeValueAsBytes(jobRequest))
)
.andExpect(MockMvcResultMatchers.status().isAccepted())
.andExpect(MockMvcResultMatchers.header().string(HttpHeaders.LOCATION, Matchers.notNullValue()))
.andReturn();
).andReturn();

if (result.getResponse().getStatus() != HttpStatus.ACCEPTED.value()) {
log.error(
"RESPONSE WASN'T 202 IT WAS: {} AND THE MESSAGE IS: {}",
result.getResponse().getStatus(),
result.getResponse().getContentAsString()
);
Assert.fail();
}

final String jobId = this.getIdFromLocation(result.getResponse().getHeader(HttpHeaders.LOCATION));
final String statusEndpoint
Expand Down