diff --git a/src/main/scala/io/epiphanous/flinkrunner/avro/ConfluentSchemaRegistryClient.scala b/src/main/scala/io/epiphanous/flinkrunner/avro/ConfluentSchemaRegistryClient.scala index d660fafa..29c17218 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/avro/ConfluentSchemaRegistryClient.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/avro/ConfluentSchemaRegistryClient.scala @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} -class ConfluentSchemaRegistryClient() (implicit +class ConfluentSchemaRegistryClient()(implicit config: FlinkConfig, decoder: Decoder[ConfluentSchemaRegistryResponse]) extends AvroSchemaRegistryClient[ConfluentSchemaRegistryContext] @@ -53,14 +53,7 @@ class ConfluentSchemaRegistryClient() (implicit lazy val parser = new Parser() @transient - lazy val schemaRegistryBaseUrl = sys.env.get("SCHEMA_BROKERS") - val urlBase = schemaRegistryBaseUrl match { - case Some(value) => - value - case None => - config.getString(s"$configPrefix.url.base") - } - logger.info(s"urlBase $urlBase") + lazy val urlBase: String = config.getString(s"$configPrefix.url.base") @transient lazy val cacheLoader: CacheLoader[String, Try[RegisteredAvroSchema]] = diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala index 62d575ef..505ec33f 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/FlinkConfig.scala @@ -304,27 +304,20 @@ class FlinkConfig( getString("watermark.strategy") ) - lazy val s3CheckpointUrlEnv = sys.env.get("S3_CHECKPOINTING_URL") - val checkpointUrl = s3CheckpointUrlEnv match { - case Some(value) => - value - case None => - getString("checkpoint.url") - } - - lazy val systemHelp = _config.getString("system.help") - lazy val jobHelp = getString("help") - lazy val jobDescription = getString("description") - lazy val globalParallelism = getInt("global.parallelism") - lazy val checkpointInterval = getLong("checkpoint.interval") - lazy val checkpointMinPause = getDuration("checkpoint.min.pause") + lazy val systemHelp = _config.getString("system.help") + lazy val jobHelp = getString("help") + lazy val jobDescription = getString("description") + lazy val globalParallelism = getInt("global.parallelism") + lazy val checkpointInterval = getLong("checkpoint.interval") + lazy val checkpointMinPause = getDuration("checkpoint.min.pause") lazy val checkpointMaxConcurrent = getInt("checkpoint.max.concurrent") - lazy val checkpointFlash = getBoolean("checkpoint.flash") - lazy val stateBackend = getString("state.backend").toLowerCase - lazy val checkpointIncremental = getBoolean("checkpoint.incremental") - lazy val showPlan = getBoolean("show.plan") - lazy val mockEdges = isDev && getBoolean("mock.edges") - lazy val maxLateness = getDuration("max.lateness") - lazy val maxIdleness = getDuration("max.idleness") + lazy val checkpointUrl = getString("checkpoint.url") + lazy val checkpointFlash = getBoolean("checkpoint.flash") + lazy val stateBackend = getString("state.backend").toLowerCase + lazy val checkpointIncremental = getBoolean("checkpoint.incremental") + lazy val showPlan = getBoolean("show.plan") + lazy val mockEdges = isDev && getBoolean("mock.edges") + lazy val maxLateness = getDuration("max.lateness") + lazy val maxIdleness = getDuration("max.idleness") } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala index 1c3dfc41..58a73b53 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SinkConfig.scala @@ -15,7 +15,6 @@ sealed trait SinkConfig { } object SinkConfig { - lazy val s3BasePath = sys.env.get("S3_BUCKET") def apply(name: String, config: FlinkConfig): SinkConfig = { val p = s"sinks.$name" FlinkConnectorName.withNameInsensitiveOption( @@ -24,21 +23,12 @@ object SinkConfig { case Some(connector) => connector match { case Kafka => - val refConf = config.getProperties(s"$p.config") - val kafkaBrokers = sys.env.get("KAFKA_BROKERS") - val kafkaConfig = kafkaBrokers match { - case Some(value) => - refConf.setProperty("bootstrap.servers", value) - refConf - case None => - refConf - } KafkaSinkConfig( connector, name, config.getString(s"$p.topic"), config.getBoolean(s"$p.isKeyed"), - kafkaConfig + config.getProperties(s"$p.config") ) case Kinesis => KinesisSinkConfig( @@ -48,16 +38,10 @@ object SinkConfig { config.getProperties(s"$p.config") ) case File => - val path = s3BasePath match { - case Some(value) => - value - case None => - config.getString(s"$p.path") - } FileSinkConfig( connector, name, - path, + config.getString(s"$p.path"), config.getProperties(s"$p.config") ) case Socket => diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala index 3d6125ac..f736f539 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SourceConfig.scala @@ -37,15 +37,6 @@ object SourceConfig { case Some(connector) => connector match { case Kafka => - val refConf = config.getProperties(s"$p.config") - val kafkaBrokers = sys.env.get("KAFKA_BROKERS") - val kafkaConfig = kafkaBrokers match { - case Some(value) => - refConf.setProperty("bootstrap.servers", value) - refConf - case None => - refConf - } KafkaSourceConfig( connector, name, @@ -53,7 +44,7 @@ object SourceConfig { config.getBoolean(s"$p.isKeyed"), timeCharacteristic, watermarkStrategy, - kafkaConfig + config.getProperties(s"$p.config") ) case Kinesis => KinesisSourceConfig(