Skip to content

Commit

Permalink
[ML] Debug logging for NLP process start and stop (#91798)
Browse files Browse the repository at this point in the history
For the test failures in #91422
  • Loading branch information
davidkyle committed Nov 22, 2022
1 parent 6c39dc9 commit 265f208
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ private void doStartDeployment(TrainedModelDeploymentTask task, ActionListener<T
});

ActionListener<Boolean> modelLoadedListener = ActionListener.wrap(success -> {
logger.debug("[{}] model loaded. Starting result processor thread", task.getModelId());
executorServiceForProcess.execute(() -> processContext.getResultProcessor().process(processContext.process.get()));
listener.onResponse(task);
}, listener::onFailure);
Expand Down Expand Up @@ -179,7 +180,7 @@ private void doStartDeployment(TrainedModelDeploymentTask task, ActionListener<T
// `startAndLoad` creates named pipes, blocking the calling thread, better to execute that in our utility
// executor.
executorServiceForDeployment.execute(
() -> startAndLoad(processContext, modelConfig.getLocation(), modelLoadedListener)
() -> startAndLoad(processContext, modelConfig.getLocation(), modelConfig.getModelId(), modelLoadedListener)
);
}, listener::onFailure));
} else {
Expand Down Expand Up @@ -229,10 +230,18 @@ Vocabulary parseVocabularyDocLeniently(SearchHit hit) throws IOException {
}
}

private void startAndLoad(ProcessContext processContext, TrainedModelLocation modelLocation, ActionListener<Boolean> loadedListener) {
private void startAndLoad(
ProcessContext processContext,
TrainedModelLocation modelLocation,
String modelId,
ActionListener<Boolean> loadedListener
) {
logger.debug("[{}] start and load", modelId);
try {
processContext.startProcess();
logger.debug("[{}] process started", modelId);
processContext.loadModel(modelLocation, ActionListener.wrap(success -> {
logger.debug("[{}] model loaded, starting priority process worker thread", modelId);
processContext.startPriorityProcessWorker();
loadedListener.onResponse(success);
}, loadedListener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void ignoreResponseWithoutNotifying(String requestId) {
}

public void process(PyTorchProcess process) {
logger.debug(() -> "[" + deploymentId + "] Results processing started");
try {
Iterator<PyTorchResult> iterator = process.readResults();
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public PyTorchStateStreamer(Client client, ExecutorService executorService, Name
* Cancels the state streaming at the first opportunity.
*/
public void cancel() {
    logger.debug("Canceling model loading");
    // Cooperative cancellation: writeChunk() consults this flag before writing
    // each state chunk and returns false once set, halting the restore early.
    // NOTE(review): cross-thread visibility of isCancelled is not shown in this
    // hunk — confirm the field is declared volatile.
    isCancelled = true;
}

Expand All @@ -74,7 +75,7 @@ public void writeStateToStream(String modelId, String index, OutputStream restor
ChunkedTrainedModelRestorer restorer = new ChunkedTrainedModelRestorer(modelId, client, executorService, xContentRegistry);
restorer.setSearchIndex(index);
restorer.setSearchSize(1);
restorer.restoreModelDefinition(doc -> writeChunk(doc, restoreStream), success -> {
restorer.restoreModelDefinition(doc -> writeChunk(doc, restoreStream, modelId), success -> {
logger.debug("model [{}] state restored in [{}] documents from index [{}]", modelId, restorer.getNumDocsWritten(), index);

if (success) {
Expand All @@ -89,12 +90,14 @@ public void writeStateToStream(String modelId, String index, OutputStream restor
} else {
logger.info("[{}] loading model state cancelled", modelId);
}
logger.debug("[{}] model state successfully loaded", modelId);
listener.onResponse(success);
}, listener::onFailure);
}

private boolean writeChunk(TrainedModelDefinitionDoc doc, OutputStream outputStream) throws IOException {
private boolean writeChunk(TrainedModelDefinitionDoc doc, OutputStream outputStream, String modelId) throws IOException {
if (isCancelled) {
logger.info("[{}] state streamer has been cancelled", modelId);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void start() {
Thread.currentThread().interrupt();
} finally {
awaitTermination.countDown();
logger.debug("[{}] Process worker executor has stopped ", processName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public void kill(boolean awaitCompletion) throws IOException {
} catch (IOException e) {
// Ignore it - we're shutting down and the method itself has logged a warning
}

LOGGER.debug("[{}] process {} killed", jobId, getName());
}
}

Expand Down

0 comments on commit 265f208

Please sign in to comment.