Skip to content

Commit

Permalink
[FLINK-2901] Move PythonAPI to flink-libraries
Browse files Browse the repository at this point in the history
This closes #1257
  • Loading branch information
supermegaciaccount committed Nov 12, 2015
1 parent 2063fa1 commit 824074a
Show file tree
Hide file tree
Showing 58 changed files with 1,039 additions and 1,360 deletions.
2 changes: 1 addition & 1 deletion flink-dist/src/main/assemblies/bin.xml
Expand Up @@ -170,7 +170,7 @@ under the License.


<!-- copy python package --> <!-- copy python package -->
<fileSet> <fileSet>
<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory> <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
<outputDirectory>resources/python</outputDirectory> <outputDirectory>resources/python</outputDirectory>
<fileMode>0755</fileMode> <fileMode>0755</fileMode>
</fileSet> </fileSet>
Expand Down
Expand Up @@ -22,11 +22,11 @@ under the License.


<parent> <parent>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-language-binding-parent</artifactId> <artifactId>flink-libraries</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<relativePath>..</relativePath> <relativePath>..</relativePath>
</parent> </parent>

<artifactId>flink-python</artifactId> <artifactId>flink-python</artifactId>
<name>flink-python</name> <name>flink-python</name>
<packaging>jar</packaging> <packaging>jar</packaging>
Expand All @@ -43,7 +43,7 @@ under the License.
<archive> <archive>
<manifest> <manifest>
<addClasspath>true</addClasspath> <addClasspath>true</addClasspath>
<mainClass>org.apache.flink.languagebinding.api.java.python.PythonPlanBinder</mainClass> <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
</manifest> </manifest>
</archive> </archive>
</configuration> </configuration>
Expand Down Expand Up @@ -77,10 +77,5 @@ under the License.
<artifactId>flink-clients</artifactId> <artifactId>flink-clients</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-language-binding-generic</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
Expand Up @@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License. * specific language governing permissions and limitations under the License.
*/ */
package org.apache.flink.languagebinding.api.java.common; package org.apache.flink.python.api;


import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
Expand All @@ -20,17 +20,10 @@
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.languagebinding.api.java.common.PlanBinder.Operation; import org.apache.flink.python.api.PythonPlanBinder.Operation;
import static org.apache.flink.languagebinding.api.java.common.PlanBinder.normalizeKeys; import org.apache.flink.python.api.streaming.Receiver;
import static org.apache.flink.languagebinding.api.java.common.PlanBinder.toIntArray;
import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;


/** public class PythonOperationInfo {
* Container for all generic information related to operations. This class contains the absolute minimum fields that are
* required for all operations. This class should be extended to contain any additional fields required on a
* per-language basis.
*/
public class OperationInfo {
public int parentID; //DataSet that an operation is applied on public int parentID; //DataSet that an operation is applied on
public int otherID; //secondary DataSet public int otherID; //secondary DataSet
public int setID; //ID for new DataSet public int setID; //ID for new DataSet
Expand All @@ -40,6 +33,7 @@ public class OperationInfo {
public TypeInformation<?> types; //typeinformation about output type public TypeInformation<?> types; //typeinformation about output type
public AggregationEntry[] aggregates; public AggregationEntry[] aggregates;
public ProjectionEntry[] projections; //projectFirst/projectSecond public ProjectionEntry[] projections; //projectFirst/projectSecond
public boolean combine;
public Object[] values; public Object[] values;
public int count; public int count;
public int field; public int field;
Expand All @@ -54,10 +48,7 @@ public class OperationInfo {
public boolean toError; public boolean toError;
public String name; public String name;


public OperationInfo() { public PythonOperationInfo(Receiver receiver, Operation identifier) throws IOException {
}

public OperationInfo(Receiver receiver, Operation identifier) throws IOException {
Object tmpType; Object tmpType;
switch (identifier) { switch (identifier) {
case SOURCE_CSV: case SOURCE_CSV:
Expand Down Expand Up @@ -160,6 +151,63 @@ public OperationInfo(Receiver receiver, Operation identifier) throws IOException
case UNION: case UNION:
otherID = (Integer) receiver.getRecord(true); otherID = (Integer) receiver.getRecord(true);
return; return;
case COGROUP:
otherID = (Integer) receiver.getRecord(true);
keys1 = normalizeKeys(receiver.getRecord(true));
keys2 = normalizeKeys(receiver.getRecord(true));
tmpType = receiver.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) receiver.getRecord();
return;
case CROSS:
case CROSS_H:
case CROSS_T:
otherID = (Integer) receiver.getRecord(true);
tmpType = receiver.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
int cProjectCount = (Integer) receiver.getRecord(true);
projections = new ProjectionEntry[cProjectCount];
for (int x = 0; x < cProjectCount; x++) {
String side = (String) receiver.getRecord();
int[] keys = toIntArray((Tuple) receiver.getRecord(true));
projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
}
name = (String) receiver.getRecord();
return;
case REDUCE:
case GROUPREDUCE:
tmpType = receiver.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
combine = (Boolean) receiver.getRecord();
name = (String) receiver.getRecord();
return;
case JOIN:
case JOIN_H:
case JOIN_T:
keys1 = normalizeKeys(receiver.getRecord(true));
keys2 = normalizeKeys(receiver.getRecord(true));
otherID = (Integer) receiver.getRecord(true);
tmpType = receiver.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
int jProjectCount = (Integer) receiver.getRecord(true);
projections = new ProjectionEntry[jProjectCount];
for (int x = 0; x < jProjectCount; x++) {
String side = (String) receiver.getRecord();
int[] keys = toIntArray((Tuple) receiver.getRecord(true));
projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
}
name = (String) receiver.getRecord();
return;
case MAPPARTITION:
case FLATMAP:
case MAP:
case FILTER:
tmpType = receiver.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) receiver.getRecord();
return;
default:
throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);
} }
} }


Expand All @@ -176,6 +224,7 @@ public String toString() {
sb.append("Keys: ").append(Arrays.toString(keys)).append("\n"); sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n"); sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n");
sb.append("Projections: ").append(Arrays.toString(projections)).append("\n"); sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
sb.append("Combine: ").append(combine).append("\n");
sb.append("Count: ").append(count).append("\n"); sb.append("Count: ").append(count).append("\n");
sb.append("Field: ").append(field).append("\n"); sb.append("Field: ").append(field).append("\n");
sb.append("Order: ").append(order.toString()).append("\n"); sb.append("Order: ").append(order.toString()).append("\n");
Expand Down Expand Up @@ -239,4 +288,57 @@ public enum DatasizeHint {
TINY, TINY,
HUGE HUGE
} }

//====Utility=======================================================================================================
private static String[] normalizeKeys(Object keys) {
if (keys instanceof Tuple) {
Tuple tupleKeys = (Tuple) keys;
if (tupleKeys.getArity() == 0) {
return new String[0];
}
if (tupleKeys.getField(0) instanceof Integer) {
String[] stringKeys = new String[tupleKeys.getArity()];
for (int x = 0; x < stringKeys.length; x++) {
stringKeys[x] = "f" + (Integer) tupleKeys.getField(x);
}
return stringKeys;
}
if (tupleKeys.getField(0) instanceof String) {
return tupleToStringArray(tupleKeys);
}
throw new RuntimeException("Key argument contains field that is neither an int nor a String.");
}
if (keys instanceof int[]) {
int[] intKeys = (int[]) keys;
String[] stringKeys = new String[intKeys.length];
for (int x = 0; x < stringKeys.length; x++) {
stringKeys[x] = "f" + intKeys[x];
}
return stringKeys;
}
throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
}

private static int[] toIntArray(Object key) {
if (key instanceof Tuple) {
Tuple tuple = (Tuple) key;
int[] keys = new int[tuple.getArity()];
for (int y = 0; y < tuple.getArity(); y++) {
keys[y] = (Integer) tuple.getField(y);
}
return keys;
}
if (key instanceof int[]) {
return (int[]) key;
}
throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
}

private static String[] tupleToStringArray(Tuple tuple) {
String[] keys = new String[tuple.getArity()];
for (int y = 0; y < tuple.getArity(); y++) {
keys[y] = (String) tuple.getField(y);
}
return keys;
}
} }

0 comments on commit 824074a

Please sign in to comment.