Skip to content

Commit

Permalink
[FLINK-1464] [java-api] Added ResultTypeQueryable interface implement…
Browse files Browse the repository at this point in the history
…ation to TypeSerializerInputFormat.

This closes #349
  • Loading branch information
aalexandrov authored and StephanEwen committed Feb 3, 2015
1 parent 9906cba commit e3f6c9b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
Expand Up @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -19,7 +19,9 @@
package org.apache.flink.api.java.io; package org.apache.flink.api.java.io;


import org.apache.flink.api.common.io.BinaryInputFormat; import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputView;


import java.io.IOException; import java.io.IOException;
Expand All @@ -28,23 +30,30 @@
* Reads elements by deserializing them with a given type serializer. * Reads elements by deserializing them with a given type serializer.
* @param <T> * @param <T>
*/ */
public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> { public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implements ResultTypeQueryable<T> {


private static final long serialVersionUID = 2123068581665107480L; private static final long serialVersionUID = 2123068581665107480L;


private transient TypeInformation<T> resultType;

private TypeSerializer<T> serializer; private TypeSerializer<T> serializer;


public TypeSerializerInputFormat(TypeSerializer<T> serializer){ public TypeSerializerInputFormat(TypeInformation<T> resultType) {
this.serializer = serializer; this.resultType = resultType;
this.serializer = resultType.createSerializer();
} }


@Override @Override
protected T deserialize(T reuse, DataInputView dataInput) throws IOException { protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
if(serializer == null){
throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to " +
"be defined.");
}

return serializer.deserialize(reuse, dataInput); return serializer.deserialize(reuse, dataInput);
} }

// --------------------------------------------------------------------------------------------
// Typing
// --------------------------------------------------------------------------------------------

@Override
public TypeInformation<T> getProducedType() {
return resultType;
}
} }
Expand Up @@ -40,16 +40,18 @@
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> { public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> {


TypeInformation<Tuple2<Integer, String>> resultType = TypeExtractor.getForObject(getRecord(0));

private TypeSerializer<Tuple2<Integer, String>> serializer; private TypeSerializer<Tuple2<Integer, String>> serializer;


private BlockInfo block; private BlockInfo block;


public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) { public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
super(numberOfTuples, blockSize, degreeOfParallelism); super(numberOfTuples, blockSize, degreeOfParallelism);


TypeInformation<Tuple2<Integer, String>> tti = TypeExtractor.getForObject(getRecord(0)); resultType = TypeExtractor.getForObject(getRecord(0));


serializer = tti.createSerializer(); serializer = resultType.createSerializer();
} }


@Before @Before
Expand All @@ -63,7 +65,7 @@ protected BinaryInputFormat<Tuple2<Integer, String>> createInputFormat() {
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);


final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new
TypeSerializerInputFormat<Tuple2<Integer, String>>(serializer); TypeSerializerInputFormat<Tuple2<Integer, String>>(resultType);
inputFormat.setFilePath(this.tempFile.toURI().toString()); inputFormat.setFilePath(this.tempFile.toURI().toString());


inputFormat.configure(configuration); inputFormat.configure(configuration);
Expand Down

0 comments on commit e3f6c9b

Please sign in to comment.