Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2158] Add support for null to the DateSerializer #780

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@ public Date createInstance() {

@Override
public Date copy(Date from) {
if(from == null) {
return null;
}
return new Date(from.getTime());
}


@Override
public Date copy(Date from, Date reuse) {
if(from == null) {
return null;
}
reuse.setTime(from.getTime());
return reuse;
}
Expand All @@ -59,17 +66,30 @@ public int getLength() {

@Override
public void serialize(Date record, DataOutputView target) throws IOException {
target.writeLong(record.getTime());
if(record == null) {
target.writeLong(-1L);
} else {
target.writeLong(record.getTime());
}
}

@Override
public Date deserialize(DataInputView source) throws IOException {
return new Date(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
} else {
return new Date(v);
}
}

@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
reuse.setTime(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
}
reuse.setTime(v);
return reuse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public NullFieldException(int fieldIdx) {
super("Field " + fieldIdx + " is null, but expected to hold a value.");
this.fieldPos = fieldIdx;
}

/**
* Constructs an {@code NullFieldException} with a default message, referring to
* given field number as the null field and a cause (Throwable)
*
* @param fieldIdx The index of the field that was null, but expected to hold a value.
* @param cause Pass the root cause of the error
*/
public NullFieldException(int fieldIdx, Throwable cause) {
super("Field " + fieldIdx + " is null, but expected to hold a value.", cause);
this.fieldPos = fieldIdx;
}

/**
* Gets the field number that was attempted to access. If the number is not set, this method returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ public void testSerializeIndividually() {
fail("Exception in test: " + e.getMessage());
}
}



@Test
public void testSerializeIndividuallyReusingValues() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void serialize(T value, DataOutputView target) throws IOException {
try {
fieldSerializers[i].serialize(o, target);
} catch (NullPointerException npex) {
throw new NullFieldException(i);
throw new NullFieldException(i, npex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import scala.math.BigInt;

import java.util.Collection;
import java.util.Date;
import java.util.Iterator;

@SuppressWarnings("serial")
Expand Down Expand Up @@ -1101,6 +1102,37 @@ public void testJodatimeDateTimeWithKryo() throws Exception {
expected = "(1)\n";
}

/**
* Fix for FLINK-2158.
*
* @throws Exception
*/
@Test
public void testDateNullException() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)),
new Tuple2<Integer, Date>(1, null),
new Tuple2<Integer, Date>(2, new Date(1230000000))
);

DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() {
@Override
public void reduce(Iterable<Tuple2<Integer, Date>> values, Collector<String> out) throws Exception {
for (Tuple2<Integer, Date> e : values) {
out.collect(Integer.toString(e.f0));
}
}
});

r.writeAsText(resultPath);
env.execute();

expected = "0\n1\n2\n";
}



public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
Expand Down