[streaming] Streaming runtime collector rework for increased flexibility with directed outputs and operator chains

gyfora authored and mbalassi committed Jan 21, 2015
1 parent dc0d81b commit d9b942b
Showing 9 changed files with 186 additions and 86 deletions.

CollectorWrapper.java (new file)
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.collector;

import java.util.LinkedList;
import java.util.List;

import org.apache.flink.util.Collector;

public class CollectorWrapper<OUT> implements Collector<OUT> {

    List<Collector<OUT>> outputs;

    public CollectorWrapper() {
        this.outputs = new LinkedList<Collector<OUT>>();
    }

    public void addChainedOutput(Collector<OUT> output) {
        outputs.add(output);
    }

    @Override
    public void collect(OUT record) {
        for (Collector<OUT> output : outputs) {
            output.collect(record);
        }
    }

    @Override
    public void close() {
    }

}
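
A minimal usage sketch (not part of the commit) of the new CollectorWrapper: every chained output registered on the wrapper receives each collected record, which is the behavior operator chains rely on. The PrintingCollector class and the record value are made up for this illustration.

import org.apache.flink.streaming.api.collector.CollectorWrapper;
import org.apache.flink.util.Collector;

public class CollectorWrapperExample {

    public static void main(String[] args) {
        CollectorWrapper<String> wrapper = new CollectorWrapper<String>();

        // In the runtime these would be the collectors of chained operators
        // or network outputs; here they are simple printing collectors.
        wrapper.addChainedOutput(new PrintingCollector("first"));
        wrapper.addChainedOutput(new PrintingCollector("second"));

        // The record is forwarded to every chained output.
        wrapper.collect("record-1");
        wrapper.close();
    }

    // Hypothetical Collector used only for this illustration.
    private static class PrintingCollector implements Collector<String> {

        private final String name;

        PrintingCollector(String name) {
            this.name = name;
        }

        @Override
        public void collect(String record) {
            System.out.println(name + " received: " + record);
        }

        @Override
        public void close() {
        }
    }
}
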
DirectedStreamCollector.java → DirectedOutputWrapper.java (renamed)
@@ -17,12 +17,13 @@
 
 package org.apache.flink.streaming.api.collector;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.StringUtils;
@@ -36,13 +37,16 @@
  * @param <OUT>
  *            Type of the Tuple collected.
  */
-public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
+public class DirectedOutputWrapper<OUT> extends StreamOutputWrapper<OUT> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputWrapper.class);
 
     OutputSelector<OUT> outputSelector;
-    private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs;
-    private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
+
+    protected Map<String, List<StreamOutput<OUT>>> outputMap;
+
+    private List<StreamOutput<OUT>> selectAllOutputs;
+    private Set<StreamOutput<OUT>> emitted;
 
     /**
      * Creates a new DirectedStreamCollector
@@ -54,50 +58,60 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
      * @param outputSelector
      *            User defined {@link OutputSelector}
      */
-    public DirectedStreamCollector(int channelID,
+    public DirectedOutputWrapper(int channelID,
             SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
             OutputSelector<OUT> outputSelector) {
         super(channelID, serializationDelegate);
         this.outputSelector = outputSelector;
-        this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-        this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+        this.emitted = new HashSet<StreamOutput<OUT>>();
+        this.selectAllOutputs = new LinkedList<StreamOutput<OUT>>();
+        this.outputMap = new HashMap<String, List<StreamOutput<OUT>>>();
+
     }
 
     @Override
-    public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-            List<String> outputNames, boolean isSelectAllOutput) {
+    public void addOutput(StreamOutput<OUT> output) {
 
-        if (isSelectAllOutput) {
+        super.addOutput(output);
+
+        if (output.isSelectAll()) {
             selectAllOutputs.add(output);
         } else {
-            addOneOutput(output, outputNames, isSelectAllOutput);
+            for (String selectedName : output.getSelectedNames()) {
+                if (selectedName != null) {
+                    if (!outputMap.containsKey(selectedName)) {
+                        outputMap.put(selectedName, new LinkedList<StreamOutput<OUT>>());
+                        outputMap.get(selectedName).add(output);
+                    } else {
+                        if (!outputMap.get(selectedName).contains(output)) {
+                            outputMap.get(selectedName).add(output);
+                        }
+                    }
+
+                }
+            }
         }
     }
 
-    /**
-     * Emits a StreamRecord to the outputs selected by the user defined
-     * OutputSelector
-     *
-     */
-    protected void emitToOutputs() {
-        Iterable<String> outputNames = outputSelector.select(streamRecord.getObject());
+    @Override
+    protected void emit() {
+        Iterable<String> outputNames = outputSelector.select(serializationDelegate.getInstance()
+                .getObject());
         emitted.clear();
 
-        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+        for (StreamOutput<OUT> output : selectAllOutputs) {
             try {
-                output.emit(serializationDelegate);
+                output.collect(serializationDelegate);
             } catch (Exception e) {
                 if (LOG.isErrorEnabled()) {
                     LOG.error("Emit to {} failed due to: {}", output,
                             StringUtils.stringifyException(e));
                 }
             }
         }
-        emitted.addAll(selectAllOutputs);
 
         for (String outputName : outputNames) {
-            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
-                    .get(outputName);
+            List<StreamOutput<OUT>> outputList = outputMap.get(outputName);
             try {
                 if (outputList == null) {
                     if (LOG.isErrorEnabled()) {
@@ -109,9 +123,9 @@ protected void emitToOutputs() {
                     }
                 } else {
 
-                    for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
+                    for (StreamOutput<OUT> output : outputList) {
                         if (!emitted.contains(output)) {
-                            output.emit(serializationDelegate);
+                            output.collect(serializationDelegate);
                             emitted.add(output);
                         }
                     }
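
The emitted set in the directed wrapper above exists because one output can be registered under several selected names; when a record's selected names map to the same output more than once, it must still be emitted only once. A standalone sketch of that routing rule, using plain collections and made-up names instead of the Flink runtime classes:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DirectedRoutingSketch {

    public static void main(String[] args) {
        // One downstream output registered under two selected names.
        String output = "downstream-1";

        Map<String, List<String>> outputMap = new HashMap<String, List<String>>();
        outputMap.put("even", new LinkedList<String>(Arrays.asList(output)));
        outputMap.put("positive", new LinkedList<String>(Arrays.asList(output)));

        // Names a (hypothetical) output selector chose for one record.
        Iterable<String> selectedNames = Arrays.asList("even", "positive");

        // Emit to each matching output at most once, even if several names match.
        Set<String> emitted = new HashSet<String>();
        for (String name : selectedNames) {
            for (String candidate : outputMap.get(name)) {
                if (!emitted.contains(candidate)) {
                    System.out.println("emit record to " + candidate);
                    emitted.add(candidate);
                }
            }
        }
    }
}
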
StreamOutput.java (new file)
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.collector;

import java.util.List;

import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

public class StreamOutput<OUT> implements Collector<SerializationDelegate<StreamRecord<OUT>>> {

    private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;

    private List<String> selectedNames;
    private boolean selectAll = true;

    public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
            List<String> selectedNames) {

        this.output = output;

        if (selectedNames != null) {
            this.selectedNames = selectedNames;
            selectAll = false;
        }
    }

    public void collect(SerializationDelegate<StreamRecord<OUT>> record) {
        try {
            output.emit(record);
        } catch (Exception e) {
            throw new RuntimeException("Could not emit record: " + record.getInstance());
        }
    }

    @Override
    public void close() {
    }

    public boolean isSelectAll() {
        return selectAll;
    }

    public List<String> getSelectedNames() {
        return selectedNames;
    }

}
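
A short sketch (not part of the commit) of the StreamOutput selection flags: passing null selected names marks the output as select-all, otherwise only the given names match. The null RecordWriter is a placeholder so the flags can be shown without the network stack; collect() is never called on these instances.

import java.util.Arrays;

import org.apache.flink.streaming.api.collector.StreamOutput;

public class StreamOutputSelectionSketch {

    public static void main(String[] args) {
        // Select-all output: no selected names were given.
        StreamOutput<String> selectAll = new StreamOutput<String>(null, null);

        // Named output: it only matches records directed to "even" or "odd".
        StreamOutput<String> named =
                new StreamOutput<String>(null, Arrays.asList("even", "odd"));

        System.out.println(selectAll.isSelectAll());  // true
        System.out.println(named.isSelectAll());      // false
        System.out.println(named.getSelectedNames()); // [even, odd]
    }
}
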
StreamCollector.java → StreamOutputWrapper.java (renamed)
@@ -17,10 +17,8 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -38,14 +36,13 @@
  * @param <OUT>
  *            Type of the Tuples/Objects collected.
  */
-public class StreamCollector<OUT> implements Collector<OUT> {
+public class StreamOutputWrapper<OUT> implements Collector<OUT> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamOutputWrapper.class);
 
     protected StreamRecord<OUT> streamRecord;
     protected int channelID;
-    protected List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-    protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
+    protected List<StreamOutput<OUT>> outputs;
     protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
 
     /**
@@ -56,7 +53,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
      * @param serializationDelegate
      *            Serialization delegate used for serialization
      */
-    public StreamCollector(int channelID,
+    public StreamOutputWrapper(int channelID,
             SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
         this.serializationDelegate = serializationDelegate;
 
@@ -66,8 +63,7 @@ public StreamCollector(int channelID,
             this.streamRecord = new StreamRecord<OUT>();
         }
         this.channelID = channelID;
-        this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-        this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
+        this.outputs = new LinkedList<StreamOutput<OUT>>();
     }
 
     /**
@@ -80,29 +76,13 @@ public StreamCollector(int channelID,
      * @param isSelectAllOutput
      *            Marks whether all the outputs are selected.
      */
-    public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-            List<String> outputNames, boolean isSelectAllOutput) {
-        addOneOutput(output, outputNames, isSelectAllOutput);
+    public void addOutput(StreamOutput<OUT> output) {
+        outputs.add(output);
     }
 
     protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
             List<String> outputNames, boolean isSelectAllOutput) {
-        outputs.add(output);
-        for (String outputName : outputNames) {
-            if (outputName != null) {
-                if (!outputMap.containsKey(outputName)) {
-                    outputMap
-                            .put(outputName,
-                                    new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
-                    outputMap.get(outputName).add(output);
-                } else {
-                    if (!outputMap.get(outputName).contains(output)) {
-                        outputMap.get(outputName).add(output);
-                    }
-                }
 
-            }
-        }
     }
 
     /**
@@ -115,25 +95,19 @@ protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>
     @Override
     public void collect(OUT outputObject) {
         streamRecord.setObject(outputObject);
-        emit(streamRecord);
-    }
-
-    /**
-     * Emits a StreamRecord to the outputs.
-     *
-     * @param streamRecord
-     *            StreamRecord to emit.
-     */
-    private void emit(StreamRecord<OUT> streamRecord) {
         streamRecord.newId(channelID);
         serializationDelegate.setInstance(streamRecord);
-        emitToOutputs();
+
+        emit();
     }
 
-    protected void emitToOutputs() {
-        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+    /**
+     * Emits the current streamrecord to the outputs.
+     */
+    protected void emit() {
+        for (StreamOutput<OUT> output : outputs) {
             try {
-                output.emit(serializationDelegate);
+                output.collect(serializationDelegate);
             } catch (Exception e) {
                 if (LOG.isErrorEnabled()) {
                     LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e));
InputHandler.java
@@ -49,7 +49,6 @@ public InputHandler(StreamVertex<IN, ?> streamComponent) {
 
     }
 
-    @SuppressWarnings("unchecked")
     protected void setConfigInputs() throws StreamVertexException {
         inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 
