Skip to content

Commit

Permalink
[FLINK-6086] [py] Clean up PythonSender/-Streamer generics
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 5, 2017
1 parent 9bdbe60 commit a7251c5
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 202 deletions.
Expand Up @@ -14,7 +14,7 @@

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonStreamer;
import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
import org.apache.flink.util.Collector;
import java.io.IOException;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
Expand All @@ -31,12 +31,12 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,

private static final long serialVersionUID = -3997396583317513873L;

private final PythonStreamer<IN1, IN2, OUT> streamer;
private final PythonDualInputStreamer<IN1, IN2, OUT> streamer;
private final transient TypeInformation<OUT> typeInformation;

public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
streamer = new PythonStreamer<>(this, envID, setID, true);
streamer = new PythonDualInputStreamer<>(this, envID, setID, true);
}

/**
Expand Down
Expand Up @@ -18,7 +18,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonStreamer;
import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer;
import org.apache.flink.util.Collector;

/**
Expand All @@ -32,12 +32,12 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OU

private static final long serialVersionUID = 3866306483023916413L;

private final PythonStreamer<IN, IN, OUT> streamer;
private final PythonSingleInputStreamer<IN, OUT> streamer;
private final transient TypeInformation<OUT> typeInformation;

public PythonMapPartition(int envId, int setId, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
streamer = new PythonStreamer(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
streamer = new PythonSingleInputStreamer<>(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
}

/**
Expand Down
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;

import java.io.IOException;

/**
 * A {@link PythonSender} for operations that read from two input streams.
 *
 * <p>Each input keeps its own serializer. A serializer is chosen from the first record seen on that input and is
 * then reused for every later call, so all records of one input are expected to be of the same type.
 *
 * @param <IN1> first input type
 * @param <IN2> second input type
 */
public class PythonDualInputSender<IN1, IN2> extends PythonSender {

	private static final long serialVersionUID = 614115041181108878L;

	// Chosen on first use from the first record of the respective input; transient, so never serialized.
	private transient Serializer<IN1> serializer1;
	private transient Serializer<IN2> serializer2;

	/**
	 * Writes records of the first input to the memory-mapped file.
	 *
	 * <p>All values in the iterator are assumed to be of the same type. No synchronization is done here; the caller
	 * must guarantee that the file may be written to before calling this method. While no serializer has been chosen
	 * for this input, the first element is read without a prior {@code hasNext()} check, so the caller should only
	 * call this method with a non-exhausted iterator.
	 *
	 * @param input iterator containing records of the first input
	 * @return size of the written buffer
	 * @throws IOException propagated from {@code sendBuffer}
	 */
	public int sendBuffer1(SingleElementPushBackIterator<IN1> input) throws IOException {
		if (serializer1 == null) {
			serializer1 = serializerForHead(input);
		}
		return sendBuffer(input, serializer1);
	}

	/**
	 * Writes records of the second input to the memory-mapped file.
	 *
	 * <p>All values in the iterator are assumed to be of the same type. No synchronization is done here; the caller
	 * must guarantee that the file may be written to before calling this method. While no serializer has been chosen
	 * for this input, the first element is read without a prior {@code hasNext()} check, so the caller should only
	 * call this method with a non-exhausted iterator.
	 *
	 * @param input iterator containing records of the second input
	 * @return size of the written buffer
	 * @throws IOException propagated from {@code sendBuffer}
	 */
	public int sendBuffer2(SingleElementPushBackIterator<IN2> input) throws IOException {
		if (serializer2 == null) {
			serializer2 = serializerForHead(input);
		}
		return sendBuffer(input, serializer2);
	}

	/**
	 * Takes the next element off the iterator, selects a serializer for it and pushes the element back so that it
	 * is still sent with the following buffer.
	 *
	 * @param input iterator to peek into
	 * @param <T> record type of the iterator
	 * @return serializer selected for the peeked element
	 */
	private <T> Serializer<T> serializerForHead(SingleElementPushBackIterator<T> input) {
		T head = input.next();
		// Select the serializer before pushing back, so a failing lookup leaves the iterator as the caller last saw it.
		Serializer<T> chosen = getSerializer(head);
		input.pushBack(head);
		return chosen;
	}
}
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Iterator;

/**
 * This class is a {@link PythonStreamer} for operations with two input streams.
 *
 * @param <IN1> first input type
 * @param <IN2> second input type
 * @param <OUT> output type
 */
public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<PythonDualInputSender<IN1, IN2>, OUT> {

	private static final long serialVersionUID = -607175070491761873L;

	/**
	 * Creates a streamer that sends its data through a new {@link PythonDualInputSender}.
	 *
	 * <p>All arguments are forwarded unchanged to the {@link PythonStreamer} constructor.
	 *
	 * @param function function this streamer belongs to; its runtime context supplies the task name for error messages
	 * @param envID forwarded to the super constructor
	 * @param setID forwarded to the super constructor
	 * @param usesByteArray forwarded to the super constructor
	 */
	public PythonDualInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
		super(function, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>());
	}

	/**
	 * Sends all values contained in both iterators to the external process and collects all results.
	 *
	 * <p>If both iterators are empty the method returns immediately without reading any signal. Otherwise it loops,
	 * reading one signal at a time, until the finished signal arrives or an error is raised.
	 *
	 * @param iterator1 first input stream
	 * @param iterator2 second input stream
	 * @param c collector
	 * @throws IOException propagated from reading signals and from sending or collecting buffers
	 * @throws RuntimeException if the external process signals an error, or if waiting for a signal times out (the
	 *     {@link SocketTimeoutException} is attached as the cause)
	 */
	public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException {
		SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
		SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
		try {
			// Nothing to send on either input: skip the exchange entirely.
			if (!i1.hasNext() && !i2.hasNext()) {
				return;
			}
			while (true) {
				int sig = in.readInt();
				switch (sig) {
					case SIGNAL_BUFFER_REQUEST_G0:
						// Request for the first input; an exhausted iterator is answered with nothing.
						if (i1.hasNext()) {
							int size = sender.sendBuffer1(i1);
							sendWriteNotification(size, i1.hasNext());
						}
						break;
					case SIGNAL_BUFFER_REQUEST_G1:
						// Request for the second input; an exhausted iterator is answered with nothing.
						if (i2.hasNext()) {
							int size = sender.sendBuffer2(i2);
							sendWriteNotification(size, i2.hasNext());
						}
						break;
					case SIGNAL_FINISHED:
						return;
					case SIGNAL_ERROR:
						// Wait for both printers before building the error message that includes msg.
						// NOTE(review): an interrupt during join is passed on to the printer, but the
						// interrupt status of the current thread is not restored - confirm this is intended.
						try {
							outPrinter.join();
						} catch (InterruptedException e) {
							outPrinter.interrupt();
						}
						try {
							errorPrinter.join();
						} catch (InterruptedException e) {
							errorPrinter.interrupt();
						}
						throw new RuntimeException(
							"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
					default:
						// Any other value is handed to the receiver together with the collector
						// (presumably the size of a result buffer - confirm against PythonReceiver).
						receiver.collectBuffer(c, sig);
						sendReadConfirmation();
						break;
				}
			}
		} catch (SocketTimeoutException e) {
			// Keep the timeout as the cause so the stack trace shows where the wait expired.
			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg, e);
		}
	}
}
Expand Up @@ -44,6 +44,8 @@ public PythonReceiver(boolean usesByteArray) {
}

//=====Setup========================================================================================================

@SuppressWarnings("unchecked")
public void open(String path) throws IOException {
setupMappedFile(path);
deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());
Expand Down
Expand Up @@ -12,23 +12,24 @@
*/
package org.apache.flink.python.api.streaming.data;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;

/**
* General-purpose class to write data to memory-mapped files.
*/
public class PythonSender implements Serializable {
public abstract class PythonSender implements Serializable {

private static final long serialVersionUID = -2004095650353962110L;

Expand All @@ -41,14 +42,8 @@ public class PythonSender implements Serializable {
private transient FileChannel outputChannel;
private transient MappedByteBuffer fileBuffer;

private transient ByteBuffer[] saved;

private transient Serializer[] serializer;

//=====Setup========================================================================================================
public void open(String path) throws IOException {
saved = new ByteBuffer[2];
serializer = new Serializer[2];
setupMappedFile(path);
}

Expand Down Expand Up @@ -80,88 +75,31 @@ private void closeMappedFile() throws IOException {
outputFile.delete();
}

/**
* Resets this object to the post-configuration state.
*/
public void reset() {
serializer[0] = null;
serializer[1] = null;
fileBuffer.clear();
}

//=====IO===========================================================================================================
/**
* Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
* must guarantee that the file may be written to before calling this method. This method essentially reserves the
* whole buffer for one record. As such it imposes some performance restrictions and should only be used when
* absolutely necessary.
*
* @param value record to send
* @return size of the written buffer
* @throws IOException
*/
@SuppressWarnings("unchecked")
public int sendRecord(Object value) throws IOException {
fileBuffer.clear();
int group = 0;

serializer[group] = getSerializer(value);
ByteBuffer bb = serializer[group].serialize(value);
if (bb.remaining() > MAPPED_FILE_SIZE) {
throw new RuntimeException("Serialized object does not fit into a single buffer.");
}
fileBuffer.put(bb);

int size = fileBuffer.position();

reset();
return size;
}

public boolean hasRemaining(int group) {
return saved[group] != null;
}

/**
* Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
* in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
* guarantee that the file may be written to before calling this method.
*
* @param i iterator containing records
* @param group group to which the iterator belongs, most notably used by CoGroup-functions.
* @param input iterator containing records
* @param serializer serializer for the input records
* @return size of the written buffer
* @throws IOException
*/
@SuppressWarnings("unchecked")
public int sendBuffer(Iterator<?> i, int group) throws IOException {
protected <IN> int sendBuffer(SingleElementPushBackIterator<IN> input, Serializer<IN> serializer) throws IOException {
fileBuffer.clear();

Object value;
ByteBuffer bb;
if (serializer[group] == null) {
value = i.next();
serializer[group] = getSerializer(value);
bb = serializer[group].serialize(value);
if (bb.remaining() > MAPPED_FILE_SIZE) {
throw new RuntimeException("Serialized object does not fit into a single buffer.");
}
fileBuffer.put(bb);

}
if (saved[group] != null) {
fileBuffer.put(saved[group]);
saved[group] = null;
}
while (i.hasNext() && saved[group] == null) {
value = i.next();
bb = serializer[group].serialize(value);
while (input.hasNext()) {
IN value = input.next();
ByteBuffer bb = serializer.serialize(value);
if (bb.remaining() > MAPPED_FILE_SIZE) {
throw new RuntimeException("Serialized object does not fit into a single buffer.");
}
if (bb.remaining() <= fileBuffer.remaining()) {
fileBuffer.put(bb);
} else {
saved[group] = bb;
input.pushBack(value);
break;
}
}

Expand All @@ -170,20 +108,22 @@ public int sendBuffer(Iterator<?> i, int group) throws IOException {
}

//=====Serializer===================================================================================================
private Serializer<?> getSerializer(Object value) {

@SuppressWarnings("unchecked")
protected <IN> Serializer<IN> getSerializer(IN value) {
if (value instanceof byte[]) {
return new ArraySerializer();
return (Serializer<IN>) new ArraySerializer();
}
if (((Tuple2<?, ?>) value).f0 instanceof byte[]) {
return new ValuePairSerializer();
return (Serializer<IN>) new ValuePairSerializer();
}
if (((Tuple2<?, ?>) value).f0 instanceof Tuple) {
return new KeyValuePairSerializer();
return (Serializer<IN>) new KeyValuePairSerializer();
}
throw new IllegalArgumentException("This object can't be serialized: " + value);
}

private abstract static class Serializer<T> {
protected abstract static class Serializer<T> {
protected ByteBuffer buffer;

public ByteBuffer serialize(T value) {
Expand Down

0 comments on commit a7251c5

Please sign in to comment.