Skip to content

Commit

Permalink
address a few comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 12, 2019
1 parent 06f5333 commit bc2fe1b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
Expand Up @@ -80,7 +80,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
/** Type of this partition. Defines the concrete subpartition implementation to use. */
private final ResultPartitionType partitionType;

private boolean isManagedExternally;
private final boolean isManagedExternally;

/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
Expand Down Expand Up @@ -184,7 +184,7 @@ public ResultPartitionType getPartitionType() {
return partitionType;
}

public boolean isManagedExternally() {
boolean isManagedExternally() {
return isManagedExternally;
}

Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,16 +40,17 @@ public class ResultPartitionManager implements ResultPartitionProvider {

private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(16);

private final boolean releaseExternallyManagedPartitionsOnConsumption;
private final boolean isReleaseExternallyManagedPartitionsOnConsumption;

private boolean isShutdown;

@VisibleForTesting
public ResultPartitionManager() {
this.releaseExternallyManagedPartitionsOnConsumption = true;
this.isReleaseExternallyManagedPartitionsOnConsumption = true;
}

public ResultPartitionManager(boolean releaseExternallyManagedPartitionsOnConsumption) {
this.releaseExternallyManagedPartitionsOnConsumption = releaseExternallyManagedPartitionsOnConsumption;
public ResultPartitionManager(boolean isReleaseExternallyManagedPartitionsOnConsumption) {
this.isReleaseExternallyManagedPartitionsOnConsumption = isReleaseExternallyManagedPartitionsOnConsumption;
}

public void registerResultPartition(ResultPartition partition) {
Expand Down Expand Up @@ -123,7 +126,7 @@ void onConsumedPartition(ResultPartition partition) {
final ResultPartition previous = registeredPartitions.remove(partition.getPartitionId());
// Release the partition if it was successfully removed
if (partition == previous) {
if (!partition.isManagedExternally() || releaseExternallyManagedPartitionsOnConsumption) {
if (!partition.isManagedExternally() || isReleaseExternallyManagedPartitionsOnConsumption) {
partition.release();
ResultPartitionID partitionId = partition.getPartitionId();
LOG.debug("Released partition {} produced by {}.",
Expand Down
Expand Up @@ -64,23 +64,21 @@ public void testCreateViewForRegisteredPartition() throws Exception {

@Test
public void testExternallyManagedPartitionReleaseIfFlagDisabled() {
final ResultPartitionManager partitionManager = new ResultPartitionManager(false);
final ResultPartition partition = new ResultPartitionBuilder().setIsExternallyManaged(true).build();

partitionManager.registerResultPartition(partition);
partitionManager.onConsumedPartition(partition);

assertThat(partition.isReleased(), is(false));
testExternallyManagedPartitionRelease(false);
}

@Test
public void testExternallyManagedPartitionReleaseIfFlagEnabled() {
final ResultPartitionManager partitionManager = new ResultPartitionManager(true);
testExternallyManagedPartitionRelease(true);
}

private static void testExternallyManagedPartitionRelease(boolean releaseExternallyManagedPartitions) {
final ResultPartitionManager partitionManager = new ResultPartitionManager(releaseExternallyManagedPartitions);
final ResultPartition partition = new ResultPartitionBuilder().setIsExternallyManaged(true).build();

partitionManager.registerResultPartition(partition);
partitionManager.onConsumedPartition(partition);

assertThat(partition.isReleased(), is(true));
assertThat(partition.isReleased(), is(releaseExternallyManagedPartitions));
}
}

0 comments on commit bc2fe1b

Please sign in to comment.