From f93b77763823a172c047c3c30e98d3b2a76f4dbd Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Tue, 3 Nov 2020 16:35:47 -0500 Subject: [PATCH 1/2] fix aggregations --- .../flinkrunner/model/aggregate/Count.scala | 16 ++++++------ .../aggregate/ExponentialMovingAverage.scala | 2 +- .../ExponentialMovingStandardDeviation.scala | 6 +++-- .../aggregate/ExponentialMovingVariance.scala | 10 ++++--- .../flinkrunner/model/aggregate/Max.scala | 2 +- .../flinkrunner/model/aggregate/Min.scala | 2 +- .../flinkrunner/model/aggregate/Range.scala | 2 +- .../model/aggregate/StandardDeviation.scala | 6 +++-- .../model/aggregate/Variance.scala | 8 +++--- .../model/aggregate/CountSpec.scala | 22 ++++++++++++++++ .../ExponentialMovingAverageSpec.scala | 22 ++++++++++++++++ ...ponentialMovingStandardDeviationSpec.scala | 26 +++++++++++++++++++ .../ExponentialMovingVarianceSpec.scala | 26 +++++++++++++++++++ .../flinkrunner/model/aggregate/MaxSpec.scala | 21 +++++++++++++++ .../model/aggregate/MeanSpec.scala | 24 +++++++++++++++++ .../flinkrunner/model/aggregate/MinSpec.scala | 21 +++++++++++++++ .../model/aggregate/RangeSpec.scala | 22 ++++++++++++++++ .../aggregate/StandardDeviationSpec.scala | 25 ++++++++++++++++++ .../SumOfSquaredDeviationsSpec.scala | 22 ++++++++++++++++ .../model/aggregate/VarianceSpec.scala | 25 ++++++++++++++++++ 20 files changed, 287 insertions(+), 23 deletions(-) create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/CountSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverageSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviationSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVarianceSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MaxSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MeanSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MinSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/RangeSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviationSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/SumOfSquaredDeviationsSpec.scala create mode 100644 src/test/scala/io/epiphanous/flinkrunner/model/aggregate/VarianceSpec.scala diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Count.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Count.scala index 66e085a3..db03112d 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Count.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Count.scala @@ -2,7 +2,7 @@ package io.epiphanous.flinkrunner.model.aggregate import java.time.Instant import io.epiphanous.flinkrunner.model.UnitMapper -import squants.Each +import squants.{Dimensionless, Each, Quantity} final case class Count( dimension: String, @@ -19,11 +19,11 @@ final case class Count( override def outUnit: String = Each.symbol - override def update( - value: Double, - unit: String, - aggLU: Instant, - unitMapper: UnitMapper = UnitMapper.defaultUnitMapper - ) = - Some(copy(value = this.value + 1, unit = outUnit, count = count + 1, aggregatedLastUpdated = aggLU)) + override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = + current + current.unit(1) + +} + +object Count { + def apply(): Count = new Count(Dimensionless.name, Each.symbol) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverage.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverage.scala index f098de99..36f61a8d 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverage.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverage.scala @@ -17,7 +17,7 @@ final case class ExponentialMovingAverage( def alpha: Double = params.getOrElse("alpha", ExponentialMovingAverage.defaultAlpha).toDouble override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = - current * (1 - alpha) + quantity * alpha + if (count == 0) quantity else current * (1 - alpha) + quantity * alpha } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviation.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviation.scala index 1b4bc751..95da513a 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviation.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviation.scala @@ -21,8 +21,10 @@ final case class ExponentialMovingStandardDeviation( } override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = { - val updatedEmv = depAggs("ExponentialMovingVariance") - current.unit(Math.sqrt(updatedEmv.value)) + if (count == 0) current.unit(0d) else { + val updatedEmv = depAggs("ExponentialMovingVariance") + current.unit(Math.sqrt(updatedEmv.value)) + } } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVariance.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVariance.scala index a643c120..d27b05f5 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVariance.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVariance.scala @@ -23,10 +23,12 @@ final case class ExponentialMovingVariance( } override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = { - val currentEma = getDependents("ExponentialMovingAverage") - val q = quantity in current.unit - val delta = q - current.unit(currentEma.value) - (1 - alpha) * (current + delta * delta.value * alpha) + if (count == 0) quantity.unit(0d) else { + val currentEma = getDependents("ExponentialMovingAverage") + val q = quantity in current.unit + val delta = q - current.unit(currentEma.value) + (1 - alpha) * (current + delta * delta.value * alpha) + } } } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Max.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Max.scala index f8f4031f..fb39d5ac 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Max.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Max.scala @@ -15,6 +15,6 @@ final case class Max( extends Aggregate { override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = - current.max(quantity) + if (count == 0) quantity else current.max(quantity) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Min.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Min.scala index ff6addb6..3c826f7b 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Min.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Min.scala @@ -15,6 +15,6 @@ final case class Min( extends Aggregate { override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = - current.min(quantity) + if (count == 0) quantity else current.min(quantity) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Range.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Range.scala index b76cea82..91b57182 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Range.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Range.scala @@ -21,5 +21,5 @@ final case class Range( } override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = - current.unit(depAggs("Max").value - depAggs("Min").value) + if (count == 0) current else current.unit(depAggs("Max").value - depAggs("Min").value) } diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviation.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviation.scala index a4831622..098158ec 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviation.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviation.scala @@ -16,8 +16,10 @@ final case class StandardDeviation( extends Aggregate { override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = { - val updatedVariance = depAggs("Variance") - current.unit(Math.sqrt(updatedVariance.value)) + if (count == 0) current else { + val updatedVariance = depAggs("Variance") + current.unit(Math.sqrt(updatedVariance.value)) + } } override def getDependents = { diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Variance.scala b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Variance.scala index 82339919..10294f50 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Variance.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/aggregate/Variance.scala @@ -21,8 +21,10 @@ final case class Variance( } override def updateQuantity[A <: Quantity[A]](current: A, quantity: A, depAggs: Map[String, Aggregate]) = { - val k = count.doubleValue() - val s = current.unit(depAggs("SumOfSquaredDeviations").value) - s / (k - 1) + if (count < 2) current else { + val k = count.doubleValue() + val s = current.unit(depAggs("SumOfSquaredDeviations").value) + s / k + } } } diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/CountSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/CountSpec.scala new file mode 100644 index 00000000..49435161 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/CountSpec.scala @@ -0,0 +1,22 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import java.time.Instant + +import io.epiphanous.flinkrunner.BasePropSpec +import io.epiphanous.flinkrunner.model.UnitMapper +import squants.Each + +class CountSpec extends BasePropSpec { + + property("updateQuantity property") { + val c = Count() + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + c1 <- c.update(Each(1),t,u) + c2 <- c1.update(Each(1), t, u) + } yield c2.value + q.value shouldBe(2) + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverageSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverageSpec.scala new file mode 100644 index 00000000..d31530c6 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingAverageSpec.scala @@ -0,0 +1,22 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import java.time.Instant + +import io.epiphanous.flinkrunner.BasePropSpec +import io.epiphanous.flinkrunner.model.UnitMapper +import squants.Kilograms +import squants.mass.Mass + +class ExponentialMovingAverageSpec extends BasePropSpec { + property("updateQuantity property") { + val a = ExponentialMovingAverage(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + a1 <- a.update(Kilograms(10),t,u) + a2 <- a1.update(Kilograms(20), t, u) + a3 <- a2.update(Kilograms(30), t, u) + } yield a3.value + q.value shouldBe(26.1) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviationSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviationSpec.scala new file mode 100644 index 00000000..aae989df --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingStandardDeviationSpec.scala @@ -0,0 +1,26 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import java.time.Instant + +import io.epiphanous.flinkrunner.BasePropSpec +import io.epiphanous.flinkrunner.model.UnitMapper +import org.scalactic.{Equality, TolerantNumerics} +import squants.Kilograms +import squants.mass.Mass + +class ExponentialMovingStandardDeviationSpec extends BasePropSpec { + + implicit val tol: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(1e-4) + + property("updateQuantity property") { + val v = ExponentialMovingStandardDeviation(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + v1 <- v.update(Kilograms(10),t,u) + v2 <- v1.update(Kilograms(20), t, u) + v3 <- v2.update(Kilograms(30), t, u) + } yield v3.value + q.value shouldEqual(Math.sqrt(41.79)) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVarianceSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVarianceSpec.scala new file mode 100644 index 00000000..fe4030f8 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/ExponentialMovingVarianceSpec.scala @@ -0,0 +1,26 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper +import org.scalactic.{Equality, TolerantNumerics} + +class ExponentialMovingVarianceSpec extends BasePropSpec { + + implicit val tol: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(1e-4) + + property("updateQuantity property") { + val v = ExponentialMovingVariance(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + v1 <- v.update(Kilograms(10),t,u) + v2 <- v1.update(Kilograms(20), t, u) + v3 <- v2.update(Kilograms(30), t, u) + } yield v3.value + q.value shouldEqual(41.79) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MaxSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MaxSpec.scala new file mode 100644 index 00000000..fed4888c --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MaxSpec.scala @@ -0,0 +1,21 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class MaxSpec extends BasePropSpec { + + property("updateQuantity property") { + val m = Max(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + m1 <- m.update(Kilograms(10), t, u) + m2 <- m1.update(Kilograms(16), t, u) + } yield m2.value + q.value shouldBe(16) + }} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MeanSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MeanSpec.scala new file mode 100644 index 00000000..3677ab28 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MeanSpec.scala @@ -0,0 +1,24 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class MeanSpec extends BasePropSpec { + + property("updateQuantity property") { + val m = Mean(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + m1 <- m.update(Kilograms(10), t, u) + m2 <- m1.update(Kilograms(20), t, u) + m3 <- m2.update(Kilograms(75), t, u) + } yield m3.value + q.value shouldBe(35) + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MinSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MinSpec.scala new file mode 100644 index 00000000..9c911172 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/MinSpec.scala @@ -0,0 +1,21 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class MinSpec extends BasePropSpec { + property("updateQuantity property") { + val m = Min(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + m1 <- m.update(Kilograms(10), t, u) + m2 <- m1.update(Kilograms(8), t, u) + } yield m2.value + q.value shouldBe(8) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/RangeSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/RangeSpec.scala new file mode 100644 index 00000000..3c93a903 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/RangeSpec.scala @@ -0,0 +1,22 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class RangeSpec extends BasePropSpec { + + property("updateQuantity property") { + val q = for { + r <- Some(Range(Mass.name, Kilograms.symbol)) + r2 <- r.update(Kilograms(10), Instant.now(), UnitMapper.defaultUnitMapper) + r3 <- r2.update(Kilograms(30), Instant.now(), UnitMapper.defaultUnitMapper) + r4 <- r2.update(Kilograms(37), Instant.now(), UnitMapper.defaultUnitMapper) + } yield r4.labeledValue + q.value shouldBe("Range: 27.000000 kg") + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviationSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviationSpec.scala new file mode 100644 index 00000000..56e18048 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/StandardDeviationSpec.scala @@ -0,0 +1,25 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class StandardDeviationSpec extends BasePropSpec { + + property("updateQuantity property") { + val s = StandardDeviation(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + s1 <- s.update(Kilograms(10), t, u) + s2 <- s1.update(Kilograms(20), t, u) + s3 <- s2.update(Kilograms(30), t, u) + s4 <- s3.update(Kilograms(40), t, u) + } yield s4.value + q.value shouldBe(Math.sqrt(166 + 2d/3)) + } + +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/SumOfSquaredDeviationsSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/SumOfSquaredDeviationsSpec.scala new file mode 100644 index 00000000..f66307ba --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/SumOfSquaredDeviationsSpec.scala @@ -0,0 +1,22 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import io.epiphanous.flinkrunner.BasePropSpec +import squants.Kilograms +import squants.mass.Mass +import java.time.Instant + +import io.epiphanous.flinkrunner.model.UnitMapper + +class SumOfSquaredDeviationsSpec extends BasePropSpec { + property("updateQuantity property") { + val s = SumOfSquaredDeviations(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + s1 <- s.update(Kilograms(10), t, u) + s2 <- s1.update(Kilograms(20), t, u) + s3 <- s2.update(Kilograms(30), t, u) + } yield s3.value + q.value shouldBe(200) + } +} diff --git a/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/VarianceSpec.scala b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/VarianceSpec.scala new file mode 100644 index 00000000..a2495824 --- /dev/null +++ b/src/test/scala/io/epiphanous/flinkrunner/model/aggregate/VarianceSpec.scala @@ -0,0 +1,25 @@ +package io.epiphanous.flinkrunner.model.aggregate + +import java.time.Instant + +import io.epiphanous.flinkrunner.BasePropSpec +import io.epiphanous.flinkrunner.model.UnitMapper +import squants.Kilograms +import squants.mass.Mass + +class VarianceSpec extends BasePropSpec { + + property("updateQuantity property") { + val v = Variance(Mass.name, Kilograms.symbol) + val t = Instant.now() + val u = UnitMapper.defaultUnitMapper + val q = for { + v1 <- v.update(Kilograms(10), t, u) + v2 <- v1.update(Kilograms(20), t, u) + v3 <- v2.update(Kilograms(30), t, u) + v4 <- v3.update(Kilograms(40), t, u) + } yield v4.value + q.value shouldBe(166 + 2d/3) + } + +} From f33159b0831b2945ae3647fb6357226cc2c0b278 Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Tue, 3 Nov 2020 16:37:11 -0500 Subject: [PATCH 2/2] update version in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a673250d..ee79c0df 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ `Flinkrunner 3` is available on maven central, built against Flink 1.11 with Scala 2.12 and JDK 11. ```sbtshell -libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.5" +libraryDependencies += "io.epiphanous" %% "flinkrunner" % "3.0.6" ``` ## What is FlinkRunner?