Skip to content

Commit

Permalink
[ML] Exceptions about starting native processes now include the node (#…
Browse files Browse the repository at this point in the history
…75937)

* [ML] Exceptions about starting native processes now include the node

Previously, exceptions about starting native processes did not
include the name of the node where the attempt to start the
process failed.

Although this doesn't matter when looking at the log of the
node where the attempt was made, it is crippling when the
only report of the problem is the exception received by a
client. When the initial report comes from a client exception,
we need to be able to easily determine which node to request
logs from, or on which node to investigate operating-system-level
issues.

* Review comments

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
droberts195 and elasticmachine committed Aug 2, 2021
1 parent 95edc6d commit 64855b1
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@

public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<AnalyticsResult> {

private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class);
private static final Logger logger = LogManager.getLogger(NativeAnalyticsProcessFactory.class);

private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();

private final Environment env;
private final NativeController nativeController;
private final String nodeName;
private final NamedXContentRegistry namedXContentRegistry;
private final ResultsPersisterService resultsPersisterService;
private final DataFrameAnalyticsAuditor auditor;
Expand All @@ -57,6 +58,7 @@ public NativeAnalyticsProcessFactory(Environment env,
DataFrameAnalyticsAuditor auditor) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.nodeName = clusterService.getNodeName();
this.namedXContentRegistry = Objects.requireNonNull(namedXContentRegistry);
this.auditor = auditor;
this.resultsPersisterService = resultsPersisterService;
Expand Down Expand Up @@ -97,11 +99,11 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
return analyticsProcess;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics process for job " + jobId;
LOGGER.error(msg);
logger.error(msg);
try {
IOUtils.close(analyticsProcess);
} catch (IOException ioe) {
LOGGER.error("Can't close data frame analytics process", ioe);
logger.error("Can't close data frame analytics process", ioe);
}
throw ExceptionsHelper.serverError(msg, e);
}
Expand All @@ -125,11 +127,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
analyticsBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching data frame analytics process", jobId);
logger.warn("[{}] Interrupted while launching data frame analytics process", jobId);
} catch (IOException e) {
String msg = "[" + jobId + "] Failed to launch data frame analytics process";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
logger.error(msg);
throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@

public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProcessFactory<MemoryUsageEstimationResult> {

private static final Logger LOGGER = LogManager.getLogger(NativeMemoryUsageEstimationProcessFactory.class);
private static final Logger logger = LogManager.getLogger(NativeMemoryUsageEstimationProcessFactory.class);

private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();

private final Environment env;
private final NativeController nativeController;
private final String nodeName;
private final AtomicLong counter;
private volatile Duration processConnectTimeout;

public NativeMemoryUsageEstimationProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.nodeName = clusterService.getNodeName();
this.counter = new AtomicLong(0);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(
Expand Down Expand Up @@ -85,11 +87,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
return process;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics memory usage estimation process for job " + config.getId();
LOGGER.error(msg);
logger.error(msg);
try {
IOUtils.close(process);
} catch (IOException ioe) {
LOGGER.error("Can't close data frame analytics memory usage estimation process", ioe);
logger.error("Can't close data frame analytics memory usage estimation process", ioe);
}
throw ExceptionsHelper.serverError(msg, e);
}
Expand All @@ -104,11 +106,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
analyticsBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId);
logger.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId);
} catch (IOException e) {
String msg = "[" + jobId + "] Failed to launch data frame analytics memory usage estimation process";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
logger.error(msg);
throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class NativePyTorchProcessFactory implements PyTorchProcessFactory {

private final Environment env;
private final NativeController nativeController;
private final String nodeName;
private volatile Duration processConnectTimeout;

public NativePyTorchProcessFactory(Environment env,
NativeController nativeController,
ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.nodeName = clusterService.getNodeName();
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
Expand Down Expand Up @@ -95,8 +97,11 @@ private void executeProcess(ProcessPipes processPipes, List<Path> filesToDelete)
pyTorchBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while launching PyTorch process");
} catch (IOException e) {
throw ExceptionsHelper.serverError("Failed to launch PyTorch process");
String msg = "Failed to launch PyTorch process";
logger.error(msg);
throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

public class NativeAutodetectProcessFactory implements AutodetectProcessFactory {

private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
private static final Logger logger = LogManager.getLogger(NativeAutodetectProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();

private final Environment env;
Expand All @@ -59,9 +59,9 @@ public NativeAutodetectProcessFactory(Environment env,
this.env = Objects.requireNonNull(env);
this.settings = Objects.requireNonNull(settings);
this.nativeController = Objects.requireNonNull(nativeController);
this.clusterService = clusterService;
this.resultsPersisterService = resultsPersisterService;
this.auditor = auditor;
this.clusterService = Objects.requireNonNull(clusterService);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
this.auditor = Objects.requireNonNull(auditor);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
Expand Down Expand Up @@ -97,11 +97,11 @@ public AutodetectProcess createAutodetectProcess(String pipelineId,
return autodetect;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to autodetect for job " + job.getId();
LOGGER.error(msg);
logger.error(msg);
try {
IOUtils.close(autodetect);
} catch (IOException ioe) {
LOGGER.error("Can't close autodetect", ioe);
logger.error("Can't close autodetect", ioe);
}
throw ExceptionsHelper.serverError(msg, e);
}
Expand All @@ -117,7 +117,7 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe
clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC))
.build();

AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, logger, env,
updatedSettings, nativeController, processPipes)
.referencedFilters(autodetectParams.filters())
.scheduledEvents(autodetectParams.scheduledEvents());
Expand All @@ -130,11 +130,11 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe
autodetectBuilder.build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching autodetect", job.getId());
logger.warn("[{}] Interrupted while launching autodetect", job.getId());
} catch (IOException e) {
String msg = "[" + job.getId() + "] Failed to launch autodetect";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
logger.error(msg);
throw ExceptionsHelper.serverError(msg + " on [" + clusterService.getNodeName() + "]", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@

public class NativeNormalizerProcessFactory implements NormalizerProcessFactory {

private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
private static final Logger logger = LogManager.getLogger(NativeNormalizerProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();

private final Environment env;
private final NativeController nativeController;
private final String nodeName;
private final AtomicLong counter;
private volatile Duration processConnectTimeout;

public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.nodeName = clusterService.getNodeName();
this.counter = new AtomicLong(0);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
Expand Down Expand Up @@ -66,11 +68,11 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS
return normalizerProcess;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to normalizer for job " + jobId;
LOGGER.error(msg);
logger.error(msg);
try {
IOUtils.close(normalizerProcess);
} catch (IOException ioe) {
LOGGER.error("Can't close normalizer", ioe);
logger.error("Can't close normalizer", ioe);
}
throw ExceptionsHelper.serverError(msg, e);
}
Expand All @@ -84,11 +86,11 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
nativeController.startProcess(command);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("[{}] Interrupted while launching normalizer", jobId);
logger.warn("[{}] Interrupted while launching normalizer", jobId);
} catch (IOException e) {
String msg = "[" + jobId + "] Failed to launch normalizer";
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
logger.error(msg);
throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e);
}
}
}
Expand Down

0 comments on commit 64855b1

Please sign in to comment.