Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLINK-944 Changed serialization logic of CollectionInputFormat to use TypeSerializer #25

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package eu.stratosphere.api.common.io;

import eu.stratosphere.core.memory.DataInputView;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

/**
* Wrapper to use ByteArrayInputStream with TypeSerializers
*/
public class ByteArrayInputView implements DataInputView{

private final ByteArrayInputStream byteArrayInputStream;
private final DataInputStream inputStream;

public ByteArrayInputView(byte[] buffer){
byteArrayInputStream = new ByteArrayInputStream(buffer);
inputStream = new DataInputStream(byteArrayInputStream);
}

@Override
public void skipBytesToRead(int numBytes) throws IOException {
inputStream.skipBytes(numBytes);
}

@Override
public void readFully(byte[] b) throws IOException {
inputStream.readFully(b);
}

@Override
public void readFully(byte[] b, int off, int len) throws IOException {
inputStream.readFully(b, off, len);
}

@Override
public int skipBytes(int n) throws IOException {
return inputStream.skipBytes(n);
}

@Override
public boolean readBoolean() throws IOException {
return inputStream.readBoolean();
}

@Override
public byte readByte() throws IOException {
return inputStream.readByte();
}

@Override
public int readUnsignedByte() throws IOException {
return inputStream.readUnsignedByte();
}

@Override
public short readShort() throws IOException {
return inputStream.readShort();
}

@Override
public int readUnsignedShort() throws IOException {
return inputStream.readUnsignedShort();
}

@Override
public char readChar() throws IOException {
return inputStream.readChar();
}

@Override
public int readInt() throws IOException {
return inputStream.readInt();
}

@Override
public long readLong() throws IOException {
return inputStream.readLong();
}

@Override
public float readFloat() throws IOException {
return inputStream.readFloat();
}

@Override
public double readDouble() throws IOException {
return inputStream.readDouble();
}

@Override
public String readLine() throws IOException {
return inputStream.readLine();
}

@Override
public String readUTF() throws IOException {
return inputStream.readUTF();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package eu.stratosphere.api.common.io;

import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
* Wrapper class to use ByteArrayOutputStream with TypeSerializers.
*/
public class ByteArrayOutputView implements DataOutputView {
private final ByteArrayOutputStream byteOutputStream;
private final DataOutputStream outputStream;

public ByteArrayOutputView(){
byteOutputStream = new ByteArrayOutputStream();
outputStream = new DataOutputStream(byteOutputStream);
}

public byte[] getByteArray(){
return byteOutputStream.toByteArray();
}

public void reset() {
byteOutputStream.reset();
}

@Override
public void skipBytesToWrite(int numBytes) throws IOException {
for(int i=0; i<numBytes; i++){
writeByte(0);
}
}

@Override
public void write(DataInputView source, int numBytes) throws IOException {
byte[] buffer = new byte[numBytes];
source.readFully(buffer);
outputStream.write(buffer);
}

@Override
public void write(int b) throws IOException {
outputStream.write(b);
}

@Override
public void write(byte[] b) throws IOException {
outputStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
}

@Override
public void writeBoolean(boolean v) throws IOException {
outputStream.writeBoolean(v);
}

@Override
public void writeByte(int v) throws IOException {
outputStream.writeByte(v);
}

@Override
public void writeShort(int v) throws IOException {
outputStream.writeShort(v);
}

@Override
public void writeChar(int v) throws IOException {
outputStream.writeChar(v);
}

@Override
public void writeInt(int v) throws IOException {
outputStream.writeInt(v);
}

@Override
public void writeLong(long v) throws IOException {
outputStream.writeLong(v);
}

@Override
public void writeFloat(float v) throws IOException {
outputStream.writeFloat(v);
}

@Override
public void writeDouble(double v) throws IOException {
outputStream.writeDouble(v);
}

@Override
public void writeBytes(String s) throws IOException {
outputStream.writeBytes(s);
}

@Override
public void writeChars(String s) throws IOException {
outputStream.writeChars(s);
}

@Override
public void writeUTF(String s) throws IOException {
outputStream.writeUTF(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public <X> DataSource<X> fromCollection(Collection<X> data) {
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
CollectionInputFormat.checkCollection(data, type.getTypeClass());

return new DataSource<X>(this, new CollectionInputFormat<X>(data), type);
return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
package eu.stratosphere.api.java.io;

import java.io.IOException;
import java.io.Serializable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.io.ByteArrayInputView;
import eu.stratosphere.api.common.io.ByteArrayOutputView;
import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.io.NonParallelInput;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.core.memory.DataInputView;

/**
* An input format that returns objects from a collection.
Expand All @@ -32,16 +38,19 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N

private static final long serialVersionUID = 1L;

private final Collection<T> dataSet; // input data as collection
private Collection<T> dataSet; // input data as collection

private TypeSerializer<T> serializer;

private transient Iterator<T> iterator;



public CollectionInputFormat(Collection<T> dataSet) {
public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
if (dataSet == null) {
throw new NullPointerException();
}

this.serializer = serializer;

this.dataSet = dataSet;
}
Expand All @@ -63,6 +72,51 @@ public void open(GenericInputSplit split) throws IOException {
public T nextRecord(T record) throws IOException {
return this.iterator.next();
}

// --------------------------------------------------------------------------------------------

private void writeObject(ObjectOutputStream out) throws IOException{
out.writeObject(serializer);
out.writeInt(dataSet.size());
ByteArrayOutputView outputView = new ByteArrayOutputView();
for(T element : dataSet){
serializer.serialize(element, outputView);
}

byte[] blob = outputView.getByteArray();
out.writeInt(blob.length);
out.write(blob);
}

private void readObject(ObjectInputStream in) throws IOException{
try{
Object obj = in.readObject();

if(obj instanceof TypeSerializer<?>){
serializer = (TypeSerializer<T>)obj;
}
}catch(ClassNotFoundException ex){
throw new IOException(ex);
}


int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);

int blobLength = in.readInt();
byte[] blob = new byte[blobLength];
in.readFully(blob);

DataInputView inputView = new ByteArrayInputView(blob);

for(int i=0; i< collectionLength; i++){
T element = serializer.createInstance();
element = serializer.deserialize(element, inputView);
list.add(element);
}

dataSet = list;
}

// --------------------------------------------------------------------------------------------

Expand All @@ -78,10 +132,6 @@ public static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs
throw new NullPointerException();
}

if (!Serializable.class.isAssignableFrom(viewedAs)) {
throw new InvalidProgramException("The elements are not serializable (java.io.Serializable).");
}

for (X elem : elements) {
if (elem == null) {
throw new IllegalArgumentException("The collection must not contain null elements.");
Expand Down