Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jun 27, 2018
1 parent 468f134 commit f77b12b
Showing 1 changed file with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,39 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
test("multiple partitions with coalesce - multiple transformations") {
val input = ContinuousMemoryStream[Int]

// We use a barrier to make sure predicates both before and after coalesce work
val df = input.toDF()
.coalesce(1)
.select('value as 'copy, 'value)
.where('copy =!= 1)
.planWithBarrier
.coalesce(1)
.where('copy =!= 2)
.agg(max('value))

testStream(df, OutputMode.Complete)(
AddData(input, 0, 1, 2),
CheckAnswer(1),
CheckAnswer(0),
StopStream,
AddData(input, 3, 4, 5),
StartStream(),
CheckAnswer(5),
AddData(input, -1, -2, -3),
CheckAnswer(5))
}

test("multiple partitions with multiple coalesce") {
val input = ContinuousMemoryStream[Int]

val df = input.toDF()
.coalesce(1)
.planWithBarrier
.coalesce(1)
.select('value as 'copy, 'value)
.agg(max('value))

testStream(df, OutputMode.Complete)(
AddData(input, 0, 1, 2),
CheckAnswer(2),
StopStream,
AddData(input, 3, 4, 5),
StartStream(),
Expand Down

0 comments on commit f77b12b

Please sign in to comment.