Skip to content

Commit

Permalink
Merge 9c860fb into 1c3f5f1
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Apr 2, 2020
2 parents 1c3f5f1 + 9c860fb commit e30d76d
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 64 deletions.
Expand Up @@ -64,7 +64,7 @@ public void smokeTestCommands() throws Exception {
Assert.assertThat(runner.getExitCode(), Matchers.is(ExitCode.SUCCESS.getCode()));

// Test info
runner.run("info");
runner.run("info", "--beans", "--env", "--properties", "--state-machine");
Assert.assertThat(runner.getExitCode(), Matchers.is(ExitCode.SUCCESS.getCode()));
}
}
60 changes: 32 additions & 28 deletions genie-agent/job-execution-state-machine.dot
Expand Up @@ -20,35 +20,37 @@
//
digraph state_machine {
// States
HANDSHAKE [label=HANDSHAKE penwidth=3.0]
CREATE_JOB_DIRECTORY [label=CREATE_JOB_DIRECTORY penwidth=3.0]
LOG_EXECUTION_ERRORS [label=LOG_EXECUTION_ERRORS ]
OBTAIN_JOB_SPECIFICATION [label=OBTAIN_JOB_SPECIFICATION penwidth=3.0]
DETERMINE_FINAL_STATUS [label=DETERMINE_FINAL_STATUS penwidth=3.0]
LAUNCH_JOB [label=LAUNCH_JOB penwidth=3.0]
DOWNLOAD_DEPENDENCIES [label=DOWNLOAD_DEPENDENCIES penwidth=3.0]
CONFIGURE_EXECUTION [label=CONFIGURE_EXECUTION penwidth=3.0]
START_HEARTBEAT_SERVICE [label=START_HEARTBEAT_SERVICE penwidth=3.0]
START_FILE_STREAM_SERVICE [label=START_FILE_STREAM_SERVICE penwidth=3.0]
SET_STATUS_RUNNING [label=SET_STATUS_RUNNING penwidth=3.0]
SET_STATUS_INIT [label=SET_STATUS_INIT penwidth=3.0]
DONE [label=DONE shape=rectangle ]
RESERVE_JOB_ID [label=RESERVE_JOB_ID penwidth=3.0]
STOP_KILL_SERVICE [label=STOP_KILL_SERVICE ]
POST_EXECUTION_MANIFEST_REFRESH [label=POST_EXECUTION_MANIFEST_REFRESH ]
WAIT_JOB_COMPLETION [label=WAIT_JOB_COMPLETION penwidth=3.0]
CLAIM_JOB [label=CLAIM_JOB penwidth=3.0]
CLEAN [label=CLEAN ]
ARCHIVE [label=ARCHIVE ]
POST_LAUNCH_MANIFEST_REFRESH [label=POST_LAUNCH_MANIFEST_REFRESH ]
CREATE_JOB_SCRIPT [label=CREATE_JOB_SCRIPT penwidth=3.0]
CREATE_JOB_DIRECTORY [label=CREATE_JOB_DIRECTORY penwidth=3.0]
READY [label=READY shape=diamond ]
DOWNLOAD_DEPENDENCIES [label=DOWNLOAD_DEPENDENCIES penwidth=3.0]
ARCHIVE [label=ARCHIVE ]
LAUNCH_JOB [label=LAUNCH_JOB penwidth=3.0]
CLAIM_JOB [label=CLAIM_JOB penwidth=3.0]
SET_STATUS_RUNNING [label=SET_STATUS_RUNNING penwidth=3.0]
STOP_FILES_STREAM_SERVICE [label=STOP_FILES_STREAM_SERVICE ]
CONFIGURE_EXECUTION [label=CONFIGURE_EXECUTION penwidth=3.0]
SET_STATUS_FINAL [label=SET_STATUS_FINAL penwidth=3.0]
CREATE_JOB_SCRIPT [label=CREATE_JOB_SCRIPT penwidth=3.0]
RELOCATE_LOG [label=RELOCATE_LOG ]
STOP_KILL_SERVICE [label=STOP_KILL_SERVICE ]
FORCE_FILE_MANIFEST_REFRESH [label=FORCE_FILE_MANIFEST_REFRESH ]
CLEAN [label=CLEAN ]
POST_SETUP_MANIFEST_REFRESH [label=POST_SETUP_MANIFEST_REFRESH ]
OBTAIN_JOB_SPECIFICATION [label=OBTAIN_JOB_SPECIFICATION penwidth=3.0]
SHUTDOWN [label=SHUTDOWN ]
SET_STATUS_INIT [label=SET_STATUS_INIT penwidth=3.0]
DETERMINE_FINAL_STATUS [label=DETERMINE_FINAL_STATUS penwidth=3.0]
STOP_HEARTBEAT_SERVICE [label=STOP_HEARTBEAT_SERVICE ]
START_HEARTBEAT_SERVICE [label=START_HEARTBEAT_SERVICE penwidth=3.0]
RESERVE_JOB_ID [label=RESERVE_JOB_ID penwidth=3.0]
START_KILL_SERVICE [label=START_KILL_SERVICE penwidth=3.0]
STOP_FILES_STREAM_SERVICE [label=STOP_FILES_STREAM_SERVICE ]
HANDSHAKE [label=HANDSHAKE penwidth=3.0]
INITIALIZE_AGENT [label=INITIALIZE_AGENT penwidth=3.0]
RELOCATE_LOG [label=RELOCATE_LOG ]
DONE [label=DONE shape=rectangle ]
LOG_EXECUTION_ERRORS [label=LOG_EXECUTION_ERRORS ]
START_KILL_SERVICE [label=START_KILL_SERVICE penwidth=3.0]
// Transitions
READY -> INITIALIZE_AGENT [label="START"]
SHUTDOWN -> DONE [label="PROCEED"]
Expand All @@ -65,11 +67,13 @@ digraph state_machine {
START_FILE_STREAM_SERVICE -> SET_STATUS_INIT [label="PROCEED"]
SET_STATUS_INIT -> CREATE_JOB_SCRIPT [label="PROCEED"]
CREATE_JOB_SCRIPT -> DOWNLOAD_DEPENDENCIES [label="PROCEED"]
DOWNLOAD_DEPENDENCIES -> LAUNCH_JOB [label="PROCEED"]
LAUNCH_JOB -> FORCE_FILE_MANIFEST_REFRESH [label="PROCEED"]
FORCE_FILE_MANIFEST_REFRESH -> SET_STATUS_RUNNING [label="PROCEED"]
DOWNLOAD_DEPENDENCIES -> POST_SETUP_MANIFEST_REFRESH [label="PROCEED"]
POST_SETUP_MANIFEST_REFRESH -> LAUNCH_JOB [label="PROCEED"]
LAUNCH_JOB -> POST_LAUNCH_MANIFEST_REFRESH [label="PROCEED"]
POST_LAUNCH_MANIFEST_REFRESH -> SET_STATUS_RUNNING [label="PROCEED"]
SET_STATUS_RUNNING -> WAIT_JOB_COMPLETION [label="PROCEED"]
WAIT_JOB_COMPLETION -> DETERMINE_FINAL_STATUS [label="PROCEED"]
WAIT_JOB_COMPLETION -> POST_EXECUTION_MANIFEST_REFRESH [label="PROCEED"]
POST_EXECUTION_MANIFEST_REFRESH -> DETERMINE_FINAL_STATUS [label="PROCEED"]
DETERMINE_FINAL_STATUS -> SET_STATUS_FINAL [label="PROCEED"]
SET_STATUS_FINAL -> STOP_KILL_SERVICE [label="PROCEED"]
STOP_KILL_SERVICE -> LOG_EXECUTION_ERRORS [label="PROCEED"]
Expand All @@ -93,8 +97,9 @@ digraph state_machine {
SET_STATUS_INIT -> WAIT_JOB_COMPLETION [style=dotted]
CREATE_JOB_SCRIPT -> WAIT_JOB_COMPLETION [style=dotted]
DOWNLOAD_DEPENDENCIES -> WAIT_JOB_COMPLETION [style=dotted]
POST_SETUP_MANIFEST_REFRESH -> WAIT_JOB_COMPLETION [style=dotted]
LAUNCH_JOB -> WAIT_JOB_COMPLETION [style=dotted]
FORCE_FILE_MANIFEST_REFRESH -> WAIT_JOB_COMPLETION [style=dotted]
POST_LAUNCH_MANIFEST_REFRESH -> WAIT_JOB_COMPLETION [style=dotted]
// Retry transitions
HANDSHAKE -> HANDSHAKE [style=dashed label="3 retries"]
RESERVE_JOB_ID -> RESERVE_JOB_ID [style=dashed label="3 retries"]
Expand All @@ -104,4 +109,3 @@ digraph state_machine {
SET_STATUS_RUNNING -> SET_STATUS_RUNNING [style=dashed label="3 retries"]
SET_STATUS_FINAL -> SET_STATUS_FINAL [style=dashed label="3 retries"]
}

Expand Up @@ -204,7 +204,7 @@ public ExitCode run() {
.append(NEWLINE);

final JobExecutionStateMachineImpl jobExecutionStateMachine
= applicationContext.getBean(JobExecutionStateMachineImpl.class);
= applicationContext.getBean("jobExecutionStateMachine", JobExecutionStateMachineImpl.class);

final StateMachine<States, Events> stateMachine = jobExecutionStateMachine.getStateMachine();
final List<ExecutionStage> stages = jobExecutionStateMachine.getExecutionStages();
Expand Down
Expand Up @@ -395,6 +395,19 @@ DownloadDependenciesStage downloadDependenciesStage(final JobSetupService jobSet
return new DownloadDependenciesStage(jobSetupService);
}

/**
 * Provide the default {@link RefreshManifestStage} bound to {@link States#POST_SETUP_MANIFEST_REFRESH}.
 * Not registered when a bean named {@code postSetupRefreshManifestStage} already exists.
 *
 * @param agentFileStreamService the agent file stream service handed to the stage
 */
@Bean
@Lazy
@Order(145)
@ConditionalOnMissingBean(name = "postSetupRefreshManifestStage")
RefreshManifestStage postSetupRefreshManifestStage(final AgentFileStreamService agentFileStreamService) {
    // The same stage class is reused for several states; this bean is the post-setup one.
    final States associatedState = States.POST_SETUP_MANIFEST_REFRESH;
    return new RefreshManifestStage(agentFileStreamService, associatedState);
}

/**
* Create a {@link LaunchJobStage} bean if one is not already defined.
*
Expand All @@ -415,10 +428,10 @@ LaunchJobStage launchJobStage(final JobProcessManager jobProcessManager) {
*/
@Bean
@Lazy
@Order(160)
@ConditionalOnMissingBean(RefreshManifestStage.class)
RefreshManifestStage refreshManifestStage(final AgentFileStreamService agentFileStreamService) {
return new RefreshManifestStage(agentFileStreamService);
@Order(155)
@ConditionalOnMissingBean(name = "postLaunchRefreshManifestStage")
RefreshManifestStage postLaunchRefreshManifestStage(final AgentFileStreamService agentFileStreamService) {
return new RefreshManifestStage(agentFileStreamService, States.POST_LAUNCH_MANIFEST_REFRESH);
}

/**
Expand All @@ -428,7 +441,7 @@ RefreshManifestStage refreshManifestStage(final AgentFileStreamService agentFile
*/
@Bean
@Lazy
@Order(170)
@Order(160)
@ConditionalOnMissingBean(SetJobStatusRunning.class)
SetJobStatusRunning setJobStatusRunning(final AgentJobService agentJobService) {
return new SetJobStatusRunning(agentJobService);
Expand All @@ -441,18 +454,31 @@ SetJobStatusRunning setJobStatusRunning(final AgentJobService agentJobService) {
*/
@Bean
@Lazy
@Order(180)
@Order(170)
@ConditionalOnMissingBean(WaitJobCompletionStage.class)
WaitJobCompletionStage waitJobCompletionStage(final JobProcessManager jobProcessManager) {
return new WaitJobCompletionStage(jobProcessManager);
}

/**
 * Provide the default {@link RefreshManifestStage} bound to {@link States#POST_EXECUTION_MANIFEST_REFRESH}.
 * Not registered when a bean named {@code postExecutionRefreshManifestStage} already exists.
 *
 * @param agentFileStreamService the agent file stream service handed to the stage
 */
@Bean
@Lazy
@Order(175)
@ConditionalOnMissingBean(name = "postExecutionRefreshManifestStage")
RefreshManifestStage postExecutionRefreshManifestStage(final AgentFileStreamService agentFileStreamService) {
    // The same stage class is reused for several states; this bean is the post-execution one.
    final States associatedState = States.POST_EXECUTION_MANIFEST_REFRESH;
    return new RefreshManifestStage(agentFileStreamService, associatedState);
}

/**
* Create a {@link DetermineJobFinalStatusStage} bean if one is not already defined.
*/
@Bean
@Lazy
@Order(190)
@Order(180)
@ConditionalOnMissingBean(DetermineJobFinalStatusStage.class)
DetermineJobFinalStatusStage determineJobFinalStatusStage() {
return new DetermineJobFinalStatusStage();
Expand All @@ -465,7 +491,7 @@ DetermineJobFinalStatusStage determineJobFinalStatusStage() {
*/
@Bean
@Lazy
@Order(200)
@Order(190)
@ConditionalOnMissingBean(SetJobStatusFinal.class)
SetJobStatusFinal setJobStatusFinal(final AgentJobService agentJobService) {
return new SetJobStatusFinal(agentJobService);
Expand All @@ -478,7 +504,7 @@ SetJobStatusFinal setJobStatusFinal(final AgentJobService agentJobService) {
*/
@Bean
@Lazy
@Order(210)
@Order(200)
@ConditionalOnMissingBean(StopKillServiceStage.class)
StopKillServiceStage stopKillServiceStage(final AgentJobKillService killService) {
return new StopKillServiceStage(killService);
Expand All @@ -489,7 +515,7 @@ StopKillServiceStage stopKillServiceStage(final AgentJobKillService killService)
*/
@Bean
@Lazy
@Order(220)
@Order(210)
@ConditionalOnMissingBean(LogExecutionErrorsStage.class)
LogExecutionErrorsStage logExecutionErrorsStage() {
return new LogExecutionErrorsStage();
Expand All @@ -502,7 +528,7 @@ LogExecutionErrorsStage logExecutionErrorsStage() {
*/
@Bean
@Lazy
@Order(230)
@Order(220)
@ConditionalOnMissingBean(ArchiveJobOutputsStage.class)
ArchiveJobOutputsStage archiveJobOutputsStage(final JobArchiveService jobArchiveService) {
return new ArchiveJobOutputsStage(jobArchiveService);
Expand All @@ -515,7 +541,7 @@ ArchiveJobOutputsStage archiveJobOutputsStage(final JobArchiveService jobArchive
*/
@Bean
@Lazy
@Order(240)
@Order(230)
@ConditionalOnMissingBean(StopHeartbeatServiceStage.class)
StopHeartbeatServiceStage stopHeartbeatServiceStage(final AgentHeartBeatService heartbeatService) {
return new StopHeartbeatServiceStage(heartbeatService);
Expand All @@ -528,7 +554,7 @@ StopHeartbeatServiceStage stopHeartbeatServiceStage(final AgentHeartBeatService
*/
@Bean
@Lazy
@Order(250)
@Order(240)
@ConditionalOnMissingBean(StopFileServiceStage.class)
StopFileServiceStage stopFileServiceStage(final AgentFileStreamService agentFileStreamService) {
return new StopFileServiceStage(agentFileStreamService);
Expand All @@ -541,7 +567,7 @@ StopFileServiceStage stopFileServiceStage(final AgentFileStreamService agentFile
*/
@Bean
@Lazy
@Order(260)
@Order(250)
@ConditionalOnMissingBean(CleanupJobDirectoryStage.class)
CleanupJobDirectoryStage cleanupJobDirectoryStage(final JobSetupService jobSetupService) {
return new CleanupJobDirectoryStage(jobSetupService);
Expand All @@ -552,7 +578,7 @@ CleanupJobDirectoryStage cleanupJobDirectoryStage(final JobSetupService jobSetup
*/
@Bean
@Lazy
@Order(270)
@Order(260)
@ConditionalOnMissingBean(ShutdownStage.class)
ShutdownStage shutdownStage() {
return new ShutdownStage();
Expand Down
Expand Up @@ -166,14 +166,24 @@ public enum States {
DONE(0, false),

/**
* Trigger a job directory manifest refresh.
* Determine the job final status to publish to server.
*/
FORCE_FILE_MANIFEST_REFRESH(0, true),
DETERMINE_FINAL_STATUS(0, false, JobStatusMessages.UNKNOWN_JOB_STATE),

/**
* Determine the job final status to publish to server.
* Trigger a job directory manifest refresh after the job has launched.
*/
POST_LAUNCH_MANIFEST_REFRESH(0, true),

/**
* Trigger a job directory manifest refresh after job setup has completed.
*/
POST_SETUP_MANIFEST_REFRESH(0, true),

/**
* Trigger a job directory manifest refresh after the job process has terminated.
*/
DETERMINE_FINAL_STATUS(0, false, JobStatusMessages.UNKNOWN_JOB_STATE);
POST_EXECUTION_MANIFEST_REFRESH(0, false);

/**
* If a state is critical, then upon encountering a {@link FatalTransitionException} while in it, execution is
Expand Down
Expand Up @@ -39,18 +39,20 @@ public class RefreshManifestStage extends ExecutionStage {
* Constructor.
*
* @param agentFileStreamService agent file stream service
* @param state the associated state
*/
public RefreshManifestStage(final AgentFileStreamService agentFileStreamService) {
super(States.FORCE_FILE_MANIFEST_REFRESH);
public RefreshManifestStage(final AgentFileStreamService agentFileStreamService, final States state) {
super(state);
this.agentFileStreamService = agentFileStreamService;
}

@Override
protected void attemptTransition(
final ExecutionContext executionContext
) throws RetryableTransitionException, FatalTransitionException {

this.agentFileStreamService.forceServerSync();

if (executionContext.getJobDirectory() != null) {
log.info("Forcing a manifest refresh");
this.agentFileStreamService.forceServerSync();
}
}
}
Expand Up @@ -126,7 +126,7 @@ class InfoCommandSpec extends Specification {
1 * env.getSystemEnvironment() >> map
1 * env.getSystemProperties() >> map
1 * env.getPropertySources() >> new MutablePropertySources()
1 * ctx.getBean(JobExecutionStateMachineImpl.class) >> jobExecutionStateMachine
1 * ctx.getBean("jobExecutionStateMachine", JobExecutionStateMachineImpl.class) >> jobExecutionStateMachine
1 * jobExecutionStateMachine.getExecutionStages() >> stages
1 * jobExecutionStateMachine.getStateMachine() >> stateMachine
2 * stateMachine.getStates() >> states
Expand Down
Expand Up @@ -20,24 +20,42 @@ package com.netflix.genie.agent.execution.statemachine.stages
import com.netflix.genie.agent.execution.services.AgentFileStreamService
import com.netflix.genie.agent.execution.statemachine.ExecutionContext
import com.netflix.genie.agent.execution.statemachine.ExecutionStage
import com.netflix.genie.agent.execution.statemachine.States
import spock.lang.Specification
import spock.lang.Unroll

class RefreshManifestStageSpec extends Specification {
AgentFileStreamService agentFileService
ExecutionStage stage

ExecutionContext executionContext

void setup() {
this.agentFileService = Mock(AgentFileStreamService)
this.executionContext = Mock(ExecutionContext)
this.stage = new RefreshManifestStage(agentFileService)
}

def "AttemptTransition"() {
@Unroll
def "AttemptTransition for state #state"() {
ExecutionStage stage = new RefreshManifestStage(agentFileService, state)

when:
this.stage.attemptTransition(executionContext)
stage.attemptTransition(executionContext)

then:
1 * executionContext.getJobDirectory() >> Mock(File)
1 * agentFileService.forceServerSync()

when:
stage.attemptTransition(executionContext)

then:
1 * executionContext.getJobDirectory() >> null
0 * agentFileService.forceServerSync()

where:
state | _
States.POST_SETUP_MANIFEST_REFRESH | _
States.POST_LAUNCH_MANIFEST_REFRESH | _
States.POST_EXECUTION_MANIFEST_REFRESH | _
}
}

0 comments on commit e30d76d

Please sign in to comment.