Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;

Expand Down Expand Up @@ -69,54 +71,19 @@ public String filterCharacters(String input) {
private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
private final ExecutorService threadpool = Executors.newSingleThreadExecutor();

@Override
public void postStop() {
serializer.close();
if (threadpool != null && !threadpool.isShutdown()) {
threadpool.shutdownNow();
}
}

@Override
public void onReceive(Object message) {
try {
if (message instanceof AddMetric) {
AddMetric added = (AddMetric) message;

String metricName = added.metricName;
Metric metric = added.metric;
AbstractMetricGroup group = added.group;

QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

if (metric instanceof Counter) {
counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Meter) {
meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
}
} else if (message instanceof RemoveMetric) {
Metric metric = (((RemoveMetric) message).metric);
if (metric instanceof Counter) {
this.counters.remove(metric);
} else if (metric instanceof Gauge) {
this.gauges.remove(metric);
} else if (metric instanceof Histogram) {
this.histograms.remove(metric);
} else if (metric instanceof Meter) {
this.meters.remove(metric);
}
} else if (message instanceof CreateDump) {
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
getSender().tell(dump, getSelf());
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
}
} catch (Exception e) {
LOG.warn("An exception occurred while processing a message.", e);
}
threadpool.submit(new MetricMessageHandlerRunnable(message, gauges, counters, histograms, meters, getSender(), getSelf(), serializer));
}

/**
Expand Down Expand Up @@ -221,4 +188,75 @@ public static Object getCreateDump() {
private static class CreateDump implements Serializable {
private static final CreateDump INSTANCE = new CreateDump();
}

/**
* This runnable executes add metric, remove metric and create dump logic after notified.
*/
private static final class MetricMessageHandlerRunnable implements Runnable {
private final Object message;
private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges;
private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters;
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms;
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters;
private final ActorRef sender;
private final ActorRef self;
private final MetricDumpSerializer serializer;

public MetricMessageHandlerRunnable(Object message, Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Counter, Tuple2<QueryScopeInfo, String>> counters, Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
Map<Meter, Tuple2<QueryScopeInfo, String>> meters, ActorRef sender, ActorRef self,
MetricDumpSerializer serializer) {
this.message = message;
this.gauges = gauges;
this.counters = counters;
this.histograms = histograms;
this.meters = meters;
this.sender = sender;
this.self = self;
this.serializer = serializer;
}

@Override public void run() {
try {
if (message instanceof AddMetric) {
AddMetric added = (AddMetric) message;

String metricName = added.metricName;
Metric metric = added.metric;
AbstractMetricGroup group = added.group;

QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

if (metric instanceof Counter) {
counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
} else if (metric instanceof Meter) {
meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
}
} else if (message instanceof RemoveMetric) {
Metric metric = (((RemoveMetric) message).metric);
if (metric instanceof Counter) {
this.counters.remove(metric);
} else if (metric instanceof Gauge) {
this.gauges.remove(metric);
} else if (metric instanceof Histogram) {
this.histograms.remove(metric);
} else if (metric instanceof Meter) {
this.meters.remove(metric);
}
} else if (message instanceof CreateDump) {
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
sender.tell(dump, self);
} else {
LOG.warn("MetricQueryServiceActor received an invalid message: {}.", message.toString());
sender.tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message: " + message.toString() + ".")), self);
}
} catch (Exception e) {
LOG.warn("An exception occurred while processing a Metric related message.", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.flink.runtime.rpc.akka;

/**
* Created by Shimin Yang on 2018/9/21.
*/
public enum AkkaExecutorMode {
/** Used by default, use fork-join-executor dispatcher **/
FORK_JOIN_EXECUTOR,
/** Use single thread (Pinned) dispatcher **/
SINGLE_THREAD_EXECUTOR
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -71,8 +72,18 @@ public class AkkaRpcServiceUtils {
* @throws Exception Thrown is some other error occurs while creating akka actor system
*/
public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
return createRpcService(hostname, port, configuration, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
}

public static RpcService createRpcService(
String hostname,
int port,
Configuration configuration,
@Nonnull AkkaExecutorMode executorMode) throws Exception {
LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));

Preconditions.checkNotNull(executorMode);

final ActorSystem actorSystem;

try {
Expand Down
Loading