Skip to content

Commit

Permalink
Merge c3f8199 into 018068e
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Dec 8, 2022
2 parents 018068e + c3f8199 commit 4e65233
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 4 deletions.
10 changes: 6 additions & 4 deletions build.sbt
Expand Up @@ -36,7 +36,7 @@ val V = new {
val scalaTestPlus = "3.2.14.0"
val scalaCheck = "1.17.0"
val testContainersScala = "0.40.11"
val jackson = "2.14.0"
val jackson = "2.14.1"
val circe = "0.14.2"
val http4s = "0.23.12"
val enumeratum = "1.7.0"
Expand All @@ -45,12 +45,13 @@ val V = new {
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.332"
val awsSdk = "1.12.351"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.0"
val jdbcPg = "42.5.1"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "3.3.2"
val cassandraDriver = "3.11.3"
val uuidCreator = "5.2.0"
val jna = "5.12.1" // needed for testcontainers in some jvms
}

Expand Down Expand Up @@ -109,6 +110,7 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
Expand All @@ -121,7 +123,7 @@ val otherDeps = Seq(
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
Expand Down
73 changes: 73 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/D64.scala
@@ -0,0 +1,73 @@
package io.epiphanous.flinkrunner.model

import java.nio.ByteBuffer
import scala.annotation.switch
import scala.collection.mutable

/** A copy-pastable, url friendly, ascii embeddable, lexicographically
* sortable binary encoding
*
* Adapted from javascript code at https://github.com/dominictarr/d64
*/
object D64 {

final val chars =
".PYFGCRLAOEUIDHTNSQJKXBMWVZ_pyfgcrlaoeuidhtnsqjkxbmwvz1234567890"
.split("")
.sorted
.mkString("")

final val codeToIndex = Array.fill[Int](128)(0)
chars.zipWithIndex.foreach { case (c, i) => codeToIndex(c.toInt) = i }

/** encode binary data to string */
def encode(data: Array[Byte]): String = {
val sb = new mutable.StringBuilder("")
val len = data.length
var hang = 0
data.zipWithIndex.foreach { case (v, i) =>
val v2 = if (v < 0) v + 0x100 else v
(i % 3: @switch) match {
case 0 =>
sb += chars(v2 >> 2)
hang = (v2 & 3) << 4
case 1 =>
sb += chars(hang | v2 >> 4)
hang = (v2 & 0xf) << 2
case 2 =>
sb += chars(hang | v2 >> 6)
sb += chars(v2 & 0x3f)
hang = 0
}
}
if ((len % 3) > 0) sb += chars(hang)
sb.mkString
}

/** decode an encoded string back to binary data */
def decode(str: String): Array[Byte] = {
val len = str.length
val bytes = ByteBuffer.allocate(Math.floor((len / 4d) * 3d).toInt)
var hang = 0
var j = 0
str.zipWithIndex.foreach { case (_, i) =>
val v = codeToIndex(str.codePointAt(i))
(i % 4: @switch) match {
case 0 => hang = v << 2
case 1 =>
bytes.put(j, (hang | v >> 4).toByte)
j += 1
hang = (v << 4) & 0xff
case 2 =>
bytes.put(j, (hang | v >> 2).toByte)
j += 1
hang = (v << 6) & 0xff
case 3 =>
bytes.put(j, (hang | v).toByte)
j += 1
}
}
bytes.array()
}

}
82 changes: 82 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/Id64.scala
@@ -0,0 +1,82 @@
package io.epiphanous.flinkrunner.model

import com.github.f4b6a3.uuid.UuidCreator
import io.epiphanous.flinkrunner.util.UuidUtils.RichUuid

import java.time.Instant
import java.util.UUID

/** Generates short identifiers that are time-sortable, url-safe,
* lexicographically stable, and globally unique.
*
* Generated identifiers are by default 22 characters long. You can
* recover the underlying time-based UUID from the string identifier.
*
* If you don't care about recovering the underlying UUID, you can
* generate a 20 character id.
*
* Based on the original typescript implementation at
* https://github.com/epiphanous/id64
*/
object Id64 {

final val SHUFFLE_ORDER =
Array(6, 7, 4, 5, 0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14, 15)
final val SHORT_SHUFFLE_ORDER = SHUFFLE_ORDER.filterNot(_ == 8)
final val UNSHUFFLE_ORDER =
Array(4, 5, 6, 7, 2, 3, 0, 1, 8, 9, 10, 11, 12, 13, 14, 15)
final val GREGORIAN_OFFSET = 0x01b21dd213814000L

def shuffle(bytes: Array[Byte], reversible: Boolean): Array[Byte] =
if (reversible) SHUFFLE_ORDER.map(i => bytes(i))
else SHORT_SHUFFLE_ORDER.map(i => bytes(i))

def unshuffle(bytes: Array[Byte]): Array[Byte] =
UNSHUFFLE_ORDER.map(i => bytes(i))

def gen(reversible: Boolean = true): String =
fromUUID(UuidCreator.getTimeBased(), reversible)

def fromUUIDString(uuid: String, reversible: Boolean = true): String =
fromUUID(UuidCreator.fromString(uuid), reversible)

def fromUUID(uuid: UUID, reversible: Boolean = true): String = {
val version = uuid.version()
assert(
version == 1,
"ID64 requires time-based (v1) UUIDs to work"
)
D64.encode(shuffle(uuid.bytes, reversible))
}

def ungen(id: String): UUID = {
val b = D64.decode(id)
UuidCreator.fromBytes(unshuffle(if (b.length == 15) {
val bb = Array.fill[Byte](16)(0)
System.arraycopy(b, 0, bb, 0, 8)
System.arraycopy(b, 8, bb, 9, 7)
bb(8) = 17
bb
} else b))
}

def uuidOf(id: String): UUID = ungen(id)
def toUUID(id: String): UUID = ungen(id)

def ticksOf(id: String): Long = uuidOf(id).timestamp()
def microsOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10)
.toLong
def millisOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10000)
.toLong
def instantOf(id: String): Instant = {
val t = ticksOf(id) - GREGORIAN_OFFSET
val secs = t / 10000000
val nsecs = (t - secs * 10000000) * 100
Instant.ofEpochSecond(secs, nsecs)
}

}
46 changes: 46 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/util/UuidUtils.scala
@@ -0,0 +1,46 @@
package io.epiphanous.flinkrunner.util

import java.nio.ByteBuffer
import java.util.UUID

object UuidUtils {

/** Name string is a fully-qualified domain name */
val NameSpace_DNS = new UUID(
/* 6ba7b810-9dad-11d1-80b4-00c04fd430c8 */ 0x6ba7b8109dad11d1L,
0x80b400c04fd430c8L
)

/** Name string is a URL */
val NameSpace_URL = new UUID(
/* 6ba7b811-9dad-11d1-80b4-00c04fd430c8 */ 0x6ba7b8119dad11d1L,
0x80b400c04fd430c8L
)

/** Name string is an ISO OID */
val NameSpace_OID = new UUID(
/* 6ba7b812-9dad-11d1-80b4-00c04fd430c8 */ 0x6ba7b8129dad11d1L,
0x80b400c04fd430c8L
)

/** Name string is an X.500 DN (in DER or a text output format) */
val NameSpace_X500 = new UUID(
/* 6ba7b814-9dad-11d1-80b4-00c04fd430c8 */ 0x6ba7b8149dad11d1L,
0x80b400c04fd430c8L
)

val Variant_0_NCS: Byte = 0x0.toByte
val Variant_1_4122: Byte = 0x80.toByte
val Variant_2_Microsoft: Byte = 0xc0.toByte
val Variant_3_Future: Byte = 0xe0.toByte

implicit class RichUuid(uuid: UUID) {
def bytes: Array[Byte] = {
val buffer = ByteBuffer.allocate(16)
buffer.putLong(uuid.getMostSignificantBits)
buffer.putLong(uuid.getLeastSignificantBits)
buffer.array()
}
}

}
32 changes: 32 additions & 0 deletions src/test/scala/io/epiphanous/flinkrunner/model/D64Spec.scala
@@ -0,0 +1,32 @@
package io.epiphanous.flinkrunner.model

import io.epiphanous.flinkrunner.PropSpec

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.util.Random

class D64Spec extends PropSpec {

val rng = new Random()

def toHex(bytes: Array[Byte]): String = {
val sb = new mutable.StringBuilder("")
bytes.foreach(b => sb.append(String.format("%02x", Byte.box(b))))
sb.toString()
}

property("round trip") {
val bytes = ByteBuffer.allocate(36).array()
Range(0, 100).foreach { _ =>
rng.nextBytes(bytes)
val code = D64.encode(bytes)
// println(toHex(bytes), code)
val actual = D64.decode(code)
// println(toHex(actual))
actual shouldEqual bytes
}

}
}
47 changes: 47 additions & 0 deletions src/test/scala/io/epiphanous/flinkrunner/model/Id64Spec.scala
@@ -0,0 +1,47 @@
package io.epiphanous.flinkrunner.model

import io.epiphanous.flinkrunner.PropSpec

class Id64Spec extends PropSpec {

property("creates reversible ids of length 22") {
val id = Id64.gen()
val uuid = Id64.ungen(id)
val id2 = Id64.fromUUID(uuid)
id shouldEqual id2
id.length shouldEqual 22
}

property("uses valid characters only") {
val chars = s"[${D64.chars}]{22}"
val id = Id64.gen()
id should fullyMatch regex chars
}

property("creates non-reversible ids of length 20") {
val id = Id64.gen(false)
id.length shouldEqual 20
}

property("sortable") {
val unsorted = Range(0, 100).map(_ => Id64.gen())
unsorted shouldEqual unsorted.sorted
}

property("can extract time from id") {
val id = Id64.gen()
val t = Id64.ticksOf(id)
val mc = Id64.microsOf(id)
val ms = Id64.millisOf(id)
val i = Id64.instantOf(id)
val g = Id64.GREGORIAN_OFFSET
val e = t - g
mc shouldEqual Math.floor(e / 10)
ms shouldEqual Math.floor(mc / 1000)
i.getEpochSecond shouldEqual Math.floor(
e / 10000000
)
i.getNano shouldEqual (e - 10000000 * i.getEpochSecond) * 100
}

}

0 comments on commit 4e65233

Please sign in to comment.