Skip to content

Commit

Permalink
Adding metrics and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
tgianos committed Jul 29, 2016
1 parent 62a8d97 commit f0268de
Show file tree
Hide file tree
Showing 20 changed files with 255 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public JobLauncher(
@Override
public void run() {
this.submitTimer.record(() -> {
// TODO: Compress the timer into a single ID tagged by exceptions
try {
this.jobSubmitterService.submitJob(this.jobRequest);
} catch (final GenieException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public void executeTask(
) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Application Task in the workflow.");
final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Application Task for job {}", jobExecEnv.getJobRequest().getId());


if (jobExecEnv.getApplications() != null) {
Expand Down Expand Up @@ -148,6 +148,7 @@ public void executeTask(
}
}
}
log.info("Finished Application Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ public ClusterTask(@NotNull final Registry registry,
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Cluster Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Cluster Task for job {}", jobExecEnv.getJobRequest().getId());

// Create the directory for this application under applications in the cwd
createEntityInstanceDirectory(
Expand Down Expand Up @@ -122,6 +121,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
);
fts.getFile(configFile, localPath);
}
log.info("Finished Cluster Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ public CommandTask(@NotNull final Registry registry,
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Command Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final String genieDir = jobWorkingDirectory
+ JobConstants.FILE_PATH_DELIMITER
+ JobConstants.GENIE_PATH_VAR;
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Command Task for job {}", jobExecEnv.getJobRequest().getId());

// Create the directory for this command under command dir in the cwd
createEntityInstanceDirectory(
Expand Down Expand Up @@ -121,6 +120,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
);
fts.getFile(configFile, localPath);
}
log.info("Finished Command Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ public InitialSetupTask(@NotNull final Registry registry) {
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing Initial setup Task in the workflow.");

final String lineSeparator = System.lineSeparator();
final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Initial Setup Task for job {}", jobExecEnv.getJobRequest().getId());

// create top level directory structure for the job

Expand Down Expand Up @@ -251,6 +250,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie

// Append new line
writer.write(System.lineSeparator());
log.info("Finished Initial Setup Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.core.jobs.JobConstants;
import com.netflix.genie.core.jobs.JobExecutionEnvironment;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -56,12 +57,15 @@ public JobFailureAndKillHandlerLogicTask(@NotNull final Registry registry) {
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Executing JobKillLogic Task in the workflow.");
final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
log.info("Starting Job Failure and Kill Handler Task for job {}", jobExecEnv.getJobRequest().getId());

final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);

// Append logic for handling job kill signal
writer.write(JobConstants.JOB_FAILURE_AND_KILL_HANDLER_LOGIC + System.lineSeparator());
log.info("Finished Job Failure and Kill Handler Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ public JobKickoffTask(
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.info("Executing Job Kickoff Task in the workflow.");

final JobExecutionEnvironment jobExecEnv =
(JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Job Kickoff Task for job {}", jobExecEnv.getJobRequest().getId());

// At this point all contents are written to the run script and we call an explicit flush and close to write
// the contents to the file before we execute it.
Expand Down Expand Up @@ -154,6 +153,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
} catch (final IOException ie) {
throw new GenieServerException("Unable to start command " + String.valueOf(command), ie);
}
log.info("Finished Job Kickoff Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ public JobTask(
public void executeTask(@NotNull final Map<String, Object> context) throws GenieException, IOException {
final long start = System.nanoTime();
try {
log.debug("Execution Job Task in the workflow.");

final JobExecutionEnvironment jobExecEnv
= (JobExecutionEnvironment) context.get(JobConstants.JOB_EXECUTION_ENV_KEY);
final String jobWorkingDirectory = jobExecEnv.getJobWorkingDir().getCanonicalPath();
final Writer writer = (Writer) context.get(JobConstants.WRITER_KEY);
log.info("Starting Job Task for job {}", jobExecEnv.getJobRequest().getId());

final String jobSetupFile = jobExecEnv.getJobRequest().getSetupFile();

Expand Down Expand Up @@ -153,6 +152,7 @@ public void executeTask(@NotNull final Map<String, Object> context) throws Genie
writer.write(JobConstants.GENIE_DONE_FILE_CONTENT_PREFIX
+ JobConstants.GENIE_DONE_FILE_NAME
+ System.lineSeparator());
log.info("Finished Job Task for job {}", jobExecEnv.getJobRequest().getId());
} finally {
final long finish = System.nanoTime();
this.timer.record(finish - start, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void createJob(
* {@inheritDoc}
*/
@Override
public synchronized void updateJobStatus(
public void updateJobStatus(
@NotBlank(message = "No job id entered. Unable to update.")
final String id,
@NotNull(message = "Status cannot be null.")
Expand Down Expand Up @@ -316,7 +316,7 @@ public void createJobExecution(
* {@inheritDoc}
*/
@Override
public synchronized void setJobCompletionInformation(
public void setJobCompletionInformation(
@NotBlank(message = "No job id entered. Unable to update.")
final String id,
final int exitCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,24 @@ public static Specification<ClusterEntity> findByClusterAndCommandCriteria(

cq.distinct(true);

predicates.add(cb.equal(commands.get(CommandEntity_.status), CommandStatus.ACTIVE));
predicates.add(cb.equal(root.get(ClusterEntity_.status), ClusterStatus.UP));

if (commandCriteria != null && !commandCriteria.isEmpty()) {
if (clusterCriteria != null && clusterCriteria.getTags() != null && !clusterCriteria.getTags().isEmpty()) {
predicates.add(
cb.like(
commands.get(CommandEntity_.tags),
JpaSpecificationUtils.getTagLikeString(commandCriteria)
root.get(ClusterEntity_.tags),
JpaSpecificationUtils.getTagLikeString(clusterCriteria.getTags())
)
);
}

if (clusterCriteria != null && clusterCriteria.getTags() != null && !clusterCriteria.getTags().isEmpty()) {
predicates.add(cb.equal(commands.get(CommandEntity_.status), CommandStatus.ACTIVE));

if (commandCriteria != null && !commandCriteria.isEmpty()) {
predicates.add(
cb.like(
root.get(ClusterEntity_.tags),
JpaSpecificationUtils.getTagLikeString(clusterCriteria.getTags())
commands.get(CommandEntity_.tags),
JpaSpecificationUtils.getTagLikeString(commandCriteria)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ public String coordinateJob(
@Valid
final JobRequestMetadata jobRequestMetadata
) throws GenieException {
log.debug("Called with job request {}", jobRequest);
if (StringUtils.isBlank(jobRequest.getId())) {
throw new GenieServerException("Id of the jobRequest cannot be null");
}
log.info("Called to schedule job launch for job {}", jobRequest.getId());

// Log the job request and optionally the client host
this.jobPersistenceService.createJobRequest(jobRequest, jobRequestMetadata);
Expand All @@ -140,19 +140,28 @@ public String coordinateJob(
.withTags(jobRequest.getTags());

synchronized (this) {
log.info("Checking if can run job {} on this node", jobRequest.getId());
final int numActiveJobs = this.jobCountService.getNumJobs();
if (numActiveJobs < this.maxRunningJobs) {
log.info(
"Job {} can run on this node as only {}/{} jobs are active",
jobRequest.getId(),
numActiveJobs,
this.maxRunningJobs
);
jobBuilder
.withStatus(JobStatus.INIT)
.withStatusMsg("Job Accepted and in initialization phase.");
// TODO: if this throws exception the job will never be marked failed
this.jobPersistenceService.createJob(jobBuilder.build());
try {
log.info("Scheduling job {} for submission", jobRequest.getId());
final Future<?> task = this.taskExecutor.submit(
new JobLauncher(this.jobSubmitterService, jobRequest, this.registry)
);

// Tell the system a new job has been scheduled so any actions can be taken
log.info("Publishing job scheduled event for job {}", jobRequest.getId());
this.eventPublisher.publishEvent(new JobScheduledEvent(jobRequest.getId(), task, this));
} catch (final TaskRejectedException e) {
final String errorMsg = "Unable to launch job due to exception: " + e.getMessage();
Expand Down

0 comments on commit f0268de

Please sign in to comment.