Skip to content

Commit

Permalink
Added support for Avro logical types.
Browse files Browse the repository at this point in the history
Fixes #13.
  • Loading branch information
morazow committed Oct 29, 2020
1 parent 28f7c5a commit 8e29bd0
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 4 deletions.
67 changes: 63 additions & 4 deletions src/main/scala/com/exasol/avro/AvroRow.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package com.exasol.common.avro

import java.nio.ByteBuffer
import java.sql.Date
import java.sql.Timestamp
import java.time._
import java.util.{Map => JMap}
import java.util.Collection

import com.exasol.common.data.Row
import com.exasol.common.json.JsonMapper

import org.apache.avro.Conversions
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericFixed
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.IndexedRecord
Expand Down Expand Up @@ -60,13 +66,13 @@ object AvroRow {
field.getType() match {
case Schema.Type.NULL => value
case Schema.Type.BOOLEAN => value
case Schema.Type.INT => value
case Schema.Type.LONG => value
case Schema.Type.INT => getIntValue(value, field)
case Schema.Type.LONG => getLongValue(value, field)
case Schema.Type.FLOAT => value
case Schema.Type.DOUBLE => value
case Schema.Type.STRING => getStringValue(value, field)
case Schema.Type.FIXED => getStringValue(value, field)
case Schema.Type.BYTES => getStringValue(value, field)
case Schema.Type.FIXED => getFixedValue(value, field)
case Schema.Type.BYTES => getBytesValue(value, field)
case Schema.Type.ENUM => value.toString
case Schema.Type.UNION => getUnionValue(value, field)
case Schema.Type.ARRAY => getArrayValue(value, field)
Expand All @@ -75,6 +81,59 @@ object AvroRow {
}
}

private[this] def getIntValue(value: Any, field: Schema): Any = {
val logicalType = field.getLogicalType()
logicalType match {
case _: LogicalTypes.Date => dateFromSinceEpoch(value.asInstanceOf[Int].longValue())
case _ => value
}
}

private[this] def dateFromSinceEpoch(days: Long): Date = {
// scalastyle:off magic.number
val date = LocalDateTime.of(1970, 1, 1, 0, 0, 0).plusDays(days)
// scalastyle:on
val millis = date.atZone(ZoneId.systemDefault).toInstant().toEpochMilli()
new Date(millis)
}

private[this] def getLongValue(value: Any, field: Schema): Any = {
val logicalType = field.getLogicalType()
logicalType match {
case lt: LogicalTypes.TimestampMillis =>
Timestamp.from(
new TimeConversions.TimestampMillisConversion()
.fromLong(value.asInstanceOf[Long], field, lt)
)
case lt: LogicalTypes.TimestampMicros =>
Timestamp.from(
new TimeConversions.TimestampMicrosConversion()
.fromLong(value.asInstanceOf[Long], field, lt)
)
case _ => value
}
}

private[this] def getFixedValue(value: Any, field: Schema): Any = {
val logicalType = field.getLogicalType()
logicalType match {
case lt: LogicalTypes.Decimal =>
new Conversions.DecimalConversion()
.fromFixed(value.asInstanceOf[GenericFixed], field, lt)
case _ => getStringValue(value, field)
}
}

private[this] def getBytesValue(value: Any, field: Schema): Any = {
val logicalType = field.getLogicalType()
logicalType match {
case lt: LogicalTypes.Decimal =>
new Conversions.DecimalConversion()
.fromBytes(value.asInstanceOf[ByteBuffer], field, lt)
case _ => getStringValue(value, field)
}
}

private[this] def getStringValue(value: Any, field: Schema): String =
value match {
case str: String => str
Expand Down
134 changes: 134 additions & 0 deletions src/test/scala/com/exasol/avro/AvroLogicalTypesTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.exasol.common.avro

import java.sql.Date
import java.sql.Timestamp

import com.exasol.common.data.Row

import org.apache.avro.Conversions
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.scalatest.funsuite.AnyFunSuite

class AvroLogicalTypesTest extends AnyFunSuite {

private[this] def getLogicalSchema(avroType: String): Schema =
new Schema.Parser()
.parse(
s"""|{
| "type": "record",
| "namespace": "com.exasol.avro.Types",
| "name": "LogicalTypesRecord",
| "fields": [{
| "name": "value",
| "type": $avroType
| }]
|}
|""".stripMargin
)

test("parse avro int with date logical type as Java SQL date type") {
val daysSinceEpoch = Seq(-719164, -70672, -21060, -365, -1, 0, 7252, 17317, 17937)
val expectedDates = Seq(
"0001-01-01",
"1776-07-04",
"1912-05-05",
"1969-01-01",
"1969-12-31",
"1970-01-01",
"1989-11-09",
"2017-05-31",
"2019-02-10"
)
val schema = getLogicalSchema("""{"type":"int","logicalType":"date"}""")
daysSinceEpoch.zipWithIndex.foreach {
case (days, i) =>
val record = new GenericData.Record(schema)
record.put("value", days)
assert(AvroRow(record).getAs[Date](0).toString() === expectedDates(i))
}
}

private[this] val milliseconds = Seq(-15854399877000L, 1603927542000L, 0L)

test("parse avro long with timestamp-millis as Java SQL timestamp type") {
val schema = getLogicalSchema("""{"type":"long","logicalType":"timestamp-millis"}""")
milliseconds.foreach {
case millis =>
val record = new GenericData.Record(schema)
record.put("value", millis)
assert(AvroRow(record).getAs[Timestamp](0) === new Timestamp(millis))
}
}

test("parse avro long with timestamp-micros as Java SQL timestamp type") {
val schema = getLogicalSchema("""{"type":"long","logicalType":"timestamp-micros"}""")
milliseconds.foreach {
case millis =>
val record = new GenericData.Record(schema)
record.put("value", millis * 1000L + 13)
val expected = new Timestamp(millis)
expected.setNanos(13000)
assert(AvroRow(record).getAs[Timestamp](0) === expected)
}
}

private[this] val precision = 4
private[this] val scale = 2
private[this] val decimals = Map(
"3.14" -> "3.14",
"2.01" -> "2.01",
"1.2" -> "1.20",
"0.5" -> "0.50",
"-1" -> "-1.00",
"-2.31" -> "-2.31"
)

test("parse avro bytes with decimal as big decimal type") {
val schema = getLogicalSchema(
s"""|{
| "type":"bytes",
| "logicalType":"decimal",
| "precision":4,
| "scale":2
|}""".stripMargin
)
decimals.foreach {
case (given, expected) =>
val record = new GenericData.Record(schema)
val bytes = new Conversions.DecimalConversion().toBytes(
new java.math.BigDecimal(given).setScale(scale),
schema.getField("value").schema(),
LogicalTypes.decimal(precision, scale)
)
record.put("value", bytes)
assert(AvroRow(record) === Row(Seq(new java.math.BigDecimal(expected))))
}
}

test("parse avro fixed with decimal as big decimal type") {
val schema = getLogicalSchema(
s"""|{
| "name":"fixed",
| "type":"fixed",
| "size":5,
| "logicalType":"decimal",
| "precision":4,
| "scale":2
|}""".stripMargin
)
decimals.foreach {
case (given, expected) =>
val record = new GenericData.Record(schema)
val fixed = new Conversions.DecimalConversion().toFixed(
new java.math.BigDecimal(given).setScale(scale),
schema.getField("value").schema(),
LogicalTypes.decimal(precision, scale)
)
record.put("value", fixed)
assert(AvroRow(record) === Row(Seq(new java.math.BigDecimal(expected))))
}
}

}

0 comments on commit 8e29bd0

Please sign in to comment.