
Merge 90e034b into 018068e
nextdude committed Dec 9, 2022
2 parents 018068e + 90e034b commit 484df20
Showing 20 changed files with 887 additions and 156 deletions.
10 changes: 6 additions & 4 deletions build.sbt
@@ -36,7 +36,7 @@ val V = new {
val scalaTestPlus = "3.2.14.0"
val scalaCheck = "1.17.0"
val testContainersScala = "0.40.11"
val jackson = "2.14.0"
val jackson = "2.14.1"
val circe = "0.14.2"
val http4s = "0.23.12"
val enumeratum = "1.7.0"
@@ -45,12 +45,13 @@ val V = new {
val squants = "1.8.3"
val confluentAvroSerde = "7.1.1"
val parquet = "1.12.3"
val awsSdk = "1.12.332"
val awsSdk = "1.12.351"
val jdbcMysql = "8.0.30"
val jdbcPg = "42.5.0"
val jdbcPg = "42.5.1"
val jdbcMssql = "11.2.0.jre11"
val hadoop = "3.3.2"
val cassandraDriver = "3.11.3"
+  val uuidCreator = "5.2.0"
val jna = "5.12.1" // needed for testcontainers in some jvms
}

@@ -109,6 +110,7 @@ val circeDeps = Seq(
).map(d => "io.circe" %% s"circe-$d" % V.circe)

val otherDeps = Seq(
"com.github.f4b6a3" % "uuid-creator" % V.uuidCreator,
"org.apache.hadoop" % "hadoop-client" % V.hadoop % Provided,
"io.confluent" % "kafka-avro-serializer" % V.confluentAvroSerde % Provided,
"com.amazonaws" % "aws-java-sdk-core" % V.awsSdk % Provided,
@@ -121,7 +123,7 @@ val otherDeps = Seq(
"org.scalatestplus" %% "scalacheck-1-17" % V.scalaTestPlus % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % V.jackson,
"com.github.pjfanning" %% "jackson-scala-reflect-extensions" % "2.14.0",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-csv" % V.jackson,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % V.jackson,
"com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala % Test,
@@ -2,11 +2,13 @@ package io.epiphanous.flinkrunner.flink.state
import scala.collection.JavaConverters._

object RichStateUtils {
-  implicit class RichListState[T](listState: org.apache.flink.api.common.state.ListState[T]) {
-    def _iterator: Iterator[T] = listState.get().iterator().asScala
-    def isEmpty: Boolean = _iterator.isEmpty
+  implicit class RichListState[T](
+      listState: org.apache.flink.api.common.state.ListState[T]) {
+    def _iterator: Iterator[T] = listState.get().iterator().asScala
+    def isEmpty: Boolean = _iterator.isEmpty
     def contains(element: T): Boolean = _iterator.contains(element)
-    def find(element: T) : Option[T]= _iterator.find(v => v.equals(element))
-    def length: Int = _iterator.length
+    def find(element: T): Option[T] =
+      _iterator.find(v => v.equals(element))
+    def length: Int = _iterator.length
}
}
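
A minimal sketch of how these enriched ListState operations might be used inside a keyed Flink operator. The DedupeFunction class and its "seen" state are hypothetical, written only to illustrate the implicits above:

import io.epiphanous.flinkrunner.flink.state.RichStateUtils._
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// hypothetical deduplicator: emits each value only the first time it
// appears for a given key, using the contains op from RichListState
class DedupeFunction extends KeyedProcessFunction[String, String, String] {

  @transient private var seen: ListState[String] = _

  override def open(parameters: Configuration): Unit =
    seen = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("seen", classOf[String])
    )

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit =
    if (!seen.contains(value)) { // enriched op from RichListState
      seen.add(value)
      out.collect(value)
    }
}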
64 changes: 64 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/BasicJdbcConnectionProvider.scala
@@ -0,0 +1,64 @@
package io.epiphanous.flinkrunner.model

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider

import java.sql.{Connection, DriverManager}
import java.util.Properties
import scala.util.Try

/** A basic replacement for flink's SimpleJdbcConnectionProvider, which
* does not allow using other properties besides user and password when
* establishing a jdbc connection. This is useful for some jdbc drivers
* that require more complex security credentials or other special
* properties to establish a connection.
* @param jdbcOptions
* a flink JdbcConnectionOptions object
* @param props
* arbitrary properties to provide the driver during connection
*/
@SerialVersionUID(2022083817L)
class BasicJdbcConnectionProvider(
jdbcOptions: JdbcConnectionOptions,
props: Properties = new Properties())
extends JdbcConnectionProvider
with LazyLogging
with Serializable {

val url: String = jdbcOptions.getDbURL

jdbcOptions.getUsername.ifPresent(user =>
props.setProperty("user", user)
)
jdbcOptions.getPassword.ifPresent(pwd =>
props.setProperty("password", pwd)
)

@transient
var connection: Connection = _

override def getConnection: Connection = connection

override def getOrEstablishConnection(): Connection =
Option(connection).getOrElse {
Option(jdbcOptions.getDriverName).map(Class.forName)
connection = DriverManager.getConnection(url, props)
connection
}

override def isConnectionValid: Boolean = Option(connection).exists(c =>
c.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds)
)

override def closeConnection(): Unit =
Try(Option(connection).map(_.close())).fold(
err => logger.warn("Failed to close jdbc connection", err),
_ => ()
)

override def reestablishConnection(): Connection = {
closeConnection()
getOrEstablishConnection()
}
}
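
A minimal usage sketch, assuming Flink's JdbcConnectionOptions builder API; the url, credentials, and extra ssl properties below are illustrative, not part of this change:

import io.epiphanous.flinkrunner.model.BasicJdbcConnectionProvider
import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import java.util.Properties

// hypothetical extra driver properties beyond user/password
val props = new Properties()
props.setProperty("ssl", "true")
props.setProperty("sslmode", "verify-full")

val options = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  .withUrl("jdbc:postgresql://localhost:5432/mydb") // hypothetical url
  .withDriverName("org.postgresql.Driver")
  .withUsername("myuser")
  .withPassword("mypass")
  .build()

val provider = new BasicJdbcConnectionProvider(options, props)
val connection = provider.getOrEstablishConnection()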
73 changes: 73 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/D64.scala
@@ -0,0 +1,73 @@
package io.epiphanous.flinkrunner.model

import java.nio.ByteBuffer
import scala.annotation.switch
import scala.collection.mutable

/** A copy-pastable, url friendly, ascii embeddable, lexicographically
* sortable binary encoding
*
* Adapted from javascript code at https://github.com/dominictarr/d64
*/
object D64 {

final val chars =
".PYFGCRLAOEUIDHTNSQJKXBMWVZ_pyfgcrlaoeuidhtnsqjkxbmwvz1234567890"
.split("")
.sorted
.mkString("")

final val codeToIndex = Array.fill[Int](128)(0)
chars.zipWithIndex.foreach { case (c, i) => codeToIndex(c.toInt) = i }

/** encode binary data to string */
def encode(data: Array[Byte]): String = {
val sb = new mutable.StringBuilder("")
val len = data.length
var hang = 0
data.zipWithIndex.foreach { case (v, i) =>
val v2 = if (v < 0) v + 0x100 else v
(i % 3: @switch) match {
case 0 =>
sb += chars(v2 >> 2)
hang = (v2 & 3) << 4
case 1 =>
sb += chars(hang | v2 >> 4)
hang = (v2 & 0xf) << 2
case 2 =>
sb += chars(hang | v2 >> 6)
sb += chars(v2 & 0x3f)
hang = 0
}
}
if ((len % 3) > 0) sb += chars(hang)
sb.mkString
}

/** decode an encoded string back to binary data */
def decode(str: String): Array[Byte] = {
val len = str.length
val bytes = ByteBuffer.allocate(Math.floor((len / 4d) * 3d).toInt)
var hang = 0
var j = 0
str.zipWithIndex.foreach { case (_, i) =>
val v = codeToIndex(str.codePointAt(i))
(i % 4: @switch) match {
case 0 => hang = v << 2
case 1 =>
bytes.put(j, (hang | v >> 4).toByte)
j += 1
hang = (v << 4) & 0xff
case 2 =>
bytes.put(j, (hang | v >> 2).toByte)
j += 1
hang = (v << 6) & 0xff
case 3 =>
bytes.put(j, (hang | v).toByte)
j += 1
}
}
bytes.array()
}

}
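
A round-trip sketch assuming the object above; the input bytes are arbitrary:

import java.nio.charset.StandardCharsets

// encode arbitrary bytes into the url-safe, sortable alphabet,
// then decode back to the original bytes
val original = "hello, flinkrunner".getBytes(StandardCharsets.UTF_8)
val encoded  = D64.encode(original)
val decoded  = D64.decode(encoded)
assert(decoded.sameElements(original))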
@@ -23,7 +23,7 @@ trait FlinkEvent extends Product with Serializable {

/** a path for bucketing streams of this event */
@AvroIgnore def $bucketId: String =
-    Instant.ofEpochMilli($timestamp).prefixedTimePath(s"$$key/")
+    Instant.ofEpochMilli($timestamp).prefixedTimePath($key)

/** an id to use to deduplicate a stream of events */
@AvroIgnore def $dedupeId: String = $id
82 changes: 82 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/Id64.scala
@@ -0,0 +1,82 @@
package io.epiphanous.flinkrunner.model

import com.github.f4b6a3.uuid.UuidCreator
import io.epiphanous.flinkrunner.util.UuidUtils.RichUuid

import java.time.Instant
import java.util.UUID

/** Generates short identifiers that are time-sortable, url-safe,
* lexicographically stable, and globally unique.
*
* Generated identifiers are by default 22 characters long. You can
* recover the underlying time-based UUID from the string identifier.
*
* If you don't care about recovering the underlying UUID, you can
* generate a 20 character id.
*
* Based on the original typescript implementation at
* https://github.com/epiphanous/id64
*/
object Id64 {

final val SHUFFLE_ORDER =
Array(6, 7, 4, 5, 0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14, 15)
final val SHORT_SHUFFLE_ORDER = SHUFFLE_ORDER.filterNot(_ == 8)
final val UNSHUFFLE_ORDER =
Array(4, 5, 6, 7, 2, 3, 0, 1, 8, 9, 10, 11, 12, 13, 14, 15)
final val GREGORIAN_OFFSET = 0x01b21dd213814000L

def shuffle(bytes: Array[Byte], reversible: Boolean): Array[Byte] =
if (reversible) SHUFFLE_ORDER.map(i => bytes(i))
else SHORT_SHUFFLE_ORDER.map(i => bytes(i))

def unshuffle(bytes: Array[Byte]): Array[Byte] =
UNSHUFFLE_ORDER.map(i => bytes(i))

def gen(reversible: Boolean = true): String =
fromUUID(UuidCreator.getTimeBased(), reversible)

def fromUUIDString(uuid: String, reversible: Boolean = true): String =
fromUUID(UuidCreator.fromString(uuid), reversible)

def fromUUID(uuid: UUID, reversible: Boolean = true): String = {
val version = uuid.version()
assert(
version == 1,
"ID64 requires time-based (v1) UUIDs to work"
)
D64.encode(shuffle(uuid.bytes, reversible))
}

def ungen(id: String): UUID = {
val b = D64.decode(id)
UuidCreator.fromBytes(unshuffle(if (b.length == 15) {
val bb = Array.fill[Byte](16)(0)
System.arraycopy(b, 0, bb, 0, 8)
System.arraycopy(b, 8, bb, 9, 7)
bb(8) = 17
bb
} else b))
}

def uuidOf(id: String): UUID = ungen(id)
def toUUID(id: String): UUID = ungen(id)

def ticksOf(id: String): Long = uuidOf(id).timestamp()
def microsOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10)
.toLong
def millisOf(id: String): Long =
Math
.floor((ticksOf(id) - GREGORIAN_OFFSET) / 10000)
.toLong
def instantOf(id: String): Instant = {
val t = ticksOf(id) - GREGORIAN_OFFSET
val secs = t / 10000000
val nsecs = (t - secs * 10000000) * 100
Instant.ofEpochSecond(secs, nsecs)
}

}
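
A usage sketch assuming the object above (generated ids are time-based, so no literal outputs are shown):

// default ids are 22 characters and reversible to their v1 UUID
val id = Id64.gen()
assert(id.length == 22)
val uuid    = Id64.uuidOf(id)    // recover the time-based UUID
val instant = Id64.instantOf(id) // recover its timestamp

// a 20-character id drops the ability to recover the exact UUID
val shortId = Id64.gen(reversible = false)
assert(shortId.length == 20)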
22 changes: 22 additions & 0 deletions src/main/scala/io/epiphanous/flinkrunner/model/KafkaInfoHeader.scala
@@ -0,0 +1,22 @@
package io.epiphanous.flinkrunner.model

import enumeratum._

import scala.collection.immutable

sealed trait KafkaInfoHeader extends EnumEntry

object KafkaInfoHeader extends Enum[KafkaInfoHeader] {

case object SerializedValueSize extends KafkaInfoHeader
case object SerializedKeySize extends KafkaInfoHeader
case object Offset extends KafkaInfoHeader
case object Partition extends KafkaInfoHeader
case object Timestamp extends KafkaInfoHeader
case object TimestampType extends KafkaInfoHeader
case object Topic extends KafkaInfoHeader

def headerName(h: KafkaInfoHeader) = s"Kafka.${h.entryName}"

override def values: immutable.IndexedSeq[KafkaInfoHeader] = findValues
}
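
Since headerName simply prefixes the entry name with "Kafka.", usage looks like this sketch:

import io.epiphanous.flinkrunner.model.KafkaInfoHeader._

// header names as they would appear on a consumed kafka record
headerName(Topic)     // "Kafka.Topic"
headerName(Offset)    // "Kafka.Offset"
headerName(Timestamp) // "Kafka.Timestamp"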
@@ -186,6 +186,17 @@ object SqlColumnType {
withPrecision(6)
)

+  final val JSON = SqlColumnType(
+    "json",
+    Types.VARCHAR,
+    productConfig = Map(
+      Mysql -> withName("JSON", sharedDefaultConfig),
+      Postgresql -> withName("JSONB", sharedDefaultConfig),
+      Snowflake -> withName("VARIANT", sharedDefaultConfig),
+      SqlServer -> withName("NVARCHAR", sharedDefaultConfig)
+    )
+  )

final val values = Map(
"BOOLEAN" -> BOOLEAN,
"TINYINT" -> TINYINT,
@@ -203,7 +214,8 @@ object SqlColumnType {
"TIME" -> TIME,
"TIME_WITH_TIMEZONE" -> TIME_WITH_TIMEZONE,
"TIMESTAMP" -> TIMESTAMP,
"TIMESTAMP_WITH_TIMEZONE" -> TIMESTAMP_WITH_TIMEZONE
"TIMESTAMP_WITH_TIMEZONE" -> TIMESTAMP_WITH_TIMEZONE,
"JSON" -> JSON
)

}
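
A small sketch assuming the values map above, showing how the new JSON type is looked up by name:

// retrieve the JSON column type added in this commit by its name key;
// its generic jdbc type is VARCHAR, while product-specific DDL resolves
// to JSON (mysql), JSONB (postgres), VARIANT (snowflake), NVARCHAR (sql server)
val jsonType = SqlColumnType.values("JSON")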
