diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/package.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/package.scala index 08c309bb..449cfffd 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/package.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/package.scala @@ -1,6 +1,8 @@ package org.apache.flinkx import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer package object api { @@ -13,4 +15,18 @@ package object api { /** Documentation of [[TypeInformation#getTotalFields()]] states the total number of fields must be at least 1. */ private[api] val MinimumTotalFields: Int = 1 + /** Documentation of [[TypeSerializer#getLength()]] states data type with variable length must return `-1`. */ + private[api] val VariableLengthDataType: Int = -1 + + /** Mark a null value in the stream of serialized data. It is validly used only when these conditions are met: + * - Used in both serialize and deserialize methods of the serializer. + * - The range of actual data doesn't include [[Int.MinValue]], i.e., the size of a collection can be only >= 0. + * - The actual data written in the stream is an Int: the first data to deserialize must be an Int for both null + * and non-null cases. + * + * If one of these conditions is not met, consider using another marker or wrap your serializer into a + * [[NullableSerializer]]. + */ + private[api] val NullMarker: Int = Int.MinValue + } diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala index 5ff100e6..c9b5d537 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType import scala.reflect.ClassTag @@ -38,7 +39,7 @@ class ArraySerializer[T](val child: TypeSerializer[T], clazz: Class[T]) extends } } - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): Array[T] = { val length = source.readInt() diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala index c712b74b..333b4455 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala @@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil +import org.apache.flinkx.api.VariableLengthDataType import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]]) @@ -32,7 +33,14 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers // this one may be used for later reuse, but we never reuse coproducts due to their unclear concrete type subtypeSerializers.head.createInstance().asInstanceOf[T] - override def getLength: Int = -1 + override val getLength: Int = { + val length = subtypeSerializers(0).getLength + if (subtypeSerializers.forall(_.getLength == length)) { + length + } else { + VariableLengthDataType + } + } override def serialize(record: T, target: DataOutputView): Unit = { var subtypeIndex = 0 diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala index 2bef3077..21ef8fa8 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala @@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeutils._ import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType /** Serializer for [[Either]]. Copied from Flink 1.14. */ @@ -50,7 +51,12 @@ class EitherSerializer[A, B]( (rightSerializer == null || rightSerializer.isImmutableType) } - override def getLength: Int = -1 + override val getLength: Int = + if (leftSerializer.getLength == rightSerializer.getLength) { + leftSerializer.getLength + } else { + VariableLengthDataType + } override def copy(from: Either[A, B]): Either[A, B] = from match { case Left(a) => if (leftSerializer.isImmutableType) from else Left(leftSerializer.copy(a)) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala index 82a5df2e..4adffab1 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[::[T]] { @@ -26,7 +27,7 @@ class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut } override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list") - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): ::[T] = { val count = source.readInt() val result = (0 until count) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala index 962b8e38..c5c858dc 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[List[T]] { @@ -25,7 +26,7 @@ class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutab } override def createInstance(): List[T] = List.empty[T] - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): List[T] = { var remaining = source.readInt() val builder = List.newBuilder[T] diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala index 4ed09b8a..a9ba562f 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala @@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil +import org.apache.flinkx.api.VariableLengthDataType import org.apache.flinkx.api.serializer.MapSerializer._ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends MutableSerializer[Map[K, V]] { @@ -28,7 +29,7 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends } override def createInstance(): Map[K, V] = Map.empty[K, V] - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): Map[K, V] = { var remaining = source.readInt() val builder = Map.newBuilder[K, V] diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala index 09a558eb..f6b13a9e 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala @@ -18,10 +18,10 @@ package org.apache.flinkx.api.serializer import java.util.function.Supplier - import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType /** Serializer for cases where no serializer is required but the system still expects one. This happens for * OptionTypeInfo when None is used, or for Either when one of the type parameters is Nothing. @@ -35,7 +35,7 @@ class NothingSerializer extends ImmutableSerializer[Any] { Integer.valueOf(-1) } - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def copy(from: Any): Any = throw new RuntimeException("This must not be used. You encountered a bug.") diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala index 457b4870..225e2401 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala @@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeutils._ import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType /** Serializer for [[Option]]. */ @@ -41,7 +42,7 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends Mutable override val isImmutableType: Boolean = elemSerializer == null || elemSerializer.isImmutableType - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def copy(from: Option[A]): Option[A] = from match { case Some(a) => if (isImmutableType) from else Some(elemSerializer.copy(a)) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala index f646a53f..9ee6f626 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala @@ -2,8 +2,8 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType -import scala.collection.immutable import scala.collection.immutable.ArraySeq import scala.reflect.ClassTag @@ -31,7 +31,7 @@ class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl } override def createInstance(): Seq[T] = Seq.empty[T] - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): Seq[T] = { val length = source.readInt() val array = new Array[T](length) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala index 49abbcb6..61c0fcd8 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Set[T]] { @@ -25,7 +26,7 @@ class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl } override def createInstance(): Set[T] = Set.empty[T] - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): Set[T] = { var remaining = source.readInt() val builder = Set.newBuilder[T] diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala index 167b3a4d..e4208613 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Vector[T]] { @@ -25,7 +26,7 @@ class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut } override def createInstance(): Vector[T] = Vector.empty[T] - override def getLength: Int = -1 + override def getLength: Int = VariableLengthDataType override def deserialize(source: DataInputView): Vector[T] = { var remaining = source.readInt() val builder = Vector.newBuilder[T] diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/offsetDateTimeSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/offsetDateTimeSerializer.scala new file mode 100644 index 00000000..c79e9a76 --- /dev/null +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/offsetDateTimeSerializer.scala @@ -0,0 +1,53 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer +import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +import java.io.IOException +import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset} + +/** Serializer for [[OffsetDateTime]]. Handle null value. */ +object OffsetDateTimeSerializer extends ImmutableSerializer[OffsetDateTime] { + + private val localDateTimeSerializer: TypeSerializer[LocalDateTime] = LocalDateTimeSerializer.INSTANCE + private val zoneOffsetSerializer: TypeSerializer[ZoneOffset] = ZoneOffsetSerializer + + override def createInstance: OffsetDateTime = OffsetDateTime.now() + + override def getLength: Int = localDateTimeSerializer.getLength + zoneOffsetSerializer.getLength + + override def serialize(offsetDateTime: OffsetDateTime, target: DataOutputView): Unit = + if (offsetDateTime == null) { + localDateTimeSerializer.serialize(null, target) + zoneOffsetSerializer.serialize(null, target) + } else { + localDateTimeSerializer.serialize(offsetDateTime.toLocalDateTime, target) + zoneOffsetSerializer.serialize(offsetDateTime.getOffset, target) + } + + override def deserialize(source: DataInputView): OffsetDateTime = { + val localDateTime = localDateTimeSerializer.deserialize(source) + val zoneOffset = zoneOffsetSerializer.deserialize(source) + if (localDateTime == null && zoneOffset == null) { + null + } else if (localDateTime == null || zoneOffset == null) { + throw new IOException("LocalDateTime and ZoneOffset should be either both non-null, or both null") + } else { + OffsetDateTime.of(localDateTime, zoneOffset) + } + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + localDateTimeSerializer.copy(source, target) + zoneOffsetSerializer.copy(source, target) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[OffsetDateTime] = + new OffsetDateTimeSerializerSnapshot() + +} + +/** Serializer snapshot for [[OffsetDateTime]]. */ +class OffsetDateTimeSerializerSnapshot + extends SimpleTypeSerializerSnapshot[OffsetDateTime](() => OffsetDateTimeSerializer) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneIdSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneIdSerializer.scala new file mode 100644 index 00000000..94ba4227 --- /dev/null +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneIdSerializer.scala @@ -0,0 +1,43 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType + +import java.time.ZoneId + +/** Serializer for [[ZoneId]]. Handle null value. */ +object ZoneIdSerializer extends ImmutableSerializer[ZoneId] { + + private val stringSerializer: TypeSerializer[String] = org.apache.flinkx.api.serializers.stringSerializer + + override def createInstance: ZoneId = ZoneId.systemDefault() + + override def getLength: Int = VariableLengthDataType + + override def serialize(zoneId: ZoneId, target: DataOutputView): Unit = + if (zoneId == null) { + stringSerializer.serialize(null, target) + } else { + stringSerializer.serialize(zoneId.getId, target) + } + + override def deserialize(source: DataInputView): ZoneId = { + val id = stringSerializer.deserialize(source) + if (id == null) { + null + } else { + ZoneId.of(id) + } + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = + stringSerializer.copy(source, target) + + override def snapshotConfiguration(): TypeSerializerSnapshot[ZoneId] = + new ZoneIdSerializerSnapshot() + +} + +/** Serializer snapshot for [[ZoneId]]. */ +class ZoneIdSerializerSnapshot extends SimpleTypeSerializerSnapshot[ZoneId](() => ZoneIdSerializer) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneOffsetSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneOffsetSerializer.scala new file mode 100644 index 00000000..93a0453c --- /dev/null +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zoneOffsetSerializer.scala @@ -0,0 +1,42 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.NullMarker + +import java.lang.Integer.{BYTES => IntBytes} +import java.time.ZoneOffset + +/** Serializer for [[ZoneOffset]]. Handle null value. */ +object ZoneOffsetSerializer extends ImmutableSerializer[ZoneOffset] { + + override def createInstance: ZoneOffset = ZoneOffset.UTC + + override def getLength: Int = IntBytes // 1 Int + + override def serialize(zoneOffset: ZoneOffset, target: DataOutputView): Unit = + if (zoneOffset == null) { + target.writeInt(NullMarker) + } else { + target.writeInt(zoneOffset.getTotalSeconds) + } + + override def deserialize(source: DataInputView): ZoneOffset = { + val totalSeconds = source.readInt() + if (totalSeconds == NullMarker) { + null + } else { + ZoneOffset.ofTotalSeconds(totalSeconds) + } + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + target.writeInt(source.readInt()) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[ZoneOffset] = new ZoneOffsetSerializerSnapshot + +} + +/** Serializer snapshot for [[ZoneOffset]]. */ +class ZoneOffsetSerializerSnapshot extends SimpleTypeSerializerSnapshot[ZoneOffset](() => ZoneOffsetSerializer) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zonedDateTimeSerializer.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zonedDateTimeSerializer.scala new file mode 100644 index 00000000..d3eddf0e --- /dev/null +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/zonedDateTimeSerializer.scala @@ -0,0 +1,53 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer +import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flinkx.api.VariableLengthDataType + +import java.io.IOException +import java.time.{LocalDateTime, ZoneId, ZonedDateTime} + +/** Serializer for [[ZonedDateTime]]. Handle null value. */ +object ZonedDateTimeSerializer extends ImmutableSerializer[ZonedDateTime] { + + private val localDateTimeSerializer: TypeSerializer[LocalDateTime] = LocalDateTimeSerializer.INSTANCE + private val zoneIdSerializer: TypeSerializer[ZoneId] = ZoneIdSerializer + + override def createInstance: ZonedDateTime = ZonedDateTime.now() + + override def getLength: Int = VariableLengthDataType + + override def serialize(zonedDateTime: ZonedDateTime, target: DataOutputView): Unit = + if (zonedDateTime == null) { + localDateTimeSerializer.serialize(null, target) + zoneIdSerializer.serialize(null, target) + } else { + localDateTimeSerializer.serialize(zonedDateTime.toLocalDateTime, target) + zoneIdSerializer.serialize(zonedDateTime.getZone, target) + } + + override def deserialize(source: DataInputView): ZonedDateTime = { + val localDateTime = localDateTimeSerializer.deserialize(source) + val zoneId = zoneIdSerializer.deserialize(source) + if (localDateTime == null && zoneId == null) { + null + } else if (localDateTime == null || zoneId == null) { + throw new IOException("LocalDateTime and ZoneId should be either both non-null, or both null") + } else { + ZonedDateTime.of(localDateTime, zoneId) + } + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + localDateTimeSerializer.copy(source, target) + zoneIdSerializer.copy(source, target) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[ZonedDateTime] = + new ZonedDateTimeSerializerSnapshot() + +} + +/** Serializer snapshot for [[ZonedDateTime]]. */ +class ZonedDateTimeSerializerSnapshot extends SimpleTypeSerializerSnapshot[ZonedDateTime](() => ZonedDateTimeSerializer) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializers.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializers.scala index 4e353ad4..292f113d 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializers.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializers.scala @@ -3,6 +3,16 @@ package org.apache.flinkx.api import org.apache.flink.api.common.serialization.{SerializerConfig, SerializerConfigImpl} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.{ + BooleanSerializer, + ByteSerializer, + CharSerializer, + DoubleSerializer, + FloatSerializer, + IntSerializer, + LongSerializer, + ShortSerializer +} import org.apache.flink.api.common.typeutils.base.array._ import org.apache.flinkx.api.mapper.{BigDecMapper, BigIntMapper, UuidMapper} import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper @@ -19,7 +29,7 @@ import java.lang.{Integer => JInteger} import java.lang.{Character => JCharacter} import java.math.{BigInteger => JBigInteger} import java.math.{BigDecimal => JBigDecimal} -import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, OffsetDateTime, ZoneId, ZoneOffset, ZonedDateTime} import java.util.UUID import scala.reflect.{ClassTag, classTag} @@ -67,22 +77,18 @@ trait serializers extends LowPrioImplicits { implicit val shortArraySerializer: TypeSerializer[Array[Short]] = new ShortPrimitiveArraySerializer() implicit val stringArraySerializer: TypeSerializer[Array[String]] = new StringArraySerializer() - implicit lazy val jIntegerSerializer: TypeSerializer[Integer] = - new org.apache.flink.api.common.typeutils.base.IntSerializer() - implicit lazy val jLongSerializer: TypeSerializer[JLong] = - new org.apache.flink.api.common.typeutils.base.LongSerializer() - implicit lazy val jFloatSerializer: TypeSerializer[JFloat] = - new org.apache.flink.api.common.typeutils.base.FloatSerializer() - implicit lazy val jDoubleSerializer: TypeSerializer[JDouble] = - new org.apache.flink.api.common.typeutils.base.DoubleSerializer() - implicit lazy val jBooleanSerializer: TypeSerializer[JBoolean] = - new org.apache.flink.api.common.typeutils.base.BooleanSerializer() - implicit lazy val jByteSerializer: TypeSerializer[JByte] = - new org.apache.flink.api.common.typeutils.base.ByteSerializer() - implicit lazy val jCharSerializer: TypeSerializer[JCharacter] = - new org.apache.flink.api.common.typeutils.base.CharSerializer() - implicit lazy val jShortSerializer: TypeSerializer[JShort] = - new org.apache.flink.api.common.typeutils.base.ShortSerializer() + implicit lazy val jIntegerSerializer: TypeSerializer[Integer] = new IntSerializer() + implicit lazy val jLongSerializer: TypeSerializer[JLong] = new LongSerializer() + implicit lazy val jFloatSerializer: TypeSerializer[JFloat] = new FloatSerializer() + implicit lazy val jDoubleSerializer: TypeSerializer[JDouble] = new DoubleSerializer() + implicit lazy val jBooleanSerializer: TypeSerializer[JBoolean] = new BooleanSerializer() + implicit lazy val jByteSerializer: TypeSerializer[JByte] = new ByteSerializer() + implicit lazy val jCharSerializer: TypeSerializer[JCharacter] = new CharSerializer() + implicit lazy val jShortSerializer: TypeSerializer[JShort] = new ShortSerializer() + implicit lazy val jZoneIdSerializer: TypeSerializer[ZoneId] = ZoneIdSerializer + implicit lazy val jZoneOffsetSerializer: TypeSerializer[ZoneOffset] = ZoneOffsetSerializer + implicit lazy val jZonedDateTimeSerializer: TypeSerializer[ZonedDateTime] = ZonedDateTimeSerializer + implicit lazy val jOffsetDateTimeSerializer: TypeSerializer[OffsetDateTime] = OffsetDateTimeSerializer // type infos implicit lazy val stringInfo: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO @@ -136,6 +142,10 @@ trait serializers extends LowPrioImplicits { implicit lazy val jLocalDateTypeInfo: TypeInformation[LocalDate] = LocalTimeTypeInfo.LOCAL_DATE implicit lazy val jLocalDateTimeTypeInfo: TypeInformation[LocalDateTime] = LocalTimeTypeInfo.LOCAL_DATE_TIME implicit lazy val jLocalTimeTypeInfo: TypeInformation[LocalTime] = LocalTimeTypeInfo.LOCAL_TIME + implicit lazy val jZoneIdInfo: TypeInformation[ZoneId] = SimpleTypeInfo(0) + implicit lazy val jZoneOffsetInfo: TypeInformation[ZoneOffset] = SimpleTypeInfo(3, 9, keyType = true) + implicit lazy val jZonedDateTimeInfo: TypeInformation[ZonedDateTime] = SimpleTypeInfo(3, 11, keyType = true) + implicit lazy val jOffsetDateTimeInfo: TypeInformation[OffsetDateTime] = SimpleTypeInfo(2, 10, keyType = true) implicit def listCCInfo[T: ClassTag](implicit ls: TypeSerializer[::[T]]): TypeInformation[::[T]] = new CollectionTypeInformation[::[T]](ls) diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInfo.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInfo.scala new file mode 100644 index 00000000..d4d1fa57 --- /dev/null +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInfo.scala @@ -0,0 +1,32 @@ +package org.apache.flinkx.api.typeinfo + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.serialization.SerializerConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer + +import scala.reflect.{ClassTag, classTag} + +final case class SimpleTypeInfo[T: ClassTag: TypeSerializer]( + arity: Int = 1, + totalFields: Int = 1, // The total number of fields must be at least 1. + basicType: Boolean = false, + tupleType: Boolean = false, + keyType: Boolean = false +) extends TypeInformation[T] { + + val typeClass: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]] + val serializer: TypeSerializer[T] = implicitly[TypeSerializer[T]] + + override def createSerializer(config: SerializerConfig): TypeSerializer[T] = serializer.duplicate() + // override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x + def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer.duplicate() + + override def isBasicType: Boolean = basicType + override def isTupleType: Boolean = tupleType + override def isKeyType: Boolean = keyType + override def getTotalFields: Int = totalFields + override def getTypeClass: Class[T] = typeClass + override def getArity: Int = arity + +} diff --git a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala index dbff975b..2eae486e 100644 --- a/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala +++ b/modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala @@ -7,6 +7,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer import scala.reflect.{ClassTag, classTag} +@deprecated("Use SimpleTypeInfo instead.", "1.2.10") abstract class SimpleTypeInformation[T: ClassTag: TypeSerializer] extends TypeInformation[T] { override def createSerializer(config: SerializerConfig): TypeSerializer[T] = implicitly[TypeSerializer[T]].duplicate() diff --git a/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala b/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala index 7e104f25..80ca790d 100644 --- a/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala +++ b/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala @@ -14,7 +14,7 @@ import org.scalatest.Inspectors import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import java.time.{Instant, LocalDate, LocalDateTime} +import java.time.{Instant, LocalDate, LocalDateTime, OffsetDateTime, ZoneId, ZoneOffset, ZonedDateTime} import java.util.UUID class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils { @@ -196,6 +196,25 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test testTypeInfoAndSerializer(NullableFixedSizeCaseClass(null)) } + it should "serialize ZoneId" in { + testTypeInfoAndSerializer(ZoneId.systemDefault()) + } + + it should "serialize ZoneOffset" in { + // Don't test type info, as its arity and total fields depend on the JDK version + testSerializer(ZoneOffset.UTC) + } + + it should "serialize OffsetDateTime" in { + // Don't test type info, as its arity and total fields depend on the JDK version + testSerializer(OffsetDateTime.now()) + } + + it should "serialize ZonedDateTime" in { + // Don't test type info, as its arity and total fields depend on the JDK version + testSerializer(ZonedDateTime.now()) + } + } object SerializerTest {