Skip to content

Commit

Permalink
[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` correctly. This will make `avg` become `NaN`. And whatever gets merged with the result of `zero.merge(zero)`, `avg` will still be `NaN`. Then finally, we call `NaN.toLong` and get `0`, and the user will see the following incorrect report:

```
"eventTime" : {
    "avg" : "1970-01-01T00:00:00.000Z",
    "max" : "2019-01-31T12:57:00.000Z",
    "min" : "2019-01-30T18:44:04.000Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng .

This PR fixes the above issue.

## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
(cherry picked from commit 03a928c)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
  • Loading branch information
zsxwing committed Feb 1, 2019
1 parent 537d15c commit a5d22da
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
Expand Up @@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var cou
}

def merge(that: EventTimeStats): Unit = {
this.max = math.max(this.max, that.max)
this.min = math.min(this.min, that.min)
this.count += that.count
this.avg += (that.avg - this.avg) * that.count / this.count
if (that.count == 0) {
// no-op
} else if (this.count == 0) {
this.max = that.max
this.min = that.min
this.count = that.count
this.avg = that.avg
} else {
this.max = math.max(this.max, that.max)
this.min = math.min(this.min, that.min)
this.count += that.count
this.avg += (that.avg - this.avg) * that.count / this.count
}
}
}

Expand Down
Expand Up @@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
sqlContext.streams.active.foreach(_.stop())
}

test("EventTimeStats") {
val epsilon = 10E-6
private val epsilon = 10E-6

test("EventTimeStats") {
val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
stats.add(80L)
stats.max should be (100)
Expand All @@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}

test("EventTimeStats: avg on large values") {
val epsilon = 10E-6
val largeValue = 10000000000L // 10B
// Make sure `largeValue` will cause overflow if we use a Long sum to calc avg.
assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
Expand All @@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
stats.avg should be ((largeValue + 0.5) +- epsilon)
}

test("EventTimeStats: zero merge zero") {
val stats = EventTimeStats.zero
val stats2 = EventTimeStats.zero
stats.merge(stats2)
stats should be (EventTimeStats.zero)
}

test("EventTimeStats: non-zero merge zero") {
val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
val stats2 = EventTimeStats.zero
stats.merge(stats2)
stats.max should be (10L)
stats.min should be (1L)
stats.avg should be (5.0 +- epsilon)
stats.count should be (3L)
}

test("EventTimeStats: zero merge non-zero") {
val stats = EventTimeStats.zero
val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
stats.merge(stats2)
stats.max should be (10L)
stats.min should be (1L)
stats.avg should be (5.0 +- epsilon)
stats.count should be (3L)
}

test("error on bad column") {
val inputData = MemoryStream[Int].toDF()
val e = intercept[AnalysisException] {
Expand Down

0 comments on commit a5d22da

Please sign in to comment.