Skip to content

Commit

Permalink
[python] Fixes retry_threshold bug
Browse files Browse the repository at this point in the history
For python worker, PyProcess is recreated for each worker, restart count is lost.
which lead the model is never marked as failure, and PING will alwasy report healthy.
  • Loading branch information
frankfliu committed Jun 7, 2023
1 parent be0bf07 commit cd089ab
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class PyProcess {
private ReaderThread out;
private AtomicInteger restartCount;
private CompletableFuture<Void> restartFuture;
private int retryThreshold;

PyProcess(Model model, PyEnv pyEnv, int workerId) {
this.model = model;
Expand All @@ -67,7 +66,6 @@ class PyProcess {
connections = Collections.singletonList(new Connection(pyEnv, workerId, -1));
}
restartCount = new AtomicInteger(0);
retryThreshold = Integer.parseInt(model.getProperty("retry_threshold", "10"));
}

Output predict(Input inputs, int timeout, boolean initialLoad) {
Expand Down Expand Up @@ -175,9 +173,8 @@ synchronized void startPythonProcess() {
synchronized void stopPythonProcess() {
int id = restartCount.getAndIncrement();
logger.info("Stop process: {}:{}", workerId, id);
if (id >= retryThreshold) {
model.setProperty("failed", "true");
}
int failures = Integer.parseInt(model.getProperty("failed", "0"));
model.setProperty("failed", String.valueOf(failures + 1));

if (restartFuture != null) {
try {
Expand Down
5 changes: 4 additions & 1 deletion wlm/src/main/java/ai/djl/serving/wlm/ModelInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,11 @@ public Status getStatus() {
} else if (status == Status.FAILED) {
return Status.FAILED;
}

for (Model m : getModels().values()) {
if (Boolean.parseBoolean(m.getProperty("failed"))) {
int retryThreshold = Integer.parseInt(m.getProperty("retry_threshold", "10"));
int failures = Integer.parseInt(m.getProperty("failed", "0"));
if (failures > retryThreshold) {
return Status.FAILED;
}
}
Expand Down
1 change: 1 addition & 0 deletions wlm/src/main/java/ai/djl/serving/wlm/WorkLoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public <I, O> CompletableFuture<O> runJob(Job<I, O> job) {
}
LinkedBlockingDeque<WorkerJob<I, O>> queue = pool.getJobQueue();
if ((queue.remainingCapacity() == 1 && pool.isAllWorkerBusy())
|| pool.isAllWorkerDied()
|| !queue.offer(new WorkerJob<>(job, result))) {
result.completeExceptionally(
new WlmCapacityException(
Expand Down
16 changes: 16 additions & 0 deletions wlm/src/main/java/ai/djl/serving/wlm/WorkerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ public int getMaxWorkers() {
return workerGroups.values().stream().mapToInt(g -> g.maxWorkers).reduce(0, Integer::sum);
}

/**
* Return if all workers died.
*
* @return true if all workers died
*/
public boolean isAllWorkerDied() {
for (WorkerGroup<I, O> group : workerGroups.values()) {
for (WorkerThread<?, ?> thread : group.getWorkers()) {
if (thread.isRunning()) {
return false;
}
}
}
return true;
}

/**
* Returns {@code true} if all workers are busy.
*
Expand Down

0 comments on commit cd089ab

Please sign in to comment.