diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index afbce900da24b..578c926fc5304 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -83,6 +83,8 @@ A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you There is no restriction for the type of the returned value. You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `MetricGroup`. +
+
{% highlight java %} public class MyMapper extends RichMapFunction { @@ -102,6 +104,26 @@ public class MyMapper extends RichMapFunction { } {% endhighlight %} +
+ +
+{% highlight scala %} + +public class MyMapper extends RichMapFunction[String,Int] { + val valueToExpose = 5 + + override def open(parameters: Configuration): Unit = { + getRuntimeContext() + .getMetricGroup() + .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) ) + } + ... +} + +{% endhighlight %} +
+ +
Note that reporters will turn the exposed object into a `String`, which means that a meaningful `toString()` implementation is required. diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala new file mode 100644 index 0000000000000..e2f9ebfbc0ddd --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.metrics + +import org.apache.flink.metrics.Gauge + +/** + * This class allows the concise definition of a gauge from Scala using function references. + */ +class ScalaGauge[T](func: () => T) extends Gauge[T] { + override def getValue: T = { + func() + } +} + +object ScalaGauge { + def apply[T](func: () => T): ScalaGauge[T] = new ScalaGauge(func) +} diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala new file mode 100644 index 0000000000000..9d53e4c00b151 --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.metrics + +import org.apache.flink.metrics.Gauge +import org.apache.flink.runtime.metrics.{MetricRegistry, MetricRegistryConfiguration} +import org.apache.flink.runtime.metrics.groups.GenericMetricGroup +import org.apache.flink.util.TestLogger +import org.junit.Test +import org.scalatest.junit.JUnitSuiteLike + +class ScalaGaugeTest extends TestLogger with JUnitSuiteLike { + + @Test + def testGaugeCorrectValue(): Unit = { + val myGauge = ScalaGauge[Long](() => 4) + assert(myGauge.getValue == 4) + } + +}