diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java index 1a96e987abb04..8b19425de00f4 100644 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java @@ -13,10 +13,11 @@ package org.apache.flink.languagebinding.api.java.common.streaming; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; @@ -41,12 +42,13 @@ public abstract class Streamer implements Serializable { private static final byte SIGNAL_LAST = 32; private final byte[] buffer = new byte[4]; - private DatagramPacket packet; - protected InetAddress host; - protected DatagramSocket socket; - protected int port1; - protected int port2; + protected ServerSocket server; + protected Socket socket; + protected InputStream in; + protected OutputStream out; + protected int port; + protected Sender sender; protected Receiver receiver; @@ -61,17 +63,8 @@ public Streamer(AbstractRichFunction function) { } public void open() throws IOException { - host = InetAddress.getByName("localhost"); - packet = new DatagramPacket(buffer, 0, 4); - socket = new DatagramSocket(0, host); - socket.setSoTimeout(10000); - try { - setupProcess(); - setupPorts(); - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - socket.setSoTimeout(300000); + server = new ServerSocket(0); + setupProcess(); } /** @@ -92,30 +85,17 @@ public void close() throws IOException { receiver.close(); } - /** - * Setups the required UDP-ports.The streamer requires two UDP-ports to send control-signals to, one each for - * reading/writing operations. - * - * @throws IOException - */ - private void setupPorts() throws IOException, SocketTimeoutException { - socket.receive(new DatagramPacket(buffer, 0, 4)); - checkForError(); - port1 = getInt(buffer, 0); - socket.receive(new DatagramPacket(buffer, 0, 4)); - checkForError(); - port2 = getInt(buffer, 0); - } - private void sendWriteNotification(int size, boolean hasNext) throws IOException { byte[] tmp = new byte[5]; putInt(tmp, 0, size); tmp[4] = hasNext ? 0 : SIGNAL_LAST; - socket.send(new DatagramPacket(tmp, 0, 5, host, port1)); + out.write(tmp, 0, 5); + out.flush(); } private void sendReadConfirmation() throws IOException { - socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2)); + out.write(new byte[1], 0, 1); + out.flush(); } private void checkForError() { @@ -145,7 +125,7 @@ public final void sendBroadCastVariables(Configuration config) throws IOExceptio names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null); } - socket.receive(packet); + in.read(buffer, 0, 4); checkForError(); int size = sender.sendRecord(broadcastCount); sendWriteNotification(size, false); @@ -153,13 +133,13 @@ public final void sendBroadCastVariables(Configuration config) throws IOExceptio for (String name : names) { Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator(); - socket.receive(packet); + in.read(buffer, 0, 4); checkForError(); size = sender.sendRecord(name); sendWriteNotification(size, false); while (bcv.hasNext() || sender.hasRemaining(0)) { - socket.receive(packet); + in.read(buffer, 0, 4); checkForError(); size = sender.sendBuffer(bcv, 0); sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0)); @@ -183,13 +163,15 @@ public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOEx int size; if (i.hasNext()) { while (true) { - socket.receive(packet); + in.read(buffer, 0, 4); int sig = getInt(buffer, 0); switch (sig) { case SIGNAL_BUFFER_REQUEST: if (i.hasNext() || sender.hasRemaining(0)) { size = sender.sendBuffer(i, 0); sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext()); + } else { + throw new RuntimeException("External process requested data even though none is available."); } break; case SIGNAL_FINISHED: @@ -226,7 +208,7 @@ public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) int size; if (i1.hasNext() || i2.hasNext()) { while (true) { - socket.receive(packet); + in.read(buffer, 0, 4); int sig = getInt(buffer, 0); switch (sig) { case SIGNAL_BUFFER_REQUEST_G0: diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java index 835d95b7a0e10..f636caf7fd008 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java +++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java @@ -13,7 +13,7 @@ package org.apache.flink.languagebinding.api.java.python.streaming; import java.io.IOException; -import java.net.DatagramPacket; +import java.lang.reflect.Field; import org.apache.flink.api.common.functions.AbstractRichFunction; import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG; import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME; @@ -38,6 +38,7 @@ public class PythonStreamer extends Streamer { private final int id; private final boolean usePython3; private final boolean debug; + private Thread shutdownThread; private String inputFilePath; private String outputFilePath; @@ -90,54 +91,62 @@ private void startPython() throws IOException { } catch (IOException ex) { throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary."); } - pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort()); + pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort()); } else { try { Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH); } catch (IOException ex) { throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary."); } - pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort()); + pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort()); } if (debug) { socket.setSoTimeout(0); LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName() - + " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort()); + + " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort()); } else { process = pb.start(); new StreamPrinter(process.getInputStream()).start(); new StreamPrinter(process.getErrorStream(), true, msg).start(); } - byte[] executorPort = new byte[4]; - socket.receive(new DatagramPacket(executorPort, 0, 4)); - int exPort = getInt(executorPort, 0); - if (exPort == -2) { - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { + + shutdownThread = new Thread() { + @Override + public void run() { + try { + destroyProcess(); + } catch (IOException ex) { + } } - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } + }; + + Runtime.getRuntime().addShutdownHook(shutdownThread); + + socket = server.accept(); + in = socket.getInputStream(); + out = socket.getOutputStream(); byte[] opSize = new byte[4]; putInt(opSize, 0, operator.length); - socket.send(new DatagramPacket(opSize, 0, 4, host, exPort)); - socket.send(new DatagramPacket(operator, 0, operator.length, host, exPort)); + out.write(opSize, 0, 4); + out.write(operator, 0, operator.length); byte[] meta = importString.toString().getBytes("utf-8"); putInt(opSize, 0, meta.length); - socket.send(new DatagramPacket(opSize, 0, 4, host, exPort)); - socket.send(new DatagramPacket(meta, 0, meta.length, host, exPort)); + out.write(opSize, 0, 4); + out.write(meta, 0, meta.length); byte[] input = inputFilePath.getBytes("utf-8"); putInt(opSize, 0, input.length); - socket.send(new DatagramPacket(opSize, 0, 4, host, exPort)); - socket.send(new DatagramPacket(input, 0, input.length, host, exPort)); + out.write(opSize, 0, 4); + out.write(input, 0, input.length); byte[] output = outputFilePath.getBytes("utf-8"); putInt(opSize, 0, output.length); - socket.send(new DatagramPacket(opSize, 0, 4, host, exPort)); - socket.send(new DatagramPacket(output, 0, output.length, host, exPort)); + out.write(opSize, 0, 4); + out.write(output, 0, output.length); + + out.flush(); try { // wait a bit to catch syntax errors Thread.sleep(2000); @@ -165,9 +174,30 @@ public void close() throws IOException { LOG.error("Exception occurred while closing Streamer. :" + e.getMessage()); } if (!debug) { - try { - process.exitValue(); - } catch (IllegalThreadStateException ise) { //process still active + destroyProcess(); + } + if (shutdownThread != null) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + } + + private void destroyProcess() throws IOException { + try { + process.exitValue(); + } catch (IllegalThreadStateException ise) { //process still active + if (process.getClass().getName().equals("java.lang.UNIXProcess")) { + int pid; + try { + Field f = process.getClass().getDeclaredField("pid"); + f.setAccessible(true); + pid = f.getInt(process); + } catch (Throwable e) { + process.destroy(); + return; + } + String[] args = new String[]{"kill", "-9", "" + pid}; + Runtime.getRuntime().exec(args); + } else { process.destroy(); } } diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py index cebdc5dd546b5..79301a6c39651 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py index b4cbc34051604..b03eda9c777ac 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py index 457ae1e4dc3f2..b89bc0e1df62b 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py index 76357a5286adc..749a57395719b 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py index 238527ce1372f..cddb9ca122a67 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - # -*- coding: utf-8 -*- # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py deleted file mode 100644 index 65b48d4d79b4e..0000000000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py +++ /dev/null @@ -1,17 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py index a6f159e0937f7..bf0b557a54948 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py index 2cad3be080c0c..25714ea6418dc 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py index b51c007c53773..b55ca55811bc6 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - # #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py index 827efda8185db..9dedb411950ca 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py @@ -1,21 +1,3 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - #!/usr/bin/env python # # Author: Mike McKerns (mmckerns @caltech and @uqfoundation) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py index 9d80c82511c24..2cfb9d370aad9 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py @@ -20,39 +20,35 @@ import struct #argv[1] = port - +s = None try: import dill port = int(sys.argv[1]) - s1 = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) - s1.bind((socket.gethostbyname("localhost"), 0)) - s1.sendto(struct.pack(">i", s1.getsockname()[1]), (socket.gethostbyname("localhost"), port)) + s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) + s.connect((socket.gethostbyname("localhost"), port)) - size = struct.unpack(">i", s1.recv(4))[0] - serialized_operator = s1.recv(size) + size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0] + serialized_operator = s.recv(size, socket.MSG_WAITALL) - size = struct.unpack(">i", s1.recv(4))[0] - import_string = s1.recv(size).decode("utf-8") + size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0] + import_string = s.recv(size, socket.MSG_WAITALL).decode("utf-8") - size = struct.unpack(">i", s1.recv(4))[0] - input_file = s1.recv(size).decode("utf-8") + size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0] + input_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8") - size = struct.unpack(">i", s1.recv(4))[0] - output_file = s1.recv(size).decode("utf-8") + size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0] + output_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8") exec(import_string) operator = dill.loads(serialized_operator) - operator._configure(input_file, output_file, port) + operator._configure(input_file, output_file, s) operator._go() sys.stdout.flush() sys.stderr.flush() except: sys.stdout.flush() sys.stderr.flush() - s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) - s.bind((socket.gethostbyname("localhost"), 0)) - destination = (socket.gethostbyname("localhost"), int(sys.argv[1])) - s.sendto(struct.pack(">i", -2), destination) + s.send(struct.pack(">i", -2)) raise \ No newline at end of file diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py index 7d64352b9fe39..cffe79e8861a2 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ import mmap -import socket +import socket as SOCKET import tempfile from struct import pack, unpack from collections import deque @@ -61,18 +61,12 @@ def _write_buffer(self): class BufferingUDPMappedFileConnection(object): - def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", port=25000): + def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None): self._input_file = open(input_file, "rb+") self._output_file = open(output_file, "rb+") self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ) self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) - self._socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) - self._socket.bind((socket.gethostbyname("localhost"), 0)) - self._socket.settimeout(300) - self._destination = (socket.gethostbyname("localhost"), port) - - self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination) - self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination) + self._socket = socket self._out = deque() self._out_size = 0 @@ -97,10 +91,10 @@ def write(self, msg): def _write_buffer(self): self._file_output_buffer.seek(0, 0) self._file_output_buffer.write(b"".join(self._out)) - self._socket.sendto(pack(">i", self._out_size), self._destination) + self._socket.send(pack(">i", self._out_size)) self._out.clear() self._out_size = 0 - self._socket.recvfrom(1) + self._socket.recv(1, SOCKET.MSG_WAITALL) def read(self, des_size, ignored=None): if self._input_size == self._input_offset: @@ -110,10 +104,10 @@ def read(self, des_size, ignored=None): return self._input[old_offset:self._input_offset] def _read_buffer(self): - self._socket.sendto(SIGNAL_REQUEST_BUFFER, self._destination) + self._socket.send(SIGNAL_REQUEST_BUFFER) self._file_input_buffer.seek(0, 0) self._input_offset = 0 - meta_size = self._socket.recvfrom(5)[0] + meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) self._input_size = unpack(">I", meta_size[:4])[0] self._was_last = meta_size[4] == SIGNAL_WAS_LAST self._input = self._file_input_buffer.read(self._input_size) @@ -121,7 +115,7 @@ def _read_buffer(self): def send_end_signal(self): if self._out_size: self._write_buffer() - self._socket.sendto(SIGNAL_FINISHED, self._destination) + self._socket.send(SIGNAL_FINISHED) def has_next(self, ignored=None): return not self._was_last or not self._input_size == self._input_offset @@ -134,8 +128,8 @@ def reset(self): class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection): - def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", port=25000): - super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, port) + def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None): + super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket) self._input = ["", ""] self._input_offset = [0, 0] self._input_size = [0, 0] @@ -150,12 +144,12 @@ def read(self, des_size, group): def _read_buffer(self, group): if group: - self._socket.sendto(SIGNAL_REQUEST_BUFFER_G1, self._destination) + self._socket.send(SIGNAL_REQUEST_BUFFER_G1) else: - self._socket.sendto(SIGNAL_REQUEST_BUFFER_G0, self._destination) + self._socket.send(SIGNAL_REQUEST_BUFFER_G0) self._file_input_buffer.seek(0, 0) self._input_offset[group] = 0 - meta_size = self._socket.recvfrom(5)[0] + meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) self._input_size[group] = unpack(">I", meta_size[:4])[0] self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST self._input[group] = self._file_input_buffer.read(self._input_size[group]) diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py index 7b1c5c51e60e5..fb0e26dcd4870 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py @@ -260,7 +260,7 @@ def __init__(self, read, group): def deserialize(self): size = unpack(">i", self.read(4, self._group))[0] - return bytearray(self.read(size, self._group)) + return bytearray(self.read(size, self._group)) if size else bytearray(b"") class BooleanDeserializer(object): @@ -315,7 +315,7 @@ def __init__(self, read, group): def deserialize(self): length = unpack(">i", self.read(4, self._group))[0] - return self.read(length, self._group).decode("utf-8") + return self.read(length, self._group).decode("utf-8") if length else "" class NullDeserializer(object): diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc deleted file mode 100644 index 7f3cf944d2448..0000000000000 Binary files a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc and /dev/null differ diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py index 35359e2baa517..0ce23ffff83f2 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py @@ -103,7 +103,7 @@ def _sort_and_combine(self): keys.sort() for key in keys: values = grouping[key] - for op in self._sort_ops: + for op in reversed(self._sort_ops): values.sort(key=lambda x:x[op[0]], reverse = op[1] == Order.DESCENDING) result = function(Iterator.ListIterator(values), collector) if result is not None: diff --git a/pom.xml b/pom.xml index fef5a9e5c0df9..4232fb7fc74da 100644 --- a/pom.xml +++ b/pom.xml @@ -717,6 +717,8 @@ under the License. flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text + + flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/** **/flink-bin/conf/slaves flink-contrib/docker-flink/flink/conf/slaves