Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.apache.flinkx

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer

package object api {

Expand All @@ -13,4 +15,18 @@ package object api {
/** Documentation of [[TypeInformation#getTotalFields()]] states the total number of fields must be at least 1. */
private[api] val MinimumTotalFields: Int = 1

/** Documentation of [[TypeSerializer#getLength()]] states data type with variable length must return `-1`. */
private[api] val VariableLengthDataType: Int = -1

/** Sentinel written in place of a value to mark `null` in the stream of serialized data. It may only be used when
  * all of these conditions are met:
  *   - It is used in both the serialize and deserialize methods of the serializer.
  *   - The range of actual data doesn't include [[Int.MinValue]], e.g., the size of a collection can only be >= 0.
  *   - The actual data written in the stream is an Int: the first data to deserialize must be an Int for both the
  *     null and non-null cases.
  *
  * If one of these conditions is not met, consider using another marker or wrapping your serializer into a
  * [[NullableSerializer]].
  */
private[api] val NullMarker: Int = Int.MinValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

import scala.reflect.ClassTag

Expand Down Expand Up @@ -38,7 +39,7 @@ class ArraySerializer[T](val child: TypeSerializer[T], clazz: Class[T]) extends
}
}

override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading an Int length from the stream,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType

override def deserialize(source: DataInputView): Array[T] = {
val length = source.readInt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import org.apache.flinkx.api.VariableLengthDataType
import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot

class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]])
Expand Down Expand Up @@ -32,7 +33,14 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers
// this one may be used for later reuse, but we never reuse coproducts due to their unclear concrete type
subtypeSerializers.head.createInstance().asInstanceOf[T]

override def getLength: Int = -1
/** Length reported to Flink for this coproduct, computed once at construction.
  *
  * Returns the length shared by every subtype serializer when they all report the same value; otherwise returns
  * [[VariableLengthDataType]]. An empty `subtypeSerializers` array also yields [[VariableLengthDataType]] instead of
  * failing with an index error while the serializer is being constructed.
  *
  * NOTE(review): if `serialize` writes a subtype discriminator ahead of the payload, the fixed length should
  * probably account for it as well; the body of `serialize` is not fully visible here, please confirm.
  */
override val getLength: Int =
  subtypeSerializers.map(_.getLength).distinct match {
    // Exactly one distinct length: every subtype agrees on it (this includes the all-variable case, -1).
    case Array(sharedLength) => sharedLength
    // No subtype at all, or subtypes disagreeing on their length.
    case _ => VariableLengthDataType
  }

override def serialize(record: T, target: DataOutputView): Unit = {
var subtypeIndex = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

/** Serializer for [[Either]]. Copied from Flink 1.14.
*/
Expand Down Expand Up @@ -50,7 +51,12 @@ class EitherSerializer[A, B](
(rightSerializer == null || rightSerializer.isImmutableType)
}

override def getLength: Int = -1
/** Length reported to Flink for an [[Either]], computed once at construction.
  *
  * Returns the common length when both side serializers report the same value, and [[VariableLengthDataType]]
  * otherwise. Because this is an eagerly evaluated `val`, a `null` side serializer (which `isImmutableType`
  * tolerates for `rightSerializer`) must not be dereferenced here: in that case the length is reported as variable
  * instead of throwing a NullPointerException from the constructor.
  *
  * NOTE(review): if `serialize` writes a Left/Right discriminator ahead of the payload, the fixed length should
  * probably account for it as well; `serialize` is not visible here, please confirm.
  */
override val getLength: Int = {
  val sideLengths = for {
    left  <- Option(leftSerializer)
    right <- Option(rightSerializer)
  } yield (left.getLength, right.getLength)

  sideLengths match {
    case Some((leftLength, rightLength)) if leftLength == rightLength => leftLength
    case _                                                            => VariableLengthDataType
  }
}

override def copy(from: Either[A, B]): Either[A, B] = from match {
case Left(a) => if (leftSerializer.isImmutableType) from else Left(leftSerializer.copy(a))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[::[T]] {

Expand All @@ -26,7 +27,7 @@ class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
}

override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list")
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading an element count from the stream,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): ::[T] = {
val count = source.readInt()
val result = (0 until count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[List[T]] {

Expand All @@ -25,7 +26,7 @@ class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutab
}

override def createInstance(): List[T] = List.empty[T]
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading the number of remaining elements,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): List[T] = {
var remaining = source.readInt()
val builder = List.newBuilder[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import org.apache.flinkx.api.VariableLengthDataType
import org.apache.flinkx.api.serializer.MapSerializer._

class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends MutableSerializer[Map[K, V]] {
Expand All @@ -28,7 +29,7 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
}

override def createInstance(): Map[K, V] = Map.empty[K, V]
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading the number of remaining entries,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): Map[K, V] = {
var remaining = source.readInt()
val builder = Map.newBuilder[K, V]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.flinkx.api.serializer

import java.util.function.Supplier

import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

/** Serializer for cases where no serializer is required but the system still expects one. This happens for
* OptionTypeInfo when None is used, or for Either when one of the type parameters is Nothing.
Expand All @@ -35,7 +35,7 @@ class NothingSerializer extends ImmutableSerializer[Any] {
Integer.valueOf(-1)
}

override def getLength: Int = -1
// Reports the variable-length marker (-1) through the shared constant rather than a bare literal.
override def getLength: Int = VariableLengthDataType

override def copy(from: Any): Any =
throw new RuntimeException("This must not be used. You encountered a bug.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

/** Serializer for [[Option]].
*/
Expand All @@ -41,7 +42,7 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends Mutable

override val isImmutableType: Boolean = elemSerializer == null || elemSerializer.isImmutableType

override def getLength: Int = -1
// Reports the variable-length marker (-1) through the shared constant rather than a bare literal.
override def getLength: Int = VariableLengthDataType

override def copy(from: Option[A]): Option[A] = from match {
case Some(a) => if (isImmutableType) from else Some(elemSerializer.copy(a))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

import scala.collection.immutable
import scala.collection.immutable.ArraySeq
import scala.reflect.ClassTag

Expand Down Expand Up @@ -31,7 +31,7 @@ class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
}

override def createInstance(): Seq[T] = Seq.empty[T]
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading an Int length from the stream,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): Seq[T] = {
val length = source.readInt()
val array = new Array[T](length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Set[T]] {

Expand All @@ -25,7 +26,7 @@ class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
}

override def createInstance(): Set[T] = Set.empty[T]
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading the number of remaining elements,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): Set[T] = {
var remaining = source.readInt()
val builder = Set.newBuilder[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Vector[T]] {

Expand All @@ -25,7 +26,7 @@ class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
}

override def createInstance(): Vector[T] = Vector.empty[T]
override def getLength: Int = -1
// Reports the variable-length marker: deserialize starts by reading the number of remaining elements,
// so the serialized size is not a single fixed value.
override def getLength: Int = VariableLengthDataType
override def deserialize(source: DataInputView): Vector[T] = {
var remaining = source.readInt()
val builder = Vector.newBuilder[T]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

import java.io.IOException
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}

/** Serializer for [[OffsetDateTime]] values, `null` included.
  *
  * A value is written as its local date-time followed by its zone offset, each through a dedicated delegate
  * serializer. A `null` value is written as a `null` local date-time followed by a `null` zone offset.
  */
object OffsetDateTimeSerializer extends ImmutableSerializer[OffsetDateTime] {

  private val dateTimePart: TypeSerializer[LocalDateTime] = LocalDateTimeSerializer.INSTANCE
  private val offsetPart: TypeSerializer[ZoneOffset]      = ZoneOffsetSerializer

  override def createInstance: OffsetDateTime = OffsetDateTime.now()

  /** Sum of the lengths reported by the two delegate serializers. */
  override def getLength: Int = {
    val dateTimeLength = dateTimePart.getLength
    val offsetLength   = offsetPart.getLength
    dateTimeLength + offsetLength
  }

  /** Writes the local date-time part, then the offset part; both parts are `null` when the value is `null`. */
  override def serialize(offsetDateTime: OffsetDateTime, target: DataOutputView): Unit = {
    val value = Option(offsetDateTime)
    dateTimePart.serialize(value.map(_.toLocalDateTime).orNull, target)
    offsetPart.serialize(value.map(_.getOffset).orNull, target)
  }

  /** Reads both parts back and recombines them.
    *
    * @throws java.io.IOException
    *   when exactly one of the two parts is `null`
    */
  override def deserialize(source: DataInputView): OffsetDateTime = {
    val dateTime = Option(dateTimePart.deserialize(source))
    val offset   = Option(offsetPart.deserialize(source))
    (dateTime, offset) match {
      case (Some(localDateTime), Some(zoneOffset)) => OffsetDateTime.of(localDateTime, zoneOffset)
      case (None, None)                            => null
      case _ =>
        throw new IOException("LocalDateTime and ZoneOffset should be either both non-null, or both null")
    }
  }

  /** Copies one serialized value by delegating to each part serializer in write order. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    dateTimePart.copy(source, target)
    offsetPart.copy(source, target)
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[OffsetDateTime] = {
    val snapshot = new OffsetDateTimeSerializerSnapshot()
    snapshot
  }

}

/** Serializer snapshot for [[OffsetDateTime]]. Hands [[SimpleTypeSerializerSnapshot]] a supplier that returns the
  * [[OffsetDateTimeSerializer]] singleton.
  */
class OffsetDateTimeSerializerSnapshot
extends SimpleTypeSerializerSnapshot[OffsetDateTime](() => OffsetDateTimeSerializer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.VariableLengthDataType

import java.time.ZoneId

/** Serializer for [[ZoneId]] values, `null` included.
  *
  * A zone is written as its textual id through the string serializer; a `null` zone is written as a `null` string.
  */
object ZoneIdSerializer extends ImmutableSerializer[ZoneId] {

  private val idSerializer: TypeSerializer[String] = org.apache.flinkx.api.serializers.stringSerializer

  override def createInstance: ZoneId = ZoneId.systemDefault()

  override def getLength: Int = VariableLengthDataType

  /** Writes the zone id, or `null` when there is no zone. */
  override def serialize(zoneId: ZoneId, target: DataOutputView): Unit = {
    val id: String = Option(zoneId).map(_.getId).orNull
    idSerializer.serialize(id, target)
  }

  /** Reads the id back and resolves it to a [[ZoneId]]; a `null` id yields `null`. */
  override def deserialize(source: DataInputView): ZoneId =
    idSerializer.deserialize(source) match {
      case null => null
      case id   => ZoneId.of(id)
    }

  /** Copies one serialized value by delegating to the string serializer. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    idSerializer.copy(source, target)
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[ZoneId] = {
    val snapshot = new ZoneIdSerializerSnapshot()
    snapshot
  }

}

/** Serializer snapshot for [[ZoneId]]. Hands [[SimpleTypeSerializerSnapshot]] a supplier that returns the
  * [[ZoneIdSerializer]] singleton.
  */
class ZoneIdSerializerSnapshot extends SimpleTypeSerializerSnapshot[ZoneId](() => ZoneIdSerializer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flinkx.api.NullMarker

import java.lang.Integer.{BYTES => IntBytes}
import java.time.ZoneOffset

/** Serializer for [[ZoneOffset]] values, `null` included.
  *
  * An offset is written as a single Int holding its total seconds; a `null` offset is written as [[NullMarker]].
  */
object ZoneOffsetSerializer extends ImmutableSerializer[ZoneOffset] {

  override def createInstance: ZoneOffset = ZoneOffset.UTC

  /** Always one Int, for both the `null` and non-`null` cases. */
  override def getLength: Int = IntBytes

  /** Writes the offset's total seconds, or [[NullMarker]] when there is no offset. */
  override def serialize(zoneOffset: ZoneOffset, target: DataOutputView): Unit = {
    val encoded = Option(zoneOffset).fold(NullMarker)(_.getTotalSeconds)
    target.writeInt(encoded)
  }

  /** Reads one Int and maps it back to an offset; [[NullMarker]] maps to `null`. */
  override def deserialize(source: DataInputView): ZoneOffset =
    source.readInt() match {
      case NullMarker => null
      case seconds    => ZoneOffset.ofTotalSeconds(seconds)
    }

  /** Copies the single Int of one serialized value verbatim. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    val raw = source.readInt()
    target.writeInt(raw)
  }

  override def snapshotConfiguration(): TypeSerializerSnapshot[ZoneOffset] = {
    val snapshot = new ZoneOffsetSerializerSnapshot()
    snapshot
  }

}

/** Serializer snapshot for [[ZoneOffset]]. Hands [[SimpleTypeSerializerSnapshot]] a supplier that returns the
  * [[ZoneOffsetSerializer]] singleton.
  */
class ZoneOffsetSerializerSnapshot extends SimpleTypeSerializerSnapshot[ZoneOffset](() => ZoneOffsetSerializer)
Loading