
update to flink 1.14 (#32)
* update to flink 1.14

* add tests
nextdude committed Feb 8, 2022
1 parent 07a3939 commit 81e3811
Showing 12 changed files with 157 additions and 143 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -41,7 +41,7 @@
## Maven Dependency

`Flinkrunner 3` is [available on maven central](https://mvnrepository.com/artifact/io.epiphanous/flinkrunner_2.12),
-built against Flink 1.13 with Scala 2.12 and JDK 11.
+built against Flink 1.14 with Scala 2.12 and JDK 11.

```sbtshell
libraryDependencies += "io.epiphanous" %% "flinkrunner" % <flinkrunner-version>
12 changes: 6 additions & 6 deletions build.sbt
@@ -26,19 +26,19 @@ Test / fork := true
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

val V = new {
val flink = "1.13.2"
val logback = "1.2.6"
val flink = "1.14.3"
val logback = "1.2.10"
val scalaLogging = "3.9.4"
val scalaTest = "3.2.10"
val scalaCheck = "1.15.4"
val circe = "0.14.1"
val http4s = "0.21.29"
val http4s = "0.23.9"
val enumeratum = "1.7.0"
val typesafeConfig = "1.4.1"
val guava = "29.0-jre"
val guava = "31.0.1-jre"
val squants = "1.8.3"
val avro = "1.10.2"
val avro4s = "4.0.11"
val avro = "1.11.0"
val avro4s = "4.0.12"
}

val flinkDeps =
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.4.9
+sbt.version=1.5.8
18 changes: 4 additions & 14 deletions src/main/scala/io/epiphanous/flinkrunner/avro/ConfluentSchemaRegistryClient.scala
@@ -1,21 +1,20 @@
package io.epiphanous.flinkrunner.avro

-import cats.effect.{ContextShift, IO, Resource, Timer}
+import cats.effect.unsafe.implicits.global
+import cats.effect.{IO, Resource}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.typesafe.scalalogging.LazyLogging
import io.circe.Decoder
import io.epiphanous.flinkrunner.model.FlinkConfig
import io.epiphanous.flinkrunner.util.StringUtils
import org.apache.avro.Schema.Parser
-import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
import org.http4s.EntityDecoder
import org.http4s.circe.jsonOf
import org.http4s.client.Client
-import org.http4s.client.blaze.BlazeClientBuilder
+import org.http4s.blaze.client.BlazeClientBuilder

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
-import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

class ConfluentSchemaRegistryClient()(implicit
@@ -36,18 +35,9 @@ class ConfluentSchemaRegistryClient()(implicit
: EntityDecoder[IO, ConfluentSchemaRegistryResponse] =
jsonOf[IO, ConfluentSchemaRegistryResponse]

-  @transient
-  lazy implicit val ec: ExecutionContext = directExecutionContext()
-
-  @transient
-  lazy implicit val cs: ContextShift[IO] = IO.contextShift(ec)
-
-  @transient
-  lazy implicit val timer: Timer[IO] = IO.timer(ec)
-
@transient
lazy val api: Resource[IO, Client[IO]] =
-    BlazeClientBuilder[IO](ec).resource
+    BlazeClientBuilder[IO].resource

@transient
lazy val parser = new Parser()
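For context, this file tracks the cats-effect 2 → 3 and http4s 0.21 → 0.23 migration: `ContextShift` and `Timer` no longer exist, and `BlazeClientBuilder` no longer takes an `ExecutionContext`. A minimal sketch of the new client setup, assuming a hypothetical `fetchSchema` helper and a placeholder registry URL (neither is part of flinkrunner's API):

```scala
import cats.effect.{IO, Resource}
// In cats-effect 3 this import supplies the default IORuntime needed to run
// effects at the edge (e.g. unsafeRunSync), replacing ContextShift/Timer.
import cats.effect.unsafe.implicits.global
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.client.Client

object Ce3ClientSketch {
  // No ExecutionContext argument anymore; Blaze manages its own thread pools.
  val api: Resource[IO, Client[IO]] = BlazeClientBuilder[IO].resource

  // Hypothetical call: fetch a schema-registry document as a raw string.
  def fetchSchema(url: String): IO[String] =
    api.use(client => client.expect[String](url))

  def main(args: Array[String]): Unit =
    // Placeholder URL; a Confluent registry conventionally listens on 8081.
    println(fetchSchema("http://localhost:8081/subjects").unsafeRunSync())
}
```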
97 changes: 38 additions & 59 deletions src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
@@ -3,13 +3,9 @@ package io.epiphanous.flinkrunner.model
import com.typesafe.config.{ConfigFactory, ConfigObject}
import com.typesafe.scalalogging.LazyLogging
import io.epiphanous.flinkrunner.{FlinkRunnerFactory, SEE}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.contrib.streaming.state.{
-  PredefinedOptions,
-  RocksDBStateBackend
-}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
-import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.io.File
@@ -232,9 +228,6 @@ class FlinkConfig(
else
StreamExecutionEnvironment.getExecutionEnvironment

-    // use event time
-    env.setStreamTimeCharacteristic(timeCharacteristic)
-
// set parallelism
env.setParallelism(globalParallelism)

@@ -250,46 +243,21 @@ class FlinkConfig(
checkpointMaxConcurrent
)

-      val backend = if (stateBackend == "rocksdb") {
-        logger.info(s"Using ROCKS DB state backend at $checkpointUrl")
-        val rocksBackend =
-          new RocksDBStateBackend(checkpointUrl, checkpointIncremental)
-        if (checkpointFlash)
-          rocksBackend.setPredefinedOptions(
-            PredefinedOptions.FLASH_SSD_OPTIMIZED
-          )
-        rocksBackend
-      } else {
-        logger.info(s"Using FILE SYSTEM state backend at $checkpointUrl")
-        new FsStateBackend(checkpointUrl)
-      }
-      /* this deprecation is annoying; its due to rocksdb's state backend
-         extending AbstractStateBackend which is deprecated */
-      env.setStateBackend(backend)
+      logger.info(s"Using ROCKS DB state backend at $checkpointUrl")
+      env.setStateBackend(
+        new EmbeddedRocksDBStateBackend(checkpointIncremental)
+      )
+      env.getCheckpointConfig.setCheckpointStorage(checkpointUrl)
}

-    env
-  }
+    env.setRuntimeMode(executionRuntimeMode)

-  def getTimeCharacteristic(tc: String): TimeCharacteristic = {
-    tc.toLowerCase
-      .replaceFirst("\\s*time$", "") match {
-      case "event" => TimeCharacteristic.EventTime
-      case "processing" => TimeCharacteristic.ProcessingTime
-      case "ingestion" => TimeCharacteristic.IngestionTime
-      case unknown =>
-        throw new RuntimeException(
-          s"Unknown time.characteristic setting: '$unknown'"
-        )
-    }
+    env
}

-  lazy val timeCharacteristic = getTimeCharacteristic(
-    getString("time.characteristic")
-  )

-  def getWatermarkStrategy(ws: String) =
+  def getWatermarkStrategy(ws: String): String =
ws.toLowerCase.replaceAll("[^a-z]", "") match {
case "none" => "none"
case "boundedlateness" => "bounded lateness"
case "boundedoutoforderness" => "bounded out of orderness"
case "ascendingtimestamps" => "ascending timestamps"
@@ -304,20 +272,31 @@ class FlinkConfig(
getString("watermark.strategy")
)

-  lazy val systemHelp = _config.getString("system.help")
-  lazy val jobHelp = getString("help")
-  lazy val jobDescription = getString("description")
-  lazy val globalParallelism = getInt("global.parallelism")
-  lazy val checkpointInterval = getLong("checkpoint.interval")
-  lazy val checkpointMinPause = getDuration("checkpoint.min.pause")
-  lazy val checkpointMaxConcurrent = getInt("checkpoint.max.concurrent")
-  lazy val checkpointUrl = getString("checkpoint.url")
-  lazy val checkpointFlash = getBoolean("checkpoint.flash")
-  lazy val stateBackend = getString("state.backend").toLowerCase
-  lazy val checkpointIncremental = getBoolean("checkpoint.incremental")
-  lazy val showPlan = getBoolean("show.plan")
-  lazy val mockEdges = isDev && getBoolean("mock.edges")
-  lazy val maxLateness = getDuration("max.lateness")
-  lazy val maxIdleness = getDuration("max.idleness")
-
+  lazy val systemHelp: String = _config.getString("system.help")
+  lazy val jobHelp: String = getString("help")
+  lazy val jobDescription: String = getString("description")
+  lazy val globalParallelism: Int = getInt("global.parallelism")
+  lazy val checkpointInterval: Long = getLong("checkpoint.interval")
+  lazy val checkpointMinPause: Duration = getDuration(
+    "checkpoint.min.pause"
+  )
+  lazy val checkpointMaxConcurrent: Int = getInt(
+    "checkpoint.max.concurrent"
+  )
+  lazy val checkpointUrl: String = getString("checkpoint.url")
+  lazy val checkpointIncremental: Boolean = getBoolean(
+    "checkpoint.incremental"
+  )
+  lazy val showPlan: Boolean = getBoolean("show.plan")
+  lazy val mockEdges: Boolean = isDev && getBoolean("mock.edges")
+  lazy val maxLateness: Duration = getDuration("max.lateness")
+  lazy val maxIdleness: Duration = getDuration("max.idleness")
+
+  lazy val executionRuntimeMode: RuntimeExecutionMode =
+    Try(getString("execution.runtime-mode")).toOption
+      .map(_.toUpperCase) match {
+      case Some("BATCH") => RuntimeExecutionMode.BATCH
+      case Some("AUTOMATIC") => RuntimeExecutionMode.AUTOMATIC
+      case _ => RuntimeExecutionMode.STREAMING
+    }
}
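Taken together, the FlinkConfig changes track three Flink API shifts: since 1.13 the state backend (working state) is configured separately from checkpoint storage (durable snapshots), `EmbeddedRocksDBStateBackend` replaces the deprecated `RocksDBStateBackend`, and since 1.12 event time is the default, so the `TimeCharacteristic` knob gives way to `setRuntimeMode`. A standalone sketch of the resulting setup; the checkpoint URL and interval are placeholders, not flinkrunner defaults:

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Flink114EnvSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder interval: checkpoint every 30 seconds.
    env.enableCheckpointing(30000L)

    // RocksDB holds working state; true enables incremental checkpoints.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true))

    // Snapshots go to checkpoint storage, configured separately since 1.13.
    env.getCheckpointConfig.setCheckpointStorage("file:///tmp/checkpoints")

    // Event time is the default since 1.12; pick an execution mode instead
    // of a TimeCharacteristic.
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
  }
}
```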
21 changes: 2 additions & 19 deletions src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala
@@ -1,7 +1,6 @@
package io.epiphanous.flinkrunner.model

import io.epiphanous.flinkrunner.model.FlinkConnectorName._
-import org.apache.flink.streaming.api.TimeCharacteristic

import java.util.Properties
import scala.util.Try
@@ -13,21 +12,15 @@ sealed trait SourceConfig {

def label: String = s"$connector/$name"

-  def timeCharacteristic: TimeCharacteristic
-
def watermarkStrategy: String

def properties: Properties
}

object SourceConfig {
def apply(name: String, config: FlinkConfig): SourceConfig = {
val p = s"sources.$name"
val timeCharacteristic =
Try(config.getString(s"$p.time.characteristic"))
.map(config.getTimeCharacteristic)
.getOrElse(config.timeCharacteristic)
val watermarkStrategy = Try(config.getString(s"$p.watermark.strategy"))
val p = s"sources.$name"
val watermarkStrategy = Try(config.getString(s"$p.watermark.strategy"))
.map(config.getWatermarkStrategy)
.getOrElse(config.watermarkStrategy)

@@ -42,7 +35,6 @@ object SourceConfig {
name,
config.getString(s"$p.topic"),
config.getBoolean(s"$p.isKeyed"),
-          timeCharacteristic,
watermarkStrategy,
config.getProperties(s"$p.config")
)
@@ -51,7 +43,6 @@ object SourceConfig {
connector,
name,
config.getString(s"$p.stream"),
-          timeCharacteristic,
watermarkStrategy,
config.getProperties(s"$p.config")
)
@@ -60,7 +51,6 @@ object SourceConfig {
connector,
name,
config.getString(s"$p.path"),
-          timeCharacteristic,
watermarkStrategy,
config.getProperties(s"$p.config")
)
@@ -70,7 +60,6 @@ object SourceConfig {
name,
config.getString(s"$p.host"),
config.getInt(s"$p.port"),
-          timeCharacteristic,
watermarkStrategy,
config.getProperties(s"$p.config")
)
@@ -79,7 +68,6 @@ object SourceConfig {
connector,
name,
name,
-          timeCharacteristic,
watermarkStrategy,
config.getProperties(s"$p.config")
)
@@ -101,7 +89,6 @@ final case class KafkaSourceConfig(
name: String,
topic: String,
isKeyed: Boolean,
-    timeCharacteristic: TimeCharacteristic,
watermarkStrategy: String,
properties: Properties)
extends SourceConfig
@@ -110,7 +97,6 @@ final case class KinesisSourceConfig(
connector: FlinkConnectorName = Kinesis,
name: String,
stream: String,
-    timeCharacteristic: TimeCharacteristic,
watermarkStrategy: String,
properties: Properties)
extends SourceConfig
@@ -119,7 +105,6 @@ final case class FileSourceConfig(
connector: FlinkConnectorName = File,
name: String,
path: String,
-    timeCharacteristic: TimeCharacteristic,
watermarkStrategy: String,
properties: Properties)
extends SourceConfig
@@ -129,7 +114,6 @@ final case class SocketSourceConfig(
name: String,
host: String,
port: Int,
-    timeCharacteristic: TimeCharacteristic,
watermarkStrategy: String,
properties: Properties)
extends SourceConfig
@@ -138,7 +122,6 @@ final case class CollectionSourceConfig(
connector: FlinkConnectorName = Collection,
name: String,
topic: String,
-    timeCharacteristic: TimeCharacteristic,
watermarkStrategy: String,
properties: Properties)
extends SourceConfig
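With `timeCharacteristic` dropped from every source config, per-source event-time behavior rests on the `watermarkStrategy` string alone. In Flink 1.14 terms, a strategy name like "bounded out of orderness" corresponds to attaching a `WatermarkStrategy` at the source. A minimal sketch of what that strategy looks like in Flink's API; the `MyEvent` type and the 5-second bound are hypothetical:

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{
  SerializableTimestampAssigner,
  WatermarkStrategy
}

// Hypothetical event type carrying its own event-time field.
case class MyEvent(id: String, timestampMs: Long)

object WatermarkSketch {
  // "bounded out of orderness": watermarks trail the maximum observed
  // timestamp by a fixed bound (here, 5 seconds).
  val strategy: WatermarkStrategy[MyEvent] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[MyEvent](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
        override def extractTimestamp(e: MyEvent, recordTs: Long): Long =
          e.timestampMs
      })
}
```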
