diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index da7b8168a75ad..26245b135a165 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -37,12 +37,13 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { - private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class); + private static final Logger logger = LogManager.getLogger(NativeAnalyticsProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private final Environment env; private final NativeController nativeController; + private final String nodeName; private final NamedXContentRegistry namedXContentRegistry; private final ResultsPersisterService resultsPersisterService; private final DataFrameAnalyticsAuditor auditor; @@ -57,6 +58,7 @@ public NativeAnalyticsProcessFactory(Environment env, DataFrameAnalyticsAuditor auditor) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + this.nodeName = clusterService.getNodeName(); this.namedXContentRegistry = Objects.requireNonNull(namedXContentRegistry); this.auditor = auditor; this.resultsPersisterService = resultsPersisterService; @@ -97,11 +99,11 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co return analyticsProcess; } catch (IOException | EsRejectedExecutionException e) { String msg = "Failed to connect to data frame analytics process for job " + jobId; - LOGGER.error(msg); + logger.error(msg); try { IOUtils.close(analyticsProcess); } catch (IOException ioe) { - LOGGER.error("Can't close data frame analytics process", ioe); + logger.error("Can't close data frame analytics process", ioe); } throw ExceptionsHelper.serverError(msg, e); } @@ -125,11 +127,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP analyticsBuilder.build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("[{}] Interrupted while launching data frame analytics process", jobId); + logger.warn("[{}] Interrupted while launching data frame analytics process", jobId); } catch (IOException e) { String msg = "[" + jobId + "] Failed to launch data frame analytics process"; - LOGGER.error(msg); - throw ExceptionsHelper.serverError(msg, e); + logger.error(msg); + throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 19211c42cf92b..dc4d94d918ad0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -33,18 +33,20 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProcessFactory { - private static final Logger LOGGER = LogManager.getLogger(NativeMemoryUsageEstimationProcessFactory.class); + private static final Logger logger = LogManager.getLogger(NativeMemoryUsageEstimationProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private final Environment env; private final NativeController nativeController; + private final String nodeName; private final AtomicLong counter; private volatile Duration processConnectTimeout; public NativeMemoryUsageEstimationProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + this.nodeName = clusterService.getNodeName(); this.counter = new AtomicLong(0); setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); clusterService.getClusterSettings().addSettingsUpdateConsumer( @@ -85,11 +87,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess( return process; } catch (IOException | EsRejectedExecutionException e) { String msg = "Failed to connect to data frame analytics memory usage estimation process for job " + config.getId(); - LOGGER.error(msg); + logger.error(msg); try { IOUtils.close(process); } catch (IOException ioe) { - LOGGER.error("Can't close data frame analytics memory usage estimation process", ioe); + logger.error("Can't close data frame analytics memory usage estimation process", ioe); } throw ExceptionsHelper.serverError(msg, e); } @@ -104,11 +106,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP analyticsBuilder.build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId); + logger.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId); } catch (IOException e) { String msg = "[" + jobId + "] Failed to launch data frame analytics memory usage estimation process"; - LOGGER.error(msg); - throw ExceptionsHelper.serverError(msg, e); + logger.error(msg); + throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/pytorch/process/NativePyTorchProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/pytorch/process/NativePyTorchProcessFactory.java index 88edb65684c16..ec5c1887822f2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/pytorch/process/NativePyTorchProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/pytorch/process/NativePyTorchProcessFactory.java @@ -37,6 +37,7 @@ public class NativePyTorchProcessFactory implements PyTorchProcessFactory { private final Environment env; private final NativeController nativeController; + private final String nodeName; private volatile Duration processConnectTimeout; public NativePyTorchProcessFactory(Environment env, @@ -44,6 +45,7 @@ public NativePyTorchProcessFactory(Environment env, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + this.nodeName = clusterService.getNodeName(); setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setProcessConnectTimeout); @@ -95,8 +97,11 @@ private void executeProcess(ProcessPipes processPipes, List filesToDelete) pyTorchBuilder.build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + logger.warn("Interrupted while launching PyTorch process"); } catch (IOException e) { - throw ExceptionsHelper.serverError("Failed to launch PyTorch process"); + String msg = "Failed to launch PyTorch process"; + logger.error(msg); + throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index e9184f9e2aec6..db98f7d0c65cd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -39,7 +39,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory { - private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class); + private static final Logger logger = LogManager.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private final Environment env; @@ -59,9 +59,9 @@ public NativeAutodetectProcessFactory(Environment env, this.env = Objects.requireNonNull(env); this.settings = Objects.requireNonNull(settings); this.nativeController = Objects.requireNonNull(nativeController); - this.clusterService = clusterService; - this.resultsPersisterService = resultsPersisterService; - this.auditor = auditor; + this.clusterService = Objects.requireNonNull(clusterService); + this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); + this.auditor = Objects.requireNonNull(auditor); setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings)); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setProcessConnectTimeout); @@ -97,11 +97,11 @@ public AutodetectProcess createAutodetectProcess(String pipelineId, return autodetect; } catch (IOException | EsRejectedExecutionException e) { String msg = "Failed to connect to autodetect for job " + job.getId(); - LOGGER.error(msg); + logger.error(msg); try { IOUtils.close(autodetect); } catch (IOException ioe) { - LOGGER.error("Can't close autodetect", ioe); + logger.error("Can't close autodetect", ioe); } throw ExceptionsHelper.serverError(msg, e); } @@ -117,7 +117,7 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)) .build(); - AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env, + AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, logger, env, updatedSettings, nativeController, processPipes) .referencedFilters(autodetectParams.filters()) .scheduledEvents(autodetectParams.scheduledEvents()); @@ -130,11 +130,11 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe autodetectBuilder.build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("[{}] Interrupted while launching autodetect", job.getId()); + logger.warn("[{}] Interrupted while launching autodetect", job.getId()); } catch (IOException e) { String msg = "[" + job.getId() + "] Failed to launch autodetect"; - LOGGER.error(msg); - throw ExceptionsHelper.serverError(msg, e); + logger.error(msg); + throw ExceptionsHelper.serverError(msg + " on [" + clusterService.getNodeName() + "]", e); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 7d30119902736..22e32668598cd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -28,17 +28,19 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory { - private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class); + private static final Logger logger = LogManager.getLogger(NativeNormalizerProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private final Environment env; private final NativeController nativeController; + private final String nodeName; private final AtomicLong counter; private volatile Duration processConnectTimeout; public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + this.nodeName = clusterService.getNodeName(); this.counter = new AtomicLong(0); setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, @@ -66,11 +68,11 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS return normalizerProcess; } catch (IOException | EsRejectedExecutionException e) { String msg = "Failed to connect to normalizer for job " + jobId; - LOGGER.error(msg); + logger.error(msg); try { IOUtils.close(normalizerProcess); } catch (IOException ioe) { - LOGGER.error("Can't close normalizer", ioe); + logger.error("Can't close normalizer", ioe); } throw ExceptionsHelper.serverError(msg, e); } @@ -84,11 +86,11 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip nativeController.startProcess(command); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("[{}] Interrupted while launching normalizer", jobId); + logger.warn("[{}] Interrupted while launching normalizer", jobId); } catch (IOException e) { String msg = "[" + jobId + "] Failed to launch normalizer"; - LOGGER.error(msg); - throw ExceptionsHelper.serverError(msg, e); + logger.error(msg); + throw ExceptionsHelper.serverError(msg + " on [" + nodeName + "]", e); } } }