Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fixing bug in scheduler (#215)
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent 4b62d14 commit 2dccc85
Showing 1 changed file with 8 additions and 9 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.functions.worker.scheduler.IScheduler; import org.apache.pulsar.functions.worker.scheduler.IScheduler;


import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -112,17 +113,15 @@ private void invokeScheduler() {
Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager.getCurrentAssignments(); Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager.getCurrentAssignments();


//delete assignments of functions that don't exist anymore //delete assignments of functions that don't exist anymore
List<Assignment> invalidAssignments = new LinkedList<>(); Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator();
for (Map<String, Assignment> entryMap : workerIdToAssignments.values()) { while(it.hasNext()) {
for (Map.Entry<String, Assignment> entry : entryMap.entrySet()) { Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next();
String fullyQualifiedName = entry.getKey(); Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue();
Assignment assignment = entry.getValue(); functionMap.entrySet().removeIf(entry -> !fullyQualifiedNames.contains(entry.getKey()));
if (!fullyQualifiedNames.contains(fullyQualifiedName)) { if (functionMap.isEmpty()) {
invalidAssignments.add(assignment); it.remove();
}
} }
} }
this.functionRuntimeManager.removeAssignments(invalidAssignments);


List<Assignment> currentAssignments = workerIdToAssignments List<Assignment> currentAssignments = workerIdToAssignments
.entrySet().stream() .entrySet().stream()
Expand Down

0 comments on commit 2dccc85

Please sign in to comment.