Skip to content

Commit

Permalink
[FLINK-1505] [distributed runtime] Separate reader API from result co…
Browse files Browse the repository at this point in the history
…nsumption

This closes #428.
  • Loading branch information
uce committed Feb 23, 2015
1 parent a911559 commit 5232c56
Show file tree
Hide file tree
Showing 52 changed files with 1,798 additions and 1,576 deletions.
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.BufferWriter; import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand All @@ -50,10 +50,10 @@ public interface Environment {
* @return the ID of the job from the original job graph * @return the ID of the job from the original job graph
*/ */
JobID getJobID(); JobID getJobID();

/** /**
* Gets the ID of the jobVertex that this task corresponds to. * Gets the ID of the jobVertex that this task corresponds to.
* *
* @return The JobVertexID of this task. * @return The JobVertexID of this task.
*/ */
JobVertexID getJobVertexId(); JobVertexID getJobVertexId();
Expand Down Expand Up @@ -130,18 +130,12 @@ public interface Environment {


BroadcastVariableManager getBroadcastVariableManager(); BroadcastVariableManager getBroadcastVariableManager();


// ------------------------------------------------------------------------
// Runtime result writers and readers
// ------------------------------------------------------------------------
// The environment sets up buffer-oriented writers and readers, which the
// user can use to produce and consume results.
// ------------------------------------------------------------------------

BufferWriter getWriter(int index); BufferWriter getWriter(int index);


BufferWriter[] getAllWriters(); BufferWriter[] getAllWriters();


BufferReader getReader(int index); InputGate getInputGate(int index);

InputGate[] getAllInputGates();


BufferReader[] getAllReaders();
} }
Expand Up @@ -27,9 +27,10 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.BufferWriter; import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition; import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -95,9 +96,9 @@ public class RuntimeEnvironment implements Environment, Runnable {


private final BufferWriter[] writers; private final BufferWriter[] writers;


private final BufferReader[] readers; private final SingleInputGate[] inputGates;


private final Map<IntermediateDataSetID, BufferReader> readersById = new HashMap<IntermediateDataSetID, BufferReader>(); private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();


public RuntimeEnvironment( public RuntimeEnvironment(
ActorRef jobManager, Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader, ActorRef jobManager, Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader,
Expand Down Expand Up @@ -128,15 +129,18 @@ public RuntimeEnvironment(
// Consumed intermediate result partitions // Consumed intermediate result partitions
final List<PartitionConsumerDeploymentDescriptor> consumedPartitions = tdd.getConsumedPartitions(); final List<PartitionConsumerDeploymentDescriptor> consumedPartitions = tdd.getConsumedPartitions();


this.readers = new BufferReader[consumedPartitions.size()]; this.inputGates = new SingleInputGate[consumedPartitions.size()];


for (int i = 0; i < readers.length; i++) { for (int i = 0; i < inputGates.length; i++) {
readers[i] = BufferReader.create(this, networkEnvironment, consumedPartitions.get(i)); inputGates[i] = SingleInputGate.create(networkEnvironment, consumedPartitions.get(i));


// The readers are organized by key for task updates/channel updates at runtime // The input gates are organized by key for task updates/channel updates at runtime
readersById.put(readers[i].getConsumedResultId(), readers[i]); inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);
} }


this.jobConfiguration = tdd.getJobConfiguration();
this.taskConfiguration = tdd.getTaskConfiguration();

// ---------------------------------------------------------------- // ----------------------------------------------------------------
// Invokable setup // Invokable setup
// ---------------------------------------------------------------- // ----------------------------------------------------------------
Expand All @@ -163,9 +167,6 @@ public RuntimeEnvironment(
throw new Exception("Could not instantiate the invokable class.", t); throw new Exception("Could not instantiate the invokable class.", t);
} }


this.jobConfiguration = tdd.getJobConfiguration();
this.taskConfiguration = tdd.getTaskConfiguration();

this.invokable.setEnvironment(this); this.invokable.setEnvironment(this);
this.invokable.registerInputOutput(); this.invokable.registerInputOutput();
} }
Expand Down Expand Up @@ -361,23 +362,23 @@ public BufferWriter[] getAllWriters() {
} }


@Override @Override
public BufferReader getReader(int index) { public InputGate getInputGate(int index) {
checkElementIndex(index, readers.length, "Illegal environment reader request."); checkElementIndex(index, inputGates.length);


return readers[index]; return inputGates[index];
} }


@Override @Override
public BufferReader[] getAllReaders() { public SingleInputGate[] getAllInputGates() {
return readers; return inputGates;
} }


public IntermediateResultPartition[] getProducedPartitions() { public IntermediateResultPartition[] getProducedPartitions() {
return producedPartitions; return producedPartitions;
} }


public BufferReader getReaderById(IntermediateDataSetID id) { public SingleInputGate getInputGateById(IntermediateDataSetID id) {
return readersById.get(id); return inputGatesById.get(id);
} }


@Override @Override
Expand Down
Expand Up @@ -21,14 +21,14 @@
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.util.Timeout; import akka.util.Timeout;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.BufferWriter; import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition; import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager; import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.taskmanager.TaskManager;
Expand Down Expand Up @@ -154,14 +154,14 @@ public void registerTask(Task task) throws IOException {
} }


// Setup the buffer pool for each buffer reader // Setup the buffer pool for each buffer reader
final BufferReader[] readers = task.getReaders(); final SingleInputGate[] inputGates = task.getInputGates();


for (BufferReader reader : readers) { for (SingleInputGate gate : inputGates) {
BufferPool bufferPool = null; BufferPool bufferPool = null;


try { try {
bufferPool = networkBufferPool.createBufferPool(reader.getNumberOfInputChannels(), false); bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
reader.setBufferPool(bufferPool); gate.setBufferPool(bufferPool);
} }
catch (Throwable t) { catch (Throwable t) {
if (bufferPool != null) { if (bufferPool != null) {
Expand Down Expand Up @@ -191,13 +191,13 @@ public void unregisterTask(Task task) {


taskEventDispatcher.unregisterWriters(executionId); taskEventDispatcher.unregisterWriters(executionId);


final BufferReader[] readers = task.getReaders(); final SingleInputGate[] inputGates = task.getInputGates();


if (readers != null) { if (inputGates != null) {
for (BufferReader reader : readers) { for (SingleInputGate gate : inputGates) {
try { try {
if (reader != null) { if (gate != null) {
reader.releaseAllResources(); gate.releaseAllResources();
} }
} }
catch (IOException e) { catch (IOException e) {
Expand Down
Expand Up @@ -16,40 +16,42 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.runtime.util.event; package org.apache.flink.runtime.io.network.api;


import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;


/** /**
* The event handler manages {@link EventListener} instances and allows to * The event handler manages {@link EventListener} instances and allows to
* publish events to them. * publish events to them.
*/ */
public class EventNotificationHandler<T> { public class TaskEventHandler {


// Listeners for each event type // Listeners for each event type
private final Multimap<Class<? extends T>, EventListener<T>> listeners = HashMultimap.create(); private final Multimap<Class<? extends TaskEvent>, EventListener<TaskEvent>> listeners = HashMultimap.create();


public void subscribe(EventListener<T> listener, Class<? extends T> eventType) { public void subscribe(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
synchronized (listeners) { synchronized (listeners) {
listeners.put(eventType, listener); listeners.put(eventType, listener);
} }
} }


public void unsubscribe(EventListener<T> listener, Class<? extends T> eventType) { public void unsubscribe(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
synchronized (listeners) { synchronized (listeners) {
listeners.remove(eventType, listener); listeners.remove(eventType, listener);
} }
} }


/** /**
* Publishes the event to all subscribed {@link EventListener} objects. * Publishes the task event to all subscribed event listeners.
* *
* @param event The event to publish. * @param event The event to publish.
*/ */
public void publish(T event) { public void publish(TaskEvent event) {
synchronized (listeners) { synchronized (listeners) {
for (EventListener<T> listener : listeners.get((Class<? extends T>) event.getClass())) { for (EventListener<TaskEvent> listener : listeners.get(event.getClass())) {
listener.onEvent(event); listener.onEvent(event);
} }
} }
Expand Down
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.api.reader;

import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;

import java.io.IOException;

import static com.google.common.base.Preconditions.checkState;

/**
 * Base class for readers that consume from an {@link InputGate}.
 *
 * <p>It delegates the "finished" check and the sending of task events to the wrapped gate,
 * manages task event listener subscriptions through a {@link TaskEventHandler}, and counts
 * end-of-superstep events for readers that have been marked as iterative.
 */
public abstract class AbstractReader implements ReaderBase {

    /** The input gate this reader consumes from. */
    protected final InputGate inputGate;

    /** Keeps track of the task event listeners and publishes task events to them. */
    private final TaskEventHandler eventHandler = new TaskEventHandler();

    /** Whether this reader has been marked as iterative via {@link #setIterativeReader()}. */
    private boolean iterative;

    /**
     * Number of end-of-superstep events counted for the current superstep. The superstep is
     * complete once this equals the number of input channels of the gate; the counter is
     * reset by {@link #startNextSuperstep()}.
     */
    private int endOfSuperstepEventCount;

    /**
     * Creates a reader on top of the given input gate.
     *
     * @param inputGate The input gate to read from.
     */
    protected AbstractReader(InputGate inputGate) {
        this.inputGate = inputGate;
    }

    /**
     * Returns whether the wrapped input gate reports itself as finished.
     */
    @Override
    public boolean isFinished() {
        return inputGate.isFinished();
    }

    // ------------------------------------------------------------------------
    // Events
    // ------------------------------------------------------------------------

    /**
     * Subscribes the listener to task events of the given type.
     *
     * @param listener  The listener to notify.
     * @param eventType The type of task event the listener is interested in.
     */
    @Override
    public void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
        eventHandler.subscribe(listener, eventType);
    }

    /**
     * Hands the task event over to the input gate for sending.
     *
     * @param event The task event to send.
     * @throws IOException Declared by this method; any failure comes from the input gate.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        inputGate.sendTaskEvent(event);
    }

    /**
     * Processes a received event and reports whether it marks an end of stream, i.e. either
     * the end of the partition or the end of the current superstep.
     *
     * @param event The event to process.
     * @return {@code true} for an end-of-partition event, or for the end-of-superstep event
     *         that completes the current superstep; {@code false} otherwise (including all
     *         task events, which are published to the subscribed listeners).
     * @throws IOException If handling fails for any reason, including an event of an
     *                     unexpected type; the original failure is attached as the cause.
     */
    protected boolean handleEvent(AbstractEvent event) throws IOException {
        final Class<?> eventType = event.getClass();

        try {
            // ------------------------------------------------------------
            // Runtime events
            // ------------------------------------------------------------

            // The (single) input gate also checks for this event in order to release the
            // channel at which it was received.
            if (eventType == EndOfPartitionEvent.class) {
                return true;
            }

            if (eventType == EndOfSuperstepEvent.class) {
                return countEndOfSuperstepEventAndCheck();
            }

            // ------------------------------------------------------------
            // Task events (user)
            // ------------------------------------------------------------

            // Anything that is neither a runtime event nor a task event is unexpected here.
            if (!(event instanceof TaskEvent)) {
                throw new IllegalStateException("Received unexpected event of type " + eventType + " at reader.");
            }

            eventHandler.publish((TaskEvent) event);

            return false;
        }
        catch (Throwable t) {
            // Every failure, including the unexpected-type case above, surfaces as an IOException.
            throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t);
        }
    }

    // ------------------------------------------------------------------------
    // Iterations
    // ------------------------------------------------------------------------

    /**
     * Marks this reader as iterative, which enables the superstep bookkeeping.
     */
    @Override
    public void setIterativeReader() {
        iterative = true;
    }

    /**
     * Resets the end-of-superstep counter for the next superstep.
     *
     * @throws IllegalStateException If the reader is not iterative, or if the current
     *                               superstep has not been completed yet.
     */
    @Override
    public void startNextSuperstep() {
        checkState(iterative, "Tried to start next superstep in a non-iterative reader.");
        checkState(endOfSuperstepEventCount == inputGate.getNumberOfInputChannels(), "Tried to start next superstep before reaching end of previous superstep.");

        endOfSuperstepEventCount = 0;
    }

    /**
     * Returns whether an end-of-superstep event has been counted for every input channel.
     * Always {@code false} for a non-iterative reader.
     */
    @Override
    public boolean hasReachedEndOfSuperstep() {
        return iterative && endOfSuperstepEventCount == inputGate.getNumberOfInputChannels();
    }

    /**
     * Counts one more end-of-superstep event.
     *
     * @return {@code true} if the count now equals the number of input channels.
     * @throws IllegalStateException If the reader is not iterative, or if the additional
     *                               event would exceed the number of input channels.
     */
    private boolean countEndOfSuperstepEventAndCheck() {
        checkState(iterative, "Tried to increment superstep count in a non-iterative reader.");
        checkState(endOfSuperstepEventCount + 1 <= inputGate.getNumberOfInputChannels(), "Received too many (" + endOfSuperstepEventCount + ") end of superstep events.");

        endOfSuperstepEventCount++;

        return endOfSuperstepEventCount == inputGate.getNumberOfInputChannels();
    }
}

0 comments on commit 5232c56

Please sign in to comment.