Skip to content

Commit

Permalink
#11: Added support for importing Parquet complex types (#111)
Browse files Browse the repository at this point in the history
Co-authored-by: jakobbraun <jakob.braun@posteo.de>
  • Loading branch information
morazow and jakobbraun committed Nov 30, 2020
1 parent 7fd21f2 commit 505a92c
Show file tree
Hide file tree
Showing 12 changed files with 1,055 additions and 162 deletions.
19 changes: 10 additions & 9 deletions doc/changes/changes_0.9.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* #106: Updated parquet-hadoop version that includes api changes (PR #107).
* #108: Added dictionary aware Parquet decoders (PR #109).
* #11: Added support for importing Parquet complex (LIST, MAP) types (PR #111).

## Documentation

Expand All @@ -13,24 +14,24 @@

### Runtime Dependency Updates

* Updated `org.apache.orc:orc-core` from `1.6.4` to `1.6.5`.
* Updated `org.apache.parquet:parquet-hadoop` from `1.10.1` to `1.11.1`.
* Updated `com.exasol:import-export-udf-common-scala` from `0.1.0` to `0.2.0`.
* Updated to `org.apache.orc:orc-core:1.6.5` (was `1.6.4`)
* Updated to `org.apache.parquet:parquet-hadoop:1.11.1` (was `1.10.1`)
* Updated to `com.exasol:import-export-udf-common-scala:0.2.0` (was `0.1.0`)
* Removed `com.exasol:exasol-script-api`
* Removed `com.typesafe.scala-logging:scala-logging`
* Removed `com.fasterxml.jackson.core:jackson-core`
* Removed `com.fasterxml.jackson.core:jackson-databind`
* Removed `com.fasterxml.jackson.core:jackson-annotations`
* Removed `com.fasterxml.jackson.module:jackson-module-scala`
* Removed libraries are included in `import-export-udf-common-scala` dependency.
* Removed libraries are included in `import-export-udf-common-scala` dependency

### Test Dependency Updates

* Updates `org.scalatest:scalatest` from `3.2.2` to `3.2.3`.
* Updated `org.mockito:mockito-core` from `3.5.13` to `3.6.0`.
* Updated to `org.scalatest:scalatest:3.2.3` (was `3.2.2`)
* Updated to `org.mockito:mockito-core:3.6.28` (was `3.5.13`)

### Plugin Updates

* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.13`.
* Updated `org.wartremover:sbt-wartremover-contrib` from `1.3.8` to `1.3.11`.
* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`.
* Updated to `org.wartremover:sbt-wartremover:2.4.13` (was `2.4.10`)
* Updated to `org.wartremover:sbt-wartremover-contrib:1.3.11` (was `1.3.8`)
* Updated to `com.github.cb372:sbt-explicit-dependencies:0.2.15` (was `0.2.13`)
4 changes: 4 additions & 0 deletions doc/user_guide/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ column types when preparing the table.
| fixed_len_byte_array | | VARCHAR(n), CHAR(n) |
| fixed_len_byte_array | decimal(p, s) | DECIMAL(p, s) |
| int96 | | TIMESTAMP |
| group | | VARCHAR(n) |
| group | LIST | VARCHAR(n) |
| group | MAP | VARCHAR(n) |
| group | REPEATED | VARCHAR(n) |

## Amazon S3

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object Dependencies {
// Test dependencies versions
private val ScalaTestVersion = "3.2.3"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.6.0"
private val MockitoCoreVersion = "3.6.28"

val Resolvers: Seq[Resolver] = Seq(
"Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.exasol.cloudetl.parquet.converter

import org.apache.parquet.io.api.Converter
import org.apache.parquet.schema.GroupType
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition

/**
* Parquet field data type converter factory class.
Expand All @@ -24,64 +26,112 @@ object ConverterFactory {
* @return specific data converter for the field type
*/
def apply(
    parquetType: Type,
    fieldIndex: Int,
    parentDataHolder: ValueHolder
): Converter =
  // Dispatch on the Parquet type shape: primitive fields get a dedicated
  // primitive converter, everything else (group types such as LIST, MAP or
  // plain structs) is handled by the complex-type converter factory method.
  if (parquetType.isPrimitive()) {
    createPrimitiveConverter(parquetType.asPrimitiveType(), fieldIndex, parentDataHolder)
  } else {
    createComplexConverter(parquetType, fieldIndex, parentDataHolder)
  }

/**
 * Returns a converter for a Parquet primitive field.
 *
 * Plain numeric and boolean primitives share the generic
 * [[ParquetPrimitiveConverter]]; the remaining physical types are
 * refined further using their logical (original) type annotation.
 */
private[this] def createPrimitiveConverter(
    parquetType: PrimitiveType,
    index: Int,
    parentHolder: ValueHolder
): Converter =
  parquetType.getPrimitiveTypeName() match {
    case BOOLEAN | DOUBLE | FLOAT => ParquetPrimitiveConverter(index, parentHolder)
    case BINARY => createBinaryConverter(parquetType, index, parentHolder)
    case FIXED_LEN_BYTE_ARRAY => createFixedByteArrayConverter(parquetType, index, parentHolder)
    case INT32 => createIntegerConverter(parquetType, index, parentHolder)
    case INT64 => createLongConverter(parquetType, index, parentHolder)
    case INT96 => ParquetTimestampInt96Converter(index, parentHolder)
  }

/**
 * Returns a converter for a Parquet group (non-primitive) field.
 *
 * LIST and MAP annotated groups map to the array and map converters;
 * un-annotated groups are treated as repeated fields when marked
 * REPEATED, and as structs otherwise.
 */
private[this] def createComplexConverter(
    parquetType: Type,
    index: Int,
    parentHolder: ValueHolder
): Converter = {
  val groupType = parquetType.asGroupType()
  val originalType = parquetType.getOriginalType()
  originalType match {
    case OriginalType.LIST => createArrayConverter(groupType.getType(0), index, parentHolder)
    case OriginalType.MAP => MapConverter(groupType, index, parentHolder)
    case _ if groupType.isRepetition(Repetition.REPEATED) =>
      createRepeatedConverter(groupType, index, parentHolder)
    case _ =>
      StructConverter(groupType, index, parentHolder)
  }
}

/**
 * Returns a converter for a BINARY primitive, refined by its logical type:
 * UTF8 becomes a string converter, DECIMAL a decimal converter, anything
 * else falls back to the generic primitive converter.
 */
private[this] def createBinaryConverter(
    primitiveType: PrimitiveType,
    index: Int,
    holder: ValueHolder
): Converter = primitiveType.getOriginalType() match {
  case OriginalType.UTF8 => ParquetStringConverter(index, holder)
  case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder)
  case _ => ParquetPrimitiveConverter(index, holder)
}

/**
 * Returns a converter for a FIXED_LEN_BYTE_ARRAY primitive: DECIMAL
 * annotated fields get the decimal converter, others the generic one.
 */
private[this] def createFixedByteArrayConverter(
    primitiveType: PrimitiveType,
    index: Int,
    holder: ValueHolder
): Converter = primitiveType.getOriginalType() match {
  case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder)
  case _ => ParquetPrimitiveConverter(index, holder)
}

/**
 * Returns a converter for an INT32 primitive, refined by its logical type:
 * DATE becomes a date converter, DECIMAL a decimal converter, anything
 * else falls back to the generic primitive converter.
 */
private[this] def createIntegerConverter(
    primitiveType: PrimitiveType,
    index: Int,
    holder: ValueHolder
): Converter = primitiveType.getOriginalType() match {
  case OriginalType.DATE => ParquetDateConverter(index, holder)
  case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder)
  case _ => ParquetPrimitiveConverter(index, holder)
}

/**
 * Returns a converter for an INT64 primitive, refined by its logical type:
 * TIMESTAMP_MILLIS becomes a timestamp converter, DECIMAL a decimal
 * converter, anything else falls back to the generic primitive converter.
 */
private[this] def createLongConverter(
    primitiveType: PrimitiveType,
    index: Int,
    holder: ValueHolder
): Converter = primitiveType.getOriginalType() match {
  case OriginalType.TIMESTAMP_MILLIS => ParquetTimestampMillisConverter(index, holder)
  case OriginalType.DECIMAL => ParquetDecimalConverter(primitiveType, index, holder)
  case _ => ParquetPrimitiveConverter(index, holder)
}

/**
 * Returns a converter for the repeated inner type of a LIST group.
 *
 * A primitive element is handled directly; for a group element, a
 * multi-field group is converted as-is while a single-field wrapper
 * group is unwrapped to its sole inner element first.
 */
private[this] def createArrayConverter(
    repeatedType: Type,
    index: Int,
    holder: ValueHolder
): Converter =
  if (repeatedType.isPrimitive()) {
    ArrayPrimitiveConverter(repeatedType.asPrimitiveType(), index, holder)
  } else {
    val elementGroup = repeatedType.asGroupType()
    val elementType =
      if (elementGroup.getFieldCount() > 1) repeatedType else elementGroup.getType(0)
    ArrayGroupConverter(elementType, index, holder)
  }

/**
 * Returns a converter for a REPEATED group without a LIST/MAP annotation.
 *
 * Groups with several fields use the repeated-group converter; a
 * single-field group is unwrapped and converted as a repeated primitive.
 */
private[this] def createRepeatedConverter(
    groupType: GroupType,
    index: Int,
    holder: ValueHolder
): Converter =
  if (groupType.getFieldCount() > 1) {
    RepeatedGroupConverter(groupType, index, holder)
  } else {
    RepeatedPrimitiveConverter(groupType.getType(0).asPrimitiveType(), index, holder)
  }
}
Loading

0 comments on commit 505a92c

Please sign in to comment.