Skip to content

Commit

Permalink
[Flink-671] Python API additions
Browse files Browse the repository at this point in the history
Fixes several minor issues
Requesting non-available data throws Exception
Python process shutodwn more reliable
Synchronization done via TCP
  • Loading branch information
supermegaciaccount committed Apr 21, 2015
1 parent 1d669da commit e1618e2
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 283 deletions.
Expand Up @@ -13,10 +13,11 @@
package org.apache.flink.languagebinding.api.java.common.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
Expand All @@ -41,12 +42,13 @@ public abstract class Streamer implements Serializable {
private static final byte SIGNAL_LAST = 32;

private final byte[] buffer = new byte[4];
private DatagramPacket packet;
protected InetAddress host;

protected DatagramSocket socket;
protected int port1;
protected int port2;
protected ServerSocket server;
protected Socket socket;
protected InputStream in;
protected OutputStream out;
protected int port;

protected Sender sender;
protected Receiver receiver;

Expand All @@ -61,17 +63,8 @@ public Streamer(AbstractRichFunction function) {
}

public void open() throws IOException {
host = InetAddress.getByName("localhost");
packet = new DatagramPacket(buffer, 0, 4);
socket = new DatagramSocket(0, host);
socket.setSoTimeout(10000);
try {
setupProcess();
setupPorts();
} catch (SocketTimeoutException ste) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
}
socket.setSoTimeout(300000);
server = new ServerSocket(0);
setupProcess();
}

/**
Expand All @@ -92,30 +85,17 @@ public void close() throws IOException {
receiver.close();
}

/**
* Setups the required UDP-ports.The streamer requires two UDP-ports to send control-signals to, one each for
* reading/writing operations.
*
* @throws IOException
*/
private void setupPorts() throws IOException, SocketTimeoutException {
socket.receive(new DatagramPacket(buffer, 0, 4));
checkForError();
port1 = getInt(buffer, 0);
socket.receive(new DatagramPacket(buffer, 0, 4));
checkForError();
port2 = getInt(buffer, 0);
}

private void sendWriteNotification(int size, boolean hasNext) throws IOException {
byte[] tmp = new byte[5];
putInt(tmp, 0, size);
tmp[4] = hasNext ? 0 : SIGNAL_LAST;
socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
out.write(tmp, 0, 5);
out.flush();
}

private void sendReadConfirmation() throws IOException {
socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
out.write(new byte[1], 0, 1);
out.flush();
}

private void checkForError() {
Expand Down Expand Up @@ -145,21 +125,21 @@ public final void sendBroadCastVariables(Configuration config) throws IOExceptio
names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
}

socket.receive(packet);
in.read(buffer, 0, 4);
checkForError();
int size = sender.sendRecord(broadcastCount);
sendWriteNotification(size, false);

for (String name : names) {
Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();

socket.receive(packet);
in.read(buffer, 0, 4);
checkForError();
size = sender.sendRecord(name);
sendWriteNotification(size, false);

while (bcv.hasNext() || sender.hasRemaining(0)) {
socket.receive(packet);
in.read(buffer, 0, 4);
checkForError();
size = sender.sendBuffer(bcv, 0);
sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
Expand All @@ -183,13 +163,15 @@ public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOEx
int size;
if (i.hasNext()) {
while (true) {
socket.receive(packet);
in.read(buffer, 0, 4);
int sig = getInt(buffer, 0);
switch (sig) {
case SIGNAL_BUFFER_REQUEST:
if (i.hasNext() || sender.hasRemaining(0)) {
size = sender.sendBuffer(i, 0);
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
} else {
throw new RuntimeException("External process requested data even though none is available.");
}
break;
case SIGNAL_FINISHED:
Expand Down Expand Up @@ -226,7 +208,7 @@ public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c)
int size;
if (i1.hasNext() || i2.hasNext()) {
while (true) {
socket.receive(packet);
in.read(buffer, 0, 4);
int sig = getInt(buffer, 0);
switch (sig) {
case SIGNAL_BUFFER_REQUEST_G0:
Expand Down
Expand Up @@ -13,7 +13,7 @@
package org.apache.flink.languagebinding.api.java.python.streaming;

import java.io.IOException;
import java.net.DatagramPacket;
import java.lang.reflect.Field;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
Expand All @@ -38,6 +38,7 @@ public class PythonStreamer extends Streamer {
private final int id;
private final boolean usePython3;
private final boolean debug;
private Thread shutdownThread;

private String inputFilePath;
private String outputFilePath;
Expand Down Expand Up @@ -90,54 +91,62 @@ private void startPython() throws IOException {
} catch (IOException ex) {
throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
}
pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
} else {
try {
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
} catch (IOException ex) {
throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
}
pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
}
if (debug) {
socket.setSoTimeout(0);
LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
+ " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort());
+ " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort());
} else {
process = pb.start();
new StreamPrinter(process.getInputStream()).start();
new StreamPrinter(process.getErrorStream(), true, msg).start();
}
byte[] executorPort = new byte[4];
socket.receive(new DatagramPacket(executorPort, 0, 4));
int exPort = getInt(executorPort, 0);
if (exPort == -2) {
try { //wait before terminating to ensure that the complete error message is printed
Thread.sleep(2000);
} catch (InterruptedException ex) {

shutdownThread = new Thread() {
@Override
public void run() {
try {
destroyProcess();
} catch (IOException ex) {
}
}
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
}
};

Runtime.getRuntime().addShutdownHook(shutdownThread);

socket = server.accept();
in = socket.getInputStream();
out = socket.getOutputStream();

byte[] opSize = new byte[4];
putInt(opSize, 0, operator.length);
socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
socket.send(new DatagramPacket(operator, 0, operator.length, host, exPort));
out.write(opSize, 0, 4);
out.write(operator, 0, operator.length);

byte[] meta = importString.toString().getBytes("utf-8");
putInt(opSize, 0, meta.length);
socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
socket.send(new DatagramPacket(meta, 0, meta.length, host, exPort));
out.write(opSize, 0, 4);
out.write(meta, 0, meta.length);

byte[] input = inputFilePath.getBytes("utf-8");
putInt(opSize, 0, input.length);
socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
socket.send(new DatagramPacket(input, 0, input.length, host, exPort));
out.write(opSize, 0, 4);
out.write(input, 0, input.length);

byte[] output = outputFilePath.getBytes("utf-8");
putInt(opSize, 0, output.length);
socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
socket.send(new DatagramPacket(output, 0, output.length, host, exPort));
out.write(opSize, 0, 4);
out.write(output, 0, output.length);

out.flush();

try { // wait a bit to catch syntax errors
Thread.sleep(2000);
Expand Down Expand Up @@ -165,9 +174,30 @@ public void close() throws IOException {
LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
}
if (!debug) {
try {
process.exitValue();
} catch (IllegalThreadStateException ise) { //process still active
destroyProcess();
}
if (shutdownThread != null) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
}

private void destroyProcess() throws IOException {
try {
process.exitValue();
} catch (IllegalThreadStateException ise) { //process still active
if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
int pid;
try {
Field f = process.getClass().getDeclaredField("pid");
f.setAccessible(true);
pid = f.getInt(process);
} catch (Throwable e) {
process.destroy();
return;
}
String[] args = new String[]{"kill", "-9", "" + pid};
Runtime.getRuntime().exec(args);
} else {
process.destroy();
}
}
Expand Down
@@ -1,21 +1,3 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
Expand Down
@@ -1,21 +1,3 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
Expand Down
@@ -1,21 +1,3 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
Expand Down
@@ -1,21 +1,3 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
Expand Down
@@ -1,21 +1,3 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# -*- coding: utf-8 -*-
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
Expand Down

0 comments on commit e1618e2

Please sign in to comment.