#183: Added support for Parquet UUID type (#185)
Fixes #183
morazow committed Jan 13, 2022
1 parent 8402dc1 commit 241eb30
Showing 9 changed files with 97 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -22,7 +22,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "exasol-cloud-storage-extension")
.settings(version := "2.2.0")
.settings(version := "2.3.0")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
11 changes: 8 additions & 3 deletions doc/changes/changes_2.3.0.md
@@ -1,18 +1,23 @@
# Cloud Storage Extension 2.3.0, released 2022-01-??
# Cloud Storage Extension 2.3.0, released 2022-01-13

Code name: Added support for Parquet `INT64 (TIMESTAMP_MICROS)` logical type
Code name: Added support for Parquet `TIMESTAMP_MICROS` and `UUID` logical types

## Summary

In this release, we added support for reading Parquet `INT64 (TIMESTAMP_MICROS)` and `FIXED_LEN_BYTE_ARRAY (UUID)` types. We also introduced an S3 SSL enabled parameter that can be used to disable the secure connection to the S3 bucket.

## Features

* #181: Added support for reading Parquet timestamp micros
* #181: Added support for reading Parquet timestamp micros values
* #183: Added support for reading Parquet UUID values
* #184: Added S3 SSL enabled parameter

## Dependency Updates

### Compile Dependency Updates

* Updated `io.grpc:grpc-netty:1.43.1` to `1.43.2`
* Updated `io.netty:netty-all:4.1.72.Final` to `4.1.73.Final`
* Updated `org.alluxio:alluxio-core-client-hdfs:2.7.1` to `2.7.2`
* Updated `org.apache.logging.log4j:log4j-api:2.17.0` to `2.17.1`
* Updated `org.apache.logging.log4j:log4j-1.2-api:2.17.0` to `2.17.1`
1 change: 1 addition & 0 deletions doc/user_guide/user_guide.md
@@ -641,6 +641,7 @@ If either of the tags are not set, then it is read as a `null` value.
| binary | decimal(p, s) | DECIMAL(p, s) |
| fixed_len_byte_array | | VARCHAR(n), CHAR(n) |
| fixed_len_byte_array | decimal(p, s) | DECIMAL(p, s) |
| fixed_len_byte_array | uuid | VARCHAR(n) |
| int96 | | TIMESTAMP |
| group | | VARCHAR(n) |
| group | LIST | VARCHAR(n) |
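The `uuid` row added to the table above maps a 16-byte `fixed_len_byte_array` to a character column. As a side note (not part of this commit), the canonical UUID string is always 36 characters, which is why the integration test below imports into `VARCHAR(36)`; a minimal Scala check:

```scala
import java.util.UUID

// 32 hex digits plus 4 hyphens: the canonical UUID text form is 36 characters,
// so a VARCHAR(36) column is wide enough for any imported UUID value.
val rendered = UUID.randomUUID().toString
assert(rendered.length == 36)
```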
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -37,7 +37,7 @@ object Dependencies {
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.1",
"io.grpc" % "grpc-netty" % "1.43.2",
"io.netty" % "netty-all" % "4.1.72.Final",
"io.netty" % "netty-all" % "4.1.73.Final",
"com.exasol" %% "import-export-udf-common-scala" % ImportExportUDFVersion
exclude ("org.slf4j", "slf4j-simple")
exclude ("org.apache.avro", "avro")
2 changes: 1 addition & 1 deletion scripts/ci.sh
@@ -6,7 +6,7 @@ set -o errtrace -o nounset -o pipefail -o errexit
BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )"/.. && pwd )"
cd "$BASE_DIR"

DEFAULT_SCALA_VERSION=2.13.7
DEFAULT_SCALA_VERSION=2.13.8

if [[ -z "${SCALA_VERSION:-}" ]]; then
echo "Environment variable SCALA_VERSION is not set."
@@ -1,13 +1,15 @@
package com.exasol.cloudetl.parquet

import java.math._
import java.nio.ByteOrder
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.sql.Timestamp
import java.time._
import java.util.UUID

import com.exasol.cloudetl.BaseDataImporter
import com.exasol.cloudetl.helper.DateTimeConverter._
import com.exasol.cloudetl.helper.UUIDConverter
import com.exasol.matcher.CellMatcherFactory
import com.exasol.matcher.ResultSetStructureMatcher.table
import com.exasol.matcher.TypeMatchMode._
@@ -226,6 +228,20 @@ class ParquetDataImporterIT extends BaseDataImporter {
)
}

test("imports fixed_len_byte_array (uuid)") {
val uuid = UUID.randomUUID()
val uuidBinary = Binary.fromConstantByteArray(UUIDConverter.toByteArray(uuid))
val uuidZeros = Binary.fromConstantByteArray(Array.fill[Byte](16)(0x0), 0, 16)
ParquetChecker("required fixed_len_byte_array(16) column (UUID);", "VARCHAR(36)", "fixed_uuid")
.withInputValues[Binary](List(uuidBinary, uuidZeros))
.assertResultSet(
table()
.row(uuid.toString())
.row("00000000-0000-0000-0000-000000000000")
.matches()
)
}

test("imports int96 (timestamp nanos)") {
val millis = System.currentTimeMillis()
val timestamp = new Timestamp(millis)
26 changes: 26 additions & 0 deletions src/main/scala/com/exasol/cloudetl/helper/UUIDConverter.scala
@@ -0,0 +1,26 @@
package com.exasol.cloudetl.helper

import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.UUID

/**
* Helper functions to convert {@code UUID} values.
*/
object UUIDConverter {

/**
* Converts a UUID value to a 16-byte array in big-endian order.
*
* @param uuid a UUID value
* @return a byte array value
*/
def toByteArray(uuid: UUID): Array[Byte] =
ByteBuffer
.allocate(16) // scalastyle:ignore magic.number
.order(ByteOrder.BIG_ENDIAN)
.putLong(uuid.getMostSignificantBits())
.putLong(uuid.getLeastSignificantBits())
.array()

}
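`toByteArray` writes the UUID's two longs in big-endian order, which matches the byte layout of the Parquet `UUID` logical type. For illustration only (this helper is not part of the commit), the inverse conversion would be a hypothetical `fromByteArray`:

```scala
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.UUID

// Hypothetical counterpart to UUIDConverter.toByteArray: rebuilds a UUID from
// the 16 big-endian bytes of a Parquet fixed_len_byte_array (UUID) value.
def fromByteArray(bytes: Array[Byte]): UUID = {
  require(bytes.length == 16, s"expected 16 bytes, got ${bytes.length}")
  val buffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
  val mostSignificantBits = buffer.getLong()
  val leastSignificantBits = buffer.getLong()
  new UUID(mostSignificantBits, leastSignificantBits)
}
```

With that sketch, `fromByteArray(UUIDConverter.toByteArray(uuid)) == uuid` holds for any `uuid`, including the all-zero value asserted in the tests.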
@@ -1,11 +1,15 @@
package com.exasol.cloudetl.parquet

import java.util.List
import java.util.UUID

import com.exasol.common.json.JsonMapper
import com.exasol.parquetio.data.Row

import org.apache.parquet.schema.LogicalTypeAnnotation.uuidType
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
import org.apache.parquet.schema.Type.Repetition

/**
@@ -15,6 +19,15 @@ final case class ParquetValueConverter(schema: MessageType) {
private[this] val size = schema.getFields.size()
private[this] var convertedValues = Array.ofDim[Object](size)

/**
* Converts Parquet Row into array of values.
*
* It maps the complex types (e.g., {@code MAP}, {@code LIST}) into JSON strings. The result array is emitted as an
* Exasol table row.
*
* @param row a Parquet row
* @return converted array of values
*/
def convert(row: Row): Array[Object] = convertParquetComplexValuesToJSON(row.getValues())

private[this] def convertParquetComplexValuesToJSON(values: List[Object]): Array[Object] = {
@@ -29,10 +42,18 @@
private[this] def convertValue(i: Int, value: Object): Object = {
val fieldType = schema.getType(i)
if (fieldType.isPrimitive() && !fieldType.isRepetition(Repetition.REPEATED)) {
value
convertPrimitiveValue(fieldType.asPrimitiveType(), value)
} else {
JsonMapper.toJson(value)
}
}

private[this] def convertPrimitiveValue(primitiveType: PrimitiveType, value: Object): Object = {
val logicalType = primitiveType.getLogicalTypeAnnotation()
primitiveType.getPrimitiveTypeName() match {
case FIXED_LEN_BYTE_ARRAY if logicalType == uuidType() => value.asInstanceOf[UUID].toString()
case _ => value
}
}

}
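The dispatch above keys on the Parquet logical-type annotation rather than the column name, so any other primitive falls through unchanged (`case _ => value`). A standalone sketch of the same check, assuming parquet-mr's `Types` builder API (not code from this repository):

```scala
import org.apache.parquet.schema.LogicalTypeAnnotation.uuidType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
import org.apache.parquet.schema.Types

// Builds the kind of UUID-annotated field convertPrimitiveValue inspects and
// applies the same FIXED_LEN_BYTE_ARRAY + uuidType() test used above.
val uuidField = Types
  .required(FIXED_LEN_BYTE_ARRAY)
  .length(16)
  .as(uuidType())
  .named("uuid_column")

val isUuidColumn =
  uuidField.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY &&
    uuidField.getLogicalTypeAnnotation() == uuidType()
```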
@@ -5,8 +5,10 @@ import java.math.BigInteger
import java.math.MathContext
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.util.UUID

import com.exasol.common.data.Row
import com.exasol.cloudetl.helper.UUIDConverter

import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.io.api.Binary
@@ -154,8 +156,7 @@ class ParquetRowReaderPrimitiveTypesTest extends BaseParquetReaderTest {
writer.write(record)
}
}
val expected =
new BigDecimal(new BigInteger(decimalValue.getBytes(UTF_8)), 2, new MathContext(30))
val expected = new BigDecimal(new BigInteger(decimalValue.getBytes(UTF_8)), 2, new MathContext(30))
val records = getRecords()
assert(records.size === 2)
assert(records(0) === Row(Seq(expected)))
@@ -169,8 +170,7 @@ class ParquetRowReaderPrimitiveTypesTest extends BaseParquetReaderTest {
|""".stripMargin
)
val decimalValue = "12345678901234567890"
val decimalBinary =
Binary.fromConstantByteArray(new BigDecimal(decimalValue).unscaledValue().toByteArray())
val decimalBinary = Binary.fromConstantByteArray(new BigDecimal(decimalValue).unscaledValue().toByteArray())
val zeros = Binary.fromConstantByteArray(Array.fill[Byte](9)(0x0), 0, 9)
withResource(getParquetWriter(schema, true)) { writer =>
Seq(decimalBinary, zeros).foreach { value =>
@@ -186,4 +186,20 @@
assert(records(1) === Row(Seq(BigDecimal.valueOf(0, 2))))
}

test("reads FIXED_LEN_BYTE_ARRAY (uuid) as string value") {
val schema = MessageTypeParser.parseMessageType(
"""|message test {
| required fixed_len_byte_array(16) column (UUID);
|}
|""".stripMargin
)
val uuid = UUID.randomUUID()
withResource(getParquetWriter(schema, true)) { writer =>
val record = new SimpleGroup(schema)
record.append("column", Binary.fromConstantByteArray(UUIDConverter.toByteArray(uuid)))
writer.write(record)
}
assert(getRecords()(0) === Row(Seq(uuid.toString())))
}

}
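To exercise the new code path outside the test harness, a one-row Parquet file with a UUID column can be produced with parquet-mr's example writer. This is an illustrative sketch only; `ExampleParquetWriter` and the local output path are assumptions, not something this commit uses:

```scala
import java.util.UUID

import com.exasol.cloudetl.helper.UUIDConverter

import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.MessageTypeParser

// Writes a single UUID record in the same layout the tests above read back
// as a 36-character string.
object WriteUuidParquetExample extends App {
  private val schema = MessageTypeParser.parseMessageType(
    """|message test {
       |  required fixed_len_byte_array(16) column (UUID);
       |}
       |""".stripMargin
  )
  private val writer = ExampleParquetWriter
    .builder(new Path("/tmp/uuid-example.parquet"))
    .withType(schema)
    .build()
  private val record = new SimpleGroup(schema)
  record.append("column", Binary.fromConstantByteArray(UUIDConverter.toByteArray(UUID.randomUUID())))
  writer.write(record)
  writer.close()
}
```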
