Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package cz.cuni.mff.xrg.odcs.backend.execution.dpu.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
Expand Down Expand Up @@ -90,7 +93,14 @@ protected boolean execute(Node node,
// which take care about this

// looks for edges that lead to our node
Set<Edge> edges = execution.getPipeline().getGraph().getEdges();
List<Edge> edges = new ArrayList<>(execution.getPipeline().getGraph().getEdges());
Collections.sort(edges, new Comparator<Edge>() {

@Override
public int compare(Edge o1, Edge o2) {
return o1.getId().compareTo(o2.getId());
}
});
for (Edge edge : edges) {
if (edge.getTo() == node) {
// we are the target .. add data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/
package cz.cuni.mff.xrg.odcs.commons.app.pipeline.graph;

import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -32,18 +36,18 @@ public class DependencyGraph implements Iterable<Node> {
* Structure for building dependency graph mapping nodes to dependency
* nodes.
*/
private Map<Node, DependencyNode> dGraph = new HashMap<>();
private Map<Node, DependencyNode> dGraph = new LinkedHashMap<>();

/**
* List of Extractor nodes - nodes without dependencies
*/
private Set<DependencyNode> starters = new HashSet<>();
private List<DependencyNode> starters = new ArrayList<>();

/**
* Cache used for fast searching of node ancestors. A {@link Node} with no
* ancestor (no incoming {@link Edge}) is not indexed here at all.
*/
private Map<Node, Set<Node>> ancestorCache = new HashMap<>();
private Map<Node, List<Node>> ancestorCache = new LinkedHashMap<>();

/**
* Constructs dependency graph from given pipeline graph.
Expand Down Expand Up @@ -72,18 +76,28 @@ public DependencyGraph(PipelineGraph graph, Node debugNode) {

// now create trimmed PipelineGraph containing only nodes needed to run
// debugNode
Set<Node> oNodes = getAllAncestors(debugNode);
List<Node> oNodes = getAllAncestors(debugNode);
oNodes.add(debugNode);

Set<Edge> nEdges = new HashSet<>(graph.getEdges().size());
for (Edge edge : graph.getEdges()) {
Set<Edge> nEdges = new HashSet<>();
List<Edge> edges = new ArrayList<>(graph.getEdges());
Collections.sort(edges, new Comparator<Edge>() {

@Override
public int compare(Edge o1, Edge o2) {
return o1.getId().compareTo(o2.getId());
}
});
for (Edge edge : edges) {
if (oNodes.contains(edge.getFrom())
&& oNodes.contains(edge.getTo())) {
// Copy edge so we do not work with the edge from original
// graph. Otherwise calling persist/merge on pipeline will
// cascade to edge where the trimmed graph might be found
// as a new entity. See GH-1156.
nEdges.add(new Edge(edge.getFrom(), edge.getTo(), edge.getScript()));
Edge edge2 = new Edge(edge.getFrom(), edge.getTo(), edge.getScript());
edge2.setId(edge.getId());
nEdges.add(edge2);
}
}

Expand Down Expand Up @@ -114,7 +128,7 @@ public GraphIterator iterator() {
/**
* @return the extractors without inputs = the nodes which may be run first
*/
public Set<DependencyNode> getStarters() {
public List<DependencyNode> getStarters() {
return starters;
}

Expand All @@ -124,7 +138,7 @@ public Set<DependencyNode> getStarters() {
* @param node
* @return set of ancestors
*/
public Set<Node> getAncestors(Node node) {
public List<Node> getAncestors(Node node) {
return ancestorCache.get(node);
}

Expand All @@ -134,7 +148,7 @@ public Set<Node> getAncestors(Node node) {
* Always call after dependency graph is built!
*/
private void findStarters() {
starters = new HashSet<>();
starters = new ArrayList<>();
for (DependencyNode node : dGraph.values()) {
// extractors have no dependencies
if (node.getDependencies().isEmpty()) {
Expand All @@ -154,16 +168,33 @@ private void buildDependencyGraph(PipelineGraph graph) {

// clear all previous data
int noOfNodes = graph.getNodes().size();
dGraph = new HashMap<>(noOfNodes);
ancestorCache = new HashMap<>(noOfNodes);
dGraph = new LinkedHashMap<>(noOfNodes);
ancestorCache = new LinkedHashMap<>(noOfNodes);

// initialize map for dependency nodes
for (Node node : graph.getNodes()) {
List<Node> nodes = new ArrayList<>(graph.getNodes());
Collections.sort(nodes, new Comparator<Node>() {

@Override
public int compare(Node o1, Node o2) {
return o1.getId().compareTo(o2.getId());
}
});
for (Node node : nodes) {
dGraph.put(node, new DependencyNode(node));
}

List<Edge> edges = new ArrayList<>(graph.getEdges());
Collections.sort(edges, new Comparator<Edge>() {

@Override
public int compare(Edge o1, Edge o2) {
return o1.getId().compareTo(o2.getId());
}

});
// iterate over all edges and reflect them in dependency nodes
for (Edge e : graph.getEdges()) {
for (Edge e : edges) {

// find the target node in the dependency graph
DependencyNode tNode = dGraph.get(e.getTo());
Expand All @@ -188,9 +219,9 @@ private void buildDependencyGraph(PipelineGraph graph) {
* @param tNode
*/
private void cacheAncestor(Node sNode, Node tNode) {
Set<Node> nodes = ancestorCache.get(tNode);
List<Node> nodes = ancestorCache.get(tNode);
if (nodes == null) {
nodes = new HashSet<>();
nodes = new ArrayList<>();
ancestorCache.put(tNode, nodes);
}
nodes.add(sNode);
Expand All @@ -204,10 +235,10 @@ private void cacheAncestor(Node sNode, Node tNode) {
* @param node
* @return all dependencies of given node
*/
private Set<Node> getAllAncestors(Node node) {
private List<Node> getAllAncestors(Node node) {

Set<Node> oAncestors = ancestorCache.get(node);
Set<Node> ancestors = new HashSet<>();
List<Node> oAncestors = ancestorCache.get(node);
List<Node> ancestors = new ArrayList<>();

if (oAncestors != null) {
ancestors.addAll(oAncestors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,7 @@ public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package cz.cuni.mff.xrg.odcs.commons.app.pipeline.graph;

import java.util.Iterator;
import java.util.Set;
import java.util.List;

/**
* Iterates over dependency graph in an order that satisfies all dependencies.
Expand All @@ -30,7 +30,7 @@ public class GraphIterator implements Iterator<Node> {
/**
* Stack of nodes used to perform breath-first search.
*/
private final Set<DependencyNode> stack;
private final List<DependencyNode> stack;

/**
* Constructs iterator from dependency graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,8 @@ public boolean equals(Object o) {
public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public PipelineGraph(PipelineGraph graph) {
}

// create edges
edges = new HashSet(graph.edges.size());
edges = new HashSet<>();
for (Edge oldEdge : graph.getEdges()) {
Edge newEdge = new Edge(
nMap.get(oldEdge.getFrom()),
Expand All @@ -114,7 +114,7 @@ public PipelineGraph(PipelineGraph graph) {
}

// assign nodes
nodes = new HashSet(nMap.values());
nodes = new HashSet<>(nMap.values());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,12 @@ public void saveDpusInfo(TreeSet<DpuItem> dpusInformation, ZipOutputStream zipSt
} catch (IOException ex1) {
throw new ExportException(Messages.getString("ExportService.error"), ex1);
}

byte[] buffer = new byte[4096];
try {
final ZipEntry ze = new ZipEntry(ArchiveStructure.USED_DPUS.getValue());
zipStream.putNextEntry(ze);

// move jar file into the zip file
try (FileInputStream in = new FileInputStream(serializedTarget)) {
int len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public class DependencyTest {
public void testInlineDependencyResolution() {

PipelineGraph graph = buildGraph(5);
graph.addEdge(nodes[2], nodes[3]);
graph.addEdge(nodes[0], nodes[1]);
graph.addEdge(nodes[1], nodes[2]);
graph.addEdge(nodes[3], nodes[4]);
graph.addEdge(nodes[2], nodes[3]).setId(1L);
graph.addEdge(nodes[0], nodes[1]).setId(2L);
graph.addEdge(nodes[1], nodes[2]).setId(3L);
graph.addEdge(nodes[3], nodes[4]).setId(4L);

DependencyGraph dGraph = new DependencyGraph(graph);

Expand All @@ -72,10 +72,10 @@ public void testInlineDependencyResolution() {
public void testComplexDependencyResolution() {

PipelineGraph graph = buildGraph(5);
graph.addEdge(nodes[0], nodes[1]);
graph.addEdge(nodes[1], nodes[2]);
graph.addEdge(nodes[2], nodes[3]);
graph.addEdge(nodes[4], nodes[2]);
graph.addEdge(nodes[0], nodes[1]).setId(1L);;
graph.addEdge(nodes[1], nodes[2]).setId(2L);
graph.addEdge(nodes[2], nodes[3]).setId(3L);
graph.addEdge(nodes[4], nodes[2]).setId(4L);

DependencyGraph dGraph = new DependencyGraph(graph);

Expand Down Expand Up @@ -116,11 +116,11 @@ public void testComplexDependencyResolution() {
public void testCircularDependencyResolution() {

PipelineGraph graph = buildGraph(5);
graph.addEdge(nodes[0], nodes[1]);
graph.addEdge(nodes[1], nodes[2]);
graph.addEdge(nodes[2], nodes[3]);
graph.addEdge(nodes[3], nodes[4]);
graph.addEdge(nodes[3], nodes[1]);
graph.addEdge(nodes[0], nodes[1]).setId(1L);
graph.addEdge(nodes[1], nodes[2]).setId(2L);
graph.addEdge(nodes[2], nodes[3]).setId(3L);
graph.addEdge(nodes[3], nodes[4]).setId(4L);
graph.addEdge(nodes[3], nodes[1]).setId(5L);

DependencyGraph dGraph = new DependencyGraph(graph);
GraphIterator iter = dGraph.iterator();
Expand Down Expand Up @@ -168,12 +168,12 @@ public void testDebugNodeInTrivialGraph() {
public void testDebugNodeInComplexGraph() {

PipelineGraph graph = buildGraph(7);
graph.addEdge(nodes[0], nodes[1]);
graph.addEdge(nodes[1], nodes[2]);
graph.addEdge(nodes[2], nodes[3]);
graph.addEdge(nodes[4], nodes[5]);
graph.addEdge(nodes[5], nodes[1]);
graph.addEdge(nodes[6], nodes[2]);
graph.addEdge(nodes[0], nodes[1]).setId(1L);
graph.addEdge(nodes[1], nodes[2]).setId(2L);
graph.addEdge(nodes[2], nodes[3]).setId(3L);
graph.addEdge(nodes[4], nodes[5]).setId(4L);
graph.addEdge(nodes[5], nodes[1]).setId(5L);
graph.addEdge(nodes[6], nodes[2]).setId(6L);

DependencyGraph dGraph = new DependencyGraph(graph, nodes[1]);
GraphIterator iter = dGraph.iterator();
Expand Down Expand Up @@ -207,6 +207,7 @@ private PipelineGraph buildGraph(int size) {
nodes = new Node[size];
for (int i = 0; i < size; i++) {
Node node = new Node();
node.setId(Long.valueOf(i));
nodes[i] = node;
graph.addNode(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.LinkedHashSet;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -50,7 +49,7 @@ public void testSetEdges() {
Node nodeA = new Node();
Node nodeB = new Node();
Edge edge = new Edge(nodeA, nodeB);
Set<Edge> edges = new HashSet<>(Arrays.asList(edge));
LinkedHashSet<Edge> edges = new LinkedHashSet<>(Arrays.asList(edge));
instance.setEdges(edges);

assertEquals(edges, instance.getEdges());
Expand Down