Skip to content

Commit

Permalink
[FLINK-1607] [runtime] Moves PartialPartitionInfo from ExecutionVerte…
Browse files Browse the repository at this point in the history
…x to Execution to automatically clear them in case of restart

This closes #436.
  • Loading branch information
tillrohrmann authored and uce committed Feb 25, 2015
1 parent 2f1987a commit 607bf12
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 20 deletions.
Expand Up @@ -111,6 +111,7 @@ public class Execution implements Serializable {

private final FiniteDuration timeout;

private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;

private volatile ExecutionState state = CREATED;

Expand All @@ -134,6 +135,8 @@ public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp,
markTimestamp(ExecutionState.CREATED, startTimestamp);

this.timeout = timeout;

this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -188,6 +191,9 @@ public void prepareForArchiving() {
throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
}
assignedResource = null;

partialPartitionInfos.clear();
partialPartitionInfos = null;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -595,10 +601,11 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
}
}

void sendPartitionInfos() {
ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos =
vertex.getPartialPartitionInfos();
void cachePartitionInfo(PartialPartitionInfo partitionInfo) {
partialPartitionInfos.add(partitionInfo);
}

void sendPartitionInfos() {
// check if the ExecutionVertex has already been archived and thus cleared the
// partial partition infos queue
if(partialPartitionInfos != null && !partialPartitionInfos.isEmpty()) {
Expand Down
Expand Up @@ -47,7 +47,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.google.common.base.Preconditions.checkElementIndex;
Expand Down Expand Up @@ -75,8 +74,6 @@ public class ExecutionVertex implements Serializable {

private ExecutionEdge[][] inputEdges;

private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;

private final int subTaskIndex;

private final List<Execution> priorExecutions;
Expand Down Expand Up @@ -114,8 +111,6 @@ public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,

this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];

this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();

this.priorExecutions = new CopyOnWriteArrayList<Execution>();

this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
Expand Down Expand Up @@ -204,10 +199,6 @@ public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}

public ConcurrentLinkedQueue<PartialPartitionInfo> getPartialPartitionInfos() {
return partialPartitionInfos;
}

// --------------------------------------------------------------------------------------------
// Graph building
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -451,12 +442,10 @@ public void prepareForArchiving() throws IllegalStateException {
this.resultPartitions = null;
this.inputEdges = null;
this.locationConstraintInstances = null;
this.partialPartitionInfos.clear();
this.partialPartitionInfos = null;
}

public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
this.partialPartitionInfos.add(partitionInfo);
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
}

void sendPartitionInfos() {
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,7 +33,6 @@
* This test starts a MiniYARNCluster with a CapacityScheduler.
* Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
*/
@Ignore("Failing as well :-(")
public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);

Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,7 +43,6 @@
* This test starts a MiniYARNCluster with a FIFO scheudler.
* There are no queues for that scheduler.
*/
@Ignore("Because if fails :-(")
public class YARNSessionFIFOITCase extends YarnTestBase {
private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

Expand Down
2 changes: 1 addition & 1 deletion tools/log4j-travis.properties
Expand Up @@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

log4j.rootLogger=DEBUG, file
log4j.rootLogger=INFO, file

# -----------------------------------------------------------------------------
# Console (use 'console')
Expand Down

0 comments on commit 607bf12

Please sign in to comment.