Skip to content

Commit

Permalink
Changing ApplicationReadyEvent to ContextRefreshedEvent so it is pick…
Browse files Browse the repository at this point in the history
…ed up in application AND war deployment models (#420)
  • Loading branch information
tgianos committed Oct 22, 2016
1 parent de0acf8 commit 6a4d8ab
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.Executor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Primary;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -130,43 +131,12 @@ public JobMonitoringCoordinator(
* database to find any jobs already running on this node that aren't in the map. The use case for this is if
* the Genie application crashes when it comes back up it can find the jobs again and not leave them orphaned.
*
* @param event The spring boot application ready event indicating the application is ready to start taking load
* @param event The spring ready event indicating the application is ready to start taking load
* @throws GenieException on unrecoverable error
*/
@EventListener
public void onStartup(final ApplicationReadyEvent event) throws GenieException {
log.info("Application is ready according to event {}. Attempting to re-attach to any active jobs", event);
final Set<Job> jobs = this.jobSearchService.getAllActiveJobsOnHost(this.hostName);
if (jobs.isEmpty()) {
log.info("No jobs currently active on this node.");
return;
} else {
log.info("{} jobs currently active on this node at startup", jobs.size());
}

for (final Job job : jobs) {
final String id = job.getId().orElseThrow(() -> new GenieServerException("Job has no id!"));
if (this.jobMonitors.containsKey(id) || this.scheduledJobs.containsKey(id)) {
log.info("Job {} is already being tracked. Ignoring.", id);
} else if (job.getStatus() != JobStatus.RUNNING) {
this.eventMulticaster.multicastEvent(
new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "System crashed while job starting", this)
);
} else {
try {
final JobExecution jobExecution = this.jobSearchService.getJobExecution(id);
this.jobMemories.put(id, jobExecution.getMemory().orElse(0));
this.scheduleMonitor(jobExecution);
log.info("Re-attached a job monitor to job {}", id);
} catch (final GenieException ge) {
log.error("Unable to re-attach to job {}.", id);
this.eventMulticaster.multicastEvent(
new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "Unable to re-attach on startup", this)
);
this.unableToReAttach.increment();
}
}
}
public void onStartup(final ContextRefreshedEvent event) throws GenieException {
this.reAttach(event);
}

/**
Expand Down Expand Up @@ -254,6 +224,41 @@ public int getUsedMemory() {
}
}

private void reAttach(final ApplicationEvent event) throws GenieException {
log.info("Application is ready according to event {}. Attempting to re-attach to any active jobs", event);
final Set<Job> jobs = this.jobSearchService.getAllActiveJobsOnHost(this.hostName);
if (jobs.isEmpty()) {
log.info("No jobs currently active on this node.");
return;
} else {
log.info("{} jobs currently active on this node at startup", jobs.size());
}

for (final Job job : jobs) {
final String id = job.getId().orElseThrow(() -> new GenieServerException("Job has no id!"));
if (this.jobMonitors.containsKey(id) || this.scheduledJobs.containsKey(id)) {
log.info("Job {} is already being tracked. Ignoring.", id);
} else if (job.getStatus() != JobStatus.RUNNING) {
this.eventMulticaster.multicastEvent(
new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "System crashed while job starting", this)
);
} else {
try {
final JobExecution jobExecution = this.jobSearchService.getJobExecution(id);
this.jobMemories.put(id, jobExecution.getMemory().orElse(0));
this.scheduleMonitor(jobExecution);
log.info("Re-attached a job monitor to job {}", id);
} catch (final GenieException ge) {
log.error("Unable to re-attach to job {}.", id);
this.eventMulticaster.multicastEvent(
new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "Unable to re-attach on startup", this)
);
this.unableToReAttach.increment();
}
}
}
}

private void scheduleMonitor(final JobExecution jobExecution) {
final String jobId = jobExecution.getId().orElseThrow(IllegalArgumentException::new);
final File stdOut = new File(this.jobsDir, jobId + "/" + JobConstants.STDOUT_LOG_FILE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package com.netflix.genie.web.tasks.leader;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.cluster.leader.event.OnGrantedEvent;
import org.springframework.cloud.cluster.leader.event.OnRevokedEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;

/**
Expand Down Expand Up @@ -60,7 +60,7 @@ public LocalLeader(final ApplicationEventPublisher publisher, final boolean isLe
* @param event The Spring Boot application ready event to startup on
*/
@EventListener
public void startLeadership(final ApplicationReadyEvent event) {
public void startLeadership(final ContextRefreshedEvent event) {
if (this.isLeader) {
log.debug("Starting Leadership due to " + event);
this.publisher.publishEvent(new OnGrantedEvent(this, null, "leader"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;

Expand Down Expand Up @@ -126,7 +126,7 @@ public void setup() throws IOException {
@Test
@SuppressWarnings("unchecked")
public void canAttachToRunningJobs() throws GenieException {
final ApplicationReadyEvent event = Mockito.mock(ApplicationReadyEvent.class);
final ContextRefreshedEvent event = Mockito.mock(ContextRefreshedEvent.class);

Mockito.when(this.jobSearchService.getAllActiveJobsOnHost(HOSTNAME)).thenReturn(Sets.newHashSet());
this.coordinator.onStartup(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.cluster.leader.event.OnGrantedEvent;
import org.springframework.cloud.cluster.leader.event.OnRevokedEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;

/**
* Unit tests for the LocalLeader class.
Expand All @@ -40,7 +40,7 @@ public class LocalLeaderUnitTests {

private LocalLeader localLeader;
private ApplicationEventPublisher publisher;
private ApplicationReadyEvent applicationReadyEvent;
private ContextRefreshedEvent contextRefreshedEvent;
private ContextClosedEvent contextClosedEvent;

/**
Expand All @@ -49,7 +49,7 @@ public class LocalLeaderUnitTests {
@Before
public void setup() {
this.publisher = Mockito.mock(ApplicationEventPublisher.class);
this.applicationReadyEvent = Mockito.mock(ApplicationReadyEvent.class);
this.contextRefreshedEvent = Mockito.mock(ContextRefreshedEvent.class);
this.contextClosedEvent = Mockito.mock(ContextClosedEvent.class);
}

Expand All @@ -67,7 +67,7 @@ public void tearDown() {
@Test
public void canStartLeadershipIfLeader() {
this.localLeader = new LocalLeader(this.publisher, true);
this.localLeader.startLeadership(this.applicationReadyEvent);
this.localLeader.startLeadership(this.contextRefreshedEvent);
Mockito.verify(this.publisher, Mockito.times(1)).publishEvent(Mockito.any(OnGrantedEvent.class));
}

Expand All @@ -77,7 +77,7 @@ public void canStartLeadershipIfLeader() {
@Test
public void wontStartLeadershipIfNotLeader() {
this.localLeader = new LocalLeader(this.publisher, false);
this.localLeader.startLeadership(this.applicationReadyEvent);
this.localLeader.startLeadership(this.contextRefreshedEvent);
Mockito.verify(this.publisher, Mockito.never()).publishEvent(Mockito.any(OnGrantedEvent.class));
}

Expand Down

0 comments on commit 6a4d8ab

Please sign in to comment.