Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Request Tracing Implementation #1093

Merged
merged 6 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.netflix.genie.agent.execution.process.impl;

import brave.Span;
import brave.Tracer;
import com.netflix.genie.agent.cli.logging.ConsoleLog;
import com.netflix.genie.agent.execution.exceptions.JobLaunchException;
import com.netflix.genie.agent.execution.process.JobProcessManager;
Expand All @@ -25,6 +27,8 @@
import com.netflix.genie.agent.utils.PathUtils;
import com.netflix.genie.common.dto.JobStatusMessages;
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import com.netflix.genie.common.internal.tracing.brave.BraveTracePropagator;
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -60,15 +64,20 @@ public class JobProcessManagerImpl implements JobProcessManager {
private final AtomicReference<ScheduledFuture> timeoutKillThread = new AtomicReference<>();
private final AtomicReference<File> initFailedFileRef = new AtomicReference<>();
private final TaskScheduler taskScheduler;
private final Tracer tracer;
private final BraveTracePropagator tracePropagator;
private boolean isInteractiveMode;

/**
* Constructor.
*
* @param taskScheduler The {@link TaskScheduler} instance to use to run scheduled asynchronous tasks
* @param taskScheduler The {@link TaskScheduler} instance to use to run scheduled asynchronous tasks
* @param tracingComponents The {@link BraveTracingComponents} instance to use for propagating trace information
*/
public JobProcessManagerImpl(final TaskScheduler taskScheduler) {
public JobProcessManagerImpl(final TaskScheduler taskScheduler, final BraveTracingComponents tracingComponents) {
this.taskScheduler = taskScheduler;
// Keep just the tracer and the trace propagator from the supplied tracing components; launchProcess reads
// them to copy the current span's context into the environment of the job process it starts.
this.tracer = tracingComponents.getTracer();
this.tracePropagator = tracingComponents.getTracePropagator();
}

/**
Expand All @@ -80,7 +89,8 @@ public void launchProcess(
final File jobScript,
final boolean interactive,
@Nullable final Integer timeout,
final boolean launchInJobDirectory) throws JobLaunchException {
final boolean launchInJobDirectory
) throws JobLaunchException {
if (!this.launched.compareAndSet(false, true)) {
throw new IllegalStateException("Job already launched");
}
Expand Down Expand Up @@ -127,6 +137,11 @@ public void launchProcess(
processBuilder.redirectOutput(PathUtils.jobStdOutPath(jobDirectory).toFile());
}

final Span currentSpan = this.tracer.currentSpan();
if (currentSpan != null) {
processBuilder.environment().putAll(this.tracePropagator.injectForJob(currentSpan.context()));
}

if (this.killed.get()) {
log.info("Job aborted, skipping launch");
return;
Expand Down Expand Up @@ -183,23 +198,6 @@ public void kill(final KillService.KillSource source) {
log.error("Failed to kill job process");
}

/**
 * Request normal termination of the given process, then poll until it has exited or the wait window
 * of {@code KILL_WAIT_SECS} seconds has elapsed.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails
 */
private void gracefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    process.destroy();
    while (true) {
        // Stop polling as soon as the process is gone or the wait window has closed
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

/**
 * Forcibly terminate the given process, then poll until it has exited or the wait window of
 * {@code KILL_WAIT_SECS} seconds has elapsed.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails
 */
private void forcefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    // Note carried over from the original code: in Java 8 this is exactly destroy(),
    // but that behavior may change in future Java versions.
    process.destroyForcibly();
    while (true) {
        // Stop polling as soon as the process is gone or the wait window has closed
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -284,6 +282,23 @@ public JobProcessResult waitFor() throws InterruptedException {
return new JobProcessResult.Builder(JobStatus.FAILED, statusMessage, exitCode).build();
}

/**
 * Request normal termination of the given process, then poll until it has exited or the wait window
 * of {@code KILL_WAIT_SECS} seconds has elapsed.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails
 */
private void gracefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    process.destroy();
    while (true) {
        // Stop polling as soon as the process is gone or the wait window has closed
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

/**
 * Forcibly terminate the given process, then poll until it has exited or the wait window of
 * {@code KILL_WAIT_SECS} seconds has elapsed.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails
 */
private void forcefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    // Note carried over from the original code: in Java 8 this is exactly destroy(),
    // but that behavior may change in future Java versions.
    process.destroyForcibly();
    while (true) {
        // Stop polling as soon as the process is gone or the wait window has closed
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these 2 method moves seem unnecessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, IntelliJ auto-formatting. Oh well.

/* TODO: HACK, Process does not expose PID in Java 8 API */
private long getPid(final Process process) {
long pid = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.netflix.genie.agent.execution.process.JobProcessManager;
import com.netflix.genie.agent.execution.process.impl.JobProcessManagerImpl;
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -38,15 +39,17 @@ public class ProcessAutoConfiguration {
/**
* Provide a lazy {@link JobProcessManager} bean if one hasn't already been defined.
*
* @param taskScheduler The {@link TaskScheduler} instance to use
* @param taskScheduler The {@link TaskScheduler} instance to use
* @param tracingComponents The {@link BraveTracingComponents} instance to use
* @return A {@link JobProcessManagerImpl} instance
*/
@Bean
@Lazy
@ConditionalOnMissingBean(JobProcessManager.class)
public JobProcessManagerImpl jobProcessManager(
@Qualifier("sharedAgentTaskScheduler") final TaskScheduler taskScheduler
@Qualifier("sharedAgentTaskScheduler") final TaskScheduler taskScheduler,
final BraveTracingComponents tracingComponents
) {
// Build the default implementation from the injected scheduler and tracing components
return new JobProcessManagerImpl(taskScheduler);
return new JobProcessManagerImpl(taskScheduler, tracingComponents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
*/
package com.netflix.genie.agent.execution.process.impl

import brave.Span
import brave.Tracer
import brave.propagation.TraceContext
import com.netflix.genie.agent.execution.exceptions.JobLaunchException
import com.netflix.genie.agent.execution.process.JobProcessManager
import com.netflix.genie.agent.execution.process.JobProcessResult
import com.netflix.genie.agent.execution.services.KillService
import com.netflix.genie.agent.utils.PathUtils
import com.netflix.genie.common.dto.JobStatusMessages
import com.netflix.genie.common.external.dtos.v4.JobStatus
import com.netflix.genie.common.internal.tracing.brave.BraveTagAdapter
import com.netflix.genie.common.internal.tracing.brave.BraveTracePropagator
import com.netflix.genie.common.internal.tracing.brave.BraveTracingCleanup
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents
import org.springframework.core.io.ClassPathResource
import org.springframework.scheduling.TaskScheduler
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
Expand All @@ -49,14 +56,36 @@ class JobProcessManagerImplSpec extends Specification {
File stdErr
TaskScheduler scheduler
JobProcessManager manager
Tracer tracer
BraveTracePropagator tracePropagator
Span span
TraceContext traceContext

// Per-feature fixture: prepares the stdout/stderr locations, the mocked collaborators and the manager under test.
def setup() {
this.stdOut = PathUtils.jobStdOutPath(temporaryFolder.toFile()).toFile()
this.stdErr = PathUtils.jobStdErrPath(temporaryFolder.toFile()).toFile()
// Make sure the parent directories of the stdout/stderr files exist
Files.createDirectories(this.stdOut.getParentFile().toPath())
Files.createDirectories(this.stdErr.getParentFile().toPath())
this.scheduler = Mock(TaskScheduler)
this.manager = new JobProcessManagerImpl(this.scheduler)
this.tracer = Mock(Tracer)
this.tracePropagator = Mock(BraveTracePropagator)
// Build a sampled trace context from randomly generated identifiers; features stub span.context() to return it
UUID uuid = UUID.randomUUID()
this.traceContext = TraceContext.newBuilder()
.traceId(uuid.getLeastSignificantBits())
.traceIdHigh(uuid.getMostSignificantBits())
.spanId(UUID.randomUUID().getLeastSignificantBits())
.sampled(true)
.build()
this.span = Mock(Span)
// Manager under test, wired with the mocked scheduler, tracer and propagator
this.manager = new JobProcessManagerImpl(
this.scheduler,
new BraveTracingComponents(
this.tracer,
this.tracePropagator,
Mock(BraveTracingCleanup),
Mock(BraveTagAdapter)
)
)
}

def cleanup() {
Expand Down Expand Up @@ -84,6 +113,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -125,6 +157,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> ["HI": "bye"]
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -157,6 +192,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> null
0 * this.span.context()
0 * this.tracePropagator.injectForJob(this.traceContext)
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -192,6 +230,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand All @@ -218,6 +259,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand Down Expand Up @@ -248,6 +290,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -263,6 +306,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -280,6 +324,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -298,6 +343,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -319,6 +365,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
1 * this.scheduler.schedule(_ as Runnable, _ as Instant) >> future

when:
Expand Down Expand Up @@ -356,6 +405,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -401,6 +453,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand All @@ -420,9 +475,9 @@ class JobProcessManagerImplSpec extends Specification {
result.getExitCode() == 143

where:
interactive | expectedStatusMessage
true | JobStatusMessages.JOB_KILLED_BY_USER
false | JobStatusMessages.JOB_KILLED_BY_SYSTEM
interactive | expectedStatusMessage
true | JobStatusMessages.JOB_KILLED_BY_USER
false | JobStatusMessages.JOB_KILLED_BY_SYSTEM
}

def "Kill completed process"() {
Expand All @@ -442,6 +497,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

// Wait until the process actually completes by checking the existence of the file
Expand Down Expand Up @@ -492,6 +550,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -523,6 +584,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand All @@ -536,6 +600,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(IllegalStateException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -558,7 +623,15 @@ class JobProcessManagerImplSpec extends Specification {
threadPoolScheduler.setThreadNamePrefix("job-process-manager-impl-spec-")
threadPoolScheduler.setWaitForTasksToCompleteOnShutdown(false)
threadPoolScheduler.initialize()
def realManager = new JobProcessManagerImpl(threadPoolScheduler)
def realManager = new JobProcessManagerImpl(
threadPoolScheduler,
new BraveTracingComponents(
this.tracer,
this.tracePropagator,
Mock(BraveTracingCleanup),
Mock(BraveTagAdapter)
)
)

when:
realManager.launchProcess(
Expand All @@ -571,6 +644,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()

when:
def result = realManager.waitFor()
Expand Down Expand Up @@ -614,6 +690,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
1 * this.scheduler.schedule(_ as Runnable, _ as Instant) >> future

// Wait until the process actually starts by checking the existence of the runfile
Expand Down