[FLINK-9589][py] Make PythonOperationInfo immutable
zentol committed Jun 14, 2018
1 parent eaff4da commit 2fc6499
Showing 2 changed files with 48 additions and 41 deletions.
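For context, the change applies one pattern across the whole PythonOperationInfo class: every field becomes final and is assigned exactly once in the constructor, and array-valued members are exposed as unmodifiable Lists. A minimal, self-contained sketch of that pattern (illustrative class and field names, not the actual Flink code):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the immutability pattern applied in this commit:
// final fields assigned once in the constructor, with array input wrapped
// into an unmodifiable List view.
public class OperationInfoSketch {
    public final int setID;
    public final List<String> keys;

    public OperationInfoSketch(int setID, String[] rawKeys) {
        this.setID = setID;
        this.keys = Collections.unmodifiableList(Arrays.asList(rawKeys));
    }

    public static void main(String[] args) {
        OperationInfoSketch info = new OperationInfoSketch(1, new String[]{"f0", "f1"});
        try {
            info.keys.add("f2"); // rejected: the exposed list is read-only
        } catch (UnsupportedOperationException expected) {
            System.out.println("keys cannot be mutated after construction: " + info.keys);
        }
    }
}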
@@ -20,37 +20,40 @@
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;

/**
* Generic container for all information required for an operation on the DataSet API.
*/
public class PythonOperationInfo {
public String identifier;
public int parentID; //DataSet that an operation is applied on
public int otherID; //secondary DataSet
public int setID; //ID for new DataSet
public String[] keys;
public String[] keys1; //join/cogroup keys
public String[] keys2; //join/cogroup keys
public TypeInformation<?> types; //typeinformation about output type
public Object[] values;
public int count;
public String field;
public Order order;
public String path;
public String fieldDelimiter;
public String lineDelimiter;
public long frm;
public long to;
public WriteMode writeMode;
public boolean toError;
public String name;
public boolean usesUDF;
public int parallelism;
public int envID;
public final String identifier;
public final int parentID; //DataSet that an operation is applied on
public final int otherID; //secondary DataSet
public final int setID; //ID for new DataSet
public final List<String> keys;
public final List<String> keys1; //join/cogroup keys
public final List<String> keys2; //join/cogroup keys
public final TypeInformation<?> types; //typeinformation about output type
public final List<Object> values;
public final int count;
public final String field;
public final Order order;
public final String path;
public final String fieldDelimiter;
public final String lineDelimiter;
public final long frm;
public final long to;
public final WriteMode writeMode;
public final boolean toError;
public final String name;
public final boolean usesUDF;
public final int parallelism;
public final int envID;

public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
identifier = (String) streamer.getRecord();
@@ -75,9 +78,9 @@ public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throw
order = Order.NONE;
break;
}
keys = normalizeKeys(streamer.getRecord(true));
keys1 = normalizeKeys(streamer.getRecord(true));
keys2 = normalizeKeys(streamer.getRecord(true));
keys = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
keys1 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
keys2 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
Object tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
usesUDF = (Boolean) streamer.getRecord();
@@ -94,10 +97,11 @@ public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throw
toError = (Boolean) streamer.getRecord();
count = (Integer) streamer.getRecord(true);
int valueCount = (Integer) streamer.getRecord(true);
values = new Object[valueCount];
List<Object> valueList = new ArrayList<>(valueCount);
for (int x = 0; x < valueCount; x++) {
values[x] = streamer.getRecord();
valueList.add(streamer.getRecord());
}
values = valueList;
parallelism = (Integer) streamer.getRecord(true);

envID = environmentID;
@@ -111,9 +115,9 @@ public String toString() {
sb.append("OtherID: ").append(otherID).append("\n");
sb.append("Name: ").append(name).append("\n");
sb.append("Types: ").append(types).append("\n");
sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
sb.append("Keys1: ").append(keys1).append("\n");
sb.append("Keys2: ").append(keys2).append("\n");
sb.append("Keys: ").append(keys).append("\n");
sb.append("Count: ").append(count).append("\n");
sb.append("Field: ").append(field).append("\n");
sb.append("Order: ").append(order.toString()).append("\n");
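A side note on the toString() hunk above: swapping Arrays.toString(keys) for the List's own toString() leaves the rendered output identical, since both print elements as "[f0, f1]". A quick sketch with illustrative values:

import java.util.Arrays;
import java.util.List;

// Both renderings produce the same bracketed text, so the debug output of
// PythonOperationInfo#toString() is unchanged by the switch to Lists.
public class ToStringSketch {
    public static void main(String[] args) {
        String[] asArray = {"f0", "f1"};
        List<String> asList = Arrays.asList(asArray);
        System.out.println(Arrays.toString(asArray)); // [f0, f1]
        System.out.println(asList);                   // [f0, f1]
    }
}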
@@ -55,6 +55,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -422,7 +423,7 @@ private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info
}

private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
.map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
}

@@ -470,7 +471,7 @@ private void createBroadcastVariable(PythonOperationInfo info) {
private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
DataSet<byte[]> result = op
.distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
.distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
sets.add(info.setID, result);
}
@@ -495,13 +496,13 @@ private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {

private void createGroupOperation(PythonOperationInfo info) {
DataSet<?> op1 = sets.getDataSet(info.parentID);
sets.add(info.setID, op1.groupBy(info.keys));
sets.add(info.setID, op1.groupBy(info.keys.toArray(new String[info.keys.size()])));
}

private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
DataSet<byte[]> result = op1
.partitionByHash(info.keys).setParallelism(info.parallelism)
.partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
sets.add(info.setID, result);
}
@@ -530,8 +531,8 @@ private <IN> void createUnionOperation(PythonOperationInfo info) {
private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
DataSet<IN1> op1 = sets.getDataSet(info.parentID);
DataSet<IN2> op2 = sets.getDataSet(info.otherID);
Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1.toArray(new String[info.keys1.size()]), op1.getType());
Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2.toArray(new String[info.keys2.size()]), op2.getType());
PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
}
@@ -623,19 +624,21 @@ private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperat
}
}

private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, List<String> firstKeys, List<String> secondKeys, DatasizeHint mode, int parallelism) {
String[] firstKeysArray = firstKeys.toArray(new String[firstKeys.size()]);
String[] secondKeysArray = secondKeys.toArray(new String[secondKeys.size()]);
switch (mode) {
case NONE:
return op1
.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
.join(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
case HUGE:
return op1
.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
.joinWithHuge(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
case TINY:
return op1
.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
.joinWithTiny(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
default:
throw new IllegalArgumentException("Invalid join mode specified.");
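The DataSet API methods called in the second changed file (groupBy, distinct, partitionByHash, where/equalTo) still take String[] arguments, so each call site converts the now-immutable key lists back to arrays via keys.toArray(new String[keys.size()]). A hypothetical helper capturing that repeated conversion could look like the following; it is not part of the commit:

import java.util.List;

// Hypothetical utility (not in this commit): converts the immutable key lists
// back into the String[] form expected by the DataSet API methods.
final class KeyArrays {

    private KeyArrays() {
    }

    static String[] toArray(List<String> keys) {
        return keys.toArray(new String[keys.size()]);
    }
}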
