#181: Added support for reading Parquet INT64 with `TIMESTAMP_MICROS` logical type (#182)

Fixes #181
morazow committed Jan 13, 2022
1 parent 3c0975a commit b2d0a65
Showing 15 changed files with 90 additions and 26 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci-build.yml
@@ -10,10 +10,10 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        scala: [ 2.13.7 ]
-        exasol-docker-version: [ "6.2.17-d1", "7.0.14", "7.1.3" ]
+        scala: [ 2.13.8 ]
+        exasol-docker-version: [ "7.0.14", "7.1.4" ]
     env:
-      DEFAULT_DOCKER_DB_VERSION: "7.1.3"
+      DEFAULT_DOCKER_DB_VERSION: "7.1.4"
 
     steps:
       - name: Checkout the Repository
@@ -30,7 +30,7 @@ jobs:
         run: |
           docker pull exasol/docker-db:${{ matrix.exasol-docker-version }}
           docker pull localstack/localstack:0.12.16
-          docker pull alluxio/alluxio:2.7.1
+          docker pull alluxio/alluxio:2.7.2
       - name: Cache Local SBT Dependencies
         uses: actions/cache@v2
@@ -19,8 +19,15 @@ jobs:
           java-version: adopt@1.11
       - name: Assembly with SBT skipping tests
         run: sbt assembly
+      - name: Generate sha256sum files
+        run: find target/scala* -maxdepth 1 -name *.jar -exec bash -c 'sha256sum {} > {}.sha256' \;
       - name: Upload assets to the GitHub release draft
         uses: shogo82148/actions-upload-release-asset@v1
         with:
           upload_url: ${{ github.event.inputs.upload_url }}
           asset_path: target/scala*/stripped/*.jar
+      - name: Upload sha256sum files
+        uses: shogo82148/actions-upload-release-asset@v1
+        with:
+          upload_url: ${{ github.event.inputs.upload_url }}
+          asset_path: target/scala*/*.sha256
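
The two new steps above publish a `.sha256` checksum file next to each release jar. As a rough illustration only (this helper and the file name are examples, not part of the repository), a downloaded jar could be checked against the published checksum like this:

```scala
import java.nio.file.{Files, Paths}
import java.security.MessageDigest

// Illustration: compute the SHA-256 digest of a downloaded release jar and
// render it in the same lowercase-hex form that `sha256sum` writes into the
// published .sha256 asset.
def sha256Hex(path: String): String = {
  val bytes = Files.readAllBytes(Paths.get(path))
  MessageDigest.getInstance("SHA-256").digest(bytes).map("%02x".format(_)).mkString
}

// Compare against the first token of the matching .sha256 file, e.g.:
// sha256Hex("cloud-storage-extension-2.3.0.jar") == checksumFromReleaseAsset
```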
2 changes: 1 addition & 1 deletion build.sbt
@@ -15,7 +15,7 @@ lazy val orgSettings = Seq(
 )
 
 lazy val buildSettings = Seq(
-  scalaVersion := "2.13.7"
+  scalaVersion := "2.13.8"
 )
 
 lazy val root =
1 change: 1 addition & 0 deletions doc/changes/changelog.md
@@ -1,5 +1,6 @@
 # Releases
 
+* [2.3.0](changes_2.3.0.md)
 * [2.2.0](changes_2.2.0.md)
 * [2.1.0](changes_2.1.0.md)
 * [2.0.0](changes_2.0.0.md)
26 changes: 26 additions & 0 deletions doc/changes/changes_2.3.0.md
@@ -0,0 +1,26 @@
# Cloud Storage Extension 2.3.0, released 2022-01-??

Code name: Added support for Parquet `INT64 (TIMESTAMP_MICROS)` logical type

## Summary

## Features

* #181: Added support for reading Parquet timestamp micros

## Dependency Updates

### Compile Dependency Updates

* Updated `io.grpc:grpc-netty:1.43.1` to `1.43.2`
* Updated `org.alluxio:alluxio-core-client-hdfs:2.7.1` to `2.7.2`
* Updated `org.apache.logging.log4j:log4j-api:2.17.0` to `2.17.1`
* Updated `org.apache.logging.log4j:log4j-1.2-api:2.17.0` to `2.17.1`

### Test Dependency Updates

### Plugin Updates

* Updated `org.scalameta:sbt-scalafmt:2.4.5` to `2.4.6`
* Updated `org.scoverage:sbt-scoverage:1.9.2` to `1.9.3`
* Updated `com.timushev.sbt:sbt-updates:0.6.0` to `0.6.1`
1 change: 1 addition & 0 deletions doc/user_guide/user_guide.md
@@ -632,6 +632,7 @@ If either of the tags are not set, then it is read as a `null` value.
 | int32  | decimal(p, s)    | DECIMAL(p, s)            |
 | int64  |                  | BIGINT, DECIMAL(36, 0)   |
 | int64  | timestamp_millis | TIMESTAMP                |
+| int64  | timestamp_micros | TIMESTAMP                |
 | int64  | decimal(p, s)    | DECIMAL(p, s)            |
 | float  |                  | FLOAT                    |
 | double |                  | DOUBLE, DOUBLE PRECISION |
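The new `timestamp_micros` row means that Parquet `INT64` values annotated as microseconds since the Unix epoch are imported into Exasol `TIMESTAMP` columns. The sketch below only illustrates the idea of that conversion; the extension's actual converter lives in the Parquet reader code and is not shown in this diff:

```scala
import java.sql.Timestamp
import java.time.Instant

// Conceptual sketch: turn an INT64 TIMESTAMP_MICROS value (microseconds since
// the Unix epoch) into a java.sql.Timestamp without losing the sub-millisecond
// part. Not the extension's actual implementation.
def microsToTimestamp(micros: Long): Timestamp = {
  val seconds = Math.floorDiv(micros, 1000000L)        // whole epoch seconds
  val microOfSecond = Math.floorMod(micros, 1000000L)  // leftover microseconds
  Timestamp.from(Instant.ofEpochSecond(seconds, microOfSecond * 1000L))
}
```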
10 changes: 5 additions & 5 deletions project/Dependencies.scala
@@ -8,13 +8,13 @@ object Dependencies {
 
   // Runtime dependencies versions
   private val ImportExportUDFVersion = "0.2.0"
-  private val ParquetIOVersion = "1.2.1"
+  private val ParquetIOVersion = "1.3.0"
   private val HadoopVersion = "3.3.1"
   private val DeltaVersion = "1.1.0"
   private val OrcVersion = "1.7.2"
   private val GoogleStorageVersion = "1.9.4-hadoop3"
   private val SparkSQLVersion = "3.2.0"
-  private val AlluxioCoreHDFSVersion = "2.7.1"
+  private val AlluxioCoreHDFSVersion = "2.7.2"
 
   // Test dependencies versions
   private val ScalaTestVersion = "3.2.10"
@@ -36,7 +36,7 @@ object Dependencies {
     "com.google.guava" % "guava" % "31.0.1-jre",
     "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.1",
     "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.1",
-    "io.grpc" % "grpc-netty" % "1.43.1",
+    "io.grpc" % "grpc-netty" % "1.43.2",
     "io.netty" % "netty-all" % "4.1.72.Final",
     "com.exasol" %% "import-export-udf-common-scala" % ImportExportUDFVersion
       exclude ("org.slf4j", "slf4j-simple")
@@ -147,8 +147,8 @@ object Dependencies {
     "org.slf4j" % "jul-to-slf4j" % "1.7.32",
     "org.slf4j" % "slf4j-log4j12" % "1.7.32"
       exclude ("log4j", "log4j"),
-    "org.apache.logging.log4j" % "log4j-api" % "2.17.0",
-    "org.apache.logging.log4j" % "log4j-1.2-api" % "2.17.0",
+    "org.apache.logging.log4j" % "log4j-api" % "2.17.1",
+    "org.apache.logging.log4j" % "log4j-1.2-api" % "2.17.1",
     "com.typesafe.scala-logging" %% "scala-logging" % "3.9.4"
   )

2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.5.8
+sbt.version=1.6.1
6 changes: 3 additions & 3 deletions project/plugins.sbt
@@ -1,6 +1,6 @@
 // Adds a `scalafmt` sbt plugin
 // https://github.com/scalameta/sbt-scalafmt
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.5")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
 
 // Adds a `wartremover` a flexible Scala code linting tool
 // http://github.com/puffnfresh/wartremover
@@ -21,7 +21,7 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
 
 // Adds Scala Code Coverage (Scoverage) used during unit tests
 // http://github.com/scoverage/sbt-scoverage
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2")
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
 
 // Adds SBT Coveralls plugin for uploading Scala code coverage to
 // https://coveralls.io
@@ -31,7 +31,7 @@ addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1")
 
 // Adds a `dependencyUpdates` task to check Maven repositories for
 // dependency updates
 // http://github.com/rtimush/sbt-updates
-addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")
+addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.1")
 
 // Adds `scalastyle` a coding style checker and enforcer
 // https://github.com/scalastyle/scalastyle-sbt-plugin
2 changes: 1 addition & 1 deletion project/project/plugins.sbt
@@ -1,3 +1,3 @@
 // Used to get updates for plugins
 // see https://github.com/rtimush/sbt-updates/issues/10
-addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")
+addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.1")
2 changes: 1 addition & 1 deletion src/it/scala/com/exasol/cloudetl/BaseIntegrationTest.scala
@@ -16,7 +16,7 @@ import org.scalatest.funsuite.AnyFunSuite
 trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLogging {
   private[this] val JAR_DIRECTORY_PATTERN = "scala-"
   private[this] val JAR_NAME_PATTERN = "cloud-storage-extension-"
-  private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.1.3"
+  private[this] val DEFAULT_EXASOL_DOCKER_IMAGE = "7.1.4"
 
   val network = DockerNamedNetwork("it-tests", true)
   val exasolContainer = {
@@ -11,7 +11,7 @@ import org.hamcrest.MatcherAssert.assertThat
 
 class AlluxioExportImportIT extends BaseIntegrationTest {
 
-  val ALLUXIO_IMAGE = "alluxio/alluxio:2.7.1"
+  val ALLUXIO_IMAGE = "alluxio/alluxio:2.7.2"
   val SCHEMA_NAME = "ALLUXIO_SCHEMA"
 
   val alluxioMainContainer =
@@ -104,14 +104,12 @@ class ParquetDataImporterIT extends BaseDataImporter {
   test("imports int64 (timestamp millis)") {
     val millis1 = Instant.EPOCH.toEpochMilli()
     val millis2 = System.currentTimeMillis()
-    val zdt1 =
-      ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis1), ZoneId.of("Europe/Berlin"))
-    val zdt2 =
-      ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis2), ZoneId.of("Europe/Berlin"))
+    val zdt1 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis1), ZoneId.of("Europe/Berlin"))
+    val zdt2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis2), ZoneId.of("Europe/Berlin"))
     val expectedTimestamp1 = Timestamp.valueOf(zdt1.toLocalDateTime())
     val expectedTimestamp2 = Timestamp.valueOf(zdt2.toLocalDateTime())
 
-    ParquetChecker("optional int64 column (TIMESTAMP_MILLIS);", "TIMESTAMP", "int64_timestamp")
+    ParquetChecker("optional int64 column (TIMESTAMP_MILLIS);", "TIMESTAMP", "int64_timestamp_millis")
       .withInputValues[Any](List(millis1, millis2, null))
       .assertResultSet(
         table()
@@ -122,6 +120,23 @@
       )
   }
 
+  test("imports int64 (timestamp micros)") {
+    val timestamp = Timestamp.valueOf("2022-01-12 10:28:53.123456")
+    val millis = timestamp.getTime()
+    val micros = timestamp.getTime() * 1000L + (timestamp.getNanos().toLong / 1000) % 1000L
+    val zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Europe/Berlin"))
+    val expectedTimestamp = Timestamp.valueOf(zdt.toLocalDateTime())
+
+    ParquetChecker("optional int64 column (TIMESTAMP_MICROS);", "TIMESTAMP", "int64_timestamp_micros")
+      .withInputValues[Any](List(micros, null))
+      .assertResultSet(
+        table()
+          .row(expectedTimestamp)
+          .row(null)
+          .matches()
+      )
+  }
+
   test("imports float") {
     val EPS = java.math.BigDecimal.valueOf(0.0001)
     ParquetChecker("optional float column;", "FLOAT", "float_table")
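The `micros` value in the new integration test is the epoch time in microseconds: `getTime()` contributes whole milliseconds, and `getNanos()` supplies the sub-millisecond remainder. A small worked sketch of the same arithmetic (the helper name is illustrative and does not exist in the code base):

```scala
import java.sql.Timestamp

// Same computation as in the test, extracted into a helper for readability.
def toEpochMicros(timestamp: Timestamp): Long = {
  val wholeMillis = timestamp.getTime()                              // epoch milliseconds, includes the .123 part
  val subMilliMicros = (timestamp.getNanos().toLong / 1000) % 1000L  // the trailing 456 microseconds
  wholeMillis * 1000L + subMilliMicros
}

// For "2022-01-12 10:28:53.123456": getNanos() is 123456000, so
// 123456000 / 1000 = 123456 and 123456 % 1000 = 456; the result therefore
// ends in ...123456 microseconds, matching the fraction in the literal.
```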
5 changes: 1 addition & 4 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -11,7 +11,6 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaLog
 
@@ -94,12 +93,10 @@ abstract class Bucket extends LazyLogging {
       )
     }
     val latestSnapshot = deltaLog.update()
-
     latestSnapshot.allFiles
-      .select("path")
       .collect()
       .toSeq
-      .map { case Row(path: String) => new Path(s"$strippedBucketPath/$path") }
+      .map(addedFile => new Path(s"$strippedBucketPath/${addedFile.path}"))
   }
 
   private[this] def createSparkSession(): SparkSession = {
@@ -32,6 +32,23 @@ class ParquetRowReaderPrimitiveTypesTest extends BaseParquetReaderTest {
     assert(getRecords()(0) === Row(Seq(153L, timestamp)))
   }
 
+  test("reads INT64 (TIMESTAMP_MICROS) as timestamp value") {
+    val schema = MessageTypeParser.parseMessageType(
+      """|message test {
+         |  required int64 col_timestamp (TIMESTAMP_MICROS);
+         |}
+         |""".stripMargin
+    )
+    val timestamp = Timestamp.valueOf("2022-01-12 08:28:53.123456")
+    val micros = timestamp.getTime() * 1000L + (timestamp.getNanos().toLong / 1000) % 1000L
+    withResource(getParquetWriter(schema, false)) { writer =>
+      val record = new SimpleGroup(schema)
+      record.append("col_timestamp", micros)
+      writer.write(record)
+    }
+    assert(getRecords()(0) === Row(Seq(timestamp)))
+  }
 
   test("reads FIXED_LEN_BYTE_ARRAY as string value") {
     val size = 5
     val schema = MessageTypeParser.parseMessageType(
