Skip to content

Commit 445bc8e

Browse files
authored
Merge bb60ce8 into 21e1772
2 parents 21e1772 + bb60ce8 commit 445bc8e

File tree

4 files changed

+185
-5
lines changed

4 files changed

+185
-5
lines changed

codefeedr-core/src/main/scala/org/codefeedr/pipeline/Pipeline.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
*/
1919
package org.codefeedr.pipeline
2020

21+
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
2122
import org.apache.flink.api.java.utils.ParameterTool
2223
import org.apache.flink.configuration.{ConfigConstants, Configuration}
23-
import org.apache.flink.streaming.api.TimeCharacteristic
24+
import org.apache.flink.runtime.state.StateBackend
25+
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
2426
import org.apache.flink.streaming.api.scala._
2527
import org.codefeedr.Properties
2628
import org.codefeedr.buffer.BufferType.BufferType
@@ -33,11 +35,17 @@ import org.codefeedr.pipeline.RuntimeType.RuntimeType
3335
* @param bufferProperties The properties of the Buffer.
3436
* @param keyManager The key manager which provide API call management at stage-level.
3537
* @param streamTimeCharacteristic The TimeCharacteristic of the whole pipeline. Event, Ingestion or Processing.
38+
* @param restartStrategy The RestartStrategy of the whole pipeline.
39+
* @param checkpointing The checkpointing interval, or None if checkpointing is disabled.
3640
*/
3741
case class PipelineProperties(bufferType: BufferType,
3842
bufferProperties: Properties,
3943
keyManager: KeyManager,
40-
streamTimeCharacteristic: TimeCharacteristic)
44+
streamTimeCharacteristic: TimeCharacteristic,
45+
restartStrategy: RestartStrategyConfiguration,
46+
checkpointing: Option[Long],
47+
checkpointingMode: CheckpointingMode,
48+
stateBackend: StateBackend)
4149

4250
/** The Pipeline holds all the data and logic to execute a CodeFeedr job.
4351
* It stores all stages (Flink jobs) and connects them by setting up buffers (like Kafka).
@@ -63,6 +71,13 @@ case class Pipeline(var name: String,
6371
_environment = StreamExecutionEnvironment.getExecutionEnvironment
6472
_environment.setStreamTimeCharacteristic(
6573
pipelineProperties.streamTimeCharacteristic)
74+
_environment.setRestartStrategy(pipelineProperties.restartStrategy)
75+
_environment.setStateBackend(pipelineProperties.stateBackend)
76+
77+
if (pipelineProperties.checkpointing.isDefined) {
78+
_environment.enableCheckpointing(pipelineProperties.checkpointing.get,
79+
pipelineProperties.checkpointingMode)
80+
}
6681
}
6782

6883
_environment

codefeedr-core/src/main/scala/org/codefeedr/pipeline/PipelineBuilder.scala

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
*/
1919
package org.codefeedr.pipeline
2020

21-
import org.apache.flink.streaming.api.TimeCharacteristic
21+
import org.apache.flink.api.common.restartstrategy.RestartStrategies
22+
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
23+
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
24+
import org.apache.flink.runtime.state.StateBackend
25+
import org.apache.flink.runtime.state.memory.MemoryStateBackend
26+
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
2227
import org.apache.flink.streaming.api.scala.{
2328
DataStream,
2429
StreamExecutionEnvironment
@@ -80,6 +85,19 @@ class PipelineBuilder extends Logging {
8085
/** The name of the pipeline, "CodeFeedr pipeline" by default. */
8186
protected var name = "CodeFeedr pipeline"
8287

88+
/** The RestartStrategy. Default: [[RestartStrategies.noRestart()]] */
89+
protected var restartStrategy = RestartStrategies.noRestart()
90+
91+
/** The Checkpointing interval. Default: None (No checkpointing). */
92+
protected var checkpointing: Option[Long] = None
93+
94+
/** The StateBackend. Default: [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] */
95+
protected var stateBackend: StateBackend = new MemoryStateBackend()
96+
97+
/** The checkpointing mode. Default: [[CheckpointingMode.EXACTLY_ONCE]] */
98+
protected var checkpointingMode: CheckpointingMode =
99+
CheckpointingMode.EXACTLY_ONCE
100+
83101
/** Get the type of the buffer.
84102
*
85103
* @return The buffer type.
@@ -198,6 +216,29 @@ class PipelineBuilder extends Logging {
198216
this
199217
}
200218

219+
/** Set the RestartStrategy of the whole pipeline.
220+
*
221+
* @param strategy The strategy.
222+
* @return The builder instance.
223+
*/
224+
def setRestartStrategy(
225+
strategy: RestartStrategyConfiguration): PipelineBuilder = {
226+
this.restartStrategy = strategy
227+
228+
this
229+
}
230+
231+
/** Sets the StateBackend of the whole pipeline.
232+
*
233+
* @param stateBackend The state backend.
234+
* @return The builder instance.
235+
*/
236+
def setStateBackend(stateBackend: StateBackend): PipelineBuilder = {
237+
this.stateBackend = stateBackend
238+
239+
this
240+
}
241+
201242
/** Sets the serializer type for the buffer.
202243
*
203244
* @param serializer The serializer type (which is basically a string).
@@ -209,6 +250,39 @@ class PipelineBuilder extends Logging {
209250
this
210251
}
211252

253+
/** Enable checkpointing for this pipeline.
254+
*
255+
* @param interval The interval to checkpoint on.
256+
* @param checkpointingMode The checkpointing mode (exactly once or at least once).
257+
* @return This builder instance.
258+
*/
259+
def enableCheckpointing(interval: Long,
260+
checkpointingMode: CheckpointingMode) = {
261+
this.checkpointing = Some(interval)
262+
this.checkpointingMode = checkpointingMode
263+
264+
this
265+
}
266+
267+
/** Enable checkpointing for this pipeline.
268+
*
269+
* @param interval The interval to checkpoint on.
270+
* @return This builder instance.
271+
*/
272+
def enableCheckpointing(interval: Long): PipelineBuilder = {
273+
this.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
274+
}
275+
276+
/** Sets the CheckpointingMode for this pipeline. Note: this method does not enable checkpointing.
277+
*
278+
* @param checkpointingMode The checkpointing mode (exactly once or at least once).
279+
*/
280+
def setCheckpointingMode(checkpointingMode: CheckpointingMode) = {
281+
this.checkpointingMode = checkpointingMode
282+
283+
this
284+
}
285+
212286
/** Append a [[Stage]] in a sequential pipeline.
213287
*
214288
* @param stage The new stage to add.
@@ -469,7 +543,11 @@ class PipelineBuilder extends Logging {
469543
val props = PipelineProperties(bufferType,
470544
bufferProperties,
471545
keyManager,
472-
streamTimeCharacteristic)
546+
streamTimeCharacteristic,
547+
restartStrategy,
548+
checkpointing,
549+
checkpointingMode,
550+
stateBackend)
473551

474552
Pipeline(name, props, graph, stageProperties.toMap)
475553
}

codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineBuilderTest.scala

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.codefeedr.pipeline
1919

20+
import org.apache.flink.api.common.restartstrategy.RestartStrategies
21+
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
2022
import org.apache.flink.streaming.api.functions.sink.SinkFunction
2123
import org.apache.flink.streaming.api.scala.{
2224
DataStream,
@@ -25,7 +27,9 @@ import org.apache.flink.streaming.api.scala.{
2527
import org.codefeedr.keymanager.StaticKeyManager
2628
import org.codefeedr.buffer.{Buffer, BufferType}
2729
import org.apache.flink.api.scala._
28-
import org.apache.flink.streaming.api.TimeCharacteristic
30+
import org.apache.flink.runtime.state.filesystem.FsStateBackend
31+
import org.apache.flink.runtime.state.memory.MemoryStateBackend
32+
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
2933
import org.codefeedr.buffer.serialization.Serializer
3034
import org.codefeedr.stages.utilities.StringType
3135
import org.codefeedr.stages.OutputStage
@@ -168,6 +172,81 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {
168172
pipeline.pipelineProperties.streamTimeCharacteristic == TimeCharacteristic.IngestionTime)
169173
}
170174

175+
test("Default RestartStrategy is no restart.") {
176+
val pipeline = builder.append(new SimpleSourceStage()).build()
177+
178+
assert(
179+
pipeline.pipelineProperties.restartStrategy == RestartStrategies
180+
.noRestart())
181+
}
182+
183+
test("Default RestartStrategy can be overriden.") {
184+
val pipeline = builder
185+
.append(new SimpleSourceStage())
186+
.setRestartStrategy(RestartStrategies.fallBackRestart())
187+
.build()
188+
189+
assert(
190+
pipeline.pipelineProperties.restartStrategy == RestartStrategies
191+
.fallBackRestart())
192+
}
193+
194+
test("Default StateBackend is memory") {
195+
val pipeline = builder.append(new SimpleSourceStage()).build()
196+
197+
assert(
198+
pipeline.pipelineProperties.stateBackend.isInstanceOf[MemoryStateBackend])
199+
}
200+
201+
test("Default StateBackend can be overriden.") {
202+
val pipeline = builder
203+
.append(new SimpleSourceStage())
204+
.setStateBackend(new FsStateBackend("file://test/test"))
205+
.build()
206+
207+
assert(
208+
pipeline.pipelineProperties.stateBackend.isInstanceOf[FsStateBackend])
209+
}
210+
211+
test("Default checkpointing is disabled") {
212+
val pipeline = builder.append(new SimpleSourceStage()).build()
213+
214+
assert(pipeline.pipelineProperties.checkpointing.isEmpty)
215+
assert(
216+
pipeline.pipelineProperties.checkpointingMode == CheckpointingMode.EXACTLY_ONCE)
217+
}
218+
219+
test("Default checkpointing can be enabled.") {
220+
val pipeline =
221+
builder.append(new SimpleSourceStage()).enableCheckpointing(1000).build()
222+
223+
assert(pipeline.pipelineProperties.checkpointing.get == 1000)
224+
}
225+
226+
test("Default checkpointing can be enabled and mode can be set.") {
227+
val pipeline =
228+
builder
229+
.append(new SimpleSourceStage())
230+
.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE)
231+
.build()
232+
233+
assert(pipeline.pipelineProperties.checkpointing.get == 500)
234+
assert(
235+
pipeline.pipelineProperties.checkpointingMode == CheckpointingMode.AT_LEAST_ONCE)
236+
}
237+
238+
test("Checkpointmode can be overriden.") {
239+
val pipeline =
240+
builder
241+
.append(new SimpleSourceStage())
242+
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
243+
.build()
244+
245+
assert(
246+
pipeline.pipelineProperties.checkpointingMode == CheckpointingMode.AT_LEAST_ONCE)
247+
assert(pipeline.pipelineProperties.checkpointing.isEmpty)
248+
}
249+
171250
test("A non-sequential pipeline cannot switch to a sequential pipeline") {
172251
val a = new SimpleSourceStage()
173252
val b = new SimpleTransformStage()

docs/pages/mydoc/mydoc_pipeline.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ by `builder.disablePipelineVerification()`, however we do not recommend
175175
this. If you disable this, make sure the serialization framework will
176176
support the conversion (if you remove fields, this is often supported).
177177

178+
### Flink environment configuration
179+
Several Flink environment settings can be configured directly through the pipeline builder:
180+
- [The state backend](https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html): `builder.setStateBackend(new MemoryStateBackend())`
181+
- [The restart strategy](https://ci.apache.org/projects/flink/flink-docs-master/dev/restart_strategies.html): `builder.setRestartStrategy(RestartStrategies.noRestart())`
182+
- [Checkpointing](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html): `builder.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)`
183+
184+
**Note**: All these configuration values are pipeline-wide, so they are applied to every stage. Stage-level environment configuration is currently not supported; as a workaround, configure the environment directly in the `transform` function.
185+
178186
### Stage Properties
179187
In the PipelineBuilder properties can be specified **per** stage. This
180188
properties map is available to the Stage at run-time. To set a stage

0 commit comments

Comments
 (0)