Skip to content

Commit

Permalink
SQL: Fix ZonedDateTime with nanos serialisation (#68253)
Browse files Browse the repository at this point in the history
Previously the `ConstantProcessor` was using the
`[read/write]GenericValue` for ZonedDateTime which is implemented in
`StreamInput` and doesn't read/write the nanos resolution of the
objects. Implement custom serialisation to include the nanos.

Follows: #67666
  • Loading branch information
matriv committed Jan 30, 2021
1 parent 330e537 commit 47622d6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
Expand Up @@ -10,36 +10,69 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Objects;

public class ConstantProcessor implements Processor {

public static String NAME = "c";

private final Object constant;
private final boolean namedWriteable;
private Object constant;
private final Type type;

enum Type {
NAMED_WRITABLE,
ZONEDDATETIME,
GENERIC
}

public ConstantProcessor(Object value) {
this.constant = value;
this.namedWriteable = value instanceof NamedWriteable;
if (value instanceof NamedWriteable) {
type = Type.NAMED_WRITABLE;
} else if (value instanceof ZonedDateTime) {
type = Type.ZONEDDATETIME;
} else {
type = Type.GENERIC;
}
}

public ConstantProcessor(StreamInput in) throws IOException {
namedWriteable = in.readBoolean();
if (namedWriteable) {
constant = in.readNamedWriteable(ConstantNamedWriteable.class);
} else {
constant = in.readGenericValue();
type = in.readEnum(Type.class);
switch (type) {
case NAMED_WRITABLE:
constant = in.readNamedWriteable(ConstantNamedWriteable.class);
break;
case ZONEDDATETIME:
ZonedDateTime zdt;
ZoneId zoneId = in.readZoneId();
zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readLong()), zoneId);
constant = zdt.withNano(in.readInt());
break;
case GENERIC:
constant = in.readGenericValue();
break;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(namedWriteable);
if (namedWriteable) {
out.writeNamedWriteable((NamedWriteable) constant);
} else {
out.writeGenericValue(constant);
out.writeEnum(type);
switch (type) {
case NAMED_WRITABLE:
out.writeNamedWriteable((NamedWriteable) constant);
break;
case ZONEDDATETIME:
ZonedDateTime zdt = (ZonedDateTime) constant;
out.writeZoneId(zdt.getZone());
out.writeLong(zdt.toInstant().toEpochMilli());
out.writeInt(zdt.getNano());
break;
case GENERIC:
out.writeGenericValue(constant);
break;
}
}

Expand Down Expand Up @@ -76,4 +109,4 @@ public boolean equals(Object obj) {
public String toString() {
return "^" + constant;
}
}
}
Expand Up @@ -9,10 +9,22 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.ZonedDateTime;

public class ConstantProcessorTests extends AbstractWireSerializingTestCase<ConstantProcessor> {

public static ConstantProcessor randomConstantProcessor() {
return new ConstantProcessor(randomAlphaOfLength(5));
if (randomBoolean()) {
Clock clock = Clock.tickMillis(randomZone());
if (randomBoolean()) {
clock = Clock.tick(clock, Duration.ofNanos(1));
}
return new ConstantProcessor( ZonedDateTime.now(clock));
} else {
return new ConstantProcessor(randomAlphaOfLength(5));
}
}

@Override
Expand Down
Expand Up @@ -80,7 +80,11 @@ public Object extract(Bucket bucket) {

Object value = agg.getHits().getAt(0).getFields().values().iterator().next().getValue();
if (fieldDataType == DATETIME || fieldDataType == DATE) {
return DateUtils.asDateTimeWithMillis(Long.parseLong(value.toString()), zoneId);
try {
return DateUtils.asDateTimeWithMillis(Long.parseLong(value.toString()), zoneId);
} catch (NumberFormatException e) {
return DateUtils.asDateTimeWithNanos(value.toString()).withZoneSameInstant(zoneId);
}
} else if (fieldDataType == DATETIME_NANOS) {
return DateUtils.asDateTimeWithNanos(value.toString());
} else if (SqlDataTypes.isTimeBased(fieldDataType)) {
Expand Down

0 comments on commit 47622d6

Please sign in to comment.