Skip to content

Commit

Permalink
Do not throw an exception if the process finished quickly but without…
Browse files Browse the repository at this point in the history
… any error. (#46073)
  • Loading branch information
przemekwitek committed Aug 29, 2019
1 parent 02abb1a commit 8cc4194
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class MemoryUsageEstimationProcessManager {

Expand Down Expand Up @@ -74,24 +72,21 @@ private MemoryUsageEstimationResult runJob(String jobId,
"",
categoricalFields,
config.getAnalysis());
ProcessHolder processHolder = new ProcessHolder();
AnalyticsProcess<MemoryUsageEstimationResult> process =
processFactory.createAnalyticsProcess(
jobId,
processConfig,
executorServiceForProcess,
onProcessCrash(jobId, processHolder));
processHolder.process = process;
if (process.isProcessAlive() == false) {
String errorMsg =
new ParameterizedMessage("[{}] Error while starting process: {}", jobId, process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg);
}
// The handler passed here will never be called as AbstractNativeProcess.detectCrash method returns early when
// (processInStream == null) which is the case for MemoryUsageEstimationProcess.
reason -> {});
try {
return readResult(jobId, process);
} catch (Exception e) {
String errorMsg =
new ParameterizedMessage("[{}] Error while processing result [{}]", jobId, e.getMessage()).getFormattedMessage();
new ParameterizedMessage(
"[{}] Error while processing process output [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
} finally {
process.consumeAndCloseOutputStream();
Expand All @@ -101,31 +96,14 @@ private MemoryUsageEstimationResult runJob(String jobId,
LOGGER.info("[{}] Closed process", jobId);
} catch (Exception e) {
String errorMsg =
new ParameterizedMessage("[{}] Error while closing process [{}]", jobId, e.getMessage()).getFormattedMessage();
new ParameterizedMessage(
"[{}] Error while closing process [{}], process errors: [{}]",
jobId, e.getMessage(), process.readError()).getFormattedMessage();
throw ExceptionsHelper.serverError(errorMsg, e);
}
}
}

private static class ProcessHolder {
volatile AnalyticsProcess<MemoryUsageEstimationResult> process;
}

private static Consumer<String> onProcessCrash(String jobId, ProcessHolder processHolder) {
return reason -> {
AnalyticsProcess<MemoryUsageEstimationResult> process = processHolder.process;
if (process == null) {
LOGGER.error(new ParameterizedMessage("[{}] Process does not exist", jobId));
return;
}
try {
process.kill();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", jobId), e);
}
};
}

/**
* Extracts {@link MemoryUsageEstimationResult} from process' output.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.dataframe.process;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -65,7 +66,6 @@ public void setUpMocks() {
executorServiceForJob = EsExecutors.newDirectExecutorService();
executorServiceForProcess = mock(ExecutorService.class);
process = mock(AnalyticsProcess.class);
when(process.isProcessAlive()).thenReturn(true);
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT).iterator());
processFactory = mock(AnalyticsProcessFactory.class);
when(processFactory.createAnalyticsProcess(anyString(), any(), any(), any())).thenReturn(process);
Expand Down Expand Up @@ -93,61 +93,61 @@ public void testRunJob_EmptyDataFrame() {
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_ProcessNotAlive() {
when(process.isProcessAlive()).thenReturn(false);
when(process.readError()).thenReturn("Error from inside the process");
public void testRunJob_NoResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.<MemoryUsageEstimationResult>of().iterator());

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while starting process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));
assertThat(exception.getMessage(), containsString("no results"));

verify(process).isProcessAlive();
verify(process).readError();
InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_NoResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.<MemoryUsageEstimationResult>of().iterator());
public void testRunJob_MultipleResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT, PROCESS_RESULT).iterator());

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("no results"));
assertThat(exception.getMessage(), containsString("more than one result"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_MultipleResults() throws Exception {
when(process.readAnalyticsResults()).thenReturn(List.of(PROCESS_RESULT, PROCESS_RESULT).iterator());
public void testRunJob_OneResult_ParseException() throws Exception {
when(process.readAnalyticsResults()).thenThrow(new ElasticsearchParseException("cannot parse result"));

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("more than one result"));
assertThat(exception.getMessage(), containsString("cannot parse result"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).readError();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
verifyNoMoreInteractions(process, listener);

}

public void testRunJob_FailsOnClose() throws Exception {
Expand All @@ -162,10 +162,32 @@ public void testRunJob_FailsOnClose() throws Exception {
assertThat(exception.getMessage(), containsString("Error while closing process"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
}

public void testRunJob_FailsOnClose_ProcessReportsError() throws Exception {
doThrow(ExceptionsHelper.serverError("some LOG(ERROR) lines coming from cpp process")).when(process).close();
when(process.readError()).thenReturn("Error from inside the process");

processManager.runJobAsync(TASK_ID, dataFrameAnalyticsConfig, dataExtractorFactory, listener);

verify(listener).onFailure(exceptionCaptor.capture());
ElasticsearchException exception = (ElasticsearchException) exceptionCaptor.getValue();
assertThat(exception.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(exception.getMessage(), containsString(TASK_ID));
assertThat(exception.getMessage(), containsString("Error while closing process"));
assertThat(exception.getMessage(), containsString("some LOG(ERROR) lines coming from cpp process"));
assertThat(exception.getMessage(), containsString("Error from inside the process"));

InOrder inOrder = inOrder(process);
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
inOrder.verify(process).readError();
verifyNoMoreInteractions(process, listener);
}

Expand All @@ -177,7 +199,6 @@ public void testRunJob_Ok() throws Exception {
assertThat(result, equalTo(PROCESS_RESULT));

InOrder inOrder = inOrder(process);
inOrder.verify(process).isProcessAlive();
inOrder.verify(process).readAnalyticsResults();
inOrder.verify(process).consumeAndCloseOutputStream();
inOrder.verify(process).close();
Expand Down

0 comments on commit 8cc4194

Please sign in to comment.