Skip to content

Commit

Permalink
[ML] Take more care that normalize processes use unique named pipes (#…
Browse files Browse the repository at this point in the history
…54642)

When one of ML's normalize processes fails to connect to the JVM
quickly enough, and another normalize process for the same job
starts shortly afterwards, the named pipes of the two processes
can get mixed up.

This change avoids that risk by including an incrementing
counter value in the named pipe names used for normalize
processes.

Backport of #54636
  • Loading branch information
droberts195 committed Apr 2, 2020
1 parent d4d3c1b commit fec9d5d
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

public class NativeNormalizerProcessFactory implements NormalizerProcessFactory {

Expand All @@ -31,11 +32,13 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory

private final Environment env;
private final NativeController nativeController;
private final AtomicLong counter;
private volatile Duration processConnectTimeout;

public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.counter = new AtomicLong(0);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
Expand All @@ -48,8 +51,11 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) {
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
ExecutorService executorService) {
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, jobId,
true, false, true, true, false, false);
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can be run many times
// in quick succession for the same job, the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an incrementing counter value is appended to the job ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), true, false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);

NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
Expand Down

0 comments on commit fec9d5d

Please sign in to comment.