Skip to content

Commit

Permalink
reverting the review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbhatt-mdsol committed Dec 1, 2021
1 parent b2078ab commit 5147c46
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 58 deletions.
Expand Up @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

class ConfluentSchemaRegistryClient() (implicit
class ConfluentSchemaRegistryClient()(implicit
config: FlinkConfig,
decoder: Decoder[ConfluentSchemaRegistryResponse])
extends AvroSchemaRegistryClient[ConfluentSchemaRegistryContext]
Expand Down Expand Up @@ -53,14 +53,7 @@ class ConfluentSchemaRegistryClient() (implicit
lazy val parser = new Parser()

@transient
lazy val schemaRegistryBaseUrl = sys.env.get("SCHEMA_BROKERS")
val urlBase = schemaRegistryBaseUrl match {
case Some(value) =>
value
case None =>
config.getString(s"$configPrefix.url.base")
}
logger.info(s"urlBase $urlBase")
lazy val urlBase: String = config.getString(s"$configPrefix.url.base")

@transient
lazy val cacheLoader: CacheLoader[String, Try[RegisteredAvroSchema]] =
Expand Down
35 changes: 14 additions & 21 deletions src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala
Expand Up @@ -304,27 +304,20 @@ class FlinkConfig(
getString("watermark.strategy")
)

lazy val s3CheckpointUrlEnv = sys.env.get("S3_CHECKPOINTING_URL")
val checkpointUrl = s3CheckpointUrlEnv match {
case Some(value) =>
value
case None =>
getString("checkpoint.url")
}

lazy val systemHelp = _config.getString("system.help")
lazy val jobHelp = getString("help")
lazy val jobDescription = getString("description")
lazy val globalParallelism = getInt("global.parallelism")
lazy val checkpointInterval = getLong("checkpoint.interval")
lazy val checkpointMinPause = getDuration("checkpoint.min.pause")
lazy val systemHelp = _config.getString("system.help")
lazy val jobHelp = getString("help")
lazy val jobDescription = getString("description")
lazy val globalParallelism = getInt("global.parallelism")
lazy val checkpointInterval = getLong("checkpoint.interval")
lazy val checkpointMinPause = getDuration("checkpoint.min.pause")
lazy val checkpointMaxConcurrent = getInt("checkpoint.max.concurrent")
lazy val checkpointFlash = getBoolean("checkpoint.flash")
lazy val stateBackend = getString("state.backend").toLowerCase
lazy val checkpointIncremental = getBoolean("checkpoint.incremental")
lazy val showPlan = getBoolean("show.plan")
lazy val mockEdges = isDev && getBoolean("mock.edges")
lazy val maxLateness = getDuration("max.lateness")
lazy val maxIdleness = getDuration("max.idleness")
lazy val checkpointUrl = getString("checkpoint.url")
lazy val checkpointFlash = getBoolean("checkpoint.flash")
lazy val stateBackend = getString("state.backend").toLowerCase
lazy val checkpointIncremental = getBoolean("checkpoint.incremental")
lazy val showPlan = getBoolean("show.plan")
lazy val mockEdges = isDev && getBoolean("mock.edges")
lazy val maxLateness = getDuration("max.lateness")
lazy val maxIdleness = getDuration("max.idleness")

}
20 changes: 2 additions & 18 deletions src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala
Expand Up @@ -15,7 +15,6 @@ sealed trait SinkConfig {
}

object SinkConfig {
lazy val s3BasePath = sys.env.get("S3_BUCKET")
def apply(name: String, config: FlinkConfig): SinkConfig = {
val p = s"sinks.$name"
FlinkConnectorName.withNameInsensitiveOption(
Expand All @@ -24,21 +23,12 @@ object SinkConfig {
case Some(connector) =>
connector match {
case Kafka =>
val refConf = config.getProperties(s"$p.config")
val kafkaBrokers = sys.env.get("KAFKA_BROKERS")
val kafkaConfig = kafkaBrokers match {
case Some(value) =>
refConf.setProperty("bootstrap.servers", value)
refConf
case None =>
refConf
}
KafkaSinkConfig(
connector,
name,
config.getString(s"$p.topic"),
config.getBoolean(s"$p.isKeyed"),
kafkaConfig
config.getProperties(s"$p.config")
)
case Kinesis =>
KinesisSinkConfig(
Expand All @@ -48,16 +38,10 @@ object SinkConfig {
config.getProperties(s"$p.config")
)
case File =>
val path = s3BasePath match {
case Some(value) =>
value
case None =>
config.getString(s"$p.path")
}
FileSinkConfig(
connector,
name,
path,
config.getString(s"$p.path"),
config.getProperties(s"$p.config")
)
case Socket =>
Expand Down
Expand Up @@ -37,23 +37,14 @@ object SourceConfig {
case Some(connector) =>
connector match {
case Kafka =>
val refConf = config.getProperties(s"$p.config")
val kafkaBrokers = sys.env.get("KAFKA_BROKERS")
val kafkaConfig = kafkaBrokers match {
case Some(value) =>
refConf.setProperty("bootstrap.servers", value)
refConf
case None =>
refConf
}
KafkaSourceConfig(
connector,
name,
config.getString(s"$p.topic"),
config.getBoolean(s"$p.isKeyed"),
timeCharacteristic,
watermarkStrategy,
kafkaConfig
config.getProperties(s"$p.config")
)
case Kinesis =>
KinesisSourceConfig(
Expand Down

0 comments on commit 5147c46

Please sign in to comment.