Skip to content

Commit

Permalink
Hack made less hacky by writing DTO
Browse files Browse the repository at this point in the history
  • Loading branch information
TomaszGaweda committed Jun 14, 2022
1 parent d42bd5d commit e2ff7cf
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
Expand Up @@ -67,10 +67,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -96,6 +94,7 @@
import static com.hazelcast.jet.impl.util.Util.memoize;
import static com.hazelcast.spi.impl.executionservice.ExecutionService.JOB_OFFLOADABLE_EXECUTOR;
import static java.util.Collections.unmodifiableList;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

Expand Down Expand Up @@ -846,14 +845,9 @@ public List<VertexDef> getVertices() {
return unmodifiableList(vertices);
}

public void sortAccordingTo(Map<String, Integer> vertexIdMap) {
int index = 0;
Map<Integer, Integer> vertexIdToPriority = new LinkedHashMap<>(vertexIdMap.size());
for (Integer id : vertexIdMap.values()) {
vertexIdToPriority.put(id, index++);
}
public void sortVerticesAccordingTo(ExecutionPlanBuilder.VertexData vertexData) {
vertices = vertices.stream()
.sorted(Comparator.comparingInt(v -> vertexIdToPriority.get(v.vertexId())))
.sorted(comparingInt(v -> vertexData.positionById(v.vertexId())))
.collect(toList());
}
}
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

import static com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder.VertexData.assignVertexIds;
import static com.hazelcast.jet.impl.util.ExceptionUtil.sneakyThrow;
import static com.hazelcast.jet.impl.util.PrefixedLogger.prefix;
import static com.hazelcast.jet.impl.util.PrefixedLogger.prefixedLogger;
Expand Down Expand Up @@ -80,13 +81,14 @@ public static CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionP
plans.put(member, new ExecutionPlan(partitionsByAddress, jobConfig, lastSnapshotId, memberIndex++,
clusterSize, isLightJob, subject));
}
final Map<String, Integer> vertexIdMap = assignVertexIds(dag);
final VertexData vertexData = assignVertexIds(dag);

ExecutorService initOffloadExecutor = nodeEngine.getExecutionService().getExecutor(JOB_OFFLOADABLE_EXECUTOR);
CompletableFuture[] futures = new CompletableFuture[vertexIdMap.entrySet().size()];
CompletableFuture[] futures = new CompletableFuture[vertexData.vertexIdMap.entrySet().size()];
int index = 0;
for (Entry<String, Integer> entry : vertexIdMap.entrySet()) {
for (Entry<String, Integer> entry : vertexData.vertexIdMap.entrySet()) {
final Vertex vertex = dag.getVertex(entry.getKey());
int currentIndex = index++;
assert vertex != null;
final ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
final int vertexId = entry.getValue();
Expand All @@ -97,9 +99,9 @@ public static CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionP
final int localParallelism = vertex.determineLocalParallelism(defaultParallelism);
final int totalParallelism = localParallelism * clusterSize;
final List<EdgeDef> inbound = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
e -> vertexIdMap.get(e.getSourceName()), isJobDistributed);
e -> vertexData.idByName(e.getSourceName()), isJobDistributed);
final List<EdgeDef> outbound = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
e -> vertexIdMap.get(e.getDestName()), isJobDistributed);
e -> vertexData.idByName(e.getDestName()), isJobDistributed);
String prefix = prefix(jobConfig.getName(), jobId, vertex.getName(), "#PMS");
ILogger logger = prefixedLogger(nodeEngine.getLogger(metaSupplier.getClass()), prefix);

Expand Down Expand Up @@ -134,25 +136,53 @@ public static CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionP
};
if (metaSupplier.initIsCooperative()) {
action.run();
futures[index++] = completedFuture(null);
futures[currentIndex] = completedFuture(null);
} else {
futures[index++] = CompletableFuture.runAsync(action, initOffloadExecutor);
futures[currentIndex] = CompletableFuture.runAsync(action, initOffloadExecutor);
}
}
return CompletableFuture.allOf(futures)
.thenCompose(r -> {
for (ExecutionPlan plan : plans.values()) {
plan.sortAccordingTo(vertexIdMap);
plan.sortVerticesAccordingTo(vertexData);
}
return completedFuture(plans);
});
}

private static Map<String, Integer> assignVertexIds(DAG dag) {
Map<String, Integer> vertexIdMap = new LinkedHashMap<>();
final int[] vertexId = {0};
dag.forEach(v -> vertexIdMap.put(v.getName(), vertexId[0]++));
return vertexIdMap;
/**
* Basic vertex data wrapper:
* - id
* - name
* - position
*/
static class VertexData {
private final LinkedHashMap<String, Integer> vertexIdMap;
private final HashMap<Integer, Integer> vertexPosByName;

private VertexData(LinkedHashMap<String, Integer> vertexIdMap) {
this.vertexIdMap = vertexIdMap;
int index = 0;
vertexPosByName = new LinkedHashMap<>(vertexIdMap.size());
for (Integer vertexId : vertexIdMap.values()) {
vertexPosByName.put(vertexId, index++);
}
}

Integer idByName(String vertexName) {
return vertexIdMap.get(vertexName);
}

int positionById(int vertexId) {
return vertexPosByName.get(vertexId);
}

static VertexData assignVertexIds(DAG dag) {
LinkedHashMap<String, Integer> vertexIdMap = new LinkedHashMap<>();
final int[] vertexId = {0};
dag.forEach(v -> vertexIdMap.put(v.getName(), vertexId[0]++));
return new VertexData(vertexIdMap);
}
}

private static List<EdgeDef> toEdgeDefs(
Expand Down

0 comments on commit e2ff7cf

Please sign in to comment.