Skip to content

Commit

Permalink
[FLINK-5464] [metrics] Improve MetricDumpSerialization exception hand…
Browse files Browse the repository at this point in the history
…ling

This closes #3128.
  • Loading branch information
zentol committed Jan 23, 2017
1 parent 7704724 commit a8e85a2
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 141 deletions.
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
Expand All @@ -42,7 +43,6 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -191,8 +191,8 @@ public void onSuccess(Object result) throws Throwable {
logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
}

private void addMetrics(Object result) throws IOException {
byte[] data = (byte[]) result;
private void addMetrics(Object result) {
MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
for (MetricDump metric : dumpedMetrics) {
metrics.add(metric);
Expand Down
Expand Up @@ -115,7 +115,7 @@ public void testUpdate() throws Exception {

MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
.thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
.thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));

MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
Expand Down Expand Up @@ -171,7 +171,7 @@ public void execute(Runnable r) {
}
}

private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
Expand Down Expand Up @@ -213,7 +213,7 @@ public String getValue() {
histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));

MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
serializer.close();

return dump;
Expand Down

0 comments on commit a8e85a2

Please sign in to comment.