diff --git a/codefeedr-core/src/main/scala/org/codefeedr/pipeline/PipelineBuilder.scala b/codefeedr-core/src/main/scala/org/codefeedr/pipeline/PipelineBuilder.scala index 55d25d7e..c44c92bc 100644 --- a/codefeedr-core/src/main/scala/org/codefeedr/pipeline/PipelineBuilder.scala +++ b/codefeedr-core/src/main/scala/org/codefeedr/pipeline/PipelineBuilder.scala @@ -259,6 +259,7 @@ class PipelineBuilder extends Logging { def enableCheckpointing(interval: Long, checkpointingMode: CheckpointingMode) = { this.checkpointing = Some(interval) + this.checkpointingMode = checkpointingMode this } @@ -268,7 +269,7 @@ class PipelineBuilder extends Logging { * @param interval The interval to checkpoint on. * @return This builder instance. */ - def enableCheckpointing(interval: Long) = { + def enableCheckpointing(interval: Long): PipelineBuilder = { this.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE) } @@ -545,6 +546,7 @@ class PipelineBuilder extends Logging { streamTimeCharacteristic, restartStrategy, checkpointing, + checkpointingMode, stateBackend) Pipeline(name, props, graph, stageProperties.toMap) diff --git a/codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineBuilderTest.scala b/codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineBuilderTest.scala index 008609cd..a1503f8b 100644 --- a/codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineBuilderTest.scala +++ b/codefeedr-core/src/test/scala/org/codefeedr/pipeline/PipelineBuilderTest.scala @@ -188,7 +188,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers { assert( pipeline.pipelineProperties.restartStrategy == RestartStrategies - .noRestart()) + .fallBackRestart()) } test("Default StateBackend is memory") { @@ -201,7 +201,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers { test("Default StateBackend can be overriden.") { val pipeline = builder .append(new SimpleSourceStage()) - .setStateBackend(new FsStateBackend("/")) + .setStateBackend(new FsStateBackend("file://test/test")) .build() assert(