Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding metrics and logging for job initialization process #334

Merged
merged 1 commit into from
Jul 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public JobLauncher(
@Override
public void run() {
this.submitTimer.record(() -> {
// TODO: Compress the timer into a single ID tagged by exceptions
try {
this.jobSubmitterService.submitJob(this.jobRequest);
} catch (final GenieException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public void executeTask(
) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Application Task in the workflow.");
final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Application Task for job {}", jobExecEnv.getJobRequest().getId());


if (jobExecEnv.getApplications() != null) {
Expand Down Expand Up @@ -148,6 +148,7 @@ public void executeTask(
}
}
}
log.info("Finished Application Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ public ClusterTask(@NotNull final Registry registry,
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Cluster Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Cluster Task for job {}", jobExecEnv.getJobRequest().getId());

// Create the directory for this application under applications in the cwd
createEntityInstanceDirectory(
Expand Down Expand Up @@ -122,6 +121,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
);
fts.getFile(configFile, localPath);
}
log.info("Finished Cluster Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ public CommandTask(@NotNull final Registry registry,
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Command Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Command Task for job {}", jobExecEnv.getJobRequest().getId());

// Create the directory for this command under command dir in the cwd
createEntityInstanceDirectory(
Expand Down Expand Up @@ -121,6 +120,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
);
fts.getFile(configFile, localPath);
}
log.info("Finished Command Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ public InitialSetupTask(@NotNull final Registry registry) {
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Initial setup Task in the workflow.");

final String lineSeparator = System.lineSeparator();
final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Initial Setup Task for job {}", jobExecEnv.getJobRequest().getId());

// create top level directory structure for the job

Expand Down Expand Up @@ -251,6 +250,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie

// Append new line
writer.write(System.lineSeparator());
log.info("Finished Initial Setup Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.core.jobs.JobConstants;
import com.netflix.genie.core.jobs.JobExecutionEnvironment;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -56,12 +57,15 @@ public JobFailureAndKillHandlerLogicTask(@NotNull final Registry registry) {
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing JobKillLogic Task in the workflow.");
final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
log.info("Starting Job Failure and Kill Handler Task for job {}", jobExecEnv.getJobRequest().getId());

final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);

// Append logic for handling job kill signal
writer.write(JobConstants.JOB_FAILURE_AND_KILL_HANDLER_LOGIC + System.lineSeparator());
log.info("Finished Job Failure and Kill Handler Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ public JobKickoffTask(
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.info("Executing Job Kickoff Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Job Kickoff Task for job {}", jobExecEnv.getJobRequest().getId());

// At this point all contents are written to the run script and we call an explicit flush and close to write
// the contents to the file before we execute it.
Expand Down Expand Up @@ -154,6 +153,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
} catch (final IOException ie) {
throw new GenieServerException("Unable to start command " + String.valueOf(command), ie);
}
log.info("Finished Job Kickoff Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ public JobTask(
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Execution Job Task in the workflow.");

final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Job Task for job {}", jobExecEnv.getJobRequest().getId());

final String jobSetupFile = jobExecEnv.getJobRequest().getSetupFile();

Expand Down Expand Up @@ -153,6 +152,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
writer.write(JobConstants.GENIE_DONE_FILE_CONTENT_PREFIX
+ JobConstants.GENIE_DONE_FILE_NAME
+ System.lineSeparator());
log.info("Finished Job Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void createJob(
* {@inheritDoc}
*/
@Override
public synchronized void updateJobStatus(
public void updateJobStatus(
@NotBlank(message = "No job id entered. Unable to update.")
final String id,
@NotNull(message = "Status cannot be null.")
Expand Down Expand Up @@ -316,7 +316,7 @@ public void createJobExecution(
* {@inheritDoc}
*/
@Override
public synchronized void setJobCompletionInformation(
public void setJobCompletionInformation(
@NotBlank(message = "No job id entered. Unable to update.")
final String id,
final int exitCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,24 @@ public static Specification<ClusterEntity> findByClusterAndCommandCriteria(

cq.distinct(true);

predicates.add(cb.equal(commands.get(CommandEntity_.status), CommandStatus.ACTIVE));
predicates.add(cb.equal(root.get(ClusterEntity_.status), ClusterStatus.UP));

if (commandCriteria != null && !commandCriteria.isEmpty()) {
if (clusterCriteria != null && clusterCriteria.getTags() != null && !clusterCriteria.getTags().isEmpty()) {
predicates.add(
cb.like(
commands.get(CommandEntity_.tags),
JpaSpecificationUtils.getTagLikeString(commandCriteria)
root.get(ClusterEntity_.tags),
JpaSpecificationUtils.getTagLikeString(clusterCriteria.getTags())
)
);
}

if (clusterCriteria != null && clusterCriteria.getTags() != null && !clusterCriteria.getTags().isEmpty()) {
predicates.add(cb.equal(commands.get(CommandEntity_.status), CommandStatus.ACTIVE));

if (commandCriteria != null && !commandCriteria.isEmpty()) {
predicates.add(
cb.like(
root.get(ClusterEntity_.tags),
JpaSpecificationUtils.getTagLikeString(clusterCriteria.getTags())
commands.get(CommandEntity_.tags),
JpaSpecificationUtils.getTagLikeString(commandCriteria)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public String coordinateJob(
@Valid
final JobRequestMetadata jobRequestMetadata
) throws GenieException {
log.debug("Called with job request {}", jobRequest);
if (StringUtils.isBlank(jobRequest.getId())) {
throw new GenieServerException("Id of the jobRequest cannot be null");
}
log.info("Called to schedule job launch for job {}", jobRequest.getId());

// Log the job request and optionally the client host
this.jobPersistenceService.createJobRequest(jobRequest, jobRequestMetadata);
Expand All @@ -140,19 +140,28 @@ public String coordinateJob(
.withTags(jobRequest.getTags());

synchronized (this) {
log.info("Checking if can run job {} on this node", jobRequest.getId());
final int numActiveJobs = this.jobCountService.getNumJobs();
if (numActiveJobs < this.maxRunningJobs) {
log.info(
"Job {} can run on this node as only {}/{} jobs are active",
jobRequest.getId(),
numActiveJobs,
this.maxRunningJobs
);
jobBuilder
.withStatus(JobStatus.INIT)
.withStatusMsg("Job Accepted and in initialization phase.");
// TODO: if this throws exception the job will never be marked failed
this.jobPersistenceService.createJob(jobBuilder.build());
try {
log.info("Scheduling job {} for submission", jobRequest.getId());
final Future<?> task = this.taskExecutor.submit(
new JobLauncher(this.jobSubmitterService, jobRequest, this.registry)
);

// Tell the system a new job has been scheduled so any actions can be taken
log.info("Publishing job scheduled event for job {}", jobRequest.getId());
this.eventPublisher.publishEvent(new JobScheduledEvent(jobRequest.getId(), task, this));
} catch (final TaskRejectedException e) {
final String errorMsg = "Unable to launch job due to exception: " + e.getMessage();
Expand Down