From 56a0730a4769d74f1a3381a07d0259ad99bdc2c4 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 17 Jun 2014 19:21:02 +0200 Subject: [PATCH] CollectionInputFormat now uses the TypeSerializer to serialize the collection entries. This allows objects that do not implement the Serializable interface to be used as collection elements. --- .../api/common/io/ByteArrayInputView.java | 114 ++++++++++++++++ .../api/common/io/ByteArrayOutputView.java | 126 ++++++++++++++++++ .../api/java/ExecutionEnvironment.java | 2 +- .../api/java/io/CollectionInputFormat.java | 68 ++++++++-- .../java/io/CollectionInputFormatTest.java | 107 +++++++++++++++ 5 files changed, 407 insertions(+), 10 deletions(-) create mode 100644 stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java create mode 100644 stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java create mode 100644 stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java new file mode 100644 index 0000000000000..7e65f13a3d1ae --- /dev/null +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package eu.stratosphere.api.common.io; + +import eu.stratosphere.core.memory.DataInputView; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Wrapper to use ByteArrayInputStream with TypeSerializers + */ +public class ByteArrayInputView implements DataInputView{ + + private final ByteArrayInputStream byteArrayInputStream; + private final DataInputStream inputStream; + + public ByteArrayInputView(byte[] buffer){ + byteArrayInputStream = new ByteArrayInputStream(buffer); + inputStream = new DataInputStream(byteArrayInputStream); + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + inputStream.skipBytes(numBytes); + } + + @Override + public void readFully(byte[] b) throws IOException { + inputStream.readFully(b); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + inputStream.readFully(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException { + return inputStream.skipBytes(n); + } + + @Override + public boolean readBoolean() throws IOException { + return inputStream.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + return inputStream.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + return inputStream.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException { + return inputStream.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException { + return inputStream.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException { + return inputStream.readChar(); + } + + @Override + public int readInt() throws IOException { + return inputStream.readInt(); + } + + @Override + public long readLong() throws IOException { + return inputStream.readLong(); + } + + @Override + 
public float readFloat() throws IOException { + return inputStream.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return inputStream.readDouble(); + } + + @Override + public String readLine() throws IOException { + return inputStream.readLine(); + } + + @Override + public String readUTF() throws IOException { + return inputStream.readUTF(); + } +} diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java new file mode 100644 index 0000000000000..b96338fca576e --- /dev/null +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package eu.stratosphere.api.common.io; + +import eu.stratosphere.core.memory.DataInputView; +import eu.stratosphere.core.memory.DataOutputView; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Wrapper class to use ByteArrayOutputStream with TypeSerializers. 
+ */ +public class ByteArrayOutputView implements DataOutputView { + private final ByteArrayOutputStream byteOutputStream; + private final DataOutputStream outputStream; + + public ByteArrayOutputView(){ + byteOutputStream = new ByteArrayOutputStream(); + outputStream = new DataOutputStream(byteOutputStream); + } + + public byte[] getByteArray(){ + return byteOutputStream.toByteArray(); + } + + public void reset() { + byteOutputStream.reset(); + } + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + for(int i=0; i DataSource fromCollection(Collection data) { public DataSource fromCollection(Collection data, TypeInformation type) { CollectionInputFormat.checkCollection(data, type.getTypeClass()); - return new DataSource(this, new CollectionInputFormat(data), type); + return new DataSource(this, new CollectionInputFormat(data, type.createSerializer()), type); } /** diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java index 8d051af62d7e4..5e513b2029c22 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java @@ -16,14 +16,20 @@ package eu.stratosphere.api.java.io; import java.io.IOException; -import java.io.Serializable; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; -import eu.stratosphere.api.common.InvalidProgramException; +import eu.stratosphere.api.common.io.ByteArrayInputView; +import eu.stratosphere.api.common.io.ByteArrayOutputView; import eu.stratosphere.api.common.io.GenericInputFormat; import eu.stratosphere.api.common.io.NonParallelInput; +import eu.stratosphere.api.common.typeutils.TypeSerializer; import 
eu.stratosphere.core.io.GenericInputSplit; +import eu.stratosphere.core.memory.DataInputView; /** * An input format that returns objects from a collection. @@ -32,16 +38,19 @@ public class CollectionInputFormat extends GenericInputFormat implements N private static final long serialVersionUID = 1L; - private final Collection dataSet; // input data as collection + private Collection dataSet; // input data as collection + + private TypeSerializer serializer; private transient Iterator iterator; - - public CollectionInputFormat(Collection dataSet) { + public CollectionInputFormat(Collection dataSet, TypeSerializer serializer) { if (dataSet == null) { throw new NullPointerException(); } + + this.serializer = serializer; this.dataSet = dataSet; } @@ -63,6 +72,51 @@ public void open(GenericInputSplit split) throws IOException { public T nextRecord(T record) throws IOException { return this.iterator.next(); } + + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException{ + out.writeObject(serializer); + out.writeInt(dataSet.size()); + ByteArrayOutputView outputView = new ByteArrayOutputView(); + for(T element : dataSet){ + serializer.serialize(element, outputView); + } + + byte[] blob = outputView.getByteArray(); + out.writeInt(blob.length); + out.write(blob); + } + + private void readObject(ObjectInputStream in) throws IOException{ + try{ + Object obj = in.readObject(); + + if(obj instanceof TypeSerializer){ + serializer = (TypeSerializer)obj; + } + }catch(ClassNotFoundException ex){ + throw new IOException(ex); + } + + + int collectionLength = in.readInt(); + List list = new ArrayList(collectionLength); + + int blobLength = in.readInt(); + byte[] blob = new byte[blobLength]; + in.readFully(blob); + + DataInputView inputView = new ByteArrayInputView(blob); + + for(int i=0; i< collectionLength; i++){ + T element = serializer.createInstance(); + element = 
serializer.deserialize(element, inputView); + list.add(element); + } + + dataSet = list; + } // -------------------------------------------------------------------------------------------- @@ -78,10 +132,6 @@ public static void checkCollection(Collection elements, Class viewedAs throw new NullPointerException(); } - if (!Serializable.class.isAssignableFrom(viewedAs)) { - throw new InvalidProgramException("The elements are not serializable (java.io.Serializable)."); - } - for (X elem : elements) { if (elem == null) { throw new IllegalArgumentException("The collection must not contain null elements."); diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java new file mode 100644 index 0000000000000..781e7a802e908 --- /dev/null +++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package eu.stratosphere.api.java.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import eu.stratosphere.api.java.typeutils.TypeExtractor; +import eu.stratosphere.core.io.GenericInputSplit; +import eu.stratosphere.types.TypeInformation; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collection; + +public class CollectionInputFormatTest { + public static class ElementType{ + private int id; + + public ElementType(){ + this(-1); + } + + public ElementType(int id){ + this.id = id; + } + + public int getId(){return id;} + + @Override + public boolean equals(Object obj){ + if(obj != null && obj instanceof ElementType){ + ElementType et = (ElementType) obj; + + return et.getId() == this.getId(); + }else { + return false; + } + } + } + + @Test + public void testSerializability(){ + Collection inputCollection = new ArrayList(); + ElementType element1 = new ElementType(1); + ElementType element2 = new ElementType(2); + ElementType element3 = new ElementType(3); + inputCollection.add(element1); + inputCollection.add(element2); + inputCollection.add(element3); + + TypeInformation info = (TypeInformation)TypeExtractor.createTypeInfo(ElementType + .class); + + CollectionInputFormat inputFormat = new CollectionInputFormat(inputCollection, + info.createSerializer()); + + try{ + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(buffer); + + out.writeObject(inputFormat); + + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray())); + + Object serializationResult = in.readObject(); + + assertNotNull(serializationResult); + 
assertTrue(serializationResult instanceof CollectionInputFormat); + + CollectionInputFormat result = (CollectionInputFormat) serializationResult; + + GenericInputSplit inputSplit = new GenericInputSplit(); + inputFormat.open(inputSplit); + result.open(inputSplit); + + while(!inputFormat.reachedEnd() && !result.reachedEnd()){ + ElementType expectedElement = inputFormat.nextRecord(null); + ElementType actualElement = result.nextRecord(null); + + assertEquals(expectedElement, actualElement); + } + }catch(IOException ex){ + fail(ex.toString()); + }catch(ClassNotFoundException ex){ + fail(ex.toString()); + } + } +}