Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
azagrebin committed Apr 25, 2019
1 parent 7fec311 commit c5b90ba
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,8 @@ private void sendCancelRpcCall(int numberRetries) {
}

private void sendReleaseIntermediateResultPartitionsRpcCall() {
final LogicalSlot slot = assignedResource;
LOG.info("Discarding the results produced by task execution {}.", attemptId);
final LogicalSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void setupInputGate(SingleInputGate gate) throws IOException {
*/
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
for (ResultPartitionID partitionId : partitionIds) {
resultPartitionManager.releasePartitionsProducedBy(partitionId, null);
resultPartitionManager.releasePartition(partitionId, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void close() {
}

public void fail(@Nullable Throwable throwable) {
partitionManager.releasePartitionsProducedBy(partitionId, throwable);
partitionManager.releasePartition(partitionId, throwable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {

private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);

private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>();
private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(16);

private boolean isShutdown;

Expand Down Expand Up @@ -72,13 +72,13 @@ public ResultSubpartitionView createSubpartitionView(
}
}

public void releasePartitionsProducedBy(ResultPartitionID partitionId, Throwable cause) {
public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
synchronized (registeredPartitions) {
if (registeredPartitions.containsKey(partitionId)) {
ResultPartition resultPartition = registeredPartitions.remove(partitionId);
if (resultPartition != null) {
registeredPartitions.get(partitionId).release(cause);
registeredPartitions.remove(partitionId);
LOG.debug("Released partition {} produced by {}.",
partitionId.getPartitionId(), partitionId.getPartitionId());
partitionId.getPartitionId(), partitionId.getProducerId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {

}

@Override
Expand Down

0 comments on commit c5b90ba

Please sign in to comment.