Skip to content

Commit

Permalink
[FLINK-2158] Add support for null to the DateSerializer
Browse files Browse the repository at this point in the history
This closes #780
  • Loading branch information
rmetzger authored and StephanEwen committed Jul 1, 2015
1 parent f5c1768 commit 4e9e0d6
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 5 deletions.
Expand Up @@ -43,11 +43,18 @@ public Date createInstance() {

@Override
public Date copy(Date from) {
if(from == null) {
return null;
}
return new Date(from.getTime());
}


@Override
public Date copy(Date from, Date reuse) {
if(from == null) {
return null;
}
reuse.setTime(from.getTime());
return reuse;
}
Expand All @@ -59,17 +66,30 @@ public int getLength() {

@Override
public void serialize(Date record, DataOutputView target) throws IOException {
target.writeLong(record.getTime());
if(record == null) {
target.writeLong(-1L);
} else {
target.writeLong(record.getTime());
}
}

@Override
public Date deserialize(DataInputView source) throws IOException {
return new Date(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
} else {
return new Date(v);
}
}

@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
reuse.setTime(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
}
reuse.setTime(v);
return reuse;
}

Expand Down
Expand Up @@ -59,6 +59,18 @@ public NullFieldException(int fieldIdx) {
super("Field " + fieldIdx + " is null, but expected to hold a value.");
this.fieldPos = fieldIdx;
}

/**
* Constructs an {@code NullFieldException} with a default message, referring to
* given field number as the null field and a cause (Throwable)
*
* @param fieldIdx The index of the field that was null, but expected to hold a value.
* @param cause Pass the root cause of the error
*/
public NullFieldException(int fieldIdx, Throwable cause) {
super("Field " + fieldIdx + " is null, but expected to hold a value.", cause);
this.fieldPos = fieldIdx;
}

/**
* Gets the field number that was attempted to access. If the number is not set, this method returns
Expand Down
Expand Up @@ -182,7 +182,8 @@ public void testSerializeIndividually() {
fail("Exception in test: " + e.getMessage());
}
}



@Test
public void testSerializeIndividuallyReusingValues() {
try {
Expand Down
Expand Up @@ -115,7 +115,7 @@ public void serialize(T value, DataOutputView target) throws IOException {
try {
fieldSerializers[i].serialize(o, target);
} catch (NullPointerException npex) {
throw new NullFieldException(i);
throw new NullFieldException(i, npex);
}
}
}
Expand Down
Expand Up @@ -52,6 +52,7 @@
import scala.math.BigInt;

import java.util.Collection;
import java.util.Date;
import java.util.Iterator;

@SuppressWarnings("serial")
Expand Down Expand Up @@ -1101,6 +1102,37 @@ public void testJodatimeDateTimeWithKryo() throws Exception {
expected = "(1)\n";
}

/**
* Fix for FLINK-2158.
*
* @throws Exception
*/
@Test
public void testDateNullException() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)),
new Tuple2<Integer, Date>(1, null),
new Tuple2<Integer, Date>(2, new Date(1230000000))
);

DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() {
@Override
public void reduce(Iterable<Tuple2<Integer, Date>> values, Collector<String> out) throws Exception {
for (Tuple2<Integer, Date> e : values) {
out.collect(Integer.toString(e.f0));
}
}
});

r.writeAsText(resultPath);
env.execute();

expected = "0\n1\n2\n";
}



public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
Expand Down

0 comments on commit 4e9e0d6

Please sign in to comment.