Skip to content

Commit

Permalink
Updated the parquet-hadoop version from 1.10.1 to 1.11.1 (#107)
Browse files Browse the repository at this point in the history
Co-authored-by: jakobbraun <jakob.braun@posteo.de>
  • Loading branch information
morazow and jakobbraun committed Oct 23, 2020
1 parent 25b1359 commit 1531967
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 86 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ project/.gnupg/local*
project/target
target/
.history
.bsp

# Java
*.class
Expand Down
19 changes: 12 additions & 7 deletions doc/changes/changes_0.9.1.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Cloud Storage Extension 0.9.1, released 2020-10-DD

## Features / Enhancements

* #106: Updated `parquet-hadoop` version, which includes API changes (PR #107).

## Documentation

* #89: Increased the default number of characters for file path (PR #105).
Expand All @@ -8,16 +12,17 @@

### Runtime Dependency Updates

* Updated ``org.apache.orc:orc-core`` from `1.6.4` to `1.6.5`.
* Updated ``com.fasterxml.jackson.core:jackson-core`` from `2.11.2` to `2.11.3`.
* Updated ``com.fasterxml.jackson.core:jackson-databind`` from `2.11.2.3` to `2.11.3`.
* Updated ``com.fasterxml.jackson.core:jackson-annotations`` from `2.11.2` to `2.11.3`.
* Updated ``com.fasterxml.jackson.module:"jackson-module-scala`` from `2.11.2` to `2.11.3`.
* Updated `org.apache.orc:orc-core` from `1.6.4` to `1.6.5`.
* Updated `org.apache.parquet:parquet-hadoop` from `1.10.1` to `1.11.1`.
* Updated `com.fasterxml.jackson.core:jackson-core` from `2.11.2` to `2.11.3`.
* Updated `com.fasterxml.jackson.core:jackson-databind` from `2.11.2.3` to `2.11.3`.
* Updated `com.fasterxml.jackson.core:jackson-annotations` from `2.11.2` to `2.11.3`.
* Updated `com.fasterxml.jackson.module:jackson-module-scala` from `2.11.2` to `2.11.3`.

### Test Dependency Updates

* Updated ``org.mockito:mockito-core`` from `3.5.13` to `3.5.15`.
* Updated `org.mockito:mockito-core` from `3.5.13` to `3.5.15`.

### Plugin Updates

* Updated ``com.github.cb372:sbt-explicit-dependencies`` from `0.2.13` to `0.2.15`.
* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object Dependencies {
private val HadoopVersion = "3.3.0"
private val DeltaVersion = "0.7.0"
private val OrcVersion = "1.6.5"
private val ParquetVersion = "1.10.1"
private val ParquetVersion = "1.11.1"
private val GoogleStorageVersion = "1.9.4-hadoop3"
private val SparkSQLVersion = "3.0.0"
private val TypesafeLoggingVersion = "3.9.2"
Expand Down
54 changes: 20 additions & 34 deletions src/main/scala/com/exasol/cloudetl/parquet/RowReadSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.GroupType
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType
Expand Down Expand Up @@ -82,63 +83,48 @@ class RowRootConverter(schema: GroupType) extends GroupConverter {
override def end(): Unit = {}

private def makeReader(primitiveType: PrimitiveType, idx: Int): Converter = {
val typeName = primitiveType.getPrimitiveTypeName
val originalType = primitiveType.getOriginalType

typeName match {
primitiveType.getPrimitiveTypeName() match {
case PrimitiveTypeName.INT32 =>
originalType match {
case OriginalType.DATE => new RowDateConverter(this, idx)
case OriginalType.DECIMAL =>
val decimalMetadata = primitiveType.getDecimalMetadata
new RowDecimalConverter(
this,
idx,
decimalMetadata.getPrecision,
decimalMetadata.getScale
)
case _ => new RowPrimitiveConverter(this, idx)
case OriginalType.DATE => new RowDateConverter(this, idx)
case OriginalType.DECIMAL => createDecimalConverter(this, primitiveType, idx)
case _ => new RowPrimitiveConverter(this, idx)
}
case PrimitiveTypeName.BOOLEAN => new RowPrimitiveConverter(this, idx)
case PrimitiveTypeName.DOUBLE => new RowPrimitiveConverter(this, idx)
case PrimitiveTypeName.FLOAT => new RowPrimitiveConverter(this, idx)

case PrimitiveTypeName.BINARY =>
originalType match {
case OriginalType.UTF8 => new RowStringConverter(this, idx)
case _ => new RowPrimitiveConverter(this, idx)
}
case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY =>
originalType match {
case OriginalType.DECIMAL =>
val decimalMetadata = primitiveType.getDecimalMetadata
new RowDecimalConverter(
this,
idx,
decimalMetadata.getPrecision,
decimalMetadata.getScale
)
case _ => new RowPrimitiveConverter(this, idx)
case OriginalType.DECIMAL => createDecimalConverter(this, primitiveType, idx)
case _ => new RowPrimitiveConverter(this, idx)
}
case PrimitiveTypeName.INT64 =>
originalType match {
case OriginalType.TIMESTAMP_MILLIS => new RowTimestampMillisConverter(this, idx)
case OriginalType.DECIMAL =>
val decimalMetadata = primitiveType.getDecimalMetadata
new RowDecimalConverter(
this,
idx,
decimalMetadata.getPrecision,
decimalMetadata.getScale
)
case _ => new RowPrimitiveConverter(this, idx)
case OriginalType.DECIMAL => createDecimalConverter(this, primitiveType, idx)
case _ => new RowPrimitiveConverter(this, idx)
}

case PrimitiveTypeName.INT96 => new RowTimestampInt96Converter(this, idx)
}
}

private final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int)
private[this] def createDecimalConverter(
parent: RowRootConverter,
primitiveType: PrimitiveType,
index: Int
): RowDecimalConverter = {
val decimalType =
primitiveType.getLogicalTypeAnnotation().asInstanceOf[DecimalLogicalTypeAnnotation]
new RowDecimalConverter(parent, index, decimalType.getPrecision(), decimalType.getScale())
}

private[this] final class RowPrimitiveConverter(val parent: RowRootConverter, val index: Int)
extends PrimitiveConverter {

override def addBinary(value: Binary): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext
import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType
Expand Down Expand Up @@ -141,8 +142,9 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {
makeTimestampWriter()

case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY if originalType == OriginalType.DECIMAL =>
val decimalMetadata = primitiveType.getDecimalMetadata
makeDecimalWriter(decimalMetadata.getPrecision, decimalMetadata.getScale)
val decimal =
primitiveType.getLogicalTypeAnnotation().asInstanceOf[DecimalLogicalTypeAnnotation]
makeDecimalWriter(decimal.getPrecision(), decimal.getScale())

case _ => throw new UnsupportedOperationException(s"Unsupported parquet type '$typeName'.")
}
Expand Down
26 changes: 10 additions & 16 deletions src/main/scala/com/exasol/cloudetl/util/SchemaUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.exasol.cloudetl.util
import com.exasol.ExaIterator
import com.exasol.cloudetl.data.ExaColumnInfo

import org.apache.parquet.schema.LogicalTypeAnnotation._
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition
Expand Down Expand Up @@ -32,14 +32,14 @@ object SchemaUtil {
* [[org.apache.parquet.schema.MessageType]].
*/
def createParquetMessageType(columns: Seq[ExaColumnInfo], schemaName: String): MessageType = {
val types = columns.map(exaColumnToParquetType(_))
val types = columns.map(columnToParquetType(_))
new MessageType(schemaName, types: _*)
}

// In below several lines, I try to pattern match on Class[X] of Java
// types. Please also read:
// https://stackoverflow.com/questions/7519140/pattern-matching-on-class-type
object JTypes {
private[this] object JTypes {
val jInteger: Class[java.lang.Integer] = classOf[java.lang.Integer]
val jLong: Class[java.lang.Long] = classOf[java.lang.Long]
val jBigDecimal: Class[java.math.BigDecimal] = classOf[java.math.BigDecimal]
Expand All @@ -54,7 +54,7 @@ object SchemaUtil {
* Given Exasol column [[com.exasol.cloudetl.data.ExaColumnInfo]]
* information convert it into Parquet schema type.
*/
def exaColumnToParquetType(colInfo: ExaColumnInfo): Type = {
private[this] def columnToParquetType(colInfo: ExaColumnInfo): Type = {
val colName = colInfo.name
val colType = colInfo.`type`
val repetition = if (colInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED
Expand All @@ -74,9 +74,7 @@ object SchemaUtil {
)
Types
.primitive(PrimitiveTypeName.INT32, repetition)
.precision(colInfo.precision)
.scale(colInfo.scale)
.as(OriginalType.DECIMAL)
.as(decimalType(colInfo.scale, colInfo.precision))
.named(colName)
}

Expand All @@ -92,19 +90,15 @@ object SchemaUtil {
)
Types
.primitive(PrimitiveTypeName.INT64, repetition)
.precision(colInfo.precision)
.scale(colInfo.scale)
.as(OriginalType.DECIMAL)
.as(decimalType(colInfo.scale, colInfo.precision))
.named(colName)
}

case `jBigDecimal` =>
Types
.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
.precision(colInfo.precision)
.scale(colInfo.scale)
.length(PRECISION_TO_BYTE_SIZE(colInfo.precision - 1))
.as(OriginalType.DECIMAL)
.as(decimalType(colInfo.scale, colInfo.precision))
.named(colName)

case `jDouble` =>
Expand All @@ -116,13 +110,13 @@ object SchemaUtil {
if (colInfo.length > 0) {
Types
.primitive(PrimitiveTypeName.BINARY, repetition)
.as(OriginalType.UTF8)
.length(colInfo.length)
.as(stringType())
.named(colName)
} else {
Types
.primitive(PrimitiveTypeName.BINARY, repetition)
.as(OriginalType.UTF8)
.as(stringType())
.named(colName)
}

Expand All @@ -134,7 +128,7 @@ object SchemaUtil {
case `jSqlDate` =>
Types
.primitive(PrimitiveTypeName.INT32, repetition)
.as(OriginalType.DATE)
.as(dateType())
.named(colName)

case `jSqlTimestamp` =>
Expand Down
39 changes: 13 additions & 26 deletions src/test/scala/com/exasol/cloudetl/util/SchemaUtilTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.exasol.ExaIterator
import com.exasol.cloudetl.data.ExaColumnInfo

import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.mockito.Mockito._
Expand Down Expand Up @@ -49,55 +50,41 @@ class SchemaUtilTest extends AnyFunSuite with MockitoSugar {
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "c_int"),
Types
.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
.precision(1)
.scale(0)
.as(OriginalType.DECIMAL)
.as(decimalType(0, 1))
.named("c_int"),
Types
.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
.precision(9)
.scale(0)
.as(OriginalType.DECIMAL)
.as(decimalType(0, 9))
.named("c_int"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "c_long"),
Types
.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
.precision(18)
.scale(0)
.as(OriginalType.DECIMAL)
.as(decimalType(0, 18))
.named("c_long"),
Types
.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED)
.precision(9)
.scale(0)
.length(4)
.as(OriginalType.DECIMAL)
.as(decimalType(0, 9))
.named("c_decimal_int"),
Types
.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED)
.precision(17)
.scale(0)
.length(8)
.as(OriginalType.DECIMAL)
.as(decimalType(0, 17))
.named("c_decimal_long"),
Types
.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.REQUIRED)
.precision(38)
.scale(10)
.length(16)
.as(OriginalType.DECIMAL)
.as(decimalType(10, 38))
.named("c_decimal"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.DOUBLE, "c_double"),
new PrimitiveType(
Repetition.REQUIRED,
PrimitiveType.PrimitiveTypeName.BINARY,
"c_string",
OriginalType.UTF8
),
Types
.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
.as(stringType())
.named("c_string"),
Types
.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
.length(20)
.as(OriginalType.UTF8)
.as(stringType())
.named("c_string"),
new PrimitiveType(
Repetition.REQUIRED,
Expand All @@ -106,7 +93,7 @@ class SchemaUtilTest extends AnyFunSuite with MockitoSugar {
),
Types
.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED)
.as(OriginalType.DATE)
.as(dateType())
.named("c_date"),
new PrimitiveType(Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT96, "c_timestamp")
)
Expand Down

0 comments on commit 1531967

Please sign in to comment.