diff --git a/doc/changes/changes_0.9.1.md b/doc/changes/changes_0.9.1.md index d0202a40..c7653cd8 100644 --- a/doc/changes/changes_0.9.1.md +++ b/doc/changes/changes_0.9.1.md @@ -4,6 +4,7 @@ * #106: Updated parquet-hadoop version that includes api changes (PR #107). * #108: Added dictionary aware Parquet decoders (PR #109). +* #11: Added support for importing Parquet complex (LIST, MAP) types (PR #111). ## Documentation @@ -13,24 +14,24 @@ ### Runtime Dependency Updates -* Updated `org.apache.orc:orc-core` from `1.6.4` to `1.6.5`. -* Updated `org.apache.parquet:parquet-hadoop` from `1.10.1` to `1.11.1`. -* Updated `com.exasol:import-export-udf-common-scala` from `0.1.0` to `0.2.0`. +* Updated to `org.apache.orc:orc-core:1.6.5` (was `1.6.4`) +* Updated to `org.apache.parquet:parquet-hadoop:1.11.1` (was `1.10.1`) +* Updated to `com.exasol:import-export-udf-common-scala:0.2.0` (was `0.1.0`) * Removed `com.exasol:exasol-script-api` * Removed `com.typesafe.scala-logging:scala-logging` * Removed `com.fasterxml.jackson.core:jackson-core` * Removed `com.fasterxml.jackson.core:jackson-databind` * Removed `com.fasterxml.jackson.core:jackson-annotations` * Removed `com.fasterxml.jackson.module:"jackson-module-scala` -* Removed libraries are included in `import-export-udf-common-scala` dependency. +* Removed libraries are included in `import-export-udf-common-scala` dependency ### Test Dependency Updates -* Updates `org.scalatest:scalatest` from `3.2.2` to `3.2.3`. -* Updated `org.mockito:mockito-core` from `3.5.13` to `3.6.0`. +* Updated to `org.scalatest:scalatest:3.2.3` (was `3.2.2`) +* Updated to `org.mockito:mockito-core:3.6.28` (was `3.5.13`) ### Plugin Updates -* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.13`. -* Updated `org.wartremover:sbt-wartremover-contrib` from `1.3.8` to `1.3.11`. -* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`. +* Updated to `org.wartremover:sbt-wartremover:2.4.13` (was `2.4.10`) +* Updated to `org.wartremover:sbt-wartremover-contrib:1.3.11` (was `1.3.8`) +* Updated to `com.github.cb372:sbt-explicit-dependencies:0.2.15` (was `0.2.13`) diff --git a/doc/user_guide/user_guide.md b/doc/user_guide/user_guide.md index 11aabc6b..0a92f7a0 100644 --- a/doc/user_guide/user_guide.md +++ b/doc/user_guide/user_guide.md @@ -409,6 +409,10 @@ column types when preparing the table. 
| fixed_len_byte_array | | VARCHAR(n), CHAR(n) | | fixed_len_byte_array | decimal(p, s) | DECIMAL(p, s) | | int96 | | TIMESTAMP | +| group | | VARCHAR(n) | +| group | LIST | VARCHAR(n) | +| group | MAP | VARCHAR(n) | +| group | REPEATED | VARCHAR(n) | ## Amazon S3 diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 85124e8c..682530e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,7 +18,7 @@ object Dependencies { // Test dependencies versions private val ScalaTestVersion = "3.2.3" private val ScalaTestPlusVersion = "1.0.0-M2" - private val MockitoCoreVersion = "3.6.0" + private val MockitoCoreVersion = "3.6.28" val Resolvers: Seq[Resolver] = Seq( "Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases" diff --git a/src/main/scala/com/exasol/cloudetl/parquet/converter/ConverterFactory.scala b/src/main/scala/com/exasol/cloudetl/parquet/converter/ConverterFactory.scala index 56175558..b987c3f8 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/converter/ConverterFactory.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/converter/ConverterFactory.scala @@ -1,10 +1,12 @@ package com.exasol.cloudetl.parquet.converter import org.apache.parquet.io.api.Converter +import org.apache.parquet.schema.GroupType import org.apache.parquet.schema.OriginalType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Type.Repetition /** * Parquet field data type converter factory class. @@ -24,64 +26,112 @@ object ConverterFactory { * @return specific data converter for the field type */ def apply( - fieldIndex: Int, parquetType: Type, + fieldIndex: Int, parentDataHolder: ValueHolder - ): Converter = { - if (!parquetType.isPrimitive()) { - throw new UnsupportedOperationException("Currently only primitive types are supported") + ): Converter = + if (parquetType.isPrimitive()) { + createPrimitiveConverter(parquetType.asPrimitiveType(), fieldIndex, parentDataHolder) + } else { + createComplexConverter(parquetType, fieldIndex, parentDataHolder) } - val primitiveType = parquetType.asPrimitiveType() - primitiveType.getPrimitiveTypeName() match { - case BOOLEAN => ParquetPrimitiveConverter(fieldIndex, parentDataHolder) - case DOUBLE => ParquetPrimitiveConverter(fieldIndex, parentDataHolder) - case FLOAT => ParquetPrimitiveConverter(fieldIndex, parentDataHolder) - case BINARY => createBinaryConverter(fieldIndex, parentDataHolder, primitiveType) - case FIXED_LEN_BYTE_ARRAY => - createFixedByteArrayConverter(fieldIndex, parentDataHolder, primitiveType) - case INT32 => createIntegerConverter(fieldIndex, parentDataHolder, primitiveType) - case INT64 => createLongConverter(fieldIndex, parentDataHolder, primitiveType) - case INT96 => ParquetTimestampInt96Converter(fieldIndex, parentDataHolder) + + private[this] def createPrimitiveConverter( + parquetType: PrimitiveType, + index: Int, + parentHolder: ValueHolder + ): Converter = + parquetType.getPrimitiveTypeName() match { + case BOOLEAN => ParquetPrimitiveConverter(index, parentHolder) + case DOUBLE => ParquetPrimitiveConverter(index, parentHolder) + case FLOAT => ParquetPrimitiveConverter(index, parentHolder) + case BINARY => createBinaryConverter(parquetType, index, parentHolder) + case FIXED_LEN_BYTE_ARRAY => createFixedByteArrayConverter(parquetType, index, parentHolder) + case INT32 => createIntegerConverter(parquetType, index, parentHolder) + case INT64 => 
createLongConverter(parquetType, index, parentHolder) + case INT96 => ParquetTimestampInt96Converter(index, parentHolder) + } + + private[this] def createComplexConverter( + parquetType: Type, + index: Int, + parentHolder: ValueHolder + ): Converter = { + val groupType = parquetType.asGroupType() + parquetType.getOriginalType() match { + case OriginalType.LIST => createArrayConverter(groupType.getType(0), index, parentHolder) + case OriginalType.MAP => MapConverter(parquetType.asGroupType(), index, parentHolder) + case _ => + if (groupType.isRepetition(Repetition.REPEATED)) { + createRepeatedConverter(groupType, index, parentHolder) + } else { + StructConverter(groupType, index, parentHolder) + } } } private[this] def createBinaryConverter( + primitiveType: PrimitiveType, index: Int, - holder: ValueHolder, - primitiveType: PrimitiveType + holder: ValueHolder ): Converter = primitiveType.getOriginalType() match { case OriginalType.UTF8 => ParquetStringConverter(index, holder) - case OriginalType.DECIMAL => ParquetDecimalConverter(index, holder, primitiveType) + case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder) case _ => ParquetPrimitiveConverter(index, holder) } private[this] def createFixedByteArrayConverter( + primitiveType: PrimitiveType, index: Int, - holder: ValueHolder, - primitiveType: PrimitiveType + holder: ValueHolder ): Converter = primitiveType.getOriginalType() match { - case OriginalType.DECIMAL => ParquetDecimalConverter(index, holder, primitiveType) + case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder) case _ => ParquetPrimitiveConverter(index, holder) } private[this] def createIntegerConverter( + primitiveType: PrimitiveType, index: Int, - holder: ValueHolder, - primitiveType: PrimitiveType + holder: ValueHolder ): Converter = primitiveType.getOriginalType() match { case OriginalType.DATE => ParquetDateConverter(index, holder) - case OriginalType.DECIMAL => ParquetDecimalConverter(index, holder, primitiveType) + case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder) case _ => ParquetPrimitiveConverter(index, holder) } private[this] def createLongConverter( + primitiveType: PrimitiveType, index: Int, - holder: ValueHolder, - primitiveType: PrimitiveType + holder: ValueHolder ): Converter = primitiveType.getOriginalType() match { case OriginalType.TIMESTAMP_MILLIS => ParquetTimestampMillisConverter(index, holder) - case OriginalType.DECIMAL => ParquetDecimalConverter(index, holder, primitiveType) + case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder) case _ => ParquetPrimitiveConverter(index, holder) } + private[this] def createArrayConverter( + repeatedType: Type, + index: Int, + holder: ValueHolder + ): Converter = + if (repeatedType.isPrimitive()) { + ArrayPrimitiveConverter(repeatedType.asPrimitiveType(), index, holder) + } else if (repeatedType.asGroupType().getFieldCount() > 1) { + ArrayGroupConverter(repeatedType, index, holder) + } else { + val innerElementType = repeatedType.asGroupType().getType(0) + ArrayGroupConverter(innerElementType, index, holder) + } + + private[this] def createRepeatedConverter( + groupType: GroupType, + index: Int, + holder: ValueHolder + ): Converter = + if (groupType.getFieldCount() > 1) { + RepeatedGroupConverter(groupType, index, holder) + } else { + val innerPrimitiveType = groupType.getType(0).asPrimitiveType() + RepeatedPrimitiveConverter(innerPrimitiveType, index, holder) + } } diff --git 
a/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetConverter.scala b/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetConverter.scala index aec6aa63..82f4074b 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetConverter.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetConverter.scala @@ -4,23 +4,50 @@ import java.math.BigDecimal import java.math.BigInteger import java.nio.ByteOrder +import scala.collection.mutable.{Map => MMap} +import scala.collection.mutable.ArrayBuffer + import com.exasol.cloudetl.util.DateTimeUtil import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.schema.GroupType import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ +import org.apache.parquet.schema.Type /** * An interface for the Parquet data type converters. * + * The Parquet reader calls the [[ParquetRootConverter]] for the top + * level Parquet schema. The root converter then generates subsequent + * converters using [[ConverterFactory]] for each type. + * * The sealed trait ensures that all the implementations should be in * this file. */ sealed trait ParquetConverter +/** + * A default converter for Parquet primitive types. + * + * The Parquet schema below would be converted with this converter: + * + * {{{ + * message parquet_file_schema { + * required boolean column_boolean; + * required int32 column_int; + * required int64 column_long; + * required float column_float; + * required double column_double; + * required binary column_string; + * } + * }}} + */ final case class ParquetPrimitiveConverter(index: Int, holder: ValueHolder) extends PrimitiveConverter with ParquetConverter { @@ -32,6 +59,21 @@ final case class ParquetPrimitiveConverter(index: Int, holder: ValueHolder) override def addLong(value: Long): Unit = holder.put(index, value) } +/** + * A converter for Parquet binary type with {@code STRING} or {@code + * UTF8} annotation. + * + * Since string types are stored using the dictionary encoding, the + * converter uses the dictionary metadata when decoding. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required binary name (UTF8); + * required binary surname (STRING); + * } + * }}} + */ final case class ParquetStringConverter(index: Int, holder: ValueHolder) extends PrimitiveConverter with ParquetConverter { @@ -53,10 +95,27 @@ final case class ParquetStringConverter(index: Int, holder: ValueHolder) holder.put(index, decodedDictionary(dictionaryId)) } +/** + * A converter for {@code DECIMAL} annotated Parquet types. + * + * The decimal annotation can be used for the following Parquet types: + * {@code INT32}, {@code INT64}, {@code FIXED_LEN_BYTE_ARRAY} and {@code + * BINARY}. 
+ * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required int32 decimal_int (DECIMAL(9,2)); + * required int64 decimal_long (DECIMAL(18,2)); + * required fixed_len_byte_array(20) decimal_fixed (DECIMAL(20,2)); + * required binary decimal_binary (DECIMAL(30,2)); + * } + * }}} + */ final case class ParquetDecimalConverter( + primitiveType: PrimitiveType, index: Int, - holder: ValueHolder, - primitiveType: PrimitiveType + holder: ValueHolder ) extends PrimitiveConverter with ParquetConverter { @@ -64,7 +123,6 @@ final case class ParquetDecimalConverter( primitiveType.getLogicalTypeAnnotation().asInstanceOf[DecimalLogicalTypeAnnotation] private[this] val precision = decimalType.getPrecision() private[this] val scale = decimalType.getScale() - private[this] var decodedDictionary: Array[BigDecimal] = null override def hasDictionarySupport(): Boolean = true @@ -106,6 +164,17 @@ final case class ParquetDecimalConverter( holder.put(index, decodedDictionary(dictionaryId)) } +/** + * A converter for Parquet {@code INT64} with {@code TIMESTAMP_MILLIS} + * annotation. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required int64 timestamp (TIMESTAMP_MILLIS); + * } + * }}} + */ final case class ParquetTimestampMillisConverter(index: Int, holder: ValueHolder) extends PrimitiveConverter with ParquetConverter { @@ -113,6 +182,18 @@ final case class ParquetTimestampMillisConverter(index: Int, holder: ValueHolder holder.put(index, DateTimeUtil.getTimestampFromMillis(value)) } +/** + * A converter for Parquet {@code INT96} type. + * + * It is converted into a timestamp with nanosecond precision. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required int96 timestamp_nanos; + * } + * }}} + */ final case class ParquetTimestampInt96Converter(index: Int, holder: ValueHolder) extends PrimitiveConverter with ParquetConverter { @@ -126,6 +207,18 @@ final case class ParquetTimestampInt96Converter(index: Int, holder: ValueHolder) } } +/** + * A converter for Parquet {@code INT32} with {@code DATE} annotation. + * + * The integer value represents the number of days since the epoch. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required int32 date (DATE); + * } + * }}} + */ final case class ParquetDateConverter(index: Int, holder: ValueHolder) extends PrimitiveConverter with ParquetConverter { @@ -134,3 +227,318 @@ final case class ParquetDateConverter(index: Int, holder: ValueHolder) holder.put(index, date) } } + +/** + * A Parquet converter for the + * [[org.apache.parquet.schema.Type.Repetition.REPEATED]] group type. + * + * It is converted into an array of key value maps. 
+ * + * The following schema is converted with this converter: + * {{{ + * message parquet_file_schema { + * repeated group person { + * required binary name (UTF8); + * optional int32 age; + * } + * } + * }}} + */ +final case class RepeatedGroupConverter( + groupType: GroupType, + index: Int, + parentDataHolder: ValueHolder +) extends GroupConverter + with ParquetConverter { + private[this] val size = groupType.getFieldCount() + private[this] val converters = createFieldConverters() + private[this] val dataHolder = Array.ofDim[Any](size) + private[this] var currentIndex: Int = 0 + private[this] var currentValues: MMap[String, Any] = null + + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + + override def start(): Unit = { + if (currentValues == null) { + parentDataHolder.put(index, dataHolder) + } + currentValues = MMap.empty[String, Any] + } + + override def end(): Unit = { + dataHolder(currentIndex) = currentValues + currentIndex += 1 + } + + private[this] def createFieldConverters(): Array[Converter] = { + val converters = Array.ofDim[Converter](size) + for { i <- 0 until size } { + converters(i) = ConverterFactory( + groupType.getType(i), + i, + new ValueHolder { + override def put(index: Int, value: Any): Unit = { + val _ = currentValues.put(groupType.getFieldName(i), value) + } + override def reset(): Unit = {} + override def getValues(): Seq[Any] = Seq.empty[Any] + } + ) + } + converters + } + +} + +/** + * A Parquet converter for the + * [[org.apache.parquet.schema.Type.Repetition.REPEATED]] group with a + * single type. + * + * It is converted into an array of values. + * + * The following schema is converted with this converter: + * {{{ + * message parquet_file_schema { + * repeated group person { + * required binary name (UTF8); + * } + * } + * }}} + */ +final case class RepeatedPrimitiveConverter( + elementType: Type, + index: Int, + parentDataHolder: ValueHolder +) extends GroupConverter + with ParquetConverter { + private[this] val elementConverter = createPrimitiveElementConverter() + private[this] val values = ArrayBuffer.empty[Any] + private[this] var currentValue: Any = null + + override def getConverter(fieldIndex: Int): Converter = { + if (fieldIndex != 0) { + throw new IllegalArgumentException( + s"Illegal index '$fieldIndex' to repeated primitive converter. It should be only '0'." + ) + } + elementConverter + } + override def start(): Unit = { + if (currentValue == null) { + parentDataHolder.put(index, values) + } + currentValue = null + } + override def end(): Unit = { + val _ = values += currentValue + } + + private[this] def createPrimitiveElementConverter(): Converter = + ConverterFactory( + elementType, + index, + new ValueHolder { + override def put(index: Int, value: Any): Unit = + currentValue = value + override def reset(): Unit = {} + override def getValues(): Seq[Any] = Seq.empty[Any] + } + ) +} + +/** + * A Parquet converter for the {@code LIST} annotated types. + */ +sealed trait ArrayConverter { + val index: Int + val parentDataHolder: ValueHolder + val dataHolder = new AppendedValueHolder() + val elementConverter = createElementConverter() + + def getConverter(fieldIndex: Int): Converter = { + if (fieldIndex != 0) { + throw new IllegalArgumentException( + s"Illegal index '$fieldIndex' to array converter. It should be only '0'." 
+ ) + } + elementConverter + } + def start(): Unit = dataHolder.reset() + def end(): Unit = parentDataHolder.put(index, dataHolder.getValues()) + + def createElementConverter(): Converter +} + +/** + * A converter for the non standard Parquet list annotated group with a + * single repeated type. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * optional group heights (LIST) { + * repeated int32 height; + * } + * } + * }}} + */ +final case class ArrayPrimitiveConverter( + elementType: PrimitiveType, + val index: Int, + val parentDataHolder: ValueHolder +) extends GroupConverter + with ParquetConverter + with ArrayConverter { + + override def createElementConverter(): Converter = + ConverterFactory(elementType, index, dataHolder) +} + +/** + * A converter for the standard 3-level Parquet list annotated group + * type. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * optional group prices (LIST) { + * repeated group list { + * required double price; + * } + * } + * } + * }}} + */ +final case class ArrayGroupConverter( + elementType: Type, + val index: Int, + val parentDataHolder: ValueHolder +) extends GroupConverter + with ParquetConverter + with ArrayConverter { + + override def createElementConverter(): Converter = new GroupConverter { + val innerConverter = ConverterFactory(elementType, index, dataHolder) + override def getConverter(index: Int): Converter = innerConverter + override def start(): Unit = {} + override def end(): Unit = {} + } +} + +/** + * A Parquet converter for the {@code MAP} annotated type. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * optional group map (MAP) { + * repeated group key_value { + * required binary key (UTF8); + * required int64 value; + * } + * } + * } + * }}} + */ +final case class MapConverter(groupType: GroupType, index: Int, parentDataHolder: ValueHolder) + extends GroupConverter + with ParquetConverter { + private[this] val keysDataHolder = new AppendedValueHolder() + private[this] val valuesDataHolder = new AppendedValueHolder() + private[this] val converter = createMapConverter() + + override def getConverter(fieldIndex: Int): Converter = { + if (fieldIndex < 0 || fieldIndex > 1) { + throw new IllegalArgumentException( + s"Illegal index '$fieldIndex' to map converter. It should be " + + "either '0' for keys converter or '1' for values converter." + ) + } + converter + } + override def start(): Unit = { + keysDataHolder.reset() + valuesDataHolder.reset() + } + override def end(): Unit = { + val keys = keysDataHolder.getValues() + val values = valuesDataHolder.getValues() + val map = keys.zip(values).toMap + parentDataHolder.put(index, map) + } + + private[this] def createMapConverter(): Converter = new GroupConverter { + val mapType = groupType.getFields().get(0).asGroupType() + val mapKeyType = mapType.getFields().get(0) + val mapValueType = mapType.getFields().get(1) + val keysConverter = ConverterFactory(mapKeyType, index, keysDataHolder) + val valuesConverter = ConverterFactory(mapValueType, index, valuesDataHolder) + + override def getConverter(index: Int): Converter = + if (index == 0) { + keysConverter + } else { + valuesConverter + } + override def start(): Unit = {} + override def end(): Unit = {} + } +} + +/** + * An abstract base class for Parquet struct converters. 
+ */ +abstract class AbstractStructConverter( + groupType: GroupType, + index: Int, + parentDataHolder: ValueHolder +) extends GroupConverter { + private[this] val size = groupType.getFieldCount() + protected[this] val dataHolder = IndexedValueHolder(size) + private[this] val converters = createFieldConverters() + + override final def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + override final def start(): Unit = dataHolder.reset() + override final def end(): Unit = endOperation() + + def endOperation(): Unit + + private[this] def createFieldConverters(): Array[Converter] = { + val converters = Array.ofDim[Converter](size) + for { i <- 0 until size } { + converters(i) = ConverterFactory(groupType.getType(i), i, dataHolder) + } + converters + } +} + +/** + * A converter for the Parquet nested group. + * + * The following schema fits this converter: + * {{{ + * message parquet_file_schema { + * required binary name (UTF8); + * required group values { + * optional int32 height; + * optional int32 weight; + * } + * } + * }}} + */ +final case class StructConverter(groupType: GroupType, index: Int, parentDataHolder: ValueHolder) + extends AbstractStructConverter(groupType, index, parentDataHolder) + with ParquetConverter { + + override def endOperation(): Unit = { + val map = dataHolder + .getValues() + .zipWithIndex + .map { + case (value, i) => (groupType.getFieldName(i), value) + } + .toMap + parentDataHolder.put(index, map) + } +} diff --git a/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetRootConverter.scala b/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetRootConverter.scala index d0204397..4ee4d75e 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetRootConverter.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/converter/ParquetRootConverter.scala @@ -1,7 +1,7 @@ package com.exasol.cloudetl.parquet.converter -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter +import com.exasol.common.json.JsonMapper + import org.apache.parquet.schema.GroupType /** @@ -12,25 +12,24 @@ import org.apache.parquet.schema.GroupType * * @param schema the main schema for the Parquet file */ -final case class ParquetRootConverter(schema: GroupType) extends GroupConverter { - private[this] val size = schema.getFieldCount() - private[this] val dataHolder = IndexedValueHolder(size) - private[this] val converters = getFieldConverters() - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - override def start(): Unit = dataHolder.reset() - override def end(): Unit = {} +final case class ParquetRootConverter(schema: GroupType) + extends AbstractStructConverter(schema, -1, EmptyValueHolder) { /** * Returns deserialized Parquet field values for this schema. + * + * It converts the non-primitive field types to JSON string. 
*/ - def getResult(): Seq[Any] = dataHolder.getValues() - - private[this] def getFieldConverters(): Array[Converter] = { - val converters = Array.ofDim[Converter](size) - for { i <- 0 until size } { - converters(i) = ConverterFactory(i, schema.getType(i), dataHolder) + def getResult(): Seq[Any] = + dataHolder.getValues().zipWithIndex.map { + case (value, i) => + if (schema.getType(i).isPrimitive()) { + value + } else { + JsonMapper.toJson(value) + } } - converters - } + + override def endOperation(): Unit = {} + } diff --git a/src/main/scala/com/exasol/cloudetl/parquet/converter/ValueHolder.scala b/src/main/scala/com/exasol/cloudetl/parquet/converter/ValueHolder.scala index f6ff9f58..3a1067b0 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/converter/ValueHolder.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/converter/ValueHolder.scala @@ -1,12 +1,14 @@ package com.exasol.cloudetl.parquet.converter +import scala.collection.mutable.ArrayBuffer + /** - * An interface for the storing converted Parquet values. + * An interface for storing the converted Parquet values. * * Implementations of this interface choose internal structure depending * on the value converter. */ -sealed trait ValueHolder { +trait ValueHolder { /** * Reset the internal value holder data structure. @@ -37,6 +39,21 @@ final case class IndexedValueHolder(size: Int) extends ValueHolder { override def put(index: Int, value: Any): Unit = array.update(index, value) } +/** + * An appending value holder. + * + * It appends converted Parquet values to the end of the values array. + */ +final case class AppendedValueHolder() extends ValueHolder { + private[this] var array = ArrayBuffer.empty[Any] + + override def reset(): Unit = array.clear() + override def getValues(): Seq[Any] = array.clone().toSeq + override def put(index: Int, value: Any): Unit = { + val _ = array += value + } +} + /** * An empty value holder used for top-level parent converter. 
*/ diff --git a/src/test/scala/com/exasol/cloudetl/parquet/BaseParquetReaderTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/BaseParquetReaderTest.scala new file mode 100644 index 00000000..0d73db7d --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/parquet/BaseParquetReaderTest.scala @@ -0,0 +1,81 @@ +package com.exasol.cloudetl.parquet + +import java.io.Closeable +import java.nio.file.Path + +import com.exasol.cloudetl.DummyRecordsTest +import com.exasol.cloudetl.source.ParquetSource +import com.exasol.common.data.Row + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path => HPath} +import org.apache.hadoop.fs.FileSystem +import org.apache.parquet.example.data.Group +import org.apache.parquet.example.data.GroupWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite + +trait BaseParquetReaderTest extends AnyFunSuite with BeforeAndAfterEach with DummyRecordsTest { + + private[this] var conf: Configuration = _ + private[this] var fileSystem: FileSystem = _ + private[this] var outputDirectory: Path = _ + private[this] var path: HPath = _ + + override final def beforeEach(): Unit = { + conf = new Configuration + fileSystem = FileSystem.get(conf) + outputDirectory = createTemporaryFolder("parquetRowReaderTest") + path = new HPath(outputDirectory.toUri.toString, "part-00000.parquet") + () + } + + override final def afterEach(): Unit = { + deleteFiles(outputDirectory) + () + } + + protected final def withResource[T <: Closeable](writer: T)(block: T => Unit): Unit = { + block(writer) + writer.close() + } + + protected final def getRecords(): Seq[Row] = + ParquetSource(path, conf, fileSystem) + .stream() + .toSeq + + protected final def getParquetWriter( + schema: MessageType, + dictionaryEncoding: Boolean + ): ParquetWriter[Group] = + BaseGroupWriterBuilder(path, schema) + .withDictionaryEncoding(dictionaryEncoding) + .build() + + private[this] case class BaseGroupWriteSupport(schema: MessageType) + extends WriteSupport[Group] { + var writer: GroupWriter = null + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = + writer = new GroupWriter(recordConsumer, schema) + + override def init(configuration: Configuration): WriteSupport.WriteContext = + new WriteSupport.WriteContext(schema, new java.util.HashMap[String, String]()) + + override def write(record: Group): Unit = + writer.write(record) + } + + private[this] case class BaseGroupWriterBuilder(path: HPath, schema: MessageType) + extends ParquetWriter.Builder[Group, BaseGroupWriterBuilder](path) { + override def getWriteSupport(conf: Configuration): WriteSupport[Group] = + BaseGroupWriteSupport(schema) + override def self(): BaseGroupWriterBuilder = this + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderComplexTypesTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderComplexTypesTest.scala new file mode 100644 index 00000000..82d8480c --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderComplexTypesTest.scala @@ -0,0 +1,351 @@ +package com.exasol.cloudetl.parquet + +import com.exasol.common.data.Row + +import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.schema.MessageTypeParser + +class ParquetRowReaderComplexTypesTest extends BaseParquetReaderTest { + + test("reads 
array of strings as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group names (LIST) { + | repeated group list { + | required binary name (UTF8); + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val names = record.addGroup(0) + names.addGroup(0).append("name", "John") + names.addGroup(0).append("name", "Jane") + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("""["John","Jane"]"""))) + } + + test("reads array of ints as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group ages (LIST) { + | repeated group list { + | required int32 age; + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val ages = record.addGroup(0) + ages.addGroup(0).append("age", 3) + ages.addGroup(0).append("age", 4) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("[3,4]"))) + } + + test("reads array of doubles as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group prices (LIST) { + | repeated group list { + | required double price; + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val prices = record.addGroup(0) + prices.addGroup(0).append("price", 3.14) + prices.addGroup(0).append("price", 2.71) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("[3.14,2.71]"))) + } + + test("reads non-standard array as JSON array string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group heights (LIST) { + | repeated int32 height; + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val prices = record.addGroup(0) + prices.append("height", 314) + prices.append("height", 271) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("[314,271]"))) + } + + test("reads repeated group with single element as JSON array string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | repeated group person { + | required binary name (UTF8); + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + var person = record.addGroup(0) + person.append("name", "John") + person = record.addGroup(0) + person.append("name", "Jane") + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("""["John","Jane"]"""))) + } + + test("reads repeated group many elements as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | repeated group person { + | required binary name (UTF8); + | optional int32 age; + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + var person = record.addGroup(0) + person.append("name", "John").append("age", 24) + person = record.addGroup(0) + person.append("name", "Jane").append("age", 22) + writer.write(record) + } + val expected = Row(Seq("""[{"age":24,"name":"John"},{"age":22,"name":"Jane"}]""")) + assert(getRecords()(0) === expected) + } + + test("reads array of arrays as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group arrays (LIST) { + | repeated group list { 
+ | required group inner (LIST) { + | repeated group list { + | required int32 element; + | } + | } + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val arrays = record.addGroup(0).addGroup(0) + var inner = arrays.addGroup("inner") + inner.addGroup(0).append("element", 1) + inner.addGroup(0).append("element", 2) + inner = arrays.addGroup("inner") + inner.addGroup(0).append("element", 3) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("[[1,2],[3]]"))) + } + + test("reads array of maps as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group maps (LIST) { + | repeated group list { + | optional group map (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional double price; + | } + | } + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val array = record.addGroup(0).addGroup(0) + var map = array.addGroup("map") + map.addGroup("key_value").append("key", "key1").append("price", 3.14) + map.addGroup("key_value").append("key", "key2").append("price", 2.71) + map = array.addGroup("map") + map.addGroup("key_value").append("key", "a").append("price", 100.0) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("""[{"key1":3.14,"key2":2.71},{"a":100.0}]"""))) + } + + test("reads map as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group map (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | required int64 value; + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val map = record.addGroup(0) + map.addGroup("key_value").append("key", "key1").append("value", 314L) + map.addGroup("key_value").append("key", "key2").append("value", 271L) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("""{"key1":314,"key2":271}"""))) + } + + test("reads map with array values as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group map (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional group prices (LIST) { + | repeated group list { + | required double price; + | } + | } + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val map = record.addGroup(0).addGroup("key_value") + val prices = map.append("key", "key1").addGroup("prices") + prices.addGroup(0).append("price", 3.14) + prices.addGroup(0).append("price", 2.71) + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("""{"key1":[3.14,2.71]}"""))) + } + + test("reads map with group values as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | optional group maps (MAP) { + | repeated group key_value { + | required binary name (UTF8); + | required group values { + | optional int32 height; + | optional int32 weight; + | } + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, true)) { writer => + val record = new SimpleGroup(schema) + val maps = record.addGroup(0) + var map = maps.addGroup("key_value") + map.append("name", "John").addGroup("values").append("height", 170).append("weight", 70) + map = maps.addGroup("key_value") + map.append("name", 
"Jane").addGroup("values").append("height", 160).append("weight", 60) + writer.write(record) + } + val expected = + Row(Seq("""{"John":{"height":170,"weight":70},"Jane":{"height":160,"weight":60}}""")) + assert(getRecords()(0) === expected) + } + + test("reads group as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | required binary name (UTF8); + | optional group contacts { + | required binary name (UTF8); + | optional binary phoneNumber (UTF8); + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, false)) { writer => + val record = new SimpleGroup(schema) + record.add(0, "John") + val contacts = record.addGroup(1) + contacts.append("name", "Jane").append("phoneNumber", "1337") + writer.write(record) + } + val expected = Row(Seq("John", """{"name":"Jane","phoneNumber":"1337"}""")) + assert(getRecords()(0) === expected) + } + + test("reads group with repeated group as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | required binary name (UTF8); + | optional group contacts { + | repeated group person { + | required binary name (UTF8); + | optional binary phoneNumber (UTF8); + | } + | optional int32 count; + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, false)) { writer => + val record = new SimpleGroup(schema) + record.add(0, "John") + val contacts = record.addGroup(1) + contacts.addGroup(0).append("name", "Jane").append("phoneNumber", "1337") + contacts.addGroup(0).append("name", "Jake") + contacts.append("count", 2) + writer.write(record) + } + val expected = Row( + Seq( + "John", + """{"person":[{"phoneNumber":"1337","name":"Jane"},{"name":"Jake"}],"count":2}""" + ) + ) + assert(getRecords()(0) === expected) + } + + test("reads nested groups as JSON string") { + val schema = MessageTypeParser.parseMessageType( + """|message test { + | required binary name (UTF8); + | optional group contacts (MAP) { + | repeated group key_value { + | required binary name (UTF8); + | optional group numbers (LIST) { + | repeated group list { + | optional binary phoneNumber (UTF8); + | } + | } + | } + | } + |} + |""".stripMargin + ) + withResource(getParquetWriter(schema, false)) { writer => + val record = new SimpleGroup(schema) + record.add(0, "John") + val contacts = record.addGroup(1) + val phoneNumbers = contacts.addGroup(0).append("name", "Jane").addGroup("numbers") + phoneNumbers.addGroup(0).append("phoneNumber", "1337") + writer.write(record) + } + assert(getRecords()(0) === Row(Seq("John", """{"Jane":["1337"]}"""))) + } + +} diff --git a/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderPrimitiveTypesTest.scala similarity index 63% rename from src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala rename to src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderPrimitiveTypesTest.scala index 7f7c2c38..d9aacf0a 100644 --- a/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderTest.scala +++ b/src/test/scala/com/exasol/cloudetl/parquet/ParquetRowReaderPrimitiveTypesTest.scala @@ -1,81 +1,22 @@ package com.exasol.cloudetl.parquet -import java.io.Closeable import java.math.BigDecimal import java.math.BigInteger import java.math.MathContext -import java.nio.file.Path -import com.exasol.cloudetl.DummyRecordsTest import com.exasol.cloudetl.parquet.converter.ParquetDecimalConverter -import com.exasol.cloudetl.source.ParquetSource import com.exasol.common.data.Row -import 
org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path => HPath} -import org.apache.hadoop.fs.FileSystem import org.apache.parquet.column.Dictionary -import org.apache.parquet.example.data.Group -import org.apache.parquet.example.data.GroupWriter import org.apache.parquet.example.data.simple.SimpleGroup -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema._ import org.apache.parquet.schema.LogicalTypeAnnotation.decimalType +import org.apache.parquet.schema.MessageTypeParser import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition -import org.scalatest.BeforeAndAfterEach -import org.scalatest.funsuite.AnyFunSuite +import org.apache.parquet.schema.Types -class ParquetRowReaderTest extends AnyFunSuite with BeforeAndAfterEach with DummyRecordsTest { - - private[this] var conf: Configuration = _ - private[this] var fileSystem: FileSystem = _ - private[this] var outputDirectory: Path = _ - private[this] var path: HPath = _ - - override final def beforeEach(): Unit = { - conf = new Configuration - fileSystem = FileSystem.get(conf) - outputDirectory = createTemporaryFolder("parquetRowReaderTest") - path = new HPath(outputDirectory.toUri.toString, "part-00000.parquet") - () - } - - override final def afterEach(): Unit = { - deleteFiles(outputDirectory) - () - } - - test("read throws if parquet record has complex types") { - val schema = MessageTypeParser.parseMessageType( - """|message user { - | required binary name (UTF8); - | optional group contacts { - | repeated group array { - | required binary name (UTF8); - | optional binary phoneNumber (UTF8); - | } - | } - |} - |""".stripMargin - ) - withResource(getParquetWriter(schema, false)) { writer => - val record = new SimpleGroup(schema) - record.add(0, "A. Name") - val contacts = record.addGroup(1) - contacts.addGroup(0).append("name", "A. 
Contact").append("phoneNumber", "1337") - contacts.addGroup(0).append("name", "Second Contact") - writer.write(record) - } - - val thrown = intercept[UnsupportedOperationException] { - getRecords() - } - assert(thrown.getMessage === "Currently only primitive types are supported") - } +class ParquetRowReaderPrimitiveTypesTest extends BaseParquetReaderTest { test("reads INT64 (TIMESTAMP_MILLIS) as timestamp value") { val schema = MessageTypeParser.parseMessageType( @@ -240,48 +181,9 @@ class ParquetRowReaderTest extends AnyFunSuite with BeforeAndAfterEach with Dumm .as(decimalType(0, 9)) .named("bytes") val thrown = intercept[UnsupportedOperationException] { - ParquetDecimalConverter(0, null, parquetType).setDictionary(DictionaryEncoding()) + ParquetDecimalConverter(parquetType, 0, null).setDictionary(DictionaryEncoding()) } assert(thrown.getMessage.contains("Cannot convert parquet type to decimal type")) } - private[this] def withResource[T <: Closeable](writer: T)(block: T => Unit): Unit = { - block(writer) - writer.close() - } - - private[this] def getRecords(): Seq[Row] = - ParquetSource(path, conf, fileSystem) - .stream() - .toSeq - - private[this] def getParquetWriter( - schema: MessageType, - dictionaryEncoding: Boolean - ): ParquetWriter[Group] = - BaseGroupWriterBuilder(path, schema) - .withDictionaryEncoding(dictionaryEncoding) - .build() - - private[this] case class BaseGroupWriteSupport(schema: MessageType) - extends WriteSupport[Group] { - var writer: GroupWriter = null - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = - writer = new GroupWriter(recordConsumer, schema) - - override def init(configuration: Configuration): WriteSupport.WriteContext = - new WriteSupport.WriteContext(schema, new java.util.HashMap[String, String]()) - - override def write(record: Group): Unit = - writer.write(record) - } - - private[this] case class BaseGroupWriterBuilder(path: HPath, schema: MessageType) - extends ParquetWriter.Builder[Group, BaseGroupWriterBuilder](path) { - override def getWriteSupport(conf: Configuration): WriteSupport[Group] = - BaseGroupWriteSupport(schema) - override def self(): BaseGroupWriterBuilder = this - } - } diff --git a/src/test/scala/com/exasol/cloudetl/parquet/converter/ParquetConverterTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/converter/ParquetConverterTest.scala new file mode 100644 index 00000000..df3cc2f3 --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/parquet/converter/ParquetConverterTest.scala @@ -0,0 +1,68 @@ +package com.exasol.cloudetl.parquet.converter + +import org.apache.parquet.schema.MessageTypeParser +import org.scalatest.funsuite.AnyFunSuite + +class ParquetConverterTest extends AnyFunSuite { + + test("array converter throws for illegal index") { + val schema = MessageTypeParser.parseMessageType( + """|message parquet_group_list { + | optional group names (LIST) { + | repeated group list { + | required binary name (UTF8); + | } + | } + |} + |""".stripMargin + ) + Seq(-1, 1, 2, 5).foreach { + case index => + val thrown = intercept[IllegalArgumentException] { + ArrayGroupConverter(schema.getType(0), -1, EmptyValueHolder).getConverter(index) + } + assert(thrown.getMessage().contains(s"Illegal index '$index' to array converter")) + } + } + + test("repeated primitive converter throws for illegal index") { + val schema = MessageTypeParser.parseMessageType( + """|message parquet_repeated_primitive { + | repeated group addresses { + | required binary address (UTF8); + | } + |} + |""".stripMargin + ) + Seq(-1, 1, 3, 
8).foreach { + case index => + val thrown = intercept[IllegalArgumentException] { + RepeatedPrimitiveConverter(schema.getType(0), -1, EmptyValueHolder).getConverter(index) + } + assert( + thrown.getMessage().contains(s"Illegal index '$index' to repeated primitive converter") + ) + } + } + + test("map converter throws for illegal index") { + val schema = MessageTypeParser.parseMessageType( + """|message parquet_group_map { + | optional group map (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | required binary value (UTF8); + | } + | } + |} + |""".stripMargin + ) + Seq(-1, 2, 4).foreach { + case index => + val thrown = intercept[IllegalArgumentException] { + MapConverter(schema.getType(0).asGroupType(), -1, EmptyValueHolder).getConverter(index) + } + assert(thrown.getMessage().contains(s"Illegal index '$index' to map converter")) + } + } +} diff --git a/src/test/scala/com/exasol/cloudetl/parquet/converter/ValueHolderTest.scala b/src/test/scala/com/exasol/cloudetl/parquet/converter/ValueHolderTest.scala new file mode 100644 index 00000000..da3bd85c --- /dev/null +++ b/src/test/scala/com/exasol/cloudetl/parquet/converter/ValueHolderTest.scala @@ -0,0 +1,12 @@ +package com.exasol.cloudetl.parquet.converter + +import org.scalatest.funsuite.AnyFunSuite + +class ValueHolderTest extends AnyFunSuite { + + test("empty value holder does nothing") { + assert(EmptyValueHolder.reset() === {}) + assert(EmptyValueHolder.getValues() === Seq.empty[Any]) + assert(EmptyValueHolder.put(-1, "string") === {}) + } +}