Skip to content

Commit

Permalink
Fixed some bugs in the tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 2, 2019
1 parent 767d191 commit bb60ce8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Expand Up @@ -259,6 +259,7 @@ class PipelineBuilder extends Logging {
def enableCheckpointing(interval: Long,
checkpointingMode: CheckpointingMode) = {
this.checkpointing = Some(interval)
this.checkpointingMode = checkpointingMode

this
}
Expand All @@ -268,7 +269,7 @@ class PipelineBuilder extends Logging {
* @param interval The interval to checkpoint on.
* @return This builder instance.
*/
def enableCheckpointing(interval: Long) = {
def enableCheckpointing(interval: Long): PipelineBuilder = {
this.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
}

Expand Down Expand Up @@ -545,6 +546,7 @@ class PipelineBuilder extends Logging {
streamTimeCharacteristic,
restartStrategy,
checkpointing,
checkpointingMode,
stateBackend)

Pipeline(name, props, graph, stageProperties.toMap)
Expand Down
Expand Up @@ -188,7 +188,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {

assert(
pipeline.pipelineProperties.restartStrategy == RestartStrategies
.noRestart())
.fallBackRestart())
}

test("Default StateBackend is memory") {
Expand All @@ -201,7 +201,7 @@ class PipelineBuilderTest extends FunSuite with BeforeAndAfter with Matchers {
test("Default StateBackend can be overriden.") {
val pipeline = builder
.append(new SimpleSourceStage())
.setStateBackend(new FsStateBackend("/"))
.setStateBackend(new FsStateBackend("file://test/test"))
.build()

assert(
Expand Down

0 comments on commit bb60ce8

Please sign in to comment.