Skip to content

Commit

Permalink
Extracted method in schema converter.
Browse files Browse the repository at this point in the history
  • Loading branch information
morazow committed Jul 22, 2021
1 parent 80f5c93 commit 6ac9680
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/cloudetl/DataExporterIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DataExporterIT extends BaseS3IntegrationTest {

test("exports and imports character") {
val columns = LinkedHashMap(
"C_CHAR20" -> "CHAR(20)",
"C_CHAR20" -> "CHAR(20)"
)
val tableValues = Stream[Array[Any]](
Array(1L, null),
Expand Down
101 changes: 46 additions & 55 deletions src/main/scala/com/exasol/cloudetl/helper/ParquetSchemaConverter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import java.util.Locale.ENGLISH

import com.exasol.cloudetl.data.ExaColumnInfo

import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation._
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.Types
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

/**
* A class that converts Exasol column names to Parquet schema.
Expand Down Expand Up @@ -47,81 +44,75 @@ final case class ParquetSchemaConverter(isLowercaseSchemaEnabled: Boolean) exten

private[this] def getParquetType(columnName: String, columnInfo: ExaColumnInfo): Type = {
val columnType = columnInfo.`type`
val repetition = if (columnInfo.isNullable) Repetition.OPTIONAL else Repetition.REQUIRED
val repetition = if (columnInfo.isNullable) Type.Repetition.OPTIONAL else Type.Repetition.REQUIRED
columnType match {
case `jInteger` =>
if (columnInfo.precision == 0) {
Types
.primitive(PrimitiveTypeName.INT32, repetition)
.named(columnName)
getPrimitiveType(columnName, INT32, repetition, None, None)
} else {
require(
columnInfo.precision <= DECIMAL_MAX_INT_DIGITS,
s"Got an 'Integer' type with more than '$DECIMAL_MAX_INT_DIGITS' precision."
)
Types
.primitive(PrimitiveTypeName.INT32, repetition)
.as(decimalType(columnInfo.scale, columnInfo.precision))
.named(columnName)
getPrimitiveType(
columnName,
INT32,
repetition,
None,
Option(decimalType(columnInfo.scale, columnInfo.precision))
)
}
case `jLong` =>
if (columnInfo.precision == 0) {
Types
.primitive(PrimitiveTypeName.INT64, repetition)
.named(columnName)
getPrimitiveType(columnName, INT64, repetition, None, None)
} else {
require(
columnInfo.precision <= DECIMAL_MAX_LONG_DIGITS,
s"Got a 'Long' type with more than '$DECIMAL_MAX_LONG_DIGITS' precision."
)
Types
.primitive(PrimitiveTypeName.INT64, repetition)
.as(decimalType(columnInfo.scale, columnInfo.precision))
.named(columnName)
getPrimitiveType(
columnName,
INT64,
repetition,
None,
Option(decimalType(columnInfo.scale, columnInfo.precision))
)
}
case `jBigDecimal` =>
Types
.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
.length(PRECISION_TO_BYTE_SIZE(columnInfo.precision - 1))
.as(decimalType(columnInfo.scale, columnInfo.precision))
.named(columnName)
case `jDouble` =>
Types
.primitive(PrimitiveTypeName.DOUBLE, repetition)
.named(columnName)
getPrimitiveType(
columnName,
FIXED_LEN_BYTE_ARRAY,
repetition,
Option(PRECISION_TO_BYTE_SIZE(columnInfo.precision - 1)),
Option(decimalType(columnInfo.scale, columnInfo.precision))
)
case `jDouble` => getPrimitiveType(columnName, DOUBLE, repetition, None, None)
case `jString` =>
if (columnInfo.length > 0) {
Types
.primitive(PrimitiveTypeName.BINARY, repetition)
.length(columnInfo.length)
.as(stringType())
.named(columnName)
getPrimitiveType(columnName, BINARY, repetition, Option(columnInfo.length), Option(stringType()))
} else {
Types
.primitive(PrimitiveTypeName.BINARY, repetition)
.as(stringType())
.named(columnName)
getPrimitiveType(columnName, BINARY, repetition, None, Option(stringType()))
}
case `jBoolean` =>
Types
.primitive(PrimitiveTypeName.BOOLEAN, repetition)
.named(columnName)

case `jSqlDate` =>
Types
.primitive(PrimitiveTypeName.INT32, repetition)
.as(dateType())
.named(columnName)

case `jSqlTimestamp` =>
Types
.primitive(PrimitiveTypeName.INT96, repetition)
.named(columnName)
case _ =>
throw new IllegalArgumentException(s"Cannot convert Exasol type '$columnType' to Parquet type.")
case `jBoolean` => getPrimitiveType(columnName, BOOLEAN, repetition, None, None)
case `jSqlDate` => getPrimitiveType(columnName, INT32, repetition, None, Option(dateType()))
case `jSqlTimestamp` => getPrimitiveType(columnName, INT96, repetition, None, None)
case _ => throw new IllegalArgumentException(s"Cannot convert Exasol type '$columnType' to Parquet type.")
}
}

/**
 * Builds a named Parquet primitive type, applying the optional type length
 * and the optional logical type annotation only when they are provided.
 *
 * @param name the name given to the resulting Parquet type
 * @param primitiveType the Parquet primitive type name, e.g. `INT32` or `BINARY`
 * @param repetition the repetition of the resulting type
 * @param lengthOption an optional type length; the length is set on the builder only if defined
 * @param logicalTypeOption an optional logical type annotation; applied to the builder only if defined
 * @return the Parquet type built from the given components
 */
private[this] def getPrimitiveType(
  name: String,
  primitiveType: PrimitiveType.PrimitiveTypeName,
  repetition: Type.Repetition,
  lengthOption: Option[Int],
  logicalTypeOption: Option[LogicalTypeAnnotation]
): Type = {
  val baseBuilder = Types.primitive(primitiveType, repetition)
  // The length is applied before the logical type annotation, and both before naming.
  val sizedBuilder = lengthOption.fold(baseBuilder)(length => baseBuilder.length(length))
  val annotatedBuilder =
    logicalTypeOption.fold(sizedBuilder)(logicalType => sizedBuilder.as(logicalType))
  annotatedBuilder.named(name)
}

}

object ParquetSchemaConverter {
Expand Down

0 comments on commit 6ac9680

Please sign in to comment.